Merge pull request #1869 from TeoV/master

Send StoredStatQueue instead of StatQueue when replicate fixes #1864
This commit is contained in:
Dan Christian Bogos
2020-01-15 13:32:56 +01:00
committed by GitHub
7 changed files with 92 additions and 27 deletions

View File

@@ -74,6 +74,7 @@ var (
testInternalMatchThreshold,
testInternalAccountBalanceOperations,
testInternalSetAccount,
testInternalReplicateStats,
testInternalRemoteITKillEngine,
}
)
@@ -926,6 +927,55 @@ func testInternalSetAccount(t *testing.T) {
}
}
func testInternalReplicateStats(t *testing.T) {
var reply string
statConfig = &StatQueueWithCache{
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "StatsToReplicate",
FilterIDs: []string{"*string:~*req.Account:1001"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
},
QueueLength: 10,
TTL: time.Duration(10) * time.Second,
Metrics: []*engine.MetricWithFilters{
&engine.MetricWithFilters{
MetricID: "*acd",
},
&engine.MetricWithFilters{
MetricID: "*tcd",
},
},
ThresholdIDs: []string{"*none"},
Weight: 20,
MinItems: 1,
},
}
if err := internalRPC.Call(utils.ApierV1SetStatQueueProfile, statConfig, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Unexpected reply returned", reply)
}
var rcv *engine.StatQueueProfile
if err := engineOneRPC.Call(utils.ApierV1GetStatQueueProfile,
&utils.TenantID{Tenant: "cgrates.org", ID: "StatsToReplicate"}, &rcv); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(statConfig.StatQueueProfile, rcv) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig.StatQueueProfile), utils.ToJSON(rcv))
}
if err := engineTwoRPC.Call(utils.ApierV1GetStatQueueProfile,
&utils.TenantID{Tenant: "cgrates.org", ID: "StatsToReplicate"}, &rcv); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(statConfig.StatQueueProfile, rcv) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig.StatQueueProfile), utils.ToJSON(rcv))
}
}
func testInternalRemoteITKillEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)

View File

@@ -365,8 +365,8 @@ func (rplSv1 *ReplicatorSv1) SetReverseDestination(dst *engine.Destination, repl
}
// SetDestination
func (rplSv1 *ReplicatorSv1) SetStatQueue(sq *engine.StatQueue, reply *string) error {
if err := rplSv1.dm.DataDB().SetStatQueueDrv(sq); err != nil {
func (rplSv1 *ReplicatorSv1) SetStatQueue(ssq *engine.StoredStatQueue, reply *string) error {
if err := rplSv1.dm.DataDB().SetStatQueueDrv(ssq, nil); err != nil {
return err
}
*reply = utils.OK

View File

@@ -90,10 +90,12 @@ var (
// NewDataManager returns a new DataManager
func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, connMgr *ConnManager) *DataManager {
ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding)
return &DataManager{
dataDB: dataDB,
cacheCfg: cacheCfg,
connMgr: connMgr,
ms: ms,
}
}
@@ -103,6 +105,7 @@ type DataManager struct {
dataDB DataDB
cacheCfg config.CacheCfg
connMgr *ConnManager
ms Marshaler
}
// DataDB exports access to dataDB
@@ -459,7 +462,14 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
if err == utils.ErrNotFound && config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Remote {
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, utils.ReplicatorSv1GetStatQueue,
&utils.TenantID{Tenant: tenant, ID: id}, sq); err == nil {
err = dm.dataDB.SetStatQueueDrv(sq)
var ssq *StoredStatQueue
if dm.dataDB.GetStorageType() != utils.MetaInternal {
// in case of internal we don't marshal
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
return
}
}
err = dm.dataDB.SetStatQueueDrv(ssq, sq)
}
}
if err != nil {
@@ -481,13 +491,21 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
// SetStatQueue converts to StoredStatQueue and stores the result in dataDB
func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) {
if err = dm.dataDB.SetStatQueueDrv(sq); err != nil {
var ssq *StoredStatQueue
if dm.dataDB.GetStorageType() != utils.MetaInternal ||
config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate {
// in case of internal we don't marshal
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
return
}
}
if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetStatQueue, sq, &reply); err != nil {
utils.ReplicatorSv1SetStatQueue, ssq, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}

View File

@@ -104,7 +104,7 @@ type DataDB interface {
SetStatQueueProfileDrv(sq *StatQueueProfile) (err error)
RemStatQueueProfileDrv(tenant, id string) (err error)
GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error)
SetStatQueueDrv(sq *StatQueue) (err error)
SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error)
RemStatQueueDrv(tenant, id string) (err error)
GetThresholdProfileDrv(tenant string, ID string) (tp *ThresholdProfile, err error)
SetThresholdProfileDrv(tp *ThresholdProfile) (err error)

View File

@@ -266,16 +266,19 @@ type InternalDB struct {
prefixIndexedFields []string
indexedFieldsMutex sync.RWMutex // used for reload
cnter *utils.Counter // used for OrderID for cdr
ms Marshaler
}
// NewInternalDB constructs an InternalDB
func NewInternalDB(stringIndexedFields, prefixIndexedFields []string,
isDataDB bool, itemsCacheCfg map[string]*config.ItemOpt) (iDB *InternalDB) {
ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding)
iDB = &InternalDB{
db: ltcache.NewTransCache(newInternalDBCfg(itemsCacheCfg, isDataDB)),
stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields,
cnter: utils.NewCounter(time.Now().UnixNano(), 0),
ms: ms,
}
return
}
@@ -1120,7 +1123,13 @@ func (iDB *InternalDB) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err er
}
return x.(*StatQueue), nil
}
func (iDB *InternalDB) SetStatQueueDrv(sq *StatQueue) (err error) {
func (iDB *InternalDB) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) {
if sq == nil {
sq, err = ssq.AsStatQueue(iDB.ms)
if err != nil {
return
}
}
iDB.db.Set(utils.CacheStatQueues, utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return

View File

@@ -1936,13 +1936,9 @@ func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err e
}
// SetStatQueueDrv stores the metrics for a StoredStatQueue
func (ms *MongoStorage) SetStatQueueDrv(sq *StatQueue) (err error) {
var ssq *StoredStatQueue
if ssq, err = NewStoredStatQueue(sq, ms.ms); err != nil {
return
}
func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID},
_, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": ssq.Tenant, "id": ssq.ID},
bson.M{"$set": ssq},
options.Update().SetUpsert(true),
)

View File

@@ -103,13 +103,9 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
return client, nil
}
var mrshler Marshaler
if mrshlerStr == utils.MSGPACK {
mrshler = NewCodecMsgpackMarshaler()
} else if mrshlerStr == utils.JSON {
mrshler = new(JSONMarshaler)
} else {
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
ms, err := NewMarshaler(mrshlerStr)
if err != nil {
return nil, err
}
if sentinelName != "" {
@@ -131,7 +127,7 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
}
return &RedisStorage{
maxConns: maxConns,
ms: mrshler,
ms: ms,
sentinelName: sentinelName,
sentinelInsts: sentinelInsts,
db: db,
@@ -144,7 +140,7 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
return &RedisStorage{
dbPool: p,
maxConns: maxConns,
ms: mrshler,
ms: ms,
}, nil
}
}
@@ -1379,11 +1375,7 @@ func (rs *RedisStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err e
}
// SetStoredStatQueue stores the metrics for a StatsQueue
func (rs *RedisStorage) SetStatQueueDrv(sq *StatQueue) (err error) {
var ssq *StoredStatQueue
if ssq, err = NewStoredStatQueue(sq, rs.ms); err != nil {
return
}
func (rs *RedisStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) {
var result []byte
result, err = rs.ms.Marshal(ssq)
if err != nil {