diff --git a/engine/librankings.go b/engine/librankings.go
index 0625d5f93..a82fce0a1 100644
--- a/engine/librankings.go
+++ b/engine/librankings.go
@@ -13,12 +13,13 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
-along with this program. If not, see
+along with this program. If not, see
*/
package engine
import (
+ "sync"
"time"
"github.com/cgrates/cgrates/utils"
@@ -40,6 +41,58 @@ type RankingProfile struct {
ThresholdIDs []string
}
-func (sgp *RankingProfile) TenantID() string {
- return utils.ConcatenatedKey(sgp.Tenant, sgp.ID)
+func (rkp *RankingProfile) TenantID() string {
+ return utils.ConcatenatedKey(rkp.Tenant, rkp.ID)
+}
+
+// Clone will clone a RankingProfile
+func (rkP *RankingProfile) Clone() (cln *RankingProfile) {
+ cln = &RankingProfile{
+ Tenant: rkP.Tenant,
+ ID: rkP.ID,
+ QueryInterval: rkP.QueryInterval,
+ Sorting: rkP.Sorting,
+ }
+ if rkP.StatIDs != nil {
+ copy(cln.StatIDs, rkP.StatIDs)
+ }
+ if rkP.MetricIDs != nil {
+ copy(cln.MetricIDs, rkP.MetricIDs)
+ }
+ if rkP.SortingParameters != nil {
+ copy(cln.SortingParameters, rkP.SortingParameters)
+ }
+ if rkP.ThresholdIDs != nil {
+ copy(cln.ThresholdIDs, rkP.ThresholdIDs)
+ }
+ return
+}
+
+// NewRankingFromProfile is a constructor for an empty ranking out of it's profile
+func NewRankingFromProfile(rkP *RankingProfile) *Ranking {
+ return &Ranking{
+ Tenant: rkP.Tenant,
+ ID: rkP.ID,
+ StatMetrics: make(map[string]map[string]float64),
+
+ rkPrfl: rkP,
+ metricIDs: utils.NewStringSet(rkP.MetricIDs),
+ }
+}
+
+// Ranking is one unit out of a profile
+type Ranking struct {
+ rMux sync.RWMutex
+
+ Tenant string
+ ID string
+ StatMetrics map[string]map[string]float64 // map[statID]map[metricID]metricValue
+ Sorting string
+ SortingParameters []string
+
+ SortedStatIDs []string
+
+ rkPrfl *RankingProfile // store here the ranking profile so we can have it at hands further
+ metricIDs utils.StringSet // convert the metricIDs here for faster matching
+
}
diff --git a/engine/rankings.go b/engine/rankings.go
new file mode 100644
index 000000000..ddaae413a
--- /dev/null
+++ b/engine/rankings.go
@@ -0,0 +1,543 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "fmt"
+ "runtime"
+ "slices"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/cron"
+)
+
+// NewRankingS is the constructor for RankingS
+func NewRankingS(dm *DataManager,
+ connMgr *ConnManager,
+ filterS *FilterS,
+ cgrcfg *config.CGRConfig) *RankingS {
+ return &RankingS{
+ dm: dm,
+ connMgr: connMgr,
+ filterS: filterS,
+ cgrcfg: cgrcfg,
+ crn: cron.New(),
+ crnTQsMux: new(sync.RWMutex),
+ crnTQs: make(map[string]map[string]cron.EntryID),
+ storedRankings: make(utils.StringSet),
+ storingStopped: make(chan struct{}),
+ rankingStop: make(chan struct{}),
+ }
+}
+
+// RankingS is responsible of implementing the logic of RankingService
+type RankingS struct {
+ dm *DataManager
+ connMgr *ConnManager
+ filterS *FilterS
+ cgrcfg *config.CGRConfig
+
+ crn *cron.Cron // cron reference
+
+ crnTQsMux *sync.RWMutex // protects the crnTQs
+ crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for rankingQueries so we can reschedule them when needed
+
+ storedRankings utils.StringSet // keep a record of RankingS which need saving, map[rankingTenanrkID]bool
+ sRksMux sync.RWMutex // protects storedRankings
+ storingStopped chan struct{} // signal back that the operations were stopped
+
+ rankingStop chan struct{} // signal to stop all operations
+
+}
+
+// computeRanking will query the stats and build the Ranking for them
+//
+// it is to be called by Cron service
+func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
+
+ /*rk, err := rkS.dm.GetRanking(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
+ if err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> querying rkP with id: <%s:%s> dm error: <%s>",
+ utils.RankingS, tP.Tenant, tP.ID, err.Error()))
+ return
+ }
+ */
+ rk := NewRankingFromProfile(rkP)
+ rk.rMux.Lock()
+ defer rk.rMux.Unlock()
+ /*if trnd.tPrfl == nil {
+ trnd.tPrfl = tP
+ }
+ */
+
+ for _, statID := range rkP.StatIDs {
+ var floatMetrics map[string]float64
+ if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns,
+ utils.StatSv1GetQueueFloatMetrics,
+ &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: rkP.Tenant, ID: statID}},
+ &floatMetrics); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> computing Ranking with id: <%s:%s> for stats <%s> error: <%s>",
+ utils.RankingS, rkP.Tenant, rkP.ID, statID, err.Error()))
+ return
+ }
+ if len(rk.metricIDs) != 0 {
+ for metricID := range floatMetrics {
+ if _, has := rk.metricIDs[statID]; !has {
+ delete(floatMetrics, metricID)
+ }
+ }
+ }
+ if len(floatMetrics) != 0 {
+ rk.StatMetrics[statID] = make(map[string]float64)
+ }
+ for metricID, val := range floatMetrics {
+ rk.StatMetrics[statID][metricID] = val
+ }
+ }
+
+ /*
+ if err = rkS.storeRanking(rk); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> setting Ranking with id: <%s:%s> DM error: <%s>",
+ utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
+ return
+ }
+ */
+ if err := rkS.processThresholds(rk); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> Ranking with id <%s:%s> error: <%s> with ThresholdS",
+ utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
+ }
+ if err := rkS.processEEs(rk); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> Trend with id <%s:%s> error: <%s> with EEs",
+ utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
+ }
+
+}
+
+// processThresholds will pass the Ranking event to ThresholdS
+func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
+ if len(rk.SortedStatIDs) == 0 {
+ return
+ }
+ if len(rkS.cgrcfg.TrendSCfg().ThresholdSConns) == 0 {
+ return
+ }
+ opts := map[string]any{
+ utils.MetaEventType: utils.RankingUpdate,
+ }
+ var thIDs []string
+ if len(rk.rkPrfl.ThresholdIDs) != 0 {
+ if len(rk.rkPrfl.ThresholdIDs) == 1 &&
+ rk.rkPrfl.ThresholdIDs[0] == utils.MetaNone {
+ return
+ }
+ thIDs = make([]string, len(rk.rkPrfl.ThresholdIDs))
+ copy(thIDs, rk.rkPrfl.ThresholdIDs)
+ }
+ opts[utils.OptsThresholdsProfileIDs] = thIDs
+ ev := &utils.CGREvent{
+ Tenant: rk.Tenant,
+ ID: utils.GenUUID(),
+ APIOpts: opts,
+ Event: map[string]any{
+ utils.RankingID: rk.ID,
+ utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs),
+ },
+ }
+ var withErrs bool
+ var rkIDs []string
+ if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().ThresholdSConns,
+ utils.ThresholdSv1ProcessEvent, ev, &rkIDs); err != nil &&
+ (len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.RankingS, err.Error(), ev))
+ withErrs = true
+ }
+ if withErrs {
+ err = utils.ErrPartiallyExecuted
+ }
+ return
+}
+
+// processEEs will pass the Ranking event to EEs
+func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
+ if len(rk.SortedStatIDs) == 0 {
+ return
+ }
+ if len(rkS.cgrcfg.TrendSCfg().EEsConns) == 0 {
+ return
+ }
+ opts := map[string]any{
+ utils.MetaEventType: utils.RankingUpdate,
+ }
+ ev := &utils.CGREvent{
+ Tenant: rk.Tenant,
+ ID: utils.GenUUID(),
+ APIOpts: opts,
+ Event: map[string]any{
+ utils.RankingID: rk.ID,
+ utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs),
+ },
+ }
+ var withErrs bool
+ var reply map[string]map[string]any
+ if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().EEsConns,
+ utils.EeSv1ProcessEvent, ev, &reply); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.RankingS, err.Error(), ev))
+ withErrs = true
+ }
+ if withErrs {
+ err = utils.ErrPartiallyExecuted
+ }
+ return
+}
+
+// storeTrend will store or schedule the trend based on settings
+func (rkS *RankingS) storeRanking(rk *Ranking) (err error) {
+ if rkS.cgrcfg.TrendSCfg().StoreInterval == 0 {
+ return
+ }
+ /*
+ if rkS.cgrcfg.TrendSCfg().StoreInterval == -1 {
+ return rkS.dm.SetRanking(rk)
+ }
+ */
+
+ // schedule the asynchronous save, relies for Ranking to be in cache
+ rkS.sRksMux.Lock()
+ rkS.storedRankings.Add(rk.rkPrfl.TenantID())
+ rkS.sRksMux.Unlock()
+ return
+}
+
+// storeRankings will do one round for saving modified Rankings
+//
+// from cache to dataDB
+// designed to run asynchronously
+func (rkS *RankingS) storeRankings() {
+ var failedRkIDs []string
+ for {
+ rkS.sRksMux.Lock()
+ rkID := rkS.storedRankings.GetOne()
+ if rkID != utils.EmptyString {
+ rkS.storedRankings.Remove(rkID)
+ }
+ rkS.sRksMux.Unlock()
+ if rkID == utils.EmptyString {
+ break // no more keys, backup completed
+ }
+ rkIf, ok := Cache.Get(utils.CacheRankings, rkID)
+ if !ok || rkIf == nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> failed retrieving from cache Ranking with ID: %q",
+ utils.RankingS, rkID))
+ failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
+ continue
+ }
+ rk := rkIf.(*Ranking)
+ rk.rMux.RLock()
+ /*
+ if err := rkS.dm.SetRanking(rk); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
+ utils.RankingS, rkID, err))
+ failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
+ }
+ */
+ rk.rMux.RUnlock()
+ // randomize the CPU load and give up thread control
+ runtime.Gosched()
+ }
+ if len(failedRkIDs) != 0 { // there were errors on save, schedule the keys for next backup
+ rkS.sRksMux.Lock()
+ rkS.storedRankings.AddSlice(failedRkIDs)
+ rkS.sRksMux.Unlock()
+ }
+}
+
+// asyncStoreRankings runs as a backround process, calling storeRankings based on storeInterval
+func (rkS *RankingS) asyncStoreRankings() {
+ storeInterval := rkS.cgrcfg.TrendSCfg().StoreInterval
+ if storeInterval <= 0 {
+ close(rkS.storingStopped)
+ return
+ }
+ for {
+ rkS.storeRankings()
+ select {
+ case <-rkS.rankingStop:
+ close(rkS.storingStopped)
+ return
+ case <-time.After(storeInterval): // continue to another storing loop
+ }
+ }
+}
+
+// StartRankings will activates the Cron, together with all scheduled Ranking queries
+func (rkS *RankingS) StartRankingS() (err error) {
+ if err = rkS.scheduleAutomaticQueries(); err != nil {
+ return
+ }
+ rkS.crn.Start()
+ go rkS.asyncStoreRankings()
+ return
+}
+
+// StopCron will shutdown the Cron tasks
+func (rkS *RankingS) StopRankingS() {
+ timeEnd := time.Now().Add(rkS.cgrcfg.CoreSCfg().ShutdownTimeout)
+
+ ctx := rkS.crn.Stop()
+ close(rkS.rankingStop)
+
+ // Wait for cron
+ select {
+ case <-ctx.Done():
+ case <-time.After(timeEnd.Sub(time.Now())):
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> timeout waiting for Cron to finish",
+ utils.RankingS))
+ return
+ }
+ // Wait for backup and other operations
+ select {
+ case <-rkS.storingStopped:
+ case <-time.After(timeEnd.Sub(time.Now())):
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> timeout waiting for RankingS to finish",
+ utils.RankingS))
+ return
+ }
+}
+
+func (rkS *RankingS) Reload() {
+ ctx := rkS.crn.Stop()
+ close(rkS.rankingStop)
+ <-ctx.Done()
+ <-rkS.storingStopped
+ rkS.rankingStop = make(chan struct{})
+ rkS.storingStopped = make(chan struct{})
+ rkS.crn.Start()
+ go rkS.asyncStoreRankings()
+}
+
+// scheduleAutomaticQueries will schedule the queries at start/reload based on configured
+func (rkS *RankingS) scheduleAutomaticQueries() error {
+ schedData := make(map[string][]string)
+ for k, v := range rkS.cgrcfg.TrendSCfg().ScheduledIDs {
+ schedData[k] = v
+ }
+ var tnts []string
+ if len(schedData) == 0 {
+ tnts = make([]string, 0)
+ }
+ for tnt, rkIDs := range schedData {
+ if len(rkIDs) == 0 {
+ tnts = append(tnts, tnt)
+ }
+ }
+ if tnts != nil {
+ qrydData, err := rkS.dm.GetTrendProfileIDs(tnts)
+ if err != nil {
+ return err
+ }
+ for tnt, ids := range qrydData {
+ schedData[tnt] = ids
+ }
+ }
+ for tnt, rkIDs := range schedData {
+ if _, err := rkS.scheduleRankingQueries(context.TODO(), tnt, rkIDs); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// scheduleTrendQueries will schedule/re-schedule specific trend queries
+func (rkS *RankingS) scheduleRankingQueries(_ *context.Context,
+ tnt string, rkIDs []string) (scheduled int, err error) {
+ var partial bool
+ rkS.crnTQsMux.Lock()
+ if _, has := rkS.crnTQs[tnt]; !has {
+ rkS.crnTQs[tnt] = make(map[string]cron.EntryID)
+ }
+ rkS.crnTQsMux.Unlock()
+ for _, rkID := range rkIDs {
+ rkS.crnTQsMux.RLock()
+ if entryID, has := rkS.crnTQs[tnt][rkID]; has {
+ rkS.crn.Remove(entryID) // deschedule the query
+ }
+ rkS.crnTQsMux.RUnlock()
+ if rkP, err := rkS.dm.GetRankingProfile(tnt, rkID, true, true, utils.NonTransactional); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> failed retrieving RankingProfile with id: <%s:%s> for scheduling, error: <%s>",
+ utils.RankingS, tnt, rkID, err.Error()))
+ partial = true
+ } else if entryID, err := rkS.crn.AddFunc(utils.EmptyString,
+ func() { rkS.computeRanking(rkP.Clone()) }); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> scheduling RankingProfile <%s:%s>, error: <%s>",
+ utils.RankingS, tnt, rkID, err.Error()))
+ partial = true
+ } else { // log the entry ID for debugging
+ rkS.crnTQsMux.Lock()
+ rkS.crnTQs[rkP.Tenant][rkP.ID] = entryID
+ rkS.crnTQsMux.Unlock()
+ }
+ scheduled += 1
+ }
+ if partial {
+ return 0, utils.ErrPartiallyExecuted
+ }
+ return
+}
+
+// V1ScheduleQueries is the query for manually re-/scheduling Ranking Queries
+func (rkS *RankingS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleRankingQueries, scheduled *int) (err error) {
+ if sched, errSched := rkS.scheduleRankingQueries(ctx, args.Tenant, args.RankingIDs); errSched != nil {
+ return errSched
+ } else {
+ *scheduled = sched
+ }
+ return
+}
+
+/*
+// V1GetRanking is the API to return the Ranking instance
+//
+func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.ArgGetRanking, retRanking *Ranking) (err error) {
+ if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ var rk *Ranking
+ if rk, err = rkS.dm.GetRanking(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
+ return
+ }
+ retRanking.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
+ retTrend.ID = trnd.ID
+ starrkIDx := arg.RunIndexStart
+ if starrkIDx > len(trnd.RunTimes) {
+ starrkIDx = len(trnd.RunTimes)
+ }
+ endIdx := arg.RunIndexEnd
+ if endIdx > len(trnd.RunTimes) ||
+ endIdx < starrkIDx ||
+ endIdx == 0 {
+ endIdx = len(trnd.RunTimes)
+ }
+ runTimes := trnd.RunTimes[starrkIDx:endIdx]
+ if len(runTimes) == 0 {
+ return utils.ErrNotFound
+ }
+ var tStart, tEnd time.Time
+ if arg.RunTimeStart == utils.EmptyString {
+ tStart = runTimes[0]
+ } else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
+ return
+ }
+ if arg.RunTimeEnd == utils.EmptyString {
+ tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
+ } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
+ return
+ }
+ retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
+ for _, runTime := range runTimes {
+ if !runTime.Before(tStart) && runTime.Before(tEnd) {
+ retTrend.RunTimes = append(retTrend.RunTimes, runTime)
+ }
+ }
+ if len(retTrend.RunTimes) == 0 { // filtered out all
+ return utils.ErrNotFound
+ }
+ retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
+ for _, runTime := range retTrend.RunTimes {
+ retTrend.Metrics[runTime] = trnd.Metrics[runTime]
+ }
+ return
+}
+*/
+
+// V1GetSchedule returns the active schedule for Raking queries
+func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgScheduledRankings, schedRankings *[]utils.ScheduledRanking) (err error) {
+ tnt := args.Tenant
+ if tnt == utils.EmptyString {
+ tnt = rkS.cgrcfg.GeneralCfg().DefaultTenant
+ }
+ rkS.crnTQsMux.RLock()
+ defer rkS.crnTQsMux.RUnlock()
+ trendIDsMp, has := rkS.crnTQs[tnt]
+ if !has {
+ return utils.ErrNotFound
+ }
+ var scheduledRankings []utils.ScheduledRanking
+ var entryIds map[string]cron.EntryID
+ if len(args.RankingIDPrefixes) == 0 {
+ entryIds = trendIDsMp
+ } else {
+ entryIds = make(map[string]cron.EntryID)
+ for _, rkID := range args.RankingIDPrefixes {
+ for key, entryID := range trendIDsMp {
+ if strings.HasPrefix(key, rkID) {
+ entryIds[key] = entryID
+ }
+ }
+ }
+ }
+ if len(entryIds) == 0 {
+ return utils.ErrNotFound
+ }
+ var entry cron.Entry
+ for id, entryID := range entryIds {
+ entry = rkS.crn.Entry(entryID)
+ if entry.ID == 0 {
+ continue
+ }
+ scheduledRankings = append(scheduledRankings,
+ utils.ScheduledRanking{
+ RankingID: id,
+ Next: entry.Next,
+ Previous: entry.Prev,
+ })
+ }
+ slices.SortFunc(scheduledRankings, func(a, b utils.ScheduledRanking) int {
+ return a.Next.Compare(b.Next)
+ })
+ *schedRankings = scheduledRankings
+ return nil
+}
diff --git a/engine/trends.go b/engine/trends.go
index 0ad28027d..ca536ca73 100644
--- a/engine/trends.go
+++ b/engine/trends.go
@@ -82,7 +82,7 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
&floatMetrics); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
- "<%s> computing trend for with id: <%s:%s> stats <%s> error: <%s>",
+ "<%s> computing trend with id: <%s:%s> for stats <%s> error: <%s>",
utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error()))
return
}
@@ -538,11 +538,11 @@ func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgSche
}
var scheduledTrends []utils.ScheduledTrend
var entryIds map[string]cron.EntryID
- if len(args.TrendIDPrefix) == 0 {
+ if len(args.TrendIDPrefixes) == 0 {
entryIds = trendIDsMp
} else {
entryIds = make(map[string]cron.EntryID)
- for _, tID := range args.TrendIDPrefix {
+ for _, tID := range args.TrendIDPrefixes {
for key, entryID := range trendIDsMp {
if strings.HasPrefix(key, tID) {
entryIds[key] = entryID
@@ -560,9 +560,9 @@ func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgSche
continue
}
scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{
- TrendID: id,
- Next: entry.Next,
- Prev: entry.Prev,
+ TrendID: id,
+ Next: entry.Next,
+ Previous: entry.Prev,
})
}
slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int {
diff --git a/services/rankings.go b/services/rankings.go
index 07789c6f7..2b2fe8bfa 100644
--- a/services/rankings.go
+++ b/services/rankings.go
@@ -61,63 +61,71 @@ type RankingService struct {
connChan chan birpc.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
+ rks *engine.RankingS
}
// Start should handle the sercive start
-func (rg *RankingService) Start() error {
- if rg.IsRunning() {
+func (rk *RankingService) Start() error {
+ if rk.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
- rg.srvDep[utils.DataDB].Add(1)
- <-rg.cacheS.GetPrecacheChannel(utils.CacheRankingProfiles)
- <-rg.cacheS.GetPrecacheChannel(utils.CacheRankingFilterIndexes)
+ rk.srvDep[utils.DataDB].Add(1)
+ <-rk.cacheS.GetPrecacheChannel(utils.CacheRankingProfiles)
+ <-rk.cacheS.GetPrecacheChannel(utils.CacheRankingFilterIndexes)
- filterS := <-rg.filterSChan
- rg.filterSChan <- filterS
- dbchan := rg.dm.GetDMChan()
+ filterS := <-rk.filterSChan
+ rk.filterSChan <- filterS
+ dbchan := rk.dm.GetDMChan()
datadb := <-dbchan
dbchan <- datadb
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
utils.CoreS, utils.RankingS))
+
+ rk.rks = engine.NewRankingS(datadb, rk.connMgr, filterS, rk.cfg)
+ if err := rk.rks.StartRankingS(); err != nil {
+ return err
+ }
srv, err := engine.NewService(v1.NewRankingSv1())
if err != nil {
return err
}
- if !rg.cfg.DispatcherSCfg().Enabled {
- rg.server.RpcRegister(srv)
+ if !rk.cfg.DispatcherSCfg().Enabled {
+ rk.server.RpcRegister(srv)
}
- rg.connChan <- rg.anz.GetInternalCodec(srv, utils.StatS)
+ rk.connChan <- rk.anz.GetInternalCodec(srv, utils.StatS)
return nil
}
// Reload handles the change of config
-func (rg *RankingService) Reload() (err error) {
+func (rk *RankingService) Reload() (err error) {
return
}
// Shutdown stops the service
-func (rg *RankingService) Shutdown() (err error) {
- defer rg.srvDep[utils.DataDB].Done()
- rg.Lock()
- defer rg.Unlock()
- <-rg.connChan
+func (rk *RankingService) Shutdown() (err error) {
+ defer rk.srvDep[utils.DataDB].Done()
+ rk.Lock()
+ defer rk.Unlock()
+ rk.rks.StopRankingS()
+ rk.rks = nil
+ <-rk.connChan
return
}
// IsRunning returns if the service is running
-func (rg *RankingService) IsRunning() bool {
- rg.RLock()
- defer rg.RUnlock()
- return false
+func (rk *RankingService) IsRunning() bool {
+ rk.RLock()
+ defer rk.RUnlock()
+ return rk.rks != nil
}
// ServiceName returns the service name
-func (rg *RankingService) ServiceName() string {
+func (rk *RankingService) ServiceName() string {
return utils.RankingS
}
// ShouldRun returns if the service should be running
-func (rg *RankingService) ShouldRun() bool {
- return rg.cfg.RankingSCfg().Enabled
+func (rk *RankingService) ShouldRun() bool {
+ return rk.cfg.RankingSCfg().Enabled
}
diff --git a/services/trends.go b/services/trends.go
index 848993263..4854d5de2 100644
--- a/services/trends.go
+++ b/services/trends.go
@@ -112,6 +112,7 @@ func (tr *TrendService) Shutdown() (err error) {
tr.Lock()
defer tr.Unlock()
tr.trs.StopTrendS()
+ tr.trs = nil
<-tr.connChan
return
}
diff --git a/utils/apitpdata.go b/utils/apitpdata.go
index ac284fb76..082f94133 100644
--- a/utils/apitpdata.go
+++ b/utils/apitpdata.go
@@ -1660,7 +1660,7 @@ type ArgScheduleTrendQueries struct {
}
type ArgScheduledTrends struct {
TenantIDWithAPIOpts
- TrendIDPrefix []string
+ TrendIDPrefixes []string
}
type ArgGetTrend struct {
@@ -1675,5 +1675,21 @@ type ArgGetTrend struct {
type ScheduledTrend struct {
TrendID string
Next time.Time
- Prev time.Time
+ Previous time.Time
+}
+
+type ArgScheduleRankingQueries struct {
+ TenantIDWithAPIOpts
+ RankingIDs []string
+}
+
+type ArgScheduledRankings struct {
+ TenantIDWithAPIOpts
+ RankingIDPrefixes []string
+}
+
+type ScheduledRanking struct {
+ RankingID string
+ Next time.Time
+ Previous time.Time
}
diff --git a/utils/consts.go b/utils/consts.go
index 09f4a87b8..73aea3edf 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -512,7 +512,9 @@ const (
TotalUsage = "TotalUsage"
StatID = "StatID"
StatIDs = "StatIDs"
+ SortedStatIDs = "SortedStatIDs"
TrendID = "TrendID"
+ RankingID = "RankingID"
BalanceType = "BalanceType"
BalanceID = "BalanceID"
BalanceDestinationIds = "BalanceDestinationIds"
@@ -529,6 +531,7 @@ const (
AccountUpdate = "AccountUpdate"
StatUpdate = "StatUpdate"
TrendUpdate = "TrendUpdate"
+ RankingUpdate = "RankingUpdate"
ResourceUpdate = "ResourceUpdate"
CDR = "CDR"
CDRs = "CDRs"
@@ -1993,6 +1996,7 @@ const (
CacheRankingProfiles = "*ranking_profiles"
CacheTrendProfiles = "*trend_profiles"
CacheTrends = "*trends"
+ CacheRankings = "*rankings"
CacheThresholdProfiles = "*threshold_profiles"
CacheThresholds = "*thresholds"
CacheFilters = "*filters"