mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-20 06:38:45 +05:00
move rankings to dedicated package
This commit is contained in:
committed by
Dan Christian Bogos
parent
47fb25b4ef
commit
c762de5c28
@@ -51,8 +51,8 @@ type DataDBMock struct {
|
||||
SetTrendProfileDrvF func(ctx *context.Context, tr *utils.TrendProfile) (err error)
|
||||
GetTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *utils.TrendProfile, err error)
|
||||
RemTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (err error)
|
||||
SetRankingProfileDrvF func(ctx *context.Context, sq *RankingProfile) (err error)
|
||||
GetRankingProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *RankingProfile, err error)
|
||||
SetRankingProfileDrvF func(ctx *context.Context, sq *utils.RankingProfile) (err error)
|
||||
GetRankingProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *utils.RankingProfile, err error)
|
||||
RemRankingProfileDrvF func(ctx *context.Context, tenant string, id string) (err error)
|
||||
GetStatQueueProfileDrvF func(ctx *context.Context, tenant, id string) (sq *StatQueueProfile, err error)
|
||||
SetStatQueueProfileDrvF func(ctx *context.Context, sq *StatQueueProfile) (err error)
|
||||
@@ -228,14 +228,14 @@ func (dbM *DataDBMock) RemStatQueueDrv(ctx *context.Context, tenant, id string)
|
||||
}
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
func (dbM *DataDBMock) GetRankingProfileDrv(ctx *context.Context, tenant, id string) (sg *RankingProfile, err error) {
|
||||
func (dbM *DataDBMock) GetRankingProfileDrv(ctx *context.Context, tenant, id string) (sg *utils.RankingProfile, err error) {
|
||||
if dbM.GetStatQueueProfileDrvF != nil {
|
||||
return dbM.GetRankingProfileDrvF(ctx, tenant, id)
|
||||
}
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) SetRankingProfileDrv(ctx *context.Context, rg *RankingProfile) (err error) {
|
||||
func (dbM *DataDBMock) SetRankingProfileDrv(ctx *context.Context, rg *utils.RankingProfile) (err error) {
|
||||
if dbM.SetRankingProfileDrvF(ctx, rg) != nil {
|
||||
return dbM.SetRankingProfileDrvF(ctx, rg)
|
||||
}
|
||||
@@ -249,11 +249,11 @@ func (dbM *DataDBMock) RemRankingProfileDrv(ctx *context.Context, tenant string,
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) GetRankingDrv(ctx *context.Context, tenant, id string) (*Ranking, error) {
|
||||
func (dbM *DataDBMock) GetRankingDrv(ctx *context.Context, tenant, id string) (*utils.Ranking, error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) SetRankingDrv(ctx *context.Context, _ *Ranking) error {
|
||||
func (dbM *DataDBMock) SetRankingDrv(ctx *context.Context, _ *utils.Ranking) error {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
|
||||
@@ -175,7 +175,7 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids []
|
||||
guardian.Guardian.UnguardIDs(lkID)
|
||||
case utils.RankingProfilePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
lkID := guardian.Guardian.GuardIDs("", dm.cfg.GeneralCfg().LockingTimeout, rankingProfileLockKey(tntID.Tenant, tntID.ID))
|
||||
lkID := guardian.Guardian.GuardIDs("", dm.cfg.GeneralCfg().LockingTimeout, utils.RankingProfileLockKey(tntID.Tenant, tntID.ID))
|
||||
_, err = dm.GetRankingProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
guardian.Guardian.UnguardIDs(lkID)
|
||||
case utils.TrendProfilePrefix:
|
||||
@@ -1199,14 +1199,14 @@ func (dm *DataManager) RemoveTrendProfile(ctx *context.Context, tenant, id strin
|
||||
return dm.RemoveTrend(ctx, tenant, id)
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetRankingProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rgp *RankingProfile, err error) {
|
||||
func (dm *DataManager) GetRankingProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rgp *utils.RankingProfile, err error) {
|
||||
tntID := utils.ConcatenatedKey(tenant, id)
|
||||
if cacheRead {
|
||||
if x, ok := Cache.Get(utils.CacheRankingProfiles, tntID); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*RankingProfile), nil
|
||||
return x.(*utils.RankingProfile), nil
|
||||
}
|
||||
}
|
||||
if dm == nil {
|
||||
@@ -1279,7 +1279,7 @@ func (dm *DataManager) GetRankingProfileIDs(ctx *context.Context, tenants []stri
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) SetRankingProfile(ctx *context.Context, rnp *RankingProfile) (err error) {
|
||||
func (dm *DataManager) SetRankingProfile(ctx *context.Context, rnp *utils.RankingProfile) (err error) {
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
@@ -1295,14 +1295,14 @@ func (dm *DataManager) SetRankingProfile(ctx *context.Context, rnp *RankingProfi
|
||||
dm.cfg.DataDbCfg().RplFiltered,
|
||||
utils.RankingProfilePrefix, rnp.TenantID(),
|
||||
utils.ReplicatorSv1SetRankingProfile,
|
||||
&RankingProfileWithAPIOpts{
|
||||
&utils.RankingProfileWithAPIOpts{
|
||||
RankingProfile: rnp,
|
||||
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
|
||||
dm.cfg.DataDbCfg().RplCache, utils.EmptyString)})
|
||||
}
|
||||
if oldRnk == nil || oldRnk.Sorting != rnp.Sorting ||
|
||||
oldRnk.Schedule != rnp.Schedule {
|
||||
if err = dm.SetRanking(ctx, NewRankingFromProfile(rnp)); err != nil {
|
||||
if err = dm.SetRanking(ctx, utils.NewRankingFromProfile(rnp)); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -1335,14 +1335,14 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
|
||||
}
|
||||
return
|
||||
}
|
||||
func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *Ranking, err error) {
|
||||
func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *utils.Ranking, err error) {
|
||||
tntID := utils.ConcatenatedKey(tenant, id)
|
||||
if cacheRead {
|
||||
if x, ok := Cache.Get(utils.CacheRankings, tntID); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Ranking), nil
|
||||
return x.(*utils.Ranking), nil
|
||||
}
|
||||
}
|
||||
if dm == nil {
|
||||
@@ -1384,7 +1384,7 @@ func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cache
|
||||
}
|
||||
|
||||
// SetRanking stores Ranking in dataDB
|
||||
func (dm *DataManager) SetRanking(ctx *context.Context, rn *Ranking) (err error) {
|
||||
func (dm *DataManager) SetRanking(ctx *context.Context, rn *utils.Ranking) (err error) {
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
@@ -1396,7 +1396,7 @@ func (dm *DataManager) SetRanking(ctx *context.Context, rn *Ranking) (err error)
|
||||
dm.cfg.DataDbCfg().RplFiltered,
|
||||
utils.RankingPrefix, rn.TenantID(), // this are used to get the host IDs from cache
|
||||
utils.ReplicatorSv1SetRanking,
|
||||
&RankingWithAPIOpts{
|
||||
&utils.RankingWithAPIOpts{
|
||||
Ranking: rn,
|
||||
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
|
||||
dm.cfg.DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
|
||||
|
||||
@@ -140,7 +140,7 @@ func (dDP *dynamicDP) fieldAsInterface(fldPath []string) (val any, err error) {
|
||||
return dp.FieldAsInterface(fldPath[2:])
|
||||
case utils.MetaRankings:
|
||||
// sample of fieldName : ~*rankings.RankingID.SortedStatIDs[0]
|
||||
var rankingSum RankingSummary
|
||||
var rankingSum utils.RankingSummary
|
||||
if err := connMgr.Call(context.TODO(), dDP.rnkConns, utils.RankingSv1GetRankingSummary, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: dDP.tenant, ID: fldPath[1]}}, &rankingSum); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -2633,14 +2633,14 @@ func TestFilterRanking(t *testing.T) {
|
||||
return fmt.Errorf("wrong args")
|
||||
}
|
||||
if argTntID.ID == "Ranking1" && argTntID.Tenant == "cgrates.org" {
|
||||
rn := Ranking{
|
||||
rn := utils.Ranking{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Ranking1",
|
||||
LastUpdate: time.Now(),
|
||||
SortedStatIDs: []string{"Stat5", "Stat6", "Stat7", "Stat4", "Stat3", "Stat1", "Stat2"},
|
||||
}
|
||||
rnS := rn.asRankingSummary()
|
||||
*reply.(*RankingSummary) = *rnS
|
||||
rnS := rn.AsRankingSummary()
|
||||
*reply.(*utils.RankingSummary) = *rnS
|
||||
return nil
|
||||
}
|
||||
return utils.ErrNotFound
|
||||
|
||||
@@ -1,416 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package engine
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type RankingProfileWithAPIOpts struct {
|
||||
*RankingProfile
|
||||
APIOpts map[string]any
|
||||
}
|
||||
|
||||
type RankingProfile struct {
|
||||
Tenant string // Tenant this profile belongs to
|
||||
ID string // Profile identification
|
||||
Schedule string // Cron schedule this profile should run at
|
||||
StatIDs []string // List of stat instances to query
|
||||
MetricIDs []string // Filter out only specific metrics in reply for sorting
|
||||
Sorting string // Sorting strategy. Possible values: <*asc|*desc>
|
||||
SortingParameters []string // Sorting parameters: depending on sorting type, list of metric ids for now with optional true or false in case of reverse logic is desired
|
||||
Stored bool // Offline storage activation for this profile
|
||||
ThresholdIDs []string // List of threshold IDs to limit this Ranking to. *none to disable threshold processing for it.
|
||||
}
|
||||
|
||||
func (sgp *RankingProfile) TenantID() string {
|
||||
return utils.ConcatenatedKey(sgp.Tenant, sgp.ID)
|
||||
}
|
||||
|
||||
// Clone will clone a RankingProfile
|
||||
func (rkP *RankingProfile) Clone() (cln *RankingProfile) {
|
||||
cln = &RankingProfile{
|
||||
Tenant: rkP.Tenant,
|
||||
ID: rkP.ID,
|
||||
Schedule: rkP.Schedule,
|
||||
Sorting: rkP.Sorting,
|
||||
}
|
||||
if rkP.StatIDs != nil {
|
||||
cln.StatIDs = make([]string, len(rkP.StatIDs))
|
||||
copy(cln.StatIDs, rkP.StatIDs)
|
||||
}
|
||||
if rkP.MetricIDs != nil {
|
||||
cln.MetricIDs = make([]string, len(rkP.MetricIDs))
|
||||
copy(cln.MetricIDs, rkP.MetricIDs)
|
||||
}
|
||||
if rkP.SortingParameters != nil {
|
||||
|
||||
cln.SortingParameters = make([]string, len(rkP.SortingParameters))
|
||||
copy(cln.SortingParameters, rkP.SortingParameters)
|
||||
}
|
||||
if rkP.ThresholdIDs != nil {
|
||||
cln.ThresholdIDs = make([]string, len(rkP.ThresholdIDs))
|
||||
copy(cln.ThresholdIDs, rkP.ThresholdIDs)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// rankingProfileLockKey returns the ID used to lock a RankingProfile with guardian
|
||||
func rankingProfileLockKey(tnt, id string) string {
|
||||
return utils.ConcatenatedKey(utils.CacheRankingProfiles, tnt, id)
|
||||
}
|
||||
func NewRankingFromProfile(rkP *RankingProfile) (rk *Ranking) {
|
||||
rk = &Ranking{
|
||||
Tenant: rkP.Tenant,
|
||||
ID: rkP.ID,
|
||||
Sorting: rkP.Sorting,
|
||||
Metrics: make(map[string]map[string]float64),
|
||||
|
||||
rkPrfl: rkP,
|
||||
metricIDs: utils.NewStringSet(rkP.MetricIDs),
|
||||
}
|
||||
if rkP.SortingParameters != nil {
|
||||
rk.SortingParameters = make([]string, len(rkP.SortingParameters))
|
||||
copy(rk.SortingParameters, rkP.SortingParameters)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type RankingWithAPIOpts struct {
|
||||
*Ranking
|
||||
APIOpts map[string]any
|
||||
}
|
||||
|
||||
// Ranking is one unit out of a profile
|
||||
type Ranking struct {
|
||||
rMux sync.RWMutex
|
||||
|
||||
Tenant string
|
||||
ID string
|
||||
LastUpdate time.Time
|
||||
Metrics map[string]map[string]float64 // map[statID]map[metricID]metricValue
|
||||
Sorting string
|
||||
SortingParameters []string
|
||||
|
||||
SortedStatIDs []string
|
||||
|
||||
rkPrfl *RankingProfile // store here the ranking profile so we can have it at hands further
|
||||
metricIDs utils.StringSet // convert the metricIDs here for faster matching
|
||||
|
||||
}
|
||||
|
||||
func (r *Ranking) TenantID() string {
|
||||
return utils.ConcatenatedKey(r.Tenant, r.ID)
|
||||
}
|
||||
|
||||
// asRankingSummary converts the Ranking instance into a RankingSummary one
|
||||
func (rk *Ranking) asRankingSummary() (rkSm *RankingSummary) {
|
||||
rkSm = &RankingSummary{
|
||||
Tenant: rk.Tenant,
|
||||
ID: rk.ID,
|
||||
LastUpdate: rk.LastUpdate,
|
||||
}
|
||||
rkSm.SortedStatIDs = make([]string, len(rk.SortedStatIDs))
|
||||
copy(rkSm.SortedStatIDs, rk.SortedStatIDs)
|
||||
return
|
||||
}
|
||||
|
||||
type rankingSorter interface {
|
||||
sortStatIDs() []string // sortStatIDs returns the sorted list of statIDs
|
||||
}
|
||||
|
||||
// rankingSortStats will return the list of sorted statIDs out of the sortingData map
|
||||
func rankingSortStats(sortingType string, sortingParams []string,
|
||||
Metrics map[string]map[string]float64) (sortedStatIDs []string, err error) {
|
||||
var rnkSrtr rankingSorter
|
||||
if rnkSrtr, err = newRankingSorter(sortingType, sortingParams, Metrics); err != nil {
|
||||
return
|
||||
}
|
||||
return rnkSrtr.sortStatIDs(), nil
|
||||
}
|
||||
|
||||
// newRankingSorter is the constructor for various ranking sorters
|
||||
//
|
||||
// returns error if the sortingType is not implemented
|
||||
func newRankingSorter(sortingType string, sortingParams []string,
|
||||
Metrics map[string]map[string]float64) (rkStr rankingSorter, err error) {
|
||||
switch sortingType {
|
||||
default:
|
||||
err = utils.ErrPrefixNotErrNotImplemented(sortingType)
|
||||
return
|
||||
case utils.MetaDesc:
|
||||
return newRankingDescSorter(sortingParams, Metrics), nil
|
||||
case utils.MetaAsc:
|
||||
return newRankingAscSorter(sortingParams, Metrics), nil
|
||||
}
|
||||
}
|
||||
|
||||
// newRankingDescSorter is a constructor for rankingDescSorter
|
||||
func newRankingDescSorter(sortingParams []string,
|
||||
Metrics map[string]map[string]float64) (rkDsrtr *rankingDescSorter) {
|
||||
clnSp := make([]string, len(sortingParams))
|
||||
sPReversed := make(utils.StringSet)
|
||||
for i, sP := range sortingParams { // clean the sortingParams, out of param:false or param:true definitions
|
||||
sPSlc := strings.Split(sP, utils.InInFieldSep)
|
||||
clnSp[i] = sPSlc[0]
|
||||
if len(sPSlc) > 1 && sPSlc[1] == utils.FalseStr {
|
||||
sPReversed.Add(sPSlc[0]) // param defined as param:false which should be added to reversing comparison
|
||||
}
|
||||
}
|
||||
rkDsrtr = &rankingDescSorter{
|
||||
clnSp,
|
||||
sPReversed,
|
||||
Metrics,
|
||||
make([]string, 0, len(Metrics))}
|
||||
for statID := range rkDsrtr.Metrics {
|
||||
rkDsrtr.statIDs = append(rkDsrtr.statIDs, statID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// rankingDescSorter will sort data descendent for metrics in sortingParams or random if all equal
|
||||
type rankingDescSorter struct {
|
||||
sMetricIDs []string
|
||||
sMetricRev utils.StringSet // list of exceptios for sortingParams, reverting the sorting logic
|
||||
Metrics map[string]map[string]float64
|
||||
|
||||
statIDs []string // list of keys of the Metrics
|
||||
}
|
||||
|
||||
// sortStatIDs implements rankingSorter interface
|
||||
func (rkDsrtr *rankingDescSorter) sortStatIDs() []string {
|
||||
if len(rkDsrtr.statIDs) == 0 {
|
||||
return rkDsrtr.statIDs
|
||||
}
|
||||
sort.Slice(rkDsrtr.statIDs, func(i, j int) bool {
|
||||
for _, metricID := range rkDsrtr.sMetricIDs {
|
||||
val1, hasMetric1 := rkDsrtr.Metrics[rkDsrtr.statIDs[i]][metricID]
|
||||
val2, hasMetric2 := rkDsrtr.Metrics[rkDsrtr.statIDs[j]][metricID]
|
||||
if !hasMetric1 && !hasMetric2 {
|
||||
continue
|
||||
}
|
||||
if !hasMetric1 {
|
||||
return false
|
||||
}
|
||||
if !hasMetric2 {
|
||||
return true
|
||||
}
|
||||
//in case we have the same value for the current metricID we skip to the next one
|
||||
if val1 == val2 {
|
||||
continue
|
||||
}
|
||||
ret := val1 > val2
|
||||
if rkDsrtr.sMetricRev.Has(metricID) {
|
||||
ret = !ret
|
||||
}
|
||||
return ret
|
||||
}
|
||||
//in case that we have the same value for all params we return randomly
|
||||
return utils.BoolGenerator().RandomBool()
|
||||
})
|
||||
return rkDsrtr.statIDs
|
||||
}
|
||||
|
||||
// newRankingAscSorter is a constructor for rankingAscSorter
|
||||
func newRankingAscSorter(sortingParams []string,
|
||||
Metrics map[string]map[string]float64) (rkASrtr *rankingAscSorter) {
|
||||
clnSp := make([]string, len(sortingParams))
|
||||
sPReversed := make(utils.StringSet)
|
||||
for i, sP := range sortingParams { // clean the sortingParams, out of param:false or param:true definitions
|
||||
sPSlc := strings.Split(sP, utils.InInFieldSep)
|
||||
clnSp[i] = sPSlc[0]
|
||||
if len(sPSlc) > 1 && sPSlc[1] == utils.FalseStr {
|
||||
sPReversed.Add(sPSlc[0]) // param defined as param:false which should be added to reversing comparison
|
||||
}
|
||||
}
|
||||
rkASrtr = &rankingAscSorter{
|
||||
clnSp,
|
||||
sPReversed,
|
||||
Metrics,
|
||||
make([]string, 0, len(Metrics))}
|
||||
for statID := range rkASrtr.Metrics {
|
||||
rkASrtr.statIDs = append(rkASrtr.statIDs, statID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// rankingAscSorter will sort data ascendent for metrics in sortingParams or randomly if all equal
|
||||
type rankingAscSorter struct {
|
||||
sMetricIDs []string
|
||||
sMetricRev utils.StringSet // list of exceptios for sortingParams, reverting the sorting logic
|
||||
Metrics map[string]map[string]float64
|
||||
|
||||
statIDs []string // list of keys of the Metrics
|
||||
}
|
||||
|
||||
// sortStatIDs implements rankingSorter interface
|
||||
func (rkASrtr *rankingAscSorter) sortStatIDs() []string {
|
||||
if len(rkASrtr.statIDs) == 0 {
|
||||
return rkASrtr.statIDs
|
||||
}
|
||||
sort.Slice(rkASrtr.statIDs, func(i, j int) bool {
|
||||
for _, metricID := range rkASrtr.sMetricIDs {
|
||||
val1, hasMetric1 := rkASrtr.Metrics[rkASrtr.statIDs[i]][metricID]
|
||||
val2, hasMetric2 := rkASrtr.Metrics[rkASrtr.statIDs[j]][metricID]
|
||||
if !hasMetric1 && !hasMetric2 {
|
||||
continue
|
||||
}
|
||||
if !hasMetric1 {
|
||||
return false
|
||||
}
|
||||
if !hasMetric2 {
|
||||
return true
|
||||
}
|
||||
//in case we have the same value for the current metricID we skip to the next one
|
||||
if val1 == val2 {
|
||||
continue
|
||||
}
|
||||
ret := val2 > val1
|
||||
if rkASrtr.sMetricRev.Has(metricID) {
|
||||
ret = !ret // reversed logic in case of metric:false in params
|
||||
}
|
||||
return ret
|
||||
}
|
||||
//in case that we have the same value for all params we return randomly
|
||||
return utils.BoolGenerator().RandomBool()
|
||||
})
|
||||
return rkASrtr.statIDs
|
||||
}
|
||||
|
||||
// RankingSummary is the event sent to TrendS and EEs
|
||||
type RankingSummary struct {
|
||||
Tenant string
|
||||
ID string
|
||||
LastUpdate time.Time
|
||||
SortedStatIDs []string
|
||||
}
|
||||
|
||||
func (tp *RankingProfile) Set(path []string, val any, _ bool) (err error) {
|
||||
if len(path) != 1 {
|
||||
return utils.ErrWrongPath
|
||||
}
|
||||
|
||||
switch path[0] {
|
||||
default:
|
||||
return utils.ErrWrongPath
|
||||
case utils.Tenant:
|
||||
tp.Tenant = utils.IfaceAsString(val)
|
||||
case utils.ID:
|
||||
tp.ID = utils.IfaceAsString(val)
|
||||
case utils.Schedule:
|
||||
tp.Schedule = utils.IfaceAsString(val)
|
||||
case utils.StatIDs:
|
||||
var valA []string
|
||||
valA, err = utils.IfaceAsStringSlice(val)
|
||||
tp.StatIDs = append(tp.StatIDs, valA...)
|
||||
case utils.MetricIDs:
|
||||
var valA []string
|
||||
valA, err = utils.IfaceAsStringSlice(val)
|
||||
tp.MetricIDs = append(tp.MetricIDs, valA...)
|
||||
case utils.Sorting:
|
||||
tp.Sorting = utils.IfaceAsString(val)
|
||||
case utils.SortingParameters:
|
||||
var valA []string
|
||||
valA, err = utils.IfaceAsStringSlice(val)
|
||||
tp.SortingParameters = append(tp.SortingParameters, valA...)
|
||||
case utils.Stored:
|
||||
tp.Stored, err = utils.IfaceAsBool(val)
|
||||
case utils.ThresholdIDs:
|
||||
var valA []string
|
||||
valA, err = utils.IfaceAsStringSlice(val)
|
||||
tp.ThresholdIDs = append(tp.ThresholdIDs, valA...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (tp *RankingProfile) Merge(v2 any) {
|
||||
vi := v2.(*RankingProfile)
|
||||
if len(vi.Tenant) != 0 {
|
||||
tp.Tenant = vi.Tenant
|
||||
}
|
||||
if len(vi.ID) != 0 {
|
||||
tp.ID = vi.ID
|
||||
}
|
||||
if len(vi.Schedule) != 0 {
|
||||
tp.Schedule = vi.Schedule
|
||||
}
|
||||
tp.StatIDs = append(tp.StatIDs, vi.StatIDs...)
|
||||
tp.MetricIDs = append(tp.MetricIDs, vi.MetricIDs...)
|
||||
tp.SortingParameters = append(tp.SortingParameters, vi.SortingParameters...)
|
||||
tp.ThresholdIDs = append(tp.ThresholdIDs, vi.ThresholdIDs...)
|
||||
if len(vi.Sorting) != 0 {
|
||||
tp.Sorting = vi.Sorting
|
||||
}
|
||||
if vi.Stored {
|
||||
tp.Stored = vi.Stored
|
||||
}
|
||||
}
|
||||
|
||||
func (tp *RankingProfile) String() string { return utils.ToJSON(tp) }
|
||||
func (tp *RankingProfile) FieldAsString(fldPath []string) (_ string, err error) {
|
||||
var val any
|
||||
if val, err = tp.FieldAsInterface(fldPath); err != nil {
|
||||
return
|
||||
}
|
||||
return utils.IfaceAsString(val), nil
|
||||
}
|
||||
func (tp *RankingProfile) FieldAsInterface(fldPath []string) (_ any, err error) {
|
||||
if len(fldPath) != 1 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
switch fldPath[0] {
|
||||
default:
|
||||
fld, idx := utils.GetPathIndex(fldPath[0])
|
||||
if idx != nil {
|
||||
switch fld {
|
||||
case utils.StatIDs:
|
||||
if *idx < len(tp.StatIDs) {
|
||||
return tp.StatIDs[*idx], nil
|
||||
}
|
||||
case utils.MetricIDs:
|
||||
if *idx < len(tp.MetricIDs) {
|
||||
return tp.MetricIDs[*idx], nil
|
||||
}
|
||||
case utils.SortingParameters:
|
||||
if *idx < len(tp.SortingParameters) {
|
||||
return tp.SortingParameters[*idx], nil
|
||||
}
|
||||
case utils.ThresholdIDs:
|
||||
if *idx < len(tp.ThresholdIDs) {
|
||||
return tp.ThresholdIDs[*idx], nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, utils.ErrNotFound
|
||||
case utils.Tenant:
|
||||
return tp.Tenant, nil
|
||||
case utils.ID:
|
||||
return tp.ID, nil
|
||||
case utils.Schedule:
|
||||
return tp.Schedule, nil
|
||||
case utils.Sorting:
|
||||
return tp.Sorting, nil
|
||||
case utils.Stored:
|
||||
return tp.Stored, nil
|
||||
}
|
||||
}
|
||||
@@ -1,701 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package engine
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestRankingProfileTenantID(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
profile RankingProfile
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "Tenant and ID",
|
||||
profile: RankingProfile{Tenant: "cgrates.org", ID: "1"},
|
||||
expected: "cgrates.org:1",
|
||||
},
|
||||
{
|
||||
name: "Empty tenant",
|
||||
profile: RankingProfile{ID: "2"},
|
||||
expected: ":2",
|
||||
},
|
||||
{
|
||||
name: "Empty ID",
|
||||
profile: RankingProfile{Tenant: "cgrates.org"},
|
||||
expected: "cgrates.org:",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := tc.profile.TenantID()
|
||||
if got != tc.expected {
|
||||
t.Errorf("TenantID() = %v, want %v", got, tc.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRankingProfileClone(t *testing.T) {
|
||||
original := &RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ID1",
|
||||
Schedule: "@every 1sec",
|
||||
Sorting: "asc",
|
||||
}
|
||||
|
||||
cloned := original.Clone()
|
||||
|
||||
if cloned.Tenant != original.Tenant {
|
||||
t.Errorf("Expected Tenant %s, got %s", original.Tenant, cloned.Tenant)
|
||||
}
|
||||
if cloned.ID != original.ID {
|
||||
t.Errorf("Expected ID %s, got %s", original.ID, cloned.ID)
|
||||
}
|
||||
if cloned.Schedule != original.Schedule {
|
||||
t.Errorf("Expected Schedule %s, got %s", original.Schedule, cloned.Schedule)
|
||||
}
|
||||
if cloned.Sorting != original.Sorting {
|
||||
t.Errorf("Expected Sorting %s, got %s", original.Sorting, cloned.Sorting)
|
||||
}
|
||||
|
||||
if cloned == original {
|
||||
t.Error("Clone should return a new instance, but it returned the same reference")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewRankingFromProfile(t *testing.T) {
|
||||
profile := &RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ID1",
|
||||
Sorting: "asc",
|
||||
MetricIDs: []string{"metric1", "metric2"},
|
||||
SortingParameters: []string{"param1", "param2"},
|
||||
}
|
||||
|
||||
ranking := NewRankingFromProfile(profile)
|
||||
|
||||
if ranking.Tenant != profile.Tenant {
|
||||
t.Errorf("Expected Tenant %s, got %s", profile.Tenant, ranking.Tenant)
|
||||
}
|
||||
if ranking.ID != profile.ID {
|
||||
t.Errorf("Expected ID %s, got %s", profile.ID, ranking.ID)
|
||||
}
|
||||
if ranking.Sorting != profile.Sorting {
|
||||
t.Errorf("Expected Sorting %s, got %s", profile.Sorting, ranking.Sorting)
|
||||
}
|
||||
|
||||
if ranking.Metrics == nil {
|
||||
t.Error("Expected Metrics map to be initialized, but it is nil")
|
||||
}
|
||||
|
||||
expectedMetricIDs := utils.NewStringSet(profile.MetricIDs)
|
||||
if !ranking.metricIDs.Equals(expectedMetricIDs) {
|
||||
t.Errorf("Expected metricIDs %v, got %v", expectedMetricIDs, ranking.metricIDs)
|
||||
}
|
||||
|
||||
if len(ranking.SortingParameters) != len(profile.SortingParameters) {
|
||||
t.Errorf("Expected SortingParameters length %d, got %d", len(profile.SortingParameters), len(ranking.SortingParameters))
|
||||
}
|
||||
for i, param := range profile.SortingParameters {
|
||||
if ranking.SortingParameters[i] != param {
|
||||
t.Errorf("Expected SortingParameters[%d] %s, got %s", i, param, ranking.SortingParameters[i])
|
||||
}
|
||||
}
|
||||
|
||||
if ranking.rkPrfl != profile {
|
||||
t.Error("Expected rkPrfl to reference the original profile")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRankingTenantID(t *testing.T) {
|
||||
r := &Ranking{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "1",
|
||||
}
|
||||
expectedTenantID := "cgrates.org:1"
|
||||
actualTenantID := r.TenantID()
|
||||
if actualTenantID != expectedTenantID {
|
||||
t.Errorf("Expected tenant ID %s, got %s", expectedTenantID, actualTenantID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRanking_asRankingSummary(t *testing.T) {
|
||||
rk := &Ranking{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ID1",
|
||||
LastUpdate: time.Now(),
|
||||
SortedStatIDs: []string{"stat1", "stat2", "stat3"},
|
||||
}
|
||||
|
||||
rkSummary := rk.asRankingSummary()
|
||||
|
||||
if rkSummary.Tenant != rk.Tenant {
|
||||
t.Errorf("Expected Tenant %s, but got %s", rk.Tenant, rkSummary.Tenant)
|
||||
}
|
||||
if rkSummary.ID != rk.ID {
|
||||
t.Errorf("Expected ID %s, but got %s", rk.ID, rkSummary.ID)
|
||||
}
|
||||
if !rkSummary.LastUpdate.Equal(rk.LastUpdate) {
|
||||
t.Errorf("Expected LastUpdate %v, but got %v", rk.LastUpdate, rkSummary.LastUpdate)
|
||||
}
|
||||
if !reflect.DeepEqual(rkSummary.SortedStatIDs, rk.SortedStatIDs) {
|
||||
t.Errorf("Expected SortedStatIDs %v, but got %v", rk.SortedStatIDs, rkSummary.SortedStatIDs)
|
||||
}
|
||||
|
||||
if &rkSummary.SortedStatIDs == &rk.SortedStatIDs {
|
||||
t.Errorf("Expected SortedStatIDs slice to be copied, not referenced")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRankingSortStats(t *testing.T) {
|
||||
metrics := map[string]map[string]float64{
|
||||
"stat1": {
|
||||
"metric1": 5.0,
|
||||
"metric2": 10.0,
|
||||
},
|
||||
"stat2": {
|
||||
"metric1": 3.0,
|
||||
"metric2": 12.0,
|
||||
},
|
||||
"stat3": {
|
||||
"metric1": 7.0,
|
||||
"metric2": 8.0,
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
sortingType string
|
||||
sortingParams []string
|
||||
expectedOrder []string
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "Sort Descending by metric1",
|
||||
sortingType: utils.MetaDesc,
|
||||
sortingParams: []string{"metric1"},
|
||||
expectedOrder: []string{"stat3", "stat1", "stat2"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Sort Ascending by metric2",
|
||||
sortingType: utils.MetaAsc,
|
||||
sortingParams: []string{"metric2"},
|
||||
expectedOrder: []string{"stat3", "stat1", "stat2"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Unsupported sorting type",
|
||||
sortingType: "unsupported",
|
||||
sortingParams: []string{"metric1"},
|
||||
expectedOrder: nil,
|
||||
expectError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
sortedStatIDs, err := rankingSortStats(tt.sortingType, tt.sortingParams, metrics)
|
||||
if tt.expectError {
|
||||
if err == nil {
|
||||
t.Errorf("expected error but got nil")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for i, id := range sortedStatIDs {
|
||||
if id != tt.expectedOrder[i] {
|
||||
t.Errorf("expected sorted statID %v at index %d, got %v", tt.expectedOrder[i], i, id)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRankingMixedOrder(t *testing.T) {
|
||||
statmetrics := map[string]map[string]float64{
|
||||
"Stat1": {"*acc": 13},
|
||||
"Stat6": {"*acc": 10, "*pdd": 700, "*tcc": 121},
|
||||
"Stat2": {"*acc": 14},
|
||||
"Stat5": {"*acc": 10, "*pdd": 700, "*tcc": 120},
|
||||
"Stat3": {"*acc": 12.1, "*pdd": 900},
|
||||
"Stat7": {"*acc": 10, "*pdd": 600, "*tcc": 123},
|
||||
"Stat4": {"*acc": 12.1, "*pdd": 1000},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
sortMetric []string
|
||||
sorter string
|
||||
statIDs []string
|
||||
expErr error
|
||||
}{
|
||||
{
|
||||
name: "TestSortStatsAsc",
|
||||
sortMetric: []string{"*acc", "*pdd:false", "*tcc"},
|
||||
sorter: "*asc",
|
||||
statIDs: []string{"Stat5", "Stat6", "Stat7", "Stat4", "Stat3", "Stat1", "Stat2"},
|
||||
},
|
||||
{
|
||||
name: "TestSortStatsDesc",
|
||||
sortMetric: []string{"*tcc", "*pdd:false", "*acc"},
|
||||
sorter: "*desc",
|
||||
statIDs: []string{"Stat7", "Stat6", "Stat5", "Stat3", "Stat4", "Stat2", "Stat1"},
|
||||
},
|
||||
{
|
||||
name: "TestSortStatsDesc",
|
||||
sortMetric: []string{"*acc", "*tcc", "*pdd:false"},
|
||||
sorter: "*desc",
|
||||
statIDs: []string{"Stat2", "Stat1", "Stat3", "Stat4", "Stat7", "Stat6", "Stat5"},
|
||||
},
|
||||
{
|
||||
name: "TestSortStatsAsc",
|
||||
sortMetric: []string{"*tcc", "*pdd:false", "*acc"},
|
||||
sorter: "*asc",
|
||||
statIDs: []string{"Stat5", "Stat6", "Stat7", "Stat4", "Stat3", "Stat1", "Stat2"},
|
||||
},
|
||||
{
|
||||
name: "TestSortStatsDesc",
|
||||
sortMetric: []string{"*pdd:false", "*acc", "*tcc"},
|
||||
sorter: "*desc",
|
||||
statIDs: []string{"Stat7", "Stat6", "Stat5", "Stat3", "Stat4", "Stat2", "Stat1"},
|
||||
},
|
||||
{
|
||||
name: "TestSortStatsAsc",
|
||||
sortMetric: []string{"*tcc", "*acc", "*pdd:false"},
|
||||
sorter: "*asc",
|
||||
statIDs: []string{"Stat5", "Stat6", "Stat7", "Stat4", "Stat3", "Stat1", "Stat2"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
rs, err := newRankingSorter(tc.sorter, tc.sortMetric, statmetrics)
|
||||
if tc.expErr != nil {
|
||||
if err == nil {
|
||||
t.Error("Expected error, got nil")
|
||||
}
|
||||
if tc.expErr.Error() != err.Error() {
|
||||
t.Errorf("Expected error: %v, got: %v", tc.expErr, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if resStatIDs := rs.sortStatIDs(); !reflect.DeepEqual(resStatIDs, tc.statIDs) {
|
||||
t.Errorf("Expecting: %v, received %v", tc.statIDs, resStatIDs)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRankingProfileFieldAsString(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
fldPath []string
|
||||
err error
|
||||
val any
|
||||
}{
|
||||
{utils.ID, []string{utils.ID}, nil, "RP1"},
|
||||
{utils.Tenant, []string{utils.Tenant}, nil, "cgrates.org"},
|
||||
{utils.Schedule, []string{utils.Schedule}, nil, "@every 2s"},
|
||||
{utils.StatIDs, []string{utils.StatIDs + "[0]"}, nil, "Stat1"},
|
||||
{utils.StatIDs, []string{utils.StatIDs + "[1]"}, nil, "Stat2"},
|
||||
{utils.MetricIDs, []string{utils.MetricIDs + "[0]"}, nil, "*tcc"},
|
||||
{utils.MetricIDs, []string{utils.MetricIDs + "[1]"}, nil, "*acc"},
|
||||
{utils.Sorting, []string{utils.Sorting}, nil, "*asc"},
|
||||
{utils.Stored, []string{utils.Stored}, nil, false},
|
||||
{utils.SortingParameters, []string{utils.SortingParameters + "[0]"}, nil, "*acc"},
|
||||
{utils.SortingParameters, []string{utils.SortingParameters + "[1]"}, nil, "*pdd:false"},
|
||||
{utils.ThresholdIDs, []string{utils.ThresholdIDs + "[0]"}, nil, "Threshold1"},
|
||||
{utils.ThresholdIDs, []string{utils.ThresholdIDs + "[1]"}, nil, "Threshold2"},
|
||||
{"NonExistingField", []string{"Field1"}, utils.ErrNotFound, nil},
|
||||
}
|
||||
rp := &RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "RP1",
|
||||
Schedule: "@every 2s",
|
||||
StatIDs: []string{"Stat1", "Stat2"},
|
||||
MetricIDs: []string{"*tcc", "*acc", "*pdd"},
|
||||
Sorting: "*asc",
|
||||
SortingParameters: []string{"*acc", "*pdd:false"},
|
||||
ThresholdIDs: []string{"Threshold1", "Threshold2"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
val, err := rp.FieldAsInterface(tc.fldPath)
|
||||
if tc.err != nil {
|
||||
if err == nil {
|
||||
t.Error("expect to receive an error")
|
||||
}
|
||||
if tc.err != err {
|
||||
t.Errorf("expected %v,received %v", tc.err, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
if val != tc.val {
|
||||
t.Errorf("expected %v,received %v", tc.val, val)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewRankingSorter(t *testing.T) {
|
||||
Metrics := map[string]map[string]float64{
|
||||
"STATS1": {"*acc": 12.1, "*tcc": 24.2},
|
||||
"STATS2": {"*acc": 12.1, "*tcc": 24.3},
|
||||
"STATS3": {"*acc": 10.1, "*tcc": 25.3},
|
||||
"STATS4": {"*tcc": 26.3},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
sortingType string
|
||||
sortingParams []string
|
||||
expectErr bool
|
||||
expectSorterType string
|
||||
}{
|
||||
{
|
||||
sortingType: utils.MetaAsc,
|
||||
sortingParams: []string{"*acc"},
|
||||
expectErr: false,
|
||||
expectSorterType: "RankingAscSorter",
|
||||
},
|
||||
{
|
||||
sortingType: utils.MetaDesc,
|
||||
sortingParams: []string{"*tcc"},
|
||||
expectErr: false,
|
||||
expectSorterType: "RankingDescSorter",
|
||||
},
|
||||
{
|
||||
sortingType: "unsupported",
|
||||
sortingParams: []string{"*tcc"},
|
||||
expectErr: true,
|
||||
expectSorterType: "",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
rkSorter, err := newRankingSorter(test.sortingType, test.sortingParams, Metrics)
|
||||
|
||||
if test.expectErr {
|
||||
if err == nil {
|
||||
t.Errorf("Expected an error for sorting type %q, but got none", test.sortingType)
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
t.Errorf("Did not expect an error for sorting type %q, but got: %v", test.sortingType, err)
|
||||
}
|
||||
switch test.sortingType {
|
||||
case utils.MetaAsc:
|
||||
if _, ok := rkSorter.(*rankingAscSorter); !ok {
|
||||
t.Errorf("Expected sorter type 'rankingAscSorter', but got %T", rkSorter)
|
||||
}
|
||||
case utils.MetaDesc:
|
||||
if _, ok := rkSorter.(*rankingDescSorter); !ok {
|
||||
t.Errorf("Expected sorter type 'rankingDescSorter', but got %T", rkSorter)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRankingProfileSet(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
path []string
|
||||
val any
|
||||
expectedErr error
|
||||
expectedRP RankingProfile
|
||||
}{
|
||||
{
|
||||
name: "Set Tenant",
|
||||
path: []string{utils.Tenant},
|
||||
val: "cgrates.org",
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{Tenant: "cgrates.org"},
|
||||
},
|
||||
{
|
||||
name: "Set ID",
|
||||
path: []string{utils.ID},
|
||||
val: "profile1",
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{ID: "profile1"},
|
||||
},
|
||||
{
|
||||
name: "Set Schedule",
|
||||
path: []string{utils.Schedule},
|
||||
val: "0 0 * * *",
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{Schedule: "0 0 * * *"},
|
||||
},
|
||||
{
|
||||
name: "Set StatIDs",
|
||||
path: []string{utils.StatIDs},
|
||||
val: []string{"stat1", "stat2"},
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{StatIDs: []string{"stat1", "stat2"}},
|
||||
},
|
||||
{
|
||||
name: "Set MetricIDs",
|
||||
path: []string{utils.MetricIDs},
|
||||
val: []string{"metric1", "metric2"},
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{MetricIDs: []string{"metric1", "metric2"}},
|
||||
},
|
||||
{
|
||||
name: "Set Sorting",
|
||||
path: []string{utils.Sorting},
|
||||
val: "asc",
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{Sorting: "asc"},
|
||||
},
|
||||
{
|
||||
name: "Set SortingParameters",
|
||||
path: []string{utils.SortingParameters},
|
||||
val: []string{"param1", "param2"},
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{SortingParameters: []string{"param1", "param2"}},
|
||||
},
|
||||
{
|
||||
name: "Set Stored",
|
||||
path: []string{utils.Stored},
|
||||
val: true,
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{Stored: true},
|
||||
},
|
||||
{
|
||||
name: "Set ThresholdIDs",
|
||||
path: []string{utils.ThresholdIDs},
|
||||
val: []string{"threshold1", "threshold2"},
|
||||
expectedErr: nil,
|
||||
expectedRP: RankingProfile{ThresholdIDs: []string{"threshold1", "threshold2"}},
|
||||
},
|
||||
{
|
||||
name: "Wrong path",
|
||||
path: []string{"wrongpath"},
|
||||
val: "value",
|
||||
expectedErr: utils.ErrWrongPath,
|
||||
expectedRP: RankingProfile{},
|
||||
},
|
||||
{
|
||||
name: "Empty path",
|
||||
path: []string{},
|
||||
val: "value",
|
||||
expectedErr: utils.ErrWrongPath,
|
||||
expectedRP: RankingProfile{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
rp := &RankingProfile{}
|
||||
err := rp.Set(tt.path, tt.val, false)
|
||||
|
||||
if err != tt.expectedErr {
|
||||
t.Errorf("Test %s failed: expected error %v, got %v", tt.name, tt.expectedErr, err)
|
||||
}
|
||||
|
||||
if rp.Tenant != tt.expectedRP.Tenant {
|
||||
t.Errorf("Test %s failed: expected Tenant %s, got %s", tt.name, tt.expectedRP.Tenant, rp.Tenant)
|
||||
}
|
||||
if rp.ID != tt.expectedRP.ID {
|
||||
t.Errorf("Test %s failed: expected ID %s, got %s", tt.name, tt.expectedRP.ID, rp.ID)
|
||||
}
|
||||
if rp.Schedule != tt.expectedRP.Schedule {
|
||||
t.Errorf("Test %s failed: expected Schedule %s, got %s", tt.name, tt.expectedRP.Schedule, rp.Schedule)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(rp.StatIDs, tt.expectedRP.StatIDs) {
|
||||
t.Errorf("Test %s failed: expected StatIDs %v, got %v", tt.name, tt.expectedRP.StatIDs, rp.StatIDs)
|
||||
}
|
||||
if !reflect.DeepEqual(rp.MetricIDs, tt.expectedRP.MetricIDs) {
|
||||
t.Errorf("Test %s failed: expected MetricIDs %v, got %v", tt.name, tt.expectedRP.MetricIDs, rp.MetricIDs)
|
||||
}
|
||||
if !reflect.DeepEqual(rp.SortingParameters, tt.expectedRP.SortingParameters) {
|
||||
t.Errorf("Test %s failed: expected SortingParameters %v, got %v", tt.name, tt.expectedRP.SortingParameters, rp.SortingParameters)
|
||||
}
|
||||
if !reflect.DeepEqual(rp.ThresholdIDs, tt.expectedRP.ThresholdIDs) {
|
||||
t.Errorf("Test %s failed: expected ThresholdIDs %v, got %v", tt.name, tt.expectedRP.ThresholdIDs, rp.ThresholdIDs)
|
||||
}
|
||||
if rp.Sorting != tt.expectedRP.Sorting {
|
||||
t.Errorf("Test %s failed: expected Sorting %s, got %s", tt.name, tt.expectedRP.Sorting, rp.Sorting)
|
||||
}
|
||||
if rp.Stored != tt.expectedRP.Stored {
|
||||
t.Errorf("Test %s failed: expected Stored %v, got %v", tt.name, tt.expectedRP.Stored, rp.Stored)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRankingProfileStringJson(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
rp RankingProfile
|
||||
expectedJSON string
|
||||
}{
|
||||
{
|
||||
name: "Valid RankingProfile",
|
||||
rp: RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "profile1",
|
||||
Schedule: "0 0 * * *",
|
||||
StatIDs: []string{"stat1", "stat2"},
|
||||
MetricIDs: []string{"metric1", "metric2"},
|
||||
Sorting: "asc",
|
||||
SortingParameters: []string{"param1", "param2"},
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"threshold1"},
|
||||
},
|
||||
expectedJSON: `{"Tenant":"cgrates.org","ID":"profile1","Schedule":"0 0 * * *","StatIDs":["stat1","stat2"],"MetricIDs":["metric1","metric2"],"Sorting":"asc","SortingParameters":["param1","param2"],"Stored":true,"ThresholdIDs":["threshold1"]}`,
|
||||
},
|
||||
{
|
||||
name: "Empty RankingProfile",
|
||||
rp: RankingProfile{
|
||||
Tenant: "",
|
||||
ID: "",
|
||||
Schedule: "",
|
||||
StatIDs: []string{},
|
||||
MetricIDs: []string{},
|
||||
Sorting: "",
|
||||
SortingParameters: []string{},
|
||||
Stored: false,
|
||||
ThresholdIDs: []string{},
|
||||
},
|
||||
expectedJSON: `{"Tenant":"","ID":"","Schedule":"","StatIDs":[],"MetricIDs":[],"Sorting":"","SortingParameters":[],"Stored":false,"ThresholdIDs":[]}`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := tt.rp.String()
|
||||
|
||||
var resultMap map[string]interface{}
|
||||
err := json.Unmarshal([]byte(result), &resultMap)
|
||||
if err != nil {
|
||||
t.Errorf("Error unmarshalling result: %v", err)
|
||||
}
|
||||
|
||||
expectedMap := map[string]interface{}{}
|
||||
err = json.Unmarshal([]byte(tt.expectedJSON), &expectedMap)
|
||||
if err != nil {
|
||||
t.Errorf("Error unmarshalling expected JSON: %v", err)
|
||||
}
|
||||
|
||||
for key, value1 := range resultMap {
|
||||
if value2, exists := expectedMap[key]; exists {
|
||||
if value1Slice, ok1 := value1.([]interface{}); ok1 {
|
||||
if value2Slice, ok2 := value2.([]interface{}); ok2 {
|
||||
if len(value1Slice) != len(value2Slice) {
|
||||
t.Errorf("Test %s failed: slice length mismatch for key %s", tt.name, key)
|
||||
}
|
||||
for i, v1 := range value1Slice {
|
||||
if v1 != value2Slice[i] {
|
||||
t.Errorf("Test %s failed: slice mismatch for key %s at index %d", tt.name, key, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if value1 != value2 {
|
||||
t.Errorf("Test %s failed: expected %v for key %s, got %v", tt.name, value2, key, value1)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
t.Errorf("Test %s failed: key %s not found in expected result", tt.name, key)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTpRankingProfileFieldAsString(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
profile RankingProfile
|
||||
fldPath []string
|
||||
expected string
|
||||
expectErr bool
|
||||
}{
|
||||
{
|
||||
name: "Valid field path",
|
||||
profile: RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "profile1",
|
||||
Schedule: "0 0 * * *",
|
||||
StatIDs: []string{"stat1", "stat2"},
|
||||
MetricIDs: []string{"metric1", "metric2"},
|
||||
SortingParameters: []string{"param1", "param2"},
|
||||
ThresholdIDs: []string{"threshold1", "threshold2"},
|
||||
},
|
||||
fldPath: []string{"Tenant"},
|
||||
expected: "cgrates.org",
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
name: "Invalid field path",
|
||||
profile: RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "profile1",
|
||||
Schedule: "0 0 * * *",
|
||||
StatIDs: []string{"stat1", "stat2"},
|
||||
MetricIDs: []string{"metric1", "metric2"},
|
||||
SortingParameters: []string{"param1", "param2"},
|
||||
ThresholdIDs: []string{"threshold1", "threshold2"},
|
||||
},
|
||||
fldPath: []string{"NonExistentField"},
|
||||
expected: "",
|
||||
expectErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result, err := tt.profile.FieldAsString(tt.fldPath)
|
||||
|
||||
if tt.expectErr && err == nil {
|
||||
t.Errorf("Expected an error for test %s, but got none", tt.name)
|
||||
}
|
||||
if !tt.expectErr && err != nil {
|
||||
t.Errorf("Unexpected error for test %s: %v", tt.name, err)
|
||||
}
|
||||
|
||||
if result != tt.expected {
|
||||
t.Errorf("Test %s failed: expected %v, got %v", tt.name, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -672,8 +672,8 @@ func APItoModelTPRanking(tpRG *utils.TPRankingProfile) (mdls RankingMdls) {
|
||||
return
|
||||
}
|
||||
|
||||
func APItoRanking(tpRG *utils.TPRankingProfile) (rg *RankingProfile, err error) {
|
||||
rg = &RankingProfile{
|
||||
func APItoRanking(tpRG *utils.TPRankingProfile) (rg *utils.RankingProfile, err error) {
|
||||
rg = &utils.RankingProfile{
|
||||
Tenant: tpRG.Tenant,
|
||||
ID: tpRG.ID,
|
||||
Schedule: tpRG.Schedule,
|
||||
@@ -691,7 +691,7 @@ func APItoRanking(tpRG *utils.TPRankingProfile) (rg *RankingProfile, err error)
|
||||
return rg, nil
|
||||
}
|
||||
|
||||
func RankingProfileToAPI(rg *RankingProfile) (tpRG *utils.TPRankingProfile) {
|
||||
func RankingProfileToAPI(rg *utils.RankingProfile) (tpRG *utils.TPRankingProfile) {
|
||||
tpRG = &utils.TPRankingProfile{
|
||||
Tenant: rg.Tenant,
|
||||
ID: rg.ID,
|
||||
|
||||
@@ -1,536 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/cron"
|
||||
)
|
||||
|
||||
// NewRankingS is the constructor for RankingS
|
||||
func NewRankingS(dm *DataManager,
|
||||
connMgr *ConnManager,
|
||||
filterS *FilterS,
|
||||
cgrcfg *config.CGRConfig) *RankingS {
|
||||
return &RankingS{
|
||||
dm: dm,
|
||||
connMgr: connMgr,
|
||||
filterS: filterS,
|
||||
cgrcfg: cgrcfg,
|
||||
crn: cron.New(),
|
||||
crnRQsMux: new(sync.RWMutex),
|
||||
crnRQs: make(map[string]map[string]cron.EntryID),
|
||||
storedRankings: make(utils.StringSet),
|
||||
storingStopped: make(chan struct{}),
|
||||
rankingStop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// RankingS is responsible of implementing the logic of RankingService
|
||||
type RankingS struct {
|
||||
dm *DataManager
|
||||
connMgr *ConnManager
|
||||
filterS *FilterS
|
||||
cgrcfg *config.CGRConfig
|
||||
|
||||
crn *cron.Cron // cron reference
|
||||
|
||||
crnRQsMux *sync.RWMutex // protects the crnTQs
|
||||
crnRQs map[string]map[string]cron.EntryID // save the EntryIDs for rankingQueries so we can reschedule them when needed
|
||||
|
||||
storedRankings utils.StringSet // keep a record of RankingS which need saving, map[rankingTenanrkID]bool
|
||||
sRksMux sync.RWMutex // protects storedRankings
|
||||
storingStopped chan struct{} // signal back that the operations were stopped
|
||||
|
||||
rankingStop chan struct{} // signal to stop all operations
|
||||
|
||||
}
|
||||
|
||||
// computeRanking will query the stats and build the Ranking for them
|
||||
//
|
||||
// it is to be called by Cron service
|
||||
func (rkS *RankingS) computeRanking(ctx *context.Context, rkP *RankingProfile) {
|
||||
rk, err := rkS.dm.GetRanking(ctx, rkP.Tenant, rkP.ID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> querying RankingProfile with ID: <%s:%s> dm error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
rk.rMux.Lock()
|
||||
defer rk.rMux.Unlock()
|
||||
if rk.rkPrfl == nil {
|
||||
rk.rkPrfl = rkP
|
||||
}
|
||||
rk.LastUpdate = time.Now()
|
||||
rk.Metrics = make(map[string]map[string]float64) // reset previous values
|
||||
rk.SortedStatIDs = make([]string, 0)
|
||||
for _, statID := range rkP.StatIDs {
|
||||
var floatMetrics map[string]float64
|
||||
if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns,
|
||||
utils.StatSv1GetQueueFloatMetrics,
|
||||
&utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: rkP.Tenant, ID: statID}},
|
||||
&floatMetrics); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> computing Ranking with ID: <%s:%s> for Stats <%s> error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, statID, err.Error()))
|
||||
return
|
||||
}
|
||||
if len(rk.metricIDs) != 0 {
|
||||
for metricID := range floatMetrics {
|
||||
if _, has := rk.metricIDs[statID]; !has {
|
||||
delete(floatMetrics, metricID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(floatMetrics) != 0 {
|
||||
rk.Metrics[statID] = make(map[string]float64)
|
||||
}
|
||||
for metricID, val := range floatMetrics {
|
||||
rk.Metrics[statID][metricID] = val
|
||||
}
|
||||
}
|
||||
if rk.SortedStatIDs, err = rankingSortStats(rkP.Sorting,
|
||||
rkP.SortingParameters, rk.Metrics); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> sorting stats for Ranking with ID: <%s:%s> error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
if err = rkS.storeRanking(ctx, rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> storing Ranking with ID: <%s:%s> DM error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
if err := rkS.processThresholds(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> Ranking with id <%s:%s> error: <%s> with ThresholdS",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
}
|
||||
if err := rkS.processEEs(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> Trend with id <%s:%s> error: <%s> with EEs",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
// processThresholds will pass the Ranking event to ThresholdS
|
||||
func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
|
||||
if len(rk.SortedStatIDs) == 0 {
|
||||
return
|
||||
}
|
||||
if len(rkS.cgrcfg.RankingSCfg().ThresholdSConns) == 0 {
|
||||
return
|
||||
}
|
||||
opts := map[string]any{
|
||||
utils.MetaEventType: utils.RankingUpdate,
|
||||
}
|
||||
var thIDs []string
|
||||
if len(rk.rkPrfl.ThresholdIDs) != 0 {
|
||||
if len(rk.rkPrfl.ThresholdIDs) == 1 &&
|
||||
rk.rkPrfl.ThresholdIDs[0] == utils.MetaNone {
|
||||
return
|
||||
}
|
||||
thIDs = make([]string, len(rk.rkPrfl.ThresholdIDs))
|
||||
copy(thIDs, rk.rkPrfl.ThresholdIDs)
|
||||
}
|
||||
opts[utils.OptsThresholdsProfileIDs] = thIDs
|
||||
sortedStatIDs := make([]string, len(rk.SortedStatIDs))
|
||||
copy(sortedStatIDs, rk.SortedStatIDs)
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: rk.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
APIOpts: opts,
|
||||
Event: map[string]any{
|
||||
utils.RankingID: rk.ID,
|
||||
utils.LastUpdate: rk.LastUpdate,
|
||||
utils.SortedStatIDs: sortedStatIDs,
|
||||
},
|
||||
}
|
||||
var withErrs bool
|
||||
var rkIDs []string
|
||||
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().ThresholdSConns,
|
||||
utils.ThresholdSv1ProcessEvent, ev, &rkIDs); err != nil &&
|
||||
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.RankingS, err.Error(), ev))
|
||||
withErrs = true
|
||||
}
|
||||
if withErrs {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// processEEs will pass the Ranking event to EEs
|
||||
func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
|
||||
if len(rk.SortedStatIDs) == 0 {
|
||||
return
|
||||
}
|
||||
if len(rkS.cgrcfg.RankingSCfg().EEsConns) == 0 {
|
||||
return
|
||||
}
|
||||
opts := map[string]any{
|
||||
utils.MetaEventType: utils.RankingUpdate,
|
||||
}
|
||||
sortedStatIDs := make([]string, len(rk.SortedStatIDs))
|
||||
copy(sortedStatIDs, rk.SortedStatIDs)
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: rk.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
APIOpts: opts,
|
||||
Event: map[string]any{
|
||||
utils.RankingID: rk.ID,
|
||||
utils.LastUpdate: rk.LastUpdate,
|
||||
utils.SortedStatIDs: sortedStatIDs,
|
||||
},
|
||||
}
|
||||
var withErrs bool
|
||||
var reply map[string]map[string]any
|
||||
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().EEsConns,
|
||||
utils.EeSv1ProcessEvent, ev, &reply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.RankingS, err.Error(), ev))
|
||||
withErrs = true
|
||||
}
|
||||
if withErrs {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// storeTrend will store or schedule the trend based on settings
|
||||
func (rkS *RankingS) storeRanking(ctx *context.Context, rk *Ranking) (err error) {
|
||||
if rkS.cgrcfg.RankingSCfg().StoreInterval == 0 {
|
||||
return
|
||||
}
|
||||
if rkS.cgrcfg.RankingSCfg().StoreInterval == -1 {
|
||||
return rkS.dm.SetRanking(ctx, rk)
|
||||
}
|
||||
// schedule the asynchronous save, relies for Ranking to be in cache
|
||||
rkS.sRksMux.Lock()
|
||||
rkS.storedRankings.Add(rk.rkPrfl.TenantID())
|
||||
rkS.sRksMux.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// storeRankings will do one round for saving modified Rankings
|
||||
//
|
||||
// from cache to dataDB
|
||||
// designed to run asynchronously
|
||||
func (rkS *RankingS) storeRankings(ctx *context.Context) {
|
||||
var failedRkIDs []string
|
||||
for {
|
||||
rkS.sRksMux.Lock()
|
||||
rkID := rkS.storedRankings.GetOne()
|
||||
if rkID != utils.EmptyString {
|
||||
rkS.storedRankings.Remove(rkID)
|
||||
}
|
||||
rkS.sRksMux.Unlock()
|
||||
if rkID == utils.EmptyString {
|
||||
break // no more keys, backup completed
|
||||
}
|
||||
rkIf, ok := Cache.Get(utils.CacheRankings, rkID)
|
||||
if !ok || rkIf == nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed retrieving from cache Ranking with ID: %q",
|
||||
utils.RankingS, rkID))
|
||||
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
|
||||
continue
|
||||
}
|
||||
rk := rkIf.(*Ranking)
|
||||
rk.rMux.RLock()
|
||||
if err := rkS.dm.SetRanking(ctx, rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
|
||||
utils.RankingS, rkID, err))
|
||||
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
|
||||
}
|
||||
rk.rMux.RUnlock()
|
||||
// randomize the CPU load and give up thread control
|
||||
runtime.Gosched()
|
||||
}
|
||||
if len(failedRkIDs) != 0 { // there were errors on save, schedule the keys for next backup
|
||||
rkS.sRksMux.Lock()
|
||||
rkS.storedRankings.AddSlice(failedRkIDs)
|
||||
rkS.sRksMux.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// asyncStoreRankings runs as a backround process, calling storeRankings based on storeInterval
|
||||
func (rkS *RankingS) asyncStoreRankings(ctx *context.Context) {
|
||||
storeInterval := rkS.cgrcfg.RankingSCfg().StoreInterval
|
||||
if storeInterval <= 0 {
|
||||
close(rkS.storingStopped)
|
||||
return
|
||||
}
|
||||
for {
|
||||
rkS.storeRankings(ctx)
|
||||
select {
|
||||
case <-rkS.rankingStop:
|
||||
close(rkS.storingStopped)
|
||||
return
|
||||
case <-time.After(storeInterval): // continue to another storing loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StartRankings will activates the Cron, together with all scheduled Ranking queries
|
||||
func (rkS *RankingS) StartRankingS(ctx *context.Context) (err error) {
|
||||
if err = rkS.scheduleAutomaticQueries(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
rkS.crn.Start()
|
||||
go rkS.asyncStoreRankings(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
// StopCron will shutdown the Cron tasks
|
||||
func (rkS *RankingS) StopRankingS() {
|
||||
timeEnd := time.Now().Add(rkS.cgrcfg.CoreSCfg().ShutdownTimeout)
|
||||
|
||||
ctx := rkS.crn.Stop()
|
||||
close(rkS.rankingStop)
|
||||
|
||||
// Wait for cron
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-time.After(time.Until(timeEnd)):
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> timeout waiting for Cron to finish",
|
||||
utils.RankingS))
|
||||
return
|
||||
}
|
||||
// Wait for backup and other operations
|
||||
select {
|
||||
case <-rkS.storingStopped:
|
||||
case <-time.After(time.Until(timeEnd)):
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> timeout waiting for RankingS to finish",
|
||||
utils.RankingS))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (rkS *RankingS) Reload(ctx *context.Context) {
|
||||
crnCtx := rkS.crn.Stop()
|
||||
close(rkS.rankingStop)
|
||||
<-crnCtx.Done()
|
||||
<-rkS.storingStopped
|
||||
rkS.rankingStop = make(chan struct{})
|
||||
rkS.storingStopped = make(chan struct{})
|
||||
rkS.crn.Start()
|
||||
go rkS.asyncStoreRankings(ctx)
|
||||
}
|
||||
|
||||
// scheduleAutomaticQueries will schedule the queries at start/reload based on configured
|
||||
func (rkS *RankingS) scheduleAutomaticQueries(ctx *context.Context) error {
|
||||
schedData := make(map[string][]string)
|
||||
for k, v := range rkS.cgrcfg.RankingSCfg().ScheduledIDs {
|
||||
schedData[k] = v
|
||||
}
|
||||
var tnts []string
|
||||
if len(schedData) == 0 {
|
||||
tnts = make([]string, 0)
|
||||
}
|
||||
for tnt, rkIDs := range schedData {
|
||||
if len(rkIDs) == 0 {
|
||||
tnts = append(tnts, tnt)
|
||||
}
|
||||
}
|
||||
if tnts != nil {
|
||||
qrydData, err := rkS.dm.GetRankingProfileIDs(ctx, tnts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for tnt, ids := range qrydData {
|
||||
schedData[tnt] = ids
|
||||
}
|
||||
}
|
||||
for tnt, rkIDs := range schedData {
|
||||
if _, err := rkS.scheduleRankingQueries(ctx, tnt, rkIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// scheduleTrendQueries will schedule/re-schedule specific trend queries
|
||||
func (rkS *RankingS) scheduleRankingQueries(ctx *context.Context,
|
||||
tnt string, rkIDs []string) (scheduled int, err error) {
|
||||
var partial bool
|
||||
rkS.crnRQsMux.Lock()
|
||||
if _, has := rkS.crnRQs[tnt]; !has {
|
||||
rkS.crnRQs[tnt] = make(map[string]cron.EntryID)
|
||||
}
|
||||
rkS.crnRQsMux.Unlock()
|
||||
for _, rkID := range rkIDs {
|
||||
rkS.crnRQsMux.RLock()
|
||||
if entryID, has := rkS.crnRQs[tnt][rkID]; has {
|
||||
rkS.crn.Remove(entryID) // deschedule the query
|
||||
}
|
||||
rkS.crnRQsMux.RUnlock()
|
||||
if rkP, err := rkS.dm.GetRankingProfile(ctx, tnt, rkID, true, true, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> failed retrieving RankingProfile with id: <%s:%s> for scheduling, error: <%s>",
|
||||
utils.RankingS, tnt, rkID, err.Error()))
|
||||
partial = true
|
||||
} else if entryID, err := rkS.crn.AddFunc(rkP.Schedule,
|
||||
func() { rkS.computeRanking(ctx, rkP.Clone()) }); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> scheduling RankingProfile <%s:%s>, error: <%s>",
|
||||
utils.RankingS, tnt, rkID, err.Error()))
|
||||
partial = true
|
||||
} else { // log the entry ID for debugging
|
||||
rkS.crnRQsMux.Lock()
|
||||
rkS.crnRQs[rkP.Tenant][rkP.ID] = entryID
|
||||
rkS.crnRQsMux.Unlock()
|
||||
scheduled++
|
||||
}
|
||||
}
|
||||
if partial {
|
||||
return 0, utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// V1ScheduleQueries is the query for manually re-/scheduling Ranking Queries
|
||||
func (rkS *RankingS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleRankingQueries, scheduled *int) (err error) {
|
||||
if sched, errSched := rkS.scheduleRankingQueries(ctx, args.Tenant, args.RankingIDs); errSched != nil {
|
||||
return errSched
|
||||
} else {
|
||||
*scheduled = sched
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// V1GetRanking is the API to return the Ranking instance
|
||||
func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, retRanking *Ranking) (err error) {
|
||||
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
var rk *Ranking
|
||||
if rk, err = rkS.dm.GetRanking(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
rk.rMux.RLock()
|
||||
defer rk.rMux.RUnlock()
|
||||
retRanking.Tenant = rk.Tenant // avoid vet complaining for mutex copying
|
||||
retRanking.ID = rk.ID
|
||||
retRanking.Metrics = make(map[string]map[string]float64)
|
||||
for statID, metrics := range rk.Metrics {
|
||||
retRanking.Metrics[statID] = make(map[string]float64)
|
||||
for metricID, val := range metrics {
|
||||
retRanking.Metrics[statID][metricID] = val
|
||||
}
|
||||
}
|
||||
retRanking.LastUpdate = rk.LastUpdate
|
||||
retRanking.Sorting = rk.Sorting
|
||||
|
||||
retRanking.SortingParameters = make([]string, len(rk.SortingParameters))
|
||||
copy(retRanking.SortingParameters, rk.SortingParameters)
|
||||
|
||||
retRanking.SortedStatIDs = make([]string, len(rk.SortedStatIDs))
|
||||
copy(retRanking.SortedStatIDs, rk.SortedStatIDs)
|
||||
return
|
||||
}
|
||||
|
||||
// V1GetSchedule returns the active schedule for Ranking queries
|
||||
func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgScheduledRankings, schedRankings *[]utils.ScheduledRanking) (err error) {
|
||||
tnt := args.Tenant
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = rkS.cgrcfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
rkS.crnRQsMux.RLock()
|
||||
defer rkS.crnRQsMux.RUnlock()
|
||||
rankingIDsMp, has := rkS.crnRQs[tnt]
|
||||
if !has {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var scheduledRankings []utils.ScheduledRanking
|
||||
var entryIds map[string]cron.EntryID
|
||||
if len(args.RankingIDPrefixes) == 0 {
|
||||
entryIds = rankingIDsMp
|
||||
} else {
|
||||
entryIds = make(map[string]cron.EntryID)
|
||||
for _, rkID := range args.RankingIDPrefixes {
|
||||
for key, entryID := range rankingIDsMp {
|
||||
if strings.HasPrefix(key, rkID) {
|
||||
entryIds[key] = entryID
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(entryIds) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var entry cron.Entry
|
||||
for id, entryID := range entryIds {
|
||||
entry = rkS.crn.Entry(entryID)
|
||||
if entry.ID == 0 {
|
||||
continue
|
||||
}
|
||||
scheduledRankings = append(scheduledRankings,
|
||||
utils.ScheduledRanking{
|
||||
RankingID: id,
|
||||
Next: entry.Next,
|
||||
Previous: entry.Prev,
|
||||
})
|
||||
}
|
||||
slices.SortFunc(scheduledRankings, func(a, b utils.ScheduledRanking) int {
|
||||
return a.Next.Compare(b.Next)
|
||||
})
|
||||
*schedRankings = scheduledRankings
|
||||
return nil
|
||||
}
|
||||
|
||||
// V1GetRankingSummary returns a summary of ascending/descending stat of the last updated ranking
|
||||
func (rS *RankingS) V1GetRankingSummary(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *RankingSummary) (err error) {
|
||||
var rnk *Ranking
|
||||
if rnk, err = rS.dm.GetRanking(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
rnk.rMux.RLock()
|
||||
rnkS := rnk.asRankingSummary()
|
||||
rnk.rMux.RUnlock()
|
||||
*reply = *rnkS
|
||||
return
|
||||
}
|
||||
@@ -1,161 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
)
|
||||
|
||||
func TestTenantID(t *testing.T) {
|
||||
rp := &RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "01",
|
||||
StatIDs: []string{"stat1", "stat2"},
|
||||
MetricIDs: []string{"metric1"},
|
||||
Sorting: "asc",
|
||||
SortingParameters: []string{"param1"},
|
||||
ThresholdIDs: []string{"threshold1"},
|
||||
}
|
||||
|
||||
tenantID := rp.TenantID()
|
||||
|
||||
expectedTenantID := "cgrates.org:01"
|
||||
|
||||
if tenantID != expectedTenantID {
|
||||
t.Errorf("TenantID() = %v; want %v", tenantID, expectedTenantID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRankingProfileWithAPIOpts(t *testing.T) {
|
||||
rp := &RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ID",
|
||||
StatIDs: []string{"stat1", "stat2"},
|
||||
MetricIDs: []string{"metric1"},
|
||||
Sorting: "asc",
|
||||
SortingParameters: []string{"param1"},
|
||||
ThresholdIDs: []string{"threshold1"},
|
||||
}
|
||||
|
||||
rpo := RankingProfileWithAPIOpts{
|
||||
RankingProfile: rp,
|
||||
APIOpts: map[string]any{"option1": "value1"},
|
||||
}
|
||||
|
||||
if rpo.APIOpts["option1"] != "value1" {
|
||||
t.Errorf("APIOpts[option1] = %v; want %v", rpo.APIOpts["option1"], "value1")
|
||||
}
|
||||
|
||||
if rpo.Tenant != rp.Tenant {
|
||||
t.Errorf("RankingProfile Tenant = %v; want %v", rpo.Tenant, rp.Tenant)
|
||||
}
|
||||
|
||||
if rpo.ID != rp.ID {
|
||||
t.Errorf("RankingProfile ID = %v; want %v", rpo.ID, rp.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRankingProfileLockKey(t *testing.T) {
|
||||
tests := []struct {
|
||||
tenant string
|
||||
id string
|
||||
expected string
|
||||
}{
|
||||
{"cgrates.org", "01", "*ranking_profiles:cgrates.org:01"},
|
||||
{"cgrates.org", "02", "*ranking_profiles:cgrates.org:02"},
|
||||
{"cgrates.org", "03", "*ranking_profiles:cgrates.org:03"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
result := rankingProfileLockKey(test.tenant, test.id)
|
||||
|
||||
if result != test.expected {
|
||||
t.Errorf("rankingProfileLockKey(%q, %q) = %v; want %v", test.tenant, test.id, result, test.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewRankingService(t *testing.T) {
|
||||
dm := &DataManager{}
|
||||
cgrcfg := &config.CGRConfig{}
|
||||
filterS := &FilterS{}
|
||||
connMgr := &ConnManager{}
|
||||
|
||||
rankingService := NewRankingS(dm, connMgr, filterS, cgrcfg)
|
||||
|
||||
if rankingService == nil {
|
||||
t.Fatal("NewRankingService() returned nil")
|
||||
}
|
||||
|
||||
if rankingService.dm != dm {
|
||||
t.Errorf("Expected dm to be %v, got %v", dm, rankingService.dm)
|
||||
}
|
||||
|
||||
if rankingService.cgrcfg != cgrcfg {
|
||||
t.Errorf("Expected cfg to be %v, got %v", cgrcfg, rankingService.cgrcfg)
|
||||
}
|
||||
|
||||
if rankingService.filterS != filterS {
|
||||
t.Errorf("Expected fltrS to be %v, got %v", filterS, rankingService.filterS)
|
||||
}
|
||||
|
||||
if rankingService.connMgr != connMgr {
|
||||
t.Errorf("Expected connMgr to be %v, got %v", connMgr, rankingService.connMgr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreRanking(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dataDB := NewInternalDB([]string{}, []string{}, map[string]*config.ItemOpts{})
|
||||
dm := NewDataManager(dataDB, cfg, nil)
|
||||
rkg := NewRankingS(dm, nil, nil, cfg)
|
||||
ranking := &Ranking{
|
||||
rkPrfl: &RankingProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ID1",
|
||||
Schedule: "@every 1s",
|
||||
StatIDs: []string{"stat1", "stat2"},
|
||||
MetricIDs: []string{"metric1", "metric2"},
|
||||
Sorting: "asc",
|
||||
SortingParameters: []string{"metric1:true"},
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"threshold1"},
|
||||
},
|
||||
}
|
||||
ctx := context.Background()
|
||||
cfg.RankingSCfg().StoreInterval = 0
|
||||
if err := rkg.storeRanking(ctx, ranking); err != nil {
|
||||
t.Errorf("Expected no error when StoreInterval is 0, but got: %v", err)
|
||||
}
|
||||
if len(rkg.storedRankings) != 0 {
|
||||
t.Error("Expected storedRankings to be empty when StoreInterval is 0")
|
||||
}
|
||||
cfg.RankingSCfg().StoreInterval = -1
|
||||
if err := rkg.storeRanking(ctx, ranking); err != nil {
|
||||
t.Errorf("Expected no error when StoreInterval is -1, but got: %v", err)
|
||||
}
|
||||
cfg.RankingSCfg().StoreInterval = time.Second
|
||||
if err := rkg.storeRanking(ctx, ranking); err != nil {
|
||||
t.Errorf("Expected no error when StoreInterval is positive, but got: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -64,11 +64,11 @@ type DataDB interface {
|
||||
GetThresholdDrv(*context.Context, string, string) (*Threshold, error)
|
||||
SetThresholdDrv(*context.Context, *Threshold) error
|
||||
RemoveThresholdDrv(*context.Context, string, string) error
|
||||
SetRankingProfileDrv(ctx *context.Context, rp *RankingProfile) (err error)
|
||||
GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (sq *RankingProfile, err error)
|
||||
SetRankingProfileDrv(ctx *context.Context, rp *utils.RankingProfile) (err error)
|
||||
GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (sq *utils.RankingProfile, err error)
|
||||
RemRankingProfileDrv(ctx *context.Context, tenant string, id string) (err error)
|
||||
SetRankingDrv(ctx *context.Context, rn *Ranking) (err error)
|
||||
GetRankingDrv(ctx *context.Context, tenant string, id string) (sq *Ranking, err error)
|
||||
SetRankingDrv(ctx *context.Context, rn *utils.Ranking) (err error)
|
||||
GetRankingDrv(ctx *context.Context, tenant string, id string) (sq *utils.Ranking, err error)
|
||||
RemoveRankingDrv(ctx *context.Context, tenant string, id string) (err error)
|
||||
SetTrendProfileDrv(ctx *context.Context, sq *utils.TrendProfile) (err error)
|
||||
GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sq *utils.TrendProfile, err error)
|
||||
|
||||
@@ -256,15 +256,15 @@ func (iDB *InternalDB) RemStatQueueProfileDrv(_ *context.Context, tenant, id str
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetRankingProfileDrv(_ *context.Context, tenant, id string) (sg *RankingProfile, err error) {
|
||||
func (iDB *InternalDB) GetRankingProfileDrv(_ *context.Context, tenant, id string) (sg *utils.RankingProfile, err error) {
|
||||
x, ok := iDB.db.Get(utils.CacheRankingProfiles, utils.ConcatenatedKey(tenant, id))
|
||||
if !ok || x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*RankingProfile), nil
|
||||
return x.(*utils.RankingProfile), nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) SetRankingProfileDrv(_ *context.Context, sgp *RankingProfile) (err error) {
|
||||
func (iDB *InternalDB) SetRankingProfileDrv(_ *context.Context, sgp *utils.RankingProfile) (err error) {
|
||||
iDB.db.Set(utils.CacheRankingProfiles, sgp.TenantID(), sgp, nil, true, utils.NonTransactional)
|
||||
return nil
|
||||
}
|
||||
@@ -274,15 +274,15 @@ func (iDB *InternalDB) RemRankingProfileDrv(_ *context.Context, tenant, id strin
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetRankingDrv(_ *context.Context, tenant, id string) (rn *Ranking, err error) {
|
||||
func (iDB *InternalDB) GetRankingDrv(_ *context.Context, tenant, id string) (rn *utils.Ranking, err error) {
|
||||
x, ok := iDB.db.Get(utils.CacheRankings, utils.ConcatenatedKey(tenant, id))
|
||||
if !ok || x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Ranking), nil
|
||||
return x.(*utils.Ranking), nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) SetRankingDrv(_ *context.Context, rn *Ranking) (err error) {
|
||||
func (iDB *InternalDB) SetRankingDrv(_ *context.Context, rn *utils.Ranking) (err error) {
|
||||
iDB.db.Set(utils.CacheRankings, rn.TenantID(), rn, nil,
|
||||
true, utils.NonTransactional)
|
||||
return
|
||||
|
||||
@@ -739,8 +739,8 @@ func (ms *MongoStorage) RemoveResourceDrv(ctx *context.Context, tenant, id strin
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetRankingProfileDrv(ctx *context.Context, tenant, id string) (*RankingProfile, error) {
|
||||
rgProfile := new(RankingProfile)
|
||||
func (ms *MongoStorage) GetRankingProfileDrv(ctx *context.Context, tenant, id string) (*utils.RankingProfile, error) {
|
||||
rgProfile := new(utils.RankingProfile)
|
||||
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
|
||||
sr := ms.getCol(ColRgp).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
|
||||
decodeErr := sr.Decode(rgProfile)
|
||||
@@ -752,7 +752,7 @@ func (ms *MongoStorage) GetRankingProfileDrv(ctx *context.Context, tenant, id st
|
||||
return rgProfile, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetRankingProfileDrv(ctx *context.Context, sgp *RankingProfile) (err error) {
|
||||
func (ms *MongoStorage) SetRankingProfileDrv(ctx *context.Context, sgp *utils.RankingProfile) (err error) {
|
||||
return ms.query(ctx, func(sctx mongo.SessionContext) error {
|
||||
_, err := ms.getCol(ColRgp).UpdateOne(sctx, bson.M{"tenant": sgp.Tenant, "id": sgp.ID},
|
||||
bson.M{"$set": sgp},
|
||||
@@ -772,8 +772,8 @@ func (ms *MongoStorage) RemRankingProfileDrv(ctx *context.Context, tenant, id st
|
||||
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (*Ranking, error) {
|
||||
rn := new(Ranking)
|
||||
func (ms *MongoStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (*utils.Ranking, error) {
|
||||
rn := new(utils.Ranking)
|
||||
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
|
||||
sr := ms.getCol(ColRnk).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
|
||||
decodeErr := sr.Decode(rn)
|
||||
@@ -784,7 +784,7 @@ func (ms *MongoStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (
|
||||
})
|
||||
return rn, err
|
||||
}
|
||||
func (ms *MongoStorage) SetRankingDrv(ctx *context.Context, rn *Ranking) error {
|
||||
func (ms *MongoStorage) SetRankingDrv(ctx *context.Context, rn *utils.Ranking) error {
|
||||
return ms.query(ctx, func(sctx mongo.SessionContext) error {
|
||||
_, err := ms.getCol(ColRnk).UpdateOne(sctx, bson.M{"tenant": rn.Tenant, "id": rn.ID},
|
||||
bson.M{"$set": rn},
|
||||
|
||||
@@ -577,7 +577,7 @@ func (rs *RedisStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string)
|
||||
return rs.Cmd(nil, redisDEL, utils.TrendPrefix+utils.ConcatenatedKey(tenant, id))
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetRankingProfileDrv(ctx *context.Context, sg *RankingProfile) (err error) {
|
||||
func (rs *RedisStorage) SetRankingProfileDrv(ctx *context.Context, sg *utils.RankingProfile) (err error) {
|
||||
var result []byte
|
||||
if result, err = rs.ms.Marshal(sg); err != nil {
|
||||
return
|
||||
@@ -585,7 +585,7 @@ func (rs *RedisStorage) SetRankingProfileDrv(ctx *context.Context, sg *RankingPr
|
||||
return rs.Cmd(nil, redisSET, utils.RankingProfilePrefix+utils.ConcatenatedKey(sg.Tenant, sg.ID), string(result))
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (sg *RankingProfile, err error) {
|
||||
func (rs *RedisStorage) GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.RankingProfile, err error) {
|
||||
var values []byte
|
||||
if err = rs.Cmd(&values, redisGET, utils.RankingProfilePrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
|
||||
return
|
||||
@@ -601,7 +601,7 @@ func (rs *RedisStorage) RemRankingProfileDrv(ctx *context.Context, tenant string
|
||||
return rs.Cmd(nil, redisDEL, utils.RankingProfilePrefix+utils.ConcatenatedKey(tenant, id))
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (rn *Ranking, err error) {
|
||||
func (rs *RedisStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (rn *utils.Ranking, err error) {
|
||||
var values []byte
|
||||
if err = rs.Cmd(&values, redisGET, utils.RankingPrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
|
||||
return
|
||||
@@ -613,7 +613,7 @@ func (rs *RedisStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (
|
||||
return rn, err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetRankingDrv(_ *context.Context, rn *Ranking) (err error) {
|
||||
func (rs *RedisStorage) SetRankingDrv(_ *context.Context, rn *utils.Ranking) (err error) {
|
||||
var result []byte
|
||||
if result, err = rs.ms.Marshal(rn); err != nil {
|
||||
return
|
||||
|
||||
@@ -466,7 +466,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
|
||||
log.Print("RankingProfiles:")
|
||||
}
|
||||
for _, tpRN := range tpr.rgProfiles {
|
||||
var rn *RankingProfile
|
||||
var rn *utils.RankingProfile
|
||||
if rn, err = APItoRanking(tpRN); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user