diff --git a/engine/librankings.go b/engine/librankings.go index a4097bf1b..fc8f43bab 100644 --- a/engine/librankings.go +++ b/engine/librankings.go @@ -71,15 +71,20 @@ func (rkP *RankingProfile) Clone() (cln *RankingProfile) { } // NewRankingFromProfile is a constructor for an empty ranking out of it's profile -func NewRankingFromProfile(rkP *RankingProfile) *Ranking { - return &Ranking{ +func NewRankingFromProfile(rkP *RankingProfile) (rk *Ranking) { + rk = &Ranking{ Tenant: rkP.Tenant, ID: rkP.ID, + Sorting: rkP.Sorting, StatMetrics: make(map[string]map[string]float64), rkPrfl: rkP, metricIDs: utils.NewStringSet(rkP.MetricIDs), } + if rkP.SortingParameters != nil { + copy(rk.SortingParameters, rkP.SortingParameters) + } + return } type RankingWithAPIOpts struct { diff --git a/engine/rankings.go b/engine/rankings.go index ddaae413a..cf2de11a3 100644 --- a/engine/rankings.go +++ b/engine/rankings.go @@ -75,24 +75,19 @@ type RankingS struct { // // it is to be called by Cron service func (rkS *RankingS) computeRanking(rkP *RankingProfile) { - - /*rk, err := rkS.dm.GetRanking(tP.Tenant, tP.ID, true, true, utils.NonTransactional) + rk, err := rkS.dm.GetRanking(rkP.Tenant, rkP.ID, true, true, utils.NonTransactional) if err != nil { utils.Logger.Warning( fmt.Sprintf( - "<%s> querying rkP with id: <%s:%s> dm error: <%s>", - utils.RankingS, tP.Tenant, tP.ID, err.Error())) + "<%s> querying RankingProfile with ID: <%s:%s> dm error: <%s>", + utils.RankingS, rkP.Tenant, rkP.ID, err.Error())) return } - */ - rk := NewRankingFromProfile(rkP) rk.rMux.Lock() defer rk.rMux.Unlock() - /*if trnd.tPrfl == nil { - trnd.tPrfl = tP + if rk.rkPrfl == nil { + rk.rkPrfl = rkP } - */ - for _, statID := range rkP.StatIDs { var floatMetrics map[string]float64 if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns, @@ -101,7 +96,7 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) { &floatMetrics); err != nil { utils.Logger.Warning( fmt.Sprintf( - "<%s> computing Ranking with id: <%s:%s> for stats <%s> error: <%s>", + "<%s> computing Ranking with ID: <%s:%s> for Stats <%s> error: <%s>", utils.RankingS, rkP.Tenant, rkP.ID, statID, err.Error())) return } @@ -119,16 +114,21 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) { rk.StatMetrics[statID][metricID] = val } } - - /* - if err = rkS.storeRanking(rk); err != nil { - utils.Logger.Warning( - fmt.Sprintf( - "<%s> setting Ranking with id: <%s:%s> DM error: <%s>", - utils.RankingS, rkP.Tenant, rkP.ID, err.Error())) - return - } - */ + if rk.SortedStatIDs, err = rankingSortStats(rkP.Sorting, + rkP.SortingParameters, rk.StatMetrics); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> sorting stats for Ranking with ID: <%s:%s> error: <%s>", + utils.RankingS, rkP.Tenant, rkP.ID, err.Error())) + return + } + if err = rkS.storeRanking(rk); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> storing Ranking with ID: <%s:%s> DM error: <%s>", + utils.RankingS, rkP.Tenant, rkP.ID, err.Error())) + return + } if err := rkS.processThresholds(rk); err != nil { utils.Logger.Warning( fmt.Sprintf( @@ -141,7 +141,6 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) { "<%s> Trend with id <%s:%s> error: <%s> with EEs", utils.RankingS, rkP.Tenant, rkP.ID, err.Error())) } - } // processThresholds will pass the Ranking event to ThresholdS @@ -149,7 +148,7 @@ func (rkS *RankingS) processThresholds(rk *Ranking) (err error) { if len(rk.SortedStatIDs) == 0 { return } - if len(rkS.cgrcfg.TrendSCfg().ThresholdSConns) == 0 { + if len(rkS.cgrcfg.RankingSCfg().ThresholdSConns) == 0 { return } opts := map[string]any{ @@ -176,7 +175,7 @@ func (rkS *RankingS) processThresholds(rk *Ranking) (err error) { } var withErrs bool var rkIDs []string - if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().ThresholdSConns, + if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().ThresholdSConns, utils.ThresholdSv1ProcessEvent, ev, &rkIDs); err != nil && (len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) { utils.Logger.Warning( @@ -194,7 +193,7 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) { if len(rk.SortedStatIDs) == 0 { return } - if len(rkS.cgrcfg.TrendSCfg().EEsConns) == 0 { + if len(rkS.cgrcfg.RankingSCfg().EEsConns) == 0 { return } opts := map[string]any{ @@ -211,7 +210,7 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) { } var withErrs bool var reply map[string]map[string]any - if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().EEsConns, + if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().EEsConns, utils.EeSv1ProcessEvent, ev, &reply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -226,15 +225,12 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) { // storeTrend will store or schedule the trend based on settings func (rkS *RankingS) storeRanking(rk *Ranking) (err error) { - if rkS.cgrcfg.TrendSCfg().StoreInterval == 0 { + if rkS.cgrcfg.RankingSCfg().StoreInterval == 0 { return } - /* - if rkS.cgrcfg.TrendSCfg().StoreInterval == -1 { - return rkS.dm.SetRanking(rk) - } - */ - + if rkS.cgrcfg.RankingSCfg().StoreInterval == -1 { + return rkS.dm.SetRanking(rk) + } // schedule the asynchronous save, relies for Ranking to be in cache rkS.sRksMux.Lock() rkS.storedRankings.Add(rk.rkPrfl.TenantID()) @@ -268,14 +264,12 @@ func (rkS *RankingS) storeRankings() { } rk := rkIf.(*Ranking) rk.rMux.RLock() - /* - if err := rkS.dm.SetRanking(rk); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q", - utils.RankingS, rkID, err)) - failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup - } - */ + if err := rkS.dm.SetRanking(rk); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q", + utils.RankingS, rkID, err)) + failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup + } rk.rMux.RUnlock() // randomize the CPU load and give up thread control runtime.Gosched() @@ -289,7 +283,7 @@ func (rkS *RankingS) storeRankings() { // asyncStoreRankings runs as a backround process, calling storeRankings based on storeInterval func (rkS *RankingS) asyncStoreRankings() { - storeInterval := rkS.cgrcfg.TrendSCfg().StoreInterval + storeInterval := rkS.cgrcfg.RankingSCfg().StoreInterval if storeInterval <= 0 { close(rkS.storingStopped) return @@ -358,7 +352,7 @@ func (rkS *RankingS) Reload() { // scheduleAutomaticQueries will schedule the queries at start/reload based on configured func (rkS *RankingS) scheduleAutomaticQueries() error { schedData := make(map[string][]string) - for k, v := range rkS.cgrcfg.TrendSCfg().ScheduledIDs { + for k, v := range rkS.cgrcfg.RankingSCfg().ScheduledIDs { schedData[k] = v } var tnts []string @@ -438,10 +432,8 @@ func (rkS *RankingS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgSche return } -/* // V1GetRanking is the API to return the Ranking instance -// -func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.ArgGetRanking, retRanking *Ranking) (err error) { +func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, retRanking *Ranking) (err error) { if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } @@ -449,49 +441,22 @@ func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.ArgGetRanking if rk, err = rkS.dm.GetRanking(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { return } - retRanking.Tenant = trnd.Tenant // avoid vet complaining for mutex copying - retTrend.ID = trnd.ID - starrkIDx := arg.RunIndexStart - if starrkIDx > len(trnd.RunTimes) { - starrkIDx = len(trnd.RunTimes) - } - endIdx := arg.RunIndexEnd - if endIdx > len(trnd.RunTimes) || - endIdx < starrkIDx || - endIdx == 0 { - endIdx = len(trnd.RunTimes) - } - runTimes := trnd.RunTimes[starrkIDx:endIdx] - if len(runTimes) == 0 { - return utils.ErrNotFound - } - var tStart, tEnd time.Time - if arg.RunTimeStart == utils.EmptyString { - tStart = runTimes[0] - } else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil { - return - } - if arg.RunTimeEnd == utils.EmptyString { - tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1)) - } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil { - return - } - retTrend.RunTimes = make([]time.Time, 0, len(runTimes)) - for _, runTime := range runTimes { - if !runTime.Before(tStart) && runTime.Before(tEnd) { - retTrend.RunTimes = append(retTrend.RunTimes, runTime) + rk.rMux.RLock() + defer rk.rMux.RUnlock() + retRanking.Tenant = rk.Tenant // avoid vet complaining for mutex copying + retRanking.ID = rk.ID + retRanking.StatMetrics = make(map[string]map[string]float64) + for statID, metrics := range rk.StatMetrics { + retRanking.StatMetrics[statID] = make(map[string]float64) + for metricID, val := range metrics { + retRanking.StatMetrics[statID][metricID] = val } } - if len(retTrend.RunTimes) == 0 { // filtered out all - return utils.ErrNotFound - } - retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend) - for _, runTime := range retTrend.RunTimes { - retTrend.Metrics[runTime] = trnd.Metrics[runTime] - } + retRanking.Sorting = rk.Sorting + copy(retRanking.SortingParameters, rk.SortingParameters) + copy(retRanking.SortedStatIDs, rk.SortedStatIDs) return } -*/ // V1GetSchedule returns the active schedule for Raking queries func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgScheduledRankings, schedRankings *[]utils.ScheduledRanking) (err error) {