From 47fb25b4ef69e581ffede248063490368df90878 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 6 Mar 2025 18:49:15 +0200 Subject: [PATCH] move trends to dedicated package reivse/add comments and order of funcs/definitions --- admins/trends.go | 141 +++++ apis/replicator.go | 8 +- apis/trends.go | 38 +- engine/datadbmock.go | 12 +- engine/datamanager.go | 22 +- engine/dynamicdp.go | 2 +- engine/filters_test.go | 8 +- engine/model_helpers.go | 6 +- engine/storage_interface.go | 8 +- engine/storage_internal_datadb.go | 12 +- engine/storage_mongo_datadb.go | 12 +- engine/storage_redis.go | 8 +- engine/tpreader.go | 2 +- general_tests/trends_schedule_it_test.go | 10 +- general_tests/trends_stored_it_test.go | 12 +- loaders/libloader.go | 2 +- loaders/loader.go | 2 +- services/trends.go | 5 +- tpes/tpe_trends.go | 2 +- tpes/tpe_trends_test.go | 10 +- trends/apis.go | 154 +++++ {engine => trends}/trends.go | 260 ++------ {apis => trends}/trends_it_test.go | 24 +- {engine => trends}/trends_test.go | 9 +- utils/consts.go | 2 +- engine/libtrends.go => utils/trends.go | 587 +++++++++--------- .../libtrends_test.go => utils/trends_test.go | 144 +++-- 27 files changed, 832 insertions(+), 670 deletions(-) create mode 100644 admins/trends.go create mode 100644 trends/apis.go rename {engine => trends}/trends.go (63%) rename {apis => trends}/trends_it_test.go (96%) rename {engine => trends}/trends_test.go (92%) rename engine/libtrends.go => utils/trends.go (68%) rename engine/libtrends_test.go => utils/trends_test.go (85%) diff --git a/admins/trends.go b/admins/trends.go new file mode 100644 index 000000000..40877b403 --- /dev/null +++ b/admins/trends.go @@ -0,0 +1,141 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package admins + +import ( + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/utils" +) + +// V1GetTrendProfile returns a Trend profile +func (a *AdminS) V1GetTrendProfile(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *utils.TrendProfile) (err error) { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + tnt := arg.Tenant + if tnt == utils.EmptyString { + tnt = a.cfg.GeneralCfg().DefaultTenant + } + sCfg, err := a.dm.GetTrendProfile(ctx, tnt, arg.ID, true, true, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + *reply = *sCfg + return +} + +// V1GetTrendProfileIDs returns list of TrendProfile IDs registered for a tenant +func (a *AdminS) V1GetTrendProfileIDs(ctx *context.Context, args *utils.ArgsItemIDs, stsPrfIDs *[]string) (err error) { + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = a.cfg.GeneralCfg().DefaultTenant + } + prfx := utils.TrendProfilePrefix + tnt + utils.ConcatenatedKeySep + lenPrfx := len(prfx) + prfx += args.ItemsPrefix + var keys []string + if keys, err = a.dm.DataDB().GetKeysForPrefix(ctx, prfx); err != nil { + return + } + if len(keys) == 0 { + return utils.ErrNotFound + } + retIDs := make([]string, len(keys)) + for i, key := range keys { + retIDs[i] = key[lenPrfx:] + } + var limit, offset, maxItems int + if limit, offset, maxItems, err = utils.GetPaginateOpts(args.APIOpts); err != nil { + return + } + *stsPrfIDs, err = utils.Paginate(retIDs, limit, offset, maxItems) + return +} + +// V1GetTrendProfiles returns a list of stats profiles registered for a tenant +func (a *AdminS) V1GetTrendProfiles(ctx *context.Context, args *utils.ArgsItemIDs, sqPrfs *[]*utils.TrendProfile) (err error) { + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = a.cfg.GeneralCfg().DefaultTenant + } + var sqPrfIDs []string + if err = a.V1GetTrendProfileIDs(ctx, args, &sqPrfIDs); err != nil { + return + } + *sqPrfs = make([]*utils.TrendProfile, 0, len(sqPrfIDs)) + for _, sqPrfID := range sqPrfIDs { + var sqPrf *utils.TrendProfile + sqPrf, err = a.dm.GetTrendProfile(ctx, tnt, sqPrfID, true, true, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + *sqPrfs = append(*sqPrfs, sqPrf) + } + return +} + +// V1GetTrendProfilesCount returns the total number of TrendProfileIDs registered for a tenant +// returns ErrNotFound in case of 0 TrendProfileIDs +func (a *AdminS) V1GetTrendProfilesCount(ctx *context.Context, args *utils.ArgsItemIDs, reply *int) (err error) { + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = a.cfg.GeneralCfg().DefaultTenant + } + prfx := utils.TrendProfilePrefix + tnt + utils.ConcatenatedKeySep + args.ItemsPrefix + var keys []string + if keys, err = a.dm.DataDB().GetKeysForPrefix(ctx, prfx); err != nil { + return err + } + if len(keys) == 0 { + return utils.ErrNotFound + } + *reply = len(keys) + return +} + +// V1SetTrendProfile alters/creates a TrendProfile +func (a *AdminS) V1SetTrendProfile(ctx *context.Context, arg *utils.TrendProfileWithAPIOpts, reply *string) (err error) { + if missing := utils.MissingStructFields(arg.TrendProfile, []string{utils.ID}); len(missing) != 0 { + return utils.NewErrMandatoryIeMissing(missing...) + } + if arg.Tenant == utils.EmptyString { + arg.Tenant = a.cfg.GeneralCfg().DefaultTenant + } + if err = a.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +// V1RemoveTrendProfile remove a specific stat configuration +func (a *AdminS) V1RemoveTrendProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) error { + if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = a.cfg.GeneralCfg().DefaultTenant + } + if err := a.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} diff --git a/apis/replicator.go b/apis/replicator.go index 3323becd5..a2cacaa82 100644 --- a/apis/replicator.go +++ b/apis/replicator.go @@ -109,7 +109,7 @@ func (rplSv1 *ReplicatorSv1) GetStatQueueProfile(ctx *context.Context, tntID *ut } // GetTrend is the remote method coresponding to the dataDb driver method -func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Trend) error { +func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *utils.Trend) error { engine.UpdateReplicationFilters(utils.TrendPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) rcv, err := rplSv1.dm.DataDB().GetTrendDrv(ctx, tntID.Tenant, tntID.ID) if err != nil { @@ -124,7 +124,7 @@ func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantI } // GetTrendProfile is the remote method coresponding to the dataDb driver method -func (rplSv1 *ReplicatorSv1) GetTrendProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.TrendProfile) error { +func (rplSv1 *ReplicatorSv1) GetTrendProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *utils.TrendProfile) error { engine.UpdateReplicationFilters(utils.TrendProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) rcv, err := rplSv1.dm.DataDB().GetTrendProfileDrv(ctx, tntID.Tenant, tntID.ID) if err != nil { @@ -258,7 +258,7 @@ func (rplSv1 *ReplicatorSv1) SetThreshold(ctx *context.Context, th *engine.Thres } // SetTrendProfile is the replication method coresponding to the dataDb driver method -func (rplSv1 *ReplicatorSv1) SetTrendProfile(ctx *context.Context, trp *engine.TrendProfileWithAPIOpts, reply *string) (err error) { +func (rplSv1 *ReplicatorSv1) SetTrendProfile(ctx *context.Context, trp *utils.TrendProfileWithAPIOpts, reply *string) (err error) { if err = rplSv1.dm.DataDB().SetTrendProfileDrv(ctx, trp.TrendProfile); err != nil { return } @@ -276,7 +276,7 @@ func (rplSv1 *ReplicatorSv1) SetTrendProfile(ctx *context.Context, trp *engine.T } // SetTrend is the replication method coresponding to the dataDb driver method -func (rplSv1 *ReplicatorSv1) SetTrend(ctx *context.Context, tr *engine.TrendWithAPIOpts, reply *string) (err error) { +func (rplSv1 *ReplicatorSv1) SetTrend(ctx *context.Context, tr *utils.TrendWithAPIOpts, reply *string) (err error) { if err = rplSv1.dm.DataDB().SetTrendDrv(ctx, tr.Trend); err != nil { return } diff --git a/apis/trends.go b/apis/trends.go index a50699001..110b94ba3 100644 --- a/apis/trends.go +++ b/apis/trends.go @@ -20,12 +20,11 @@ package apis import ( "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) // GetTrendProfile returns a Trend profile -func (adms *AdminSv1) GetTrendProfile(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *engine.TrendProfile) (err error) { +func (adms *AdminSv1) GetTrendProfile(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *utils.TrendProfile) (err error) { if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } @@ -70,7 +69,7 @@ func (adms *AdminSv1) GetTrendProfileIDs(ctx *context.Context, args *utils.ArgsI } // GetTrendProfiles returns a list of stats profiles registered for a tenant -func (admS *AdminSv1) GetTrendProfiles(ctx *context.Context, args *utils.ArgsItemIDs, sqPrfs *[]*engine.TrendProfile) (err error) { +func (admS *AdminSv1) GetTrendProfiles(ctx *context.Context, args *utils.ArgsItemIDs, sqPrfs *[]*utils.TrendProfile) (err error) { tnt := args.Tenant if tnt == utils.EmptyString { tnt = admS.cfg.GeneralCfg().DefaultTenant @@ -79,9 +78,9 @@ func (admS *AdminSv1) GetTrendProfiles(ctx *context.Context, args *utils.ArgsIte if err = admS.GetTrendProfileIDs(ctx, args, &sqPrfIDs); err != nil { return } - *sqPrfs = make([]*engine.TrendProfile, 0, len(sqPrfIDs)) + *sqPrfs = make([]*utils.TrendProfile, 0, len(sqPrfIDs)) for _, sqPrfID := range sqPrfIDs { - var sqPrf *engine.TrendProfile + var sqPrf *utils.TrendProfile sqPrf, err = admS.dm.GetTrendProfile(ctx, tnt, sqPrfID, true, true, utils.NonTransactional) if err != nil { return utils.APIErrorHandler(err) @@ -111,7 +110,7 @@ func (admS *AdminSv1) GetTrendProfilesCount(ctx *context.Context, args *utils.Ar } // SetTrendProfile alters/creates a TrendProfile -func (adms *AdminSv1) SetTrendProfile(ctx *context.Context, arg *engine.TrendProfileWithAPIOpts, reply *string) (err error) { +func (adms *AdminSv1) SetTrendProfile(ctx *context.Context, arg *utils.TrendProfileWithAPIOpts, reply *string) (err error) { if missing := utils.MissingStructFields(arg.TrendProfile, []string{utils.ID}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } @@ -140,30 +139,3 @@ func (adms *AdminSv1) RemoveTrendProfile(ctx *context.Context, args *utils.Tenan *reply = utils.OK return nil } - -// NewTrendSv1 initializes TrendSV1 -func NewTrendSv1(trS *engine.TrendS) *TrendSv1 { - return &TrendSv1{trS: trS} -} - -// TrendSv1 exports RPC from RLs -type TrendSv1 struct { - ping - trS *engine.TrendS -} - -func (trs *TrendSv1) ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) error { - return trs.trS.V1ScheduleQueries(ctx, args, scheduled) -} - -func (trs *TrendSv1) GetTrend(ctx *context.Context, args *utils.ArgGetTrend, trend *engine.Trend) error { - return trs.trS.V1GetTrend(ctx, args, trend) -} - -func (trs *TrendSv1) GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) error { - return trs.trS.V1GetScheduledTrends(ctx, args, schedTrends) -} - -func (trs *TrendSv1) GetTrendSummary(ctx *context.Context, arg utils.TenantIDWithAPIOpts, reply *engine.TrendSummary) error { - return trs.trS.V1GetTrendSummary(ctx, arg, reply) -} diff --git a/engine/datadbmock.go b/engine/datadbmock.go index 00cb02de5..2c2e9e7c4 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -48,8 +48,8 @@ type DataDBMock struct { RemoveResourceProfileDrvF func(ctx *context.Context, tnt, id string) error RemoveResourceDrvF func(ctx *context.Context, tnt, id string) error SetResourceDrvF func(ctx *context.Context, r *Resource) error - SetTrendProfileDrvF func(ctx *context.Context, tr *TrendProfile) (err error) - GetTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *TrendProfile, err error) + SetTrendProfileDrvF func(ctx *context.Context, tr *utils.TrendProfile) (err error) + GetTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *utils.TrendProfile, err error) RemTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (err error) SetRankingProfileDrvF func(ctx *context.Context, sq *RankingProfile) (err error) GetRankingProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *RankingProfile, err error) @@ -261,14 +261,14 @@ func (dbM *DataDBMock) RemoveRankingDrv(ctx *context.Context, _ string, _ string return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (sg *TrendProfile, err error) { +func (dbM *DataDBMock) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (sg *utils.TrendProfile, err error) { if dbM.GetStatQueueProfileDrvF != nil { return dbM.GetTrendProfileDrvF(ctx, tenant, id) } return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) SetTrendProfileDrv(ctx *context.Context, trend *TrendProfile) (err error) { +func (dbM *DataDBMock) SetTrendProfileDrv(ctx *context.Context, trend *utils.TrendProfile) (err error) { if dbM.SetTrendProfileDrvF(ctx, trend) != nil { return dbM.SetTrendProfileDrvF(ctx, trend) } @@ -451,11 +451,11 @@ func (dbM *DataDBMock) RemoveRateProfileDrv(ctx *context.Context, str1 string, s return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetTrendDrv(ctx *context.Context, tenant, id string) (*Trend, error) { +func (dbM *DataDBMock) GetTrendDrv(ctx *context.Context, tenant, id string) (*utils.Trend, error) { return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) SetTrendDrv(ctx *context.Context, tr *Trend) error { +func (dbM *DataDBMock) SetTrendDrv(ctx *context.Context, tr *utils.Trend) error { return utils.ErrNotImplemented } diff --git a/engine/datamanager.go b/engine/datamanager.go index 4ff08dbe5..4595c4d78 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -949,7 +949,7 @@ func (dm *DataManager) RemoveStatQueueProfile(ctx *context.Context, tenant, id s // GetTrend retrieves a Trend from dataDB func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string, - cacheRead, cacheWrite bool, transactionID string) (tr *Trend, err error) { + cacheRead, cacheWrite bool, transactionID string) (tr *utils.Trend, err error) { tntID := utils.ConcatenatedKey(tenant, id) if cacheRead { @@ -957,7 +957,7 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string, if x == nil { return nil, utils.ErrNotFound } - return x.(*Trend), nil + return x.(*utils.Trend), nil } } if dm == nil { @@ -996,7 +996,7 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string, return } } - if err = tr.uncompress(dm.ms); err != nil { + if err = tr.Uncompress(dm.ms); err != nil { return nil, err } if cacheWrite { @@ -1010,12 +1010,12 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string, } // SetTrend stores Trend in dataDB -func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) { +func (dm *DataManager) SetTrend(ctx *context.Context, tr *utils.Trend) (err error) { if dm == nil { return utils.ErrNoDatabaseConn } if dm.dataDB.GetStorageType() != utils.MetaInternal { - if tr, err = tr.compress(dm.ms); err != nil { + if tr, err = tr.Compress(dm.ms, dm.cfg.TrendSCfg().StoreUncompressedLimit); err != nil { return } } @@ -1027,7 +1027,7 @@ func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) { dm.cfg.DataDbCfg().RplFiltered, utils.TrendPrefix, tr.TenantID(), // this are used to get the host IDs from cache utils.ReplicatorSv1SetTrend, - &TrendWithAPIOpts{ + &utils.TrendWithAPIOpts{ Trend: tr, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, dm.cfg.DataDbCfg().RplCache, utils.EmptyString)}); err != nil { @@ -1059,14 +1059,14 @@ func (dm *DataManager) RemoveTrend(ctx *context.Context, tenant, id string) (err } func (dm *DataManager) GetTrendProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, - transactionID string) (trp *TrendProfile, err error) { + transactionID string) (trp *utils.TrendProfile, err error) { tntID := utils.ConcatenatedKey(tenant, id) if cacheRead { if x, ok := Cache.Get(utils.CacheTrendProfiles, tntID); ok { if x == nil { return nil, utils.ErrNotFound } - return x.(*TrendProfile), nil + return x.(*utils.TrendProfile), nil } } if dm == nil { @@ -1140,7 +1140,7 @@ func (dm *DataManager) GetTrendProfileIDs(ctx *context.Context, tenants []string return } -func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile) (err error) { +func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *utils.TrendProfile) (err error) { if dm == nil { return utils.ErrNoDatabaseConn } @@ -1156,7 +1156,7 @@ func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile) dm.cfg.DataDbCfg().RplFiltered, utils.TrendProfilePrefix, trp.TenantID(), utils.ReplicatorSv1SetTrendProfile, - &TrendProfileWithAPIOpts{ + &utils.TrendProfileWithAPIOpts{ TrendProfile: trp, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, dm.cfg.DataDbCfg().RplCache, utils.EmptyString)}) @@ -1164,7 +1164,7 @@ func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile) if oldTrd == nil || oldTrd.QueueLength != trp.QueueLength || oldTrd.Schedule != trp.Schedule { - if err = dm.SetTrend(ctx, NewTrendFromProfile(trp)); err != nil { + if err = dm.SetTrend(ctx, utils.NewTrendFromProfile(trp)); err != nil { return } } diff --git a/engine/dynamicdp.go b/engine/dynamicdp.go index 7b328d130..d9d1552d6 100644 --- a/engine/dynamicdp.go +++ b/engine/dynamicdp.go @@ -131,7 +131,7 @@ func (dDP *dynamicDP) fieldAsInterface(fldPath []string) (val any, err error) { case utils.MetaTrends: //sample of fieldName : ~*trends.TrendID.Metrics.*acd.Value - var trendSum TrendSummary + var trendSum utils.TrendSummary if err := connMgr.Call(context.TODO(), dDP.trdConns, utils.TrendSv1GetTrendSummary, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: dDP.tenant, ID: fldPath[1]}}, &trendSum); err != nil { return nil, err } diff --git a/engine/filters_test.go b/engine/filters_test.go index 2e73f6ea0..c1fa55d01 100644 --- a/engine/filters_test.go +++ b/engine/filters_test.go @@ -2533,11 +2533,11 @@ func TestFilterTrends(t *testing.T) { if argGetTrend.ID == "Trend1" && argGetTrend.Tenant == "cgrates.org" { now := time.Now() now2 := now.Add(time.Second) - tr := Trend{ + tr := utils.Trend{ Tenant: "cgrates.org", ID: "Trend1", RunTimes: []time.Time{now, now2}, - Metrics: map[time.Time]map[string]*MetricWithTrend{ + Metrics: map[time.Time]map[string]*utils.MetricWithTrend{ now: { "*acc": {ID: "*acc", Value: 45, TrendGrowth: -1.0, TrendLabel: utils.NotAvailable}, "*acd": {ID: "*acd", Value: 50, TrendGrowth: -1.0, TrendLabel: utils.NotAvailable}, @@ -2548,8 +2548,8 @@ func TestFilterTrends(t *testing.T) { }, }, } - trS := tr.asTrendSummary() - *reply.(*TrendSummary) = *trS + trS := tr.AsTrendSummary() + *reply.(*utils.TrendSummary) = *trS return nil } return utils.ErrNotFound diff --git a/engine/model_helpers.go b/engine/model_helpers.go index b1d70e393..35c97bec9 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -820,8 +820,8 @@ func APItoModelTrends(tr *utils.TPTrendsProfile) (mdls TrendMdls) { return } -func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *TrendProfile, err error) { - tr = &TrendProfile{ +func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *utils.TrendProfile, err error) { + tr = &utils.TrendProfile{ Tenant: tpTR.Tenant, ID: tpTR.ID, StatID: tpTR.StatID, @@ -844,7 +844,7 @@ func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *TrendProfile, err error) { return } -func TrendProfileToAPI(tr *TrendProfile) (tpTR *utils.TPTrendsProfile) { +func TrendProfileToAPI(tr *utils.TrendProfile) (tpTR *utils.TPTrendsProfile) { tpTR = &utils.TPTrendsProfile{ Tenant: tr.Tenant, ID: tr.ID, diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 17860a890..98e405113 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -70,11 +70,11 @@ type DataDB interface { SetRankingDrv(ctx *context.Context, rn *Ranking) (err error) GetRankingDrv(ctx *context.Context, tenant string, id string) (sq *Ranking, err error) RemoveRankingDrv(ctx *context.Context, tenant string, id string) (err error) - SetTrendProfileDrv(ctx *context.Context, sq *TrendProfile) (err error) - GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sq *TrendProfile, err error) + SetTrendProfileDrv(ctx *context.Context, sq *utils.TrendProfile) (err error) + GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sq *utils.TrendProfile, err error) RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error) - GetTrendDrv(ctx *context.Context, tenant string, id string) (*Trend, error) - SetTrendDrv(ctx *context.Context, tr *Trend) error + GetTrendDrv(ctx *context.Context, tenant string, id string) (*utils.Trend, error) + SetTrendDrv(ctx *context.Context, tr *utils.Trend) error RemoveTrendDrv(ctx *context.Context, tenant string, id string) error GetFilterDrv(ctx *context.Context, tnt string, id string) (*Filter, error) SetFilterDrv(ctx *context.Context, f *Filter) error diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 5657556d0..3d63e75d1 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -326,7 +326,7 @@ func (iDB *InternalDB) GetThresholdProfileDrv(_ *context.Context, tenant, id str return x.(*ThresholdProfile), nil } -func (iDB *InternalDB) SetTrendProfileDrv(_ *context.Context, srp *TrendProfile) (err error) { +func (iDB *InternalDB) SetTrendProfileDrv(_ *context.Context, srp *utils.TrendProfile) (err error) { iDB.db.Set(utils.CacheTrendProfiles, srp.TenantID(), srp, nil, true, utils.NonTransactional) return nil } @@ -336,23 +336,23 @@ func (iDB *InternalDB) RemTrendProfileDrv(_ *context.Context, tenant, id string) return nil } -func (iDB *InternalDB) GetTrendProfileDrv(_ *context.Context, tenant, id string) (sg *TrendProfile, err error) { +func (iDB *InternalDB) GetTrendProfileDrv(_ *context.Context, tenant, id string) (sg *utils.TrendProfile, err error) { x, ok := iDB.db.Get(utils.CacheTrendProfiles, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*TrendProfile), nil + return x.(*utils.TrendProfile), nil } -func (iDB *InternalDB) GetTrendDrv(_ *context.Context, tenant, id string) (th *Trend, err error) { +func (iDB *InternalDB) GetTrendDrv(_ *context.Context, tenant, id string) (th *utils.Trend, err error) { x, ok := iDB.db.Get(utils.CacheTrends, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*Trend), nil + return x.(*utils.Trend), nil } -func (iDB *InternalDB) SetTrendDrv(_ *context.Context, tr *Trend) (err error) { +func (iDB *InternalDB) SetTrendDrv(_ *context.Context, tr *utils.Trend) (err error) { iDB.db.Set(utils.CacheTrends, tr.TenantID(), tr, nil, true, utils.NonTransactional) return diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index baff8fb7a..e06b46130 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -804,8 +804,8 @@ func (ms *MongoStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string }) } -func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (*TrendProfile, error) { - srProfile := new(TrendProfile) +func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (*utils.TrendProfile, error) { + srProfile := new(utils.TrendProfile) err := ms.query(ctx, func(sctx mongo.SessionContext) error { sr := ms.getCol(ColTrs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) decodeErr := sr.Decode(srProfile) @@ -817,7 +817,7 @@ func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id stri return srProfile, err } -func (ms *MongoStorage) SetTrendProfileDrv(ctx *context.Context, srp *TrendProfile) (err error) { +func (ms *MongoStorage) SetTrendProfileDrv(ctx *context.Context, srp *utils.TrendProfile) (err error) { return ms.query(ctx, func(sctx mongo.SessionContext) error { _, err := ms.getCol(ColTrs).UpdateOne(sctx, bson.M{"tenant": srp.Tenant, "id": srp.ID}, bson.M{"$set": srp}, @@ -836,8 +836,8 @@ func (ms *MongoStorage) RemTrendProfileDrv(ctx *context.Context, tenant, id stri }) } -func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*Trend, error) { - tr := new(Trend) +func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*utils.Trend, error) { + tr := new(utils.Trend) err := ms.query(ctx, func(sctx mongo.SessionContext) error { sr := ms.getCol(ColTrd).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) decodeErr := sr.Decode(tr) @@ -849,7 +849,7 @@ func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*T return tr, err } -func (ms *MongoStorage) SetTrendDrv(ctx *context.Context, tr *Trend) error { +func (ms *MongoStorage) SetTrendDrv(ctx *context.Context, tr *utils.Trend) error { return ms.query(ctx, func(sctx mongo.SessionContext) error { _, err := ms.getCol(ColTrd).UpdateOne(sctx, bson.M{"tenant": tr.Tenant, "id": tr.ID}, bson.M{"$set": tr}, diff --git a/engine/storage_redis.go b/engine/storage_redis.go index a61e4b2db..fafd70b49 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -529,7 +529,7 @@ func (rs *RedisStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) return rs.Cmd(nil, redisDEL, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id)) } -func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *TrendProfile) (err error) { +func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *utils.TrendProfile) (err error) { var result []byte if result, err = rs.ms.Marshal(sg); err != nil { return @@ -537,7 +537,7 @@ func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *TrendProfil return rs.Cmd(nil, redisSET, utils.TrendProfilePrefix+utils.ConcatenatedKey(sg.Tenant, sg.ID), string(result)) } -func (rs *RedisStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *TrendProfile, err error) { +func (rs *RedisStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.TrendProfile, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.TrendProfilePrefix+utils.ConcatenatedKey(tenant, id)); err != nil { return @@ -553,7 +553,7 @@ func (rs *RedisStorage) RemTrendProfileDrv(ctx *context.Context, tenant string, return rs.Cmd(nil, redisDEL, utils.TrendProfilePrefix+utils.ConcatenatedKey(tenant, id)) } -func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *Trend, err error) { +func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *utils.Trend, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.TrendPrefix+utils.ConcatenatedKey(tenant, id)); err != nil { return @@ -565,7 +565,7 @@ func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r return } -func (rs *RedisStorage) SetTrendDrv(ctx *context.Context, r *Trend) (err error) { +func (rs *RedisStorage) SetTrendDrv(ctx *context.Context, r *utils.Trend) (err error) { var result []byte if result, err = rs.ms.Marshal(r); err != nil { return diff --git a/engine/tpreader.go b/engine/tpreader.go index c5b5efcd2..948f851c7 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -447,7 +447,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { log.Print("TrendProfiles:") } for _, tpTR := range tpr.trProfiles { - var tr *TrendProfile + var tr *utils.TrendProfile if tr, err = APItoTrends(tpTR); err != nil { return } diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go index 8fcb20bf5..8041ad102 100644 --- a/general_tests/trends_schedule_it_test.go +++ b/general_tests/trends_schedule_it_test.go @@ -119,9 +119,9 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru client, _ := ng.Run(t) time.Sleep(100 * time.Millisecond) - var tr *engine.Trend + var tr *utils.Trend t.Run("TrendSchedule", func(t *testing.T) { - var replyTrendProfiles *[]*engine.TrendProfile + var replyTrendProfiles *[]*utils.TrendProfile if err := client.Call(context.Background(), utils.AdminSv1GetTrendProfiles, &utils.ArgsItemIDs{ Tenant: "cgrates.org", @@ -265,7 +265,7 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru } // GetTrend with RunIndexEnd large than Runtimes length - var tr *engine.Trend + var tr *utils.Trend if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", RunIndexEnd: 3, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { t.Error(err) } else if len(tr.RunTimes) != 2 || len(tr.Metrics) != 2 { @@ -298,7 +298,7 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru time.Sleep(2 * time.Second) t.Run("GetTrends", func(t *testing.T) { // GetTrend with all correctParameters - var tr *engine.Trend + var tr *utils.Trend timeStart := time.Now().Add(-4 * time.Second).Format(time.RFC3339) timeEnd := time.Now().Add(-1 * time.Second).Format(time.RFC3339) if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", RunIndexStart: 1, RunIndexEnd: 3, RunTimeStart: timeStart, RunTimeEnd: timeEnd, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { @@ -316,7 +316,7 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", RunTimeStart: timeStart, RunTimeEnd: timeEnd, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - var tr2 engine.Trend + var tr2 utils.Trend // GetTrend with both index args if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", RunIndexStart: 3, RunIndexEnd: 4, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr2); err != nil { t.Error(err) diff --git a/general_tests/trends_stored_it_test.go b/general_tests/trends_stored_it_test.go index c3e1b7a97..b23b42b72 100644 --- a/general_tests/trends_stored_it_test.go +++ b/general_tests/trends_stored_it_test.go @@ -121,12 +121,12 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`} }) t.Run("GetTrendsAfterStoreInterval", func(t *testing.T) { - metricsChan := make(chan *engine.Trend, 1) + metricsChan := make(chan *utils.Trend, 1) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() go func() { ticker := time.NewTicker(600 * time.Millisecond) - var trnd engine.Trend + var trnd utils.Trend for { select { case <-ticker.C: @@ -184,7 +184,7 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`} }) t.Run("GetTrendsNotStored", func(t *testing.T) { - metricsChan := make(chan *engine.Trend, 1) + metricsChan := make(chan *utils.Trend, 1) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() go func() { @@ -192,7 +192,7 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`} ticker := time.NewTicker(700 * time.Millisecond) select { case <-ticker.C: - var trnd engine.Trend + var trnd utils.Trend // the trend will be not updated since storeinterval is set to 0 err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd) if err != nil { @@ -245,7 +245,7 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`} } }) t.Run("GetTrendsStoredUnlimited", func(t *testing.T) { - metricsChan := make(chan *engine.Trend, 1) + metricsChan := make(chan *utils.Trend, 1) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() go func() { @@ -253,7 +253,7 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`} for { select { case <-ticker.C: - var trnd engine.Trend + var trnd utils.Trend err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd) if err != nil { if err.Error() != utils.ErrNotFound.Error() { diff --git a/loaders/libloader.go b/loaders/libloader.go index d35f5f61b..1102e84cc 100644 --- a/loaders/libloader.go +++ b/loaders/libloader.go @@ -312,7 +312,7 @@ func newProfileFunc(lType string) func() profile { } case utils.MetaTrends: return func() profile { - return new(engine.TrendProfile) + return new(utils.TrendProfile) } case utils.MetaRankings: return func() profile { diff --git a/loaders/loader.go b/loaders/loader.go index f3ebc62c8..a5318715c 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -110,7 +110,7 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType string, data pr case utils.MetaAccounts: return dm.SetAccount(ctx, data.(*utils.Account), withIndex) case utils.MetaTrends: - return dm.SetTrendProfile(ctx, data.(*engine.TrendProfile)) + return dm.SetTrendProfile(ctx, data.(*utils.TrendProfile)) case utils.MetaRankings: return dm.SetRankingProfile(ctx, data.(*engine.RankingProfile)) } diff --git a/services/trends.go b/services/trends.go index 99eafdf6c..cca91606d 100644 --- a/services/trends.go +++ b/services/trends.go @@ -25,6 +25,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/trends" "github.com/cgrates/cgrates/utils" ) @@ -39,7 +40,7 @@ func NewTrendService(cfg *config.CGRConfig) *TrendService { type TrendService struct { mu sync.RWMutex cfg *config.CGRConfig - trs *engine.TrendS + trs *trends.TrendS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -70,7 +71,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager trs.mu.Lock() defer trs.mu.Unlock() - trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager()) + trs.trs = trends.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager()) if err := trs.trs.StartTrendS(context.TODO()); err != nil { return err } diff --git a/tpes/tpe_trends.go b/tpes/tpe_trends.go index 393d5155e..2b8b3f925 100644 --- a/tpes/tpe_trends.go +++ b/tpes/tpe_trends.go @@ -49,7 +49,7 @@ func (tpSts TPTrends) exportItems(ctx *context.Context, wrtr io.Writer, tnt stri return } for _, trendsID := range itmIDs { - var trendPrf *engine.TrendProfile + var trendPrf *utils.TrendProfile trendPrf, err = tpSts.dm.GetTrendProfile(ctx, tnt, trendsID, true, true, utils.NonTransactional) if err != nil { if err.Error() == utils.ErrNotFound.Error() { diff --git a/tpes/tpe_trends_test.go b/tpes/tpe_trends_test.go index 55334ff87..0bc45df44 100644 --- a/tpes/tpe_trends_test.go +++ b/tpes/tpe_trends_test.go @@ -34,8 +34,8 @@ func TestTPEnewTPTrends(t *testing.T) { cfg := config.NewDefaultCGRConfig() connMng := engine.NewConnManager(cfg) dm := engine.NewDataManager(&engine.DataDBMock{ - GetTrendProfileDrvF: func(ctx *context.Context, tnt string, id string) (*engine.TrendProfile, error) { - trd := &engine.TrendProfile{ + GetTrendProfileDrvF: func(ctx *context.Context, tnt string, id string) (*utils.TrendProfile, error) { + trd := &utils.TrendProfile{ Tenant: "cgrates.org", ID: "TRD_2", } @@ -59,7 +59,7 @@ func TestTPEExportTrends(t *testing.T) { tpTrd := TPTrends{ dm: dm, } - trd := &engine.TrendProfile{ + trd := &utils.TrendProfile{ Tenant: "cgrates.org", ID: "TRD_2", } @@ -76,7 +76,7 @@ func TestTPEExportItemsTrendsNoDbConn(t *testing.T) { tpTrd := TPTrends{ dm: nil, } - trd := &engine.TrendProfile{ + trd := &utils.TrendProfile{ Tenant: "cgrates.org", ID: "TRD_2", } @@ -95,7 +95,7 @@ func TestTPEExportItemsTrendsIDNotFound(t *testing.T) { tpTrd := TPTrends{ dm: dm, } - trd := &engine.TrendProfile{ + trd := &utils.TrendProfile{ Tenant: "cgrates.org", ID: "TRD_2", } diff --git a/trends/apis.go b/trends/apis.go new file mode 100644 index 000000000..f03d2aa67 --- /dev/null +++ b/trends/apis.go @@ -0,0 +1,154 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package trends + +import ( + "slices" + "strings" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cron" +) + +// V1ScheduleQueries manually schedules or reschedules trend queries. +func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) (err error) { + if sched, errSched := tS.scheduleTrendQueries(ctx, args.Tenant, args.TrendIDs); errSched != nil { + return errSched + } else { + *scheduled = sched + } + return +} + +// V1GetTrend retrieves trend metrics with optional time and index filtering. +func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *utils.Trend) (err error) { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + var trnd *utils.Trend + if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { + return + } + trnd.RLock() + defer trnd.RUnlock() + retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying + retTrend.ID = trnd.ID + startIdx := arg.RunIndexStart + if startIdx > len(trnd.RunTimes) { + startIdx = len(trnd.RunTimes) + } + endIdx := arg.RunIndexEnd + if endIdx > len(trnd.RunTimes) || + endIdx < startIdx || + endIdx == 0 { + endIdx = len(trnd.RunTimes) + } + runTimes := trnd.RunTimes[startIdx:endIdx] + if len(runTimes) == 0 { + return utils.ErrNotFound + } + var tStart, tEnd time.Time + if arg.RunTimeStart == utils.EmptyString { + tStart = runTimes[0] + } else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cfg.GeneralCfg().DefaultTimezone); err != nil { + return + } + if arg.RunTimeEnd == utils.EmptyString { + tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1)) + } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, tS.cfg.GeneralCfg().DefaultTimezone); err != nil { + return + } + retTrend.RunTimes = make([]time.Time, 0, len(runTimes)) + for _, runTime := range runTimes { + if !runTime.Before(tStart) && runTime.Before(tEnd) { + retTrend.RunTimes = append(retTrend.RunTimes, runTime) + } + } + if len(retTrend.RunTimes) == 0 { // filtered out all + return utils.ErrNotFound + } + retTrend.Metrics = make(map[time.Time]map[string]*utils.MetricWithTrend) + for _, runTime := range retTrend.RunTimes { + retTrend.Metrics[runTime] = trnd.Metrics[runTime] + } + return +} + +// V1GetScheduledTrends retrieves information about currently scheduled trends. +func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) (err error) { + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = tS.cfg.GeneralCfg().DefaultTenant + } + tS.crnTQsMux.RLock() + defer tS.crnTQsMux.RUnlock() + trendIDsMp, has := tS.crnTQs[tnt] + if !has { + return utils.ErrNotFound + } + var scheduledTrends []utils.ScheduledTrend + var entryIds map[string]cron.EntryID + if len(args.TrendIDPrefixes) == 0 { + entryIds = trendIDsMp + } else { + entryIds = make(map[string]cron.EntryID) + for _, tID := range args.TrendIDPrefixes { + for key, entryID := range trendIDsMp { + if strings.HasPrefix(key, tID) { + entryIds[key] = entryID + } + } + } + } + if len(entryIds) == 0 { + return utils.ErrNotFound + } + var entry cron.Entry + for id, entryID := range entryIds { + entry = tS.crn.Entry(entryID) + if entry.ID == 0 { + continue + } + scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{ + TrendID: id, + Next: entry.Next, + Previous: entry.Prev, + }) + } + slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int { + return a.Next.Compare(b.Next) + }) + *schedTrends = scheduledTrends + return nil +} + +// V1GetTrendSummary retrieves the most recent trend summary. +func (tS *TrendS) V1GetTrendSummary(ctx *context.Context, arg utils.TenantIDWithAPIOpts, reply *utils.TrendSummary) (err error) { + var trnd *utils.Trend + if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { + return + } + trnd.RLock() + trndS := trnd.AsTrendSummary() + trnd.RUnlock() + *reply = *trndS + return +} diff --git a/engine/trends.go b/trends/trends.go similarity index 63% rename from engine/trends.go rename to trends/trends.go index 9c3003482..1c6eab197 100644 --- a/engine/trends.go +++ b/trends/trends.go @@ -16,24 +16,44 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package engine +package trends import ( "fmt" "runtime" - "slices" - "strings" "sync" "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/cron" ) -func NewTrendService(dm *DataManager, - cgrcfg *config.CGRConfig, filterS *FilterS, connMgr *ConnManager) (tS *TrendS) { +// TrendS implements the logic for processing and managing trends based on stat metrics. +type TrendS struct { + dm *engine.DataManager + cfg *config.CGRConfig + fltrS *engine.FilterS + connMgr *engine.ConnManager + + crn *cron.Cron // cron refernce + + crnTQsMux *sync.RWMutex // protects the crnTQs + crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed + + storedTrends utils.StringSet // keep a record of trends which need saving, map[trendTenantID]bool + sTrndsMux sync.RWMutex // protects storedTrends + storingStopped chan struct{} // signal back that the operations were stopped + + loopStopped chan struct{} + trendStop chan struct{} // signal to stop all operations +} + +// NewTrendService creates a new TrendS service. +func NewTrendService(dm *engine.DataManager, + cgrcfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (tS *TrendS) { return &TrendS{ dm: dm, cfg: cgrcfg, @@ -49,30 +69,9 @@ func NewTrendService(dm *DataManager, } } -// TrendS is responsible of implementing the logic of TrendService -type TrendS struct { - dm *DataManager - cfg *config.CGRConfig - fltrS *FilterS - connMgr *ConnManager - - crn *cron.Cron // cron refernce - - crnTQsMux *sync.RWMutex // protects the crnTQs - crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed - - storedTrends utils.StringSet // keep a record of trends which need saving, map[trendTenantID]bool - sTrndsMux sync.RWMutex // protects storedTrends - storingStopped chan struct{} // signal back that the operations were stopped - - loopStopped chan struct{} - trendStop chan struct{} // signal to stop all operations -} - -// computeTrend will query a stat and build the Trend for it -// -// it is to be called by Cron service -func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) { +// computeTrend queries a stat and builds the Trend for it based on the TrendProfile configuration. +// Called by Cron service at scheduled intervals. +func (tS *TrendS) computeTrend(ctx *context.Context, tP *utils.TrendProfile) { var floatMetrics map[string]float64 if err := tS.connMgr.Call(context.Background(), tS.cfg.TrendSCfg().StatSConns, utils.StatSv1GetQueueFloatMetrics, @@ -92,10 +91,10 @@ func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) { utils.TrendS, tP.Tenant, tP.ID, err.Error())) return } - trnd.tMux.Lock() - defer trnd.tMux.Unlock() - if trnd.tPrfl == nil { - trnd.tPrfl = tP + trnd.Lock() + defer trnd.Unlock() + if trnd.Config() == nil { + trnd.SetConfig(tP) } trnd.Compile(tP.TTL, tP.QueueLength) now := time.Now() @@ -113,25 +112,25 @@ func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) { } trnd.RunTimes = append(trnd.RunTimes, now) if trnd.Metrics == nil { - trnd.Metrics = make(map[time.Time]map[string]*MetricWithTrend) + trnd.Metrics = make(map[time.Time]map[string]*utils.MetricWithTrend) } - trnd.Metrics[now] = make(map[string]*MetricWithTrend) + trnd.Metrics[now] = make(map[string]*utils.MetricWithTrend) for _, mID := range metrics { - mWt := &MetricWithTrend{ID: mID} + mWt := &utils.MetricWithTrend{ID: mID} var has bool if mWt.Value, has = floatMetrics[mID]; !has { // no stats computed for metric mWt.Value = -1.0 mWt.TrendLabel = utils.NotAvailable continue } - if mWt.TrendGrowth, err = trnd.getTrendGrowth(mID, mWt.Value, tP.CorrelationType, + if mWt.TrendGrowth, err = trnd.GetTrendGrowth(mID, mWt.Value, tP.CorrelationType, tS.cfg.GeneralCfg().RoundingDecimals); err != nil { mWt.TrendLabel = utils.NotAvailable } else { - mWt.TrendLabel = trnd.getTrendLabel(mWt.TrendGrowth, tP.Tolerance) + mWt.TrendLabel = utils.GetTrendLabel(mWt.TrendGrowth, tP.Tolerance) } trnd.Metrics[now][mWt.ID] = mWt - trnd.indexesAppendMetric(mWt, now) + trnd.IndexesAppendMetric(mWt, now) } if err = tS.storeTrend(ctx, trnd); err != nil { utils.Logger.Warning( @@ -155,10 +154,10 @@ func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) { } -// processThresholds will pass the Trend event to ThresholdS -func (tS *TrendS) processThresholds(trnd *Trend) (err error) { +// processThresholds sends the computed trend to ThresholdS. +func (tS *TrendS) processThresholds(trnd *utils.Trend) (err error) { if len(trnd.RunTimes) == 0 || - len(trnd.RunTimes) < trnd.tPrfl.MinItems { + len(trnd.RunTimes) < trnd.Config().MinItems { return } if len(tS.cfg.TrendSCfg().ThresholdSConns) == 0 { @@ -168,16 +167,16 @@ func (tS *TrendS) processThresholds(trnd *Trend) (err error) { utils.MetaEventType: utils.TrendUpdate, } var thIDs []string - if len(trnd.tPrfl.ThresholdIDs) != 0 { - if len(trnd.tPrfl.ThresholdIDs) == 1 && - trnd.tPrfl.ThresholdIDs[0] == utils.MetaNone { + if len(trnd.Config().ThresholdIDs) != 0 { + if len(trnd.Config().ThresholdIDs) == 1 && + trnd.Config().ThresholdIDs[0] == utils.MetaNone { return } - thIDs = make([]string, len(trnd.tPrfl.ThresholdIDs)) - copy(thIDs, trnd.tPrfl.ThresholdIDs) + thIDs = make([]string, len(trnd.Config().ThresholdIDs)) + copy(thIDs, trnd.Config().ThresholdIDs) } opts[utils.OptsThresholdsProfileIDs] = thIDs - ts := trnd.asTrendSummary() + ts := trnd.AsTrendSummary() trndEv := &utils.CGREvent{ Tenant: trnd.Tenant, ID: utils.GenUUID(), @@ -203,10 +202,10 @@ func (tS *TrendS) processThresholds(trnd *Trend) (err error) { return } -// processEEs will pass the Trend event to EEs -func (tS *TrendS) processEEs(trnd *Trend) (err error) { +// processEEs sends the computed trend to EEs. +func (tS *TrendS) processEEs(trnd *utils.Trend) (err error) { if len(trnd.RunTimes) == 0 || - len(trnd.RunTimes) < trnd.tPrfl.MinItems { + len(trnd.RunTimes) < trnd.Config().MinItems { return } if len(tS.cfg.TrendSCfg().EEsConns) == 0 { @@ -215,7 +214,7 @@ func (tS *TrendS) processEEs(trnd *Trend) (err error) { opts := map[string]any{ utils.MetaEventType: utils.TrendUpdate, } - ts := trnd.asTrendSummary() + ts := trnd.AsTrendSummary() trndEv := &utils.CGREventWithEeIDs{ CGREvent: &utils.CGREvent{ Tenant: trnd.Tenant, @@ -244,8 +243,8 @@ func (tS *TrendS) processEEs(trnd *Trend) (err error) { return } -// storeTrend will store or schedule the trend based on settings -func (tS *TrendS) storeTrend(ctx *context.Context, trnd *Trend) (err error) { +// storeTrend stores or schedules the trend for storage based on "store_interval". +func (tS *TrendS) storeTrend(ctx *context.Context, trnd *utils.Trend) (err error) { if tS.cfg.TrendSCfg().StoreInterval == 0 { return } @@ -260,10 +259,9 @@ func (tS *TrendS) storeTrend(ctx *context.Context, trnd *Trend) (err error) { return } -// storeTrends will do one round for saving modified trends -// -// from cache to dataDB -// designed to run asynchronously +// storeTrends stores modified trends from cache in dataDB +// Reschedules failed trend IDs for next storage cycle. +// This function is safe for concurrent use. func (tS *TrendS) storeTrends(ctx *context.Context) { var failedTrndIDs []string for { @@ -276,7 +274,7 @@ func (tS *TrendS) storeTrends(ctx *context.Context) { if trndID == utils.EmptyString { break // no more keys, backup completed } - trndIf, ok := Cache.Get(utils.CacheTrends, trndID) + trndIf, ok := engine.Cache.Get(utils.CacheTrends, trndID) if !ok || trndIf == nil { utils.Logger.Warning( fmt.Sprintf("<%s> failed retrieving from cache Trend with ID: %q", @@ -284,15 +282,15 @@ func (tS *TrendS) storeTrends(ctx *context.Context) { failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup continue } - trnd := trndIf.(*Trend) - trnd.tMux.Lock() + trnd := trndIf.(*utils.Trend) + trnd.Lock() if err := tS.dm.SetTrend(ctx, trnd); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q", utils.TrendS, trndID, err)) failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup } - trnd.tMux.Unlock() + trnd.Unlock() // randomize the CPU load and give up thread control runtime.Gosched() } @@ -303,7 +301,7 @@ func (tS *TrendS) storeTrends(ctx *context.Context) { } } -// asyncStoreTrends runs as a backround process, calling storeTrends based on storeInterval +// asyncStoreTrends runs as a background process that periodically calls storeTrends. func (tS *TrendS) asyncStoreTrends(ctx *context.Context) { storeInterval := tS.cfg.TrendSCfg().StoreInterval if storeInterval <= 0 { @@ -321,7 +319,7 @@ func (tS *TrendS) asyncStoreTrends(ctx *context.Context) { } } -// StartCron will activates the Cron, together with all scheduled Trend queries +// StartTrendS activates the Cron service with scheduled trend queries. func (tS *TrendS) StartTrendS(ctx *context.Context) error { if err := tS.scheduleAutomaticQueries(ctx); err != nil { return err @@ -331,7 +329,7 @@ func (tS *TrendS) StartTrendS(ctx *context.Context) error { return nil } -// StopCron will shutdown the Cron tasks +// StopTrendS gracefully shuts down Cron tasks and trend operations. func (tS *TrendS) StopTrendS() { timeEnd := time.Now().Add(tS.cfg.CoreSCfg().ShutdownTimeout) @@ -360,6 +358,7 @@ func (tS *TrendS) StopTrendS() { } } +// Reload restarts trend services with updated configuration. func (tS *TrendS) Reload(ctx *context.Context) { crnctx := tS.crn.Stop() close(tS.trendStop) @@ -371,7 +370,7 @@ func (tS *TrendS) Reload(ctx *context.Context) { go tS.asyncStoreTrends(ctx) } -// scheduleAutomaticQueries will schedule the queries at start/reload based on configured +// scheduleAutomaticQueries schedules initial trend queries based on configuration. func (tS *TrendS) scheduleAutomaticQueries(ctx *context.Context) error { schedData := make(map[string][]string) for k, v := range tS.cfg.TrendSCfg().ScheduledIDs { @@ -403,7 +402,8 @@ func (tS *TrendS) scheduleAutomaticQueries(ctx *context.Context) error { return nil } -// scheduleTrendQueries will schedule/re-schedule specific trend queries +// scheduleTrendQueries schedules or reschedules specific trend queries +// Safe for concurrent use. func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) { var partial bool tS.crnTQsMux.Lock() @@ -442,129 +442,3 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs [] } return } - -// V1ScheduleQueries is the query for manually re-/scheduling Trend Queries -func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) (err error) { - if sched, errSched := tS.scheduleTrendQueries(ctx, args.Tenant, args.TrendIDs); errSched != nil { - return errSched - } else { - *scheduled = sched - } - return -} - -// V1GetTrend is the API to return the trend Metrics -// The number of runTimes can be filtered based on indexes and times provided as arguments -// -// in this way being possible to work with paginators -func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *Trend) (err error) { - if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing - return utils.NewErrMandatoryIeMissing(missing...) - } - var trnd *Trend - if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { - return - } - trnd.tMux.RLock() - defer trnd.tMux.RUnlock() - retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying - retTrend.ID = trnd.ID - startIdx := arg.RunIndexStart - if startIdx > len(trnd.RunTimes) { - startIdx = len(trnd.RunTimes) - } - endIdx := arg.RunIndexEnd - if endIdx > len(trnd.RunTimes) || - endIdx < startIdx || - endIdx == 0 { - endIdx = len(trnd.RunTimes) - } - runTimes := trnd.RunTimes[startIdx:endIdx] - if len(runTimes) == 0 { - return utils.ErrNotFound - } - var tStart, tEnd time.Time - if arg.RunTimeStart == utils.EmptyString { - tStart = runTimes[0] - } else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cfg.GeneralCfg().DefaultTimezone); err != nil { - return - } - if arg.RunTimeEnd == utils.EmptyString { - tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1)) - } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, tS.cfg.GeneralCfg().DefaultTimezone); err != nil { - return - } - retTrend.RunTimes = make([]time.Time, 0, len(runTimes)) - for _, runTime := range runTimes { - if !runTime.Before(tStart) && runTime.Before(tEnd) { - retTrend.RunTimes = append(retTrend.RunTimes, runTime) - } - } - if len(retTrend.RunTimes) == 0 { // filtered out all - return utils.ErrNotFound - } - retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend) - for _, runTime := range retTrend.RunTimes { - retTrend.Metrics[runTime] = trnd.Metrics[runTime] - } - return -} - -func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) (err error) { - tnt := args.Tenant - if tnt == utils.EmptyString { - tnt = tS.cfg.GeneralCfg().DefaultTenant - } - tS.crnTQsMux.RLock() - defer tS.crnTQsMux.RUnlock() - trendIDsMp, has := tS.crnTQs[tnt] - if !has { - return utils.ErrNotFound - } - var scheduledTrends []utils.ScheduledTrend - var entryIds map[string]cron.EntryID - if len(args.TrendIDPrefixes) == 0 { - entryIds = trendIDsMp - } else { - entryIds = make(map[string]cron.EntryID) - for _, tID := range args.TrendIDPrefixes { - for key, entryID := range trendIDsMp { - if strings.HasPrefix(key, tID) { - entryIds[key] = entryID - } - } - } - } - if len(entryIds) == 0 { - return utils.ErrNotFound - } - var entry cron.Entry - for id, entryID := range entryIds { - entry = tS.crn.Entry(entryID) - if entry.ID == 0 { - continue - } - scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{ - TrendID: id, - Next: entry.Next, - Previous: entry.Prev, - }) - } - slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int { - return a.Next.Compare(b.Next) - }) - *schedTrends = scheduledTrends - return nil -} - -func (tS *TrendS) V1GetTrendSummary(ctx *context.Context, arg utils.TenantIDWithAPIOpts, reply *TrendSummary) (err error) { - var trnd *Trend - if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { - return - } - trnd.tMux.RLock() - trndS := trnd.asTrendSummary() - trnd.tMux.RUnlock() - *reply = *trndS - return -} diff --git a/apis/trends_it_test.go b/trends/trends_it_test.go similarity index 96% rename from apis/trends_it_test.go rename to trends/trends_it_test.go index f5118108d..e8b6da74c 100644 --- a/apis/trends_it_test.go +++ b/trends/trends_it_test.go @@ -19,7 +19,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package apis +package trends import ( "path" @@ -41,7 +41,7 @@ var ( trCfg *config.CGRConfig trRPC *birpc.Client trConfigDIR string //run tests for specific configuration - trendProfiles []*engine.TrendProfileWithAPIOpts + trendProfiles []*utils.TrendProfileWithAPIOpts sTestsTr = []func(t *testing.T){ testTrendSInitCfg, @@ -121,7 +121,7 @@ func testTrendsRPCConn(t *testing.T) { } func testTrendsGetTrendProfileBeforeSet(t *testing.T) { - var replyTrendProfile engine.TrendProfile + var replyTrendProfile utils.TrendProfile if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfile, &utils.TenantIDWithAPIOpts{ TenantID: &utils.TenantID{ @@ -133,7 +133,7 @@ func testTrendsGetTrendProfileBeforeSet(t *testing.T) { } func testTrendsGetTrendProfilesBeforeSet(t *testing.T) { - var replyTrendProfiles *[]*engine.TrendProfile + var replyTrendProfiles *[]*utils.TrendProfile if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfiles, &utils.ArgsItemIDs{ Tenant: "cgrates.org", @@ -165,9 +165,9 @@ func testTrendsGetTrendProfileCountBeforeSet(t *testing.T) { } func testTrendsSetTrendProfiles(t *testing.T) { - trendProfiles = []*engine.TrendProfileWithAPIOpts{ + trendProfiles = []*utils.TrendProfileWithAPIOpts{ { - TrendProfile: &engine.TrendProfile{ + TrendProfile: &utils.TrendProfile{ ID: "Trend1", StatID: "Stats1", Tenant: "cgrates.org", @@ -183,7 +183,7 @@ func testTrendsSetTrendProfiles(t *testing.T) { }, }, { - TrendProfile: &engine.TrendProfile{ + TrendProfile: &utils.TrendProfile{ ID: "Trend2", StatID: "Stats2", Tenant: "cgrates.org", @@ -196,7 +196,7 @@ func testTrendsSetTrendProfiles(t *testing.T) { }, }, { - TrendProfile: &engine.TrendProfile{ + TrendProfile: &utils.TrendProfile{ ID: "Trend3", StatID: "Stats3", Tenant: "cgrates.org", @@ -227,7 +227,7 @@ func testTrendsSetTrendProfiles(t *testing.T) { } func testTrendsGetTrendProfileAfterSet(t *testing.T) { - var replyTrendProfile *engine.TrendProfile + var replyTrendProfile *utils.TrendProfile if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfile, &utils.TenantIDWithAPIOpts{ TenantID: &utils.TenantID{ @@ -305,7 +305,7 @@ func testTrendsGetTrendProfileCountAfterSet(t *testing.T) { func testTrendsGetTrendProfilesAfterSet(t *testing.T) { - var replyTrendProfiles []*engine.TrendProfile + var replyTrendProfiles []*utils.TrendProfile if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfiles, &utils.ArgsItemIDs{ Tenant: "cgrates.org", @@ -347,7 +347,7 @@ func testTrendsGetTrendProfileAfterRemove(t *testing.T) { }, &reply); err != nil { t.Error(err) } - var replyTrendProfile engine.TrendProfile + var replyTrendProfile utils.TrendProfile if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfile, &utils.TenantIDWithAPIOpts{ TenantID: &utils.TenantID{ @@ -426,7 +426,7 @@ func testTrendsGetTrendProfileCountAfterRemove(t *testing.T) { func testTrendsGetTrendProfilesAfterRemove(t *testing.T) { expectedTrendProfiles := append(trendProfiles[:1], trendProfiles[2:]...) - var replyTrendProfiles []*engine.TrendProfile + var replyTrendProfiles []*utils.TrendProfile if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfiles, &utils.ArgsItemIDs{ Tenant: "cgrates.org", diff --git a/engine/trends_test.go b/trends/trends_test.go similarity index 92% rename from engine/trends_test.go rename to trends/trends_test.go index 41344afa8..a65ebde45 100644 --- a/engine/trends_test.go +++ b/trends/trends_test.go @@ -16,19 +16,20 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package engine +package trends import ( "testing" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" ) func TestNewTrendService(t *testing.T) { - dm := &DataManager{} + dm := &engine.DataManager{} cfg := &config.CGRConfig{} - filterS := &FilterS{} - connMgr := &ConnManager{} + filterS := &engine.FilterS{} + connMgr := &engine.ConnManager{} trendService := NewTrendService(dm, cfg, filterS, connMgr) diff --git a/utils/consts.go b/utils/consts.go index a6e5fe575..5b30ecdbb 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -587,7 +587,7 @@ const ( Tolerance = "Tolerance" TTL = "TTL" PurgeFilterIDs = "PurgeFilterIDs" - Trend = "Trend" + TrendStr = "Trend" MinItems = "MinItems" MetricIDs = "MetricIDs" MetricFilterIDs = "MetricFilterIDs" diff --git a/engine/libtrends.go b/utils/trends.go similarity index 68% rename from engine/libtrends.go rename to utils/trends.go index 127eecd0a..b8ff67ac1 100644 --- a/engine/libtrends.go +++ b/utils/trends.go @@ -16,19 +16,16 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package engine +package utils import ( "math" "slices" "sync" "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/utils" ) -// A TrendProfile represents the settings of a Trend +// TrendProfile defines the configuration of the Trend. type TrendProfile struct { Tenant string ID string @@ -44,7 +41,13 @@ type TrendProfile struct { ThresholdIDs []string } -// Clone will clone the TrendProfile so it can be used by scheduler safely +// TrendProfileWithAPIOpts wraps TrendProfile with APIOpts. +type TrendProfileWithAPIOpts struct { + *TrendProfile + APIOpts map[string]any +} + +// Clone creates a deep copy of TrendProfile for thread-safe use. func (tP *TrendProfile) Clone() (clnTp *TrendProfile) { clnTp = &TrendProfile{ Tenant: tP.Tenant, @@ -73,277 +76,46 @@ func (tP *TrendProfile) Clone() (clnTp *TrendProfile) { return } -type TrendProfileWithAPIOpts struct { - *TrendProfile - APIOpts map[string]any -} - +// TenantID returns the concatenated tenant and ID. func (srp *TrendProfile) TenantID() string { - return utils.ConcatenatedKey(srp.Tenant, srp.ID) -} - -type TrendWithAPIOpts struct { - *Trend - APIOpts map[string]any -} - -// NewTrendFromProfile is a constructor for an empty trend out of it's profile -func NewTrendFromProfile(tP *TrendProfile) *Trend { - return &Trend{ - Tenant: tP.Tenant, - ID: tP.ID, - RunTimes: make([]time.Time, 0), - Metrics: make(map[time.Time]map[string]*MetricWithTrend), - - tPrfl: tP, - } -} - -// Trend is the unit matched by filters -type Trend struct { - tMux sync.RWMutex - - Tenant string - ID string - RunTimes []time.Time - Metrics map[time.Time]map[string]*MetricWithTrend - CompressedMetrics []byte // if populated, Metrics and RunTimes will be emty - - // indexes help faster processing - mLast map[string]time.Time // last time a metric was present - mCounts map[string]int // number of times a metric is present in Metrics - mTotals map[string]float64 // cached sum, used for average calculations - - tPrfl *TrendProfile // store here the trend profile so we can have it at hands further - -} - -func (t *Trend) Clone() (tC *Trend) { - return -} - -// AsTrendSummary transforms the trend into TrendSummary -func (t *Trend) asTrendSummary() (ts *TrendSummary) { - ts = &TrendSummary{ - Tenant: t.Tenant, - ID: t.ID, - Metrics: make(map[string]*MetricWithTrend), - } - if len(t.RunTimes) != 0 { - ts.Time = t.RunTimes[len(t.RunTimes)-1] - for mID, mWt := range t.Metrics[ts.Time] { - ts.Metrics[mID] = &MetricWithTrend{ - ID: mWt.ID, - Value: mWt.Value, - TrendGrowth: mWt.TrendGrowth, - TrendLabel: mWt.TrendLabel, - } - } - } - return -} - -func (t *Trend) compress(ms utils.Marshaler) (tr *Trend, err error) { - if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) { - return - } - tr = &Trend{ - Tenant: t.Tenant, - ID: t.ID, - } - tr.CompressedMetrics, err = ms.Marshal(tr.Metrics) - if err != nil { - return - } - return tr, nil -} - -func (t *Trend) uncompress(ms utils.Marshaler) (err error) { - if t == nil || t.CompressedMetrics == nil { - return - } - - err = ms.Unmarshal(t.CompressedMetrics, &t.Metrics) - if err != nil { - return - } - t.CompressedMetrics = nil - t.RunTimes = make([]time.Time, len(t.Metrics)) - i := 0 - for key := range t.Metrics { - t.RunTimes[i] = key - i++ - } - slices.SortFunc(t.RunTimes, func(a, b time.Time) int { - return a.Compare(b) - }) - return -} - -// Compile is used to initialize or cleanup the Trend -// -// thread safe since it should be used close to source -func (t *Trend) Compile(cleanTtl time.Duration, qLength int) { - t.cleanup(cleanTtl, qLength) - if len(t.mTotals) == 0 { // indexes were not yet built - t.computeIndexes() - } -} - -// cleanup will clean stale data out of -func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) { - if ttl >= 0 { - expTime := time.Now().Add(-ttl) - var expIdx *int - for i, rT := range t.RunTimes { - if rT.After(expTime) { - continue - } - expIdx = &i - delete(t.Metrics, rT) - } - if expIdx != nil { - if len(t.RunTimes)-1 == *expIdx { - t.RunTimes = make([]time.Time, 0) - } else { - t.RunTimes = t.RunTimes[*expIdx+1:] - } - altered = true - } - } - - diffLen := len(t.RunTimes) - qLength - if qLength > 0 && diffLen > 0 { - var rmTms []time.Time - rmTms, t.RunTimes = t.RunTimes[:diffLen], t.RunTimes[diffLen:] - for _, rmTm := range rmTms { - delete(t.Metrics, rmTm) - } - altered = true - } - if altered { - t.computeIndexes() - } - return -} - -// computeIndexes should be called after each retrieval from DB -func (t *Trend) computeIndexes() { - t.mLast = make(map[string]time.Time) - t.mCounts = make(map[string]int) - t.mTotals = make(map[string]float64) - for _, runTime := range t.RunTimes { - for _, mWt := range t.Metrics[runTime] { - t.indexesAppendMetric(mWt, runTime) - } - } -} - -// indexesAppendMetric appends a single metric to indexes -func (t *Trend) indexesAppendMetric(mWt *MetricWithTrend, rTime time.Time) { - t.mLast[mWt.ID] = rTime - t.mCounts[mWt.ID]++ - t.mTotals[mWt.ID] += mWt.Value -} - -// getTrendGrowth returns the percentage growth for a specific metric -// -// @correlation parameter will define whether the comparison is against last or average value -// errors in case of previous -func (t *Trend) getTrendGrowth(mID string, mVal float64, correlation string, roundDec int) (tG float64, err error) { - var prevVal float64 - if _, has := t.mLast[mID]; !has { - return -1.0, utils.ErrNotFound - } - if _, has := t.Metrics[t.mLast[mID]][mID]; !has { - return -1.0, utils.ErrNotFound - } - switch correlation { - case utils.MetaLast: - prevVal = t.Metrics[t.mLast[mID]][mID].Value - case utils.MetaAverage: - prevVal = t.mTotals[mID] / float64(t.mCounts[mID]) - default: - return -1.0, utils.ErrCorrelationUndefined - } - - diffVal := mVal - prevVal - return utils.Round(diffVal*100/prevVal, roundDec, utils.MetaRoundingMiddle), nil -} - -// getTrendLabel identifies the trend label for the instant value of the metric -// -// *positive, *negative, *constant, N/A -func (t *Trend) getTrendLabel(tGrowth float64, tolerance float64) (lbl string) { - switch { - case tGrowth > 0: - lbl = utils.MetaPositive - case tGrowth < 0: - lbl = utils.MetaNegative - default: - lbl = utils.MetaConstant - } - if math.Abs(tGrowth) <= tolerance { // percentage value of diff is lower than threshold - lbl = utils.MetaConstant - } - return -} - -// MetricWithTrend represents one read from StatS -type MetricWithTrend struct { - ID string // Metric ID - Value float64 // Metric Value - TrendGrowth float64 // Difference between last and previous - TrendLabel string // *positive, *negative, *constant, N/A -} - -func (tr *Trend) TenantID() string { - return utils.ConcatenatedKey(tr.Tenant, tr.ID) -} - -// TrendSummary represents the last trend computed -type TrendSummary struct { - Tenant string - ID string - Time time.Time - Metrics map[string]*MetricWithTrend + return ConcatenatedKey(srp.Tenant, srp.ID) } func (tp *TrendProfile) Set(path []string, val any, _ bool) (err error) { if len(path) != 1 { - return utils.ErrWrongPath + return ErrWrongPath } switch path[0] { default: - return utils.ErrWrongPath - case utils.Tenant: - tp.Tenant = utils.IfaceAsString(val) - case utils.ID: - tp.ID = utils.IfaceAsString(val) - case utils.Schedule: - tp.Schedule = utils.IfaceAsString(val) - case utils.StatID: - tp.StatID = utils.IfaceAsString(val) - case utils.Metrics: + return ErrWrongPath + case Tenant: + tp.Tenant = IfaceAsString(val) + case ID: + tp.ID = IfaceAsString(val) + case Schedule: + tp.Schedule = IfaceAsString(val) + case StatID: + tp.StatID = IfaceAsString(val) + case Metrics: var valA []string - valA, err = utils.IfaceAsStringSlice(val) + valA, err = IfaceAsStringSlice(val) tp.Metrics = append(tp.Metrics, valA...) - case utils.TTL: - tp.TTL, err = utils.IfaceAsDuration(val) - case utils.QueueLength: - tp.QueueLength, err = utils.IfaceAsInt(val) - case utils.MinItems: - tp.MinItems, err = utils.IfaceAsInt(val) - case utils.CorrelationType: - tp.CorrelationType = utils.IfaceAsString(val) - case utils.Tolerance: - tp.Tolerance, err = utils.IfaceAsFloat64(val) - case utils.Stored: - tp.Stored, err = utils.IfaceAsBool(val) - case utils.ThresholdIDs: + case TTL: + tp.TTL, err = IfaceAsDuration(val) + case QueueLength: + tp.QueueLength, err = IfaceAsInt(val) + case MinItems: + tp.MinItems, err = IfaceAsInt(val) + case CorrelationType: + tp.CorrelationType = IfaceAsString(val) + case Tolerance: + tp.Tolerance, err = IfaceAsFloat64(val) + case Stored: + tp.Stored, err = IfaceAsBool(val) + case ThresholdIDs: var valA []string - valA, err = utils.IfaceAsStringSlice(val) + valA, err = IfaceAsStringSlice(val) tp.ThresholdIDs = append(tp.ThresholdIDs, valA...) } return @@ -385,53 +157,304 @@ func (tp *TrendProfile) Merge(v2 any) { } } -func (tp *TrendProfile) String() string { return utils.ToJSON(tp) } +func (tp *TrendProfile) String() string { return ToJSON(tp) } + func (tp *TrendProfile) FieldAsString(fldPath []string) (_ string, err error) { var val any if val, err = tp.FieldAsInterface(fldPath); err != nil { return } - return utils.IfaceAsString(val), nil + return IfaceAsString(val), nil } + func (tp *TrendProfile) FieldAsInterface(fldPath []string) (_ any, err error) { if len(fldPath) != 1 { - return nil, utils.ErrNotFound + return nil, ErrNotFound } switch fldPath[0] { default: - fld, idx := utils.GetPathIndex(fldPath[0]) + fld, idx := GetPathIndex(fldPath[0]) if idx != nil { switch fld { - case utils.Metrics: + case Metrics: if *idx < len(tp.Metrics) { return tp.Metrics[*idx], nil } - case utils.ThresholdIDs: + case ThresholdIDs: if *idx < len(tp.ThresholdIDs) { return tp.ThresholdIDs[*idx], nil } } } - return nil, utils.ErrNotFound - case utils.Tenant: + return nil, ErrNotFound + case Tenant: return tp.Tenant, nil - case utils.ID: + case ID: return tp.ID, nil - case utils.Schedule: + case Schedule: return tp.Schedule, nil - case utils.StatID: + case StatID: return tp.StatID, nil - case utils.TTL: + case TTL: return tp.TTL, nil - case utils.QueueLength: + case QueueLength: return tp.QueueLength, nil - case utils.MinItems: + case MinItems: return tp.MinItems, nil - case utils.CorrelationType: + case CorrelationType: return tp.CorrelationType, nil - case utils.Tolerance: + case Tolerance: return tp.Tolerance, nil - case utils.Stored: + case Stored: return tp.Stored, nil } } + +// Trend represents a collection of metrics with trend analysis. +type Trend struct { + tMux sync.RWMutex + + Tenant string + ID string + RunTimes []time.Time + Metrics map[time.Time]map[string]*MetricWithTrend + CompressedMetrics []byte // if populated, Metrics and RunTimes will be emty + + // indexes help faster processing + mLast map[string]time.Time // last time a metric was present + mCounts map[string]int // number of times a metric is present in Metrics + mTotals map[string]float64 // cached sum, used for average calculations + + tPrfl *TrendProfile // store here the trend profile so we can have it at hands further +} + +// TrendWithAPIOpts wraps Trend with APIOpts. +type TrendWithAPIOpts struct { + *Trend + APIOpts map[string]any +} + +// TrendSummary holds the most recent trend metrics. +type TrendSummary struct { + Tenant string + ID string + Time time.Time + Metrics map[string]*MetricWithTrend +} + +// MetricWithTrend contains a metric value with its calculated trend info. +type MetricWithTrend struct { + ID string // Metric ID + Value float64 // Metric Value + TrendGrowth float64 // Difference between last and previous + TrendLabel string // *positive, *negative, *constant, N/A +} + +// NewTrendFromProfile creates an empty trend based on profile configuration. +func NewTrendFromProfile(tP *TrendProfile) *Trend { + return &Trend{ + Tenant: tP.Tenant, + ID: tP.ID, + RunTimes: make([]time.Time, 0), + Metrics: make(map[time.Time]map[string]*MetricWithTrend), + + tPrfl: tP, + } +} + +func (tr *Trend) TenantID() string { + return ConcatenatedKey(tr.Tenant, tr.ID) +} + +// Config returns the trend's profile configuration. +func (t *Trend) Config() *TrendProfile { + return t.tPrfl +} + +func (t *Trend) SetConfig(tp *TrendProfile) { + t.tPrfl = tp +} + +// AsTrendSummary creates a summary with the most recent trend data. +func (t *Trend) AsTrendSummary() (ts *TrendSummary) { + ts = &TrendSummary{ + Tenant: t.Tenant, + ID: t.ID, + Metrics: make(map[string]*MetricWithTrend), + } + if len(t.RunTimes) != 0 { + ts.Time = t.RunTimes[len(t.RunTimes)-1] + for mID, mWt := range t.Metrics[ts.Time] { + ts.Metrics[mID] = &MetricWithTrend{ + ID: mWt.ID, + Value: mWt.Value, + TrendGrowth: mWt.TrendGrowth, + TrendLabel: mWt.TrendLabel, + } + } + } + return +} + +// Compress creates a compressed version of the trend. +func (t *Trend) Compress(ms Marshaler, limit int) (tr *Trend, err error) { + if limit > len(t.RunTimes) { + return + } + tr = &Trend{ + Tenant: t.Tenant, + ID: t.ID, + } + tr.CompressedMetrics, err = ms.Marshal(tr.Metrics) + if err != nil { + return + } + return tr, nil +} + +// Uncompress expands a compressed trend. +func (t *Trend) Uncompress(ms Marshaler) (err error) { + if t == nil || t.CompressedMetrics == nil { + return + } + + err = ms.Unmarshal(t.CompressedMetrics, &t.Metrics) + if err != nil { + return + } + t.CompressedMetrics = nil + t.RunTimes = make([]time.Time, len(t.Metrics)) + i := 0 + for key := range t.Metrics { + t.RunTimes[i] = key + i++ + } + slices.SortFunc(t.RunTimes, func(a, b time.Time) int { + return a.Compare(b) + }) + return +} + +// Compile initializes or cleans up the Trend. +// Safe for concurrent use. +func (t *Trend) Compile(cleanTtl time.Duration, qLength int) { + t.cleanup(cleanTtl, qLength) + if len(t.mTotals) == 0 { // indexes were not yet built + t.computeIndexes() + } +} + +// cleanup removes stale data based on TTL and queue length limits. +func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) { + if ttl >= 0 { + expTime := time.Now().Add(-ttl) + var expIdx *int + for i, rT := range t.RunTimes { + if rT.After(expTime) { + continue + } + expIdx = &i + delete(t.Metrics, rT) + } + if expIdx != nil { + if len(t.RunTimes)-1 == *expIdx { + t.RunTimes = make([]time.Time, 0) + } else { + t.RunTimes = t.RunTimes[*expIdx+1:] + } + altered = true + } + } + + diffLen := len(t.RunTimes) - qLength + if qLength > 0 && diffLen > 0 { + var rmTms []time.Time + rmTms, t.RunTimes = t.RunTimes[:diffLen], t.RunTimes[diffLen:] + for _, rmTm := range rmTms { + delete(t.Metrics, rmTm) + } + altered = true + } + if altered { + t.computeIndexes() + } + return +} + +// computeIndexes rebuilds internal indexes after DB retrieval. +func (t *Trend) computeIndexes() { + t.mLast = make(map[string]time.Time) + t.mCounts = make(map[string]int) + t.mTotals = make(map[string]float64) + for _, runTime := range t.RunTimes { + for _, mWt := range t.Metrics[runTime] { + t.IndexesAppendMetric(mWt, runTime) + } + } +} + +// IndexesAppendMetric adds a single metric to internal indexes. +func (t *Trend) IndexesAppendMetric(mWt *MetricWithTrend, rTime time.Time) { + t.mLast[mWt.ID] = rTime + t.mCounts[mWt.ID]++ + t.mTotals[mWt.ID] += mWt.Value +} + +// GetTrendGrowth calculates percentage growth for a metric compared to previous values. +func (t *Trend) GetTrendGrowth(mID string, mVal float64, correlation string, roundDec int) (tG float64, err error) { + var prevVal float64 + if _, has := t.mLast[mID]; !has { + return -1.0, ErrNotFound + } + if _, has := t.Metrics[t.mLast[mID]][mID]; !has { + return -1.0, ErrNotFound + } + switch correlation { + case MetaLast: + prevVal = t.Metrics[t.mLast[mID]][mID].Value + case MetaAverage: + prevVal = t.mTotals[mID] / float64(t.mCounts[mID]) + default: + return -1.0, ErrCorrelationUndefined + } + + diffVal := mVal - prevVal + return Round(diffVal*100/prevVal, roundDec, MetaRoundingMiddle), nil +} + +// GetTrendLabel determines trend direction based on growth percentage and tolerance. +// Returns "*positive", "*negative", "*constant", or "N/A" based on the growth value. +func GetTrendLabel(tGrowth float64, tolerance float64) (lbl string) { + switch { + case tGrowth > 0: + lbl = MetaPositive + case tGrowth < 0: + lbl = MetaNegative + default: + lbl = MetaConstant + } + if math.Abs(tGrowth) <= tolerance { // percentage value of diff is lower than threshold + lbl = MetaConstant + } + return +} + +// Lock locks the trend mutex. +func (t *Trend) Lock() { + t.tMux.Lock() +} + +// Unlock unlocks the trend mutex. +func (t *Trend) Unlock() { + t.tMux.Unlock() +} + +// RLock locks the trend mutex for reading. +func (t *Trend) RLock() { + t.tMux.RLock() +} + +// RUnlock unlocks the read lock on the trend mutex. +func (t *Trend) RUnlock() { + t.tMux.RUnlock() +} diff --git a/engine/libtrends_test.go b/utils/trends_test.go similarity index 85% rename from engine/libtrends_test.go rename to utils/trends_test.go index 4ea2519bc..98345b138 100644 --- a/engine/libtrends_test.go +++ b/utils/trends_test.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package engine +package utils import ( "errors" @@ -24,8 +24,6 @@ import ( "strings" "testing" "time" - - "github.com/cgrates/cgrates/utils" ) func TestTrendProfileClone(t *testing.T) { @@ -112,7 +110,7 @@ func TestTrendProfileTenantIDAndTrendProfileWithAPIOpts(t *testing.T) { tenantID := tp.TenantID() - expectedTenantID := "cgrates.org" + utils.ConcatenatedKeySep + "trend1" + expectedTenantID := "cgrates.org" + ConcatenatedKeySep + "trend1" if tenantID != expectedTenantID { t.Errorf("Expected TenantID %s, but got %s", expectedTenantID, tenantID) } @@ -158,9 +156,9 @@ func TestIndexesAppendMetric(t *testing.T) { rTime1 := time.Now() rTime2 := rTime1.Add(10 * time.Minute) - trend.indexesAppendMetric(metric1, rTime1) - trend.indexesAppendMetric(metric2, rTime2) - trend.indexesAppendMetric(metric1, rTime2) + trend.IndexesAppendMetric(metric1, rTime1) + trend.IndexesAppendMetric(metric2, rTime2) + trend.IndexesAppendMetric(metric1, rTime2) expectedMLast := map[string]time.Time{ "metric1": rTime2, @@ -297,24 +295,22 @@ func TestTComputeIndexes(t *testing.T) { } func TestGetTrendLabel(t *testing.T) { - trend := &Trend{} - tests := []struct { tGrowth float64 tolerance float64 expected string }{ - {1.0, 0.5, utils.MetaPositive}, - {-1.0, 0.5, utils.MetaNegative}, - {0.0, 0.5, utils.MetaConstant}, - {0.3, 0.5, utils.MetaConstant}, - {-0.3, 0.5, utils.MetaConstant}, - {0.6, 0.5, utils.MetaPositive}, - {-0.6, 0.5, utils.MetaNegative}, + {1.0, 0.5, MetaPositive}, + {-1.0, 0.5, MetaNegative}, + {0.0, 0.5, MetaConstant}, + {0.3, 0.5, MetaConstant}, + {-0.3, 0.5, MetaConstant}, + {0.6, 0.5, MetaPositive}, + {-0.6, 0.5, MetaNegative}, } for _, test := range tests { - result := trend.getTrendLabel(test.tGrowth, test.tolerance) + result := GetTrendLabel(test.tGrowth, test.tolerance) if result != test.expected { t.Errorf("For tGrowth: %f and tolerance: %f, expected %s, got %s", test.tGrowth, test.tolerance, test.expected, result) } @@ -330,16 +326,16 @@ func TestGetTrendGrowth(t *testing.T) { mCounts: map[string]int{}, } - _, err := trend.getTrendGrowth("unknownID", 100, utils.MetaLast, 2) - if !errors.Is(err, utils.ErrNotFound) { + _, err := trend.GetTrendGrowth("unknownID", 100, MetaLast, 2) + if !errors.Is(err, ErrNotFound) { t.Errorf("Expected error ErrNotFound, got: %v", err) } now := time.Now() trend.mLast["metric1"] = now - _, err = trend.getTrendGrowth("metric1", 100, utils.MetaLast, 2) - if !errors.Is(err, utils.ErrNotFound) { + _, err = trend.GetTrendGrowth("metric1", 100, MetaLast, 2) + if !errors.Is(err, ErrNotFound) { t.Errorf("Expected error ErrNotFound, got: %v", err) } @@ -349,7 +345,7 @@ func TestGetTrendGrowth(t *testing.T) { }, } - got, err := trend.getTrendGrowth("metric1", 100, utils.MetaLast, 2) + got, err := trend.GetTrendGrowth("metric1", 100, MetaLast, 2) if err != nil || got != 25.0 { t.Errorf("Mismatch for MetaLast correlation. Got: %v, expected: %v", got, 25.0) } @@ -361,13 +357,13 @@ func TestGetTrendGrowth(t *testing.T) { "metric1": 4, } - got, err = trend.getTrendGrowth("metric1", 120, utils.MetaAverage, 2) + got, err = trend.GetTrendGrowth("metric1", 120, MetaAverage, 2) if err != nil || got != 20.0 { t.Errorf("Mismatch for MetaAverage correlation. Got: %v, expected: %v", got, 20.0) } - _, err = trend.getTrendGrowth("metric1", 100, "invalidCorrelation", 2) - if !errors.Is(err, utils.ErrCorrelationUndefined) { + _, err = trend.GetTrendGrowth("metric1", 100, "invalidCorrelation", 2) + if !errors.Is(err, ErrCorrelationUndefined) { t.Errorf("Expected error ErrCorrelationUndefined, got: %v", err) } } @@ -419,21 +415,21 @@ func TestTrendProfileFieldAsString(t *testing.T) { err error val any }{ - {utils.ID, []string{utils.ID}, nil, "Trend1"}, - {utils.Tenant, []string{utils.Tenant}, nil, "cgrates.org"}, - {utils.Schedule, []string{utils.Schedule}, nil, "@every 1m"}, - {utils.StatID, []string{utils.StatID}, nil, "Stat1"}, - {utils.Metrics, []string{utils.Metrics + "[0]"}, nil, "*acc"}, - {utils.Metrics, []string{utils.Metrics + "[1]"}, nil, "*tcd"}, - {utils.TTL, []string{utils.TTL}, nil, 10 * time.Minute}, - {utils.QueueLength, []string{utils.QueueLength}, nil, 100}, - {utils.MinItems, []string{utils.MinItems}, nil, 10}, - {utils.CorrelationType, []string{utils.CorrelationType}, nil, "*average"}, - {utils.Tolerance, []string{utils.Tolerance}, nil, 0.05}, - {utils.Stored, []string{utils.Stored}, nil, true}, - {utils.ThresholdIDs, []string{utils.ThresholdIDs + "[0]"}, nil, "Thresh1"}, - {utils.ThresholdIDs, []string{utils.ThresholdIDs + "[1]"}, nil, "Thresh2"}, - {"NonExistingField", []string{"Field1"}, utils.ErrNotFound, nil}, + {ID, []string{ID}, nil, "Trend1"}, + {Tenant, []string{Tenant}, nil, "cgrates.org"}, + {Schedule, []string{Schedule}, nil, "@every 1m"}, + {StatID, []string{StatID}, nil, "Stat1"}, + {Metrics, []string{Metrics + "[0]"}, nil, "*acc"}, + {Metrics, []string{Metrics + "[1]"}, nil, "*tcd"}, + {TTL, []string{TTL}, nil, 10 * time.Minute}, + {QueueLength, []string{QueueLength}, nil, 100}, + {MinItems, []string{MinItems}, nil, 10}, + {CorrelationType, []string{CorrelationType}, nil, "*average"}, + {Tolerance, []string{Tolerance}, nil, 0.05}, + {Stored, []string{Stored}, nil, true}, + {ThresholdIDs, []string{ThresholdIDs + "[0]"}, nil, "Thresh1"}, + {ThresholdIDs, []string{ThresholdIDs + "[1]"}, nil, "Thresh2"}, + {"NonExistingField", []string{"Field1"}, ErrNotFound, nil}, } rp := &TrendProfile{ Tenant: "cgrates.org", @@ -490,24 +486,24 @@ func TestTrendCleanUp(t *testing.T) { }, Metrics: map[time.Time]map[string]*MetricWithTrend{ tm: { - utils.MetaTCC: {ID: utils.MetaTCC, Value: 13, TrendGrowth: -1, TrendLabel: utils.NotAvailable}, - utils.MetaACC: {ID: utils.MetaACC, Value: 13, TrendGrowth: -1, TrendLabel: utils.NotAvailable}, + MetaTCC: {ID: MetaTCC, Value: 13, TrendGrowth: -1, TrendLabel: NotAvailable}, + MetaACC: {ID: MetaACC, Value: 13, TrendGrowth: -1, TrendLabel: NotAvailable}, }, tm2: { - utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive}, - utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive}, + MetaTCC: {ID: MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: MetaPositive}, + MetaACC: {ID: MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: MetaPositive}, }, tm3: { - utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive}, - utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive}, + MetaTCC: {ID: MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: MetaPositive}, + MetaACC: {ID: MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: MetaPositive}, }, tm4: { - utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive}, - utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive}, + MetaTCC: {ID: MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: MetaPositive}, + MetaACC: {ID: MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: MetaPositive}, }, tm5: { - utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive}, - utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive}, + MetaTCC: {ID: MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: MetaPositive}, + MetaACC: {ID: MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: MetaPositive}, }, }, } @@ -667,84 +663,84 @@ func TestTrendProfileSet(t *testing.T) { }{ { name: "Set Tenant", - path: []string{utils.Tenant}, + path: []string{Tenant}, val: "newTenant", expected: "newTenant", hasError: false, }, { name: "Set ID", - path: []string{utils.ID}, + path: []string{ID}, val: "newID", expected: "newID", hasError: false, }, { name: "Set Schedule", - path: []string{utils.Schedule}, + path: []string{Schedule}, val: "@every 2m", expected: "@every 2m", hasError: false, }, { name: "Set StatID", - path: []string{utils.StatID}, + path: []string{StatID}, val: "newStatID", expected: "newStatID", hasError: false, }, { name: "Set Metrics", - path: []string{utils.Metrics}, + path: []string{Metrics}, val: []string{"*newMetric"}, expected: []string{"*acc", "*tcd", "*newMetric"}, hasError: false, }, { name: "Set TTL", - path: []string{utils.TTL}, + path: []string{TTL}, val: "15m", expected: 15 * time.Minute, hasError: false, }, { name: "Set QueueLength", - path: []string{utils.QueueLength}, + path: []string{QueueLength}, val: 50, expected: 50, hasError: false, }, { name: "Set MinItems", - path: []string{utils.MinItems}, + path: []string{MinItems}, val: 20, expected: 20, hasError: false, }, { name: "Set CorrelationType", - path: []string{utils.CorrelationType}, + path: []string{CorrelationType}, val: "*sum", expected: "*sum", hasError: false, }, { name: "Set Tolerance", - path: []string{utils.Tolerance}, + path: []string{Tolerance}, val: 0.1, expected: 0.1, hasError: false, }, { name: "Set Stored", - path: []string{utils.Stored}, + path: []string{Stored}, val: false, expected: false, hasError: false, }, { name: "Set ThresholdIDs", - path: []string{utils.ThresholdIDs}, + path: []string{ThresholdIDs}, val: []string{"Thresh3", "Thresh4"}, expected: []string{"Thresh1", "Thresh2", "Thresh3", "Thresh4"}, hasError: false, @@ -769,23 +765,23 @@ func TestTrendProfileSet(t *testing.T) { } switch tt.path[0] { - case utils.Tenant: + case Tenant: if tp.Tenant != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Tenant) } - case utils.ID: + case ID: if tp.ID != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.ID) } - case utils.Schedule: + case Schedule: if tp.Schedule != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Schedule) } - case utils.StatID: + case StatID: if tp.StatID != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.StatID) } - case utils.Metrics: + case Metrics: if len(tp.Metrics) != len(tt.expected.([]string)) { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Metrics) } else { @@ -796,31 +792,31 @@ func TestTrendProfileSet(t *testing.T) { } } } - case utils.TTL: + case TTL: if tp.TTL != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.TTL) } - case utils.QueueLength: + case QueueLength: if tp.QueueLength != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.QueueLength) } - case utils.MinItems: + case MinItems: if tp.MinItems != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.MinItems) } - case utils.CorrelationType: + case CorrelationType: if tp.CorrelationType != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.CorrelationType) } - case utils.Tolerance: + case Tolerance: if tp.Tolerance != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Tolerance) } - case utils.Stored: + case Stored: if tp.Stored != tt.expected { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Stored) } - case utils.ThresholdIDs: + case ThresholdIDs: if len(tp.ThresholdIDs) != len(tt.expected.([]string)) { t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.ThresholdIDs) } else {