diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index 4149b34ec..a12275b05 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -149,7 +149,11 @@ func (rplSv1 *ReplicatorSv1) GetTrend(ctx *context.Context, tntID *utils.TenantI if err != nil { return err } - *reply = *rcv + reply.Tenant = rcv.Tenant + reply.ID = rcv.ID + reply.RunTimes = rcv.RunTimes + reply.CompressedMetrics = rcv.CompressedMetrics + reply.Metrics = rcv.Metrics return nil } diff --git a/apier/v1/trends.go b/apier/v1/trends.go index b415d3cf9..4560eb6db 100644 --- a/apier/v1/trends.go +++ b/apier/v1/trends.go @@ -90,3 +90,7 @@ func (trs *TrendSv1) ScheduleQueries(ctx *context.Context, args *utils.ArgSchedu func (trs *TrendSv1) GetTrend(ctx *context.Context, args *utils.ArgGetTrend, trend *engine.Trend) error { return trs.trS.V1GetTrend(ctx, args, trend) } + +func (trs *TrendSv1) GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) error { + return trs.trS.V1GetScheduledTrends(ctx, args, schedTrends) +} diff --git a/data/conf/samples/trends/trends_mongo/cgrates.json b/data/conf/samples/trends/trends_mongo/cgrates.json new file mode 100644 index 000000000..43eef23a6 --- /dev/null +++ b/data/conf/samples/trends/trends_mongo/cgrates.json @@ -0,0 +1,36 @@ +{ + +"general": { + "log_level": 7, +}, + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, + "db_password": "", +}, + +"trends": { + "enabled": true, + "stats_conns":["*localhost"], + "scheduled_ids": {} +}, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"apiers": { + "enabled": true, + }, + +} \ No newline at end of file diff --git a/data/conf/samples/trends/trends_mysql/cgrates.json b/data/conf/samples/trends/trends_mysql/cgrates.json new file mode 100644 index 000000000..7b020a06d --- /dev/null +++ b/data/conf/samples/trends/trends_mysql/cgrates.json @@ -0,0 +1,32 @@ +{ + +"general": { + "log_level": 7, +}, + +"data_db": { +"db_type": "redis", +"db_port": 6379, +"db_name": "10", +}, + +"stor_db": { +"db_password": "CGRateS.org", +}, + +"trends": { + "enabled": true, + "stats_conns":["*localhost"], + "scheduled_ids": {} +}, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"apiers": { + "enabled": true, + }, + +} \ No newline at end of file diff --git a/data/conf/samples/trends/trends_schedIDs_mongo/cgrates.json b/data/conf/samples/trends/trends_schedIDs_mongo/cgrates.json new file mode 100644 index 000000000..f22f9a46f --- /dev/null +++ b/data/conf/samples/trends/trends_schedIDs_mongo/cgrates.json @@ -0,0 +1,32 @@ +{ + +"general": { + "log_level": 7, +}, + +"data_db": { +"db_type": "redis", +"db_port": 6379, +"db_name": "10", +}, + +"stor_db": { +"db_password": "CGRateS.org", +}, + +"trends": { + "enabled": true, + "stats_conns":["*localhost"], + "scheduled_ids": {"cgrates.org":["TR_1min"],"tenant1":["TR_5min"],"tenant2":[]} +}, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"apiers": { + "enabled": true, + }, + +} \ No newline at end of file diff --git a/data/conf/samples/trends/trends_schedIDs_mysql/cgrates.json b/data/conf/samples/trends/trends_schedIDs_mysql/cgrates.json new file mode 100644 index 000000000..f22f9a46f --- /dev/null +++ b/data/conf/samples/trends/trends_schedIDs_mysql/cgrates.json @@ -0,0 +1,32 @@ +{ + +"general": { + "log_level": 7, +}, + +"data_db": { +"db_type": "redis", +"db_port": 6379, +"db_name": "10", +}, + +"stor_db": { +"db_password": "CGRateS.org", +}, + +"trends": { + "enabled": true, + "stats_conns":["*localhost"], + "scheduled_ids": {"cgrates.org":["TR_1min"],"tenant1":["TR_5min"],"tenant2":[]} +}, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"apiers": { + "enabled": true, + }, + +} \ No newline at end of file diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index 7ce1d2e39..65c848c8c 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -25,11 +25,6 @@ "db_type": "*internal" }, -"trends": { - "enabled": true, - "stats_conns":["*internal"], -}, - "rals": { "enabled": true, diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 583dfb6fb..f4c13040a 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -128,14 +128,5 @@ "apiers_conns": ["*internal"], }, -// "sentrypeer":{ -// "client_id":"", -// "client_secret":"", -// "token_url":"", -// "ip_url":"", -// "number_url":"", -// "audience":"", -// "grant_type":"", -// }, } diff --git a/data/tariffplans/tuttrends/Stats.csv b/data/tariffplans/tuttrends/Stats.csv new file mode 100644 index 000000000..05417128d --- /dev/null +++ b/data/tariffplans/tuttrends/Stats.csv @@ -0,0 +1,5 @@ +#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12] +cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,, +cgrates.org,Stats1_2,*string:~*req.Account:1002,,,,,*sum#~*req.Usage;*pdd,,,,, +tenant1,Stat1,*string:~*req.Account:1005,,,,,*tcd;*asr;*acc,,,,, +tenant2,Stat_Avg,,,,,,*acc,,,,, \ No newline at end of file diff --git a/data/tariffplans/tuttrends/Trends.csv b/data/tariffplans/tuttrends/Trends.csv new file mode 100644 index 000000000..c689802cd --- /dev/null +++ b/data/tariffplans/tuttrends/Trends.csv @@ -0,0 +1,8 @@ +#Tenant[0],Id[1],Schedule[2],StatID[3],Metrics[4],TTL[5],QueueLength[6],MinItems[7],CorrelationType[8],Tolerance[9],Stored[10],ThresholdIDs[11] +cgrates.org,TREND_1,@every 1s,Stats1_1,,-1,-1,1,*last,1,false, +cgrates.org,TREND_2,@every 2s,Stats1_2,,-1,2,4,*last,1,false, +cgrates.org,TR_1min,@every 1m,Stats1_1,,-1,3,2,*average,0.15,false, +tenant1,TR_5min,*/5 * * * *,Stat1,,-1,-1,8,*average,0.23,true, +tenant1,TR_1hr,0 * * * *,Stat1,,-1,-1,8,*average,0.1,true, +tenant2,Trend_avg,@every 10m,Stat_Avg,,-1,-1,8,*average,0.6,true, +tenant2,Trend_avg_30min,@every 30m,Stat_Avg,,-1,-1,100,*average,0.3,true, \ No newline at end of file diff --git a/engine/datamanager.go b/engine/datamanager.go index e58a19e71..c4b037004 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1306,7 +1306,6 @@ func (dm *DataManager) GetTrend(tenant, id string, err = utils.ErrNoDatabaseConn return } - if tr, err = dm.dataDB.GetTrendDrv(tenant, id); err != nil { if err != utils.ErrNotFound { // database error return @@ -1340,7 +1339,6 @@ func (dm *DataManager) GetTrend(tenant, id string, return } } - if cacheWrite { if errCh := Cache.Set(utils.CacheTrends, tntID, tr, nil, cacheCommit(transactionID), transactionID); errCh != nil { @@ -1452,33 +1450,30 @@ func (dm *DataManager) GetTrendProfileIDs(tenants []string) (tps map[string][]st if err != nil { return } - if len(keys) == 0 { - return nil, utils.ErrNotFound + } else { + for _, tenant := range tenants { + var tntkeys []string + tntPrfx := prfx + tenant + utils.ConcatenatedKeySep + tntkeys, err = dm.dataDB.GetKeysForPrefix(tntPrfx) + if err != nil { + return + } + keys = append(keys, tntkeys...) } - tps = make(map[string][]string) - for _, key := range keys { - indx := strings.Index(key, utils.ConcatenatedKeySep) - tenant := key[len(utils.TrendsProfilePrefix):indx] - id := key[indx+1:] - tps[tenant] = append(tps[tenant], id) - } - return } - for _, tenant := range tenants { - tntPrfx := prfx + tenant + utils.ConcatenatedKeySep - keys, err = dm.dataDB.GetKeysForPrefix(tntPrfx) - if err != nil { - return - } - if len(keys) == 0 { - return nil, utils.ErrNotFound - } - tps = make(map[string][]string) - for _, key := range keys { - tps[tenant] = append(tps[tenant], key[len(tntPrfx):]) - } + // if len(keys) == 0 { + // return nil, utils.ErrNotFound + // } + + tps = make(map[string][]string) + for _, key := range keys { + indx := strings.Index(key, utils.ConcatenatedKeySep) + tenant := key[len(utils.TrendsProfilePrefix):indx] + id := key[indx+1:] + tps[tenant] = append(tps[tenant], id) } return + } func (dm *DataManager) SetTrendProfile(trp *TrendProfile) (err error) { diff --git a/engine/trends.go b/engine/trends.go index 3b4454364..c6eac99b0 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -21,6 +21,8 @@ package engine import ( "fmt" "runtime" + "slices" + "strings" "sync" "time" @@ -143,6 +145,18 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { utils.TrendS, tP.Tenant, tP.ID, err.Error())) return } + if err = tS.processThresholds(trnd); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> Trend with id <%s:%s> error: <%s> with ThresholdS", + utils.TrendS, tP.Tenant, tP.ID, err.Error())) + } + if err = tS.processEEs(trnd); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> Trend with id <%s:%s> error: <%s> with EEs", + utils.TrendS, tP.Tenant, tP.ID, err.Error())) + } } @@ -331,6 +345,7 @@ func (tS *TrendS) StartTrendS() error { return err } tS.crn.Start() + go tS.asyncStoreTrends() return nil } @@ -396,8 +411,13 @@ func (tS *TrendS) scheduleAutomaticQueries() error { } // scheduleTrendQueries will schedule/re-schedule specific trend queries -func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) { +func (tS *TrendS) scheduleTrendQueries(_ *context.Context, tnt string, tIDs []string) (scheduled int, err error) { var partial bool + tS.crnTQsMux.Lock() + if _, has := tS.crnTQs[tnt]; !has { + tS.crnTQs[tnt] = make(map[string]cron.EntryID) + } + tS.crnTQsMux.Unlock() for _, tID := range tIDs { tS.crnTQsMux.RLock() if entryID, has := tS.crnTQs[tnt][tID]; has { @@ -419,7 +439,6 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs [] partial = true } else { // log the entry ID for debugging tS.crnTQsMux.Lock() - tS.crnTQs[tP.Tenant] = make(map[string]cron.EntryID) tS.crnTQs[tP.Tenant][tP.ID] = entryID tS.crnTQsMux.Unlock() } @@ -446,6 +465,9 @@ func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgSchedul // // in this way being possible to work with paginators func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *Trend) (err error) { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } var trnd *Trend if trnd, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { return @@ -474,7 +496,7 @@ func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTr } if arg.RunTimeEnd == utils.EmptyString { tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1)) - } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil { + } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, tS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil { return } retTrend.RunTimes = make([]time.Time, 0, len(runTimes)) @@ -483,7 +505,7 @@ func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTr retTrend.RunTimes = append(retTrend.RunTimes, runTime) } } - if len(runTimes) == 0 { // filtered out all + if len(retTrend.RunTimes) == 0 { // filtered out all return utils.ErrNotFound } retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend) @@ -492,3 +514,50 @@ func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTr } return } + +func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) (err error) { + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = tS.cgrcfg.GeneralCfg().DefaultTenant + } + tS.crnTQsMux.RLock() + defer tS.crnTQsMux.RUnlock() + trendIDsMp, has := tS.crnTQs[tnt] + if !has { + return utils.ErrNotFound + } + var scheduledTrends []utils.ScheduledTrend + var entryIds map[string]cron.EntryID + if len(args.TrendIDPrefix) == 0 { + entryIds = trendIDsMp + } else { + entryIds = make(map[string]cron.EntryID) + for _, tID := range args.TrendIDPrefix { + for key, entryID := range trendIDsMp { + if strings.HasPrefix(key, tID) { + entryIds[key] = entryID + } + } + } + } + if len(entryIds) == 0 { + return utils.ErrNotFound + } + var entry cron.Entry + for id, entryID := range entryIds { + entry = tS.crn.Entry(entryID) + if entry.ID == 0 { + continue + } + scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{ + TrendID: id, + Next: entry.Next, + Prev: entry.Prev, + }) + } + slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int { + return a.Next.Compare(b.Next) + }) + *schedTrends = scheduledTrends + return nil +} diff --git a/general_tests/trends_aut_it_test.go b/general_tests/trends_aut_it_test.go new file mode 100644 index 000000000..50efda8a5 --- /dev/null +++ b/general_tests/trends_aut_it_test.go @@ -0,0 +1,274 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "os/exec" + "path" + "testing" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +var ( + trendAuQCfgPath string + trendAuQCfg *config.CGRConfig + trendAuQRpc *birpc.Client + trendAuQConfDIR string //run tests for specific configuration + + sTeststrendAuQEmpty = []func(t *testing.T){ + testtrendAuQLoadConfig, + testtrendAuQInitDataDb, + testtrendAuQFromFolder, + testtrendAuQStartEngine, + testtrendAuQRpcConn, + testScheduledTrends, + testtrendAuQStopEngine, + } + + sTeststrendAuQSchedIDs = []func(t *testing.T){ + testtrendAuQLoadConfig, + testtrendAuQInitDataDb, + testtrendAuQFromFolder, + testtrendAuQStartEngine, + testtrendAuQRpcConn, + testScheduledTrends2, + testtrendAuQStopEngine, + } +) + +// Test start here +func TestTrendAuQEmptyScheduleIDs(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal, utils.MetaPostgres: + t.SkipNow() + case utils.MetaMySQL: + trendAuQConfDIR = "trends_mysql" + case utils.MetaMongo: + trendAuQConfDIR = "trends_mongo" + default: + t.Fatal("Unknown Database type") + } + for _, stest := range sTeststrendAuQEmpty { + t.Run(trendAuQConfDIR, stest) + } +} + +func TestTrendAuQScheduleIDs(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal, utils.MetaPostgres: + t.SkipNow() + case utils.MetaMySQL: + trendAuQConfDIR = "trends_schedIDs_mysql" + case utils.MetaMongo: + trendAuQConfDIR = "trends_schedIDs_mongo" + default: + t.Fatal("Unknown Database type") + } + for _, stest := range sTeststrendAuQSchedIDs { + t.Run(trendAuQConfDIR, stest) + } +} + +func testtrendAuQLoadConfig(t *testing.T) { + var err error + trendAuQCfgPath = path.Join(*utils.DataDir, "conf", "samples", "trends", trendAuQConfDIR) + if trendAuQCfg, err = config.NewCGRConfigFromPath(trendAuQCfgPath); err != nil { + t.Error(err) + } +} + +func testtrendAuQInitDataDb(t *testing.T) { + if err := engine.InitDataDb(trendAuQCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func testtrendAuQResetStorDb(t *testing.T) { + if err := engine.InitStorDb(trendAuQCfg); err != nil { + t.Fatal(err) + } +} + +func testtrendAuQStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(trendAuQCfgPath, *utils.WaitRater); err != nil { + t.Fatal(err) + } +} + +func testtrendAuQRpcConn(t *testing.T) { + var err error + trendAuQRpc, err = newRPCClient(trendAuQCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func testtrendAuQFromFolder(t *testing.T) { + wchan := make(chan struct{}, 1) + go func() { + loaderPath, err := exec.LookPath("cgr-loader") + if err != nil { + t.Error(err) + } + loader := exec.Command(loaderPath, "-config_path", trendAuQCfgPath, "-path", path.Join(*utils.DataDir, "tariffplans", "tuttrends")) + if err := loader.Start(); err != nil { + t.Error(err) + } + loader.Wait() + wchan <- struct{}{} + }() + select { + case <-wchan: + case <-time.After(1 * time.Second): + t.Errorf("cgr-loader failed: ") + } +} + +func testScheduledTrends(t *testing.T) { + + // getting all scheduled trends for a tenant + var schedTrends []utils.ScheduledTrend + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}, TrendIDPrefix: []string{}}, &schedTrends); err != nil { + t.Error(err) + } else if len(schedTrends) != 3 { + t.Errorf("expected 3 schedTrends, got %d", len(schedTrends)) + } + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "tenant1"}}, TrendIDPrefix: []string{}}, &schedTrends); err != nil { + t.Error(err) + } else if len(schedTrends) != 2 { + t.Errorf("expected 2 schedTrends, got %d", len(schedTrends)) + } + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "tenant2"}}, TrendIDPrefix: []string{}}, &schedTrends); err != nil { + t.Error(err) + } else if len(schedTrends) != 2 { + t.Errorf("expected 2 schedTrends, got %d", len(schedTrends)) + } + + // getting scheduled trends by the prefix + expTrends := []utils.ScheduledTrend{ + { + TrendID: "TREND_1", + Next: time.Now().Add(1 * time.Second), + }, + { + TrendID: "TREND_2", + Next: time.Now().Add(2 * time.Second), + }, + } + + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}, TrendIDPrefix: []string{"TREND"}}, &schedTrends); err != nil { + t.Error(err) + } else if diff := cmp.Diff(schedTrends, expTrends, cmpopts.EquateApproxTime(2*time.Second), cmpopts.IgnoreFields(utils.ScheduledTrend{}, "Prev")); diff != utils.EmptyString { + t.Errorf("unexpected scheduled trends (-want +got)\n%s", diff) + } + + expTrends = []utils.ScheduledTrend{ + { + TrendID: "TR_1hr", + }, + } + + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "tenant1"}}, TrendIDPrefix: []string{"TR_1"}}, &schedTrends); err != nil { + t.Error(err) + } else if diff := cmp.Diff(schedTrends, expTrends, cmpopts.IgnoreFields(utils.ScheduledTrend{}, "Next"), cmpopts.IgnoreFields(utils.ScheduledTrend{}, "Prev")); diff != utils.EmptyString { + t.Errorf("unexpected scheduled trends (-want +got)\n%s", diff) + } + +} + +func testScheduledTrends2(t *testing.T) { + + // getting all scheduled trends for a tenant + var schedTrends []utils.ScheduledTrend + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}, TrendIDPrefix: []string{}}, &schedTrends); err != nil { + t.Error(err) + } else if len(schedTrends) != 1 { + t.Errorf("expected 1 schedTrends, got %d", len(schedTrends)) + } + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "tenant1"}}, TrendIDPrefix: []string{}}, &schedTrends); err != nil { + t.Error(err) + } else if len(schedTrends) != 1 { + t.Errorf("expected 1 schedTrends, got %d", len(schedTrends)) + } + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "tenant2"}}, TrendIDPrefix: []string{}}, &schedTrends); err != nil { + t.Error(err) + } else if len(schedTrends) != 2 { + t.Errorf("expected 2 schedTrends, got %d", len(schedTrends)) + } + + expTrends := []utils.ScheduledTrend{ + { + TrendID: "TR_1min", + Next: time.Now().Add(1 * time.Minute), + }, + } + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}, TrendIDPrefix: []string{}}, &schedTrends); err != nil { + t.Error(err) + } else if diff := cmp.Diff(schedTrends, expTrends, cmpopts.EquateApproxTime(1*time.Minute), cmpopts.IgnoreFields(utils.ScheduledTrend{}, "Prev")); diff != utils.EmptyString { + t.Errorf("unexpected scheduled trends (-want +got)\n%s", diff) + } + + expTrends = []utils.ScheduledTrend{ + { + TrendID: "TR_5min", + Next: time.Now().Add(5 * time.Minute), + }, + } + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "tenant1"}}, TrendIDPrefix: []string{}}, &schedTrends); err != nil { + t.Error(err) + } else if diff := cmp.Diff(schedTrends, expTrends, cmpopts.EquateApproxTime(5*time.Minute), cmpopts.IgnoreFields(utils.ScheduledTrend{}, "Prev")); diff != utils.EmptyString { + t.Errorf("unexpected scheduled trends (-want +got)\n%s", diff) + } + + // getting scheduled trends by the prefix + expTrends = []utils.ScheduledTrend{ + { + TrendID: "Trend_avg", + Next: time.Now().Add(10 * time.Minute), + }, + { + TrendID: "Trend_avg_30min", + Next: time.Now().Add(30 * time.Minute), + }, + } + + if err := trendAuQRpc.Call(context.Background(), utils.TrendSv1GetScheduledTrends, &utils.ArgScheduledTrends{TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "tenant2"}}, TrendIDPrefix: []string{"Trend_avg"}}, &schedTrends); err != nil { + t.Error(err) + } else if diff := cmp.Diff(schedTrends, expTrends, cmpopts.EquateApproxTime(10*time.Minute), cmpopts.IgnoreFields(utils.ScheduledTrend{}, "Prev")); diff != utils.EmptyString { + t.Errorf("unexpected scheduled trends (-want +got)\n%s", diff) + } + +} + +func testtrendAuQStopEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go index 24cb84dfb..29d41fe8b 100644 --- a/general_tests/trends_schedule_it_test.go +++ b/general_tests/trends_schedule_it_test.go @@ -72,7 +72,7 @@ func TestTrendSchedule(t *testing.T) { tpFiles := map[string]string{ utils.TrendsCsv: `#Tenant[0],Id[1],Schedule[2],StatID[3],Metrics[4],TTL[5],QueueLength[6],MinItems[7],CorrelationType[8],Tolerance[9],Stored[10],ThresholdIDs[11] cgrates.org,TREND_1,@every 1s,Stats1_1,,-1,-1,1,*last,1,false, -cgrates.org,TREND_2,@every 2s,Stats1_2,,-1,-1,1,*last,1,false,`, +cgrates.org,TREND_2,@every 1s,Stats1_2,,-1,-1,1,*last,1,false,`, utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12] cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,, cgrates.org,Stats1_2,*string:~*req.Account:1002,,,,,*sum#~*req.Usage;*pdd,,,,,`} @@ -118,11 +118,8 @@ cgrates.org,Stats1_2,*string:~*req.Account:1002,,,,,*sum#~*req.Usage;*pdd,,,,,`} } else if len(tr.RunTimes) != 1 && len(tr.Metrics) != 1 { t.Error("expected metrics to be calculated") } - - if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { + if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", RunIndexStart: 1, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } else if len(tr.RunTimes) != 0 && len(tr.Metrics) != 0 { - t.Error("expected no metrics to be calculated") } }) @@ -147,27 +144,32 @@ cgrates.org,Stats1_2,*string:~*req.Account:1002,,,,,*sum#~*req.Usage;*pdd,,,,,`} time.Sleep(1 * time.Second) t.Run("TestGetTrend", func(t *testing.T) { + timeEnd := time.Now().Add(-3 * time.Second).Format(time.RFC3339) var tr engine.Trend - if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { + if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", RunTimeEnd: timeEnd, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } else if len(tr.RunTimes) != 2 && len(tr.Metrics) != 2 { - t.Error("expected metrics to be calculated") - } else if tr.Metrics[tr.RunTimes[1]]["*acd"].TrendLabel != utils.MetaNegative { - t.Error("expected TrendLabel to be negative") - } else if tr.Metrics[tr.RunTimes[1]]["*tcc"].TrendLabel != utils.MetaPositive { - t.Error("expected TrendLabel to be positive") - } else if tr.Metrics[tr.RunTimes[1]]["*tcd"].TrendLabel != utils.MetaPositive { - t.Error("expected TrendLabel to be positive") } - if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { + if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", RunIndexEnd: 4, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { + t.Error(err) + } else if len(tr.RunTimes) != 2 && len(tr.Metrics) != 2 { + t.Errorf("expected to 2 runtimes, got %d", len(tr.RunTimes)) + } + }) + + time.Sleep(2 * time.Second) + t.Run("TestGetTrend", func(t *testing.T) { + timeStart := time.Now().Add(-4 * time.Second).Format(time.RFC3339) + timeEnd := time.Now().Add(-1 * time.Second).Format(time.RFC3339) + var tr engine.Trend + if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", RunIndexStart: 1, RunIndexEnd: 3, RunTimeStart: timeStart, RunTimeEnd: timeEnd, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { + t.Error(err) + } else if len(tr.RunTimes) != 2 && len(tr.Metrics) != 1 { + t.Errorf("expected to 2 runtimes, got %d", len(tr.RunTimes)) + } + + if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_2", RunIndexStart: 5, RunIndexEnd: 3, TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } else if len(tr.RunTimes) != 1 && len(tr.Metrics) != 1 { - t.Error("expected metrics to be calculated") - } else if tr.Metrics[tr.RunTimes[0]]["*sum#~*req.Usage"].TrendLabel != utils.NotAvailable { - t.Error("expected TrendLabel to be negative") - } else if tr.Metrics[tr.RunTimes[0]]["*pdd"].TrendLabel != utils.NotAvailable { - t.Error("expected TrendLabel to be positive") } }) } diff --git a/services/trends.go b/services/trends.go index 43bbdf786..c810d02b2 100644 --- a/services/trends.go +++ b/services/trends.go @@ -76,13 +76,14 @@ func (trs *TrendService) Start() error { filterS := <-trs.filterSChan trs.filterSChan <- filterS dbchan := trs.dm.GetDMChan() - dm := <-dbchan - dbchan <- dm + datadb := <-dbchan + dbchan <- datadb + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS)) trs.Lock() defer trs.Unlock() - trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg) + trs.trs = engine.NewTrendS(datadb, trs.connMgr, filterS, trs.cfg) if err := trs.trs.StartTrendS(); err != nil { return err } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index fdc81284b..ac284fb76 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1658,6 +1658,10 @@ type ArgScheduleTrendQueries struct { TenantIDWithAPIOpts TrendIDs []string } +type ArgScheduledTrends struct { + TenantIDWithAPIOpts + TrendIDPrefix []string +} type ArgGetTrend struct { TenantWithAPIOpts @@ -1667,3 +1671,9 @@ type ArgGetTrend struct { RunTimeStart string RunTimeEnd string } + +type ScheduledTrend struct { + TrendID string + Next time.Time + Prev time.Time +} diff --git a/utils/consts.go b/utils/consts.go index 8c194a509..09f4a87b8 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1706,6 +1706,7 @@ const ( TrendSv1Ping = "TrendSv1.Ping" TrendSv1ScheduleQueries = "TrendSv1.ScheduleQueries" TrendSv1GetTrend = "TrendSv1.GetTrend" + TrendSv1GetScheduledTrends = "TrendSv1.GetScheduledTrends" ) // RankingS APIs