From 4d7cb6c88ab9d799d3cd97d6082430ba582dabc7 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 16 Oct 2024 18:42:13 +0200 Subject: [PATCH] Basic RankingS implementation --- engine/librankings.go | 59 ++++- engine/rankings.go | 543 ++++++++++++++++++++++++++++++++++++++++++ engine/trends.go | 12 +- services/rankings.go | 56 +++-- services/trends.go | 1 + utils/apitpdata.go | 20 +- utils/consts.go | 4 + 7 files changed, 660 insertions(+), 35 deletions(-) create mode 100644 engine/rankings.go diff --git a/engine/librankings.go b/engine/librankings.go index 0625d5f93..a82fce0a1 100644 --- a/engine/librankings.go +++ b/engine/librankings.go @@ -13,12 +13,13 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License -along with this program. If not, see +along with this program. If not, see */ package engine import ( + "sync" "time" "github.com/cgrates/cgrates/utils" @@ -40,6 +41,58 @@ type RankingProfile struct { ThresholdIDs []string } -func (sgp *RankingProfile) TenantID() string { - return utils.ConcatenatedKey(sgp.Tenant, sgp.ID) +func (rkp *RankingProfile) TenantID() string { + return utils.ConcatenatedKey(rkp.Tenant, rkp.ID) +} + +// Clone will clone a RankingProfile +func (rkP *RankingProfile) Clone() (cln *RankingProfile) { + cln = &RankingProfile{ + Tenant: rkP.Tenant, + ID: rkP.ID, + QueryInterval: rkP.QueryInterval, + Sorting: rkP.Sorting, + } + if rkP.StatIDs != nil { + copy(cln.StatIDs, rkP.StatIDs) + } + if rkP.MetricIDs != nil { + copy(cln.MetricIDs, rkP.MetricIDs) + } + if rkP.SortingParameters != nil { + copy(cln.SortingParameters, rkP.SortingParameters) + } + if rkP.ThresholdIDs != nil { + copy(cln.ThresholdIDs, rkP.ThresholdIDs) + } + return +} + +// NewRankingFromProfile is a constructor for an empty ranking out of it's profile +func NewRankingFromProfile(rkP *RankingProfile) *Ranking { + return &Ranking{ + Tenant: rkP.Tenant, + ID: rkP.ID, + StatMetrics: make(map[string]map[string]float64), + + rkPrfl: rkP, + metricIDs: utils.NewStringSet(rkP.MetricIDs), + } +} + +// Ranking is one unit out of a profile +type Ranking struct { + rMux sync.RWMutex + + Tenant string + ID string + StatMetrics map[string]map[string]float64 // map[statID]map[metricID]metricValue + Sorting string + SortingParameters []string + + SortedStatIDs []string + + rkPrfl *RankingProfile // store here the ranking profile so we can have it at hands further + metricIDs utils.StringSet // convert the metricIDs here for faster matching + } diff --git a/engine/rankings.go b/engine/rankings.go new file mode 100644 index 000000000..ddaae413a --- /dev/null +++ b/engine/rankings.go @@ -0,0 +1,543 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + "runtime" + "slices" + "strings" + "sync" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cron" +) + +// NewRankingS is the constructor for RankingS +func NewRankingS(dm *DataManager, + connMgr *ConnManager, + filterS *FilterS, + cgrcfg *config.CGRConfig) *RankingS { + return &RankingS{ + dm: dm, + connMgr: connMgr, + filterS: filterS, + cgrcfg: cgrcfg, + crn: cron.New(), + crnTQsMux: new(sync.RWMutex), + crnTQs: make(map[string]map[string]cron.EntryID), + storedRankings: make(utils.StringSet), + storingStopped: make(chan struct{}), + rankingStop: make(chan struct{}), + } +} + +// RankingS is responsible of implementing the logic of RankingService +type RankingS struct { + dm *DataManager + connMgr *ConnManager + filterS *FilterS + cgrcfg *config.CGRConfig + + crn *cron.Cron // cron reference + + crnTQsMux *sync.RWMutex // protects the crnTQs + crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for rankingQueries so we can reschedule them when needed + + storedRankings utils.StringSet // keep a record of RankingS which need saving, map[rankingTenanrkID]bool + sRksMux sync.RWMutex // protects storedRankings + storingStopped chan struct{} // signal back that the operations were stopped + + rankingStop chan struct{} // signal to stop all operations + +} + +// computeRanking will query the stats and build the Ranking for them +// +// it is to be called by Cron service +func (rkS *RankingS) computeRanking(rkP *RankingProfile) { + + /*rk, err := rkS.dm.GetRanking(tP.Tenant, tP.ID, true, true, utils.NonTransactional) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> querying rkP with id: <%s:%s> dm error: <%s>", + utils.RankingS, tP.Tenant, tP.ID, err.Error())) + return + } + */ + rk := NewRankingFromProfile(rkP) + rk.rMux.Lock() + defer rk.rMux.Unlock() + /*if trnd.tPrfl == nil { + trnd.tPrfl = tP + } + */ + + for _, statID := range rkP.StatIDs { + var floatMetrics map[string]float64 + if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns, + utils.StatSv1GetQueueFloatMetrics, + &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: rkP.Tenant, ID: statID}}, + &floatMetrics); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> computing Ranking with id: <%s:%s> for stats <%s> error: <%s>", + utils.RankingS, rkP.Tenant, rkP.ID, statID, err.Error())) + return + } + if len(rk.metricIDs) != 0 { + for metricID := range floatMetrics { + if _, has := rk.metricIDs[statID]; !has { + delete(floatMetrics, metricID) + } + } + } + if len(floatMetrics) != 0 { + rk.StatMetrics[statID] = make(map[string]float64) + } + for metricID, val := range floatMetrics { + rk.StatMetrics[statID][metricID] = val + } + } + + /* + if err = rkS.storeRanking(rk); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> setting Ranking with id: <%s:%s> DM error: <%s>", + utils.RankingS, rkP.Tenant, rkP.ID, err.Error())) + return + } + */ + if err := rkS.processThresholds(rk); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> Ranking with id <%s:%s> error: <%s> with ThresholdS", + utils.RankingS, rkP.Tenant, rkP.ID, err.Error())) + } + if err := rkS.processEEs(rk); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> Trend with id <%s:%s> error: <%s> with EEs", + utils.RankingS, rkP.Tenant, rkP.ID, err.Error())) + } + +} + +// processThresholds will pass the Ranking event to ThresholdS +func (rkS *RankingS) processThresholds(rk *Ranking) (err error) { + if len(rk.SortedStatIDs) == 0 { + return + } + if len(rkS.cgrcfg.TrendSCfg().ThresholdSConns) == 0 { + return + } + opts := map[string]any{ + utils.MetaEventType: utils.RankingUpdate, + } + var thIDs []string + if len(rk.rkPrfl.ThresholdIDs) != 0 { + if len(rk.rkPrfl.ThresholdIDs) == 1 && + rk.rkPrfl.ThresholdIDs[0] == utils.MetaNone { + return + } + thIDs = make([]string, len(rk.rkPrfl.ThresholdIDs)) + copy(thIDs, rk.rkPrfl.ThresholdIDs) + } + opts[utils.OptsThresholdsProfileIDs] = thIDs + ev := &utils.CGREvent{ + Tenant: rk.Tenant, + ID: utils.GenUUID(), + APIOpts: opts, + Event: map[string]any{ + utils.RankingID: rk.ID, + utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs), + }, + } + var withErrs bool + var rkIDs []string + if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().ThresholdSConns, + utils.ThresholdSv1ProcessEvent, ev, &rkIDs); err != nil && + (len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.RankingS, err.Error(), ev)) + withErrs = true + } + if withErrs { + err = utils.ErrPartiallyExecuted + } + return +} + +// processEEs will pass the Ranking event to EEs +func (rkS *RankingS) processEEs(rk *Ranking) (err error) { + if len(rk.SortedStatIDs) == 0 { + return + } + if len(rkS.cgrcfg.TrendSCfg().EEsConns) == 0 { + return + } + opts := map[string]any{ + utils.MetaEventType: utils.RankingUpdate, + } + ev := &utils.CGREvent{ + Tenant: rk.Tenant, + ID: utils.GenUUID(), + APIOpts: opts, + Event: map[string]any{ + utils.RankingID: rk.ID, + utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs), + }, + } + var withErrs bool + var reply map[string]map[string]any + if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().EEsConns, + utils.EeSv1ProcessEvent, ev, &reply); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.RankingS, err.Error(), ev)) + withErrs = true + } + if withErrs { + err = utils.ErrPartiallyExecuted + } + return +} + +// storeTrend will store or schedule the trend based on settings +func (rkS *RankingS) storeRanking(rk *Ranking) (err error) { + if rkS.cgrcfg.TrendSCfg().StoreInterval == 0 { + return + } + /* + if rkS.cgrcfg.TrendSCfg().StoreInterval == -1 { + return rkS.dm.SetRanking(rk) + } + */ + + // schedule the asynchronous save, relies for Ranking to be in cache + rkS.sRksMux.Lock() + rkS.storedRankings.Add(rk.rkPrfl.TenantID()) + rkS.sRksMux.Unlock() + return +} + +// storeRankings will do one round for saving modified Rankings +// +// from cache to dataDB +// designed to run asynchronously +func (rkS *RankingS) storeRankings() { + var failedRkIDs []string + for { + rkS.sRksMux.Lock() + rkID := rkS.storedRankings.GetOne() + if rkID != utils.EmptyString { + rkS.storedRankings.Remove(rkID) + } + rkS.sRksMux.Unlock() + if rkID == utils.EmptyString { + break // no more keys, backup completed + } + rkIf, ok := Cache.Get(utils.CacheRankings, rkID) + if !ok || rkIf == nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed retrieving from cache Ranking with ID: %q", + utils.RankingS, rkID)) + failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup + continue + } + rk := rkIf.(*Ranking) + rk.rMux.RLock() + /* + if err := rkS.dm.SetRanking(rk); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q", + utils.RankingS, rkID, err)) + failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup + } + */ + rk.rMux.RUnlock() + // randomize the CPU load and give up thread control + runtime.Gosched() + } + if len(failedRkIDs) != 0 { // there were errors on save, schedule the keys for next backup + rkS.sRksMux.Lock() + rkS.storedRankings.AddSlice(failedRkIDs) + rkS.sRksMux.Unlock() + } +} + +// asyncStoreRankings runs as a backround process, calling storeRankings based on storeInterval +func (rkS *RankingS) asyncStoreRankings() { + storeInterval := rkS.cgrcfg.TrendSCfg().StoreInterval + if storeInterval <= 0 { + close(rkS.storingStopped) + return + } + for { + rkS.storeRankings() + select { + case <-rkS.rankingStop: + close(rkS.storingStopped) + return + case <-time.After(storeInterval): // continue to another storing loop + } + } +} + +// StartRankings will activates the Cron, together with all scheduled Ranking queries +func (rkS *RankingS) StartRankingS() (err error) { + if err = rkS.scheduleAutomaticQueries(); err != nil { + return + } + rkS.crn.Start() + go rkS.asyncStoreRankings() + return +} + +// StopCron will shutdown the Cron tasks +func (rkS *RankingS) StopRankingS() { + timeEnd := time.Now().Add(rkS.cgrcfg.CoreSCfg().ShutdownTimeout) + + ctx := rkS.crn.Stop() + close(rkS.rankingStop) + + // Wait for cron + select { + case <-ctx.Done(): + case <-time.After(timeEnd.Sub(time.Now())): + utils.Logger.Warning( + fmt.Sprintf( + "<%s> timeout waiting for Cron to finish", + utils.RankingS)) + return + } + // Wait for backup and other operations + select { + case <-rkS.storingStopped: + case <-time.After(timeEnd.Sub(time.Now())): + utils.Logger.Warning( + fmt.Sprintf( + "<%s> timeout waiting for RankingS to finish", + utils.RankingS)) + return + } +} + +func (rkS *RankingS) Reload() { + ctx := rkS.crn.Stop() + close(rkS.rankingStop) + <-ctx.Done() + <-rkS.storingStopped + rkS.rankingStop = make(chan struct{}) + rkS.storingStopped = make(chan struct{}) + rkS.crn.Start() + go rkS.asyncStoreRankings() +} + +// scheduleAutomaticQueries will schedule the queries at start/reload based on configured +func (rkS *RankingS) scheduleAutomaticQueries() error { + schedData := make(map[string][]string) + for k, v := range rkS.cgrcfg.TrendSCfg().ScheduledIDs { + schedData[k] = v + } + var tnts []string + if len(schedData) == 0 { + tnts = make([]string, 0) + } + for tnt, rkIDs := range schedData { + if len(rkIDs) == 0 { + tnts = append(tnts, tnt) + } + } + if tnts != nil { + qrydData, err := rkS.dm.GetTrendProfileIDs(tnts) + if err != nil { + return err + } + for tnt, ids := range qrydData { + schedData[tnt] = ids + } + } + for tnt, rkIDs := range schedData { + if _, err := rkS.scheduleRankingQueries(context.TODO(), tnt, rkIDs); err != nil { + return err + } + } + return nil +} + +// scheduleTrendQueries will schedule/re-schedule specific trend queries +func (rkS *RankingS) scheduleRankingQueries(_ *context.Context, + tnt string, rkIDs []string) (scheduled int, err error) { + var partial bool + rkS.crnTQsMux.Lock() + if _, has := rkS.crnTQs[tnt]; !has { + rkS.crnTQs[tnt] = make(map[string]cron.EntryID) + } + rkS.crnTQsMux.Unlock() + for _, rkID := range rkIDs { + rkS.crnTQsMux.RLock() + if entryID, has := rkS.crnTQs[tnt][rkID]; has { + rkS.crn.Remove(entryID) // deschedule the query + } + rkS.crnTQsMux.RUnlock() + if rkP, err := rkS.dm.GetRankingProfile(tnt, rkID, true, true, utils.NonTransactional); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> failed retrieving RankingProfile with id: <%s:%s> for scheduling, error: <%s>", + utils.RankingS, tnt, rkID, err.Error())) + partial = true + } else if entryID, err := rkS.crn.AddFunc(utils.EmptyString, + func() { rkS.computeRanking(rkP.Clone()) }); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> scheduling RankingProfile <%s:%s>, error: <%s>", + utils.RankingS, tnt, rkID, err.Error())) + partial = true + } else { // log the entry ID for debugging + rkS.crnTQsMux.Lock() + rkS.crnTQs[rkP.Tenant][rkP.ID] = entryID + rkS.crnTQsMux.Unlock() + } + scheduled += 1 + } + if partial { + return 0, utils.ErrPartiallyExecuted + } + return +} + +// V1ScheduleQueries is the query for manually re-/scheduling Ranking Queries +func (rkS *RankingS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleRankingQueries, scheduled *int) (err error) { + if sched, errSched := rkS.scheduleRankingQueries(ctx, args.Tenant, args.RankingIDs); errSched != nil { + return errSched + } else { + *scheduled = sched + } + return +} + +/* +// V1GetRanking is the API to return the Ranking instance +// +func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.ArgGetRanking, retRanking *Ranking) (err error) { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + var rk *Ranking + if rk, err = rkS.dm.GetRanking(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { + return + } + retRanking.Tenant = trnd.Tenant // avoid vet complaining for mutex copying + retTrend.ID = trnd.ID + starrkIDx := arg.RunIndexStart + if starrkIDx > len(trnd.RunTimes) { + starrkIDx = len(trnd.RunTimes) + } + endIdx := arg.RunIndexEnd + if endIdx > len(trnd.RunTimes) || + endIdx < starrkIDx || + endIdx == 0 { + endIdx = len(trnd.RunTimes) + } + runTimes := trnd.RunTimes[starrkIDx:endIdx] + if len(runTimes) == 0 { + return utils.ErrNotFound + } + var tStart, tEnd time.Time + if arg.RunTimeStart == utils.EmptyString { + tStart = runTimes[0] + } else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil { + return + } + if arg.RunTimeEnd == utils.EmptyString { + tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1)) + } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil { + return + } + retTrend.RunTimes = make([]time.Time, 0, len(runTimes)) + for _, runTime := range runTimes { + if !runTime.Before(tStart) && runTime.Before(tEnd) { + retTrend.RunTimes = append(retTrend.RunTimes, runTime) + } + } + if len(retTrend.RunTimes) == 0 { // filtered out all + return utils.ErrNotFound + } + retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend) + for _, runTime := range retTrend.RunTimes { + retTrend.Metrics[runTime] = trnd.Metrics[runTime] + } + return +} +*/ + +// V1GetSchedule returns the active schedule for Raking queries +func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgScheduledRankings, schedRankings *[]utils.ScheduledRanking) (err error) { + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = rkS.cgrcfg.GeneralCfg().DefaultTenant + } + rkS.crnTQsMux.RLock() + defer rkS.crnTQsMux.RUnlock() + trendIDsMp, has := rkS.crnTQs[tnt] + if !has { + return utils.ErrNotFound + } + var scheduledRankings []utils.ScheduledRanking + var entryIds map[string]cron.EntryID + if len(args.RankingIDPrefixes) == 0 { + entryIds = trendIDsMp + } else { + entryIds = make(map[string]cron.EntryID) + for _, rkID := range args.RankingIDPrefixes { + for key, entryID := range trendIDsMp { + if strings.HasPrefix(key, rkID) { + entryIds[key] = entryID + } + } + } + } + if len(entryIds) == 0 { + return utils.ErrNotFound + } + var entry cron.Entry + for id, entryID := range entryIds { + entry = rkS.crn.Entry(entryID) + if entry.ID == 0 { + continue + } + scheduledRankings = append(scheduledRankings, + utils.ScheduledRanking{ + RankingID: id, + Next: entry.Next, + Previous: entry.Prev, + }) + } + slices.SortFunc(scheduledRankings, func(a, b utils.ScheduledRanking) int { + return a.Next.Compare(b.Next) + }) + *schedRankings = scheduledRankings + return nil +} diff --git a/engine/trends.go b/engine/trends.go index 0ad28027d..ca536ca73 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -82,7 +82,7 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { &floatMetrics); err != nil { utils.Logger.Warning( fmt.Sprintf( - "<%s> computing trend for with id: <%s:%s> stats <%s> error: <%s>", + "<%s> computing trend with id: <%s:%s> for stats <%s> error: <%s>", utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error())) return } @@ -538,11 +538,11 @@ func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgSche } var scheduledTrends []utils.ScheduledTrend var entryIds map[string]cron.EntryID - if len(args.TrendIDPrefix) == 0 { + if len(args.TrendIDPrefixes) == 0 { entryIds = trendIDsMp } else { entryIds = make(map[string]cron.EntryID) - for _, tID := range args.TrendIDPrefix { + for _, tID := range args.TrendIDPrefixes { for key, entryID := range trendIDsMp { if strings.HasPrefix(key, tID) { entryIds[key] = entryID @@ -560,9 +560,9 @@ func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgSche continue } scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{ - TrendID: id, - Next: entry.Next, - Prev: entry.Prev, + TrendID: id, + Next: entry.Next, + Previous: entry.Prev, }) } slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int { diff --git a/services/rankings.go b/services/rankings.go index 07789c6f7..2b2fe8bfa 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -61,63 +61,71 @@ type RankingService struct { connChan chan birpc.ClientConnector anz *AnalyzerService srvDep map[string]*sync.WaitGroup + rks *engine.RankingS } // Start should handle the sercive start -func (rg *RankingService) Start() error { - if rg.IsRunning() { +func (rk *RankingService) Start() error { + if rk.IsRunning() { return utils.ErrServiceAlreadyRunning } - rg.srvDep[utils.DataDB].Add(1) - <-rg.cacheS.GetPrecacheChannel(utils.CacheRankingProfiles) - <-rg.cacheS.GetPrecacheChannel(utils.CacheRankingFilterIndexes) + rk.srvDep[utils.DataDB].Add(1) + <-rk.cacheS.GetPrecacheChannel(utils.CacheRankingProfiles) + <-rk.cacheS.GetPrecacheChannel(utils.CacheRankingFilterIndexes) - filterS := <-rg.filterSChan - rg.filterSChan <- filterS - dbchan := rg.dm.GetDMChan() + filterS := <-rk.filterSChan + rk.filterSChan <- filterS + dbchan := rk.dm.GetDMChan() datadb := <-dbchan dbchan <- datadb utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RankingS)) + + rk.rks = engine.NewRankingS(datadb, rk.connMgr, filterS, rk.cfg) + if err := rk.rks.StartRankingS(); err != nil { + return err + } srv, err := engine.NewService(v1.NewRankingSv1()) if err != nil { return err } - if !rg.cfg.DispatcherSCfg().Enabled { - rg.server.RpcRegister(srv) + if !rk.cfg.DispatcherSCfg().Enabled { + rk.server.RpcRegister(srv) } - rg.connChan <- rg.anz.GetInternalCodec(srv, utils.StatS) + rk.connChan <- rk.anz.GetInternalCodec(srv, utils.StatS) return nil } // Reload handles the change of config -func (rg *RankingService) Reload() (err error) { +func (rk *RankingService) Reload() (err error) { return } // Shutdown stops the service -func (rg *RankingService) Shutdown() (err error) { - defer rg.srvDep[utils.DataDB].Done() - rg.Lock() - defer rg.Unlock() - <-rg.connChan +func (rk *RankingService) Shutdown() (err error) { + defer rk.srvDep[utils.DataDB].Done() + rk.Lock() + defer rk.Unlock() + rk.rks.StopRankingS() + rk.rks = nil + <-rk.connChan return } // IsRunning returns if the service is running -func (rg *RankingService) IsRunning() bool { - rg.RLock() - defer rg.RUnlock() - return false +func (rk *RankingService) IsRunning() bool { + rk.RLock() + defer rk.RUnlock() + return rk.rks != nil } // ServiceName returns the service name -func (rg *RankingService) ServiceName() string { +func (rk *RankingService) ServiceName() string { return utils.RankingS } // ShouldRun returns if the service should be running -func (rg *RankingService) ShouldRun() bool { - return rg.cfg.RankingSCfg().Enabled +func (rk *RankingService) ShouldRun() bool { + return rk.cfg.RankingSCfg().Enabled } diff --git a/services/trends.go b/services/trends.go index 848993263..4854d5de2 100644 --- a/services/trends.go +++ b/services/trends.go @@ -112,6 +112,7 @@ func (tr *TrendService) Shutdown() (err error) { tr.Lock() defer tr.Unlock() tr.trs.StopTrendS() + tr.trs = nil <-tr.connChan return } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index ac284fb76..082f94133 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1660,7 +1660,7 @@ type ArgScheduleTrendQueries struct { } type ArgScheduledTrends struct { TenantIDWithAPIOpts - TrendIDPrefix []string + TrendIDPrefixes []string } type ArgGetTrend struct { @@ -1675,5 +1675,21 @@ type ArgGetTrend struct { type ScheduledTrend struct { TrendID string Next time.Time - Prev time.Time + Previous time.Time +} + +type ArgScheduleRankingQueries struct { + TenantIDWithAPIOpts + RankingIDs []string +} + +type ArgScheduledRankings struct { + TenantIDWithAPIOpts + RankingIDPrefixes []string +} + +type ScheduledRanking struct { + RankingID string + Next time.Time + Previous time.Time } diff --git a/utils/consts.go b/utils/consts.go index 09f4a87b8..73aea3edf 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -512,7 +512,9 @@ const ( TotalUsage = "TotalUsage" StatID = "StatID" StatIDs = "StatIDs" + SortedStatIDs = "SortedStatIDs" TrendID = "TrendID" + RankingID = "RankingID" BalanceType = "BalanceType" BalanceID = "BalanceID" BalanceDestinationIds = "BalanceDestinationIds" @@ -529,6 +531,7 @@ const ( AccountUpdate = "AccountUpdate" StatUpdate = "StatUpdate" TrendUpdate = "TrendUpdate" + RankingUpdate = "RankingUpdate" ResourceUpdate = "ResourceUpdate" CDR = "CDR" CDRs = "CDRs" @@ -1993,6 +1996,7 @@ const ( CacheRankingProfiles = "*ranking_profiles" CacheTrendProfiles = "*trend_profiles" CacheTrends = "*trends" + CacheRankings = "*rankings" CacheThresholdProfiles = "*threshold_profiles" CacheThresholds = "*thresholds" CacheFilters = "*filters"