mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Send StoredStatQueue instead of StatQueue when replicate fixes #1864
This commit is contained in:
@@ -74,6 +74,7 @@ var (
|
||||
testInternalMatchThreshold,
|
||||
testInternalAccountBalanceOperations,
|
||||
testInternalSetAccount,
|
||||
testInternalReplicateStats,
|
||||
testInternalRemoteITKillEngine,
|
||||
}
|
||||
)
|
||||
@@ -926,6 +927,55 @@ func testInternalSetAccount(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testInternalReplicateStats(t *testing.T) {
|
||||
var reply string
|
||||
|
||||
statConfig = &StatQueueWithCache{
|
||||
StatQueueProfile: &engine.StatQueueProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "StatsToReplicate",
|
||||
FilterIDs: []string{"*string:~*req.Account:1001"},
|
||||
ActivationInterval: &utils.ActivationInterval{
|
||||
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
|
||||
},
|
||||
QueueLength: 10,
|
||||
TTL: time.Duration(10) * time.Second,
|
||||
Metrics: []*engine.MetricWithFilters{
|
||||
&engine.MetricWithFilters{
|
||||
MetricID: "*acd",
|
||||
},
|
||||
&engine.MetricWithFilters{
|
||||
MetricID: "*tcd",
|
||||
},
|
||||
},
|
||||
ThresholdIDs: []string{"*none"},
|
||||
Weight: 20,
|
||||
MinItems: 1,
|
||||
},
|
||||
}
|
||||
|
||||
if err := internalRPC.Call(utils.ApierV1SetStatQueueProfile, statConfig, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply returned", reply)
|
||||
}
|
||||
var rcv *engine.StatQueueProfile
|
||||
if err := engineOneRPC.Call(utils.ApierV1GetStatQueueProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "StatsToReplicate"}, &rcv); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(statConfig.StatQueueProfile, rcv) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig.StatQueueProfile), utils.ToJSON(rcv))
|
||||
}
|
||||
|
||||
if err := engineTwoRPC.Call(utils.ApierV1GetStatQueueProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "StatsToReplicate"}, &rcv); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(statConfig.StatQueueProfile, rcv) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig.StatQueueProfile), utils.ToJSON(rcv))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testInternalRemoteITKillEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(100); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -365,8 +365,8 @@ func (rplSv1 *ReplicatorSv1) SetReverseDestination(dst *engine.Destination, repl
|
||||
}
|
||||
|
||||
// SetDestination
|
||||
func (rplSv1 *ReplicatorSv1) SetStatQueue(sq *engine.StatQueue, reply *string) error {
|
||||
if err := rplSv1.dm.DataDB().SetStatQueueDrv(sq); err != nil {
|
||||
func (rplSv1 *ReplicatorSv1) SetStatQueue(ssq *engine.StoredStatQueue, reply *string) error {
|
||||
if err := rplSv1.dm.DataDB().SetStatQueueDrv(ssq, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = utils.OK
|
||||
|
||||
@@ -90,10 +90,12 @@ var (
|
||||
|
||||
// NewDataManager returns a new DataManager
|
||||
func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, connMgr *ConnManager) *DataManager {
|
||||
ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding)
|
||||
return &DataManager{
|
||||
dataDB: dataDB,
|
||||
cacheCfg: cacheCfg,
|
||||
connMgr: connMgr,
|
||||
ms: ms,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,6 +105,7 @@ type DataManager struct {
|
||||
dataDB DataDB
|
||||
cacheCfg config.CacheCfg
|
||||
connMgr *ConnManager
|
||||
ms Marshaler
|
||||
}
|
||||
|
||||
// DataDB exports access to dataDB
|
||||
@@ -459,7 +462,14 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
if err == utils.ErrNotFound && config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Remote {
|
||||
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, utils.ReplicatorSv1GetStatQueue,
|
||||
&utils.TenantID{Tenant: tenant, ID: id}, sq); err == nil {
|
||||
err = dm.dataDB.SetStatQueueDrv(sq)
|
||||
var ssq *StoredStatQueue
|
||||
if dm.dataDB.GetStorageType() != utils.MetaInternal {
|
||||
// in case of internal we don't marshal
|
||||
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
err = dm.dataDB.SetStatQueueDrv(ssq, sq)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
@@ -481,13 +491,21 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
|
||||
// SetStatQueue converts to StoredStatQueue and stores the result in dataDB
|
||||
func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) {
|
||||
if err = dm.dataDB.SetStatQueueDrv(sq); err != nil {
|
||||
var ssq *StoredStatQueue
|
||||
if dm.dataDB.GetStorageType() != utils.MetaInternal ||
|
||||
config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate {
|
||||
// in case of internal we don't marshal
|
||||
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil {
|
||||
return
|
||||
}
|
||||
if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate {
|
||||
var reply string
|
||||
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
|
||||
utils.ReplicatorSv1SetStatQueue, sq, &reply); err != nil {
|
||||
utils.ReplicatorSv1SetStatQueue, ssq, &reply); err != nil {
|
||||
err = utils.CastRPCErr(err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -104,7 +104,7 @@ type DataDB interface {
|
||||
SetStatQueueProfileDrv(sq *StatQueueProfile) (err error)
|
||||
RemStatQueueProfileDrv(tenant, id string) (err error)
|
||||
GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error)
|
||||
SetStatQueueDrv(sq *StatQueue) (err error)
|
||||
SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error)
|
||||
RemStatQueueDrv(tenant, id string) (err error)
|
||||
GetThresholdProfileDrv(tenant string, ID string) (tp *ThresholdProfile, err error)
|
||||
SetThresholdProfileDrv(tp *ThresholdProfile) (err error)
|
||||
|
||||
@@ -266,16 +266,19 @@ type InternalDB struct {
|
||||
prefixIndexedFields []string
|
||||
indexedFieldsMutex sync.RWMutex // used for reload
|
||||
cnter *utils.Counter // used for OrderID for cdr
|
||||
ms Marshaler
|
||||
}
|
||||
|
||||
// NewInternalDB constructs an InternalDB
|
||||
func NewInternalDB(stringIndexedFields, prefixIndexedFields []string,
|
||||
isDataDB bool, itemsCacheCfg map[string]*config.ItemOpt) (iDB *InternalDB) {
|
||||
ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding)
|
||||
iDB = &InternalDB{
|
||||
db: ltcache.NewTransCache(newInternalDBCfg(itemsCacheCfg, isDataDB)),
|
||||
stringIndexedFields: stringIndexedFields,
|
||||
prefixIndexedFields: prefixIndexedFields,
|
||||
cnter: utils.NewCounter(time.Now().UnixNano(), 0),
|
||||
ms: ms,
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -1120,7 +1123,13 @@ func (iDB *InternalDB) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err er
|
||||
}
|
||||
return x.(*StatQueue), nil
|
||||
}
|
||||
func (iDB *InternalDB) SetStatQueueDrv(sq *StatQueue) (err error) {
|
||||
func (iDB *InternalDB) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) {
|
||||
if sq == nil {
|
||||
sq, err = ssq.AsStatQueue(iDB.ms)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
iDB.db.Set(utils.CacheStatQueues, utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
return
|
||||
|
||||
@@ -1936,13 +1936,9 @@ func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err e
|
||||
}
|
||||
|
||||
// SetStatQueueDrv stores the metrics for a StoredStatQueue
|
||||
func (ms *MongoStorage) SetStatQueueDrv(sq *StatQueue) (err error) {
|
||||
var ssq *StoredStatQueue
|
||||
if ssq, err = NewStoredStatQueue(sq, ms.ms); err != nil {
|
||||
return
|
||||
}
|
||||
func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) {
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
_, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID},
|
||||
_, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": ssq.Tenant, "id": ssq.ID},
|
||||
bson.M{"$set": ssq},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
|
||||
@@ -103,13 +103,9 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
|
||||
return client, nil
|
||||
}
|
||||
|
||||
var mrshler Marshaler
|
||||
if mrshlerStr == utils.MSGPACK {
|
||||
mrshler = NewCodecMsgpackMarshaler()
|
||||
} else if mrshlerStr == utils.JSON {
|
||||
mrshler = new(JSONMarshaler)
|
||||
} else {
|
||||
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
|
||||
ms, err := NewMarshaler(mrshlerStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if sentinelName != "" {
|
||||
@@ -131,7 +127,7 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
|
||||
}
|
||||
return &RedisStorage{
|
||||
maxConns: maxConns,
|
||||
ms: mrshler,
|
||||
ms: ms,
|
||||
sentinelName: sentinelName,
|
||||
sentinelInsts: sentinelInsts,
|
||||
db: db,
|
||||
@@ -144,7 +140,7 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
|
||||
return &RedisStorage{
|
||||
dbPool: p,
|
||||
maxConns: maxConns,
|
||||
ms: mrshler,
|
||||
ms: ms,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
@@ -1379,11 +1375,7 @@ func (rs *RedisStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err e
|
||||
}
|
||||
|
||||
// SetStoredStatQueue stores the metrics for a StatsQueue
|
||||
func (rs *RedisStorage) SetStatQueueDrv(sq *StatQueue) (err error) {
|
||||
var ssq *StoredStatQueue
|
||||
if ssq, err = NewStoredStatQueue(sq, rs.ms); err != nil {
|
||||
return
|
||||
}
|
||||
func (rs *RedisStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) {
|
||||
var result []byte
|
||||
result, err = rs.ms.Marshal(ssq)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user