Added integration tests for ranking scheduling

This commit is contained in:
gezimbll
2024-10-22 17:38:51 +02:00
committed by Dan Christian Bogos
parent 5242ba2440
commit b8616282ee
29 changed files with 446 additions and 36 deletions

View File

@@ -60,6 +60,7 @@ var (
utils.StatQueueProfilePrefix: {},
utils.ThresholdPrefix: {},
utils.ThresholdProfilePrefix: {},
utils.RankingPrefix: {},
utils.RankingsProfilePrefix: {},
utils.FilterPrefix: {},
utils.RouteProfilePrefix: {},
@@ -193,6 +194,9 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
case utils.RankingsProfilePrefix:
tntID := utils.NewTenantID(dataID)
_, err = dm.GetRankingProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
case utils.RankingPrefix:
tntID := utils.NewTenantID(dataID)
_, err = dm.GetRanking(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
case utils.TimingsPrefix:
_, err = dm.GetTiming(dataID, true, utils.NonTransactional)
case utils.ThresholdProfilePrefix:
@@ -205,6 +209,12 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdLockKey(tntID.Tenant, tntID.ID))
_, err = dm.GetThreshold(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
guardian.Guardian.UnguardIDs(lkID)
case utils.TrendsProfilePrefix:
tntID := utils.NewTenantID(dataID)
_, err = dm.GetTrendProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
case utils.TrendPrefix:
tntID := utils.NewTenantID(dataID)
_, err = dm.GetTrend(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
case utils.FilterPrefix:
tntID := utils.NewTenantID(dataID)
_, err = dm.GetFilter(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
@@ -1608,9 +1618,9 @@ func (dm *DataManager) GetRankingProfileIDs(tenants []string) (rns map[string][]
keys = append(keys, tntkeys...)
}
}
if len(keys) == 0 {
return nil, utils.ErrNotFound
}
// if len(keys) == 0 {
// return nil, utils.ErrNotFound
// }
rns = make(map[string][]string)
for _, key := range keys {
indx := strings.Index(key, utils.ConcatenatedKeySep)
@@ -1621,23 +1631,33 @@ func (dm *DataManager) GetRankingProfileIDs(tenants []string) (rns map[string][]
return
}
func (dm *DataManager) SetRankingProfile(sgp *RankingProfile) (err error) {
func (dm *DataManager) SetRankingProfile(rnp *RankingProfile) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().SetRankingProfileDrv(sgp); err != nil {
oldRnk, err := dm.GetRankingProfile(rnp.Tenant, rnp.ID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if err = dm.DataDB().SetRankingProfileDrv(rnp); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RankingsProfilePrefix, sgp.TenantID(),
utils.RankingsProfilePrefix, rnp.TenantID(),
utils.ReplicatorSv1SetRankingProfile,
&RankingProfileWithAPIOpts{
RankingProfile: sgp,
RankingProfile: rnp,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
if oldRnk == nil || oldRnk.Sorting != rnp.Sorting ||
oldRnk.Schedule != rnp.Schedule {
if err = dm.SetRanking(NewRankingFromProfile(rnp)); err != nil {
return
}
}
return
}
@@ -1723,7 +1743,7 @@ func (dm *DataManager) SetRanking(rn *Ranking) (err error) {
if err = dm.DataDB().SetRankingDrv(rn); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Replicate {
if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RankingPrefix, rn.TenantID(), // this are used to get the host IDs from cache

View File

@@ -58,15 +58,20 @@ func (rkP *RankingProfile) Clone() (cln *RankingProfile) {
Sorting: rkP.Sorting,
}
if rkP.StatIDs != nil {
cln.StatIDs = make([]string, len(rkP.StatIDs))
copy(cln.StatIDs, rkP.StatIDs)
}
if rkP.MetricIDs != nil {
cln.MetricIDs = make([]string, len(rkP.MetricIDs))
copy(cln.MetricIDs, rkP.MetricIDs)
}
if rkP.SortingParameters != nil {
cln.SortingParameters = make([]string, len(rkP.SortingParameters))
copy(cln.SortingParameters, rkP.SortingParameters)
}
if rkP.ThresholdIDs != nil {
cln.ThresholdIDs = make([]string, len(rkP.ThresholdIDs))
copy(cln.ThresholdIDs, rkP.ThresholdIDs)
}
return
@@ -84,6 +89,7 @@ func NewRankingFromProfile(rkP *RankingProfile) (rk *Ranking) {
metricIDs: utils.NewStringSet(rkP.MetricIDs),
}
if rkP.SortingParameters != nil {
rk.SortingParameters = make([]string, len(rkP.SortingParameters))
copy(rk.SortingParameters, rkP.SortingParameters)
}
return
@@ -123,6 +129,7 @@ func (rk *Ranking) asRankingSummary() (rkSm *RankingSummary) {
ID: rk.ID,
LastUpdate: rk.LastUpdate,
}
rkSm.SortedStatIDs = make([]string, len(rk.SortedStatIDs))
copy(rkSm.SortedStatIDs, rk.SortedStatIDs)
return
}

View File

@@ -281,6 +281,7 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
utils.CacheStatFilterIndexes: {},
utils.CacheStatQueueProfiles: {},
utils.CacheStatQueues: {},
utils.CacheRankings: {},
utils.CacheRankingProfiles: {},
utils.CacheSTIR: {},
utils.CacheRouteFilterIndexes: {},
@@ -351,7 +352,7 @@ type TestEngine struct {
func (ng TestEngine) Run(t testing.TB, extraFlags ...string) (*birpc.Client, *config.CGRConfig) {
t.Helper()
cfg := parseCfg(t, ng.ConfigPath, ng.ConfigJSON, ng.DBCfg)
flushDBs(t, cfg, !ng.PreserveDataDB, !ng.PreserveStorDB)
FlushDBs(t, cfg, !ng.PreserveDataDB, !ng.PreserveStorDB)
if ng.PreStartHook != nil {
ng.PreStartHook(t, cfg)
}
@@ -525,7 +526,7 @@ func LoadCSVs(t testing.TB, client *birpc.Client, tpPath string, csvFiles map[st
}
// flushDBs resets the databases specified in the configuration if the corresponding flags are true.
func flushDBs(t testing.TB, cfg *config.CGRConfig, flushDataDB, flushStorDB bool) {
func FlushDBs(t testing.TB, cfg *config.CGRConfig, flushDataDB, flushStorDB bool) {
t.Helper()
if flushDataDB {
if err := InitDataDb(cfg); err != nil {

View File

@@ -61,6 +61,7 @@ func TestGetDefaultEmptyCacheStats(t *testing.T) {
utils.CacheStatFilterIndexes,
utils.CacheStatQueueProfiles,
utils.CacheStatQueues,
utils.CacheRankings,
utils.CacheRankingProfiles,
utils.CacheSTIR,
utils.CacheRouteFilterIndexes,

View File

@@ -1472,6 +1472,7 @@ func (models RankingsMdls) AsTPRanking() (result []*utils.TPRankingProfile) {
thresholdMap := make(map[string]utils.StringSet)
metricsMap := make(map[string]utils.StringSet)
sortingParameterMap := make(map[string]utils.StringSet)
sortingParameterSlice := make(map[string][]string)
statsMap := make(map[string]utils.StringSet)
mrg := make(map[string]*utils.TPRankingProfile)
for _, model := range models {
@@ -1511,9 +1512,17 @@ func (models RankingsMdls) AsTPRanking() (result []*utils.TPRankingProfile) {
if model.SortingParameters != utils.EmptyString {
if _, has := sortingParameterMap[key.TenantID()]; !has {
sortingParameterMap[key.TenantID()] = make(utils.StringSet)
sortingParameterSlice[key.TenantID()] = make([]string, 0)
}
spltSl := strings.Split(model.SortingParameters, utils.InfieldSep)
for _, splt := range spltSl {
if _, has := sortingParameterMap[key.TenantID()][splt]; !has {
sortingParameterMap[key.TenantID()].Add(splt)
sortingParameterSlice[key.TenantID()] = append(sortingParameterSlice[key.TenantID()], splt)
}
}
sortingParameterMap[key.TenantID()].AddSlice(strings.Split(model.SortingParameters, utils.InfieldSep))
}
if model.MetricIDs != utils.EmptyString {
if _, has := metricsMap[key.TenantID()]; !has {
metricsMap[key.TenantID()] = make(utils.StringSet)
@@ -1528,7 +1537,7 @@ func (models RankingsMdls) AsTPRanking() (result []*utils.TPRankingProfile) {
result[i] = rg
result[i].StatIDs = statsMap[tntID].AsSlice()
result[i].MetricIDs = metricsMap[tntID].AsSlice()
result[i].SortingParameters = sortingParameterMap[tntID].AsSlice()
result[i].SortingParameters = sortingParameterSlice[tntID]
result[i].ThresholdIDs = thresholdMap[tntID].AsOrderedSlice()
i++
}

View File

@@ -118,7 +118,6 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
rk.Metrics[statID][metricID] = val
}
}
if rk.SortedStatIDs, err = rankingSortStats(rkP.Sorting,
rkP.SortingParameters, rk.Metrics); err != nil {
utils.Logger.Warning(
@@ -169,6 +168,8 @@ func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
copy(thIDs, rk.rkPrfl.ThresholdIDs)
}
opts[utils.OptsThresholdsProfileIDs] = thIDs
sortedStatIDs := make([]string, len(rk.SortedStatIDs))
copy(sortedStatIDs, rk.SortedStatIDs)
ev := &utils.CGREvent{
Tenant: rk.Tenant,
ID: utils.GenUUID(),
@@ -176,7 +177,7 @@ func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
Event: map[string]any{
utils.RankingID: rk.ID,
utils.LastUpdate: rk.LastUpdate,
utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs),
utils.SortedStatIDs: sortedStatIDs,
},
}
var withErrs bool
@@ -205,6 +206,8 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
opts := map[string]any{
utils.MetaEventType: utils.RankingUpdate,
}
sortedStatIDs := make([]string, len(rk.SortedStatIDs))
copy(sortedStatIDs, rk.SortedStatIDs)
ev := &utils.CGREvent{
Tenant: rk.Tenant,
ID: utils.GenUUID(),
@@ -212,7 +215,7 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
Event: map[string]any{
utils.RankingID: rk.ID,
utils.LastUpdate: rk.LastUpdate,
utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs),
utils.SortedStatIDs: sortedStatIDs,
},
}
var withErrs bool
@@ -372,7 +375,7 @@ func (rkS *RankingS) scheduleAutomaticQueries() error {
}
}
if tnts != nil {
qrydData, err := rkS.dm.GetTrendProfileIDs(tnts)
qrydData, err := rkS.dm.GetRankingProfileIDs(tnts)
if err != nil {
return err
}
@@ -409,7 +412,7 @@ func (rkS *RankingS) scheduleRankingQueries(_ *context.Context,
"<%s> failed retrieving RankingProfile with id: <%s:%s> for scheduling, error: <%s>",
utils.RankingS, tnt, rkID, err.Error()))
partial = true
} else if entryID, err := rkS.crn.AddFunc(utils.EmptyString,
} else if entryID, err := rkS.crn.AddFunc(rkP.Schedule,
func() { rkS.computeRanking(rkP.Clone()) }); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
@@ -420,8 +423,8 @@ func (rkS *RankingS) scheduleRankingQueries(_ *context.Context,
rkS.crnTQsMux.Lock()
rkS.crnTQs[rkP.Tenant][rkP.ID] = entryID
rkS.crnTQsMux.Unlock()
scheduled++
}
scheduled += 1
}
if partial {
return 0, utils.ErrPartiallyExecuted
@@ -459,8 +462,13 @@ func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.TenantIDWithA
retRanking.Metrics[statID][metricID] = val
}
}
retRanking.LastUpdate = rk.LastUpdate
retRanking.Sorting = rk.Sorting
retRanking.SortingParameters = make([]string, len(rk.SortingParameters))
copy(retRanking.SortingParameters, rk.SortingParameters)
retRanking.SortedStatIDs = make([]string, len(rk.SortedStatIDs))
copy(retRanking.SortedStatIDs, rk.SortedStatIDs)
return
}
@@ -473,18 +481,18 @@ func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgSchedule
}
rkS.crnTQsMux.RLock()
defer rkS.crnTQsMux.RUnlock()
trendIDsMp, has := rkS.crnTQs[tnt]
rankingIDsMp, has := rkS.crnTQs[tnt]
if !has {
return utils.ErrNotFound
}
var scheduledRankings []utils.ScheduledRanking
var entryIds map[string]cron.EntryID
if len(args.RankingIDPrefixes) == 0 {
entryIds = trendIDsMp
entryIds = rankingIDsMp
} else {
entryIds = make(map[string]cron.EntryID)
for _, rkID := range args.RankingIDPrefixes {
for key, entryID := range trendIDsMp {
for key, entryID := range rankingIDsMp {
if strings.HasPrefix(key, rkID) {
entryIds[key] = entryID
}

View File

@@ -1633,9 +1633,9 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
if verbose {
log.Print("RankingProfiles:")
}
for _, tpSG := range tpr.rgProfiles {
for _, tpRN := range tpr.rgProfiles {
var sg *RankingProfile
if sg, err = APItoRanking(tpSG); err != nil {
if sg, err = APItoRanking(tpRN); err != nil {
return
}
if err = tpr.dm.SetRankingProfile(sg); err != nil {