From b8b65bb0b8008c50f952f1fb0192e7775dc54e1c Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 15 Jan 2020 14:20:04 +0200 Subject: [PATCH] Send StoredStatQueue instead of StatQueue when replicate fixes #1864 --- apier/v1/dm_remote_it_test.go | 50 +++++++++++++++++++++++++++++++ apier/v1/replicator.go | 4 +-- engine/datamanager.go | 24 +++++++++++++-- engine/storage_interface.go | 2 +- engine/storage_internal_datadb.go | 11 ++++++- engine/storage_mongo_datadb.go | 8 ++--- engine/storage_redis.go | 20 ++++--------- 7 files changed, 92 insertions(+), 27 deletions(-) diff --git a/apier/v1/dm_remote_it_test.go b/apier/v1/dm_remote_it_test.go index 9e0b5aff6..02562ebfd 100644 --- a/apier/v1/dm_remote_it_test.go +++ b/apier/v1/dm_remote_it_test.go @@ -74,6 +74,7 @@ var ( testInternalMatchThreshold, testInternalAccountBalanceOperations, testInternalSetAccount, + testInternalReplicateStats, testInternalRemoteITKillEngine, } ) @@ -926,6 +927,55 @@ func testInternalSetAccount(t *testing.T) { } } +func testInternalReplicateStats(t *testing.T) { + var reply string + + statConfig = &StatQueueWithCache{ + StatQueueProfile: &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "StatsToReplicate", + FilterIDs: []string{"*string:~*req.Account:1001"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + }, + QueueLength: 10, + TTL: time.Duration(10) * time.Second, + Metrics: []*engine.MetricWithFilters{ + &engine.MetricWithFilters{ + MetricID: "*acd", + }, + &engine.MetricWithFilters{ + MetricID: "*tcd", + }, + }, + ThresholdIDs: []string{"*none"}, + Weight: 20, + MinItems: 1, + }, + } + + if err := internalRPC.Call(utils.ApierV1SetStatQueueProfile, statConfig, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Unexpected reply returned", reply) + } + var rcv *engine.StatQueueProfile + if err := engineOneRPC.Call(utils.ApierV1GetStatQueueProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "StatsToReplicate"}, &rcv); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(statConfig.StatQueueProfile, rcv) { + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig.StatQueueProfile), utils.ToJSON(rcv)) + } + + if err := engineTwoRPC.Call(utils.ApierV1GetStatQueueProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "StatsToReplicate"}, &rcv); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(statConfig.StatQueueProfile, rcv) { + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig.StatQueueProfile), utils.ToJSON(rcv)) + } + +} + func testInternalRemoteITKillEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index c0ee40659..c256cea88 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -365,8 +365,8 @@ func (rplSv1 *ReplicatorSv1) SetReverseDestination(dst *engine.Destination, repl } // SetDestination -func (rplSv1 *ReplicatorSv1) SetStatQueue(sq *engine.StatQueue, reply *string) error { - if err := rplSv1.dm.DataDB().SetStatQueueDrv(sq); err != nil { +func (rplSv1 *ReplicatorSv1) SetStatQueue(ssq *engine.StoredStatQueue, reply *string) error { + if err := rplSv1.dm.DataDB().SetStatQueueDrv(ssq, nil); err != nil { return err } *reply = utils.OK diff --git a/engine/datamanager.go b/engine/datamanager.go index fe8860756..ec8d02bdc 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -90,10 +90,12 @@ var ( // NewDataManager returns a new DataManager func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, connMgr *ConnManager) *DataManager { + ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding) return &DataManager{ dataDB: dataDB, cacheCfg: cacheCfg, connMgr: connMgr, + ms: ms, } } @@ -103,6 +105,7 @@ type DataManager struct { dataDB DataDB cacheCfg config.CacheCfg connMgr *ConnManager + ms Marshaler } // DataDB exports access to dataDB @@ -459,7 +462,14 @@ func (dm *DataManager) GetStatQueue(tenant, id string, if err == utils.ErrNotFound && config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Remote { if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, utils.ReplicatorSv1GetStatQueue, &utils.TenantID{Tenant: tenant, ID: id}, sq); err == nil { - err = dm.dataDB.SetStatQueueDrv(sq) + var ssq *StoredStatQueue + if dm.dataDB.GetStorageType() != utils.MetaInternal { + // in case of internal we don't marshal + if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { + return + } + } + err = dm.dataDB.SetStatQueueDrv(ssq, sq) } } if err != nil { @@ -481,13 +491,21 @@ func (dm *DataManager) GetStatQueue(tenant, id string, // SetStatQueue converts to StoredStatQueue and stores the result in dataDB func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) { - if err = dm.dataDB.SetStatQueueDrv(sq); err != nil { + var ssq *StoredStatQueue + if dm.dataDB.GetStorageType() != utils.MetaInternal || + config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate { + // in case of internal we don't marshal + if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { + return + } + } + if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil { return } if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate { var reply string if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetStatQueue, sq, &reply); err != nil { + utils.ReplicatorSv1SetStatQueue, ssq, &reply); err != nil { err = utils.CastRPCErr(err) return } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index d7760e594..701543c29 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -104,7 +104,7 @@ type DataDB interface { SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) RemStatQueueProfileDrv(tenant, id string) (err error) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) - SetStatQueueDrv(sq *StatQueue) (err error) + SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) RemStatQueueDrv(tenant, id string) (err error) GetThresholdProfileDrv(tenant string, ID string) (tp *ThresholdProfile, err error) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 8a1b54763..bc01d4fe6 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -266,16 +266,19 @@ type InternalDB struct { prefixIndexedFields []string indexedFieldsMutex sync.RWMutex // used for reload cnter *utils.Counter // used for OrderID for cdr + ms Marshaler } // NewInternalDB constructs an InternalDB func NewInternalDB(stringIndexedFields, prefixIndexedFields []string, isDataDB bool, itemsCacheCfg map[string]*config.ItemOpt) (iDB *InternalDB) { + ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding) iDB = &InternalDB{ db: ltcache.NewTransCache(newInternalDBCfg(itemsCacheCfg, isDataDB)), stringIndexedFields: stringIndexedFields, prefixIndexedFields: prefixIndexedFields, cnter: utils.NewCounter(time.Now().UnixNano(), 0), + ms: ms, } return } @@ -1120,7 +1123,13 @@ func (iDB *InternalDB) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err er } return x.(*StatQueue), nil } -func (iDB *InternalDB) SetStatQueueDrv(sq *StatQueue) (err error) { +func (iDB *InternalDB) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { + if sq == nil { + sq, err = ssq.AsStatQueue(iDB.ms) + if err != nil { + return + } + } iDB.db.Set(utils.CacheStatQueues, utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index cc8c33c99..67d87788a 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1936,13 +1936,9 @@ func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err e } // SetStatQueueDrv stores the metrics for a StoredStatQueue -func (ms *MongoStorage) SetStatQueueDrv(sq *StatQueue) (err error) { - var ssq *StoredStatQueue - if ssq, err = NewStoredStatQueue(sq, ms.ms); err != nil { - return - } +func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID}, + _, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": ssq.Tenant, "id": ssq.ID}, bson.M{"$set": ssq}, options.Update().SetUpsert(true), ) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 178dd9ecd..85396a8bf 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -103,13 +103,9 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, return client, nil } - var mrshler Marshaler - if mrshlerStr == utils.MSGPACK { - mrshler = NewCodecMsgpackMarshaler() - } else if mrshlerStr == utils.JSON { - mrshler = new(JSONMarshaler) - } else { - return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) + ms, err := NewMarshaler(mrshlerStr) + if err != nil { + return nil, err } if sentinelName != "" { @@ -131,7 +127,7 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, } return &RedisStorage{ maxConns: maxConns, - ms: mrshler, + ms: ms, sentinelName: sentinelName, sentinelInsts: sentinelInsts, db: db, @@ -144,7 +140,7 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, return &RedisStorage{ dbPool: p, maxConns: maxConns, - ms: mrshler, + ms: ms, }, nil } } @@ -1379,11 +1375,7 @@ func (rs *RedisStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err e } // SetStoredStatQueue stores the metrics for a StatsQueue -func (rs *RedisStorage) SetStatQueueDrv(sq *StatQueue) (err error) { - var ssq *StoredStatQueue - if ssq, err = NewStoredStatQueue(sq, rs.ms); err != nil { - return - } +func (rs *RedisStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { var result []byte result, err = rs.ms.Marshal(ssq) if err != nil {