mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
537 lines
16 KiB
Go
537 lines
16 KiB
Go
/*
|
|
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
|
Copyright (C) ITsysCOM GmbH
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>
|
|
*/
|
|
|
|
package engine
|
|
|
|
import (
|
|
"fmt"
|
|
"runtime"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cgrates/birpc/context"
|
|
"github.com/cgrates/cgrates/config"
|
|
"github.com/cgrates/cgrates/utils"
|
|
"github.com/cgrates/cron"
|
|
)
|
|
|
|
// NewRankingS is the constructor for RankingS
|
|
func NewRankingS(dm *DataManager,
|
|
connMgr *ConnManager,
|
|
filterS *FilterS,
|
|
cgrcfg *config.CGRConfig) *RankingS {
|
|
return &RankingS{
|
|
dm: dm,
|
|
connMgr: connMgr,
|
|
filterS: filterS,
|
|
cgrcfg: cgrcfg,
|
|
crn: cron.New(),
|
|
crnTQsMux: new(sync.RWMutex),
|
|
crnTQs: make(map[string]map[string]cron.EntryID),
|
|
storedRankings: make(utils.StringSet),
|
|
storingStopped: make(chan struct{}),
|
|
rankingStop: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// RankingS is responsible of implementing the logic of RankingService
|
|
type RankingS struct {
|
|
dm *DataManager
|
|
connMgr *ConnManager
|
|
filterS *FilterS
|
|
cgrcfg *config.CGRConfig
|
|
|
|
crn *cron.Cron // cron reference
|
|
|
|
crnTQsMux *sync.RWMutex // protects the crnTQs
|
|
crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for rankingQueries so we can reschedule them when needed
|
|
|
|
storedRankings utils.StringSet // keep a record of RankingS which need saving, map[rankingTenanrkID]bool
|
|
sRksMux sync.RWMutex // protects storedRankings
|
|
storingStopped chan struct{} // signal back that the operations were stopped
|
|
|
|
rankingStop chan struct{} // signal to stop all operations
|
|
|
|
}
|
|
|
|
// computeRanking will query the stats and build the Ranking for them
|
|
//
|
|
// it is to be called by Cron service
|
|
func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
|
|
rk, err := rkS.dm.GetRanking(rkP.Tenant, rkP.ID, true, true, utils.NonTransactional)
|
|
if err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> querying RankingProfile with ID: <%s:%s> dm error: <%s>",
|
|
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
|
return
|
|
}
|
|
rk.rMux.Lock()
|
|
defer rk.rMux.Unlock()
|
|
if rk.rkPrfl == nil {
|
|
rk.rkPrfl = rkP
|
|
}
|
|
rk.LastUpdate = time.Now()
|
|
rk.Metrics = make(map[string]map[string]float64) // reset previous values
|
|
rk.SortedStatIDs = make([]string, 0)
|
|
for _, statID := range rkP.StatIDs {
|
|
var floatMetrics map[string]float64
|
|
if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns,
|
|
utils.StatSv1GetQueueFloatMetrics,
|
|
&utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: rkP.Tenant, ID: statID}},
|
|
&floatMetrics); err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> computing Ranking with ID: <%s:%s> for Stats <%s> error: <%s>",
|
|
utils.RankingS, rkP.Tenant, rkP.ID, statID, err.Error()))
|
|
return
|
|
}
|
|
if len(rk.metricIDs) != 0 {
|
|
for metricID := range floatMetrics {
|
|
if _, has := rk.metricIDs[statID]; !has {
|
|
delete(floatMetrics, metricID)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(floatMetrics) != 0 {
|
|
rk.Metrics[statID] = make(map[string]float64)
|
|
}
|
|
for metricID, val := range floatMetrics {
|
|
rk.Metrics[statID][metricID] = val
|
|
}
|
|
}
|
|
if rk.SortedStatIDs, err = rankingSortStats(rkP.Sorting,
|
|
rkP.SortingParameters, rk.Metrics); err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> sorting stats for Ranking with ID: <%s:%s> error: <%s>",
|
|
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
|
return
|
|
}
|
|
if err = rkS.storeRanking(rk); err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> storing Ranking with ID: <%s:%s> DM error: <%s>",
|
|
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
|
return
|
|
}
|
|
if err := rkS.processThresholds(rk); err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> Ranking with id <%s:%s> error: <%s> with ThresholdS",
|
|
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
|
}
|
|
if err := rkS.processEEs(rk); err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> Trend with id <%s:%s> error: <%s> with EEs",
|
|
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
|
}
|
|
}
|
|
|
|
// processThresholds will pass the Ranking event to ThresholdS
|
|
func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
|
|
if len(rk.SortedStatIDs) == 0 {
|
|
return
|
|
}
|
|
if len(rkS.cgrcfg.RankingSCfg().ThresholdSConns) == 0 {
|
|
return
|
|
}
|
|
opts := map[string]any{
|
|
utils.MetaEventType: utils.RankingUpdate,
|
|
}
|
|
var thIDs []string
|
|
if len(rk.rkPrfl.ThresholdIDs) != 0 {
|
|
if len(rk.rkPrfl.ThresholdIDs) == 1 &&
|
|
rk.rkPrfl.ThresholdIDs[0] == utils.MetaNone {
|
|
return
|
|
}
|
|
thIDs = make([]string, len(rk.rkPrfl.ThresholdIDs))
|
|
copy(thIDs, rk.rkPrfl.ThresholdIDs)
|
|
}
|
|
opts[utils.OptsThresholdsProfileIDs] = thIDs
|
|
sortedStatIDs := make([]string, len(rk.SortedStatIDs))
|
|
copy(sortedStatIDs, rk.SortedStatIDs)
|
|
ev := &utils.CGREvent{
|
|
Tenant: rk.Tenant,
|
|
ID: utils.GenUUID(),
|
|
APIOpts: opts,
|
|
Event: map[string]any{
|
|
utils.RankingID: rk.ID,
|
|
utils.LastUpdate: rk.LastUpdate,
|
|
utils.SortedStatIDs: sortedStatIDs,
|
|
},
|
|
}
|
|
var withErrs bool
|
|
var rkIDs []string
|
|
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().ThresholdSConns,
|
|
utils.ThresholdSv1ProcessEvent, ev, &rkIDs); err != nil &&
|
|
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.RankingS, err.Error(), ev))
|
|
withErrs = true
|
|
}
|
|
if withErrs {
|
|
err = utils.ErrPartiallyExecuted
|
|
}
|
|
return
|
|
}
|
|
|
|
// processEEs will pass the Ranking event to EEs
|
|
func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
|
|
if len(rk.SortedStatIDs) == 0 {
|
|
return
|
|
}
|
|
if len(rkS.cgrcfg.RankingSCfg().EEsConns) == 0 {
|
|
return
|
|
}
|
|
opts := map[string]any{
|
|
utils.MetaEventType: utils.RankingUpdate,
|
|
}
|
|
sortedStatIDs := make([]string, len(rk.SortedStatIDs))
|
|
copy(sortedStatIDs, rk.SortedStatIDs)
|
|
ev := &utils.CGREvent{
|
|
Tenant: rk.Tenant,
|
|
ID: utils.GenUUID(),
|
|
APIOpts: opts,
|
|
Event: map[string]any{
|
|
utils.RankingID: rk.ID,
|
|
utils.LastUpdate: rk.LastUpdate,
|
|
utils.SortedStatIDs: sortedStatIDs,
|
|
},
|
|
}
|
|
var withErrs bool
|
|
var reply map[string]map[string]any
|
|
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().EEsConns,
|
|
utils.EeSv1ProcessEvent, ev, &reply); err != nil &&
|
|
err.Error() != utils.ErrNotFound.Error() {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.RankingS, err.Error(), ev))
|
|
withErrs = true
|
|
}
|
|
if withErrs {
|
|
err = utils.ErrPartiallyExecuted
|
|
}
|
|
return
|
|
}
|
|
|
|
// storeTrend will store or schedule the trend based on settings
|
|
func (rkS *RankingS) storeRanking(rk *Ranking) (err error) {
|
|
if rkS.cgrcfg.RankingSCfg().StoreInterval == 0 {
|
|
return
|
|
}
|
|
if rkS.cgrcfg.RankingSCfg().StoreInterval == -1 {
|
|
return rkS.dm.SetRanking(rk)
|
|
}
|
|
// schedule the asynchronous save, relies for Ranking to be in cache
|
|
rkS.sRksMux.Lock()
|
|
rkS.storedRankings.Add(rk.rkPrfl.TenantID())
|
|
rkS.sRksMux.Unlock()
|
|
return
|
|
}
|
|
|
|
// storeRankings will do one round for saving modified Rankings
|
|
//
|
|
// from cache to dataDB
|
|
// designed to run asynchronously
|
|
func (rkS *RankingS) storeRankings() {
|
|
var failedRkIDs []string
|
|
for {
|
|
rkS.sRksMux.Lock()
|
|
rkID := rkS.storedRankings.GetOne()
|
|
if rkID != utils.EmptyString {
|
|
rkS.storedRankings.Remove(rkID)
|
|
}
|
|
rkS.sRksMux.Unlock()
|
|
if rkID == utils.EmptyString {
|
|
break // no more keys, backup completed
|
|
}
|
|
rkIf, ok := Cache.Get(utils.CacheRankings, rkID)
|
|
if !ok || rkIf == nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf("<%s> failed retrieving from cache Ranking with ID: %q",
|
|
utils.RankingS, rkID))
|
|
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
|
|
continue
|
|
}
|
|
rk := rkIf.(*Ranking)
|
|
rk.rMux.RLock()
|
|
if err := rkS.dm.SetRanking(rk); err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
|
|
utils.RankingS, rkID, err))
|
|
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
|
|
}
|
|
rk.rMux.RUnlock()
|
|
// randomize the CPU load and give up thread control
|
|
runtime.Gosched()
|
|
}
|
|
if len(failedRkIDs) != 0 { // there were errors on save, schedule the keys for next backup
|
|
rkS.sRksMux.Lock()
|
|
rkS.storedRankings.AddSlice(failedRkIDs)
|
|
rkS.sRksMux.Unlock()
|
|
}
|
|
}
|
|
|
|
// asyncStoreRankings runs as a backround process, calling storeRankings based on storeInterval
|
|
func (rkS *RankingS) asyncStoreRankings() {
|
|
storeInterval := rkS.cgrcfg.RankingSCfg().StoreInterval
|
|
if storeInterval <= 0 {
|
|
close(rkS.storingStopped)
|
|
return
|
|
}
|
|
for {
|
|
rkS.storeRankings()
|
|
select {
|
|
case <-rkS.rankingStop:
|
|
close(rkS.storingStopped)
|
|
return
|
|
case <-time.After(storeInterval): // continue to another storing loop
|
|
}
|
|
}
|
|
}
|
|
|
|
// StartRankings will activates the Cron, together with all scheduled Ranking queries
|
|
func (rkS *RankingS) StartRankingS() (err error) {
|
|
if err = rkS.scheduleAutomaticQueries(); err != nil {
|
|
return
|
|
}
|
|
rkS.crn.Start()
|
|
go rkS.asyncStoreRankings()
|
|
return
|
|
}
|
|
|
|
// StopCron will shutdown the Cron tasks
|
|
func (rkS *RankingS) StopRankingS() {
|
|
timeEnd := time.Now().Add(rkS.cgrcfg.CoreSCfg().ShutdownTimeout)
|
|
|
|
ctx := rkS.crn.Stop()
|
|
close(rkS.rankingStop)
|
|
|
|
// Wait for cron
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-time.After(timeEnd.Sub(time.Now())):
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> timeout waiting for Cron to finish",
|
|
utils.RankingS))
|
|
return
|
|
}
|
|
// Wait for backup and other operations
|
|
select {
|
|
case <-rkS.storingStopped:
|
|
case <-time.After(timeEnd.Sub(time.Now())):
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> timeout waiting for RankingS to finish",
|
|
utils.RankingS))
|
|
return
|
|
}
|
|
}
|
|
|
|
func (rkS *RankingS) Reload() {
|
|
ctx := rkS.crn.Stop()
|
|
close(rkS.rankingStop)
|
|
<-ctx.Done()
|
|
<-rkS.storingStopped
|
|
rkS.rankingStop = make(chan struct{})
|
|
rkS.storingStopped = make(chan struct{})
|
|
rkS.crn.Start()
|
|
go rkS.asyncStoreRankings()
|
|
}
|
|
|
|
// scheduleAutomaticQueries will schedule the queries at start/reload based on configured
|
|
func (rkS *RankingS) scheduleAutomaticQueries() error {
|
|
schedData := make(map[string][]string)
|
|
for k, v := range rkS.cgrcfg.RankingSCfg().ScheduledIDs {
|
|
schedData[k] = v
|
|
}
|
|
var tnts []string
|
|
if len(schedData) == 0 {
|
|
tnts = make([]string, 0)
|
|
}
|
|
for tnt, rkIDs := range schedData {
|
|
if len(rkIDs) == 0 {
|
|
tnts = append(tnts, tnt)
|
|
}
|
|
}
|
|
if tnts != nil {
|
|
qrydData, err := rkS.dm.GetRankingProfileIDs(tnts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for tnt, ids := range qrydData {
|
|
schedData[tnt] = ids
|
|
}
|
|
}
|
|
for tnt, rkIDs := range schedData {
|
|
if _, err := rkS.scheduleRankingQueries(context.TODO(), tnt, rkIDs); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// scheduleTrendQueries will schedule/re-schedule specific trend queries
|
|
func (rkS *RankingS) scheduleRankingQueries(_ *context.Context,
|
|
tnt string, rkIDs []string) (scheduled int, err error) {
|
|
var partial bool
|
|
rkS.crnTQsMux.Lock()
|
|
if _, has := rkS.crnTQs[tnt]; !has {
|
|
rkS.crnTQs[tnt] = make(map[string]cron.EntryID)
|
|
}
|
|
rkS.crnTQsMux.Unlock()
|
|
for _, rkID := range rkIDs {
|
|
rkS.crnTQsMux.RLock()
|
|
if entryID, has := rkS.crnTQs[tnt][rkID]; has {
|
|
rkS.crn.Remove(entryID) // deschedule the query
|
|
}
|
|
rkS.crnTQsMux.RUnlock()
|
|
if rkP, err := rkS.dm.GetRankingProfile(tnt, rkID, true, true, utils.NonTransactional); err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> failed retrieving RankingProfile with id: <%s:%s> for scheduling, error: <%s>",
|
|
utils.RankingS, tnt, rkID, err.Error()))
|
|
partial = true
|
|
} else if entryID, err := rkS.crn.AddFunc(rkP.Schedule,
|
|
func() { rkS.computeRanking(rkP.Clone()) }); err != nil {
|
|
utils.Logger.Warning(
|
|
fmt.Sprintf(
|
|
"<%s> scheduling RankingProfile <%s:%s>, error: <%s>",
|
|
utils.RankingS, tnt, rkID, err.Error()))
|
|
partial = true
|
|
} else { // log the entry ID for debugging
|
|
rkS.crnTQsMux.Lock()
|
|
rkS.crnTQs[rkP.Tenant][rkP.ID] = entryID
|
|
rkS.crnTQsMux.Unlock()
|
|
scheduled++
|
|
}
|
|
}
|
|
if partial {
|
|
return 0, utils.ErrPartiallyExecuted
|
|
}
|
|
return
|
|
}
|
|
|
|
// V1ScheduleQueries is the query for manually re-/scheduling Ranking Queries
|
|
func (rkS *RankingS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleRankingQueries, scheduled *int) (err error) {
|
|
if sched, errSched := rkS.scheduleRankingQueries(ctx, args.Tenant, args.RankingIDs); errSched != nil {
|
|
return errSched
|
|
} else {
|
|
*scheduled = sched
|
|
}
|
|
return
|
|
}
|
|
|
|
// V1GetRanking is the API to return the Ranking instance
|
|
func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, retRanking *Ranking) (err error) {
|
|
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
|
|
return utils.NewErrMandatoryIeMissing(missing...)
|
|
}
|
|
var rk *Ranking
|
|
if rk, err = rkS.dm.GetRanking(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
|
|
return
|
|
}
|
|
rk.rMux.RLock()
|
|
defer rk.rMux.RUnlock()
|
|
retRanking.Tenant = rk.Tenant // avoid vet complaining for mutex copying
|
|
retRanking.ID = rk.ID
|
|
retRanking.Metrics = make(map[string]map[string]float64)
|
|
for statID, metrics := range rk.Metrics {
|
|
retRanking.Metrics[statID] = make(map[string]float64)
|
|
for metricID, val := range metrics {
|
|
retRanking.Metrics[statID][metricID] = val
|
|
}
|
|
}
|
|
retRanking.LastUpdate = rk.LastUpdate
|
|
retRanking.Sorting = rk.Sorting
|
|
|
|
retRanking.SortingParameters = make([]string, len(rk.SortingParameters))
|
|
copy(retRanking.SortingParameters, rk.SortingParameters)
|
|
|
|
retRanking.SortedStatIDs = make([]string, len(rk.SortedStatIDs))
|
|
copy(retRanking.SortedStatIDs, rk.SortedStatIDs)
|
|
return
|
|
}
|
|
|
|
// V1GetSchedule returns the active schedule for Raking queries
|
|
func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgScheduledRankings, schedRankings *[]utils.ScheduledRanking) (err error) {
|
|
tnt := args.Tenant
|
|
if tnt == utils.EmptyString {
|
|
tnt = rkS.cgrcfg.GeneralCfg().DefaultTenant
|
|
}
|
|
rkS.crnTQsMux.RLock()
|
|
defer rkS.crnTQsMux.RUnlock()
|
|
rankingIDsMp, has := rkS.crnTQs[tnt]
|
|
if !has {
|
|
return utils.ErrNotFound
|
|
}
|
|
var scheduledRankings []utils.ScheduledRanking
|
|
var entryIds map[string]cron.EntryID
|
|
if len(args.RankingIDPrefixes) == 0 {
|
|
entryIds = rankingIDsMp
|
|
} else {
|
|
entryIds = make(map[string]cron.EntryID)
|
|
for _, rkID := range args.RankingIDPrefixes {
|
|
for key, entryID := range rankingIDsMp {
|
|
if strings.HasPrefix(key, rkID) {
|
|
entryIds[key] = entryID
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if len(entryIds) == 0 {
|
|
return utils.ErrNotFound
|
|
}
|
|
var entry cron.Entry
|
|
for id, entryID := range entryIds {
|
|
entry = rkS.crn.Entry(entryID)
|
|
if entry.ID == 0 {
|
|
continue
|
|
}
|
|
scheduledRankings = append(scheduledRankings,
|
|
utils.ScheduledRanking{
|
|
RankingID: id,
|
|
Next: entry.Next,
|
|
Previous: entry.Prev,
|
|
})
|
|
}
|
|
slices.SortFunc(scheduledRankings, func(a, b utils.ScheduledRanking) int {
|
|
return a.Next.Compare(b.Next)
|
|
})
|
|
*schedRankings = scheduledRankings
|
|
return nil
|
|
}
|
|
|
|
// V1GetRankingSummary returns a summary of ascending/descending stat of the last updated ranking
|
|
func (rS *RankingS) V1GetRankingSummary(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *RankingSummary) (err error) {
|
|
var rnk *Ranking
|
|
if rnk, err = rS.dm.GetRanking(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
|
|
return
|
|
}
|
|
rnk.rMux.RLock()
|
|
rnkS := rnk.asRankingSummary()
|
|
rnk.rMux.RUnlock()
|
|
*reply = *rnkS
|
|
return
|
|
}
|