added get/set methods for ranking in datamanager

This commit is contained in:
gezimbll
2024-10-17 14:26:07 +02:00
committed by Dan Christian Bogos
parent c1b0efd455
commit 88805ac074
9 changed files with 277 additions and 3 deletions

View File

@@ -142,6 +142,22 @@ func (rplSv1 *ReplicatorSv1) GetRankingProfile(ctx *context.Context, tntID *util
return nil
}
// GetRanking is the remote method coresponding to the dataDb driver method
func (rplSv1 *ReplicatorSv1) GetRanking(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Ranking) error {
engine.UpdateReplicationFilters(utils.RankingPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
rcv, err := rplSv1.dm.DataDB().GetRankingDrv(tntID.Tenant, tntID.ID)
if err != nil {
return err
}
reply.ID = rcv.ID
reply.Tenant = rcv.Tenant
reply.Sorting = rcv.Sorting
reply.StatMetrics = rcv.StatMetrics
reply.SortedStatIDs = rcv.SortedStatIDs
reply.SortingParameters = rcv.SortingParameters
return nil
}
// GetTrend is the remote method coresponding to the dataDb driver method
func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Trend) error {
engine.UpdateReplicationFilters(utils.TrendPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
@@ -453,6 +469,19 @@ func (rplSv1 *ReplicatorSv1) SetStatQueueProfile(ctx *context.Context, sq *engin
return
}
// SetRanking is the replication method coresponding to the dataDb driver method
func (rplSv1 *ReplicatorSv1) SetRanking(ctx *context.Context, arg *engine.RankingWithAPIOpts, reply *string) (err error) {
if err = rplSv1.dm.DataDB().SetRankingDrv(arg.Ranking); err != nil {
return
}
if err = rplSv1.v1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]),
arg.Tenant, utils.CacheRankings, arg.TenantID(), utils.EmptyString, nil, nil, arg.APIOpts); err != nil {
return
}
*reply = utils.OK
return
}
// SetRankingQueueProfile is the replication method coresponding to the dataDb driver method
func (rplSv1 *ReplicatorSv1) SetRankingProfile(ctx *context.Context, sg *engine.RankingProfileWithAPIOpts, reply *string) (err error) {
if err = rplSv1.dm.DataDB().SetRankingProfileDrv(sg.RankingProfile); err != nil {
@@ -903,6 +932,19 @@ func (rplSv1 *ReplicatorSv1) RemoveRankingProfile(ctx *context.Context, args *ut
return
}
// RemoveRanking is the replication method coresponding to the dataDb driver method
func (rplSv1 *ReplicatorSv1) RemoveRanking(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
if err = rplSv1.dm.DataDB().RemoveRankingDrv(args.Tenant, args.ID); err != nil {
return
}
if err = rplSv1.v1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]),
args.Tenant, utils.CacheTrends, args.TenantID.TenantID(), utils.EmptyString, nil, nil, args.APIOpts); err != nil {
return
}
*reply = utils.OK
return
}
// RemoveTrend is the replication method coresponding to the dataDb driver method
func (rplSv1 *ReplicatorSv1) RemoveTrend(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
if err = rplSv1.dm.DataDB().RemoveTrendDrv(args.Tenant, args.ID); err != nil {

View File

@@ -389,6 +389,18 @@ func (dbM *DataDBMock) RemoveTrendDrv(string, string) error {
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetRankingDrv(tenant, id string) (*Ranking, error) {
return nil, utils.ErrNotImplemented
}
func (dbM *DataDBMock) SetRankingDrv(*Ranking) error {
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) RemoveRankingDrv(string, string) error {
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) {
return nil, utils.ErrNotImplemented
}

View File

@@ -1480,7 +1480,6 @@ func (dm *DataManager) GetTrendProfileIDs(tenants []string) (tps map[string][]st
tps[tenant] = append(tps[tenant], id)
}
return
}
func (dm *DataManager) SetTrendProfile(trp *TrendProfile) (err error) {
@@ -1590,6 +1589,38 @@ func (dm *DataManager) GetRankingProfile(tenant, id string, cacheRead, cacheWrit
return
}
func (dm *DataManager) GetRankingProfileIDs(tenants []string) (rns map[string][]string, err error) {
prfx := utils.RankingsProfilePrefix
var keys []string
if len(tenants) == 0 {
keys, err = dm.dataDB.GetKeysForPrefix(prfx)
if err != nil {
return
}
} else {
for _, tenant := range tenants {
var tntkeys []string
tntPrfx := prfx + tenant + utils.ConcatenatedKeySep
tntkeys, err = dm.dataDB.GetKeysForPrefix(tntPrfx)
if err != nil {
return
}
keys = append(keys, tntkeys...)
}
}
if len(keys) == 0 {
return nil, utils.ErrNotFound
}
rns = make(map[string][]string)
for _, key := range keys {
indx := strings.Index(key, utils.ConcatenatedKeySep)
tenant := key[len(utils.RankingsProfilePrefix):indx]
id := key[indx+1:]
rns[tenant] = append(rns[tenant], id)
}
return
}
func (dm *DataManager) SetRankingProfile(sgp *RankingProfile) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
@@ -1636,6 +1667,97 @@ func (dm *DataManager) RemoveRankingProfile(tenant, id string) (err error) {
}
return
}
func (dm *DataManager) GetRanking(tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *Ranking, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheRankings, tntID); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*Ranking), nil
}
}
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if rn, err = dm.dataDB.GetRankingDrv(tenant, id); err != nil {
if err != utils.ErrNotFound { // database error
return
}
// ErrNotFound
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Remote {
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetRanking, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
}, &rn); err == nil {
err = dm.dataDB.SetRankingDrv(rn)
}
}
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound && cacheWrite {
if errCh := Cache.Set(utils.CacheRankings, tntID, nil, nil, cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return nil, err
}
if cacheWrite {
if errCh := Cache.Set(utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
}
return
}
// SetRanking stores Ranking in dataDB
func (dm *DataManager) SetRanking(rn *Ranking) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().SetRankingDrv(rn); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate {
if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RankingPrefix, rn.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetRanking,
&RankingWithAPIOpts{
Ranking: rn,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
return
}
}
return
}
// RemoveRanking removes the stored Ranking
func (dm *DataManager) RemoveRanking(tenant, id string) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().RemoveRankingDrv(tenant, id); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RankingPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRanking,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
}
func (dm *DataManager) GetTiming(id string, skipCache bool,
transactionID string) (t *utils.TPTiming, err error) {

View File

@@ -82,6 +82,11 @@ func NewRankingFromProfile(rkP *RankingProfile) *Ranking {
}
}
type RankingWithAPIOpts struct {
*Ranking
APIOpts map[string]any
}
// Ranking is one unit out of a profile
type Ranking struct {
rMux sync.RWMutex
@@ -99,6 +104,10 @@ type Ranking struct {
}
func (r *Ranking) TenantID() string {
return utils.ConcatenatedKey(r.Tenant, r.ID)
}
type rankingSorter interface {
sortStatIDs() []string // sortStatIDs returns the sorted list of statIDs
}

View File

@@ -104,6 +104,9 @@ type DataDB interface {
SetRankingProfileDrv(sq *RankingProfile) (err error)
GetRankingProfileDrv(tenant string, id string) (sq *RankingProfile, err error)
RemRankingProfileDrv(tenant string, id string) (err error)
GetRankingDrv(string, string) (*Ranking, error)
SetRankingDrv(*Ranking) error
RemoveRankingDrv(string, string) error
SetTrendProfileDrv(tr *TrendProfile) (err error)
GetTrendProfileDrv(tenant string, id string) (sq *TrendProfile, err error)
RemTrendProfileDrv(tenant string, id string) (err error)

View File

@@ -648,6 +648,26 @@ func (iDB *InternalDB) GetRankingProfileDrv(tenant, id string) (sg *RankingProfi
return x.(*RankingProfile), nil
}
func (iDB *InternalDB) GetRankingDrv(tenant, id string) (rn *Ranking, err error) {
x, ok := iDB.db.Get(utils.CacheRankings, utils.ConcatenatedKey(tenant, id))
if !ok || x == nil {
return nil, utils.ErrNotFound
}
return x.(*Ranking), nil
}
func (iDB *InternalDB) SetRankingDrv(rn *Ranking) (err error) {
iDB.db.Set(utils.CacheRankings, rn.TenantID(), rn, nil,
true, utils.NonTransactional)
return
}
func (iDB *InternalDB) RemoveRankingDrv(tenant, id string) (err error) {
iDB.db.Remove(utils.CacheRankings, utils.ConcatenatedKey(tenant, id),
true, utils.NonTransactional)
return
}
func (iDB *InternalDB) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdProfile, err error) {
x, ok := iDB.db.Get(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id))
if !ok || x == nil {

View File

@@ -68,6 +68,7 @@ const (
ColTrd = "trends"
ColSqp = "statqueue_profiles"
ColRgp = "ranking_profiles"
ColRnk = "rankings"
ColTps = "threshold_profiles"
ColThs = "thresholds"
ColFlt = "filters"
@@ -303,7 +304,7 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) error { // exported for
switch col {
case ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx:
err = ms.enusureIndex(col, true, "key")
case ColRsP, ColRes, ColSqs, ColRgp, ColTrp, ColSqp, ColTps, ColThs, ColTrd, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, ColDph:
case ColRsP, ColRes, ColSqs, ColRgp, ColTrp, ColRnk, ColSqp, ColTps, ColThs, ColTrd, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, ColDph:
err = ms.enusureIndex(col, true, "tenant", "id")
case ColRpf, ColShg, ColAcc:
err = ms.enusureIndex(col, true, "id")
@@ -349,7 +350,7 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) error {
cols = []string{
ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx,
ColRsP, ColRes, ColSqs, ColSqp, ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp,
ColDpp, ColRpf, ColShg, ColAcc, ColRgp, ColTrp, ColTrd,
ColDpp, ColRpf, ColShg, ColAcc, ColRgp, ColTrp, ColTrd, ColRnk,
}
} else {
cols = []string{
@@ -442,6 +443,8 @@ func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) error {
colName = ColTrp
case utils.TrendPrefix:
colName = ColTrd
case utils.RankingPrefix:
colName = ColRnk
case utils.ThresholdPrefix:
colName = ColThs
case utils.FilterPrefix:
@@ -619,6 +622,8 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (keys []string, err erro
keys, qryErr = ms.getAllKeysMatchingField(sctx, ColTmg, utils.TimingsPrefix, subject, "id")
case utils.TrendPrefix:
keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColTrd, utils.TrendPrefix, subject, tntID)
case utils.RankingPrefix:
keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColRnk, utils.RankingPrefix, subject, tntID)
case utils.FilterPrefix:
keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColFlt, utils.FilterPrefix, subject, tntID)
case utils.ThresholdPrefix:
@@ -685,6 +690,8 @@ func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool,
count, err = ms.getCol(ColSqs).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject})
case utils.StatQueueProfilePrefix:
count, err = ms.getCol(ColSqp).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject})
case utils.RankingPrefix:
count, err = ms.getCol(ColRnk).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject})
case utils.RankingsProfilePrefix:
count, err = ms.getCol(ColSqp).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject})
case utils.TrendPrefix:
@@ -1567,7 +1574,38 @@ func (ms *MongoStorage) RemRankingProfileDrv(tenant, id string) (err error) {
}
return err
})
}
func (ms *MongoStorage) GetRankingDrv(tenant, id string) (*Ranking, error) {
rn := new(Ranking)
err := ms.query(func(sctx mongo.SessionContext) error {
sr := ms.getCol(ColRnk).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
decodeErr := sr.Decode(rn)
if errors.Is(decodeErr, mongo.ErrNoDocuments) {
return utils.ErrNotFound
}
return decodeErr
})
return rn, err
}
func (ms *MongoStorage) SetRankingDrv(rn *Ranking) error {
return ms.query(func(sctx mongo.SessionContext) error {
_, err := ms.getCol(ColRnk).UpdateOne(sctx, bson.M{"tenant": rn.Tenant, "id": rn.ID},
bson.M{"$set": rn},
options.Update().SetUpsert(true),
)
return err
})
}
func (ms *MongoStorage) RemoveRankingDrv(tenant, id string) error {
return ms.query(func(sctx mongo.SessionContext) error {
dr, err := ms.getCol(ColRnk).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id})
if dr.DeletedCount == 0 {
return utils.ErrNotFound
}
return err
})
}
func (ms *MongoStorage) GetTrendProfileDrv(tenant, id string) (*TrendProfile, error) {

View File

@@ -1010,6 +1010,30 @@ func (rs *RedisStorage) RemRankingProfileDrv(tenant string, id string) (err erro
return rs.Cmd(nil, redis_DEL, utils.RankingsProfilePrefix+utils.ConcatenatedKey(tenant, id))
}
func (rs *RedisStorage) GetRankingDrv(tenant, id string) (rn *Ranking, err error) {
var values []byte
if err = rs.Cmd(&values, redis_GET, utils.RankingPrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
return
} else if len(values) == 0 {
err = utils.ErrNotFound
return
}
err = rs.ms.Unmarshal(values, &rn)
return rn, err
}
func (rs *RedisStorage) SetRankingDrv(rn *Ranking) (err error) {
var result []byte
if result, err = rs.ms.Marshal(rn); err != nil {
return
}
return rs.Cmd(nil, redis_SET, utils.RankingPrefix+utils.ConcatenatedKey(rn.Tenant, rn.ID), string(result))
}
func (rs *RedisStorage) RemoveRankingDrv(tenant, id string) (err error) {
return rs.Cmd(nil, redis_DEL, utils.RankingPrefix+utils.ConcatenatedKey(tenant, id))
}
// GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB
func (rs *RedisStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) {
var values []byte

View File

@@ -295,6 +295,7 @@ const (
ResourceProfilesPrefix = "rsp_"
ThresholdPrefix = "thd_"
TrendPrefix = "trd_"
RankingPrefix = "rnk_"
TimingsPrefix = "tmg_"
FilterPrefix = "ftr_"
CDRsStatsPrefix = "cst_"
@@ -1288,6 +1289,7 @@ const (
ReplicatorSv1GetThreshold = "ReplicatorSv1.GetThreshold"
ReplicatorSv1GetThresholdProfile = "ReplicatorSv1.GetThresholdProfile"
ReplicatorSv1GetStatQueueProfile = "ReplicatorSv1.GetStatQueueProfile"
ReplicatorSv1GetRanking = "ReplicatorSv1.GetRanking"
ReplicatorSv1GetRankingProfile = "ReplicatorSv1.GetRankingProfile"
ReplicatorSv1GetTrend = "ReplicatorSv1.GetTrend"
ReplicatorSv1GetTrendProfile = "ReplicatorSv1.GetTrendProfile"
@@ -1316,6 +1318,7 @@ const (
ReplicatorSv1SetStatQueue = "ReplicatorSv1.SetStatQueue"
ReplicatorSv1SetFilter = "ReplicatorSv1.SetFilter"
ReplicatorSv1SetStatQueueProfile = "ReplicatorSv1.SetStatQueueProfile"
ReplicatorSv1SetRanking = "ReplicatorSv1.SetRanking"
ReplicatorSv1SetRankingProfile = "ReplicatorSv1.SetRankingProfile"
ReplicatorSv1SetTrend = "ReplicatorSv1.SetTrend"
ReplicatorSv1SetTrendProfile = "ReplicatorSv1.SetTrendProfile"
@@ -1344,6 +1347,7 @@ const (
ReplicatorSv1RemoveFilter = "ReplicatorSv1.RemoveFilter"
ReplicatorSv1RemoveThresholdProfile = "ReplicatorSv1.RemoveThresholdProfile"
ReplicatorSv1RemoveStatQueueProfile = "ReplicatorSv1.RemoveStatQueueProfile"
ReplicatorSv1RemoveRanking = "ReplicatorSv1.RemoveRanking"
ReplicatorSv1RemoveRankingProfile = "ReplicatorSv1.RemoveRankingProfile"
ReplicatorSv1RemoveTrend = "ReplicatorSv1.RemoveTrend"
ReplicatorSv1RemoveTrendProfile = "ReplicatorSv1.RemoveTrendProfile"