diff --git a/engine/datamanager.go b/engine/datamanager.go
index ece6dc7c4..531f206b6 100644
--- a/engine/datamanager.go
+++ b/engine/datamanager.go
@@ -1032,7 +1032,7 @@ func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) {
return utils.ErrNoDatabaseConn
}
if dm.dataDB.GetStorageType() != utils.MetaInternal {
- if err = tr.compress(dm.ms); err != nil {
+ if tr, err = tr.compress(dm.ms); err != nil {
return
}
}
@@ -1391,10 +1391,10 @@ func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cache
}
return nil, err
}
- if cacheWrite {
- if errCh := Cache.Set(ctx, utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil {
- return nil, errCh
- }
+ }
+ if cacheWrite {
+ if errCh := Cache.Set(ctx, utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil {
+ return nil, errCh
}
}
return
diff --git a/engine/libtrends.go b/engine/libtrends.go
index 6c54b5359..ffbea50b7 100644
--- a/engine/libtrends.go
+++ b/engine/libtrends.go
@@ -143,17 +143,19 @@ func (t *Trend) asTrendSummary() (ts *TrendSummary) {
return
}
-func (t *Trend) compress(ms utils.Marshaler) (err error) {
+func (t *Trend) compress(ms utils.Marshaler) (tr *Trend, err error) {
if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) {
return
}
- t.CompressedMetrics, err = ms.Marshal(t.Metrics)
+ tr = &Trend{
+ Tenant: t.Tenant,
+ ID: t.ID,
+ }
+ tr.CompressedMetrics, err = ms.Marshal(tr.Metrics)
if err != nil {
return
}
- t.Metrics = nil
- t.RunTimes = nil
- return nil
+ return tr, nil
}
func (t *Trend) uncompress(ms utils.Marshaler) (err error) {
@@ -165,7 +167,7 @@ func (t *Trend) uncompress(ms utils.Marshaler) (err error) {
if err != nil {
return
}
- t.CompressedMetrics = []byte{}
+ t.CompressedMetrics = nil
t.RunTimes = make([]time.Time, len(t.Metrics))
i := 0
for key := range t.Metrics {
diff --git a/engine/trends.go b/engine/trends.go
index 8a32b3afb..9c3003482 100644
--- a/engine/trends.go
+++ b/engine/trends.go
@@ -285,14 +285,14 @@ func (tS *TrendS) storeTrends(ctx *context.Context) {
continue
}
trnd := trndIf.(*Trend)
- trnd.tMux.RLock()
+ trnd.tMux.Lock()
if err := tS.dm.SetTrend(ctx, trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
utils.TrendS, trndID, err))
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
}
- trnd.tMux.RUnlock()
+ trnd.tMux.Unlock()
// randomize the CPU load and give up thread control
runtime.Gosched()
}
@@ -465,6 +465,8 @@ func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTr
if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
return
}
+ trnd.tMux.RLock()
+ defer trnd.tMux.RUnlock()
retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
retTrend.ID = trnd.ID
startIdx := arg.RunIndexStart
diff --git a/general_tests/rankings_stored_it_test.go b/general_tests/rankings_stored_it_test.go
new file mode 100644
index 000000000..d003fd082
--- /dev/null
+++ b/general_tests/rankings_stored_it_test.go
@@ -0,0 +1,306 @@
+//go:build integration
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package general_tests
+
+import (
+ "fmt"
+ "slices"
+ "testing"
+ "time"
+
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func TestRankingStore(t *testing.T) {
+ var dbConfig engine.DBCfg
+ switch *utils.DBType {
+ case utils.MetaMySQL:
+ case utils.MetaMongo:
+ dbConfig = engine.DBCfg{
+ DataDB: &engine.DBParams{
+ Type: utils.StringPointer("mongo"),
+ Port: utils.IntPointer(27017),
+ Name: utils.StringPointer("10"),
+ },
+ StorDB: &engine.DBParams{
+ Type: utils.StringPointer("mongo"),
+ Name: utils.StringPointer("cgrates"),
+ Port: utils.IntPointer(27017),
+ Password: utils.StringPointer(""),
+ },
+ }
+ case utils.MetaInternal, utils.MetaPostgres:
+ t.SkipNow()
+ default:
+ t.Fatal("unsupported dbtype value")
+ }
+ content := `{
+
+"general": {
+ "log_level": 7,
+},
+"rankings": {
+ "enabled": true,
+ "store_interval": "1500ms",
+ "stats_conns":["*localhost"],
+},
+
+"stats": {
+ "enabled": true,
+ "store_interval": "-1",
+},
+
+"admins": {
+ "enabled": true,
+},
+
+}
+`
+ tpFiles := map[string]string{
+ utils.RankingsCsv: `#Tenant[0],Id[1],Schedule[2],StatIDs[3],MetricIDs[4],Sorting[5],SortingParameters[6],Stored[7],ThresholdIDs[8]
+cgrates.org,RANK1,@every 1s,Stats1;Stats2;Stats3;Stats4,,*asc,*acc;*pdd:false;*acd,,`,
+ utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],MetricIDs[10],MetricFilterIDs[11],MetricBlockers[12]
+cgrates.org,Stats1,*string:~*req.Account:1001,,,,-1,,,,*acc;*acd;*pdd,,
+cgrates.org,Stats2,*string:~*req.Account:1002,,,,-1,,,,*acc;*acd;*pdd,,
+cgrates.org,Stats3,*string:~*req.Account:1003,,,,-1,,,,*acc;*acd;*pdd,,
+cgrates.org,Stats4,*string:~*req.Account:1004,,,,-1,,,,*acc;*acd;*pdd,,`}
+
+ ng := engine.TestEngine{
+ ConfigJSON: content,
+ TpFiles: tpFiles,
+ DBCfg: dbConfig,
+ Encoding: *utils.Encoding,
+ }
+ client, _ := ng.Run(t)
+ time.Sleep(200 * time.Millisecond)
+ var lastUpdate time.Time
+ t.Run("RankingSchedule", func(t *testing.T) {
+ var scheduled int
+ if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries,
+ &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil {
+ t.Fatal(err)
+ } else if scheduled != 1 {
+ t.Errorf("expected 1, got %d", scheduled)
+ }
+ })
+ t.Run("ProcessStats", func(t *testing.T) {
+ var reply []string
+ for i := 1; i <= 4; i++ {
+ if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: fmt.Sprintf("event%d", i),
+ Event: map[string]any{
+ utils.AccountField: fmt.Sprintf("100%d", i),
+ utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC),
+ }, APIOpts: map[string]any{
+ utils.MetaUsage: time.Duration(1800+60) / time.Duration(i) * time.Second,
+ utils.MetaCost: 20.0 + float64((i*7)%10)/2,
+ utils.MetaPDD: time.Duration(10+i*2) * time.Second,
+ }}, &reply); err != nil {
+ t.Error(err)
+ }
+ }
+ })
+
+ t.Run("GetRankingsAfterStoreInterval", func(t *testing.T) {
+ rankingsChan := make(chan *engine.Ranking, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+ go func() {
+ ticker := time.NewTicker(600 * time.Millisecond)
+ var rnk engine.Ranking
+ for {
+ select {
+ case <-ticker.C:
+ err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk)
+ if err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ t.Errorf("Ranking retrieval error: %v", err)
+ }
+ continue
+ } else if rnk.LastUpdate.IsZero() {
+ continue
+ }
+ rankingsChan <- &rnk
+ case <-ctx.Done():
+ return
+ }
+
+ }
+ }()
+
+ select {
+ case rnk := <-rankingsChan:
+ lastUpdate = rnk.LastUpdate
+ sortedStatIDs := []string{"Stats3", "Stats2", "Stats1", "Stats4"}
+ if !slices.Equal(rnk.SortedStatIDs, sortedStatIDs) {
+ fmt.Println(utils.ToJSON(rnk))
+ t.Error("should have sorted statids")
+ }
+ case <-ctx.Done():
+ t.Error("Didn't get any ranking from db")
+ }
+ })
+
+ t.Run("ProcessStats", func(t *testing.T) {
+ var reply []string
+ j := 5
+ for i := 1; i <= 4; i++ {
+ j--
+ if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: fmt.Sprintf("event%d", i),
+ Event: map[string]any{
+ utils.AccountField: fmt.Sprintf("100%d", i),
+ utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC),
+ }, APIOpts: map[string]any{
+ utils.MetaUsage: time.Duration(1500+60) / time.Duration(j) * time.Second,
+ utils.MetaCost: 20.0 + float64((j*8)%10)/2,
+ utils.MetaPDD: time.Duration(11+j*2) * time.Second,
+ }}, &reply); err != nil {
+ t.Error(err)
+ }
+ }
+ })
+
+ t.Run("RankingsSetConfig", func(t *testing.T) {
+ var reply string
+ // setting store interval to 0
+ if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{
+ Tenant: "cgrates.org",
+ Config: map[string]any{
+ "rankings": map[string]any{
+ "enabled": true,
+ "stats_conns": []string{"*localhost"},
+ "store_interval": "0",
+ },
+ },
+ }, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expected OK received: %s", reply)
+ }
+ })
+ t.Run("RankingSchedule", func(t *testing.T) {
+ var scheduled int
+ if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries,
+ &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil {
+ t.Fatal(err)
+ } else if scheduled != 1 {
+ t.Errorf("expected 1, got %d", scheduled)
+ }
+ })
+
+ t.Run("GetRankingsNotStored", func(t *testing.T) {
+ metricsChan := make(chan *engine.Ranking, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+ go func() {
+ for {
+ ticker := time.NewTicker(700 * time.Millisecond)
+ select {
+ case <-ticker.C:
+ var rnk engine.Ranking
+ err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk)
+ if err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ t.Errorf("Ranking retrieval error: %v", err)
+ }
+ continue
+ } else if rnk.LastUpdate.Equal(lastUpdate) {
+ continue
+ }
+ metricsChan <- &rnk
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ select {
+ case rnk := <-metricsChan:
+ lastUpdate = rnk.LastUpdate
+ case <-ctx.Done():
+ t.Error("Didn't get any Ranking from db")
+ }
+ })
+ t.Run("RankingsSetConfig", func(t *testing.T) {
+ var reply string
+ // setting store interval to -1
+ if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{
+ Tenant: "cgrates.org",
+ Config: map[string]any{
+ "rankings": map[string]any{
+ "enabled": true,
+ "stats_conns": []string{"*localhost"},
+ "store_interval": "-1",
+ },
+ },
+ }, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expected OK received: %s", reply)
+ }
+ })
+ t.Run("RankingsSchedule", func(t *testing.T) {
+ var scheduled int
+ if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries,
+ &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil {
+ t.Fatal(err)
+ } else if scheduled != 1 {
+ t.Errorf("expected 1, got %d", scheduled)
+ }
+ })
+ t.Run("GetRankingsStoredUnlimited", func(t *testing.T) {
+ rankingChan := make(chan *engine.Ranking, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+ go func() {
+ ticker := time.NewTicker(1000 * time.Millisecond)
+ for {
+ select {
+ case <-ticker.C:
+ var rnk engine.Ranking
+ err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk)
+ if err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ t.Errorf("Ranking retrieval error: %v", err)
+ }
+ continue
+ } else if rnk.LastUpdate.Equal(lastUpdate) {
+ continue
+ }
+ rankingChan <- &rnk
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ select {
+ case <-rankingChan:
+ case <-ctx.Done():
+ t.Error("Didn't get any Ranking from db")
+ }
+ })
+}
diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go
index 414de561e..8fcb20bf5 100644
--- a/general_tests/trends_schedule_it_test.go
+++ b/general_tests/trends_schedule_it_test.go
@@ -66,9 +66,6 @@ func TestTrendSchedule(t *testing.T) {
"store_interval": "-1",
},
-"apiers": {
- "enabled": true,
-},
"admins": {
"enabled": true,
},
diff --git a/general_tests/trends_stored_it_test.go b/general_tests/trends_stored_it_test.go
new file mode 100644
index 000000000..c3e1b7a97
--- /dev/null
+++ b/general_tests/trends_stored_it_test.go
@@ -0,0 +1,279 @@
+//go:build integration
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package general_tests
+
+import (
+ "bytes"
+ "testing"
+ "time"
+
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func TestTrendStore(t *testing.T) {
+ var dbConfig engine.DBCfg
+ switch *utils.DBType {
+ case utils.MetaMySQL:
+ case utils.MetaMongo:
+ dbConfig = engine.DBCfg{
+ DataDB: &engine.DBParams{
+ Type: utils.StringPointer("mongo"),
+ Port: utils.IntPointer(27017),
+ Name: utils.StringPointer("10"),
+ },
+ StorDB: &engine.DBParams{
+ Type: utils.StringPointer("mongo"),
+ Name: utils.StringPointer("cgrates"),
+ Port: utils.IntPointer(27017),
+ Password: utils.StringPointer(""),
+ },
+ }
+ case utils.MetaInternal, utils.MetaPostgres:
+ t.SkipNow()
+ default:
+ t.Fatal("unsupported dbtype value")
+ }
+ content := `{
+
+"general": {
+ "log_level": 7,
+},
+"trends": {
+ "enabled": true,
+ "store_interval": "1500ms",
+ "stats_conns":["*localhost"],
+},
+
+"stats": {
+ "enabled": true,
+ "store_interval": "-1",
+},
+
+"admins": {
+ "enabled": true,
+},
+
+}
+`
+ tpFiles := map[string]string{
+ utils.TrendsCsv: `#Tenant[0],Id[1],Schedule[2],StatID[3],Metrics[4],TTL[5],QueueLength[6],MinItems[7],CorrelationType[8],Tolerance[9],Stored[10],ThresholdIDs[11]
+cgrates.org,TREND_1,@every 1s,Stats1_1,,-1,-1,1,*last,1,false,`,
+ utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],MetricIDs[10],MetricFilterIDs[11],MetricBlockers[12]
+cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,-1,,,,*tcc;*acd;*tcd,,`}
+
+ ng := engine.TestEngine{
+ ConfigJSON: content,
+ TpFiles: tpFiles,
+ DBCfg: dbConfig,
+ Encoding: *utils.Encoding,
+ LogBuffer: bytes.NewBuffer(nil),
+ }
+
+ client, _ := ng.Run(t)
+ time.Sleep(200 * time.Millisecond)
+ t.Run("TrendSchedule", func(t *testing.T) {
+ var scheduled int
+ if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries,
+ &utils.ArgScheduleTrendQueries{TrendIDs: []string{"TREND_1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil {
+ t.Fatal(err)
+ } else if scheduled != 1 {
+ t.Errorf("expected 1, got %d", scheduled)
+ }
+ })
+ t.Run("ProcessStats", func(t *testing.T) {
+ var reply []string
+ if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "event1",
+ Event: map[string]any{
+ utils.AccountField: "1001",
+ utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC),
+ },
+ APIOpts: map[string]any{
+ utils.MetaUsage: time.Duration(1800+60) * time.Second,
+ utils.MetaCost: 20 + float64(10),
+ utils.MetaPDD: time.Duration(10 * time.Second),
+ }}, &reply); err != nil {
+ t.Error(err)
+ }
+
+ })
+
+ t.Run("GetTrendsAfterStoreInterval", func(t *testing.T) {
+ metricsChan := make(chan *engine.Trend, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+ go func() {
+ ticker := time.NewTicker(600 * time.Millisecond)
+ var trnd engine.Trend
+ for {
+ select {
+ case <-ticker.C:
+ err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd)
+ if err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ t.Errorf("Trend retrieval error: %v", err)
+ }
+ continue
+ }
+ metricsChan <- &trnd
+ case <-ctx.Done():
+ return
+ }
+
+ }
+ }()
+
+ select {
+ case trnd := <-metricsChan:
+ if len(trnd.RunTimes) < 1 || len(trnd.Metrics) < 1 {
+ t.Errorf("expected at least 1 runtime, got %d", len(trnd.RunTimes))
+ }
+ case <-ctx.Done():
+ t.Error("Didn't get any trend from db")
+ }
+ })
+
+ t.Run("TrendsSetConfig", func(t *testing.T) {
+ var reply string
+ // setting store interval to 0
+ if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{
+ Tenant: "cgrates.org",
+ Config: map[string]any{
+ "trends": map[string]any{
+ "enabled": true,
+ "stats_conns": []string{"*localhost"},
+ "store_interval": "0",
+ },
+ },
+ }, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expected OK received: %s", reply)
+ }
+ })
+ t.Run("TrendSchedule", func(t *testing.T) {
+ var scheduled int
+ if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries,
+ &utils.ArgScheduleTrendQueries{TrendIDs: []string{"TREND_1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil {
+ t.Fatal(err)
+ } else if scheduled != 1 {
+ t.Errorf("expected 1, got %d", scheduled)
+ }
+ })
+
+ t.Run("GetTrendsNotStored", func(t *testing.T) {
+ metricsChan := make(chan *engine.Trend, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+ go func() {
+ for {
+ ticker := time.NewTicker(700 * time.Millisecond)
+ select {
+ case <-ticker.C:
+ var trnd engine.Trend
+ // the trend will be not updated since storeinterval is set to 0
+ err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd)
+ if err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ t.Errorf("Trend retrieval error: %v", err)
+ }
+ continue
+ }
+ metricsChan <- &trnd
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ select {
+ case trnd := <-metricsChan:
+ if len(trnd.RunTimes) < 1 && len(trnd.Metrics) < 1 {
+ t.Errorf("expected 1 runtime, got %d", len(trnd.RunTimes))
+ }
+ case <-ctx.Done():
+ t.Error("Didn't get any trend from db")
+ }
+ })
+ t.Run("TrendsSetConfig", func(t *testing.T) {
+ var reply string
+ // setting store interval to -1
+ if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{
+ Tenant: "cgrates.org",
+ Config: map[string]any{
+ "trends": map[string]any{
+ "enabled": true,
+ "stats_conns": []string{"*localhost"},
+ "store_interval": "-1",
+ },
+ },
+ }, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expected OK received: %s", reply)
+ }
+ })
+ t.Run("TrendSchedule", func(t *testing.T) {
+ var scheduled int
+ if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries,
+ &utils.ArgScheduleTrendQueries{TrendIDs: []string{"TREND_1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil {
+ t.Fatal(err)
+ } else if scheduled != 1 {
+ t.Errorf("expected 1, got %d", scheduled)
+ }
+ })
+ t.Run("GetTrendsStoredUnlimited", func(t *testing.T) {
+ metricsChan := make(chan *engine.Trend, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+ go func() {
+ ticker := time.NewTicker(1000 * time.Millisecond)
+ for {
+ select {
+ case <-ticker.C:
+ var trnd engine.Trend
+ err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd)
+ if err != nil {
+ if err.Error() != utils.ErrNotFound.Error() {
+ t.Errorf("Trend retrieval error: %v", err)
+ }
+ continue
+ }
+ metricsChan <- &trnd
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ select {
+ case trnd := <-metricsChan:
+ if len(trnd.RunTimes) < 2 || len(trnd.Metrics) < 2 {
+ t.Errorf("expected at least 2 runtimes, got %d", len(trnd.RunTimes))
+ }
+ case <-ctx.Done():
+ t.Error("Didn't get any trend from db")
+ }
+ })
+}