From cd3159d8ea96b0ae1499640bcfa0a4474669011e Mon Sep 17 00:00:00 2001 From: gezimbll Date: Thu, 12 Dec 2024 16:24:13 +0100 Subject: [PATCH] revised trend compress method and added store_interval tests for trends&rankings --- engine/datamanager.go | 10 +- engine/libtrends.go | 14 +- engine/trends.go | 6 +- general_tests/rankings_stored_it_test.go | 306 +++++++++++++++++++++++ general_tests/trends_schedule_it_test.go | 3 - general_tests/trends_stored_it_test.go | 279 +++++++++++++++++++++ 6 files changed, 602 insertions(+), 16 deletions(-) create mode 100644 general_tests/rankings_stored_it_test.go create mode 100644 general_tests/trends_stored_it_test.go diff --git a/engine/datamanager.go b/engine/datamanager.go index ece6dc7c4..531f206b6 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1032,7 +1032,7 @@ func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) { return utils.ErrNoDatabaseConn } if dm.dataDB.GetStorageType() != utils.MetaInternal { - if err = tr.compress(dm.ms); err != nil { + if tr, err = tr.compress(dm.ms); err != nil { return } } @@ -1391,10 +1391,10 @@ func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cache } return nil, err } - if cacheWrite { - if errCh := Cache.Set(ctx, utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil { - return nil, errCh - } + } + if cacheWrite { + if errCh := Cache.Set(ctx, utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil { + return nil, errCh } } return diff --git a/engine/libtrends.go b/engine/libtrends.go index 6c54b5359..ffbea50b7 100644 --- a/engine/libtrends.go +++ b/engine/libtrends.go @@ -143,17 +143,19 @@ func (t *Trend) asTrendSummary() (ts *TrendSummary) { return } -func (t *Trend) compress(ms utils.Marshaler) (err error) { +func (t *Trend) compress(ms utils.Marshaler) (tr *Trend, err error) { if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) { return } - t.CompressedMetrics, err = ms.Marshal(t.Metrics) + tr = &Trend{ + Tenant: t.Tenant, + ID: t.ID, + } + tr.CompressedMetrics, err = ms.Marshal(tr.Metrics) if err != nil { return } - t.Metrics = nil - t.RunTimes = nil - return nil + return tr, nil } func (t *Trend) uncompress(ms utils.Marshaler) (err error) { @@ -165,7 +167,7 @@ func (t *Trend) uncompress(ms utils.Marshaler) (err error) { if err != nil { return } - t.CompressedMetrics = []byte{} + t.CompressedMetrics = nil t.RunTimes = make([]time.Time, len(t.Metrics)) i := 0 for key := range t.Metrics { diff --git a/engine/trends.go b/engine/trends.go index 8a32b3afb..9c3003482 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -285,14 +285,14 @@ func (tS *TrendS) storeTrends(ctx *context.Context) { continue } trnd := trndIf.(*Trend) - trnd.tMux.RLock() + trnd.tMux.Lock() if err := tS.dm.SetTrend(ctx, trnd); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q", utils.TrendS, trndID, err)) failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup } - trnd.tMux.RUnlock() + trnd.tMux.Unlock() // randomize the CPU load and give up thread control runtime.Gosched() } @@ -465,6 +465,8 @@ func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTr if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { return } + trnd.tMux.RLock() + defer trnd.tMux.RUnlock() retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying retTrend.ID = trnd.ID startIdx := arg.RunIndexStart diff --git a/general_tests/rankings_stored_it_test.go b/general_tests/rankings_stored_it_test.go new file mode 100644 index 000000000..d003fd082 --- /dev/null +++ b/general_tests/rankings_stored_it_test.go @@ -0,0 +1,306 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "fmt" + "slices" + "testing" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestRankingStore(t *testing.T) { + var dbConfig engine.DBCfg + switch *utils.DBType { + case utils.MetaMySQL: + case utils.MetaMongo: + dbConfig = engine.DBCfg{ + DataDB: &engine.DBParams{ + Type: utils.StringPointer("mongo"), + Port: utils.IntPointer(27017), + Name: utils.StringPointer("10"), + }, + StorDB: &engine.DBParams{ + Type: utils.StringPointer("mongo"), + Name: utils.StringPointer("cgrates"), + Port: utils.IntPointer(27017), + Password: utils.StringPointer(""), + }, + } + case utils.MetaInternal, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + content := `{ + +"general": { + "log_level": 7, +}, +"rankings": { + "enabled": true, + "store_interval": "1500ms", + "stats_conns":["*localhost"], +}, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"admins": { + "enabled": true, +}, + +} +` + tpFiles := map[string]string{ + utils.RankingsCsv: `#Tenant[0],Id[1],Schedule[2],StatIDs[3],MetricIDs[4],Sorting[5],SortingParameters[6],Stored[7],ThresholdIDs[8] +cgrates.org,RANK1,@every 1s,Stats1;Stats2;Stats3;Stats4,,*asc,*acc;*pdd:false;*acd,,`, + utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],MetricIDs[10],MetricFilterIDs[11],MetricBlockers[12] +cgrates.org,Stats1,*string:~*req.Account:1001,,,,-1,,,,*acc;*acd;*pdd,, +cgrates.org,Stats2,*string:~*req.Account:1002,,,,-1,,,,*acc;*acd;*pdd,, +cgrates.org,Stats3,*string:~*req.Account:1003,,,,-1,,,,*acc;*acd;*pdd,, +cgrates.org,Stats4,*string:~*req.Account:1004,,,,-1,,,,*acc;*acd;*pdd,,`} + + ng := engine.TestEngine{ + ConfigJSON: content, + TpFiles: tpFiles, + DBCfg: dbConfig, + Encoding: *utils.Encoding, + } + client, _ := ng.Run(t) + time.Sleep(200 * time.Millisecond) + var lastUpdate time.Time + t.Run("RankingSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries, + &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + t.Run("ProcessStats", func(t *testing.T) { + var reply []string + for i := 1; i <= 4; i++ { + if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("event%d", i), + Event: map[string]any{ + utils.AccountField: fmt.Sprintf("100%d", i), + utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC), + }, APIOpts: map[string]any{ + utils.MetaUsage: time.Duration(1800+60) / time.Duration(i) * time.Second, + utils.MetaCost: 20.0 + float64((i*7)%10)/2, + utils.MetaPDD: time.Duration(10+i*2) * time.Second, + }}, &reply); err != nil { + t.Error(err) + } + } + }) + + t.Run("GetRankingsAfterStoreInterval", func(t *testing.T) { + rankingsChan := make(chan *engine.Ranking, 1) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { + ticker := time.NewTicker(600 * time.Millisecond) + var rnk engine.Ranking + for { + select { + case <-ticker.C: + err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Ranking retrieval error: %v", err) + } + continue + } else if rnk.LastUpdate.IsZero() { + continue + } + rankingsChan <- &rnk + case <-ctx.Done(): + return + } + + } + }() + + select { + case rnk := <-rankingsChan: + lastUpdate = rnk.LastUpdate + sortedStatIDs := []string{"Stats3", "Stats2", "Stats1", "Stats4"} + if !slices.Equal(rnk.SortedStatIDs, sortedStatIDs) { + fmt.Println(utils.ToJSON(rnk)) + t.Error("should have sorted statids") + } + case <-ctx.Done(): + t.Error("Didn't get any ranking from db") + } + }) + + t.Run("ProcessStats", func(t *testing.T) { + var reply []string + j := 5 + for i := 1; i <= 4; i++ { + j-- + if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("event%d", i), + Event: map[string]any{ + utils.AccountField: fmt.Sprintf("100%d", i), + utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC), + }, APIOpts: map[string]any{ + utils.MetaUsage: time.Duration(1500+60) / time.Duration(j) * time.Second, + utils.MetaCost: 20.0 + float64((j*8)%10)/2, + utils.MetaPDD: time.Duration(11+j*2) * time.Second, + }}, &reply); err != nil { + t.Error(err) + } + } + }) + + t.Run("RankingsSetConfig", func(t *testing.T) { + var reply string + // setting store interval to 0 + if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{ + Tenant: "cgrates.org", + Config: map[string]any{ + "rankings": map[string]any{ + "enabled": true, + "stats_conns": []string{"*localhost"}, + "store_interval": "0", + }, + }, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + }) + t.Run("RankingSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries, + &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + + t.Run("GetRankingsNotStored", func(t *testing.T) { + metricsChan := make(chan *engine.Ranking, 1) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { + for { + ticker := time.NewTicker(700 * time.Millisecond) + select { + case <-ticker.C: + var rnk engine.Ranking + err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Ranking retrieval error: %v", err) + } + continue + } else if rnk.LastUpdate.Equal(lastUpdate) { + continue + } + metricsChan <- &rnk + case <-ctx.Done(): + return + } + } + }() + + select { + case rnk := <-metricsChan: + lastUpdate = rnk.LastUpdate + case <-ctx.Done(): + t.Error("Didn't get any Ranking from db") + } + }) + t.Run("RankingsSetConfig", func(t *testing.T) { + var reply string + // setting store interval to -1 + if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{ + Tenant: "cgrates.org", + Config: map[string]any{ + "rankings": map[string]any{ + "enabled": true, + "stats_conns": []string{"*localhost"}, + "store_interval": "-1", + }, + }, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + }) + t.Run("RankingsSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries, + &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + t.Run("GetRankingsStoredUnlimited", func(t *testing.T) { + rankingChan := make(chan *engine.Ranking, 1) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { + ticker := time.NewTicker(1000 * time.Millisecond) + for { + select { + case <-ticker.C: + var rnk engine.Ranking + err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Ranking retrieval error: %v", err) + } + continue + } else if rnk.LastUpdate.Equal(lastUpdate) { + continue + } + rankingChan <- &rnk + case <-ctx.Done(): + return + } + } + }() + select { + case <-rankingChan: + case <-ctx.Done(): + t.Error("Didn't get any Ranking from db") + } + }) +} diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go index 414de561e..8fcb20bf5 100644 --- a/general_tests/trends_schedule_it_test.go +++ b/general_tests/trends_schedule_it_test.go @@ -66,9 +66,6 @@ func TestTrendSchedule(t *testing.T) { "store_interval": "-1", }, -"apiers": { - "enabled": true, -}, "admins": { "enabled": true, }, diff --git a/general_tests/trends_stored_it_test.go b/general_tests/trends_stored_it_test.go new file mode 100644 index 000000000..c3e1b7a97 --- /dev/null +++ b/general_tests/trends_stored_it_test.go @@ -0,0 +1,279 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "bytes" + "testing" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestTrendStore(t *testing.T) { + var dbConfig engine.DBCfg + switch *utils.DBType { + case utils.MetaMySQL: + case utils.MetaMongo: + dbConfig = engine.DBCfg{ + DataDB: &engine.DBParams{ + Type: utils.StringPointer("mongo"), + Port: utils.IntPointer(27017), + Name: utils.StringPointer("10"), + }, + StorDB: &engine.DBParams{ + Type: utils.StringPointer("mongo"), + Name: utils.StringPointer("cgrates"), + Port: utils.IntPointer(27017), + Password: utils.StringPointer(""), + }, + } + case utils.MetaInternal, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + content := `{ + +"general": { + "log_level": 7, +}, +"trends": { + "enabled": true, + "store_interval": "1500ms", + "stats_conns":["*localhost"], +}, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"admins": { + "enabled": true, +}, + +} +` + tpFiles := map[string]string{ + utils.TrendsCsv: `#Tenant[0],Id[1],Schedule[2],StatID[3],Metrics[4],TTL[5],QueueLength[6],MinItems[7],CorrelationType[8],Tolerance[9],Stored[10],ThresholdIDs[11] +cgrates.org,TREND_1,@every 1s,Stats1_1,,-1,-1,1,*last,1,false,`, + utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],MetricIDs[10],MetricFilterIDs[11],MetricBlockers[12] +cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`} + + ng := engine.TestEngine{ + ConfigJSON: content, + TpFiles: tpFiles, + DBCfg: dbConfig, + Encoding: *utils.Encoding, + LogBuffer: bytes.NewBuffer(nil), + } + + client, _ := ng.Run(t) + time.Sleep(200 * time.Millisecond) + t.Run("TrendSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries, + &utils.ArgScheduleTrendQueries{TrendIDs: []string{"TREND_1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + t.Run("ProcessStats", func(t *testing.T) { + var reply []string + if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]any{ + utils.AccountField: "1001", + utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC), + }, + APIOpts: map[string]any{ + utils.MetaUsage: time.Duration(1800+60) * time.Second, + utils.MetaCost: 20 + float64(10), + utils.MetaPDD: time.Duration(10 * time.Second), + }}, &reply); err != nil { + t.Error(err) + } + + }) + + t.Run("GetTrendsAfterStoreInterval", func(t *testing.T) { + metricsChan := make(chan *engine.Trend, 1) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { + ticker := time.NewTicker(600 * time.Millisecond) + var trnd engine.Trend + for { + select { + case <-ticker.C: + err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Trend retrieval error: %v", err) + } + continue + } + metricsChan <- &trnd + case <-ctx.Done(): + return + } + + } + }() + + select { + case trnd := <-metricsChan: + if len(trnd.RunTimes) < 1 || len(trnd.Metrics) < 1 { + t.Errorf("expected at least 1 runtime, got %d", len(trnd.RunTimes)) + } + case <-ctx.Done(): + t.Error("Didn't get any trend from db") + } + }) + + t.Run("TrendsSetConfig", func(t *testing.T) { + var reply string + // setting store interval to 0 + if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{ + Tenant: "cgrates.org", + Config: map[string]any{ + "trends": map[string]any{ + "enabled": true, + "stats_conns": []string{"*localhost"}, + "store_interval": "0", + }, + }, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + }) + t.Run("TrendSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries, + &utils.ArgScheduleTrendQueries{TrendIDs: []string{"TREND_1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + + t.Run("GetTrendsNotStored", func(t *testing.T) { + metricsChan := make(chan *engine.Trend, 1) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { + for { + ticker := time.NewTicker(700 * time.Millisecond) + select { + case <-ticker.C: + var trnd engine.Trend + // the trend will be not updated since storeinterval is set to 0 + err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Trend retrieval error: %v", err) + } + continue + } + metricsChan <- &trnd + case <-ctx.Done(): + return + } + } + }() + + select { + case trnd := <-metricsChan: + if len(trnd.RunTimes) < 1 && len(trnd.Metrics) < 1 { + t.Errorf("expected 1 runtime, got %d", len(trnd.RunTimes)) + } + case <-ctx.Done(): + t.Error("Didn't get any trend from db") + } + }) + t.Run("TrendsSetConfig", func(t *testing.T) { + var reply string + // setting store interval to -1 + if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{ + Tenant: "cgrates.org", + Config: map[string]any{ + "trends": map[string]any{ + "enabled": true, + "stats_conns": []string{"*localhost"}, + "store_interval": "-1", + }, + }, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + }) + t.Run("TrendSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries, + &utils.ArgScheduleTrendQueries{TrendIDs: []string{"TREND_1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + t.Run("GetTrendsStoredUnlimited", func(t *testing.T) { + metricsChan := make(chan *engine.Trend, 1) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { + ticker := time.NewTicker(1000 * time.Millisecond) + for { + select { + case <-ticker.C: + var trnd engine.Trend + err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Trend retrieval error: %v", err) + } + continue + } + metricsChan <- &trnd + case <-ctx.Done(): + return + } + } + }() + select { + case trnd := <-metricsChan: + if len(trnd.RunTimes) < 2 || len(trnd.Metrics) < 2 { + t.Errorf("expected at least 2 runtimes, got %d", len(trnd.RunTimes)) + } + case <-ctx.Done(): + t.Error("Didn't get any trend from db") + } + }) +}