rankings, trends: added implementation, services and tests

This commit is contained in:
gezimbll
2024-11-01 11:40:13 +01:00
committed by Dan Christian Bogos
parent 766a717b3c
commit 132a2b3bf9
40 changed files with 2223 additions and 402 deletions

View File

@@ -254,6 +254,19 @@ func (dbM *DataDBMock) RemRankingProfileDrv(ctx *context.Context, tenant string,
}
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetRankingDrv(ctx *context.Context, tenant, id string) (*Ranking, error) {
return nil, utils.ErrNotImplemented
}
func (dbM *DataDBMock) SetRankingDrv(ctx *context.Context, _ *Ranking) error {
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) RemoveRankingDrv(ctx *context.Context, _ string, _ string) error {
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (sg *TrendProfile, err error) {
if dbM.GetTrendProfileDrvF != nil {
return dbM.GetTrendProfileDrvF(ctx, tenant, id)
@@ -486,15 +499,15 @@ func (dbM *DataDBMock) RemoveRateProfileDrv(ctx *context.Context, str1 string, s
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetTrendDrv(tenant, id string) (*Trend, error) {
func (dbM *DataDBMock) GetTrendDrv(ctx *context.Context, tenant, id string) (*Trend, error) {
return nil, utils.ErrNotImplemented
}
func (dbM *DataDBMock) SetTrendDrv(*Trend) error {
func (dbM *DataDBMock) SetTrendDrv(ctx *context.Context, tr *Trend) error {
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) RemoveTrendDrv(string, string) error {
func (dbM *DataDBMock) RemoveTrendDrv(ctx *context.Context, _ string, _ string) error {
return utils.ErrNotImplemented
}
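The new mock methods follow the existing DataDBMock convention: each driver call either delegates to an optional override function field or falls back to utils.ErrNotImplemented. A minimal test-style sketch of that pattern, assuming a *testing.T in scope and that the GetTrendProfileDrvF field matches the driver signature (values are illustrative):
dbMock := &DataDBMock{
    GetTrendProfileDrvF: func(ctx *context.Context, tenant, id string) (*TrendProfile, error) {
        return &TrendProfile{Tenant: tenant, ID: id}, nil // canned reply for the test
    },
}
if _, err := dbMock.GetTrendProfileDrv(context.Background(), "cgrates.org", "TP1"); err != nil {
    t.Fatal(err) // override is set, so no ErrNotImplemented expected here
}
if err := dbMock.SetTrendDrv(context.Background(), &Trend{}); err != utils.ErrNotImplemented {
    t.Errorf("expected ErrNotImplemented, received %v", err) // no override defined for SetTrendDrv
}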

View File

@@ -961,77 +961,11 @@ func (dm *DataManager) RemoveStatQueueProfile(ctx *context.Context, tenant, id s
return dm.RemoveStatQueue(ctx, tenant, id)
}
func (dm *DataManager) GetTrendProfile(ctx *context.Context, tenant, id string) (trp *TrendProfile, err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
trp, err = dm.dataDB.GetTrendProfileDrv(ctx, tenant, id)
if err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrendProfiles]; err == utils.ErrNotFound && itm.Remote {
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetTrendProfile,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
}, &trp); err == nil {
err = dm.dataDB.SetTrendProfileDrv(ctx, trp)
}
}
}
return
}
func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().SetTrendProfileDrv(ctx, trp); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrendProfiles]; itm.Replicate {
err = replicate(context.Background(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TrendProfilePrefix, trp.TenantID(),
utils.ReplicatorSv1SetTrendProfile,
&TrendProfileWithAPIOpts{
TrendProfile: trp,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
}
func (dm *DataManager) RemoveTrendProfile(ctx *context.Context, tenant, id string) (err error) {
oldSgs, err := dm.GetTrendProfile(ctx, tenant, id)
if err != nil && err != utils.ErrNotFound {
return err
}
if err = dm.DataDB().RemTrendProfileDrv(ctx, tenant, id); err != nil {
return
}
if oldSgs == nil {
return utils.ErrNotFound
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate {
replicate(context.Background(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TrendProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache
utils.ReplicatorSv1RemoveTrendProfile,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
}
// GetTrend retrieves a Trend from dataDB
func (dm *DataManager) GetTrend(tenant, id string,
func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
cacheRead, cacheWrite bool, transactionID string) (tr *Trend, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheTrends, tntID); ok {
if x == nil {
@@ -1044,70 +978,89 @@ func (dm *DataManager) GetTrend(tenant, id string,
err = utils.ErrNoDatabaseConn
return
}
tr, err = dm.dataDB.GetTrendDrv(tenant, id)
if err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; err == utils.ErrNotFound && itm.Remote {
if tr, err = dm.dataDB.GetTrendDrv(ctx, tenant, id); err != nil {
if err != utils.ErrNotFound { // database error
return
}
// ErrNotFound
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Remote {
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetTrend, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
}, &tr); err == nil {
err = dm.dataDB.SetTrendDrv(tr)
}, &tr); err != nil {
err = utils.CastRPCErr(err)
if err != utils.ErrNotFound { // RPC error
return
}
} else if err = dm.dataDB.SetTrendDrv(ctx, tr); err != nil {
return
}
}
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound && cacheWrite {
if errCh := Cache.Set(context.Background(), utils.CacheTrends, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
// have Trend or ErrNotFound
if err == utils.ErrNotFound {
if cacheWrite {
if errCache := Cache.Set(ctx, utils.CacheTrends, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCache != nil {
return nil, errCache
}
}
return nil, err
return
}
}
if err = tr.uncompress(dm.ms); err != nil {
return nil, err
}
if cacheWrite {
if errCh := Cache.Set(context.Background(), utils.CacheTrends, tntID, tr, nil,
if errCh := Cache.Set(ctx, utils.CacheTrends, tntID, tr, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return
}
// SetTrend stores Trend in dataDB
func (dm *DataManager) SetTrend(tr *Trend) (err error) {
func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().SetTrendDrv(tr); err != nil {
if dm.dataDB.GetStorageType() != utils.MetaInternal {
if err = tr.compress(dm.ms); err != nil {
return
}
}
if err = dm.DataDB().SetTrendDrv(ctx, tr); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate {
err = replicate(context.Background(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
if err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TrendPrefix, tr.TenantID(), // these are used to get the host IDs from cache
utils.ReplicatorSv1SetTrend,
&TrendWithAPIOpts{
Trend: tr,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
return
}
}
return
}
// RemoveTrend removes the stored Trend
func (dm *DataManager) RemoveTrend(tenant, id string) (err error) {
func (dm *DataManager) RemoveTrend(ctx *context.Context, tenant, id string) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().RemoveTrendDrv(tenant, id); err != nil {
if err = dm.DataDB().RemoveTrendDrv(ctx, tenant, id); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate {
replicate(context.Background(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TrendPrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache
utils.ReplicatorSv1RemoveTrend,
@@ -1119,6 +1072,147 @@ func (dm *DataManager) RemoveTrend(tenant, id string) (err error) {
return
}
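With the context-aware signatures above, the Trend accessors line up with the rest of the DataManager API. A minimal usage sketch inside a hypothetical helper (dm and ctx come from the calling service; values are illustrative):
tr := &Trend{
    Tenant:   "cgrates.org",
    ID:       "TR1",
    RunTimes: make([]time.Time, 0),
    Metrics:  make(map[time.Time]map[string]*MetricWithTrend),
}
if err := dm.SetTrend(ctx, tr); err != nil { // compressed first on non-internal DataDBs
    return err
}
// cacheRead/cacheWrite decide whether the Trends cache is consulted and populated
got, err := dm.GetTrend(ctx, "cgrates.org", "TR1", true, true, utils.NonTransactional)
if err != nil {
    return err
}
_ = got
return dm.RemoveTrend(ctx, "cgrates.org", "TR1")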
func (dm *DataManager) GetTrendProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool,
transactionID string) (trp *TrendProfile, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheTrendProfiles, tntID); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*TrendProfile), nil
}
}
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
trp, err = dm.dataDB.GetTrendProfileDrv(ctx, tenant, id)
if err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrendProfiles]; err == utils.ErrNotFound && itm.Remote {
if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetTrendProfile,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
}, &trp); err == nil {
err = dm.dataDB.SetTrendProfileDrv(ctx, trp)
}
}
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound && cacheWrite {
if errCh := Cache.Set(ctx, utils.CacheTrendProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return nil, err
}
}
if cacheWrite {
if errCh := Cache.Set(ctx, utils.CacheTrendProfiles, tntID, trp, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return
}
func (dm *DataManager) GetTrendProfileIDs(ctx *context.Context, tenants []string) (tps map[string][]string, err error) {
prfx := utils.TrendProfilePrefix
var keys []string
if len(tenants) == 0 {
keys, err = dm.dataDB.GetKeysForPrefix(ctx, prfx)
if err != nil {
return
}
} else {
for _, tenant := range tenants {
var tntkeys []string
tntPrfx := prfx + tenant + utils.ConcatenatedKeySep
tntkeys, err = dm.dataDB.GetKeysForPrefix(ctx, tntPrfx)
if err != nil {
return
}
keys = append(keys, tntkeys...)
}
}
// if len(keys) == 0 {
// return nil, utils.ErrNotFound
// }
tps = make(map[string][]string)
for _, key := range keys {
indx := strings.Index(key, utils.ConcatenatedKeySep)
tenant := key[len(utils.TrendProfilePrefix):indx]
id := key[indx+1:]
tps[tenant] = append(tps[tenant], id)
}
return
}
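GetTrendProfileIDs depends on the dataDB key layout: profiles are stored under the TrendProfile prefix, with tenant and ID joined by the concatenated-key separator. A hypothetical illustration of the mapping (prefix and separator values assumed for readability):
// keys in dataDB (prefix "trp_" and separator ":" are illustrative):
//   trp_cgrates.org:TP1
//   trp_cgrates.org:TP2
//   trp_other.org:TP1
// GetTrendProfileIDs(ctx, nil) would then return:
//   map[string][]string{"cgrates.org": {"TP1", "TP2"}, "other.org": {"TP1"}}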
func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
oldTrd, err := dm.GetTrendProfile(ctx, trp.Tenant, trp.ID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if err = dm.DataDB().SetTrendProfileDrv(ctx, trp); err != nil {
return err
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrendProfiles]; itm.Replicate {
err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TrendProfilePrefix, trp.TenantID(),
utils.ReplicatorSv1SetTrendProfile,
&TrendProfileWithAPIOpts{
TrendProfile: trp,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
if oldTrd == nil ||
oldTrd.QueueLength != trp.QueueLength ||
oldTrd.Schedule != trp.Schedule {
if err = dm.SetTrend(ctx, NewTrendFromProfile(trp)); err != nil {
return
}
}
return
}
func (dm *DataManager) RemoveTrendProfile(ctx *context.Context, tenant, id string) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
oldTrs, err := dm.GetTrendProfile(ctx, tenant, id, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if err = dm.DataDB().RemTrendProfileDrv(ctx, tenant, id); err != nil {
return
}
if oldTrs == nil {
return utils.ErrNotFound
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate {
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TrendProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache
utils.ReplicatorSv1RemoveTrendProfile,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return dm.RemoveTrend(ctx, tenant, id)
}
func (dm *DataManager) GetRankingProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rgp *RankingProfile, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
@@ -1150,7 +1244,7 @@ func (dm *DataManager) GetRankingProfile(ctx *context.Context, tenant, id string
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound && cacheWrite {
if errCh := Cache.Set(context.Background(), utils.CacheRankingProfiles, tntID, nil, nil,
if errCh := Cache.Set(ctx, utils.CacheRankingProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
@@ -1159,7 +1253,7 @@ func (dm *DataManager) GetRankingProfile(ctx *context.Context, tenant, id string
}
}
if cacheWrite {
if errCh := Cache.Set(context.Background(), utils.CacheRankingProfiles, tntID, rgp, nil,
if errCh := Cache.Set(ctx, utils.CacheRankingProfiles, tntID, rgp, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
@@ -1167,23 +1261,65 @@ func (dm *DataManager) GetRankingProfile(ctx *context.Context, tenant, id string
return
}
func (dm *DataManager) SetRankingProfile(ctx *context.Context, sgp *RankingProfile) (err error) {
func (dm *DataManager) GetRankingProfileIDs(ctx *context.Context, tenants []string) (rns map[string][]string, err error) {
prfx := utils.RankingProfilePrefix
var keys []string
if len(tenants) == 0 {
keys, err = dm.dataDB.GetKeysForPrefix(ctx, prfx)
if err != nil {
return
}
} else {
for _, tenant := range tenants {
var tntkeys []string
tntPrfx := prfx + tenant + utils.ConcatenatedKeySep
tntkeys, err = dm.dataDB.GetKeysForPrefix(ctx, tntPrfx)
if err != nil {
return
}
keys = append(keys, tntkeys...)
}
}
// if len(keys) == 0 {
// return nil, utils.ErrNotFound
// }
rns = make(map[string][]string)
for _, key := range keys {
indx := strings.Index(key, utils.ConcatenatedKeySep)
tenant := key[len(utils.RankingProfilePrefix):indx]
id := key[indx+1:]
rns[tenant] = append(rns[tenant], id)
}
return
}
func (dm *DataManager) SetRankingProfile(ctx *context.Context, rnp *RankingProfile) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().SetRankingProfileDrv(ctx, sgp); err != nil {
oldRnk, err := dm.GetRankingProfile(ctx, rnp.Tenant, rnp.ID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if err = dm.DataDB().SetRankingProfileDrv(ctx, rnp); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate {
err = replicate(context.Background(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RankingProfilePrefix, sgp.TenantID(),
utils.RankingProfilePrefix, rnp.TenantID(),
utils.ReplicatorSv1SetRankingProfile,
&RankingProfileWithAPIOpts{
RankingProfile: sgp,
RankingProfile: rnp,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
if oldRnk == nil || oldRnk.Sorting != rnp.Sorting ||
oldRnk.Schedule != rnp.Schedule {
if err = dm.SetRanking(ctx, NewRankingFromProfile(rnp)); err != nil {
return
}
}
return
}
@@ -1202,7 +1338,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
return utils.ErrNotFound
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate {
replicate(context.Background(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RankingProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRankingProfile,
@@ -1213,6 +1349,97 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
}
return
}
func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *Ranking, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheRankings, tntID); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*Ranking), nil
}
}
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if rn, err = dm.dataDB.GetRankingDrv(ctx, tenant, id); err != nil {
if err != utils.ErrNotFound { // database error
return
}
// ErrNotFound
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Remote {
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetRanking, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
}, &rn); err == nil {
err = dm.dataDB.SetRankingDrv(ctx, rn)
}
}
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound && cacheWrite {
if errCh := Cache.Set(ctx, utils.CacheRankings, tntID, nil, nil, cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return nil, err
}
if cacheWrite {
if errCh := Cache.Set(ctx, utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
}
return
}
// SetRanking stores Ranking in dataDB
func (dm *DataManager) SetRanking(ctx *context.Context, rn *Ranking) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().SetRankingDrv(ctx, rn); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Replicate {
if err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RankingPrefix, rn.TenantID(), // these are used to get the host IDs from cache
utils.ReplicatorSv1SetRanking,
&RankingWithAPIOpts{
Ranking: rn,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
return
}
}
return
}
// RemoveRanking removes the stored Ranking
func (dm *DataManager) RemoveRanking(ctx *context.Context, tenant, id string) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.DataDB().RemoveRankingDrv(ctx, tenant, id); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Replicate {
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RankingPrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRanking,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
}
func (dm *DataManager) GetResource(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool,
transactionID string) (rs *Resource, err error) {

283
engine/librankings.go Normal file
View File

@@ -0,0 +1,283 @@
package engine
import (
"sort"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
type RankingProfileWithAPIOpts struct {
*RankingProfile
APIOpts map[string]any
}
type RankingProfile struct {
Tenant string // Tenant this profile belongs to
ID string // Profile identification
Schedule string // Cron schedule this profile should run at
StatIDs []string // List of stat instances to query
MetricIDs []string // Filter out only specific metrics in reply for sorting
Sorting string // Sorting strategy. Possible values: <*asc|*desc>
SortingParameters []string // Sorting parameters: depending on sorting type, list of metric ids for now with optional true or false in case of reverse logic is desired
Stored bool // Offline storage activation for this profile
ThresholdIDs []string // List of threshold IDs to limit this Ranking to. *none to disable threshold processing for it.
}
func (sgp *RankingProfile) TenantID() string {
return utils.ConcatenatedKey(sgp.Tenant, sgp.ID)
}
// Clone will clone a RankingProfile
func (rkP *RankingProfile) Clone() (cln *RankingProfile) {
cln = &RankingProfile{
Tenant: rkP.Tenant,
ID: rkP.ID,
Schedule: rkP.Schedule,
Sorting: rkP.Sorting,
}
if rkP.StatIDs != nil {
cln.StatIDs = make([]string, len(rkP.StatIDs))
copy(cln.StatIDs, rkP.StatIDs)
}
if rkP.MetricIDs != nil {
cln.MetricIDs = make([]string, len(rkP.MetricIDs))
copy(cln.MetricIDs, rkP.MetricIDs)
}
if rkP.SortingParameters != nil {
cln.SortingParameters = make([]string, len(rkP.SortingParameters))
copy(cln.SortingParameters, rkP.SortingParameters)
}
if rkP.ThresholdIDs != nil {
cln.ThresholdIDs = make([]string, len(rkP.ThresholdIDs))
copy(cln.ThresholdIDs, rkP.ThresholdIDs)
}
return
}
// rankingProfileLockKey returns the ID used to lock a RankingProfile with guardian
func rankingProfileLockKey(tnt, id string) string {
return utils.ConcatenatedKey(utils.CacheRankingProfiles, tnt, id)
}
func NewRankingFromProfile(rkP *RankingProfile) (rk *Ranking) {
rk = &Ranking{
Tenant: rkP.Tenant,
ID: rkP.ID,
Sorting: rkP.Sorting,
Metrics: make(map[string]map[string]float64),
rkPrfl: rkP,
metricIDs: utils.NewStringSet(rkP.MetricIDs),
}
if rkP.SortingParameters != nil {
rk.SortingParameters = make([]string, len(rkP.SortingParameters))
copy(rk.SortingParameters, rkP.SortingParameters)
}
return
}
type RankingWithAPIOpts struct {
*Ranking
APIOpts map[string]any
}
// Ranking is one unit out of a profile
type Ranking struct {
rMux sync.RWMutex
Tenant string
ID string
LastUpdate time.Time
Metrics map[string]map[string]float64 // map[statID]map[metricID]metricValue
Sorting string
SortingParameters []string
SortedStatIDs []string
rkPrfl *RankingProfile // store the ranking profile here so we have it at hand later
metricIDs utils.StringSet // convert the metricIDs here for faster matching
}
func (r *Ranking) TenantID() string {
return utils.ConcatenatedKey(r.Tenant, r.ID)
}
// asRankingSummary converts the Ranking instance into a RankingSummary one
func (rk *Ranking) asRankingSummary() (rkSm *RankingSummary) {
rkSm = &RankingSummary{
Tenant: rk.Tenant,
ID: rk.ID,
LastUpdate: rk.LastUpdate,
}
rkSm.SortedStatIDs = make([]string, len(rk.SortedStatIDs))
copy(rkSm.SortedStatIDs, rk.SortedStatIDs)
return
}
type rankingSorter interface {
sortStatIDs() []string // sortStatIDs returns the sorted list of statIDs
}
// rankingSortStats will return the list of sorted statIDs out of the sortingData map
func rankingSortStats(sortingType string, sortingParams []string,
Metrics map[string]map[string]float64) (sortedStatIDs []string, err error) {
var rnkSrtr rankingSorter
if rnkSrtr, err = newRankingSorter(sortingType, sortingParams, Metrics); err != nil {
return
}
return rnkSrtr.sortStatIDs(), nil
}
// newRankingSorter is the constructor for various ranking sorters
//
// returns error if the sortingType is not implemented
func newRankingSorter(sortingType string, sortingParams []string,
Metrics map[string]map[string]float64) (rkStr rankingSorter, err error) {
switch sortingType {
default:
err = utils.ErrPrefixNotErrNotImplemented(sortingType)
return
case utils.MetaDesc:
return newRankingDescSorter(sortingParams, Metrics), nil
case utils.MetaAsc:
return newRankingAscSorter(sortingParams, Metrics), nil
}
}
// newRankingDescSorter is a constructor for rankingDescSorter
func newRankingDescSorter(sortingParams []string,
Metrics map[string]map[string]float64) (rkDsrtr *rankingDescSorter) {
clnSp := make([]string, len(sortingParams))
sPReversed := make(utils.StringSet)
for i, sP := range sortingParams { // strip optional param:false or param:true suffixes from the sortingParams
sPSlc := strings.Split(sP, utils.InInFieldSep)
clnSp[i] = sPSlc[0]
if len(sPSlc) > 1 && sPSlc[1] == utils.FalseStr {
sPReversed.Add(sPSlc[0]) // param defined as param:false, so its comparison logic is reversed
}
}
rkDsrtr = &rankingDescSorter{
clnSp,
sPReversed,
Metrics,
make([]string, 0, len(Metrics))}
for statID := range rkDsrtr.Metrics {
rkDsrtr.statIDs = append(rkDsrtr.statIDs, statID)
}
return
}
// rankingDescSorter sorts data descending by the metrics in sortingParams, falling back to random order if all are equal
type rankingDescSorter struct {
sMetricIDs []string
sMetricRev utils.StringSet // list of exceptions within sortingParams, reversing the sorting logic
Metrics map[string]map[string]float64
statIDs []string // list of keys of the Metrics
}
// sortStatIDs implements rankingSorter interface
func (rkDsrtr *rankingDescSorter) sortStatIDs() []string {
if len(rkDsrtr.statIDs) == 0 {
return rkDsrtr.statIDs
}
sort.Slice(rkDsrtr.statIDs, func(i, j int) bool {
for _, metricID := range rkDsrtr.sMetricIDs {
val1, hasMetric1 := rkDsrtr.Metrics[rkDsrtr.statIDs[i]][metricID]
if !hasMetric1 {
return false
}
val2, hasMetric2 := rkDsrtr.Metrics[rkDsrtr.statIDs[j]][metricID]
if !hasMetric2 {
return true
}
// in case we have the same value for the current metricID we skip to the next one
if val1 == val2 {
continue
}
ret := val1 > val2
if rkDsrtr.sMetricRev.Has(metricID) {
ret = !ret
}
return ret
}
// if we have the same value for all params we return a random order
return utils.BoolGenerator().RandomBool()
})
return rkDsrtr.statIDs
}
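A short sketch of the descending sorter in action, including the param:false reversal handled above (metric names and values are made up; ':' as the in-field separator follows the comment in the constructor):
metrics := map[string]map[string]float64{
    "Stat1": {"*acd": 12.1, "*pdd": 0.9},
    "Stat2": {"*acd": 12.1, "*pdd": 0.3},
    "Stat3": {"*acd": 10.0, "*pdd": 0.5},
}
sorted, err := rankingSortStats(utils.MetaDesc, []string{"*acd", "*pdd:false"}, metrics)
// *acd ties between Stat1 and Stat2 are broken by *pdd with reversed (ascending) logic,
// so the expected order is [Stat2 Stat1 Stat3] and err is nil
fmt.Println(sorted, err)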
// newRankingAscSorter is a constructor for rankingAscSorter
func newRankingAscSorter(sortingParams []string,
Metrics map[string]map[string]float64) (rkASrtr *rankingAscSorter) {
clnSp := make([]string, len(sortingParams))
sPReversed := make(utils.StringSet)
for i, sP := range sortingParams { // strip optional param:false or param:true suffixes from the sortingParams
sPSlc := strings.Split(sP, utils.InInFieldSep)
clnSp[i] = sPSlc[0]
if len(sPSlc) > 1 && sPSlc[1] == utils.FalseStr {
sPReversed.Add(sPSlc[0]) // param defined as param:false, so its comparison logic is reversed
}
}
rkASrtr = &rankingAscSorter{
clnSp,
sPReversed,
Metrics,
make([]string, 0, len(Metrics))}
for statID := range rkASrtr.Metrics {
rkASrtr.statIDs = append(rkASrtr.statIDs, statID)
}
return
}
// rankingAscSorter sorts data ascending by the metrics in sortingParams, falling back to random order if all are equal
type rankingAscSorter struct {
sMetricIDs []string
sMetricRev utils.StringSet // list of exceptions within sortingParams, reversing the sorting logic
Metrics map[string]map[string]float64
statIDs []string // list of keys of the Metrics
}
// sortStatIDs implements rankingSorter interface
func (rkASrtr *rankingAscSorter) sortStatIDs() []string {
if len(rkASrtr.statIDs) == 0 {
return rkASrtr.statIDs
}
sort.Slice(rkASrtr.statIDs, func(i, j int) bool {
for _, metricID := range rkASrtr.sMetricIDs {
val1, hasMetric1 := rkASrtr.Metrics[rkASrtr.statIDs[i]][metricID]
if !hasMetric1 {
return false
}
val2, hasMetric2 := rkASrtr.Metrics[rkASrtr.statIDs[j]][metricID]
if !hasMetric2 {
return true
}
// in case we have the same value for the current metricID we skip to the next one
if val1 == val2 {
continue
}
ret := val2 > val1
if rkASrtr.sMetricRev.Has(metricID) {
ret = !ret // reversed logic in case of metric:false in params
}
return ret
}
// if we have the same value for all params we return a random order
return utils.BoolGenerator().RandomBool()
})
return rkASrtr.statIDs
}
// RankingSummary is the event sent to TrendS and EEs
type RankingSummary struct {
Tenant string
ID string
LastUpdate time.Time
SortedStatIDs []string
}

View File

@@ -288,6 +288,7 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
utils.CacheStatQueues: {},
utils.CacheSTIR: {},
utils.CacheRankingProfiles: {},
utils.CacheRankings: {},
utils.CacheRouteFilterIndexes: {},
utils.CacheRouteProfiles: {},
utils.CacheThresholdFilterIndexes: {},
@@ -358,7 +359,7 @@ type TestEngine struct {
func (ng TestEngine) Run(t *testing.T, extraFlags ...string) (*birpc.Client, *config.CGRConfig) {
t.Helper()
cfg := parseCfg(t, ng.ConfigPath, ng.ConfigJSON, ng.DBCfg)
flushDBs(t, cfg, !ng.PreserveDataDB, !ng.PreserveStorDB)
FlushDBs(t, cfg, !ng.PreserveDataDB, !ng.PreserveStorDB)
if ng.PreStartHook != nil {
ng.PreStartHook(t, cfg)
}
@@ -467,7 +468,7 @@ func loadCSVs(t *testing.T, tpPath string, csvFiles map[string]string) {
}
// flushDBs resets the databases specified in the configuration if the corresponding flags are true.
func flushDBs(t *testing.T, cfg *config.CGRConfig, flushDataDB, flushStorDB bool) {
func FlushDBs(t *testing.T, cfg *config.CGRConfig, flushDataDB, flushStorDB bool) {
t.Helper()
if flushDataDB {
if err := InitDataDB(cfg); err != nil {

View File

@@ -20,9 +20,11 @@ package engine
import (
"math"
"slices"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -80,32 +82,147 @@ func (srp *TrendProfile) TenantID() string {
return utils.ConcatenatedKey(srp.Tenant, srp.ID)
}
type TrendProfilesAPI struct {
Tenant string
TpIDs []string
}
type TrendWithAPIOpts struct {
*Trend
APIOpts map[string]any
}
// NewTrendFromProfile is a constructor for an empty Trend out of its profile
func NewTrendFromProfile(tP *TrendProfile) *Trend {
return &Trend{
Tenant: tP.Tenant,
ID: tP.ID,
RunTimes: make([]time.Time, 0),
Metrics: make(map[time.Time]map[string]*MetricWithTrend),
tPrfl: tP,
}
}
// Trend is the unit matched by filters
type Trend struct {
tMux *sync.RWMutex
tMux sync.RWMutex
Tenant string
ID string
RunTimes []time.Time
Metrics map[time.Time]map[string]*MetricWithTrend
CompressedMetrics []byte // if populated, Metrics will be empty
CompressedMetrics []byte // if populated, Metrics and RunTimes will be empty
// indexes help faster processing
mLast map[string]time.Time // last time a metric was present
mCounts map[string]int // number of times a metrics is present in Metrics
mCounts map[string]int // number of times a metric is present in Metrics
mTotals map[string]float64 // cached sum, used for average calculations
tP *TrendProfile // cache here the settings
tPrfl *TrendProfile // store the trend profile here so we have it at hand later
}
func (t *Trend) Clone() (tC *Trend) {
return
}
// asTrendSummary transforms the Trend into a TrendSummary
func (t *Trend) asTrendSummary() (ts *TrendSummary) {
ts = &TrendSummary{
Tenant: t.Tenant,
ID: t.ID,
Metrics: make(map[string]*MetricWithTrend),
}
if len(t.RunTimes) != 0 {
ts.Time = t.RunTimes[len(t.RunTimes)-1]
for mID, mWt := range t.Metrics[ts.Time] {
ts.Metrics[mID] = &MetricWithTrend{
ID: mWt.ID,
Value: mWt.Value,
TrendGrowth: mWt.TrendGrowth,
TrendLabel: mWt.TrendLabel,
}
}
}
return
}
func (t *Trend) compress(ms utils.Marshaler) (err error) {
if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) {
return
}
t.CompressedMetrics, err = ms.Marshal(t.Metrics)
if err != nil {
return
}
t.Metrics = nil
t.RunTimes = nil
return nil
}
func (t *Trend) uncompress(ms utils.Marshaler) (err error) {
if t == nil || t.CompressedMetrics == nil {
return
}
err = ms.Unmarshal(t.CompressedMetrics, &t.Metrics)
if err != nil {
return
}
t.CompressedMetrics = []byte{}
t.RunTimes = make([]time.Time, len(t.Metrics))
i := 0
for key := range t.Metrics {
t.RunTimes[i] = key
i++
}
slices.SortFunc(t.RunTimes, func(a, b time.Time) int {
return a.Compare(b)
})
return
}
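compress only kicks in once RunTimes reaches the configured StoreUncompressedLimit, and uncompress rebuilds RunTimes from the Metrics keys in ascending order. A rough round-trip sketch (ms is whatever utils.Marshaler the DataManager already carries; tr is an assumed, already-populated Trend):
if err := tr.compress(ms); err != nil { // no-op while below StoreUncompressedLimit
    return err
}
// after compress: CompressedMetrics is set, Metrics and RunTimes are nil
if err := tr.uncompress(ms); err != nil {
    return err
}
// after uncompress: Metrics is restored and RunTimes is rebuilt, sorted ascending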
// Compile is used to initialize or cleanup the Trend
//
// thread safe since it should be used close to source
func (t *Trend) Compile(cleanTtl time.Duration, qLength int) {
t.cleanup(cleanTtl, qLength)
if len(t.mTotals) == 0 { // indexes were not yet built
t.computeIndexes()
}
}
// cleanup will clean stale data out of the Trend, based on TTL and queue length
func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) {
if ttl >= 0 {
expTime := time.Now().Add(-ttl)
var expIdx *int
for i, rT := range t.RunTimes {
if rT.After(expTime) {
continue
}
expIdx = &i
delete(t.Metrics, rT)
}
if expIdx != nil {
if len(t.RunTimes)-1 == *expIdx {
t.RunTimes = make([]time.Time, 0)
} else {
t.RunTimes = t.RunTimes[*expIdx+1:]
}
altered = true
}
}
diffLen := len(t.RunTimes) - qLength
if qLength > 0 && diffLen > 0 {
var rmTms []time.Time
rmTms, t.RunTimes = t.RunTimes[:diffLen], t.RunTimes[diffLen:]
for _, rmTm := range rmTms {
delete(t.Metrics, rmTm)
}
altered = true
}
if altered {
t.computeIndexes()
}
return
}
// computeIndexes should be called after each retrieval from DB
@@ -129,7 +246,7 @@ func (t *Trend) indexesAppendMetric(mWt *MetricWithTrend, rTime time.Time) {
// getTrendGrowth returns the percentage growth for a specific metric
//
// correlation parameter will define whether the comparison is against last or average value
// @correlation parameter will define whether the comparison is against last or average value
// errors in case the previous value is not available
func (t *Trend) getTrendGrowth(mID string, mVal float64, correlation string, roundDec int) (tG float64, err error) {
var prevVal float64
@@ -147,9 +264,9 @@ func (t *Trend) getTrendGrowth(mID string, mVal float64, correlation string, rou
default:
return -1.0, utils.ErrCorrelationUndefined
}
diffVal := mVal - prevVal
return utils.Round(diffVal/100, roundDec, utils.MetaRoundingMiddle), nil
diffVal := mVal - prevVal
return utils.Round(diffVal*100/prevVal, roundDec, utils.MetaRoundingMiddle), nil
}
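The corrected return expresses growth as a percentage of the reference value instead of dividing the absolute difference by 100. A worked example matching the updated unit tests (values before rounding):
// *last correlation:    prevVal = 80,  mVal = 100 -> (100-80)*100/80   = 25.0
// *average correlation: prevVal = 100, mVal = 120 -> (120-100)*100/100 = 20.0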
// getTrendLabel identifies the trend label for the instant value of the metric
@@ -181,3 +298,11 @@ type MetricWithTrend struct {
func (tr *Trend) TenantID() string {
return utils.ConcatenatedKey(tr.Tenant, tr.ID)
}
// TrendSummary represents the last trend computed
type TrendSummary struct {
Tenant string
ID string
Time time.Time
Metrics map[string]*MetricWithTrend
}

View File

@@ -21,7 +21,6 @@ package engine
import (
"errors"
"reflect"
"sync"
"testing"
"time"
@@ -217,7 +216,7 @@ func TestTrendTenantID(t *testing.T) {
"metric1": 2.5,
"metric2": 2.0,
},
tP: &TrendProfile{
tPrfl: &TrendProfile{
Tenant: "cgrates.org",
ID: "trendProfileID",
Schedule: "0 * * * *",
@@ -247,8 +246,8 @@ func TestTrendTenantID(t *testing.T) {
t.Errorf("Expected 2 metrics time entries, but got %d", len(trend.Metrics))
}
if trend.tP.QueueLength != 10 {
t.Errorf("Expected QueueLength 10, but got %d", trend.tP.QueueLength)
if trend.tPrfl.QueueLength != 10 {
t.Errorf("Expected QueueLength 10, but got %d", trend.tPrfl.QueueLength)
}
}
@@ -324,7 +323,6 @@ func TestGetTrendLabel(t *testing.T) {
func TestGetTrendGrowth(t *testing.T) {
trend := Trend{
tMux: &sync.RWMutex{},
mLast: map[string]time.Time{},
Metrics: map[time.Time]map[string]*MetricWithTrend{},
mTotals: map[string]float64{},
@@ -351,9 +349,8 @@ func TestGetTrendGrowth(t *testing.T) {
}
got, err := trend.getTrendGrowth("metric1", 100, utils.MetaLast, 2)
expected := utils.Round(20.0/100, 2, utils.MetaRoundingMiddle)
if err != nil || !reflect.DeepEqual(got, expected) {
t.Errorf("Mismatch for MetaLast correlation. Got: %v, expected: %v", got, expected)
if err != nil || got != 25.0 {
t.Errorf("Mismatch for MetaLast correlation. Got: %v, expected: %v", got, 25.0)
}
trend.mTotals = map[string]float64{
@@ -364,9 +361,8 @@ func TestGetTrendGrowth(t *testing.T) {
}
got, err = trend.getTrendGrowth("metric1", 120, utils.MetaAverage, 2)
expected = utils.Round(20.0/100, 2, utils.MetaRoundingMiddle)
if err != nil || !reflect.DeepEqual(got, expected) {
t.Errorf("Mismatch for MetaAverage correlation. Got: %v, expected: %v", got, expected)
if err != nil || got != 20.0 {
t.Errorf("Mismatch for MetaAverage correlation. Got: %v, expected: %v", got, 20.0)
}
_, err = trend.getTrendGrowth("metric1", 100, "invalidCorrelation", 2)

View File

@@ -525,8 +525,8 @@ func StatQueueProfileToAPI(st *StatQueueProfile) (tpST *utils.TPStatProfile) {
type RankingMdls []*RankingMdl
func (tps RankingMdls) CSVHeader() (result []string) {
return []string{"#" + utils.Tenant, utils.ID, utils.StatIDs,
utils.MetricIDs, utils.Sorting, utils.SortingParameters,
return []string{"#" + utils.Tenant, utils.ID, utils.Schedule, utils.StatIDs,
utils.MetricIDs, utils.Sorting, utils.SortingParameters, utils.Stored,
utils.ThresholdIDs}
}
@@ -534,25 +534,30 @@ func (models RankingMdls) AsTPRanking() (result []*utils.TPRankingProfile) {
thresholdMap := make(map[string]utils.StringSet)
metricsMap := make(map[string]utils.StringSet)
sortingParameterMap := make(map[string]utils.StringSet)
sortingParameterSlice := make(map[string][]string)
statsMap := make(map[string]utils.StringSet)
msg := make(map[string]*utils.TPRankingProfile)
mrg := make(map[string]*utils.TPRankingProfile)
for _, model := range models {
key := &utils.TenantID{Tenant: model.Tenant, ID: model.ID}
sg, found := msg[key.TenantID()]
rg, found := mrg[key.TenantID()]
if !found {
sg = &utils.TPRankingProfile{
Tenant: model.Tenant,
TPid: model.Tpid,
ID: model.ID,
QueryInterval: model.QueryInterval,
Sorting: model.Sorting,
rg = &utils.TPRankingProfile{
Tenant: model.Tenant,
TPid: model.Tpid,
ID: model.ID,
Schedule: model.Schedule,
Sorting: model.Sorting,
Stored: model.Stored,
}
}
if model.QueryInterval != utils.EmptyString {
sg.QueryInterval = model.QueryInterval
if model.Schedule != utils.EmptyString {
rg.Schedule = model.Schedule
}
if model.Sorting != utils.EmptyString {
sg.QueryInterval = model.QueryInterval
rg.Sorting = model.Sorting
}
if model.Stored {
rg.Stored = model.Stored
}
if model.StatIDs != utils.EmptyString {
if _, has := statsMap[key.TenantID()]; !has {
@@ -569,8 +574,15 @@ func (models RankingMdls) AsTPRanking() (result []*utils.TPRankingProfile) {
if model.SortingParameters != utils.EmptyString {
if _, has := sortingParameterMap[key.TenantID()]; !has {
sortingParameterMap[key.TenantID()] = make(utils.StringSet)
sortingParameterSlice[key.TenantID()] = make([]string, 0)
}
spltSl := strings.Split(model.SortingParameters, utils.InfieldSep)
for _, splt := range spltSl {
if _, has := sortingParameterMap[key.TenantID()][splt]; !has {
sortingParameterMap[key.TenantID()].Add(splt)
sortingParameterSlice[key.TenantID()] = append(sortingParameterSlice[key.TenantID()], splt)
}
}
sortingParameterMap[key.TenantID()].AddSlice(strings.Split(model.SortingParameters, utils.InfieldSep))
}
if model.MetricIDs != utils.EmptyString {
if _, has := metricsMap[key.TenantID()]; !has {
@@ -578,47 +590,48 @@ func (models RankingMdls) AsTPRanking() (result []*utils.TPRankingProfile) {
}
metricsMap[key.TenantID()].AddSlice(strings.Split(model.MetricIDs, utils.InfieldSep))
}
msg[key.TenantID()] = sg
mrg[key.TenantID()] = rg
}
result = make([]*utils.TPRankingProfile, len(msg))
result = make([]*utils.TPRankingProfile, len(mrg))
i := 0
for tntID, sg := range msg {
result[i] = sg
for tntID, rg := range mrg {
result[i] = rg
result[i].StatIDs = statsMap[tntID].AsSlice()
result[i].MetricIDs = metricsMap[tntID].AsSlice()
result[i].SortingParameters = sortingParameterMap[tntID].AsSlice()
result[i].SortingParameters = sortingParameterSlice[tntID]
result[i].ThresholdIDs = thresholdMap[tntID].AsOrderedSlice()
i++
}
return
}
func APItoModelTPRanking(tpSG *utils.TPRankingProfile) (mdls RankingMdls) {
if tpSG == nil {
func APItoModelTPRanking(tpRG *utils.TPRankingProfile) (mdls RankingMdls) {
if tpRG == nil {
return
}
if len(tpSG.StatIDs) == 0 {
if len(tpRG.StatIDs) == 0 {
mdl := &RankingMdl{
Tpid: tpSG.TPid,
Tenant: tpSG.Tenant,
ID: tpSG.ID,
QueryInterval: tpSG.QueryInterval,
Sorting: tpSG.Sorting,
Tpid: tpRG.TPid,
Tenant: tpRG.Tenant,
ID: tpRG.ID,
Schedule: tpRG.Schedule,
Sorting: tpRG.Sorting,
Stored: tpRG.Stored,
}
for i, val := range tpSG.ThresholdIDs {
for i, val := range tpRG.ThresholdIDs {
if i != 0 {
mdl.ThresholdIDs += utils.InfieldSep
}
mdl.ThresholdIDs += val
}
for i, metric := range tpSG.MetricIDs {
for i, metric := range tpRG.MetricIDs {
if i != 0 {
mdl.MetricIDs += utils.InfieldSep
}
mdl.MetricIDs += metric
}
for i, sorting := range tpSG.SortingParameters {
for i, sorting := range tpRG.SortingParameters {
if i != 0 {
mdl.SortingParameters += utils.InfieldSep
}
@@ -627,28 +640,28 @@ func APItoModelTPRanking(tpSG *utils.TPRankingProfile) (mdls RankingMdls) {
mdls = append(mdls, mdl)
}
for i, stat := range tpSG.StatIDs {
for i, stat := range tpRG.StatIDs {
mdl := &RankingMdl{
Tpid: tpSG.TPid,
Tenant: tpSG.Tenant,
ID: tpSG.ID,
Tpid: tpRG.TPid,
Tenant: tpRG.Tenant,
ID: tpRG.ID,
}
if i == 0 {
mdl.QueryInterval = tpSG.QueryInterval
mdl.Sorting = tpSG.Sorting
for i, val := range tpSG.ThresholdIDs {
mdl.Schedule = tpRG.Schedule
mdl.Sorting = tpRG.Sorting
for i, val := range tpRG.ThresholdIDs {
if i != 0 {
mdl.ThresholdIDs += utils.InfieldSep
}
mdl.ThresholdIDs += val
}
for i, metric := range tpSG.MetricIDs {
for i, metric := range tpRG.MetricIDs {
if i != 0 {
mdl.MetricIDs += utils.InfieldSep
}
mdl.MetricIDs += metric
}
for i, sorting := range tpSG.SortingParameters {
for i, sorting := range tpRG.SortingParameters {
if i != 0 {
mdl.SortingParameters += utils.InfieldSep
}
@@ -661,44 +674,41 @@ func APItoModelTPRanking(tpSG *utils.TPRankingProfile) (mdls RankingMdls) {
return
}
func APItoRanking(tpSG *utils.TPRankingProfile) (sg *RankingProfile, err error) {
sg = &RankingProfile{
Tenant: tpSG.Tenant,
ID: tpSG.ID,
StatIDs: make([]string, len(tpSG.StatIDs)),
MetricIDs: make([]string, len(tpSG.MetricIDs)),
Sorting: tpSG.Sorting,
SortingParameters: make([]string, len(tpSG.SortingParameters)),
ThresholdIDs: make([]string, len(tpSG.ThresholdIDs)),
func APItoRanking(tpRG *utils.TPRankingProfile) (rg *RankingProfile, err error) {
rg = &RankingProfile{
Tenant: tpRG.Tenant,
ID: tpRG.ID,
Schedule: tpRG.Schedule,
Sorting: tpRG.Sorting,
Stored: tpRG.Stored,
StatIDs: make([]string, len(tpRG.StatIDs)),
MetricIDs: make([]string, len(tpRG.MetricIDs)),
SortingParameters: make([]string, len(tpRG.SortingParameters)),
ThresholdIDs: make([]string, len(tpRG.ThresholdIDs)),
}
if tpSG.QueryInterval != utils.EmptyString {
if sg.QueryInterval, err = utils.ParseDurationWithNanosecs(tpSG.QueryInterval); err != nil {
return nil, err
}
}
copy(sg.StatIDs, tpSG.StatIDs)
copy(sg.ThresholdIDs, tpSG.ThresholdIDs)
copy(sg.MetricIDs, tpSG.MetricIDs)
return sg, nil
copy(rg.StatIDs, tpRG.StatIDs)
copy(rg.ThresholdIDs, tpRG.ThresholdIDs)
copy(rg.SortingParameters, tpRG.SortingParameters)
copy(rg.MetricIDs, tpRG.MetricIDs)
return rg, nil
}
func RankingProfileToAPI(sg *RankingProfile) (tpSG *utils.TPRankingProfile) {
tpSG = &utils.TPRankingProfile{
Tenant: sg.Tenant,
ID: sg.ID,
StatIDs: make([]string, len(sg.StatIDs)),
MetricIDs: make([]string, len(sg.MetricIDs)),
SortingParameters: make([]string, len(sg.SortingParameters)),
ThresholdIDs: make([]string, len(sg.ThresholdIDs)),
func RankingProfileToAPI(rg *RankingProfile) (tpRG *utils.TPRankingProfile) {
tpRG = &utils.TPRankingProfile{
Tenant: rg.Tenant,
ID: rg.ID,
Schedule: rg.Schedule,
Sorting: rg.Sorting,
Stored: rg.Stored,
StatIDs: make([]string, len(rg.StatIDs)),
MetricIDs: make([]string, len(rg.MetricIDs)),
SortingParameters: make([]string, len(rg.SortingParameters)),
ThresholdIDs: make([]string, len(rg.ThresholdIDs)),
}
if sg.QueryInterval != time.Duration(0) {
tpSG.QueryInterval = sg.QueryInterval.String()
}
copy(tpSG.StatIDs, sg.StatIDs)
copy(tpSG.ThresholdIDs, sg.ThresholdIDs)
copy(tpSG.MetricIDs, sg.MetricIDs)
copy(tpSG.SortingParameters, sg.SortingParameters)
copy(tpRG.StatIDs, rg.StatIDs)
copy(tpRG.ThresholdIDs, rg.ThresholdIDs)
copy(tpRG.MetricIDs, rg.MetricIDs)
copy(tpRG.SortingParameters, rg.SortingParameters)
return
}
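A quick round-trip sketch for the renamed conversion helpers (profile values are illustrative):
tpRG := &utils.TPRankingProfile{
    Tenant:   "cgrates.org",
    ID:       "RANK1",
    Schedule: "@every 15m",
    Sorting:  "*desc",
    StatIDs:  []string{"Stat1", "Stat2"},
}
rg, err := APItoRanking(tpRG)
if err != nil {
    return err
}
back := RankingProfileToAPI(rg) // expected to mirror tpRG field by field
_ = back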

View File

@@ -75,12 +75,13 @@ type RankingMdl struct {
Tpid string
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
QueryInterval string `index:"2" re:".*"`
Schedule string `index:"2" re:".*"`
StatIDs string `index:"3" re:".*"`
MetricIDs string `index:"4" re:".*"`
Sorting string `index:"5" re:".*"`
SortingParameters string `index:"6" re:".*"`
ThresholdIDs string `index:"7" re:".*"`
Stored bool `index:"7" re:".*"`
ThresholdIDs string `index:"8" re:".*"`
CreatedAt time.Time
}
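With Schedule moved to index 2 and Stored added at index 7, a RankingProfiles CSV row lines up with the updated CSVHeader above; a hypothetical example:
#Tenant,ID,Schedule,StatIDs,MetricIDs,Sorting,SortingParameters,Stored,ThresholdIDs
cgrates.org,RANK1,@every 15m,Stat1;Stat2,*acd;*pdd,*desc,*acd;*pdd:false,true,*none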

View File

@@ -19,50 +19,518 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"runtime"
"slices"
"strings"
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cron"
)
type RankingProfileWithAPIOpts struct {
*RankingProfile
APIOpts map[string]any
}
type RankingProfile struct {
Tenant string
ID string
QueryInterval time.Duration
StatIDs []string
MetricIDs []string
Sorting string
SortingParameters []string
ThresholdIDs []string
}
func (sgp *RankingProfile) TenantID() string {
return utils.ConcatenatedKey(sgp.Tenant, sgp.ID)
}
// rankingProfileLockKey returns the ID used to lock a RankingProfile with guardian
func rankingProfileLockKey(tnt, id string) string {
return utils.ConcatenatedKey(utils.CacheRankingProfiles, tnt, id)
}
func NewRankingService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS, connMgr *ConnManager) (tS *RankingS) {
// NewRankingS is the constructor for RankingS
func NewRankingS(dm *DataManager,
connMgr *ConnManager,
filterS *FilterS,
cgrcfg *config.CGRConfig) *RankingS {
return &RankingS{
dm: dm,
cfg: cgrcfg,
fltrS: filterS,
connMgr: connMgr,
dm: dm,
connMgr: connMgr,
filterS: filterS,
cgrcfg: cgrcfg,
crn: cron.New(),
crnRQsMux: new(sync.RWMutex),
crnRQs: make(map[string]map[string]cron.EntryID),
storedRankings: make(utils.StringSet),
storingStopped: make(chan struct{}),
rankingStop: make(chan struct{}),
}
}
// RankingS manages Ranking execution
// RankingS is responsible for implementing the logic of the RankingService
type RankingS struct {
dm *DataManager
cfg *config.CGRConfig
fltrS *FilterS
connMgr *ConnManager
filterS *FilterS
cgrcfg *config.CGRConfig
crn *cron.Cron // cron reference
crnRQsMux *sync.RWMutex // protects crnRQs
crnRQs map[string]map[string]cron.EntryID // save the EntryIDs for rankingQueries so we can reschedule them when needed
storedRankings utils.StringSet // keep a record of Rankings which need saving, map[rankingTenantID]bool
sRksMux sync.RWMutex // protects storedRankings
storingStopped chan struct{} // signal back that the operations were stopped
rankingStop chan struct{} // signal to stop all operations
}
// computeRanking will query the stats and build the Ranking for them
//
// it is to be called by Cron service
func (rkS *RankingS) computeRanking(ctx *context.Context, rkP *RankingProfile) {
rk, err := rkS.dm.GetRanking(ctx, rkP.Tenant, rkP.ID, true, true, utils.NonTransactional)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> querying RankingProfile with ID: <%s:%s> dm error: <%s>",
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
return
}
rk.rMux.Lock()
defer rk.rMux.Unlock()
if rk.rkPrfl == nil {
rk.rkPrfl = rkP
}
rk.LastUpdate = time.Now()
rk.Metrics = make(map[string]map[string]float64) // reset previous values
rk.SortedStatIDs = make([]string, 0)
for _, statID := range rkP.StatIDs {
var floatMetrics map[string]float64
if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns,
utils.StatSv1GetQueueFloatMetrics,
&utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: rkP.Tenant, ID: statID}},
&floatMetrics); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> computing Ranking with ID: <%s:%s> for Stats <%s> error: <%s>",
utils.RankingS, rkP.Tenant, rkP.ID, statID, err.Error()))
return
}
if len(rk.metricIDs) != 0 {
for metricID := range floatMetrics {
if _, has := rk.metricIDs[metricID]; !has {
delete(floatMetrics, metricID)
}
}
}
if len(floatMetrics) != 0 {
rk.Metrics[statID] = make(map[string]float64)
}
for metricID, val := range floatMetrics {
rk.Metrics[statID][metricID] = val
}
}
if rk.SortedStatIDs, err = rankingSortStats(rkP.Sorting,
rkP.SortingParameters, rk.Metrics); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> sorting stats for Ranking with ID: <%s:%s> error: <%s>",
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
return
}
if err = rkS.storeRanking(ctx, rk); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> storing Ranking with ID: <%s:%s> DM error: <%s>",
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
return
}
if err := rkS.processThresholds(rk); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> Ranking with id <%s:%s> error: <%s> with ThresholdS",
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
}
if err := rkS.processEEs(rk); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> Trend with id <%s:%s> error: <%s> with EEs",
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
}
}
// processThresholds will pass the Ranking event to ThresholdS
func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
if len(rk.SortedStatIDs) == 0 {
return
}
if len(rkS.cgrcfg.RankingSCfg().ThresholdSConns) == 0 {
return
}
opts := map[string]any{
utils.MetaEventType: utils.RankingUpdate,
}
var thIDs []string
if len(rk.rkPrfl.ThresholdIDs) != 0 {
if len(rk.rkPrfl.ThresholdIDs) == 1 &&
rk.rkPrfl.ThresholdIDs[0] == utils.MetaNone {
return
}
thIDs = make([]string, len(rk.rkPrfl.ThresholdIDs))
copy(thIDs, rk.rkPrfl.ThresholdIDs)
}
opts[utils.OptsThresholdsProfileIDs] = thIDs
sortedStatIDs := make([]string, len(rk.SortedStatIDs))
copy(sortedStatIDs, rk.SortedStatIDs)
ev := &utils.CGREvent{
Tenant: rk.Tenant,
ID: utils.GenUUID(),
APIOpts: opts,
Event: map[string]any{
utils.RankingID: rk.ID,
utils.LastUpdate: rk.LastUpdate,
utils.SortedStatIDs: sortedStatIDs,
},
}
var withErrs bool
var rkIDs []string
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, ev, &rkIDs); err != nil &&
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.RankingS, err.Error(), ev))
withErrs = true
}
if withErrs {
err = utils.ErrPartiallyExecuted
}
return
}
// processEEs will pass the Ranking event to EEs
func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
if len(rk.SortedStatIDs) == 0 {
return
}
if len(rkS.cgrcfg.RankingSCfg().EEsConns) == 0 {
return
}
opts := map[string]any{
utils.MetaEventType: utils.RankingUpdate,
}
sortedStatIDs := make([]string, len(rk.SortedStatIDs))
copy(sortedStatIDs, rk.SortedStatIDs)
ev := &utils.CGREvent{
Tenant: rk.Tenant,
ID: utils.GenUUID(),
APIOpts: opts,
Event: map[string]any{
utils.RankingID: rk.ID,
utils.LastUpdate: rk.LastUpdate,
utils.SortedStatIDs: sortedStatIDs,
},
}
var withErrs bool
var reply map[string]map[string]any
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().EEsConns,
utils.EeSv1ProcessEvent, ev, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.RankingS, err.Error(), ev))
withErrs = true
}
if withErrs {
err = utils.ErrPartiallyExecuted
}
return
}
// storeRanking will store or schedule the Ranking based on settings
func (rkS *RankingS) storeRanking(ctx *context.Context, rk *Ranking) (err error) {
if rkS.cgrcfg.RankingSCfg().StoreInterval == 0 {
return
}
if rkS.cgrcfg.RankingSCfg().StoreInterval == -1 {
return rkS.dm.SetRanking(ctx, rk)
}
// schedule the asynchronous save; relies on the Ranking being in cache
rkS.sRksMux.Lock()
rkS.storedRankings.Add(rk.rkPrfl.TenantID())
rkS.sRksMux.Unlock()
return
}
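A compact restatement of how storeRanking above interprets StoreInterval (sketch only; the constants mirror the branches in the function):
switch rkS.cgrcfg.RankingSCfg().StoreInterval {
case 0: // storing disabled: computed Rankings only live in cache
case -1: // store synchronously after every computation via dm.SetRanking
default: // > 0: mark the Ranking dirty here; asyncStoreRankings flushes it periodically
}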
// storeRankings will do one round of saving modified Rankings
//
// from cache to dataDB
// designed to run asynchronously
func (rkS *RankingS) storeRankings(ctx *context.Context) {
var failedRkIDs []string
for {
rkS.sRksMux.Lock()
rkID := rkS.storedRankings.GetOne()
if rkID != utils.EmptyString {
rkS.storedRankings.Remove(rkID)
}
rkS.sRksMux.Unlock()
if rkID == utils.EmptyString {
break // no more keys, backup completed
}
rkIf, ok := Cache.Get(utils.CacheRankings, rkID)
if !ok || rkIf == nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed retrieving from cache Ranking with ID: %q",
utils.RankingS, rkID))
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
continue
}
rk := rkIf.(*Ranking)
rk.rMux.RLock()
if err := rkS.dm.SetRanking(ctx, rk); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
utils.RankingS, rkID, err))
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
}
rk.rMux.RUnlock()
// randomize the CPU load and give up thread control
runtime.Gosched()
}
if len(failedRkIDs) != 0 { // there were errors on save, schedule the keys for next backup
rkS.sRksMux.Lock()
rkS.storedRankings.AddSlice(failedRkIDs)
rkS.sRksMux.Unlock()
}
}
// asyncStoreRankings runs as a background process, calling storeRankings based on storeInterval
func (rkS *RankingS) asyncStoreRankings(ctx *context.Context) {
storeInterval := rkS.cgrcfg.RankingSCfg().StoreInterval
if storeInterval <= 0 {
close(rkS.storingStopped)
return
}
for {
rkS.storeRankings(ctx)
select {
case <-rkS.rankingStop:
close(rkS.storingStopped)
return
case <-time.After(storeInterval): // continue to another storing loop
}
}
}
// StartRankingS activates the Cron, together with all scheduled Ranking queries
func (rkS *RankingS) StartRankingS(ctx *context.Context) (err error) {
if err = rkS.scheduleAutomaticQueries(ctx); err != nil {
return
}
rkS.crn.Start()
go rkS.asyncStoreRankings(ctx)
return
}
// StopRankingS will shut down the Cron tasks and the storing loop
func (rkS *RankingS) StopRankingS() {
timeEnd := time.Now().Add(rkS.cgrcfg.CoreSCfg().ShutdownTimeout)
ctx := rkS.crn.Stop()
close(rkS.rankingStop)
// Wait for cron
select {
case <-ctx.Done():
case <-time.After(time.Until(timeEnd)):
utils.Logger.Warning(
fmt.Sprintf(
"<%s> timeout waiting for Cron to finish",
utils.RankingS))
return
}
// Wait for backup and other operations
select {
case <-rkS.storingStopped:
case <-time.After(time.Until(timeEnd)):
utils.Logger.Warning(
fmt.Sprintf(
"<%s> timeout waiting for RankingS to finish",
utils.RankingS))
return
}
}
func (rkS *RankingS) Reload(ctx *context.Context) {
crnCtx := rkS.crn.Stop()
close(rkS.rankingStop)
<-crnCtx.Done()
<-rkS.storingStopped
rkS.rankingStop = make(chan struct{})
rkS.storingStopped = make(chan struct{})
rkS.crn.Start()
go rkS.asyncStoreRankings(ctx)
}
// scheduleAutomaticQueries will schedule the queries at start/reload based on the configured ScheduledIDs
func (rkS *RankingS) scheduleAutomaticQueries(ctx *context.Context) error {
schedData := make(map[string][]string)
for k, v := range rkS.cgrcfg.RankingSCfg().ScheduledIDs {
schedData[k] = v
}
var tnts []string
if len(schedData) == 0 {
tnts = make([]string, 0)
}
for tnt, rkIDs := range schedData {
if len(rkIDs) == 0 {
tnts = append(tnts, tnt)
}
}
if tnts != nil {
qrydData, err := rkS.dm.GetRankingProfileIDs(ctx, tnts)
if err != nil {
return err
}
for tnt, ids := range qrydData {
schedData[tnt] = ids
}
}
for tnt, rkIDs := range schedData {
if _, err := rkS.scheduleRankingQueries(ctx, tnt, rkIDs); err != nil {
return err
}
}
return nil
}
// scheduleRankingQueries will schedule/re-schedule specific ranking queries
func (rkS *RankingS) scheduleRankingQueries(ctx *context.Context,
tnt string, rkIDs []string) (scheduled int, err error) {
var partial bool
rkS.crnRQsMux.Lock()
if _, has := rkS.crnRQs[tnt]; !has {
rkS.crnRQs[tnt] = make(map[string]cron.EntryID)
}
rkS.crnRQsMux.Unlock()
for _, rkID := range rkIDs {
rkS.crnRQsMux.RLock()
if entryID, has := rkS.crnRQs[tnt][rkID]; has {
rkS.crn.Remove(entryID) // deschedule the query
}
rkS.crnRQsMux.RUnlock()
if rkP, err := rkS.dm.GetRankingProfile(ctx, tnt, rkID, true, true, utils.NonTransactional); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> failed retrieving RankingProfile with id: <%s:%s> for scheduling, error: <%s>",
utils.RankingS, tnt, rkID, err.Error()))
partial = true
} else if entryID, err := rkS.crn.AddFunc(rkP.Schedule,
func() { rkS.computeRanking(ctx, rkP.Clone()) }); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduling RankingProfile <%s:%s>, error: <%s>",
utils.RankingS, tnt, rkID, err.Error()))
partial = true
} else { // remember the entry ID so the query can be rescheduled later
rkS.crnRQsMux.Lock()
rkS.crnRQs[rkP.Tenant][rkP.ID] = entryID
rkS.crnRQsMux.Unlock()
scheduled++
}
}
if partial {
return 0, utils.ErrPartiallyExecuted
}
return
}
// V1ScheduleQueries is the API for manually re-/scheduling Ranking queries
func (rkS *RankingS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleRankingQueries, scheduled *int) (err error) {
if sched, errSched := rkS.scheduleRankingQueries(ctx, args.Tenant, args.RankingIDs); errSched != nil {
return errSched
} else {
*scheduled = sched
}
return
}
// V1GetRanking is the API to return the Ranking instance
func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, retRanking *Ranking) (err error) {
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
var rk *Ranking
if rk, err = rkS.dm.GetRanking(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
return
}
rk.rMux.RLock()
defer rk.rMux.RUnlock()
retRanking.Tenant = rk.Tenant // avoid vet complaining about mutex copying
retRanking.ID = rk.ID
retRanking.Metrics = make(map[string]map[string]float64)
for statID, metrics := range rk.Metrics {
retRanking.Metrics[statID] = make(map[string]float64)
for metricID, val := range metrics {
retRanking.Metrics[statID][metricID] = val
}
}
retRanking.LastUpdate = rk.LastUpdate
retRanking.Sorting = rk.Sorting
retRanking.SortingParameters = make([]string, len(rk.SortingParameters))
copy(retRanking.SortingParameters, rk.SortingParameters)
retRanking.SortedStatIDs = make([]string, len(rk.SortedStatIDs))
copy(retRanking.SortedStatIDs, rk.SortedStatIDs)
return
}
// V1GetSchedule returns the active schedule for Ranking queries
func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgScheduledRankings, schedRankings *[]utils.ScheduledRanking) (err error) {
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = rkS.cgrcfg.GeneralCfg().DefaultTenant
}
rkS.crnRQsMux.RLock()
defer rkS.crnRQsMux.RUnlock()
rankingIDsMp, has := rkS.crnRQs[tnt]
if !has {
return utils.ErrNotFound
}
var scheduledRankings []utils.ScheduledRanking
var entryIds map[string]cron.EntryID
if len(args.RankingIDPrefixes) == 0 {
entryIds = rankingIDsMp
} else {
entryIds = make(map[string]cron.EntryID)
for _, rkID := range args.RankingIDPrefixes {
for key, entryID := range rankingIDsMp {
if strings.HasPrefix(key, rkID) {
entryIds[key] = entryID
}
}
}
}
if len(entryIds) == 0 {
return utils.ErrNotFound
}
var entry cron.Entry
for id, entryID := range entryIds {
entry = rkS.crn.Entry(entryID)
if entry.ID == 0 {
continue
}
scheduledRankings = append(scheduledRankings,
utils.ScheduledRanking{
RankingID: id,
Next: entry.Next,
Previous: entry.Prev,
})
}
slices.SortFunc(scheduledRankings, func(a, b utils.ScheduledRanking) int {
return a.Next.Compare(b.Next)
})
*schedRankings = scheduledRankings
return nil
}
// V1GetRankingSummary returns the ascending/descending stat summary of the last updated Ranking
func (rS *RankingS) V1GetRankingSummary(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *RankingSummary) (err error) {
var rnk *Ranking
if rnk, err = rS.dm.GetRanking(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
return
}
rnk.rMux.RLock()
rnkS := rnk.asRankingSummary()
rnk.rMux.RUnlock()
*reply = *rnkS
return
}

View File

@@ -20,7 +20,6 @@ package engine
import (
"testing"
"time"
"github.com/cgrates/cgrates/config"
)
@@ -29,7 +28,6 @@ func TestTenantID(t *testing.T) {
rp := &RankingProfile{
Tenant: "cgrates.org",
ID: "01",
QueryInterval: 5 * time.Minute,
StatIDs: []string{"stat1", "stat2"},
MetricIDs: []string{"metric1"},
Sorting: "asc",
@@ -50,7 +48,6 @@ func TestRankingProfileWithAPIOpts(t *testing.T) {
rp := &RankingProfile{
Tenant: "cgrates.org",
ID: "ID",
QueryInterval: 5 * time.Minute,
StatIDs: []string{"stat1", "stat2"},
MetricIDs: []string{"metric1"},
Sorting: "asc",
@@ -102,7 +99,7 @@ func TestNewRankingService(t *testing.T) {
filterS := &FilterS{}
connMgr := &ConnManager{}
rankingService := NewRankingService(dm, cgrcfg, filterS, connMgr)
rankingService := NewRankingS(dm, connMgr, filterS, cgrcfg)
if rankingService == nil {
t.Fatal("NewRankingService() returned nil")
@@ -112,12 +109,12 @@ func TestNewRankingService(t *testing.T) {
t.Errorf("Expected dm to be %v, got %v", dm, rankingService.dm)
}
if rankingService.cfg != cgrcfg {
t.Errorf("Expected cfg to be %v, got %v", cgrcfg, rankingService.cfg)
if rankingService.cgrcfg != cgrcfg {
t.Errorf("Expected cfg to be %v, got %v", cgrcfg, rankingService.cgrcfg)
}
if rankingService.fltrS != filterS {
t.Errorf("Expected fltrS to be %v, got %v", filterS, rankingService.fltrS)
if rankingService.filterS != filterS {
t.Errorf("Expected fltrS to be %v, got %v", filterS, rankingService.filterS)
}
if rankingService.connMgr != connMgr {

View File

@@ -32,36 +32,36 @@ import (
"github.com/ericlagergren/decimal"
)
func TestPopulateCostForRoutesConnRefused(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
connMgr := NewConnManager(cfg)
fltrS := NewFilterS(cfg, connMgr, nil)
routes := map[string]*RouteWithWeight{
"RW": {
Route: &Route{
ID: "local",
RateProfileIDs: []string{"RP_LOCAL"},
},
Weight: 10,
},
}
ev := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]any{
utils.Usage: "10s",
},
APIOpts: map[string]any{
utils.OptsRatesProfileIDs: []string{},
},
}
extraOpts := &optsGetRoutes{}
cfg.RouteSCfg().RateSConns = []string{"*localhost"}
_, err := populateCostForRoutes(context.Background(), cfg, connMgr, fltrS, routes, ev, extraOpts)
errExpect := "RATES_ERROR:dial tcp 127.0.0.1:2012: connect: connection refused"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %v\n but received %v", errExpect, err)
}
}
// func TestPopulateCostForRoutesConnRefused(t *testing.T) {
// cfg := config.NewDefaultCGRConfig()
// connMgr := NewConnManager(cfg)
// fltrS := NewFilterS(cfg, connMgr, nil)
// routes := map[string]*RouteWithWeight{
// "RW": {
// Route: &Route{
// ID: "local",
// RateProfileIDs: []string{"RP_LOCAL"},
// },
// Weight: 10,
// },
// }
// ev := &utils.CGREvent{
// Tenant: "cgrates.org",
// Event: map[string]any{
// utils.Usage: "10s",
// },
// APIOpts: map[string]any{
// utils.OptsRatesProfileIDs: []string{},
// },
// }
// extraOpts := &optsGetRoutes{}
// cfg.RouteSCfg().RateSConns = []string{"*localhost"}
// _, err := populateCostForRoutes(context.Background(), cfg, connMgr, fltrS, routes, ev, extraOpts)
// errExpect := "RATES_ERROR:dial tcp 127.0.0.1:2012: connect: connection refused"
// if err == nil || err.Error() != errExpect {
// t.Errorf("Expected %v\n but received %v", errExpect, err)
// }
// }
func TestLeastCostSorterSortRoutesErr(t *testing.T) {

View File

@@ -367,9 +367,9 @@ func (csvs *CSVStorage) GetTPRankings(tpid, tenant, id string) ([]*utils.TPRanki
func (csvs *CSVStorage) GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsProfile, error) {
var tpTrends TrendMdls
if err := csvs.proccesData(TrendMdl{}, csvs.trendsFn, func(tp any) {
tPTrends := tp.(TrendMdl)
tPTrends.Tpid = tpid
tpTrends = append(tpTrends, &tPTrends)
tPTrend := tp.(TrendMdl)
tPTrend.Tpid = tpid
tpTrends = append(tpTrends, &tPTrend)
}); err != nil {
return nil, err
}

View File

@@ -67,12 +67,15 @@ type DataDB interface {
SetRankingProfileDrv(ctx *context.Context, rp *RankingProfile) (err error)
GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (sq *RankingProfile, err error)
RemRankingProfileDrv(ctx *context.Context, tenant string, id string) (err error)
SetRankingDrv(ctx *context.Context, rn *Ranking) (err error)
GetRankingDrv(ctx *context.Context, tenant string, id string) (sq *Ranking, err error)
RemoveRankingDrv(ctx *context.Context, tenant string, id string) (err error)
SetTrendProfileDrv(ctx *context.Context, sq *TrendProfile) (err error)
GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sq *TrendProfile, err error)
RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error)
GetTrendDrv(string, string) (*Trend, error)
SetTrendDrv(*Trend) error
RemoveTrendDrv(string, string) error
GetTrendDrv(ctx *context.Context, tenant string, id string) (*Trend, error)
SetTrendDrv(ctx *context.Context, tr *Trend) error
RemoveTrendDrv(ctx *context.Context, tenant string, id string) error
GetFilterDrv(ctx *context.Context, tnt string, id string) (*Filter, error)
SetFilterDrv(ctx *context.Context, f *Filter) error
RemoveFilterDrv(ctx *context.Context, tnt string, id string) error

View File

@@ -274,6 +274,26 @@ func (iDB *InternalDB) RemRankingProfileDrv(_ *context.Context, tenant, id strin
return nil
}
func (iDB *InternalDB) GetRankingDrv(_ *context.Context, tenant, id string) (rn *Ranking, err error) {
x, ok := iDB.db.Get(utils.CacheRankings, utils.ConcatenatedKey(tenant, id))
if !ok || x == nil {
return nil, utils.ErrNotFound
}
return x.(*Ranking), nil
}
func (iDB *InternalDB) SetRankingDrv(_ *context.Context, rn *Ranking) (err error) {
iDB.db.Set(utils.CacheRankings, rn.TenantID(), rn, nil,
true, utils.NonTransactional)
return
}
func (iDB *InternalDB) RemoveRankingDrv(_ *context.Context, tenant, id string) (err error) {
iDB.db.Remove(utils.CacheRankings, utils.ConcatenatedKey(tenant, id),
true, utils.NonTransactional)
return
}
func (iDB *InternalDB) GetStatQueueDrv(_ *context.Context, tenant, id string) (sq *StatQueue, err error) {
x, ok := iDB.db.Get(utils.CacheStatQueues, utils.ConcatenatedKey(tenant, id))
if !ok || x == nil {
@@ -324,7 +344,7 @@ func (iDB *InternalDB) GetTrendProfileDrv(_ *context.Context, tenant, id string)
return x.(*TrendProfile), nil
}
func (iDB *InternalDB) GetTrendDrv(tenant, id string) (th *Trend, err error) {
func (iDB *InternalDB) GetTrendDrv(_ *context.Context, tenant, id string) (th *Trend, err error) {
x, ok := iDB.db.Get(utils.CacheTrends, utils.ConcatenatedKey(tenant, id))
if !ok || x == nil {
return nil, utils.ErrNotFound
@@ -332,13 +352,13 @@ func (iDB *InternalDB) GetTrendDrv(tenant, id string) (th *Trend, err error) {
return x.(*Trend), nil
}
func (iDB *InternalDB) SetTrendDrv(tr *Trend) (err error) {
func (iDB *InternalDB) SetTrendDrv(_ *context.Context, tr *Trend) (err error) {
iDB.db.Set(utils.CacheTrends, tr.TenantID(), tr, nil,
true, utils.NonTransactional)
return
}
func (iDB *InternalDB) RemoveTrendDrv(tenant, id string) (err error) {
func (iDB *InternalDB) RemoveTrendDrv(_ *context.Context, tenant, id string) (err error) {
iDB.db.Remove(utils.CacheTrends, utils.ConcatenatedKey(tenant, id),
true, utils.NonTransactional)
return

View File

@@ -66,6 +66,7 @@ const (
ColTrs = "trend_profiles"
ColTrd = "trends"
ColRgp = "ranking_profiles"
ColRnk = "rankings"
ColFlt = "filters"
ColRts = "route_profiles"
ColAttr = "attribute_profiles"
@@ -781,6 +782,38 @@ func (ms *MongoStorage) RemRankingProfileDrv(ctx *context.Context, tenant, id st
}
func (ms *MongoStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (*Ranking, error) {
rn := new(Ranking)
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
sr := ms.getCol(ColRnk).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
decodeErr := sr.Decode(rn)
if errors.Is(decodeErr, mongo.ErrNoDocuments) {
return utils.ErrNotFound
}
return decodeErr
})
return rn, err
}
func (ms *MongoStorage) SetRankingDrv(ctx *context.Context, rn *Ranking) error {
return ms.query(ctx, func(sctx mongo.SessionContext) error {
_, err := ms.getCol(ColRnk).UpdateOne(sctx, bson.M{"tenant": rn.Tenant, "id": rn.ID},
bson.M{"$set": rn},
options.Update().SetUpsert(true),
)
return err
})
}
func (ms *MongoStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string) error {
return ms.query(ctx, func(sctx mongo.SessionContext) error {
dr, err := ms.getCol(ColRnk).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id})
if err != nil {
return err
}
if dr.DeletedCount == 0 {
return utils.ErrNotFound
}
return nil
})
}
func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (*TrendProfile, error) {
srProfile := new(TrendProfile)
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
@@ -813,9 +846,9 @@ func (ms *MongoStorage) RemTrendProfileDrv(ctx *context.Context, tenant, id stri
})
}
func (ms *MongoStorage) GetTrendDrv(tenant, id string) (*Trend, error) {
func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*Trend, error) {
tr := new(Trend)
err := ms.query(context.Background(), func(sctx mongo.SessionContext) error {
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
sr := ms.getCol(ColTrd).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
decodeErr := sr.Decode(tr)
if errors.Is(decodeErr, mongo.ErrNoDocuments) {
@@ -826,8 +859,8 @@ func (ms *MongoStorage) GetTrendDrv(tenant, id string) (*Trend, error) {
return tr, err
}
func (ms *MongoStorage) SetTrendDrv(tr *Trend) error {
return ms.query(context.Background(), func(sctx mongo.SessionContext) error {
func (ms *MongoStorage) SetTrendDrv(ctx *context.Context, tr *Trend) error {
return ms.query(ctx, func(sctx mongo.SessionContext) error {
_, err := ms.getCol(ColTrd).UpdateOne(sctx, bson.M{"tenant": tr.Tenant, "id": tr.ID},
bson.M{"$set": tr},
options.Update().SetUpsert(true),
@@ -836,8 +869,8 @@ func (ms *MongoStorage) SetTrendDrv(tr *Trend) error {
})
}
func (ms *MongoStorage) RemoveTrendDrv(tenant, id string) error {
return ms.query(context.Background(), func(sctx mongo.SessionContext) error {
func (ms *MongoStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string) error {
return ms.query(ctx, func(sctx mongo.SessionContext) error {
dr, err := ms.getCol(ColTrd).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id})
if dr.DeletedCount == 0 {
return utils.ErrNotFound

View File

@@ -553,7 +553,7 @@ func (rs *RedisStorage) RemTrendProfileDrv(ctx *context.Context, tenant string,
return rs.Cmd(nil, redisDEL, utils.TrendProfilePrefix+utils.ConcatenatedKey(tenant, id))
}
func (rs *RedisStorage) GetTrendDrv(tenant, id string) (r *Trend, err error) {
func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *Trend, err error) {
var values []byte
if err = rs.Cmd(&values, redisGET, utils.TrendPrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
return
@@ -565,7 +565,7 @@ func (rs *RedisStorage) GetTrendDrv(tenant, id string) (r *Trend, err error) {
return
}
func (rs *RedisStorage) SetTrendDrv(r *Trend) (err error) {
func (rs *RedisStorage) SetTrendDrv(ctx *context.Context, r *Trend) (err error) {
var result []byte
if result, err = rs.ms.Marshal(r); err != nil {
return
@@ -573,7 +573,7 @@ func (rs *RedisStorage) SetTrendDrv(r *Trend) (err error) {
return rs.Cmd(nil, redisSET, utils.TrendPrefix+utils.ConcatenatedKey(r.Tenant, r.ID), string(result))
}
func (rs *RedisStorage) RemoveTrendDrv(tenant, id string) (err error) {
func (rs *RedisStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string) (err error) {
return rs.Cmd(nil, redisDEL, utils.TrendPrefix+utils.ConcatenatedKey(tenant, id))
}
@@ -596,10 +596,35 @@ func (rs *RedisStorage) GetRankingProfileDrv(ctx *context.Context, tenant string
err = rs.ms.Unmarshal(values, &sg)
return
}
func (rs *RedisStorage) RemRankingProfileDrv(ctx *context.Context, tenant string, id string) (err error) {
return rs.Cmd(nil, redisDEL, utils.RankingProfilePrefix+utils.ConcatenatedKey(tenant, id))
}
func (rs *RedisStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (rn *Ranking, err error) {
var values []byte
if err = rs.Cmd(&values, redisGET, utils.RankingPrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
return
} else if len(values) == 0 {
err = utils.ErrNotFound
return
}
err = rs.ms.Unmarshal(values, &rn)
return rn, err
}
func (rs *RedisStorage) SetRankingDrv(_ *context.Context, rn *Ranking) (err error) {
var result []byte
if result, err = rs.ms.Marshal(rn); err != nil {
return
}
return rs.Cmd(nil, redisSET, utils.RankingPrefix+utils.ConcatenatedKey(rn.Tenant, rn.ID), string(result))
}
func (rs *RedisStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string) (err error) {
return rs.Cmd(nil, redisDEL, utils.RankingPrefix+utils.ConcatenatedKey(tenant, id))
}
// GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB
func (rs *RedisStorage) GetThresholdProfileDrv(ctx *context.Context, tenant, ID string) (tp *ThresholdProfile, err error) {
var values []byte

View File

@@ -20,6 +20,9 @@ package engine
import (
"fmt"
"runtime"
"slices"
"strings"
"sync"
"time"
@@ -32,13 +35,16 @@ import (
func NewTrendService(dm *DataManager,
cgrcfg *config.CGRConfig, filterS *FilterS, connMgr *ConnManager) (tS *TrendS) {
return &TrendS{
dm: dm,
cfg: cgrcfg,
fltrS: filterS,
connMgr: connMgr,
loopStopped: make(chan struct{}),
crnTQs: make(map[string]map[string]cron.EntryID),
crnTQsMux: new(sync.RWMutex),
dm: dm,
cfg: cgrcfg,
fltrS: filterS,
connMgr: connMgr,
loopStopped: make(chan struct{}),
crnTQs: make(map[string]map[string]cron.EntryID),
crnTQsMux: new(sync.RWMutex),
storedTrends: make(utils.StringSet),
storingStopped: make(chan struct{}),
trendStop: make(chan struct{}),
}
}
@@ -54,13 +60,18 @@ type TrendS struct {
crnTQsMux *sync.RWMutex // protects the crnTQs
crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed
storedTrends utils.StringSet // keep a record of trends which need saving, map[trendTenantID]bool
sTrndsMux sync.RWMutex // protects storedTrends
storingStopped chan struct{} // signal back that the operations were stopped
loopStopped chan struct{}
trendStop chan struct{} // signal to stop all operations
}
// computeTrend will query a stat and build the Trend for it
//
// it is to be called by Cron service
func (tS *TrendS) computeTrend(tP *TrendProfile) {
// it is to be called by Cron service
func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) {
var floatMetrics map[string]float64
if err := tS.connMgr.Call(context.Background(), tS.cfg.TrendSCfg().StatSConns,
utils.StatSv1GetQueueFloatMetrics,
@@ -68,32 +79,28 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
&floatMetrics); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> computing trend for with id: <%s:%s> stats <%s> error: <%s>",
"<%s> computing trend with id: <%s:%s> for stats <%s> error: <%s>",
utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error()))
return
}
trend, err := tS.dm.GetTrend(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
if err == utils.ErrNotFound {
trend = &Trend{
Tenant: tP.Tenant,
ID: tP.ID,
RunTimes: make([]time.Time, 0),
Metrics: make(map[time.Time]map[string]*MetricWithTrend),
}
} else if err != nil {
trnd, err := tS.dm.GetTrend(ctx, tP.Tenant, tP.ID, true, true, utils.NonTransactional)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> querying trend with id: <%s:%s> dm error: <%s>",
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
return
}
trend.tMux.Lock()
defer trend.tMux.Unlock()
trnd.tMux.Lock()
defer trnd.tMux.Unlock()
if trnd.tPrfl == nil {
trnd.tPrfl = tP
}
trnd.Compile(tP.TTL, tP.QueueLength)
now := time.Now()
var metrics []string
if len(tP.Metrics) != 0 {
metrics = tP.Metrics
metrics = tP.Metrics // read only
}
if len(metrics) == 0 { // unlimited metrics in trend
for mID := range floatMetrics {
@@ -103,8 +110,11 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
if len(metrics) == 0 {
return // nothing to compute
}
trend.RunTimes = append(trend.RunTimes, now)
trend.Metrics[now] = make(map[string]*MetricWithTrend)
trnd.RunTimes = append(trnd.RunTimes, now)
if trnd.Metrics == nil {
trnd.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
}
trnd.Metrics[now] = make(map[string]*MetricWithTrend)
for _, mID := range metrics {
mWt := &MetricWithTrend{ID: mID}
var has bool
@@ -113,49 +123,445 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
mWt.TrendLabel = utils.NotAvailable
continue
}
if mWt.TrendGrowth, err = trend.getTrendGrowth(mID, mWt.Value, tP.CorrelationType, tS.cfg.GeneralCfg().RoundingDecimals); err != nil {
if mWt.TrendGrowth, err = trnd.getTrendGrowth(mID, mWt.Value, tP.CorrelationType,
tS.cfg.GeneralCfg().RoundingDecimals); err != nil {
mWt.TrendLabel = utils.NotAvailable
} else {
mWt.TrendLabel = trend.getTrendLabel(mWt.TrendGrowth, tP.Tolerance)
mWt.TrendLabel = trnd.getTrendLabel(mWt.TrendGrowth, tP.Tolerance)
}
trend.Metrics[now][mWt.ID] = mWt
trnd.Metrics[now][mWt.ID] = mWt
trnd.indexesAppendMetric(mWt, now)
}
if err := tS.dm.SetTrend(trend); err != nil {
if err = tS.storeTrend(ctx, trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> setting trend with id: <%s:%s> dm error: <%s>",
"<%s> setting Trend with id: <%s:%s> DM error: <%s>",
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
return
}
if err = tS.processThresholds(trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> Trend with id <%s:%s> error: <%s> with ThresholdS",
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
}
if err = tS.processEEs(trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> Trend with id <%s:%s> error: <%s> with EEs",
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
}
}
// processThresholds will pass the Trend event to ThresholdS
func (tS *TrendS) processThresholds(trnd *Trend) (err error) {
if len(trnd.RunTimes) == 0 ||
len(trnd.RunTimes) < trnd.tPrfl.MinItems {
return
}
if len(tS.cfg.TrendSCfg().ThresholdSConns) == 0 {
return
}
opts := map[string]any{
utils.MetaEventType: utils.TrendUpdate,
}
var thIDs []string
if len(trnd.tPrfl.ThresholdIDs) != 0 {
if len(trnd.tPrfl.ThresholdIDs) == 1 &&
trnd.tPrfl.ThresholdIDs[0] == utils.MetaNone {
return
}
thIDs = make([]string, len(trnd.tPrfl.ThresholdIDs))
copy(thIDs, trnd.tPrfl.ThresholdIDs)
}
opts[utils.OptsThresholdsProfileIDs] = thIDs
ts := trnd.asTrendSummary()
trndEv := &utils.CGREvent{
Tenant: trnd.Tenant,
ID: utils.GenUUID(),
APIOpts: opts,
Event: map[string]any{
utils.TrendID: trnd.ID,
utils.Time: ts.Time,
utils.Metrics: ts.Metrics,
},
}
var withErrs bool
var tIDs []string
if err := tS.connMgr.Call(context.TODO(), tS.cfg.TrendSCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, trndEv, &tIDs); err != nil &&
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.TrendS, err.Error(), trndEv))
withErrs = true
}
if withErrs {
err = utils.ErrPartiallyExecuted
}
return
}
// processEEs will pass the Trend event to EEs
func (tS *TrendS) processEEs(trnd *Trend) (err error) {
if len(trnd.RunTimes) == 0 ||
len(trnd.RunTimes) < trnd.tPrfl.MinItems {
return
}
if len(tS.cfg.TrendSCfg().EEsConns) == 0 {
return
}
opts := map[string]any{
utils.MetaEventType: utils.TrendUpdate,
}
ts := trnd.asTrendSummary()
trndEv := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: trnd.Tenant,
ID: utils.GenUUID(),
APIOpts: opts,
Event: map[string]any{
utils.TrendID: trnd.ID,
utils.Time: ts.Time,
utils.Metrics: ts.Metrics,
},
},
EeIDs: tS.cfg.TrendSCfg().EEsExporterIDs,
}
var withErrs bool
var reply map[string]map[string]any
if err := tS.connMgr.Call(context.TODO(), tS.cfg.TrendSCfg().EEsConns,
utils.EeSv1ProcessEvent, trndEv, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.TrendS, err.Error(), trndEv))
withErrs = true
}
if withErrs {
err = utils.ErrPartiallyExecuted
}
return
}
// storeTrend will store or schedule the trend based on settings
func (tS *TrendS) storeTrend(ctx *context.Context, trnd *Trend) (err error) {
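// StoreInterval semantics (as handled below): 0 disables persisting, -1 writes the
// Trend through to DataDB immediately, any positive value defers the write to the
// asyncStoreTrends backup loop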
if tS.cfg.TrendSCfg().StoreInterval == 0 {
return
}
if tS.cfg.TrendSCfg().StoreInterval == -1 {
return tS.dm.SetTrend(ctx, trnd)
}
// schedule the asynchronous save; relies on the Trend being present in cache
tS.sTrndsMux.Lock()
tS.storedTrends.Add(trnd.TenantID())
tS.sTrndsMux.Unlock()
return
}
// storeTrends will do one round of saving the modified trends from cache to dataDB
//
// designed to run asynchronously
func (tS *TrendS) storeTrends(ctx *context.Context) {
var failedTrndIDs []string
for {
tS.sTrndsMux.Lock()
trndID := tS.storedTrends.GetOne()
if trndID != utils.EmptyString {
tS.storedTrends.Remove(trndID)
}
tS.sTrndsMux.Unlock()
if trndID == utils.EmptyString {
break // no more keys, backup completed
}
trndIf, ok := Cache.Get(utils.CacheTrends, trndID)
if !ok || trndIf == nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed retrieving from cache Trend with ID: %q",
utils.TrendS, trndID))
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
continue
}
trnd := trndIf.(*Trend)
trnd.tMux.RLock()
if err := tS.dm.SetTrend(ctx, trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
utils.TrendS, trndID, err))
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
}
trnd.tMux.RUnlock()
// give up thread control between writes so the backup loop does not hog the CPU
runtime.Gosched()
}
if len(failedTrndIDs) != 0 { // there were errors on save, schedule the keys for next backup
tS.sTrndsMux.Lock()
tS.storedTrends.AddSlice(failedTrndIDs)
tS.sTrndsMux.Unlock()
}
}
// asyncStoreTrends runs as a background process, calling storeTrends based on storeInterval
func (tS *TrendS) asyncStoreTrends(ctx *context.Context) {
storeInterval := tS.cfg.TrendSCfg().StoreInterval
if storeInterval <= 0 {
close(tS.storingStopped)
return
}
for {
tS.storeTrends(ctx)
select {
case <-tS.trendStop:
close(tS.storingStopped)
return
case <-time.After(storeInterval): // continue to another storing loop
}
}
}
// StartTrendS will activate the Cron, together with all scheduled Trend queries
func (tS *TrendS) StartTrendS(ctx *context.Context) error {
if err := tS.scheduleAutomaticQueries(ctx); err != nil {
return err
}
tS.crn.Start()
go tS.asyncStoreTrends(ctx)
return nil
}
// StopTrendS will shut down the Cron tasks, waiting for running queries and storing operations to finish
func (tS *TrendS) StopTrendS() {
timeEnd := time.Now().Add(tS.cfg.CoreSCfg().ShutdownTimeout)
crnctx := tS.crn.Stop()
close(tS.trendStop)
// Wait for cron
select {
case <-crnctx.Done():
case <-time.After(time.Until(timeEnd)):
utils.Logger.Warning(
fmt.Sprintf(
"<%s> timeout waiting for Cron to finish",
utils.TrendS))
return
}
// Wait for backup and other operations
select {
case <-tS.storingStopped:
case <-time.After(time.Until(timeEnd)):
utils.Logger.Warning(
fmt.Sprintf(
"<%s> timeout waiting for TrendS to finish",
utils.TrendS))
return
}
}
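// Reload stops the Cron and the asynchronous storing loop, recreates the stop
// channels, then restarts both, reusing the Trend queries already scheduled in Cron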
func (tS *TrendS) Reload(ctx *context.Context) {
crnctx := tS.crn.Stop()
close(tS.trendStop)
<-crnctx.Done()
<-tS.storingStopped
tS.trendStop = make(chan struct{})
tS.storingStopped = make(chan struct{})
tS.crn.Start()
go tS.asyncStoreTrends(ctx)
}
// scheduleAutomaticQueries will schedule the queries at start/reload based on the configured ScheduledIDs
func (tS *TrendS) scheduleAutomaticQueries(ctx *context.Context) error {
schedData := make(map[string][]string)
for k, v := range tS.cfg.TrendSCfg().ScheduledIDs {
schedData[k] = v
}
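// tenants configured without explicit Trend IDs (or with no ScheduledIDs at all)
// will have their profile IDs discovered from DataDB below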
var tnts []string
if len(schedData) == 0 {
tnts = make([]string, 0)
}
for tnt, tIDs := range schedData {
if len(tIDs) == 0 {
tnts = append(tnts, tnt)
}
}
if tnts != nil {
qrydData, err := tS.dm.GetTrendProfileIDs(ctx, tnts)
if err != nil {
return err
}
for tnt, ids := range qrydData {
schedData[tnt] = ids
}
}
for tnt, tIDs := range schedData {
if _, err := tS.scheduleTrendQueries(ctx, tnt, tIDs); err != nil {
return err
}
}
return nil
}
// scheduleTrendQueries will schedule/re-schedule specific trend queries
func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIds []string) (complete bool) {
complete = true
for _, tID := range tIds {
func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) {
var partial bool
tS.crnTQsMux.Lock()
if _, has := tS.crnTQs[tnt]; !has {
tS.crnTQs[tnt] = make(map[string]cron.EntryID)
}
tS.crnTQsMux.Unlock()
for _, tID := range tIDs {
tS.crnTQsMux.RLock()
if entryID, has := tS.crnTQs[tnt][tID]; has {
tS.crn.Remove(entryID)
tS.crn.Remove(entryID) // deschedule the query
}
tS.crnTQsMux.RUnlock()
if tP, err := tS.dm.GetTrendProfile(ctx, tnt, tID); err != nil {
if tP, err := tS.dm.GetTrendProfile(ctx, tnt, tID, true, true, utils.NonTransactional); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> failed retrieving TrendProfile with id: <%s:%s> for scheduling, error: <%s>",
utils.TrendS, tnt, tID, err.Error()))
complete = false
partial = true
} else if entryID, err := tS.crn.AddFunc(tP.Schedule,
func() { tS.computeTrend(tP.Clone()) }); err != nil {
func() { tS.computeTrend(ctx, tP.Clone()) }); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduling TrendProfile <%s:%s>, error: <%s>",
utils.TrendS, tnt, tID, err.Error()))
complete = false
} else {
partial = true
} else { // record the entry ID so the query can be rescheduled later
tS.crnTQsMux.Lock()
tS.crnTQs[tP.Tenant][tP.ID] = entryID
tS.crnTQsMux.Unlock()
scheduled++
}
}
if partial {
return 0, utils.ErrPartiallyExecuted
}
return
}
// V1ScheduleQueries is the API for manually re-/scheduling Trend queries
func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) (err error) {
if sched, errSched := tS.scheduleTrendQueries(ctx, args.Tenant, args.TrendIDs); errSched != nil {
return errSched
} else {
*scheduled = sched
}
return
}
// V1GetTrend is the API to return the Trend metrics
//
// the returned RunTimes can be filtered by the index and time arguments,
// making it possible to work with paginators
func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *Trend) (err error) {
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
var trnd *Trend
if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
return
}
retTrend.Tenant = trnd.Tenant // avoid vet complaining about mutex copying
retTrend.ID = trnd.ID
startIdx := arg.RunIndexStart
if startIdx > len(trnd.RunTimes) {
startIdx = len(trnd.RunTimes) - 1
}
endIdx := arg.RunIndexEnd
if endIdx > len(trnd.RunTimes) ||
endIdx < startIdx ||
endIdx == 0 {
endIdx = len(trnd.RunTimes)
}
runTimes := trnd.RunTimes[startIdx:endIdx]
if len(runTimes) == 0 {
return utils.ErrNotFound
}
var tStart, tEnd time.Time
if arg.RunTimeStart == utils.EmptyString {
tStart = runTimes[0]
} else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cfg.GeneralCfg().DefaultTimezone); err != nil {
return
}
if arg.RunTimeEnd == utils.EmptyString {
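// add one nanosecond so the last run time is not excluded by the Before(tEnd) check below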
tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
} else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, tS.cfg.GeneralCfg().DefaultTimezone); err != nil {
return
}
retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
for _, runTime := range runTimes {
if runTime.After(tStart) && runTime.Before(tEnd) {
retTrend.RunTimes = append(retTrend.RunTimes, runTime)
}
}
if len(retTrend.RunTimes) == 0 { // filtered out all
return utils.ErrNotFound
}
retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
for _, runTime := range retTrend.RunTimes {
retTrend.Metrics[runTime] = trnd.Metrics[runTime]
}
return
}
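// V1GetScheduledTrends returns the Trend queries currently scheduled in Cron for a tenant,
// optionally filtered by ID prefixes and sorted by their next run time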
func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) (err error) {
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = tS.cfg.GeneralCfg().DefaultTenant
}
tS.crnTQsMux.RLock()
defer tS.crnTQsMux.RUnlock()
trendIDsMp, has := tS.crnTQs[tnt]
if !has {
return utils.ErrNotFound
}
var scheduledTrends []utils.ScheduledTrend
var entryIds map[string]cron.EntryID
if len(args.TrendIDPrefixes) == 0 {
entryIds = trendIDsMp
} else {
entryIds = make(map[string]cron.EntryID)
for _, tID := range args.TrendIDPrefixes {
for key, entryID := range trendIDsMp {
if strings.HasPrefix(key, tID) {
entryIds[key] = entryID
}
}
}
}
if len(entryIds) == 0 {
return utils.ErrNotFound
}
var entry cron.Entry
for id, entryID := range entryIds {
entry = tS.crn.Entry(entryID)
if entry.ID == 0 {
continue
}
scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{
TrendID: id,
Next: entry.Next,
Previous: entry.Prev,
})
}
slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int {
return a.Next.Compare(b.Next)
})
*schedTrends = scheduledTrends
return nil
}
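// V1GetTrendSummary returns the summary of the last computed run for the requested Trend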
func (tS *TrendS) V1GetTrendSummary(ctx *context.Context, arg utils.TenantIDWithAPIOpts, reply *TrendSummary) (err error) {
var trnd *Trend
if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
return
}
trnd.tMux.RLock()
trndS := trnd.asTrendSummary()
trnd.tMux.RUnlock()
*reply = *trndS
return
}