diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index a12275b05..744e03483 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -142,6 +142,22 @@ func (rplSv1 *ReplicatorSv1) GetRankingProfile(ctx *context.Context, tntID *util return nil } +// GetRanking is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetRanking(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Ranking) error { + engine.UpdateReplicationFilters(utils.RankingPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetRankingDrv(tntID.Tenant, tntID.ID) + if err != nil { + return err + } + reply.ID = rcv.ID + reply.Tenant = rcv.Tenant + reply.Sorting = rcv.Sorting + reply.StatMetrics = rcv.StatMetrics + reply.SortedStatIDs = rcv.SortedStatIDs + reply.SortingParameters = rcv.SortingParameters + return nil +} + // GetTrend is the remote method coresponding to the dataDb driver method func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Trend) error { engine.UpdateReplicationFilters(utils.TrendPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) @@ -453,6 +469,19 @@ func (rplSv1 *ReplicatorSv1) SetStatQueueProfile(ctx *context.Context, sq *engin return } +// SetRanking is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetRanking(ctx *context.Context, arg *engine.RankingWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetRankingDrv(arg.Ranking); err != nil { + return + } + if err = rplSv1.v1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), + arg.Tenant, utils.CacheRankings, arg.TenantID(), utils.EmptyString, nil, nil, arg.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + // SetRankingQueueProfile is the replication method coresponding to the dataDb driver method func (rplSv1 *ReplicatorSv1) SetRankingProfile(ctx *context.Context, sg *engine.RankingProfileWithAPIOpts, reply *string) (err error) { if err = rplSv1.dm.DataDB().SetRankingProfileDrv(sg.RankingProfile); err != nil { @@ -903,6 +932,19 @@ func (rplSv1 *ReplicatorSv1) RemoveRankingProfile(ctx *context.Context, args *ut return } +// RemoveRanking is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveRanking(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveRankingDrv(args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), + args.Tenant, utils.CacheTrends, args.TenantID.TenantID(), utils.EmptyString, nil, nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + // RemoveTrend is the replication method coresponding to the dataDb driver method func (rplSv1 *ReplicatorSv1) RemoveTrend(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { if err = rplSv1.dm.DataDB().RemoveTrendDrv(args.Tenant, args.ID); err != nil { diff --git a/engine/datadbmock.go b/engine/datadbmock.go index c871883a7..ec0caf67c 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -389,6 +389,18 @@ func (dbM *DataDBMock) RemoveTrendDrv(string, string) error { return utils.ErrNotImplemented } +func (dbM *DataDBMock) GetRankingDrv(tenant, id string) (*Ranking, error) { + return nil, utils.ErrNotImplemented +} + +func (dbM *DataDBMock) SetRankingDrv(*Ranking) error { + return utils.ErrNotImplemented +} + +func (dbM *DataDBMock) RemoveRankingDrv(string, string) error { + return utils.ErrNotImplemented +} + func (dbM *DataDBMock) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { return nil, utils.ErrNotImplemented } diff --git a/engine/datamanager.go b/engine/datamanager.go index 77640242b..1cc0a9a94 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1480,7 +1480,6 @@ func (dm *DataManager) GetTrendProfileIDs(tenants []string) (tps map[string][]st tps[tenant] = append(tps[tenant], id) } return - } func (dm *DataManager) SetTrendProfile(trp *TrendProfile) (err error) { @@ -1590,6 +1589,38 @@ func (dm *DataManager) GetRankingProfile(tenant, id string, cacheRead, cacheWrit return } +func (dm *DataManager) GetRankingProfileIDs(tenants []string) (rns map[string][]string, err error) { + prfx := utils.RankingsProfilePrefix + var keys []string + if len(tenants) == 0 { + keys, err = dm.dataDB.GetKeysForPrefix(prfx) + if err != nil { + return + } + } else { + for _, tenant := range tenants { + var tntkeys []string + tntPrfx := prfx + tenant + utils.ConcatenatedKeySep + tntkeys, err = dm.dataDB.GetKeysForPrefix(tntPrfx) + if err != nil { + return + } + keys = append(keys, tntkeys...) + } + } + if len(keys) == 0 { + return nil, utils.ErrNotFound + } + rns = make(map[string][]string) + for _, key := range keys { + indx := strings.Index(key, utils.ConcatenatedKeySep) + tenant := key[len(utils.RankingsProfilePrefix):indx] + id := key[indx+1:] + rns[tenant] = append(rns[tenant], id) + } + return +} + func (dm *DataManager) SetRankingProfile(sgp *RankingProfile) (err error) { if dm == nil { return utils.ErrNoDatabaseConn @@ -1636,6 +1667,97 @@ func (dm *DataManager) RemoveRankingProfile(tenant, id string) (err error) { } return } +func (dm *DataManager) GetRanking(tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *Ranking, err error) { + tntID := utils.ConcatenatedKey(tenant, id) + if cacheRead { + if x, ok := Cache.Get(utils.CacheRankings, tntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Ranking), nil + } + } + if dm == nil { + err = utils.ErrNoDatabaseConn + return + } + if rn, err = dm.dataDB.GetRankingDrv(tenant, id); err != nil { + if err != utils.ErrNotFound { // database error + return + } + // ErrNotFound + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Remote { + if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns, + utils.ReplicatorSv1GetRanking, &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString, + utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID, + config.CgrConfig().GeneralCfg().NodeID)), + }, &rn); err == nil { + err = dm.dataDB.SetRankingDrv(rn) + } + } + if err != nil { + err = utils.CastRPCErr(err) + if err == utils.ErrNotFound && cacheWrite { + if errCh := Cache.Set(utils.CacheRankings, tntID, nil, nil, cacheCommit(transactionID), transactionID); errCh != nil { + return nil, errCh + } + } + return nil, err + } + if cacheWrite { + if errCh := Cache.Set(utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil { + return nil, errCh + } + } + } + return +} + +// SetRanking stores Ranking in dataDB +func (dm *DataManager) SetRanking(rn *Ranking) (err error) { + if dm == nil { + return utils.ErrNoDatabaseConn + } + if err = dm.DataDB().SetRankingDrv(rn); err != nil { + return + } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate { + if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.RankingPrefix, rn.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetRanking, + &RankingWithAPIOpts{ + Ranking: rn, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { + return + } + } + return +} + +// RemoveRanking removes the stored Ranking +func (dm *DataManager) RemoveRanking(tenant, id string) (err error) { + if dm == nil { + return utils.ErrNoDatabaseConn + } + if err = dm.DataDB().RemoveRankingDrv(tenant, id); err != nil { + return + } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.RankingPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveRanking, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + } + return +} func (dm *DataManager) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) { diff --git a/engine/librankings.go b/engine/librankings.go index a9d7f841e..a4097bf1b 100644 --- a/engine/librankings.go +++ b/engine/librankings.go @@ -82,6 +82,11 @@ func NewRankingFromProfile(rkP *RankingProfile) *Ranking { } } +type RankingWithAPIOpts struct { + *Ranking + APIOpts map[string]any +} + // Ranking is one unit out of a profile type Ranking struct { rMux sync.RWMutex @@ -99,6 +104,10 @@ type Ranking struct { } +func (r *Ranking) TenantID() string { + return utils.ConcatenatedKey(r.Tenant, r.ID) +} + type rankingSorter interface { sortStatIDs() []string // sortStatIDs returns the sorted list of statIDs } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 54a5e4c7f..a470dd929 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -104,6 +104,9 @@ type DataDB interface { SetRankingProfileDrv(sq *RankingProfile) (err error) GetRankingProfileDrv(tenant string, id string) (sq *RankingProfile, err error) RemRankingProfileDrv(tenant string, id string) (err error) + GetRankingDrv(string, string) (*Ranking, error) + SetRankingDrv(*Ranking) error + RemoveRankingDrv(string, string) error SetTrendProfileDrv(tr *TrendProfile) (err error) GetTrendProfileDrv(tenant string, id string) (sq *TrendProfile, err error) RemTrendProfileDrv(tenant string, id string) (err error) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 9bc836858..3777a7995 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -648,6 +648,26 @@ func (iDB *InternalDB) GetRankingProfileDrv(tenant, id string) (sg *RankingProfi return x.(*RankingProfile), nil } +func (iDB *InternalDB) GetRankingDrv(tenant, id string) (rn *Ranking, err error) { + x, ok := iDB.db.Get(utils.CacheRankings, utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + return x.(*Ranking), nil +} + +func (iDB *InternalDB) SetRankingDrv(rn *Ranking) (err error) { + iDB.db.Set(utils.CacheRankings, rn.TenantID(), rn, nil, + true, utils.NonTransactional) + return +} + +func (iDB *InternalDB) RemoveRankingDrv(tenant, id string) (err error) { + iDB.db.Remove(utils.CacheRankings, utils.ConcatenatedKey(tenant, id), + true, utils.NonTransactional) + return +} + func (iDB *InternalDB) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdProfile, err error) { x, ok := iDB.db.Get(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index b018aeffb..8b0ed6d38 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -68,6 +68,7 @@ const ( ColTrd = "trends" ColSqp = "statqueue_profiles" ColRgp = "ranking_profiles" + ColRnk = "rankings" ColTps = "threshold_profiles" ColThs = "thresholds" ColFlt = "filters" @@ -303,7 +304,7 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) error { // exported for switch col { case ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx: err = ms.enusureIndex(col, true, "key") - case ColRsP, ColRes, ColSqs, ColRgp, ColTrp, ColSqp, ColTps, ColThs, ColTrd, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, ColDph: + case ColRsP, ColRes, ColSqs, ColRgp, ColTrp, ColRnk, ColSqp, ColTps, ColThs, ColTrd, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, ColDph: err = ms.enusureIndex(col, true, "tenant", "id") case ColRpf, ColShg, ColAcc: err = ms.enusureIndex(col, true, "id") @@ -349,7 +350,7 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) error { cols = []string{ ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx, ColRsP, ColRes, ColSqs, ColSqp, ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, - ColDpp, ColRpf, ColShg, ColAcc, ColRgp, ColTrp, ColTrd, + ColDpp, ColRpf, ColShg, ColAcc, ColRgp, ColTrp, ColTrd, ColRnk, } } else { cols = []string{ @@ -442,6 +443,8 @@ func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) error { colName = ColTrp case utils.TrendPrefix: colName = ColTrd + case utils.RankingPrefix: + colName = ColRnk case utils.ThresholdPrefix: colName = ColThs case utils.FilterPrefix: @@ -619,6 +622,8 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (keys []string, err erro keys, qryErr = ms.getAllKeysMatchingField(sctx, ColTmg, utils.TimingsPrefix, subject, "id") case utils.TrendPrefix: keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColTrd, utils.TrendPrefix, subject, tntID) + case utils.RankingPrefix: + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColRnk, utils.RankingPrefix, subject, tntID) case utils.FilterPrefix: keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColFlt, utils.FilterPrefix, subject, tntID) case utils.ThresholdPrefix: @@ -685,6 +690,8 @@ func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool, count, err = ms.getCol(ColSqs).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject}) case utils.StatQueueProfilePrefix: count, err = ms.getCol(ColSqp).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject}) + case utils.RankingPrefix: + count, err = ms.getCol(ColRnk).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject}) case utils.RankingsProfilePrefix: count, err = ms.getCol(ColSqp).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject}) case utils.TrendPrefix: @@ -1567,7 +1574,38 @@ func (ms *MongoStorage) RemRankingProfileDrv(tenant, id string) (err error) { } return err }) +} +func (ms *MongoStorage) GetRankingDrv(tenant, id string) (*Ranking, error) { + rn := new(Ranking) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRnk).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(rn) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound + } + return decodeErr + }) + return rn, err +} +func (ms *MongoStorage) SetRankingDrv(rn *Ranking) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRnk).UpdateOne(sctx, bson.M{"tenant": rn.Tenant, "id": rn.ID}, + bson.M{"$set": rn}, + options.Update().SetUpsert(true), + ) + return err + }) +} + +func (ms *MongoStorage) RemoveRankingDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { + dr, err := ms.getCol(ColRnk).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) + if dr.DeletedCount == 0 { + return utils.ErrNotFound + } + return err + }) } func (ms *MongoStorage) GetTrendProfileDrv(tenant, id string) (*TrendProfile, error) { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index fbfa7d986..679a7f93e 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1010,6 +1010,30 @@ func (rs *RedisStorage) RemRankingProfileDrv(tenant string, id string) (err erro return rs.Cmd(nil, redis_DEL, utils.RankingsProfilePrefix+utils.ConcatenatedKey(tenant, id)) } +func (rs *RedisStorage) GetRankingDrv(tenant, id string) (rn *Ranking, err error) { + var values []byte + if err = rs.Cmd(&values, redis_GET, utils.RankingPrefix+utils.ConcatenatedKey(tenant, id)); err != nil { + return + } else if len(values) == 0 { + err = utils.ErrNotFound + return + } + err = rs.ms.Unmarshal(values, &rn) + return rn, err +} + +func (rs *RedisStorage) SetRankingDrv(rn *Ranking) (err error) { + var result []byte + if result, err = rs.ms.Marshal(rn); err != nil { + return + } + return rs.Cmd(nil, redis_SET, utils.RankingPrefix+utils.ConcatenatedKey(rn.Tenant, rn.ID), string(result)) +} + +func (rs *RedisStorage) RemoveRankingDrv(tenant, id string) (err error) { + return rs.Cmd(nil, redis_DEL, utils.RankingPrefix+utils.ConcatenatedKey(tenant, id)) +} + // GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB func (rs *RedisStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) { var values []byte diff --git a/utils/consts.go b/utils/consts.go index f34f62d6e..40b64ae0e 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -295,6 +295,7 @@ const ( ResourceProfilesPrefix = "rsp_" ThresholdPrefix = "thd_" TrendPrefix = "trd_" + RankingPrefix = "rnk_" TimingsPrefix = "tmg_" FilterPrefix = "ftr_" CDRsStatsPrefix = "cst_" @@ -1288,6 +1289,7 @@ const ( ReplicatorSv1GetThreshold = "ReplicatorSv1.GetThreshold" ReplicatorSv1GetThresholdProfile = "ReplicatorSv1.GetThresholdProfile" ReplicatorSv1GetStatQueueProfile = "ReplicatorSv1.GetStatQueueProfile" + ReplicatorSv1GetRanking = "ReplicatorSv1.GetRanking" ReplicatorSv1GetRankingProfile = "ReplicatorSv1.GetRankingProfile" ReplicatorSv1GetTrend = "ReplicatorSv1.GetTrend" ReplicatorSv1GetTrendProfile = "ReplicatorSv1.GetTrendProfile" @@ -1316,6 +1318,7 @@ const ( ReplicatorSv1SetStatQueue = "ReplicatorSv1.SetStatQueue" ReplicatorSv1SetFilter = "ReplicatorSv1.SetFilter" ReplicatorSv1SetStatQueueProfile = "ReplicatorSv1.SetStatQueueProfile" + ReplicatorSv1SetRanking = "ReplicatorSv1.SetRanking" ReplicatorSv1SetRankingProfile = "ReplicatorSv1.SetRankingProfile" ReplicatorSv1SetTrend = "ReplicatorSv1.SetTrend" ReplicatorSv1SetTrendProfile = "ReplicatorSv1.SetTrendProfile" @@ -1344,6 +1347,7 @@ const ( ReplicatorSv1RemoveFilter = "ReplicatorSv1.RemoveFilter" ReplicatorSv1RemoveThresholdProfile = "ReplicatorSv1.RemoveThresholdProfile" ReplicatorSv1RemoveStatQueueProfile = "ReplicatorSv1.RemoveStatQueueProfile" + ReplicatorSv1RemoveRanking = "ReplicatorSv1.RemoveRanking" ReplicatorSv1RemoveRankingProfile = "ReplicatorSv1.RemoveRankingProfile" ReplicatorSv1RemoveTrend = "ReplicatorSv1.RemoveTrend" ReplicatorSv1RemoveTrendProfile = "ReplicatorSv1.RemoveTrendProfile"