diff --git a/admins/trends.go b/admins/trends.go
new file mode 100644
index 000000000..40877b403
--- /dev/null
+++ b/admins/trends.go
@@ -0,0 +1,141 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package admins
+
+import (
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/utils"
+)
+
+// V1GetTrendProfile returns a Trend profile
+func (a *AdminS) V1GetTrendProfile(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *utils.TrendProfile) (err error) {
+ if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ tnt := arg.Tenant
+ if tnt == utils.EmptyString {
+ tnt = a.cfg.GeneralCfg().DefaultTenant
+ }
+ sCfg, err := a.dm.GetTrendProfile(ctx, tnt, arg.ID, true, true, utils.NonTransactional)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ *reply = *sCfg
+ return
+}
+
+// V1GetTrendProfileIDs returns list of TrendProfile IDs registered for a tenant
+func (a *AdminS) V1GetTrendProfileIDs(ctx *context.Context, args *utils.ArgsItemIDs, stsPrfIDs *[]string) (err error) {
+ tnt := args.Tenant
+ if tnt == utils.EmptyString {
+ tnt = a.cfg.GeneralCfg().DefaultTenant
+ }
+ prfx := utils.TrendProfilePrefix + tnt + utils.ConcatenatedKeySep
+ lenPrfx := len(prfx)
+ prfx += args.ItemsPrefix
+ var keys []string
+ if keys, err = a.dm.DataDB().GetKeysForPrefix(ctx, prfx); err != nil {
+ return
+ }
+ if len(keys) == 0 {
+ return utils.ErrNotFound
+ }
+ retIDs := make([]string, len(keys))
+ for i, key := range keys {
+ retIDs[i] = key[lenPrfx:]
+ }
+ var limit, offset, maxItems int
+ if limit, offset, maxItems, err = utils.GetPaginateOpts(args.APIOpts); err != nil {
+ return
+ }
+ *stsPrfIDs, err = utils.Paginate(retIDs, limit, offset, maxItems)
+ return
+}
+
+// V1GetTrendProfiles returns a list of stats profiles registered for a tenant
+func (a *AdminS) V1GetTrendProfiles(ctx *context.Context, args *utils.ArgsItemIDs, sqPrfs *[]*utils.TrendProfile) (err error) {
+ tnt := args.Tenant
+ if tnt == utils.EmptyString {
+ tnt = a.cfg.GeneralCfg().DefaultTenant
+ }
+ var sqPrfIDs []string
+ if err = a.V1GetTrendProfileIDs(ctx, args, &sqPrfIDs); err != nil {
+ return
+ }
+ *sqPrfs = make([]*utils.TrendProfile, 0, len(sqPrfIDs))
+ for _, sqPrfID := range sqPrfIDs {
+ var sqPrf *utils.TrendProfile
+ sqPrf, err = a.dm.GetTrendProfile(ctx, tnt, sqPrfID, true, true, utils.NonTransactional)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ *sqPrfs = append(*sqPrfs, sqPrf)
+ }
+ return
+}
+
+// V1GetTrendProfilesCount returns the total number of TrendProfileIDs registered for a tenant
+// returns ErrNotFound in case of 0 TrendProfileIDs
+func (a *AdminS) V1GetTrendProfilesCount(ctx *context.Context, args *utils.ArgsItemIDs, reply *int) (err error) {
+ tnt := args.Tenant
+ if tnt == utils.EmptyString {
+ tnt = a.cfg.GeneralCfg().DefaultTenant
+ }
+ prfx := utils.TrendProfilePrefix + tnt + utils.ConcatenatedKeySep + args.ItemsPrefix
+ var keys []string
+ if keys, err = a.dm.DataDB().GetKeysForPrefix(ctx, prfx); err != nil {
+ return err
+ }
+ if len(keys) == 0 {
+ return utils.ErrNotFound
+ }
+ *reply = len(keys)
+ return
+}
+
+// V1SetTrendProfile alters/creates a TrendProfile
+func (a *AdminS) V1SetTrendProfile(ctx *context.Context, arg *utils.TrendProfileWithAPIOpts, reply *string) (err error) {
+ if missing := utils.MissingStructFields(arg.TrendProfile, []string{utils.ID}); len(missing) != 0 {
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ if arg.Tenant == utils.EmptyString {
+ arg.Tenant = a.cfg.GeneralCfg().DefaultTenant
+ }
+ if err = a.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ *reply = utils.OK
+ return nil
+}
+
+// V1RemoveTrendProfile remove a specific stat configuration
+func (a *AdminS) V1RemoveTrendProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) error {
+ if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ tnt := args.Tenant
+ if tnt == utils.EmptyString {
+ tnt = a.cfg.GeneralCfg().DefaultTenant
+ }
+ if err := a.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ *reply = utils.OK
+ return nil
+}
diff --git a/apis/replicator.go b/apis/replicator.go
index 3323becd5..a2cacaa82 100644
--- a/apis/replicator.go
+++ b/apis/replicator.go
@@ -109,7 +109,7 @@ func (rplSv1 *ReplicatorSv1) GetStatQueueProfile(ctx *context.Context, tntID *ut
}
// GetTrend is the remote method coresponding to the dataDb driver method
-func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Trend) error {
+func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *utils.Trend) error {
engine.UpdateReplicationFilters(utils.TrendPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
rcv, err := rplSv1.dm.DataDB().GetTrendDrv(ctx, tntID.Tenant, tntID.ID)
if err != nil {
@@ -124,7 +124,7 @@ func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantI
}
// GetTrendProfile is the remote method coresponding to the dataDb driver method
-func (rplSv1 *ReplicatorSv1) GetTrendProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.TrendProfile) error {
+func (rplSv1 *ReplicatorSv1) GetTrendProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *utils.TrendProfile) error {
engine.UpdateReplicationFilters(utils.TrendProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
rcv, err := rplSv1.dm.DataDB().GetTrendProfileDrv(ctx, tntID.Tenant, tntID.ID)
if err != nil {
@@ -258,7 +258,7 @@ func (rplSv1 *ReplicatorSv1) SetThreshold(ctx *context.Context, th *engine.Thres
}
// SetTrendProfile is the replication method coresponding to the dataDb driver method
-func (rplSv1 *ReplicatorSv1) SetTrendProfile(ctx *context.Context, trp *engine.TrendProfileWithAPIOpts, reply *string) (err error) {
+func (rplSv1 *ReplicatorSv1) SetTrendProfile(ctx *context.Context, trp *utils.TrendProfileWithAPIOpts, reply *string) (err error) {
if err = rplSv1.dm.DataDB().SetTrendProfileDrv(ctx, trp.TrendProfile); err != nil {
return
}
@@ -276,7 +276,7 @@ func (rplSv1 *ReplicatorSv1) SetTrendProfile(ctx *context.Context, trp *engine.T
}
// SetTrend is the replication method coresponding to the dataDb driver method
-func (rplSv1 *ReplicatorSv1) SetTrend(ctx *context.Context, tr *engine.TrendWithAPIOpts, reply *string) (err error) {
+func (rplSv1 *ReplicatorSv1) SetTrend(ctx *context.Context, tr *utils.TrendWithAPIOpts, reply *string) (err error) {
if err = rplSv1.dm.DataDB().SetTrendDrv(ctx, tr.Trend); err != nil {
return
}
diff --git a/apis/trends.go b/apis/trends.go
index a50699001..110b94ba3 100644
--- a/apis/trends.go
+++ b/apis/trends.go
@@ -20,12 +20,11 @@ package apis
import (
"github.com/cgrates/birpc/context"
- "github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// GetTrendProfile returns a Trend profile
-func (adms *AdminSv1) GetTrendProfile(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *engine.TrendProfile) (err error) {
+func (adms *AdminSv1) GetTrendProfile(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *utils.TrendProfile) (err error) {
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
@@ -70,7 +69,7 @@ func (adms *AdminSv1) GetTrendProfileIDs(ctx *context.Context, args *utils.ArgsI
}
// GetTrendProfiles returns a list of stats profiles registered for a tenant
-func (admS *AdminSv1) GetTrendProfiles(ctx *context.Context, args *utils.ArgsItemIDs, sqPrfs *[]*engine.TrendProfile) (err error) {
+func (admS *AdminSv1) GetTrendProfiles(ctx *context.Context, args *utils.ArgsItemIDs, sqPrfs *[]*utils.TrendProfile) (err error) {
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = admS.cfg.GeneralCfg().DefaultTenant
@@ -79,9 +78,9 @@ func (admS *AdminSv1) GetTrendProfiles(ctx *context.Context, args *utils.ArgsIte
if err = admS.GetTrendProfileIDs(ctx, args, &sqPrfIDs); err != nil {
return
}
- *sqPrfs = make([]*engine.TrendProfile, 0, len(sqPrfIDs))
+ *sqPrfs = make([]*utils.TrendProfile, 0, len(sqPrfIDs))
for _, sqPrfID := range sqPrfIDs {
- var sqPrf *engine.TrendProfile
+ var sqPrf *utils.TrendProfile
sqPrf, err = admS.dm.GetTrendProfile(ctx, tnt, sqPrfID, true, true, utils.NonTransactional)
if err != nil {
return utils.APIErrorHandler(err)
@@ -111,7 +110,7 @@ func (admS *AdminSv1) GetTrendProfilesCount(ctx *context.Context, args *utils.Ar
}
// SetTrendProfile alters/creates a TrendProfile
-func (adms *AdminSv1) SetTrendProfile(ctx *context.Context, arg *engine.TrendProfileWithAPIOpts, reply *string) (err error) {
+func (adms *AdminSv1) SetTrendProfile(ctx *context.Context, arg *utils.TrendProfileWithAPIOpts, reply *string) (err error) {
if missing := utils.MissingStructFields(arg.TrendProfile, []string{utils.ID}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
@@ -140,30 +139,3 @@ func (adms *AdminSv1) RemoveTrendProfile(ctx *context.Context, args *utils.Tenan
*reply = utils.OK
return nil
}
-
-// NewTrendSv1 initializes TrendSV1
-func NewTrendSv1(trS *engine.TrendS) *TrendSv1 {
- return &TrendSv1{trS: trS}
-}
-
-// TrendSv1 exports RPC from RLs
-type TrendSv1 struct {
- ping
- trS *engine.TrendS
-}
-
-func (trs *TrendSv1) ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) error {
- return trs.trS.V1ScheduleQueries(ctx, args, scheduled)
-}
-
-func (trs *TrendSv1) GetTrend(ctx *context.Context, args *utils.ArgGetTrend, trend *engine.Trend) error {
- return trs.trS.V1GetTrend(ctx, args, trend)
-}
-
-func (trs *TrendSv1) GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) error {
- return trs.trS.V1GetScheduledTrends(ctx, args, schedTrends)
-}
-
-func (trs *TrendSv1) GetTrendSummary(ctx *context.Context, arg utils.TenantIDWithAPIOpts, reply *engine.TrendSummary) error {
- return trs.trS.V1GetTrendSummary(ctx, arg, reply)
-}
diff --git a/engine/datadbmock.go b/engine/datadbmock.go
index 00cb02de5..2c2e9e7c4 100644
--- a/engine/datadbmock.go
+++ b/engine/datadbmock.go
@@ -48,8 +48,8 @@ type DataDBMock struct {
RemoveResourceProfileDrvF func(ctx *context.Context, tnt, id string) error
RemoveResourceDrvF func(ctx *context.Context, tnt, id string) error
SetResourceDrvF func(ctx *context.Context, r *Resource) error
- SetTrendProfileDrvF func(ctx *context.Context, tr *TrendProfile) (err error)
- GetTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *TrendProfile, err error)
+ SetTrendProfileDrvF func(ctx *context.Context, tr *utils.TrendProfile) (err error)
+ GetTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *utils.TrendProfile, err error)
RemTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (err error)
SetRankingProfileDrvF func(ctx *context.Context, sq *RankingProfile) (err error)
GetRankingProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *RankingProfile, err error)
@@ -261,14 +261,14 @@ func (dbM *DataDBMock) RemoveRankingDrv(ctx *context.Context, _ string, _ string
return utils.ErrNotImplemented
}
-func (dbM *DataDBMock) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (sg *TrendProfile, err error) {
+func (dbM *DataDBMock) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (sg *utils.TrendProfile, err error) {
if dbM.GetStatQueueProfileDrvF != nil {
return dbM.GetTrendProfileDrvF(ctx, tenant, id)
}
return nil, utils.ErrNotImplemented
}
-func (dbM *DataDBMock) SetTrendProfileDrv(ctx *context.Context, trend *TrendProfile) (err error) {
+func (dbM *DataDBMock) SetTrendProfileDrv(ctx *context.Context, trend *utils.TrendProfile) (err error) {
if dbM.SetTrendProfileDrvF(ctx, trend) != nil {
return dbM.SetTrendProfileDrvF(ctx, trend)
}
@@ -451,11 +451,11 @@ func (dbM *DataDBMock) RemoveRateProfileDrv(ctx *context.Context, str1 string, s
return utils.ErrNotImplemented
}
-func (dbM *DataDBMock) GetTrendDrv(ctx *context.Context, tenant, id string) (*Trend, error) {
+func (dbM *DataDBMock) GetTrendDrv(ctx *context.Context, tenant, id string) (*utils.Trend, error) {
return nil, utils.ErrNotImplemented
}
-func (dbM *DataDBMock) SetTrendDrv(ctx *context.Context, tr *Trend) error {
+func (dbM *DataDBMock) SetTrendDrv(ctx *context.Context, tr *utils.Trend) error {
return utils.ErrNotImplemented
}
diff --git a/engine/datamanager.go b/engine/datamanager.go
index 4ff08dbe5..4595c4d78 100644
--- a/engine/datamanager.go
+++ b/engine/datamanager.go
@@ -949,7 +949,7 @@ func (dm *DataManager) RemoveStatQueueProfile(ctx *context.Context, tenant, id s
// GetTrend retrieves a Trend from dataDB
func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
- cacheRead, cacheWrite bool, transactionID string) (tr *Trend, err error) {
+ cacheRead, cacheWrite bool, transactionID string) (tr *utils.Trend, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
@@ -957,7 +957,7 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
if x == nil {
return nil, utils.ErrNotFound
}
- return x.(*Trend), nil
+ return x.(*utils.Trend), nil
}
}
if dm == nil {
@@ -996,7 +996,7 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
return
}
}
- if err = tr.uncompress(dm.ms); err != nil {
+ if err = tr.Uncompress(dm.ms); err != nil {
return nil, err
}
if cacheWrite {
@@ -1010,12 +1010,12 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
}
// SetTrend stores Trend in dataDB
-func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) {
+func (dm *DataManager) SetTrend(ctx *context.Context, tr *utils.Trend) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if dm.dataDB.GetStorageType() != utils.MetaInternal {
- if tr, err = tr.compress(dm.ms); err != nil {
+ if tr, err = tr.Compress(dm.ms, dm.cfg.TrendSCfg().StoreUncompressedLimit); err != nil {
return
}
}
@@ -1027,7 +1027,7 @@ func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) {
dm.cfg.DataDbCfg().RplFiltered,
utils.TrendPrefix, tr.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetTrend,
- &TrendWithAPIOpts{
+ &utils.TrendWithAPIOpts{
Trend: tr,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
dm.cfg.DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
@@ -1059,14 +1059,14 @@ func (dm *DataManager) RemoveTrend(ctx *context.Context, tenant, id string) (err
}
func (dm *DataManager) GetTrendProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool,
- transactionID string) (trp *TrendProfile, err error) {
+ transactionID string) (trp *utils.TrendProfile, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheTrendProfiles, tntID); ok {
if x == nil {
return nil, utils.ErrNotFound
}
- return x.(*TrendProfile), nil
+ return x.(*utils.TrendProfile), nil
}
}
if dm == nil {
@@ -1140,7 +1140,7 @@ func (dm *DataManager) GetTrendProfileIDs(ctx *context.Context, tenants []string
return
}
-func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile) (err error) {
+func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *utils.TrendProfile) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
@@ -1156,7 +1156,7 @@ func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile)
dm.cfg.DataDbCfg().RplFiltered,
utils.TrendProfilePrefix, trp.TenantID(),
utils.ReplicatorSv1SetTrendProfile,
- &TrendProfileWithAPIOpts{
+ &utils.TrendProfileWithAPIOpts{
TrendProfile: trp,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
dm.cfg.DataDbCfg().RplCache, utils.EmptyString)})
@@ -1164,7 +1164,7 @@ func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile)
if oldTrd == nil ||
oldTrd.QueueLength != trp.QueueLength ||
oldTrd.Schedule != trp.Schedule {
- if err = dm.SetTrend(ctx, NewTrendFromProfile(trp)); err != nil {
+ if err = dm.SetTrend(ctx, utils.NewTrendFromProfile(trp)); err != nil {
return
}
}
diff --git a/engine/dynamicdp.go b/engine/dynamicdp.go
index 7b328d130..d9d1552d6 100644
--- a/engine/dynamicdp.go
+++ b/engine/dynamicdp.go
@@ -131,7 +131,7 @@ func (dDP *dynamicDP) fieldAsInterface(fldPath []string) (val any, err error) {
case utils.MetaTrends:
//sample of fieldName : ~*trends.TrendID.Metrics.*acd.Value
- var trendSum TrendSummary
+ var trendSum utils.TrendSummary
if err := connMgr.Call(context.TODO(), dDP.trdConns, utils.TrendSv1GetTrendSummary, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: dDP.tenant, ID: fldPath[1]}}, &trendSum); err != nil {
return nil, err
}
diff --git a/engine/filters_test.go b/engine/filters_test.go
index 2e73f6ea0..c1fa55d01 100644
--- a/engine/filters_test.go
+++ b/engine/filters_test.go
@@ -2533,11 +2533,11 @@ func TestFilterTrends(t *testing.T) {
if argGetTrend.ID == "Trend1" && argGetTrend.Tenant == "cgrates.org" {
now := time.Now()
now2 := now.Add(time.Second)
- tr := Trend{
+ tr := utils.Trend{
Tenant: "cgrates.org",
ID: "Trend1",
RunTimes: []time.Time{now, now2},
- Metrics: map[time.Time]map[string]*MetricWithTrend{
+ Metrics: map[time.Time]map[string]*utils.MetricWithTrend{
now: {
"*acc": {ID: "*acc", Value: 45, TrendGrowth: -1.0, TrendLabel: utils.NotAvailable},
"*acd": {ID: "*acd", Value: 50, TrendGrowth: -1.0, TrendLabel: utils.NotAvailable},
@@ -2548,8 +2548,8 @@ func TestFilterTrends(t *testing.T) {
},
},
}
- trS := tr.asTrendSummary()
- *reply.(*TrendSummary) = *trS
+ trS := tr.AsTrendSummary()
+ *reply.(*utils.TrendSummary) = *trS
return nil
}
return utils.ErrNotFound
diff --git a/engine/model_helpers.go b/engine/model_helpers.go
index b1d70e393..35c97bec9 100644
--- a/engine/model_helpers.go
+++ b/engine/model_helpers.go
@@ -820,8 +820,8 @@ func APItoModelTrends(tr *utils.TPTrendsProfile) (mdls TrendMdls) {
return
}
-func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *TrendProfile, err error) {
- tr = &TrendProfile{
+func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *utils.TrendProfile, err error) {
+ tr = &utils.TrendProfile{
Tenant: tpTR.Tenant,
ID: tpTR.ID,
StatID: tpTR.StatID,
@@ -844,7 +844,7 @@ func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *TrendProfile, err error) {
return
}
-func TrendProfileToAPI(tr *TrendProfile) (tpTR *utils.TPTrendsProfile) {
+func TrendProfileToAPI(tr *utils.TrendProfile) (tpTR *utils.TPTrendsProfile) {
tpTR = &utils.TPTrendsProfile{
Tenant: tr.Tenant,
ID: tr.ID,
diff --git a/engine/storage_interface.go b/engine/storage_interface.go
index 17860a890..98e405113 100644
--- a/engine/storage_interface.go
+++ b/engine/storage_interface.go
@@ -70,11 +70,11 @@ type DataDB interface {
SetRankingDrv(ctx *context.Context, rn *Ranking) (err error)
GetRankingDrv(ctx *context.Context, tenant string, id string) (sq *Ranking, err error)
RemoveRankingDrv(ctx *context.Context, tenant string, id string) (err error)
- SetTrendProfileDrv(ctx *context.Context, sq *TrendProfile) (err error)
- GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sq *TrendProfile, err error)
+ SetTrendProfileDrv(ctx *context.Context, sq *utils.TrendProfile) (err error)
+ GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sq *utils.TrendProfile, err error)
RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error)
- GetTrendDrv(ctx *context.Context, tenant string, id string) (*Trend, error)
- SetTrendDrv(ctx *context.Context, tr *Trend) error
+ GetTrendDrv(ctx *context.Context, tenant string, id string) (*utils.Trend, error)
+ SetTrendDrv(ctx *context.Context, tr *utils.Trend) error
RemoveTrendDrv(ctx *context.Context, tenant string, id string) error
GetFilterDrv(ctx *context.Context, tnt string, id string) (*Filter, error)
SetFilterDrv(ctx *context.Context, f *Filter) error
diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go
index 5657556d0..3d63e75d1 100644
--- a/engine/storage_internal_datadb.go
+++ b/engine/storage_internal_datadb.go
@@ -326,7 +326,7 @@ func (iDB *InternalDB) GetThresholdProfileDrv(_ *context.Context, tenant, id str
return x.(*ThresholdProfile), nil
}
-func (iDB *InternalDB) SetTrendProfileDrv(_ *context.Context, srp *TrendProfile) (err error) {
+func (iDB *InternalDB) SetTrendProfileDrv(_ *context.Context, srp *utils.TrendProfile) (err error) {
iDB.db.Set(utils.CacheTrendProfiles, srp.TenantID(), srp, nil, true, utils.NonTransactional)
return nil
}
@@ -336,23 +336,23 @@ func (iDB *InternalDB) RemTrendProfileDrv(_ *context.Context, tenant, id string)
return nil
}
-func (iDB *InternalDB) GetTrendProfileDrv(_ *context.Context, tenant, id string) (sg *TrendProfile, err error) {
+func (iDB *InternalDB) GetTrendProfileDrv(_ *context.Context, tenant, id string) (sg *utils.TrendProfile, err error) {
x, ok := iDB.db.Get(utils.CacheTrendProfiles, utils.ConcatenatedKey(tenant, id))
if !ok || x == nil {
return nil, utils.ErrNotFound
}
- return x.(*TrendProfile), nil
+ return x.(*utils.TrendProfile), nil
}
-func (iDB *InternalDB) GetTrendDrv(_ *context.Context, tenant, id string) (th *Trend, err error) {
+func (iDB *InternalDB) GetTrendDrv(_ *context.Context, tenant, id string) (th *utils.Trend, err error) {
x, ok := iDB.db.Get(utils.CacheTrends, utils.ConcatenatedKey(tenant, id))
if !ok || x == nil {
return nil, utils.ErrNotFound
}
- return x.(*Trend), nil
+ return x.(*utils.Trend), nil
}
-func (iDB *InternalDB) SetTrendDrv(_ *context.Context, tr *Trend) (err error) {
+func (iDB *InternalDB) SetTrendDrv(_ *context.Context, tr *utils.Trend) (err error) {
iDB.db.Set(utils.CacheTrends, tr.TenantID(), tr, nil,
true, utils.NonTransactional)
return
diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go
index baff8fb7a..e06b46130 100644
--- a/engine/storage_mongo_datadb.go
+++ b/engine/storage_mongo_datadb.go
@@ -804,8 +804,8 @@ func (ms *MongoStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string
})
}
-func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (*TrendProfile, error) {
- srProfile := new(TrendProfile)
+func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (*utils.TrendProfile, error) {
+ srProfile := new(utils.TrendProfile)
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
sr := ms.getCol(ColTrs).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
decodeErr := sr.Decode(srProfile)
@@ -817,7 +817,7 @@ func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id stri
return srProfile, err
}
-func (ms *MongoStorage) SetTrendProfileDrv(ctx *context.Context, srp *TrendProfile) (err error) {
+func (ms *MongoStorage) SetTrendProfileDrv(ctx *context.Context, srp *utils.TrendProfile) (err error) {
return ms.query(ctx, func(sctx mongo.SessionContext) error {
_, err := ms.getCol(ColTrs).UpdateOne(sctx, bson.M{"tenant": srp.Tenant, "id": srp.ID},
bson.M{"$set": srp},
@@ -836,8 +836,8 @@ func (ms *MongoStorage) RemTrendProfileDrv(ctx *context.Context, tenant, id stri
})
}
-func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*Trend, error) {
- tr := new(Trend)
+func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*utils.Trend, error) {
+ tr := new(utils.Trend)
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
sr := ms.getCol(ColTrd).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
decodeErr := sr.Decode(tr)
@@ -849,7 +849,7 @@ func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*T
return tr, err
}
-func (ms *MongoStorage) SetTrendDrv(ctx *context.Context, tr *Trend) error {
+func (ms *MongoStorage) SetTrendDrv(ctx *context.Context, tr *utils.Trend) error {
return ms.query(ctx, func(sctx mongo.SessionContext) error {
_, err := ms.getCol(ColTrd).UpdateOne(sctx, bson.M{"tenant": tr.Tenant, "id": tr.ID},
bson.M{"$set": tr},
diff --git a/engine/storage_redis.go b/engine/storage_redis.go
index a61e4b2db..fafd70b49 100644
--- a/engine/storage_redis.go
+++ b/engine/storage_redis.go
@@ -529,7 +529,7 @@ func (rs *RedisStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string)
return rs.Cmd(nil, redisDEL, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id))
}
-func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *TrendProfile) (err error) {
+func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *utils.TrendProfile) (err error) {
var result []byte
if result, err = rs.ms.Marshal(sg); err != nil {
return
@@ -537,7 +537,7 @@ func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *TrendProfil
return rs.Cmd(nil, redisSET, utils.TrendProfilePrefix+utils.ConcatenatedKey(sg.Tenant, sg.ID), string(result))
}
-func (rs *RedisStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *TrendProfile, err error) {
+func (rs *RedisStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.TrendProfile, err error) {
var values []byte
if err = rs.Cmd(&values, redisGET, utils.TrendProfilePrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
return
@@ -553,7 +553,7 @@ func (rs *RedisStorage) RemTrendProfileDrv(ctx *context.Context, tenant string,
return rs.Cmd(nil, redisDEL, utils.TrendProfilePrefix+utils.ConcatenatedKey(tenant, id))
}
-func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *Trend, err error) {
+func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *utils.Trend, err error) {
var values []byte
if err = rs.Cmd(&values, redisGET, utils.TrendPrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
return
@@ -565,7 +565,7 @@ func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r
return
}
-func (rs *RedisStorage) SetTrendDrv(ctx *context.Context, r *Trend) (err error) {
+func (rs *RedisStorage) SetTrendDrv(ctx *context.Context, r *utils.Trend) (err error) {
var result []byte
if result, err = rs.ms.Marshal(r); err != nil {
return
diff --git a/engine/tpreader.go b/engine/tpreader.go
index c5b5efcd2..948f851c7 100644
--- a/engine/tpreader.go
+++ b/engine/tpreader.go
@@ -447,7 +447,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
log.Print("TrendProfiles:")
}
for _, tpTR := range tpr.trProfiles {
- var tr *TrendProfile
+ var tr *utils.TrendProfile
if tr, err = APItoTrends(tpTR); err != nil {
return
}
diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go
index 8fcb20bf5..8041ad102 100644
--- a/general_tests/trends_schedule_it_test.go
+++ b/general_tests/trends_schedule_it_test.go
@@ -119,9 +119,9 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru
client, _ := ng.Run(t)
time.Sleep(100 * time.Millisecond)
- var tr *engine.Trend
+ var tr *utils.Trend
t.Run("TrendSchedule", func(t *testing.T) {
- var replyTrendProfiles *[]*engine.TrendProfile
+ var replyTrendProfiles *[]*utils.TrendProfile
if err := client.Call(context.Background(), utils.AdminSv1GetTrendProfiles,
&utils.ArgsItemIDs{
Tenant: "cgrates.org",
@@ -265,7 +265,7 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru
}
// GetTrend with RunIndexEnd large than Runtimes length
- var tr *engine.Trend
+ var tr *utils.Trend
if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", RunIndexEnd: 3, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil {
t.Error(err)
} else if len(tr.RunTimes) != 2 || len(tr.Metrics) != 2 {
@@ -298,7 +298,7 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru
time.Sleep(2 * time.Second)
t.Run("GetTrends", func(t *testing.T) {
// GetTrend with all correctParameters
- var tr *engine.Trend
+ var tr *utils.Trend
timeStart := time.Now().Add(-4 * time.Second).Format(time.RFC3339)
timeEnd := time.Now().Add(-1 * time.Second).Format(time.RFC3339)
if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", RunIndexStart: 1, RunIndexEnd: 3, RunTimeStart: timeStart, RunTimeEnd: timeEnd, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil {
@@ -316,7 +316,7 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru
if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", RunTimeStart: timeStart, RunTimeEnd: timeEnd, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
- var tr2 engine.Trend
+ var tr2 utils.Trend
// GetTrend with both index args
if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", RunIndexStart: 3, RunIndexEnd: 4, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr2); err != nil {
t.Error(err)
diff --git a/general_tests/trends_stored_it_test.go b/general_tests/trends_stored_it_test.go
index c3e1b7a97..b23b42b72 100644
--- a/general_tests/trends_stored_it_test.go
+++ b/general_tests/trends_stored_it_test.go
@@ -121,12 +121,12 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`}
})
t.Run("GetTrendsAfterStoreInterval", func(t *testing.T) {
- metricsChan := make(chan *engine.Trend, 1)
+ metricsChan := make(chan *utils.Trend, 1)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go func() {
ticker := time.NewTicker(600 * time.Millisecond)
- var trnd engine.Trend
+ var trnd utils.Trend
for {
select {
case <-ticker.C:
@@ -184,7 +184,7 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`}
})
t.Run("GetTrendsNotStored", func(t *testing.T) {
- metricsChan := make(chan *engine.Trend, 1)
+ metricsChan := make(chan *utils.Trend, 1)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
go func() {
@@ -192,7 +192,7 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`}
ticker := time.NewTicker(700 * time.Millisecond)
select {
case <-ticker.C:
- var trnd engine.Trend
+ var trnd utils.Trend
// the trend will be not updated since storeinterval is set to 0
err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd)
if err != nil {
@@ -245,7 +245,7 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`}
}
})
t.Run("GetTrendsStoredUnlimited", func(t *testing.T) {
- metricsChan := make(chan *engine.Trend, 1)
+ metricsChan := make(chan *utils.Trend, 1)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go func() {
@@ -253,7 +253,7 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`}
for {
select {
case <-ticker.C:
- var trnd engine.Trend
+ var trnd utils.Trend
err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd)
if err != nil {
if err.Error() != utils.ErrNotFound.Error() {
diff --git a/loaders/libloader.go b/loaders/libloader.go
index d35f5f61b..1102e84cc 100644
--- a/loaders/libloader.go
+++ b/loaders/libloader.go
@@ -312,7 +312,7 @@ func newProfileFunc(lType string) func() profile {
}
case utils.MetaTrends:
return func() profile {
- return new(engine.TrendProfile)
+ return new(utils.TrendProfile)
}
case utils.MetaRankings:
return func() profile {
diff --git a/loaders/loader.go b/loaders/loader.go
index f3ebc62c8..a5318715c 100644
--- a/loaders/loader.go
+++ b/loaders/loader.go
@@ -110,7 +110,7 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType string, data pr
case utils.MetaAccounts:
return dm.SetAccount(ctx, data.(*utils.Account), withIndex)
case utils.MetaTrends:
- return dm.SetTrendProfile(ctx, data.(*engine.TrendProfile))
+ return dm.SetTrendProfile(ctx, data.(*utils.TrendProfile))
case utils.MetaRankings:
return dm.SetRankingProfile(ctx, data.(*engine.RankingProfile))
}
diff --git a/services/trends.go b/services/trends.go
index 99eafdf6c..cca91606d 100644
--- a/services/trends.go
+++ b/services/trends.go
@@ -25,6 +25,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
+ "github.com/cgrates/cgrates/trends"
"github.com/cgrates/cgrates/utils"
)
@@ -39,7 +40,7 @@ func NewTrendService(cfg *config.CGRConfig) *TrendService {
type TrendService struct {
mu sync.RWMutex
cfg *config.CGRConfig
- trs *engine.TrendS
+ trs *trends.TrendS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -70,7 +71,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
trs.mu.Lock()
defer trs.mu.Unlock()
- trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager())
+ trs.trs = trends.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager())
if err := trs.trs.StartTrendS(context.TODO()); err != nil {
return err
}
diff --git a/tpes/tpe_trends.go b/tpes/tpe_trends.go
index 393d5155e..2b8b3f925 100644
--- a/tpes/tpe_trends.go
+++ b/tpes/tpe_trends.go
@@ -49,7 +49,7 @@ func (tpSts TPTrends) exportItems(ctx *context.Context, wrtr io.Writer, tnt stri
return
}
for _, trendsID := range itmIDs {
- var trendPrf *engine.TrendProfile
+ var trendPrf *utils.TrendProfile
trendPrf, err = tpSts.dm.GetTrendProfile(ctx, tnt, trendsID, true, true, utils.NonTransactional)
if err != nil {
if err.Error() == utils.ErrNotFound.Error() {
diff --git a/tpes/tpe_trends_test.go b/tpes/tpe_trends_test.go
index 55334ff87..0bc45df44 100644
--- a/tpes/tpe_trends_test.go
+++ b/tpes/tpe_trends_test.go
@@ -34,8 +34,8 @@ func TestTPEnewTPTrends(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
connMng := engine.NewConnManager(cfg)
dm := engine.NewDataManager(&engine.DataDBMock{
- GetTrendProfileDrvF: func(ctx *context.Context, tnt string, id string) (*engine.TrendProfile, error) {
- trd := &engine.TrendProfile{
+ GetTrendProfileDrvF: func(ctx *context.Context, tnt string, id string) (*utils.TrendProfile, error) {
+ trd := &utils.TrendProfile{
Tenant: "cgrates.org",
ID: "TRD_2",
}
@@ -59,7 +59,7 @@ func TestTPEExportTrends(t *testing.T) {
tpTrd := TPTrends{
dm: dm,
}
- trd := &engine.TrendProfile{
+ trd := &utils.TrendProfile{
Tenant: "cgrates.org",
ID: "TRD_2",
}
@@ -76,7 +76,7 @@ func TestTPEExportItemsTrendsNoDbConn(t *testing.T) {
tpTrd := TPTrends{
dm: nil,
}
- trd := &engine.TrendProfile{
+ trd := &utils.TrendProfile{
Tenant: "cgrates.org",
ID: "TRD_2",
}
@@ -95,7 +95,7 @@ func TestTPEExportItemsTrendsIDNotFound(t *testing.T) {
tpTrd := TPTrends{
dm: dm,
}
- trd := &engine.TrendProfile{
+ trd := &utils.TrendProfile{
Tenant: "cgrates.org",
ID: "TRD_2",
}
diff --git a/trends/apis.go b/trends/apis.go
new file mode 100644
index 000000000..f03d2aa67
--- /dev/null
+++ b/trends/apis.go
@@ -0,0 +1,154 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package trends
+
+import (
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/cron"
+)
+
+// V1ScheduleQueries manually schedules or reschedules trend queries.
+func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) (err error) {
+ if sched, errSched := tS.scheduleTrendQueries(ctx, args.Tenant, args.TrendIDs); errSched != nil {
+ return errSched
+ } else {
+ *scheduled = sched
+ }
+ return
+}
+
+// V1GetTrend retrieves trend metrics with optional time and index filtering.
+func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *utils.Trend) (err error) {
+ if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
+ }
+ var trnd *utils.Trend
+ if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
+ return
+ }
+ trnd.RLock()
+ defer trnd.RUnlock()
+ retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
+ retTrend.ID = trnd.ID
+ startIdx := arg.RunIndexStart
+ if startIdx > len(trnd.RunTimes) {
+ startIdx = len(trnd.RunTimes)
+ }
+ endIdx := arg.RunIndexEnd
+ if endIdx > len(trnd.RunTimes) ||
+ endIdx < startIdx ||
+ endIdx == 0 {
+ endIdx = len(trnd.RunTimes)
+ }
+ runTimes := trnd.RunTimes[startIdx:endIdx]
+ if len(runTimes) == 0 {
+ return utils.ErrNotFound
+ }
+ var tStart, tEnd time.Time
+ if arg.RunTimeStart == utils.EmptyString {
+ tStart = runTimes[0]
+ } else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cfg.GeneralCfg().DefaultTimezone); err != nil {
+ return
+ }
+ if arg.RunTimeEnd == utils.EmptyString {
+ tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
+ } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, tS.cfg.GeneralCfg().DefaultTimezone); err != nil {
+ return
+ }
+ retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
+ for _, runTime := range runTimes {
+ if !runTime.Before(tStart) && runTime.Before(tEnd) {
+ retTrend.RunTimes = append(retTrend.RunTimes, runTime)
+ }
+ }
+ if len(retTrend.RunTimes) == 0 { // filtered out all
+ return utils.ErrNotFound
+ }
+ retTrend.Metrics = make(map[time.Time]map[string]*utils.MetricWithTrend)
+ for _, runTime := range retTrend.RunTimes {
+ retTrend.Metrics[runTime] = trnd.Metrics[runTime]
+ }
+ return
+}
+
+// V1GetScheduledTrends retrieves information about currently scheduled trends.
+func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) (err error) {
+ tnt := args.Tenant
+ if tnt == utils.EmptyString {
+ tnt = tS.cfg.GeneralCfg().DefaultTenant
+ }
+ tS.crnTQsMux.RLock()
+ defer tS.crnTQsMux.RUnlock()
+ trendIDsMp, has := tS.crnTQs[tnt]
+ if !has {
+ return utils.ErrNotFound
+ }
+ var scheduledTrends []utils.ScheduledTrend
+ var entryIds map[string]cron.EntryID
+ if len(args.TrendIDPrefixes) == 0 {
+ entryIds = trendIDsMp
+ } else {
+ entryIds = make(map[string]cron.EntryID)
+ for _, tID := range args.TrendIDPrefixes {
+ for key, entryID := range trendIDsMp {
+ if strings.HasPrefix(key, tID) {
+ entryIds[key] = entryID
+ }
+ }
+ }
+ }
+ if len(entryIds) == 0 {
+ return utils.ErrNotFound
+ }
+ var entry cron.Entry
+ for id, entryID := range entryIds {
+ entry = tS.crn.Entry(entryID)
+ if entry.ID == 0 {
+ continue
+ }
+ scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{
+ TrendID: id,
+ Next: entry.Next,
+ Previous: entry.Prev,
+ })
+ }
+ slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int {
+ return a.Next.Compare(b.Next)
+ })
+ *schedTrends = scheduledTrends
+ return nil
+}
+
+// V1GetTrendSummary retrieves the most recent trend summary.
+func (tS *TrendS) V1GetTrendSummary(ctx *context.Context, arg utils.TenantIDWithAPIOpts, reply *utils.TrendSummary) (err error) {
+ var trnd *utils.Trend
+ if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
+ return
+ }
+ trnd.RLock()
+ trndS := trnd.AsTrendSummary()
+ trnd.RUnlock()
+ *reply = *trndS
+ return
+}
diff --git a/engine/trends.go b/trends/trends.go
similarity index 63%
rename from engine/trends.go
rename to trends/trends.go
index 9c3003482..1c6eab197 100644
--- a/engine/trends.go
+++ b/trends/trends.go
@@ -16,24 +16,44 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package engine
+package trends
import (
"fmt"
"runtime"
- "slices"
- "strings"
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cron"
)
-func NewTrendService(dm *DataManager,
- cgrcfg *config.CGRConfig, filterS *FilterS, connMgr *ConnManager) (tS *TrendS) {
+// TrendS implements the logic for processing and managing trends based on stat metrics.
+type TrendS struct {
+ dm *engine.DataManager
+ cfg *config.CGRConfig
+ fltrS *engine.FilterS
+ connMgr *engine.ConnManager
+
+ crn *cron.Cron // cron refernce
+
+ crnTQsMux *sync.RWMutex // protects the crnTQs
+ crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed
+
+ storedTrends utils.StringSet // keep a record of trends which need saving, map[trendTenantID]bool
+ sTrndsMux sync.RWMutex // protects storedTrends
+ storingStopped chan struct{} // signal back that the operations were stopped
+
+ loopStopped chan struct{}
+ trendStop chan struct{} // signal to stop all operations
+}
+
+// NewTrendService creates a new TrendS service.
+func NewTrendService(dm *engine.DataManager,
+ cgrcfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (tS *TrendS) {
return &TrendS{
dm: dm,
cfg: cgrcfg,
@@ -49,30 +69,9 @@ func NewTrendService(dm *DataManager,
}
}
-// TrendS is responsible of implementing the logic of TrendService
-type TrendS struct {
- dm *DataManager
- cfg *config.CGRConfig
- fltrS *FilterS
- connMgr *ConnManager
-
- crn *cron.Cron // cron refernce
-
- crnTQsMux *sync.RWMutex // protects the crnTQs
- crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed
-
- storedTrends utils.StringSet // keep a record of trends which need saving, map[trendTenantID]bool
- sTrndsMux sync.RWMutex // protects storedTrends
- storingStopped chan struct{} // signal back that the operations were stopped
-
- loopStopped chan struct{}
- trendStop chan struct{} // signal to stop all operations
-}
-
-// computeTrend will query a stat and build the Trend for it
-//
-// it is to be called by Cron service
-func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) {
+// computeTrend queries a stat and builds the Trend for it based on the TrendProfile configuration.
+// Called by Cron service at scheduled intervals.
+func (tS *TrendS) computeTrend(ctx *context.Context, tP *utils.TrendProfile) {
var floatMetrics map[string]float64
if err := tS.connMgr.Call(context.Background(), tS.cfg.TrendSCfg().StatSConns,
utils.StatSv1GetQueueFloatMetrics,
@@ -92,10 +91,10 @@ func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) {
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
return
}
- trnd.tMux.Lock()
- defer trnd.tMux.Unlock()
- if trnd.tPrfl == nil {
- trnd.tPrfl = tP
+ trnd.Lock()
+ defer trnd.Unlock()
+ if trnd.Config() == nil {
+ trnd.SetConfig(tP)
}
trnd.Compile(tP.TTL, tP.QueueLength)
now := time.Now()
@@ -113,25 +112,25 @@ func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) {
}
trnd.RunTimes = append(trnd.RunTimes, now)
if trnd.Metrics == nil {
- trnd.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
+ trnd.Metrics = make(map[time.Time]map[string]*utils.MetricWithTrend)
}
- trnd.Metrics[now] = make(map[string]*MetricWithTrend)
+ trnd.Metrics[now] = make(map[string]*utils.MetricWithTrend)
for _, mID := range metrics {
- mWt := &MetricWithTrend{ID: mID}
+ mWt := &utils.MetricWithTrend{ID: mID}
var has bool
if mWt.Value, has = floatMetrics[mID]; !has { // no stats computed for metric
mWt.Value = -1.0
mWt.TrendLabel = utils.NotAvailable
continue
}
- if mWt.TrendGrowth, err = trnd.getTrendGrowth(mID, mWt.Value, tP.CorrelationType,
+ if mWt.TrendGrowth, err = trnd.GetTrendGrowth(mID, mWt.Value, tP.CorrelationType,
tS.cfg.GeneralCfg().RoundingDecimals); err != nil {
mWt.TrendLabel = utils.NotAvailable
} else {
- mWt.TrendLabel = trnd.getTrendLabel(mWt.TrendGrowth, tP.Tolerance)
+ mWt.TrendLabel = utils.GetTrendLabel(mWt.TrendGrowth, tP.Tolerance)
}
trnd.Metrics[now][mWt.ID] = mWt
- trnd.indexesAppendMetric(mWt, now)
+ trnd.IndexesAppendMetric(mWt, now)
}
if err = tS.storeTrend(ctx, trnd); err != nil {
utils.Logger.Warning(
@@ -155,10 +154,10 @@ func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) {
}
-// processThresholds will pass the Trend event to ThresholdS
-func (tS *TrendS) processThresholds(trnd *Trend) (err error) {
+// processThresholds sends the computed trend to ThresholdS.
+func (tS *TrendS) processThresholds(trnd *utils.Trend) (err error) {
if len(trnd.RunTimes) == 0 ||
- len(trnd.RunTimes) < trnd.tPrfl.MinItems {
+ len(trnd.RunTimes) < trnd.Config().MinItems {
return
}
if len(tS.cfg.TrendSCfg().ThresholdSConns) == 0 {
@@ -168,16 +167,16 @@ func (tS *TrendS) processThresholds(trnd *Trend) (err error) {
utils.MetaEventType: utils.TrendUpdate,
}
var thIDs []string
- if len(trnd.tPrfl.ThresholdIDs) != 0 {
- if len(trnd.tPrfl.ThresholdIDs) == 1 &&
- trnd.tPrfl.ThresholdIDs[0] == utils.MetaNone {
+ if len(trnd.Config().ThresholdIDs) != 0 {
+ if len(trnd.Config().ThresholdIDs) == 1 &&
+ trnd.Config().ThresholdIDs[0] == utils.MetaNone {
return
}
- thIDs = make([]string, len(trnd.tPrfl.ThresholdIDs))
- copy(thIDs, trnd.tPrfl.ThresholdIDs)
+ thIDs = make([]string, len(trnd.Config().ThresholdIDs))
+ copy(thIDs, trnd.Config().ThresholdIDs)
}
opts[utils.OptsThresholdsProfileIDs] = thIDs
- ts := trnd.asTrendSummary()
+ ts := trnd.AsTrendSummary()
trndEv := &utils.CGREvent{
Tenant: trnd.Tenant,
ID: utils.GenUUID(),
@@ -203,10 +202,10 @@ func (tS *TrendS) processThresholds(trnd *Trend) (err error) {
return
}
-// processEEs will pass the Trend event to EEs
-func (tS *TrendS) processEEs(trnd *Trend) (err error) {
+// processEEs sends the computed trend to EEs.
+func (tS *TrendS) processEEs(trnd *utils.Trend) (err error) {
if len(trnd.RunTimes) == 0 ||
- len(trnd.RunTimes) < trnd.tPrfl.MinItems {
+ len(trnd.RunTimes) < trnd.Config().MinItems {
return
}
if len(tS.cfg.TrendSCfg().EEsConns) == 0 {
@@ -215,7 +214,7 @@ func (tS *TrendS) processEEs(trnd *Trend) (err error) {
opts := map[string]any{
utils.MetaEventType: utils.TrendUpdate,
}
- ts := trnd.asTrendSummary()
+ ts := trnd.AsTrendSummary()
trndEv := &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: trnd.Tenant,
@@ -244,8 +243,8 @@ func (tS *TrendS) processEEs(trnd *Trend) (err error) {
return
}
-// storeTrend will store or schedule the trend based on settings
-func (tS *TrendS) storeTrend(ctx *context.Context, trnd *Trend) (err error) {
+// storeTrend stores or schedules the trend for storage based on "store_interval".
+func (tS *TrendS) storeTrend(ctx *context.Context, trnd *utils.Trend) (err error) {
if tS.cfg.TrendSCfg().StoreInterval == 0 {
return
}
@@ -260,10 +259,9 @@ func (tS *TrendS) storeTrend(ctx *context.Context, trnd *Trend) (err error) {
return
}
-// storeTrends will do one round for saving modified trends
-//
-// from cache to dataDB
-// designed to run asynchronously
+// storeTrends stores modified trends from cache in dataDB
+// Reschedules failed trend IDs for next storage cycle.
+// This function is safe for concurrent use.
func (tS *TrendS) storeTrends(ctx *context.Context) {
var failedTrndIDs []string
for {
@@ -276,7 +274,7 @@ func (tS *TrendS) storeTrends(ctx *context.Context) {
if trndID == utils.EmptyString {
break // no more keys, backup completed
}
- trndIf, ok := Cache.Get(utils.CacheTrends, trndID)
+ trndIf, ok := engine.Cache.Get(utils.CacheTrends, trndID)
if !ok || trndIf == nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed retrieving from cache Trend with ID: %q",
@@ -284,15 +282,15 @@ func (tS *TrendS) storeTrends(ctx *context.Context) {
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
continue
}
- trnd := trndIf.(*Trend)
- trnd.tMux.Lock()
+ trnd := trndIf.(*utils.Trend)
+ trnd.Lock()
if err := tS.dm.SetTrend(ctx, trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
utils.TrendS, trndID, err))
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
}
- trnd.tMux.Unlock()
+ trnd.Unlock()
// randomize the CPU load and give up thread control
runtime.Gosched()
}
@@ -303,7 +301,7 @@ func (tS *TrendS) storeTrends(ctx *context.Context) {
}
}
-// asyncStoreTrends runs as a backround process, calling storeTrends based on storeInterval
+// asyncStoreTrends runs as a background process that periodically calls storeTrends.
func (tS *TrendS) asyncStoreTrends(ctx *context.Context) {
storeInterval := tS.cfg.TrendSCfg().StoreInterval
if storeInterval <= 0 {
@@ -321,7 +319,7 @@ func (tS *TrendS) asyncStoreTrends(ctx *context.Context) {
}
}
-// StartCron will activates the Cron, together with all scheduled Trend queries
+// StartTrendS activates the Cron service with scheduled trend queries.
func (tS *TrendS) StartTrendS(ctx *context.Context) error {
if err := tS.scheduleAutomaticQueries(ctx); err != nil {
return err
@@ -331,7 +329,7 @@ func (tS *TrendS) StartTrendS(ctx *context.Context) error {
return nil
}
-// StopCron will shutdown the Cron tasks
+// StopTrendS gracefully shuts down Cron tasks and trend operations.
func (tS *TrendS) StopTrendS() {
timeEnd := time.Now().Add(tS.cfg.CoreSCfg().ShutdownTimeout)
@@ -360,6 +358,7 @@ func (tS *TrendS) StopTrendS() {
}
}
+// Reload restarts trend services with updated configuration.
func (tS *TrendS) Reload(ctx *context.Context) {
crnctx := tS.crn.Stop()
close(tS.trendStop)
@@ -371,7 +370,7 @@ func (tS *TrendS) Reload(ctx *context.Context) {
go tS.asyncStoreTrends(ctx)
}
-// scheduleAutomaticQueries will schedule the queries at start/reload based on configured
+// scheduleAutomaticQueries schedules initial trend queries based on configuration.
func (tS *TrendS) scheduleAutomaticQueries(ctx *context.Context) error {
schedData := make(map[string][]string)
for k, v := range tS.cfg.TrendSCfg().ScheduledIDs {
@@ -403,7 +402,8 @@ func (tS *TrendS) scheduleAutomaticQueries(ctx *context.Context) error {
return nil
}
-// scheduleTrendQueries will schedule/re-schedule specific trend queries
+// scheduleTrendQueries schedules or reschedules specific trend queries
+// Safe for concurrent use.
func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) {
var partial bool
tS.crnTQsMux.Lock()
@@ -442,129 +442,3 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []
}
return
}
-
-// V1ScheduleQueries is the query for manually re-/scheduling Trend Queries
-func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) (err error) {
- if sched, errSched := tS.scheduleTrendQueries(ctx, args.Tenant, args.TrendIDs); errSched != nil {
- return errSched
- } else {
- *scheduled = sched
- }
- return
-}
-
-// V1GetTrend is the API to return the trend Metrics
-// The number of runTimes can be filtered based on indexes and times provided as arguments
-//
-// in this way being possible to work with paginators
-func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *Trend) (err error) {
- if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
- return utils.NewErrMandatoryIeMissing(missing...)
- }
- var trnd *Trend
- if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
- return
- }
- trnd.tMux.RLock()
- defer trnd.tMux.RUnlock()
- retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
- retTrend.ID = trnd.ID
- startIdx := arg.RunIndexStart
- if startIdx > len(trnd.RunTimes) {
- startIdx = len(trnd.RunTimes)
- }
- endIdx := arg.RunIndexEnd
- if endIdx > len(trnd.RunTimes) ||
- endIdx < startIdx ||
- endIdx == 0 {
- endIdx = len(trnd.RunTimes)
- }
- runTimes := trnd.RunTimes[startIdx:endIdx]
- if len(runTimes) == 0 {
- return utils.ErrNotFound
- }
- var tStart, tEnd time.Time
- if arg.RunTimeStart == utils.EmptyString {
- tStart = runTimes[0]
- } else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cfg.GeneralCfg().DefaultTimezone); err != nil {
- return
- }
- if arg.RunTimeEnd == utils.EmptyString {
- tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
- } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, tS.cfg.GeneralCfg().DefaultTimezone); err != nil {
- return
- }
- retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
- for _, runTime := range runTimes {
- if !runTime.Before(tStart) && runTime.Before(tEnd) {
- retTrend.RunTimes = append(retTrend.RunTimes, runTime)
- }
- }
- if len(retTrend.RunTimes) == 0 { // filtered out all
- return utils.ErrNotFound
- }
- retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
- for _, runTime := range retTrend.RunTimes {
- retTrend.Metrics[runTime] = trnd.Metrics[runTime]
- }
- return
-}
-
-func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) (err error) {
- tnt := args.Tenant
- if tnt == utils.EmptyString {
- tnt = tS.cfg.GeneralCfg().DefaultTenant
- }
- tS.crnTQsMux.RLock()
- defer tS.crnTQsMux.RUnlock()
- trendIDsMp, has := tS.crnTQs[tnt]
- if !has {
- return utils.ErrNotFound
- }
- var scheduledTrends []utils.ScheduledTrend
- var entryIds map[string]cron.EntryID
- if len(args.TrendIDPrefixes) == 0 {
- entryIds = trendIDsMp
- } else {
- entryIds = make(map[string]cron.EntryID)
- for _, tID := range args.TrendIDPrefixes {
- for key, entryID := range trendIDsMp {
- if strings.HasPrefix(key, tID) {
- entryIds[key] = entryID
- }
- }
- }
- }
- if len(entryIds) == 0 {
- return utils.ErrNotFound
- }
- var entry cron.Entry
- for id, entryID := range entryIds {
- entry = tS.crn.Entry(entryID)
- if entry.ID == 0 {
- continue
- }
- scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{
- TrendID: id,
- Next: entry.Next,
- Previous: entry.Prev,
- })
- }
- slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int {
- return a.Next.Compare(b.Next)
- })
- *schedTrends = scheduledTrends
- return nil
-}
-
-func (tS *TrendS) V1GetTrendSummary(ctx *context.Context, arg utils.TenantIDWithAPIOpts, reply *TrendSummary) (err error) {
- var trnd *Trend
- if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
- return
- }
- trnd.tMux.RLock()
- trndS := trnd.asTrendSummary()
- trnd.tMux.RUnlock()
- *reply = *trndS
- return
-}
diff --git a/apis/trends_it_test.go b/trends/trends_it_test.go
similarity index 96%
rename from apis/trends_it_test.go
rename to trends/trends_it_test.go
index f5118108d..e8b6da74c 100644
--- a/apis/trends_it_test.go
+++ b/trends/trends_it_test.go
@@ -19,7 +19,7 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package apis
+package trends
import (
"path"
@@ -41,7 +41,7 @@ var (
trCfg *config.CGRConfig
trRPC *birpc.Client
trConfigDIR string //run tests for specific configuration
- trendProfiles []*engine.TrendProfileWithAPIOpts
+ trendProfiles []*utils.TrendProfileWithAPIOpts
sTestsTr = []func(t *testing.T){
testTrendSInitCfg,
@@ -121,7 +121,7 @@ func testTrendsRPCConn(t *testing.T) {
}
func testTrendsGetTrendProfileBeforeSet(t *testing.T) {
- var replyTrendProfile engine.TrendProfile
+ var replyTrendProfile utils.TrendProfile
if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfile,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
@@ -133,7 +133,7 @@ func testTrendsGetTrendProfileBeforeSet(t *testing.T) {
}
func testTrendsGetTrendProfilesBeforeSet(t *testing.T) {
- var replyTrendProfiles *[]*engine.TrendProfile
+ var replyTrendProfiles *[]*utils.TrendProfile
if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfiles,
&utils.ArgsItemIDs{
Tenant: "cgrates.org",
@@ -165,9 +165,9 @@ func testTrendsGetTrendProfileCountBeforeSet(t *testing.T) {
}
func testTrendsSetTrendProfiles(t *testing.T) {
- trendProfiles = []*engine.TrendProfileWithAPIOpts{
+ trendProfiles = []*utils.TrendProfileWithAPIOpts{
{
- TrendProfile: &engine.TrendProfile{
+ TrendProfile: &utils.TrendProfile{
ID: "Trend1",
StatID: "Stats1",
Tenant: "cgrates.org",
@@ -183,7 +183,7 @@ func testTrendsSetTrendProfiles(t *testing.T) {
},
},
{
- TrendProfile: &engine.TrendProfile{
+ TrendProfile: &utils.TrendProfile{
ID: "Trend2",
StatID: "Stats2",
Tenant: "cgrates.org",
@@ -196,7 +196,7 @@ func testTrendsSetTrendProfiles(t *testing.T) {
},
},
{
- TrendProfile: &engine.TrendProfile{
+ TrendProfile: &utils.TrendProfile{
ID: "Trend3",
StatID: "Stats3",
Tenant: "cgrates.org",
@@ -227,7 +227,7 @@ func testTrendsSetTrendProfiles(t *testing.T) {
}
func testTrendsGetTrendProfileAfterSet(t *testing.T) {
- var replyTrendProfile *engine.TrendProfile
+ var replyTrendProfile *utils.TrendProfile
if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfile,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
@@ -305,7 +305,7 @@ func testTrendsGetTrendProfileCountAfterSet(t *testing.T) {
func testTrendsGetTrendProfilesAfterSet(t *testing.T) {
- var replyTrendProfiles []*engine.TrendProfile
+ var replyTrendProfiles []*utils.TrendProfile
if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfiles,
&utils.ArgsItemIDs{
Tenant: "cgrates.org",
@@ -347,7 +347,7 @@ func testTrendsGetTrendProfileAfterRemove(t *testing.T) {
}, &reply); err != nil {
t.Error(err)
}
- var replyTrendProfile engine.TrendProfile
+ var replyTrendProfile utils.TrendProfile
if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfile,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
@@ -426,7 +426,7 @@ func testTrendsGetTrendProfileCountAfterRemove(t *testing.T) {
func testTrendsGetTrendProfilesAfterRemove(t *testing.T) {
expectedTrendProfiles := append(trendProfiles[:1], trendProfiles[2:]...)
- var replyTrendProfiles []*engine.TrendProfile
+ var replyTrendProfiles []*utils.TrendProfile
if err := trRPC.Call(context.Background(), utils.AdminSv1GetTrendProfiles,
&utils.ArgsItemIDs{
Tenant: "cgrates.org",
diff --git a/engine/trends_test.go b/trends/trends_test.go
similarity index 92%
rename from engine/trends_test.go
rename to trends/trends_test.go
index 41344afa8..a65ebde45 100644
--- a/engine/trends_test.go
+++ b/trends/trends_test.go
@@ -16,19 +16,20 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package engine
+package trends
import (
"testing"
"github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
)
func TestNewTrendService(t *testing.T) {
- dm := &DataManager{}
+ dm := &engine.DataManager{}
cfg := &config.CGRConfig{}
- filterS := &FilterS{}
- connMgr := &ConnManager{}
+ filterS := &engine.FilterS{}
+ connMgr := &engine.ConnManager{}
trendService := NewTrendService(dm, cfg, filterS, connMgr)
diff --git a/utils/consts.go b/utils/consts.go
index a6e5fe575..5b30ecdbb 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -587,7 +587,7 @@ const (
Tolerance = "Tolerance"
TTL = "TTL"
PurgeFilterIDs = "PurgeFilterIDs"
- Trend = "Trend"
+ TrendStr = "Trend"
MinItems = "MinItems"
MetricIDs = "MetricIDs"
MetricFilterIDs = "MetricFilterIDs"
diff --git a/engine/libtrends.go b/utils/trends.go
similarity index 68%
rename from engine/libtrends.go
rename to utils/trends.go
index 127eecd0a..b8ff67ac1 100644
--- a/engine/libtrends.go
+++ b/utils/trends.go
@@ -16,19 +16,16 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package engine
+package utils
import (
"math"
"slices"
"sync"
"time"
-
- "github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/utils"
)
-// A TrendProfile represents the settings of a Trend
+// TrendProfile defines the configuration of the Trend.
type TrendProfile struct {
Tenant string
ID string
@@ -44,7 +41,13 @@ type TrendProfile struct {
ThresholdIDs []string
}
-// Clone will clone the TrendProfile so it can be used by scheduler safely
+// TrendProfileWithAPIOpts wraps TrendProfile with APIOpts.
+type TrendProfileWithAPIOpts struct {
+ *TrendProfile
+ APIOpts map[string]any
+}
+
+// Clone creates a deep copy of TrendProfile for thread-safe use.
func (tP *TrendProfile) Clone() (clnTp *TrendProfile) {
clnTp = &TrendProfile{
Tenant: tP.Tenant,
@@ -73,277 +76,46 @@ func (tP *TrendProfile) Clone() (clnTp *TrendProfile) {
return
}
-type TrendProfileWithAPIOpts struct {
- *TrendProfile
- APIOpts map[string]any
-}
-
+// TenantID returns the concatenated tenant and ID.
func (srp *TrendProfile) TenantID() string {
- return utils.ConcatenatedKey(srp.Tenant, srp.ID)
-}
-
-type TrendWithAPIOpts struct {
- *Trend
- APIOpts map[string]any
-}
-
-// NewTrendFromProfile is a constructor for an empty trend out of it's profile
-func NewTrendFromProfile(tP *TrendProfile) *Trend {
- return &Trend{
- Tenant: tP.Tenant,
- ID: tP.ID,
- RunTimes: make([]time.Time, 0),
- Metrics: make(map[time.Time]map[string]*MetricWithTrend),
-
- tPrfl: tP,
- }
-}
-
-// Trend is the unit matched by filters
-type Trend struct {
- tMux sync.RWMutex
-
- Tenant string
- ID string
- RunTimes []time.Time
- Metrics map[time.Time]map[string]*MetricWithTrend
- CompressedMetrics []byte // if populated, Metrics and RunTimes will be emty
-
- // indexes help faster processing
- mLast map[string]time.Time // last time a metric was present
- mCounts map[string]int // number of times a metric is present in Metrics
- mTotals map[string]float64 // cached sum, used for average calculations
-
- tPrfl *TrendProfile // store here the trend profile so we can have it at hands further
-
-}
-
-func (t *Trend) Clone() (tC *Trend) {
- return
-}
-
-// AsTrendSummary transforms the trend into TrendSummary
-func (t *Trend) asTrendSummary() (ts *TrendSummary) {
- ts = &TrendSummary{
- Tenant: t.Tenant,
- ID: t.ID,
- Metrics: make(map[string]*MetricWithTrend),
- }
- if len(t.RunTimes) != 0 {
- ts.Time = t.RunTimes[len(t.RunTimes)-1]
- for mID, mWt := range t.Metrics[ts.Time] {
- ts.Metrics[mID] = &MetricWithTrend{
- ID: mWt.ID,
- Value: mWt.Value,
- TrendGrowth: mWt.TrendGrowth,
- TrendLabel: mWt.TrendLabel,
- }
- }
- }
- return
-}
-
-func (t *Trend) compress(ms utils.Marshaler) (tr *Trend, err error) {
- if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) {
- return
- }
- tr = &Trend{
- Tenant: t.Tenant,
- ID: t.ID,
- }
- tr.CompressedMetrics, err = ms.Marshal(tr.Metrics)
- if err != nil {
- return
- }
- return tr, nil
-}
-
-func (t *Trend) uncompress(ms utils.Marshaler) (err error) {
- if t == nil || t.CompressedMetrics == nil {
- return
- }
-
- err = ms.Unmarshal(t.CompressedMetrics, &t.Metrics)
- if err != nil {
- return
- }
- t.CompressedMetrics = nil
- t.RunTimes = make([]time.Time, len(t.Metrics))
- i := 0
- for key := range t.Metrics {
- t.RunTimes[i] = key
- i++
- }
- slices.SortFunc(t.RunTimes, func(a, b time.Time) int {
- return a.Compare(b)
- })
- return
-}
-
-// Compile is used to initialize or cleanup the Trend
-//
-// thread safe since it should be used close to source
-func (t *Trend) Compile(cleanTtl time.Duration, qLength int) {
- t.cleanup(cleanTtl, qLength)
- if len(t.mTotals) == 0 { // indexes were not yet built
- t.computeIndexes()
- }
-}
-
-// cleanup will clean stale data out of
-func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) {
- if ttl >= 0 {
- expTime := time.Now().Add(-ttl)
- var expIdx *int
- for i, rT := range t.RunTimes {
- if rT.After(expTime) {
- continue
- }
- expIdx = &i
- delete(t.Metrics, rT)
- }
- if expIdx != nil {
- if len(t.RunTimes)-1 == *expIdx {
- t.RunTimes = make([]time.Time, 0)
- } else {
- t.RunTimes = t.RunTimes[*expIdx+1:]
- }
- altered = true
- }
- }
-
- diffLen := len(t.RunTimes) - qLength
- if qLength > 0 && diffLen > 0 {
- var rmTms []time.Time
- rmTms, t.RunTimes = t.RunTimes[:diffLen], t.RunTimes[diffLen:]
- for _, rmTm := range rmTms {
- delete(t.Metrics, rmTm)
- }
- altered = true
- }
- if altered {
- t.computeIndexes()
- }
- return
-}
-
-// computeIndexes should be called after each retrieval from DB
-func (t *Trend) computeIndexes() {
- t.mLast = make(map[string]time.Time)
- t.mCounts = make(map[string]int)
- t.mTotals = make(map[string]float64)
- for _, runTime := range t.RunTimes {
- for _, mWt := range t.Metrics[runTime] {
- t.indexesAppendMetric(mWt, runTime)
- }
- }
-}
-
-// indexesAppendMetric appends a single metric to indexes
-func (t *Trend) indexesAppendMetric(mWt *MetricWithTrend, rTime time.Time) {
- t.mLast[mWt.ID] = rTime
- t.mCounts[mWt.ID]++
- t.mTotals[mWt.ID] += mWt.Value
-}
-
-// getTrendGrowth returns the percentage growth for a specific metric
-//
-// @correlation parameter will define whether the comparison is against last or average value
-// errors in case of previous
-func (t *Trend) getTrendGrowth(mID string, mVal float64, correlation string, roundDec int) (tG float64, err error) {
- var prevVal float64
- if _, has := t.mLast[mID]; !has {
- return -1.0, utils.ErrNotFound
- }
- if _, has := t.Metrics[t.mLast[mID]][mID]; !has {
- return -1.0, utils.ErrNotFound
- }
- switch correlation {
- case utils.MetaLast:
- prevVal = t.Metrics[t.mLast[mID]][mID].Value
- case utils.MetaAverage:
- prevVal = t.mTotals[mID] / float64(t.mCounts[mID])
- default:
- return -1.0, utils.ErrCorrelationUndefined
- }
-
- diffVal := mVal - prevVal
- return utils.Round(diffVal*100/prevVal, roundDec, utils.MetaRoundingMiddle), nil
-}
-
-// getTrendLabel identifies the trend label for the instant value of the metric
-//
-// *positive, *negative, *constant, N/A
-func (t *Trend) getTrendLabel(tGrowth float64, tolerance float64) (lbl string) {
- switch {
- case tGrowth > 0:
- lbl = utils.MetaPositive
- case tGrowth < 0:
- lbl = utils.MetaNegative
- default:
- lbl = utils.MetaConstant
- }
- if math.Abs(tGrowth) <= tolerance { // percentage value of diff is lower than threshold
- lbl = utils.MetaConstant
- }
- return
-}
-
-// MetricWithTrend represents one read from StatS
-type MetricWithTrend struct {
- ID string // Metric ID
- Value float64 // Metric Value
- TrendGrowth float64 // Difference between last and previous
- TrendLabel string // *positive, *negative, *constant, N/A
-}
-
-func (tr *Trend) TenantID() string {
- return utils.ConcatenatedKey(tr.Tenant, tr.ID)
-}
-
-// TrendSummary represents the last trend computed
-type TrendSummary struct {
- Tenant string
- ID string
- Time time.Time
- Metrics map[string]*MetricWithTrend
+ return ConcatenatedKey(srp.Tenant, srp.ID)
}
func (tp *TrendProfile) Set(path []string, val any, _ bool) (err error) {
if len(path) != 1 {
- return utils.ErrWrongPath
+ return ErrWrongPath
}
switch path[0] {
default:
- return utils.ErrWrongPath
- case utils.Tenant:
- tp.Tenant = utils.IfaceAsString(val)
- case utils.ID:
- tp.ID = utils.IfaceAsString(val)
- case utils.Schedule:
- tp.Schedule = utils.IfaceAsString(val)
- case utils.StatID:
- tp.StatID = utils.IfaceAsString(val)
- case utils.Metrics:
+ return ErrWrongPath
+ case Tenant:
+ tp.Tenant = IfaceAsString(val)
+ case ID:
+ tp.ID = IfaceAsString(val)
+ case Schedule:
+ tp.Schedule = IfaceAsString(val)
+ case StatID:
+ tp.StatID = IfaceAsString(val)
+ case Metrics:
var valA []string
- valA, err = utils.IfaceAsStringSlice(val)
+ valA, err = IfaceAsStringSlice(val)
tp.Metrics = append(tp.Metrics, valA...)
- case utils.TTL:
- tp.TTL, err = utils.IfaceAsDuration(val)
- case utils.QueueLength:
- tp.QueueLength, err = utils.IfaceAsInt(val)
- case utils.MinItems:
- tp.MinItems, err = utils.IfaceAsInt(val)
- case utils.CorrelationType:
- tp.CorrelationType = utils.IfaceAsString(val)
- case utils.Tolerance:
- tp.Tolerance, err = utils.IfaceAsFloat64(val)
- case utils.Stored:
- tp.Stored, err = utils.IfaceAsBool(val)
- case utils.ThresholdIDs:
+ case TTL:
+ tp.TTL, err = IfaceAsDuration(val)
+ case QueueLength:
+ tp.QueueLength, err = IfaceAsInt(val)
+ case MinItems:
+ tp.MinItems, err = IfaceAsInt(val)
+ case CorrelationType:
+ tp.CorrelationType = IfaceAsString(val)
+ case Tolerance:
+ tp.Tolerance, err = IfaceAsFloat64(val)
+ case Stored:
+ tp.Stored, err = IfaceAsBool(val)
+ case ThresholdIDs:
var valA []string
- valA, err = utils.IfaceAsStringSlice(val)
+ valA, err = IfaceAsStringSlice(val)
tp.ThresholdIDs = append(tp.ThresholdIDs, valA...)
}
return
@@ -385,53 +157,304 @@ func (tp *TrendProfile) Merge(v2 any) {
}
}
-func (tp *TrendProfile) String() string { return utils.ToJSON(tp) }
+func (tp *TrendProfile) String() string { return ToJSON(tp) }
+
func (tp *TrendProfile) FieldAsString(fldPath []string) (_ string, err error) {
var val any
if val, err = tp.FieldAsInterface(fldPath); err != nil {
return
}
- return utils.IfaceAsString(val), nil
+ return IfaceAsString(val), nil
}
+
func (tp *TrendProfile) FieldAsInterface(fldPath []string) (_ any, err error) {
if len(fldPath) != 1 {
- return nil, utils.ErrNotFound
+ return nil, ErrNotFound
}
switch fldPath[0] {
default:
- fld, idx := utils.GetPathIndex(fldPath[0])
+ fld, idx := GetPathIndex(fldPath[0])
if idx != nil {
switch fld {
- case utils.Metrics:
+ case Metrics:
if *idx < len(tp.Metrics) {
return tp.Metrics[*idx], nil
}
- case utils.ThresholdIDs:
+ case ThresholdIDs:
if *idx < len(tp.ThresholdIDs) {
return tp.ThresholdIDs[*idx], nil
}
}
}
- return nil, utils.ErrNotFound
- case utils.Tenant:
+ return nil, ErrNotFound
+ case Tenant:
return tp.Tenant, nil
- case utils.ID:
+ case ID:
return tp.ID, nil
- case utils.Schedule:
+ case Schedule:
return tp.Schedule, nil
- case utils.StatID:
+ case StatID:
return tp.StatID, nil
- case utils.TTL:
+ case TTL:
return tp.TTL, nil
- case utils.QueueLength:
+ case QueueLength:
return tp.QueueLength, nil
- case utils.MinItems:
+ case MinItems:
return tp.MinItems, nil
- case utils.CorrelationType:
+ case CorrelationType:
return tp.CorrelationType, nil
- case utils.Tolerance:
+ case Tolerance:
return tp.Tolerance, nil
- case utils.Stored:
+ case Stored:
return tp.Stored, nil
}
}
+
+// Trend represents a collection of metrics with trend analysis.
+type Trend struct {
+ tMux sync.RWMutex
+
+ Tenant string
+ ID string
+ RunTimes []time.Time
+ Metrics map[time.Time]map[string]*MetricWithTrend
+ CompressedMetrics []byte // if populated, Metrics and RunTimes will be emty
+
+ // indexes help faster processing
+ mLast map[string]time.Time // last time a metric was present
+ mCounts map[string]int // number of times a metric is present in Metrics
+ mTotals map[string]float64 // cached sum, used for average calculations
+
+ tPrfl *TrendProfile // store here the trend profile so we can have it at hands further
+}
+
+// TrendWithAPIOpts wraps Trend with APIOpts.
+type TrendWithAPIOpts struct {
+ *Trend
+ APIOpts map[string]any
+}
+
+// TrendSummary holds the most recent trend metrics.
+type TrendSummary struct {
+ Tenant string
+ ID string
+ Time time.Time
+ Metrics map[string]*MetricWithTrend
+}
+
+// MetricWithTrend contains a metric value with its calculated trend info.
+type MetricWithTrend struct {
+ ID string // Metric ID
+ Value float64 // Metric Value
+ TrendGrowth float64 // Difference between last and previous
+ TrendLabel string // *positive, *negative, *constant, N/A
+}
+
+// NewTrendFromProfile creates an empty trend based on profile configuration.
+func NewTrendFromProfile(tP *TrendProfile) *Trend {
+ return &Trend{
+ Tenant: tP.Tenant,
+ ID: tP.ID,
+ RunTimes: make([]time.Time, 0),
+ Metrics: make(map[time.Time]map[string]*MetricWithTrend),
+
+ tPrfl: tP,
+ }
+}
+
+func (tr *Trend) TenantID() string {
+ return ConcatenatedKey(tr.Tenant, tr.ID)
+}
+
+// Config returns the trend's profile configuration.
+func (t *Trend) Config() *TrendProfile {
+ return t.tPrfl
+}
+
+func (t *Trend) SetConfig(tp *TrendProfile) {
+ t.tPrfl = tp
+}
+
+// AsTrendSummary creates a summary with the most recent trend data.
+func (t *Trend) AsTrendSummary() (ts *TrendSummary) {
+ ts = &TrendSummary{
+ Tenant: t.Tenant,
+ ID: t.ID,
+ Metrics: make(map[string]*MetricWithTrend),
+ }
+ if len(t.RunTimes) != 0 {
+ ts.Time = t.RunTimes[len(t.RunTimes)-1]
+ for mID, mWt := range t.Metrics[ts.Time] {
+ ts.Metrics[mID] = &MetricWithTrend{
+ ID: mWt.ID,
+ Value: mWt.Value,
+ TrendGrowth: mWt.TrendGrowth,
+ TrendLabel: mWt.TrendLabel,
+ }
+ }
+ }
+ return
+}
+
+// Compress creates a compressed version of the trend.
+func (t *Trend) Compress(ms Marshaler, limit int) (tr *Trend, err error) {
+ if limit > len(t.RunTimes) {
+ return
+ }
+ tr = &Trend{
+ Tenant: t.Tenant,
+ ID: t.ID,
+ }
+ tr.CompressedMetrics, err = ms.Marshal(tr.Metrics)
+ if err != nil {
+ return
+ }
+ return tr, nil
+}
+
+// Uncompress expands a compressed trend.
+func (t *Trend) Uncompress(ms Marshaler) (err error) {
+ if t == nil || t.CompressedMetrics == nil {
+ return
+ }
+
+ err = ms.Unmarshal(t.CompressedMetrics, &t.Metrics)
+ if err != nil {
+ return
+ }
+ t.CompressedMetrics = nil
+ t.RunTimes = make([]time.Time, len(t.Metrics))
+ i := 0
+ for key := range t.Metrics {
+ t.RunTimes[i] = key
+ i++
+ }
+ slices.SortFunc(t.RunTimes, func(a, b time.Time) int {
+ return a.Compare(b)
+ })
+ return
+}
+
+// Compile initializes or cleans up the Trend.
+// Safe for concurrent use.
+func (t *Trend) Compile(cleanTtl time.Duration, qLength int) {
+ t.cleanup(cleanTtl, qLength)
+ if len(t.mTotals) == 0 { // indexes were not yet built
+ t.computeIndexes()
+ }
+}
+
+// cleanup removes stale data based on TTL and queue length limits.
+func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) {
+ if ttl >= 0 {
+ expTime := time.Now().Add(-ttl)
+ var expIdx *int
+ for i, rT := range t.RunTimes {
+ if rT.After(expTime) {
+ continue
+ }
+ expIdx = &i
+ delete(t.Metrics, rT)
+ }
+ if expIdx != nil {
+ if len(t.RunTimes)-1 == *expIdx {
+ t.RunTimes = make([]time.Time, 0)
+ } else {
+ t.RunTimes = t.RunTimes[*expIdx+1:]
+ }
+ altered = true
+ }
+ }
+
+ diffLen := len(t.RunTimes) - qLength
+ if qLength > 0 && diffLen > 0 {
+ var rmTms []time.Time
+ rmTms, t.RunTimes = t.RunTimes[:diffLen], t.RunTimes[diffLen:]
+ for _, rmTm := range rmTms {
+ delete(t.Metrics, rmTm)
+ }
+ altered = true
+ }
+ if altered {
+ t.computeIndexes()
+ }
+ return
+}
+
+// computeIndexes rebuilds internal indexes after DB retrieval.
+func (t *Trend) computeIndexes() {
+ t.mLast = make(map[string]time.Time)
+ t.mCounts = make(map[string]int)
+ t.mTotals = make(map[string]float64)
+ for _, runTime := range t.RunTimes {
+ for _, mWt := range t.Metrics[runTime] {
+ t.IndexesAppendMetric(mWt, runTime)
+ }
+ }
+}
+
+// IndexesAppendMetric adds a single metric to internal indexes.
+func (t *Trend) IndexesAppendMetric(mWt *MetricWithTrend, rTime time.Time) {
+ t.mLast[mWt.ID] = rTime
+ t.mCounts[mWt.ID]++
+ t.mTotals[mWt.ID] += mWt.Value
+}
+
+// GetTrendGrowth calculates percentage growth for a metric compared to previous values.
+func (t *Trend) GetTrendGrowth(mID string, mVal float64, correlation string, roundDec int) (tG float64, err error) {
+ var prevVal float64
+ if _, has := t.mLast[mID]; !has {
+ return -1.0, ErrNotFound
+ }
+ if _, has := t.Metrics[t.mLast[mID]][mID]; !has {
+ return -1.0, ErrNotFound
+ }
+ switch correlation {
+ case MetaLast:
+ prevVal = t.Metrics[t.mLast[mID]][mID].Value
+ case MetaAverage:
+ prevVal = t.mTotals[mID] / float64(t.mCounts[mID])
+ default:
+ return -1.0, ErrCorrelationUndefined
+ }
+
+ diffVal := mVal - prevVal
+ return Round(diffVal*100/prevVal, roundDec, MetaRoundingMiddle), nil
+}
+
+// GetTrendLabel determines trend direction based on growth percentage and tolerance.
+// Returns "*positive", "*negative", "*constant", or "N/A" based on the growth value.
+func GetTrendLabel(tGrowth float64, tolerance float64) (lbl string) {
+ switch {
+ case tGrowth > 0:
+ lbl = MetaPositive
+ case tGrowth < 0:
+ lbl = MetaNegative
+ default:
+ lbl = MetaConstant
+ }
+ if math.Abs(tGrowth) <= tolerance { // percentage value of diff is lower than threshold
+ lbl = MetaConstant
+ }
+ return
+}
+
+// Lock locks the trend mutex.
+func (t *Trend) Lock() {
+ t.tMux.Lock()
+}
+
+// Unlock unlocks the trend mutex.
+func (t *Trend) Unlock() {
+ t.tMux.Unlock()
+}
+
+// RLock locks the trend mutex for reading.
+func (t *Trend) RLock() {
+ t.tMux.RLock()
+}
+
+// RUnlock unlocks the read lock on the trend mutex.
+func (t *Trend) RUnlock() {
+ t.tMux.RUnlock()
+}
diff --git a/engine/libtrends_test.go b/utils/trends_test.go
similarity index 85%
rename from engine/libtrends_test.go
rename to utils/trends_test.go
index 4ea2519bc..98345b138 100644
--- a/engine/libtrends_test.go
+++ b/utils/trends_test.go
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package engine
+package utils
import (
"errors"
@@ -24,8 +24,6 @@ import (
"strings"
"testing"
"time"
-
- "github.com/cgrates/cgrates/utils"
)
func TestTrendProfileClone(t *testing.T) {
@@ -112,7 +110,7 @@ func TestTrendProfileTenantIDAndTrendProfileWithAPIOpts(t *testing.T) {
tenantID := tp.TenantID()
- expectedTenantID := "cgrates.org" + utils.ConcatenatedKeySep + "trend1"
+ expectedTenantID := "cgrates.org" + ConcatenatedKeySep + "trend1"
if tenantID != expectedTenantID {
t.Errorf("Expected TenantID %s, but got %s", expectedTenantID, tenantID)
}
@@ -158,9 +156,9 @@ func TestIndexesAppendMetric(t *testing.T) {
rTime1 := time.Now()
rTime2 := rTime1.Add(10 * time.Minute)
- trend.indexesAppendMetric(metric1, rTime1)
- trend.indexesAppendMetric(metric2, rTime2)
- trend.indexesAppendMetric(metric1, rTime2)
+ trend.IndexesAppendMetric(metric1, rTime1)
+ trend.IndexesAppendMetric(metric2, rTime2)
+ trend.IndexesAppendMetric(metric1, rTime2)
expectedMLast := map[string]time.Time{
"metric1": rTime2,
@@ -297,24 +295,22 @@ func TestTComputeIndexes(t *testing.T) {
}
func TestGetTrendLabel(t *testing.T) {
- trend := &Trend{}
-
tests := []struct {
tGrowth float64
tolerance float64
expected string
}{
- {1.0, 0.5, utils.MetaPositive},
- {-1.0, 0.5, utils.MetaNegative},
- {0.0, 0.5, utils.MetaConstant},
- {0.3, 0.5, utils.MetaConstant},
- {-0.3, 0.5, utils.MetaConstant},
- {0.6, 0.5, utils.MetaPositive},
- {-0.6, 0.5, utils.MetaNegative},
+ {1.0, 0.5, MetaPositive},
+ {-1.0, 0.5, MetaNegative},
+ {0.0, 0.5, MetaConstant},
+ {0.3, 0.5, MetaConstant},
+ {-0.3, 0.5, MetaConstant},
+ {0.6, 0.5, MetaPositive},
+ {-0.6, 0.5, MetaNegative},
}
for _, test := range tests {
- result := trend.getTrendLabel(test.tGrowth, test.tolerance)
+ result := GetTrendLabel(test.tGrowth, test.tolerance)
if result != test.expected {
t.Errorf("For tGrowth: %f and tolerance: %f, expected %s, got %s", test.tGrowth, test.tolerance, test.expected, result)
}
@@ -330,16 +326,16 @@ func TestGetTrendGrowth(t *testing.T) {
mCounts: map[string]int{},
}
- _, err := trend.getTrendGrowth("unknownID", 100, utils.MetaLast, 2)
- if !errors.Is(err, utils.ErrNotFound) {
+ _, err := trend.GetTrendGrowth("unknownID", 100, MetaLast, 2)
+ if !errors.Is(err, ErrNotFound) {
t.Errorf("Expected error ErrNotFound, got: %v", err)
}
now := time.Now()
trend.mLast["metric1"] = now
- _, err = trend.getTrendGrowth("metric1", 100, utils.MetaLast, 2)
- if !errors.Is(err, utils.ErrNotFound) {
+ _, err = trend.GetTrendGrowth("metric1", 100, MetaLast, 2)
+ if !errors.Is(err, ErrNotFound) {
t.Errorf("Expected error ErrNotFound, got: %v", err)
}
@@ -349,7 +345,7 @@ func TestGetTrendGrowth(t *testing.T) {
},
}
- got, err := trend.getTrendGrowth("metric1", 100, utils.MetaLast, 2)
+ got, err := trend.GetTrendGrowth("metric1", 100, MetaLast, 2)
if err != nil || got != 25.0 {
t.Errorf("Mismatch for MetaLast correlation. Got: %v, expected: %v", got, 25.0)
}
@@ -361,13 +357,13 @@ func TestGetTrendGrowth(t *testing.T) {
"metric1": 4,
}
- got, err = trend.getTrendGrowth("metric1", 120, utils.MetaAverage, 2)
+ got, err = trend.GetTrendGrowth("metric1", 120, MetaAverage, 2)
if err != nil || got != 20.0 {
t.Errorf("Mismatch for MetaAverage correlation. Got: %v, expected: %v", got, 20.0)
}
- _, err = trend.getTrendGrowth("metric1", 100, "invalidCorrelation", 2)
- if !errors.Is(err, utils.ErrCorrelationUndefined) {
+ _, err = trend.GetTrendGrowth("metric1", 100, "invalidCorrelation", 2)
+ if !errors.Is(err, ErrCorrelationUndefined) {
t.Errorf("Expected error ErrCorrelationUndefined, got: %v", err)
}
}
@@ -419,21 +415,21 @@ func TestTrendProfileFieldAsString(t *testing.T) {
err error
val any
}{
- {utils.ID, []string{utils.ID}, nil, "Trend1"},
- {utils.Tenant, []string{utils.Tenant}, nil, "cgrates.org"},
- {utils.Schedule, []string{utils.Schedule}, nil, "@every 1m"},
- {utils.StatID, []string{utils.StatID}, nil, "Stat1"},
- {utils.Metrics, []string{utils.Metrics + "[0]"}, nil, "*acc"},
- {utils.Metrics, []string{utils.Metrics + "[1]"}, nil, "*tcd"},
- {utils.TTL, []string{utils.TTL}, nil, 10 * time.Minute},
- {utils.QueueLength, []string{utils.QueueLength}, nil, 100},
- {utils.MinItems, []string{utils.MinItems}, nil, 10},
- {utils.CorrelationType, []string{utils.CorrelationType}, nil, "*average"},
- {utils.Tolerance, []string{utils.Tolerance}, nil, 0.05},
- {utils.Stored, []string{utils.Stored}, nil, true},
- {utils.ThresholdIDs, []string{utils.ThresholdIDs + "[0]"}, nil, "Thresh1"},
- {utils.ThresholdIDs, []string{utils.ThresholdIDs + "[1]"}, nil, "Thresh2"},
- {"NonExistingField", []string{"Field1"}, utils.ErrNotFound, nil},
+ {ID, []string{ID}, nil, "Trend1"},
+ {Tenant, []string{Tenant}, nil, "cgrates.org"},
+ {Schedule, []string{Schedule}, nil, "@every 1m"},
+ {StatID, []string{StatID}, nil, "Stat1"},
+ {Metrics, []string{Metrics + "[0]"}, nil, "*acc"},
+ {Metrics, []string{Metrics + "[1]"}, nil, "*tcd"},
+ {TTL, []string{TTL}, nil, 10 * time.Minute},
+ {QueueLength, []string{QueueLength}, nil, 100},
+ {MinItems, []string{MinItems}, nil, 10},
+ {CorrelationType, []string{CorrelationType}, nil, "*average"},
+ {Tolerance, []string{Tolerance}, nil, 0.05},
+ {Stored, []string{Stored}, nil, true},
+ {ThresholdIDs, []string{ThresholdIDs + "[0]"}, nil, "Thresh1"},
+ {ThresholdIDs, []string{ThresholdIDs + "[1]"}, nil, "Thresh2"},
+ {"NonExistingField", []string{"Field1"}, ErrNotFound, nil},
}
rp := &TrendProfile{
Tenant: "cgrates.org",
@@ -490,24 +486,24 @@ func TestTrendCleanUp(t *testing.T) {
},
Metrics: map[time.Time]map[string]*MetricWithTrend{
tm: {
- utils.MetaTCC: {ID: utils.MetaTCC, Value: 13, TrendGrowth: -1, TrendLabel: utils.NotAvailable},
- utils.MetaACC: {ID: utils.MetaACC, Value: 13, TrendGrowth: -1, TrendLabel: utils.NotAvailable},
+ MetaTCC: {ID: MetaTCC, Value: 13, TrendGrowth: -1, TrendLabel: NotAvailable},
+ MetaACC: {ID: MetaACC, Value: 13, TrendGrowth: -1, TrendLabel: NotAvailable},
},
tm2: {
- utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive},
- utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive},
+ MetaTCC: {ID: MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: MetaPositive},
+ MetaACC: {ID: MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: MetaPositive},
},
tm3: {
- utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive},
- utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive},
+ MetaTCC: {ID: MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: MetaPositive},
+ MetaACC: {ID: MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: MetaPositive},
},
tm4: {
- utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive},
- utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive},
+ MetaTCC: {ID: MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: MetaPositive},
+ MetaACC: {ID: MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: MetaPositive},
},
tm5: {
- utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive},
- utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive},
+ MetaTCC: {ID: MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: MetaPositive},
+ MetaACC: {ID: MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: MetaPositive},
},
},
}
@@ -667,84 +663,84 @@ func TestTrendProfileSet(t *testing.T) {
}{
{
name: "Set Tenant",
- path: []string{utils.Tenant},
+ path: []string{Tenant},
val: "newTenant",
expected: "newTenant",
hasError: false,
},
{
name: "Set ID",
- path: []string{utils.ID},
+ path: []string{ID},
val: "newID",
expected: "newID",
hasError: false,
},
{
name: "Set Schedule",
- path: []string{utils.Schedule},
+ path: []string{Schedule},
val: "@every 2m",
expected: "@every 2m",
hasError: false,
},
{
name: "Set StatID",
- path: []string{utils.StatID},
+ path: []string{StatID},
val: "newStatID",
expected: "newStatID",
hasError: false,
},
{
name: "Set Metrics",
- path: []string{utils.Metrics},
+ path: []string{Metrics},
val: []string{"*newMetric"},
expected: []string{"*acc", "*tcd", "*newMetric"},
hasError: false,
},
{
name: "Set TTL",
- path: []string{utils.TTL},
+ path: []string{TTL},
val: "15m",
expected: 15 * time.Minute,
hasError: false,
},
{
name: "Set QueueLength",
- path: []string{utils.QueueLength},
+ path: []string{QueueLength},
val: 50,
expected: 50,
hasError: false,
},
{
name: "Set MinItems",
- path: []string{utils.MinItems},
+ path: []string{MinItems},
val: 20,
expected: 20,
hasError: false,
},
{
name: "Set CorrelationType",
- path: []string{utils.CorrelationType},
+ path: []string{CorrelationType},
val: "*sum",
expected: "*sum",
hasError: false,
},
{
name: "Set Tolerance",
- path: []string{utils.Tolerance},
+ path: []string{Tolerance},
val: 0.1,
expected: 0.1,
hasError: false,
},
{
name: "Set Stored",
- path: []string{utils.Stored},
+ path: []string{Stored},
val: false,
expected: false,
hasError: false,
},
{
name: "Set ThresholdIDs",
- path: []string{utils.ThresholdIDs},
+ path: []string{ThresholdIDs},
val: []string{"Thresh3", "Thresh4"},
expected: []string{"Thresh1", "Thresh2", "Thresh3", "Thresh4"},
hasError: false,
@@ -769,23 +765,23 @@ func TestTrendProfileSet(t *testing.T) {
}
switch tt.path[0] {
- case utils.Tenant:
+ case Tenant:
if tp.Tenant != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Tenant)
}
- case utils.ID:
+ case ID:
if tp.ID != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.ID)
}
- case utils.Schedule:
+ case Schedule:
if tp.Schedule != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Schedule)
}
- case utils.StatID:
+ case StatID:
if tp.StatID != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.StatID)
}
- case utils.Metrics:
+ case Metrics:
if len(tp.Metrics) != len(tt.expected.([]string)) {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Metrics)
} else {
@@ -796,31 +792,31 @@ func TestTrendProfileSet(t *testing.T) {
}
}
}
- case utils.TTL:
+ case TTL:
if tp.TTL != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.TTL)
}
- case utils.QueueLength:
+ case QueueLength:
if tp.QueueLength != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.QueueLength)
}
- case utils.MinItems:
+ case MinItems:
if tp.MinItems != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.MinItems)
}
- case utils.CorrelationType:
+ case CorrelationType:
if tp.CorrelationType != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.CorrelationType)
}
- case utils.Tolerance:
+ case Tolerance:
if tp.Tolerance != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Tolerance)
}
- case utils.Stored:
+ case Stored:
if tp.Stored != tt.expected {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Stored)
}
- case utils.ThresholdIDs:
+ case ThresholdIDs:
if len(tp.ThresholdIDs) != len(tt.expected.([]string)) {
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.ThresholdIDs)
} else {