diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index 140048e30..e167d866e 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -68,8 +68,8 @@ func (rplSv1 *ReplicatorSv1) GetReverseDestination(key string, reply *[]string) } //GetStatQueue -func (rplSv1 *ReplicatorSv1) GetStatQueue(tntID *utils.TenantID, reply *engine.StoredStatQueue) error { - if rcv, err := rplSv1.dm.DataDB().GetStoredStatQueueDrv(tntID.Tenant, tntID.ID); err != nil { +func (rplSv1 *ReplicatorSv1) GetStatQueue(tntID *utils.TenantID, reply *engine.StatQueue) error { + if rcv, err := rplSv1.dm.DataDB().GetStatQueueDrv(tntID.Tenant, tntID.ID); err != nil { return err } else { *reply = *rcv @@ -365,8 +365,8 @@ func (rplSv1 *ReplicatorSv1) SetReverseDestination(dst *engine.Destination, repl } // SetDestination -func (rplSv1 *ReplicatorSv1) SetStatQueue(ssq *engine.StoredStatQueue, reply *string) error { - if err := rplSv1.dm.DataDB().SetStoredStatQueueDrv(ssq); err != nil { +func (rplSv1 *ReplicatorSv1) SetStatQueue(sq *engine.StatQueue, reply *string) error { + if err := rplSv1.dm.DataDB().SetStatQueueDrv(sq); err != nil { return err } *reply = utils.OK @@ -543,7 +543,7 @@ func (rplSv1 *ReplicatorSv1) RemoveAccount(id string, reply *string) error { } func (rplSv1 *ReplicatorSv1) RemoveStatQueue(args *utils.TenantID, reply *string) error { - if err := rplSv1.dm.DataDB().RemStoredStatQueueDrv(args.Tenant, args.ID); err != nil { + if err := rplSv1.dm.DataDB().RemStatQueueDrv(args.Tenant, args.ID); err != nil { return err } *reply = utils.OK diff --git a/engine/datamanager.go b/engine/datamanager.go index 4a5c6624e..e7150e16f 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -452,13 +452,13 @@ func (dm *DataManager) GetStatQueue(tenant, id string, return x.(*StatQueue), nil } } - ssq, err := dm.dataDB.GetStoredStatQueueDrv(tenant, id) + sq, err = dm.dataDB.GetStatQueueDrv(tenant, id) if err != nil { if err == utils.ErrNotFound && config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Remote { if err = dm.rmtConns.Call(utils.ReplicatorSv1GetStatQueue, - &utils.TenantID{Tenant: tenant, ID: id}, &ssq); err == nil { - err = dm.dataDB.SetStoredStatQueueDrv(ssq) + &utils.TenantID{Tenant: tenant, ID: id}, sq); err == nil { + err = dm.dataDB.SetStatQueueDrv(sq) } } if err != nil { @@ -471,9 +471,6 @@ func (dm *DataManager) GetStatQueue(tenant, id string, return nil, err } } - if sq, err = ssq.AsStatQueue(dm.dataDB.Marshaler()); err != nil { - return nil, err - } if cacheWrite { Cache.Set(utils.CacheStatQueues, tntID, sq, nil, cacheCommit(transactionID), transactionID) @@ -483,16 +480,12 @@ func (dm *DataManager) GetStatQueue(tenant, id string, // SetStatQueue converts to StoredStatQueue and stores the result in dataDB func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) { - ssq, err := NewStoredStatQueue(sq, dm.dataDB.Marshaler()) - if err != nil { - return err - } - if err = dm.dataDB.SetStoredStatQueueDrv(ssq); err != nil { + if err = dm.dataDB.SetStatQueueDrv(sq); err != nil { return } if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate { var reply string - if err = dm.rplConns.Call(utils.ReplicatorSv1SetStatQueue, ssq, &reply); err != nil { + if err = dm.rplConns.Call(utils.ReplicatorSv1SetStatQueue, sq, &reply); err != nil { err = utils.CastRPCErr(err) return } @@ -502,7 +495,7 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) { // RemoveStatQueue removes the StoredStatQueue func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string) (err error) { - if err = dm.dataDB.RemStoredStatQueueDrv(tenant, id); err != nil { + if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil { return } if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate { diff --git a/engine/libstats.go b/engine/libstats.go index 60a58cb78..8ee3f4b6a 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -93,6 +93,9 @@ func (ssq *StoredStatQueue) SqID() string { // AsStatQueue converts into StatQueue unmarshaling SQMetrics func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error) { + if ssq == nil { + return + } sq = &StatQueue{ Tenant: ssq.Tenant, ID: ssq.ID, diff --git a/engine/storage_interface.go b/engine/storage_interface.go index dc74477c4..d7760e594 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -47,7 +47,6 @@ type Storage interface { // OnlineStorage contains methods to use for administering online data type DataDB interface { Storage - Marshaler() Marshaler HasDataDrv(string, string, string) (bool, error) GetRatingPlanDrv(string) (*RatingPlan, error) SetRatingPlanDrv(*RatingPlan) error @@ -104,9 +103,9 @@ type DataDB interface { GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) RemStatQueueProfileDrv(tenant, id string) (err error) - GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) - SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) - RemStoredStatQueueDrv(tenant, id string) (err error) + GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) + SetStatQueueDrv(sq *StatQueue) (err error) + RemStatQueueDrv(tenant, id string) (err error) GetThresholdProfileDrv(tenant string, ID string) (tp *ThresholdProfile, err error) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) RemThresholdProfileDrv(tenant, id string) (err error) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 25a941e2a..354171f36 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -31,7 +31,6 @@ import ( type InternalDB struct { tasks []*Task - ms Marshaler db *ltcache.TransCache mu sync.RWMutex stringIndexedFields []string @@ -48,7 +47,6 @@ func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) (iDB *Inte prefixIndexedFields: prefixIndexedFields, cnter: utils.NewCounter(time.Now().UnixNano(), 0), } - iDB.ms = new(GOBMarshaler) return } @@ -69,10 +67,6 @@ func (iDB *InternalDB) Flush(_ string) error { return nil } -func (iDB *InternalDB) Marshaler() Marshaler { - return iDB.ms -} - func (iDB *InternalDB) SelectDatabase(dbName string) (err error) { return nil } @@ -882,19 +876,19 @@ func (iDB *InternalDB) RemStatQueueProfileDrv(tenant, id string) (err error) { return } -func (iDB *InternalDB) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) { +func (iDB *InternalDB) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { x, ok := iDB.db.Get(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*StoredStatQueue), nil + return x.(*StatQueue), nil } -func (iDB *InternalDB) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { +func (iDB *InternalDB) SetStatQueueDrv(sq *StatQueue) (err error) { iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) RemStoredStatQueueDrv(tenant, id string) (err error) { +func (iDB *InternalDB) RemStatQueueDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 8c763c8a7..e3a6d5773 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -410,11 +410,6 @@ func (ms *MongoStorage) Flush(ignore string) (err error) { }) } -// Marshaler returns the marshall -func (ms *MongoStorage) Marshaler() Marshaler { - return ms.ms -} - // DB returnes a database object func (ms *MongoStorage) DB() *mongo.Database { return ms.client.Database(ms.db) @@ -1913,12 +1908,12 @@ func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { }) } -// GetStoredStatQueueDrv retrieves a StoredStatQueue -func (ms *MongoStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) { - sq = new(StoredStatQueue) - err = ms.query(func(sctx mongo.SessionContext) (err error) { +// GetStatQueueDrv retrieves a StoredStatQueue +func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { + ssq := new(StoredStatQueue) + if err = ms.query(func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColSqs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(sq); err != nil { + if err := cur.Decode(ssq); err != nil { sq = nil if err == mongo.ErrNoDocuments { return utils.ErrNotFound @@ -1926,23 +1921,30 @@ func (ms *MongoStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStat return err } return nil - }) + }); err != nil { + return + } + sq, err = ssq.AsStatQueue(ms.ms) return } -// SetStoredStatQueueDrv stores the metrics for a StoredStatQueue -func (ms *MongoStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { +// SetStatQueueDrv stores the metrics for a StoredStatQueue +func (ms *MongoStorage) SetStatQueueDrv(sq *StatQueue) (err error) { + var ssq *StoredStatQueue + if ssq, err = NewStoredStatQueue(sq, ms.ms); err != nil { + return + } return ms.query(func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID}, - bson.M{"$set": sq}, + bson.M{"$set": ssq}, options.Update().SetUpsert(true), ) return err }) } -// RemStoredStatQueueDrv removes stored metrics for a StoredStatQueue -func (ms *MongoStorage) RemStoredStatQueueDrv(tenant, id string) (err error) { +// RemStatQueueDrv removes stored metrics for a StoredStatQueue +func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) (err error) { return ms.query(func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColSqs).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 3bd3864c8..8496b8691 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1359,7 +1359,7 @@ func (rs *RedisStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { } // GetStoredStatQueue retrieves the stored metrics for a StatsQueue -func (rs *RedisStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) { +func (rs *RedisStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id) var values []byte if values, err = rs.Cmd(redis_GET, key).Bytes(); err != nil { @@ -1368,24 +1368,30 @@ func (rs *RedisStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStat } return } - if err = rs.ms.Unmarshal(values, &sq); err != nil { + var ssq StoredStatQueue + if err = rs.ms.Unmarshal(values, &ssq); err != nil { return } + sq, err = ssq.AsStatQueue(rs.ms) return } // SetStoredStatQueue stores the metrics for a StatsQueue -func (rs *RedisStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { +func (rs *RedisStorage) SetStatQueueDrv(sq *StatQueue) (err error) { + var ssq *StoredStatQueue + if ssq, err = NewStoredStatQueue(sq, rs.ms); err != nil { + return + } var result []byte - result, err = rs.ms.Marshal(sq) + result, err = rs.ms.Marshal(ssq) if err != nil { return } - return rs.Cmd(redis_SET, utils.StatQueuePrefix+sq.SqID(), result).Err + return rs.Cmd(redis_SET, utils.StatQueuePrefix+ssq.SqID(), result).Err } // RemoveStatQueue removes a StatsQueue -func (rs *RedisStorage) RemStoredStatQueueDrv(tenant, id string) (err error) { +func (rs *RedisStorage) RemStatQueueDrv(tenant, id string) (err error) { key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id) if err = rs.Cmd(redis_DEL, key).Err; err != nil { return