mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-22 15:48:44 +05:00
Updated SetStoredStatQueueDrv and removed unused Marshaler function
This commit is contained in:
@@ -68,8 +68,8 @@ func (rplSv1 *ReplicatorSv1) GetReverseDestination(key string, reply *[]string)
|
||||
}
|
||||
|
||||
//GetStatQueue
|
||||
func (rplSv1 *ReplicatorSv1) GetStatQueue(tntID *utils.TenantID, reply *engine.StoredStatQueue) error {
|
||||
if rcv, err := rplSv1.dm.DataDB().GetStoredStatQueueDrv(tntID.Tenant, tntID.ID); err != nil {
|
||||
func (rplSv1 *ReplicatorSv1) GetStatQueue(tntID *utils.TenantID, reply *engine.StatQueue) error {
|
||||
if rcv, err := rplSv1.dm.DataDB().GetStatQueueDrv(tntID.Tenant, tntID.ID); err != nil {
|
||||
return err
|
||||
} else {
|
||||
*reply = *rcv
|
||||
@@ -365,8 +365,8 @@ func (rplSv1 *ReplicatorSv1) SetReverseDestination(dst *engine.Destination, repl
|
||||
}
|
||||
|
||||
// SetDestination
|
||||
func (rplSv1 *ReplicatorSv1) SetStatQueue(ssq *engine.StoredStatQueue, reply *string) error {
|
||||
if err := rplSv1.dm.DataDB().SetStoredStatQueueDrv(ssq); err != nil {
|
||||
func (rplSv1 *ReplicatorSv1) SetStatQueue(sq *engine.StatQueue, reply *string) error {
|
||||
if err := rplSv1.dm.DataDB().SetStatQueueDrv(sq); err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = utils.OK
|
||||
@@ -543,7 +543,7 @@ func (rplSv1 *ReplicatorSv1) RemoveAccount(id string, reply *string) error {
|
||||
}
|
||||
|
||||
func (rplSv1 *ReplicatorSv1) RemoveStatQueue(args *utils.TenantID, reply *string) error {
|
||||
if err := rplSv1.dm.DataDB().RemStoredStatQueueDrv(args.Tenant, args.ID); err != nil {
|
||||
if err := rplSv1.dm.DataDB().RemStatQueueDrv(args.Tenant, args.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = utils.OK
|
||||
|
||||
@@ -452,13 +452,13 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
return x.(*StatQueue), nil
|
||||
}
|
||||
}
|
||||
ssq, err := dm.dataDB.GetStoredStatQueueDrv(tenant, id)
|
||||
sq, err = dm.dataDB.GetStatQueueDrv(tenant, id)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound &&
|
||||
config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Remote {
|
||||
if err = dm.rmtConns.Call(utils.ReplicatorSv1GetStatQueue,
|
||||
&utils.TenantID{Tenant: tenant, ID: id}, &ssq); err == nil {
|
||||
err = dm.dataDB.SetStoredStatQueueDrv(ssq)
|
||||
&utils.TenantID{Tenant: tenant, ID: id}, sq); err == nil {
|
||||
err = dm.dataDB.SetStatQueueDrv(sq)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
@@ -471,9 +471,6 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if sq, err = ssq.AsStatQueue(dm.dataDB.Marshaler()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cacheWrite {
|
||||
Cache.Set(utils.CacheStatQueues, tntID, sq, nil,
|
||||
cacheCommit(transactionID), transactionID)
|
||||
@@ -483,16 +480,12 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
|
||||
// SetStatQueue converts to StoredStatQueue and stores the result in dataDB
|
||||
func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) {
|
||||
ssq, err := NewStoredStatQueue(sq, dm.dataDB.Marshaler())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = dm.dataDB.SetStoredStatQueueDrv(ssq); err != nil {
|
||||
if err = dm.dataDB.SetStatQueueDrv(sq); err != nil {
|
||||
return
|
||||
}
|
||||
if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate {
|
||||
var reply string
|
||||
if err = dm.rplConns.Call(utils.ReplicatorSv1SetStatQueue, ssq, &reply); err != nil {
|
||||
if err = dm.rplConns.Call(utils.ReplicatorSv1SetStatQueue, sq, &reply); err != nil {
|
||||
err = utils.CastRPCErr(err)
|
||||
return
|
||||
}
|
||||
@@ -502,7 +495,7 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) {
|
||||
|
||||
// RemoveStatQueue removes the StoredStatQueue
|
||||
func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string) (err error) {
|
||||
if err = dm.dataDB.RemStoredStatQueueDrv(tenant, id); err != nil {
|
||||
if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil {
|
||||
return
|
||||
}
|
||||
if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate {
|
||||
|
||||
@@ -93,6 +93,9 @@ func (ssq *StoredStatQueue) SqID() string {
|
||||
|
||||
// AsStatQueue converts into StatQueue unmarshaling SQMetrics
|
||||
func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error) {
|
||||
if ssq == nil {
|
||||
return
|
||||
}
|
||||
sq = &StatQueue{
|
||||
Tenant: ssq.Tenant,
|
||||
ID: ssq.ID,
|
||||
|
||||
@@ -47,7 +47,6 @@ type Storage interface {
|
||||
// OnlineStorage contains methods to use for administering online data
|
||||
type DataDB interface {
|
||||
Storage
|
||||
Marshaler() Marshaler
|
||||
HasDataDrv(string, string, string) (bool, error)
|
||||
GetRatingPlanDrv(string) (*RatingPlan, error)
|
||||
SetRatingPlanDrv(*RatingPlan) error
|
||||
@@ -104,9 +103,9 @@ type DataDB interface {
|
||||
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)
|
||||
SetStatQueueProfileDrv(sq *StatQueueProfile) (err error)
|
||||
RemStatQueueProfileDrv(tenant, id string) (err error)
|
||||
GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error)
|
||||
SetStoredStatQueueDrv(sq *StoredStatQueue) (err error)
|
||||
RemStoredStatQueueDrv(tenant, id string) (err error)
|
||||
GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error)
|
||||
SetStatQueueDrv(sq *StatQueue) (err error)
|
||||
RemStatQueueDrv(tenant, id string) (err error)
|
||||
GetThresholdProfileDrv(tenant string, ID string) (tp *ThresholdProfile, err error)
|
||||
SetThresholdProfileDrv(tp *ThresholdProfile) (err error)
|
||||
RemThresholdProfileDrv(tenant, id string) (err error)
|
||||
|
||||
@@ -31,7 +31,6 @@ import (
|
||||
|
||||
type InternalDB struct {
|
||||
tasks []*Task
|
||||
ms Marshaler
|
||||
db *ltcache.TransCache
|
||||
mu sync.RWMutex
|
||||
stringIndexedFields []string
|
||||
@@ -48,7 +47,6 @@ func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) (iDB *Inte
|
||||
prefixIndexedFields: prefixIndexedFields,
|
||||
cnter: utils.NewCounter(time.Now().UnixNano(), 0),
|
||||
}
|
||||
iDB.ms = new(GOBMarshaler)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -69,10 +67,6 @@ func (iDB *InternalDB) Flush(_ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) Marshaler() Marshaler {
|
||||
return iDB.ms
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) SelectDatabase(dbName string) (err error) {
|
||||
return nil
|
||||
}
|
||||
@@ -882,19 +876,19 @@ func (iDB *InternalDB) RemStatQueueProfileDrv(tenant, id string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) {
|
||||
func (iDB *InternalDB) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) {
|
||||
x, ok := iDB.db.Get(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id))
|
||||
if !ok || x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*StoredStatQueue), nil
|
||||
return x.(*StatQueue), nil
|
||||
}
|
||||
func (iDB *InternalDB) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) {
|
||||
func (iDB *InternalDB) SetStatQueueDrv(sq *StatQueue) (err error) {
|
||||
iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
return
|
||||
}
|
||||
func (iDB *InternalDB) RemStoredStatQueueDrv(tenant, id string) (err error) {
|
||||
func (iDB *InternalDB) RemStatQueueDrv(tenant, id string) (err error) {
|
||||
iDB.db.Remove(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id),
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
return
|
||||
|
||||
@@ -410,11 +410,6 @@ func (ms *MongoStorage) Flush(ignore string) (err error) {
|
||||
})
|
||||
}
|
||||
|
||||
// Marshaler returns the marshall
|
||||
func (ms *MongoStorage) Marshaler() Marshaler {
|
||||
return ms.ms
|
||||
}
|
||||
|
||||
// DB returnes a database object
|
||||
func (ms *MongoStorage) DB() *mongo.Database {
|
||||
return ms.client.Database(ms.db)
|
||||
@@ -1913,12 +1908,12 @@ func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) {
|
||||
})
|
||||
}
|
||||
|
||||
// GetStoredStatQueueDrv retrieves a StoredStatQueue
|
||||
func (ms *MongoStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) {
|
||||
sq = new(StoredStatQueue)
|
||||
err = ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
// GetStatQueueDrv retrieves a StoredStatQueue
|
||||
func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) {
|
||||
ssq := new(StoredStatQueue)
|
||||
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
cur := ms.getCol(ColSqs).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
|
||||
if err := cur.Decode(sq); err != nil {
|
||||
if err := cur.Decode(ssq); err != nil {
|
||||
sq = nil
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return utils.ErrNotFound
|
||||
@@ -1926,23 +1921,30 @@ func (ms *MongoStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStat
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
sq, err = ssq.AsStatQueue(ms.ms)
|
||||
return
|
||||
}
|
||||
|
||||
// SetStoredStatQueueDrv stores the metrics for a StoredStatQueue
|
||||
func (ms *MongoStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) {
|
||||
// SetStatQueueDrv stores the metrics for a StoredStatQueue
|
||||
func (ms *MongoStorage) SetStatQueueDrv(sq *StatQueue) (err error) {
|
||||
var ssq *StoredStatQueue
|
||||
if ssq, err = NewStoredStatQueue(sq, ms.ms); err != nil {
|
||||
return
|
||||
}
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
_, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID},
|
||||
bson.M{"$set": sq},
|
||||
bson.M{"$set": ssq},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// RemStoredStatQueueDrv removes stored metrics for a StoredStatQueue
|
||||
func (ms *MongoStorage) RemStoredStatQueueDrv(tenant, id string) (err error) {
|
||||
// RemStatQueueDrv removes stored metrics for a StoredStatQueue
|
||||
func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) (err error) {
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
dr, err := ms.getCol(ColSqs).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id})
|
||||
if dr.DeletedCount == 0 {
|
||||
|
||||
@@ -1359,7 +1359,7 @@ func (rs *RedisStorage) RemStatQueueProfileDrv(tenant, id string) (err error) {
|
||||
}
|
||||
|
||||
// GetStoredStatQueue retrieves the stored metrics for a StatsQueue
|
||||
func (rs *RedisStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) {
|
||||
func (rs *RedisStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) {
|
||||
key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id)
|
||||
var values []byte
|
||||
if values, err = rs.Cmd(redis_GET, key).Bytes(); err != nil {
|
||||
@@ -1368,24 +1368,30 @@ func (rs *RedisStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStat
|
||||
}
|
||||
return
|
||||
}
|
||||
if err = rs.ms.Unmarshal(values, &sq); err != nil {
|
||||
var ssq StoredStatQueue
|
||||
if err = rs.ms.Unmarshal(values, &ssq); err != nil {
|
||||
return
|
||||
}
|
||||
sq, err = ssq.AsStatQueue(rs.ms)
|
||||
return
|
||||
}
|
||||
|
||||
// SetStoredStatQueue stores the metrics for a StatsQueue
|
||||
func (rs *RedisStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) {
|
||||
func (rs *RedisStorage) SetStatQueueDrv(sq *StatQueue) (err error) {
|
||||
var ssq *StoredStatQueue
|
||||
if ssq, err = NewStoredStatQueue(sq, rs.ms); err != nil {
|
||||
return
|
||||
}
|
||||
var result []byte
|
||||
result, err = rs.ms.Marshal(sq)
|
||||
result, err = rs.ms.Marshal(ssq)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return rs.Cmd(redis_SET, utils.StatQueuePrefix+sq.SqID(), result).Err
|
||||
return rs.Cmd(redis_SET, utils.StatQueuePrefix+ssq.SqID(), result).Err
|
||||
}
|
||||
|
||||
// RemoveStatQueue removes a StatsQueue
|
||||
func (rs *RedisStorage) RemStoredStatQueueDrv(tenant, id string) (err error) {
|
||||
func (rs *RedisStorage) RemStatQueueDrv(tenant, id string) (err error) {
|
||||
key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if err = rs.Cmd(redis_DEL, key).Err; err != nil {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user