mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Basic RankingS implementation
This commit is contained in:
@@ -13,12 +13,13 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
along with this program. If not, see <htrkP://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -40,6 +41,58 @@ type RankingProfile struct {
|
||||
ThresholdIDs []string
|
||||
}
|
||||
|
||||
func (sgp *RankingProfile) TenantID() string {
|
||||
return utils.ConcatenatedKey(sgp.Tenant, sgp.ID)
|
||||
func (rkp *RankingProfile) TenantID() string {
|
||||
return utils.ConcatenatedKey(rkp.Tenant, rkp.ID)
|
||||
}
|
||||
|
||||
// Clone will clone a RankingProfile
|
||||
func (rkP *RankingProfile) Clone() (cln *RankingProfile) {
|
||||
cln = &RankingProfile{
|
||||
Tenant: rkP.Tenant,
|
||||
ID: rkP.ID,
|
||||
QueryInterval: rkP.QueryInterval,
|
||||
Sorting: rkP.Sorting,
|
||||
}
|
||||
if rkP.StatIDs != nil {
|
||||
copy(cln.StatIDs, rkP.StatIDs)
|
||||
}
|
||||
if rkP.MetricIDs != nil {
|
||||
copy(cln.MetricIDs, rkP.MetricIDs)
|
||||
}
|
||||
if rkP.SortingParameters != nil {
|
||||
copy(cln.SortingParameters, rkP.SortingParameters)
|
||||
}
|
||||
if rkP.ThresholdIDs != nil {
|
||||
copy(cln.ThresholdIDs, rkP.ThresholdIDs)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// NewRankingFromProfile is a constructor for an empty ranking out of it's profile
|
||||
func NewRankingFromProfile(rkP *RankingProfile) *Ranking {
|
||||
return &Ranking{
|
||||
Tenant: rkP.Tenant,
|
||||
ID: rkP.ID,
|
||||
StatMetrics: make(map[string]map[string]float64),
|
||||
|
||||
rkPrfl: rkP,
|
||||
metricIDs: utils.NewStringSet(rkP.MetricIDs),
|
||||
}
|
||||
}
|
||||
|
||||
// Ranking is one unit out of a profile
|
||||
type Ranking struct {
|
||||
rMux sync.RWMutex
|
||||
|
||||
Tenant string
|
||||
ID string
|
||||
StatMetrics map[string]map[string]float64 // map[statID]map[metricID]metricValue
|
||||
Sorting string
|
||||
SortingParameters []string
|
||||
|
||||
SortedStatIDs []string
|
||||
|
||||
rkPrfl *RankingProfile // store here the ranking profile so we can have it at hands further
|
||||
metricIDs utils.StringSet // convert the metricIDs here for faster matching
|
||||
|
||||
}
|
||||
|
||||
543
engine/rankings.go
Normal file
543
engine/rankings.go
Normal file
@@ -0,0 +1,543 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/cron"
|
||||
)
|
||||
|
||||
// NewRankingS is the constructor for RankingS
|
||||
func NewRankingS(dm *DataManager,
|
||||
connMgr *ConnManager,
|
||||
filterS *FilterS,
|
||||
cgrcfg *config.CGRConfig) *RankingS {
|
||||
return &RankingS{
|
||||
dm: dm,
|
||||
connMgr: connMgr,
|
||||
filterS: filterS,
|
||||
cgrcfg: cgrcfg,
|
||||
crn: cron.New(),
|
||||
crnTQsMux: new(sync.RWMutex),
|
||||
crnTQs: make(map[string]map[string]cron.EntryID),
|
||||
storedRankings: make(utils.StringSet),
|
||||
storingStopped: make(chan struct{}),
|
||||
rankingStop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// RankingS is responsible of implementing the logic of RankingService
|
||||
type RankingS struct {
|
||||
dm *DataManager
|
||||
connMgr *ConnManager
|
||||
filterS *FilterS
|
||||
cgrcfg *config.CGRConfig
|
||||
|
||||
crn *cron.Cron // cron reference
|
||||
|
||||
crnTQsMux *sync.RWMutex // protects the crnTQs
|
||||
crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for rankingQueries so we can reschedule them when needed
|
||||
|
||||
storedRankings utils.StringSet // keep a record of RankingS which need saving, map[rankingTenanrkID]bool
|
||||
sRksMux sync.RWMutex // protects storedRankings
|
||||
storingStopped chan struct{} // signal back that the operations were stopped
|
||||
|
||||
rankingStop chan struct{} // signal to stop all operations
|
||||
|
||||
}
|
||||
|
||||
// computeRanking will query the stats and build the Ranking for them
|
||||
//
|
||||
// it is to be called by Cron service
|
||||
func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
|
||||
|
||||
/*rk, err := rkS.dm.GetRanking(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> querying rkP with id: <%s:%s> dm error: <%s>",
|
||||
utils.RankingS, tP.Tenant, tP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
*/
|
||||
rk := NewRankingFromProfile(rkP)
|
||||
rk.rMux.Lock()
|
||||
defer rk.rMux.Unlock()
|
||||
/*if trnd.tPrfl == nil {
|
||||
trnd.tPrfl = tP
|
||||
}
|
||||
*/
|
||||
|
||||
for _, statID := range rkP.StatIDs {
|
||||
var floatMetrics map[string]float64
|
||||
if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns,
|
||||
utils.StatSv1GetQueueFloatMetrics,
|
||||
&utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: rkP.Tenant, ID: statID}},
|
||||
&floatMetrics); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> computing Ranking with id: <%s:%s> for stats <%s> error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, statID, err.Error()))
|
||||
return
|
||||
}
|
||||
if len(rk.metricIDs) != 0 {
|
||||
for metricID := range floatMetrics {
|
||||
if _, has := rk.metricIDs[statID]; !has {
|
||||
delete(floatMetrics, metricID)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(floatMetrics) != 0 {
|
||||
rk.StatMetrics[statID] = make(map[string]float64)
|
||||
}
|
||||
for metricID, val := range floatMetrics {
|
||||
rk.StatMetrics[statID][metricID] = val
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
if err = rkS.storeRanking(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> setting Ranking with id: <%s:%s> DM error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
*/
|
||||
if err := rkS.processThresholds(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> Ranking with id <%s:%s> error: <%s> with ThresholdS",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
}
|
||||
if err := rkS.processEEs(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> Trend with id <%s:%s> error: <%s> with EEs",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// processThresholds will pass the Ranking event to ThresholdS
|
||||
func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
|
||||
if len(rk.SortedStatIDs) == 0 {
|
||||
return
|
||||
}
|
||||
if len(rkS.cgrcfg.TrendSCfg().ThresholdSConns) == 0 {
|
||||
return
|
||||
}
|
||||
opts := map[string]any{
|
||||
utils.MetaEventType: utils.RankingUpdate,
|
||||
}
|
||||
var thIDs []string
|
||||
if len(rk.rkPrfl.ThresholdIDs) != 0 {
|
||||
if len(rk.rkPrfl.ThresholdIDs) == 1 &&
|
||||
rk.rkPrfl.ThresholdIDs[0] == utils.MetaNone {
|
||||
return
|
||||
}
|
||||
thIDs = make([]string, len(rk.rkPrfl.ThresholdIDs))
|
||||
copy(thIDs, rk.rkPrfl.ThresholdIDs)
|
||||
}
|
||||
opts[utils.OptsThresholdsProfileIDs] = thIDs
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: rk.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
APIOpts: opts,
|
||||
Event: map[string]any{
|
||||
utils.RankingID: rk.ID,
|
||||
utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs),
|
||||
},
|
||||
}
|
||||
var withErrs bool
|
||||
var rkIDs []string
|
||||
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().ThresholdSConns,
|
||||
utils.ThresholdSv1ProcessEvent, ev, &rkIDs); err != nil &&
|
||||
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.RankingS, err.Error(), ev))
|
||||
withErrs = true
|
||||
}
|
||||
if withErrs {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// processEEs will pass the Ranking event to EEs
|
||||
func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
|
||||
if len(rk.SortedStatIDs) == 0 {
|
||||
return
|
||||
}
|
||||
if len(rkS.cgrcfg.TrendSCfg().EEsConns) == 0 {
|
||||
return
|
||||
}
|
||||
opts := map[string]any{
|
||||
utils.MetaEventType: utils.RankingUpdate,
|
||||
}
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: rk.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
APIOpts: opts,
|
||||
Event: map[string]any{
|
||||
utils.RankingID: rk.ID,
|
||||
utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs),
|
||||
},
|
||||
}
|
||||
var withErrs bool
|
||||
var reply map[string]map[string]any
|
||||
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().EEsConns,
|
||||
utils.EeSv1ProcessEvent, ev, &reply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.RankingS, err.Error(), ev))
|
||||
withErrs = true
|
||||
}
|
||||
if withErrs {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// storeTrend will store or schedule the trend based on settings
|
||||
func (rkS *RankingS) storeRanking(rk *Ranking) (err error) {
|
||||
if rkS.cgrcfg.TrendSCfg().StoreInterval == 0 {
|
||||
return
|
||||
}
|
||||
/*
|
||||
if rkS.cgrcfg.TrendSCfg().StoreInterval == -1 {
|
||||
return rkS.dm.SetRanking(rk)
|
||||
}
|
||||
*/
|
||||
|
||||
// schedule the asynchronous save, relies for Ranking to be in cache
|
||||
rkS.sRksMux.Lock()
|
||||
rkS.storedRankings.Add(rk.rkPrfl.TenantID())
|
||||
rkS.sRksMux.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// storeRankings will do one round for saving modified Rankings
|
||||
//
|
||||
// from cache to dataDB
|
||||
// designed to run asynchronously
|
||||
func (rkS *RankingS) storeRankings() {
|
||||
var failedRkIDs []string
|
||||
for {
|
||||
rkS.sRksMux.Lock()
|
||||
rkID := rkS.storedRankings.GetOne()
|
||||
if rkID != utils.EmptyString {
|
||||
rkS.storedRankings.Remove(rkID)
|
||||
}
|
||||
rkS.sRksMux.Unlock()
|
||||
if rkID == utils.EmptyString {
|
||||
break // no more keys, backup completed
|
||||
}
|
||||
rkIf, ok := Cache.Get(utils.CacheRankings, rkID)
|
||||
if !ok || rkIf == nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed retrieving from cache Ranking with ID: %q",
|
||||
utils.RankingS, rkID))
|
||||
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
|
||||
continue
|
||||
}
|
||||
rk := rkIf.(*Ranking)
|
||||
rk.rMux.RLock()
|
||||
/*
|
||||
if err := rkS.dm.SetRanking(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
|
||||
utils.RankingS, rkID, err))
|
||||
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
|
||||
}
|
||||
*/
|
||||
rk.rMux.RUnlock()
|
||||
// randomize the CPU load and give up thread control
|
||||
runtime.Gosched()
|
||||
}
|
||||
if len(failedRkIDs) != 0 { // there were errors on save, schedule the keys for next backup
|
||||
rkS.sRksMux.Lock()
|
||||
rkS.storedRankings.AddSlice(failedRkIDs)
|
||||
rkS.sRksMux.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// asyncStoreRankings runs as a backround process, calling storeRankings based on storeInterval
|
||||
func (rkS *RankingS) asyncStoreRankings() {
|
||||
storeInterval := rkS.cgrcfg.TrendSCfg().StoreInterval
|
||||
if storeInterval <= 0 {
|
||||
close(rkS.storingStopped)
|
||||
return
|
||||
}
|
||||
for {
|
||||
rkS.storeRankings()
|
||||
select {
|
||||
case <-rkS.rankingStop:
|
||||
close(rkS.storingStopped)
|
||||
return
|
||||
case <-time.After(storeInterval): // continue to another storing loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StartRankings will activates the Cron, together with all scheduled Ranking queries
|
||||
func (rkS *RankingS) StartRankingS() (err error) {
|
||||
if err = rkS.scheduleAutomaticQueries(); err != nil {
|
||||
return
|
||||
}
|
||||
rkS.crn.Start()
|
||||
go rkS.asyncStoreRankings()
|
||||
return
|
||||
}
|
||||
|
||||
// StopCron will shutdown the Cron tasks
|
||||
func (rkS *RankingS) StopRankingS() {
|
||||
timeEnd := time.Now().Add(rkS.cgrcfg.CoreSCfg().ShutdownTimeout)
|
||||
|
||||
ctx := rkS.crn.Stop()
|
||||
close(rkS.rankingStop)
|
||||
|
||||
// Wait for cron
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-time.After(timeEnd.Sub(time.Now())):
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> timeout waiting for Cron to finish",
|
||||
utils.RankingS))
|
||||
return
|
||||
}
|
||||
// Wait for backup and other operations
|
||||
select {
|
||||
case <-rkS.storingStopped:
|
||||
case <-time.After(timeEnd.Sub(time.Now())):
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> timeout waiting for RankingS to finish",
|
||||
utils.RankingS))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (rkS *RankingS) Reload() {
|
||||
ctx := rkS.crn.Stop()
|
||||
close(rkS.rankingStop)
|
||||
<-ctx.Done()
|
||||
<-rkS.storingStopped
|
||||
rkS.rankingStop = make(chan struct{})
|
||||
rkS.storingStopped = make(chan struct{})
|
||||
rkS.crn.Start()
|
||||
go rkS.asyncStoreRankings()
|
||||
}
|
||||
|
||||
// scheduleAutomaticQueries will schedule the queries at start/reload based on configured
|
||||
func (rkS *RankingS) scheduleAutomaticQueries() error {
|
||||
schedData := make(map[string][]string)
|
||||
for k, v := range rkS.cgrcfg.TrendSCfg().ScheduledIDs {
|
||||
schedData[k] = v
|
||||
}
|
||||
var tnts []string
|
||||
if len(schedData) == 0 {
|
||||
tnts = make([]string, 0)
|
||||
}
|
||||
for tnt, rkIDs := range schedData {
|
||||
if len(rkIDs) == 0 {
|
||||
tnts = append(tnts, tnt)
|
||||
}
|
||||
}
|
||||
if tnts != nil {
|
||||
qrydData, err := rkS.dm.GetTrendProfileIDs(tnts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for tnt, ids := range qrydData {
|
||||
schedData[tnt] = ids
|
||||
}
|
||||
}
|
||||
for tnt, rkIDs := range schedData {
|
||||
if _, err := rkS.scheduleRankingQueries(context.TODO(), tnt, rkIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// scheduleTrendQueries will schedule/re-schedule specific trend queries
|
||||
func (rkS *RankingS) scheduleRankingQueries(_ *context.Context,
|
||||
tnt string, rkIDs []string) (scheduled int, err error) {
|
||||
var partial bool
|
||||
rkS.crnTQsMux.Lock()
|
||||
if _, has := rkS.crnTQs[tnt]; !has {
|
||||
rkS.crnTQs[tnt] = make(map[string]cron.EntryID)
|
||||
}
|
||||
rkS.crnTQsMux.Unlock()
|
||||
for _, rkID := range rkIDs {
|
||||
rkS.crnTQsMux.RLock()
|
||||
if entryID, has := rkS.crnTQs[tnt][rkID]; has {
|
||||
rkS.crn.Remove(entryID) // deschedule the query
|
||||
}
|
||||
rkS.crnTQsMux.RUnlock()
|
||||
if rkP, err := rkS.dm.GetRankingProfile(tnt, rkID, true, true, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> failed retrieving RankingProfile with id: <%s:%s> for scheduling, error: <%s>",
|
||||
utils.RankingS, tnt, rkID, err.Error()))
|
||||
partial = true
|
||||
} else if entryID, err := rkS.crn.AddFunc(utils.EmptyString,
|
||||
func() { rkS.computeRanking(rkP.Clone()) }); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> scheduling RankingProfile <%s:%s>, error: <%s>",
|
||||
utils.RankingS, tnt, rkID, err.Error()))
|
||||
partial = true
|
||||
} else { // log the entry ID for debugging
|
||||
rkS.crnTQsMux.Lock()
|
||||
rkS.crnTQs[rkP.Tenant][rkP.ID] = entryID
|
||||
rkS.crnTQsMux.Unlock()
|
||||
}
|
||||
scheduled += 1
|
||||
}
|
||||
if partial {
|
||||
return 0, utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// V1ScheduleQueries is the query for manually re-/scheduling Ranking Queries
|
||||
func (rkS *RankingS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleRankingQueries, scheduled *int) (err error) {
|
||||
if sched, errSched := rkS.scheduleRankingQueries(ctx, args.Tenant, args.RankingIDs); errSched != nil {
|
||||
return errSched
|
||||
} else {
|
||||
*scheduled = sched
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
// V1GetRanking is the API to return the Ranking instance
|
||||
//
|
||||
func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.ArgGetRanking, retRanking *Ranking) (err error) {
|
||||
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
var rk *Ranking
|
||||
if rk, err = rkS.dm.GetRanking(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
retRanking.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
|
||||
retTrend.ID = trnd.ID
|
||||
starrkIDx := arg.RunIndexStart
|
||||
if starrkIDx > len(trnd.RunTimes) {
|
||||
starrkIDx = len(trnd.RunTimes)
|
||||
}
|
||||
endIdx := arg.RunIndexEnd
|
||||
if endIdx > len(trnd.RunTimes) ||
|
||||
endIdx < starrkIDx ||
|
||||
endIdx == 0 {
|
||||
endIdx = len(trnd.RunTimes)
|
||||
}
|
||||
runTimes := trnd.RunTimes[starrkIDx:endIdx]
|
||||
if len(runTimes) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var tStart, tEnd time.Time
|
||||
if arg.RunTimeStart == utils.EmptyString {
|
||||
tStart = runTimes[0]
|
||||
} else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return
|
||||
}
|
||||
if arg.RunTimeEnd == utils.EmptyString {
|
||||
tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
|
||||
} else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return
|
||||
}
|
||||
retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
|
||||
for _, runTime := range runTimes {
|
||||
if !runTime.Before(tStart) && runTime.Before(tEnd) {
|
||||
retTrend.RunTimes = append(retTrend.RunTimes, runTime)
|
||||
}
|
||||
}
|
||||
if len(retTrend.RunTimes) == 0 { // filtered out all
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
|
||||
for _, runTime := range retTrend.RunTimes {
|
||||
retTrend.Metrics[runTime] = trnd.Metrics[runTime]
|
||||
}
|
||||
return
|
||||
}
|
||||
*/
|
||||
|
||||
// V1GetSchedule returns the active schedule for Raking queries
|
||||
func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgScheduledRankings, schedRankings *[]utils.ScheduledRanking) (err error) {
|
||||
tnt := args.Tenant
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = rkS.cgrcfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
rkS.crnTQsMux.RLock()
|
||||
defer rkS.crnTQsMux.RUnlock()
|
||||
trendIDsMp, has := rkS.crnTQs[tnt]
|
||||
if !has {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var scheduledRankings []utils.ScheduledRanking
|
||||
var entryIds map[string]cron.EntryID
|
||||
if len(args.RankingIDPrefixes) == 0 {
|
||||
entryIds = trendIDsMp
|
||||
} else {
|
||||
entryIds = make(map[string]cron.EntryID)
|
||||
for _, rkID := range args.RankingIDPrefixes {
|
||||
for key, entryID := range trendIDsMp {
|
||||
if strings.HasPrefix(key, rkID) {
|
||||
entryIds[key] = entryID
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(entryIds) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var entry cron.Entry
|
||||
for id, entryID := range entryIds {
|
||||
entry = rkS.crn.Entry(entryID)
|
||||
if entry.ID == 0 {
|
||||
continue
|
||||
}
|
||||
scheduledRankings = append(scheduledRankings,
|
||||
utils.ScheduledRanking{
|
||||
RankingID: id,
|
||||
Next: entry.Next,
|
||||
Previous: entry.Prev,
|
||||
})
|
||||
}
|
||||
slices.SortFunc(scheduledRankings, func(a, b utils.ScheduledRanking) int {
|
||||
return a.Next.Compare(b.Next)
|
||||
})
|
||||
*schedRankings = scheduledRankings
|
||||
return nil
|
||||
}
|
||||
@@ -82,7 +82,7 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
|
||||
&floatMetrics); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> computing trend for with id: <%s:%s> stats <%s> error: <%s>",
|
||||
"<%s> computing trend with id: <%s:%s> for stats <%s> error: <%s>",
|
||||
utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error()))
|
||||
return
|
||||
}
|
||||
@@ -538,11 +538,11 @@ func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgSche
|
||||
}
|
||||
var scheduledTrends []utils.ScheduledTrend
|
||||
var entryIds map[string]cron.EntryID
|
||||
if len(args.TrendIDPrefix) == 0 {
|
||||
if len(args.TrendIDPrefixes) == 0 {
|
||||
entryIds = trendIDsMp
|
||||
} else {
|
||||
entryIds = make(map[string]cron.EntryID)
|
||||
for _, tID := range args.TrendIDPrefix {
|
||||
for _, tID := range args.TrendIDPrefixes {
|
||||
for key, entryID := range trendIDsMp {
|
||||
if strings.HasPrefix(key, tID) {
|
||||
entryIds[key] = entryID
|
||||
@@ -560,9 +560,9 @@ func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgSche
|
||||
continue
|
||||
}
|
||||
scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{
|
||||
TrendID: id,
|
||||
Next: entry.Next,
|
||||
Prev: entry.Prev,
|
||||
TrendID: id,
|
||||
Next: entry.Next,
|
||||
Previous: entry.Prev,
|
||||
})
|
||||
}
|
||||
slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int {
|
||||
|
||||
@@ -61,63 +61,71 @@ type RankingService struct {
|
||||
connChan chan birpc.ClientConnector
|
||||
anz *AnalyzerService
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
rks *engine.RankingS
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (rg *RankingService) Start() error {
|
||||
if rg.IsRunning() {
|
||||
func (rk *RankingService) Start() error {
|
||||
if rk.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
rg.srvDep[utils.DataDB].Add(1)
|
||||
<-rg.cacheS.GetPrecacheChannel(utils.CacheRankingProfiles)
|
||||
<-rg.cacheS.GetPrecacheChannel(utils.CacheRankingFilterIndexes)
|
||||
rk.srvDep[utils.DataDB].Add(1)
|
||||
<-rk.cacheS.GetPrecacheChannel(utils.CacheRankingProfiles)
|
||||
<-rk.cacheS.GetPrecacheChannel(utils.CacheRankingFilterIndexes)
|
||||
|
||||
filterS := <-rg.filterSChan
|
||||
rg.filterSChan <- filterS
|
||||
dbchan := rg.dm.GetDMChan()
|
||||
filterS := <-rk.filterSChan
|
||||
rk.filterSChan <- filterS
|
||||
dbchan := rk.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
|
||||
utils.CoreS, utils.RankingS))
|
||||
|
||||
rk.rks = engine.NewRankingS(datadb, rk.connMgr, filterS, rk.cfg)
|
||||
if err := rk.rks.StartRankingS(); err != nil {
|
||||
return err
|
||||
}
|
||||
srv, err := engine.NewService(v1.NewRankingSv1())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !rg.cfg.DispatcherSCfg().Enabled {
|
||||
rg.server.RpcRegister(srv)
|
||||
if !rk.cfg.DispatcherSCfg().Enabled {
|
||||
rk.server.RpcRegister(srv)
|
||||
}
|
||||
rg.connChan <- rg.anz.GetInternalCodec(srv, utils.StatS)
|
||||
rk.connChan <- rk.anz.GetInternalCodec(srv, utils.StatS)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (rg *RankingService) Reload() (err error) {
|
||||
func (rk *RankingService) Reload() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (rg *RankingService) Shutdown() (err error) {
|
||||
defer rg.srvDep[utils.DataDB].Done()
|
||||
rg.Lock()
|
||||
defer rg.Unlock()
|
||||
<-rg.connChan
|
||||
func (rk *RankingService) Shutdown() (err error) {
|
||||
defer rk.srvDep[utils.DataDB].Done()
|
||||
rk.Lock()
|
||||
defer rk.Unlock()
|
||||
rk.rks.StopRankingS()
|
||||
rk.rks = nil
|
||||
<-rk.connChan
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (rg *RankingService) IsRunning() bool {
|
||||
rg.RLock()
|
||||
defer rg.RUnlock()
|
||||
return false
|
||||
func (rk *RankingService) IsRunning() bool {
|
||||
rk.RLock()
|
||||
defer rk.RUnlock()
|
||||
return rk.rks != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (rg *RankingService) ServiceName() string {
|
||||
func (rk *RankingService) ServiceName() string {
|
||||
return utils.RankingS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (rg *RankingService) ShouldRun() bool {
|
||||
return rg.cfg.RankingSCfg().Enabled
|
||||
func (rk *RankingService) ShouldRun() bool {
|
||||
return rk.cfg.RankingSCfg().Enabled
|
||||
}
|
||||
|
||||
@@ -112,6 +112,7 @@ func (tr *TrendService) Shutdown() (err error) {
|
||||
tr.Lock()
|
||||
defer tr.Unlock()
|
||||
tr.trs.StopTrendS()
|
||||
tr.trs = nil
|
||||
<-tr.connChan
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1660,7 +1660,7 @@ type ArgScheduleTrendQueries struct {
|
||||
}
|
||||
type ArgScheduledTrends struct {
|
||||
TenantIDWithAPIOpts
|
||||
TrendIDPrefix []string
|
||||
TrendIDPrefixes []string
|
||||
}
|
||||
|
||||
type ArgGetTrend struct {
|
||||
@@ -1675,5 +1675,21 @@ type ArgGetTrend struct {
|
||||
type ScheduledTrend struct {
|
||||
TrendID string
|
||||
Next time.Time
|
||||
Prev time.Time
|
||||
Previous time.Time
|
||||
}
|
||||
|
||||
type ArgScheduleRankingQueries struct {
|
||||
TenantIDWithAPIOpts
|
||||
RankingIDs []string
|
||||
}
|
||||
|
||||
type ArgScheduledRankings struct {
|
||||
TenantIDWithAPIOpts
|
||||
RankingIDPrefixes []string
|
||||
}
|
||||
|
||||
type ScheduledRanking struct {
|
||||
RankingID string
|
||||
Next time.Time
|
||||
Previous time.Time
|
||||
}
|
||||
|
||||
@@ -512,7 +512,9 @@ const (
|
||||
TotalUsage = "TotalUsage"
|
||||
StatID = "StatID"
|
||||
StatIDs = "StatIDs"
|
||||
SortedStatIDs = "SortedStatIDs"
|
||||
TrendID = "TrendID"
|
||||
RankingID = "RankingID"
|
||||
BalanceType = "BalanceType"
|
||||
BalanceID = "BalanceID"
|
||||
BalanceDestinationIds = "BalanceDestinationIds"
|
||||
@@ -529,6 +531,7 @@ const (
|
||||
AccountUpdate = "AccountUpdate"
|
||||
StatUpdate = "StatUpdate"
|
||||
TrendUpdate = "TrendUpdate"
|
||||
RankingUpdate = "RankingUpdate"
|
||||
ResourceUpdate = "ResourceUpdate"
|
||||
CDR = "CDR"
|
||||
CDRs = "CDRs"
|
||||
@@ -1993,6 +1996,7 @@ const (
|
||||
CacheRankingProfiles = "*ranking_profiles"
|
||||
CacheTrendProfiles = "*trend_profiles"
|
||||
CacheTrends = "*trends"
|
||||
CacheRankings = "*rankings"
|
||||
CacheThresholdProfiles = "*threshold_profiles"
|
||||
CacheThresholds = "*thresholds"
|
||||
CacheFilters = "*filters"
|
||||
|
||||
Reference in New Issue
Block a user