diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index 366c04651..e70fbdbd4 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -1716,6 +1716,7 @@ func testApierResetDataAfterLoadFromFolder(t *testing.T) { expStats[utils.CacheReverseFilterIndexes].Items = 10 expStats[utils.CacheReverseFilterIndexes].Groups = 7 expStats[utils.CacheRankingProfiles].Items = 1 + expStats[utils.CacheRankings].Items = 1 expStats[utils.CacheTrendProfiles].Items = 1 expStats[utils.CacheTrends].Items = 1 diff --git a/apier/v1/caches_it_test.go b/apier/v1/caches_it_test.go index 7cb15e6d6..2ae1870f1 100644 --- a/apier/v1/caches_it_test.go +++ b/apier/v1/caches_it_test.go @@ -178,6 +178,7 @@ func testCacheSAfterLoadFromFolder(t *testing.T) { expStats[utils.CacheReverseFilterIndexes].Items = 10 expStats[utils.CacheReverseFilterIndexes].Groups = 7 expStats[utils.CacheRankingProfiles].Items = 1 + expStats[utils.CacheRankings].Items = 1 expStats[utils.CacheTrendProfiles].Items = 1 expStats[utils.CacheTrends].Items = 1 @@ -249,6 +250,7 @@ func testCacheSReload(t *testing.T) { expStats[utils.CacheReverseFilterIndexes].Items = 10 expStats[utils.CacheReverseFilterIndexes].Groups = 7 expStats[utils.CacheRankingProfiles].Items = 1 + expStats[utils.CacheRankings].Items = 1 expStats[utils.CacheTrends].Items = 1 expStats[utils.CacheTrendProfiles].Items = 1 diff --git a/config/config_defaults.go b/config/config_defaults.go index c937231b6..e9cb55ec6 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -298,7 +298,7 @@ const CGRATES_CFG_JSON = ` "*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control trend profiles caching "*trends": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control trends caching "*ranking_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // ranking profiles - "*rankings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // controle rankings caching + "*rankings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control rankings caching "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // statqueue profiles "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // statqueues with metrics "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control threshold profiles caching diff --git a/engine/datamanager.go b/engine/datamanager.go index daba35bff..59a80f8bf 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1725,10 +1725,10 @@ func (dm *DataManager) GetRanking(tenant, id string, cacheRead, cacheWrite bool, } return nil, err } - if cacheWrite { - if errCh := Cache.Set(utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil { - return nil, errCh - } + } + if cacheWrite { + if errCh := Cache.Set(utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil { + return nil, errCh } } return diff --git a/engine/rankings_test.go b/engine/rankings_test.go index f0ee237c6..14c957490 100644 --- a/engine/rankings_test.go +++ b/engine/rankings_test.go @@ -343,6 +343,7 @@ func TestRankingsStoreRankings(t *testing.T) { if err := rkg.StartRankingS(); err != nil { t.Fatalf("Unexpected error when starting rankings: %v", err) } + t.Cleanup(func() { rkg.StopRankingS() }) time.Sleep(1200 * time.Millisecond) diff --git a/general_tests/rankings_stored_it_test.go b/general_tests/rankings_stored_it_test.go new file mode 100644 index 000000000..6b90ce2b4 --- /dev/null +++ b/general_tests/rankings_stored_it_test.go @@ -0,0 +1,301 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "fmt" + "slices" + "testing" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestRankingStore(t *testing.T) { + var dbConfig engine.DBCfg + switch *utils.DBType { + case utils.MetaMySQL: + case utils.MetaMongo: + dbConfig = engine.DBCfg{ + DataDB: &engine.DBParams{ + Type: utils.StringPointer("mongo"), + Port: utils.IntPointer(27017), + Name: utils.StringPointer("10"), + }, + StorDB: &engine.DBParams{ + Type: utils.StringPointer("mongo"), + Name: utils.StringPointer("cgrates"), + Port: utils.IntPointer(27017), + Password: utils.StringPointer(""), + }, + } + case utils.MetaInternal, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + content := `{ + +"general": { + "log_level": 7, +}, +"rankings": { + "enabled": true, + "store_interval": "1500ms", + "stats_conns":["*localhost"], +}, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"apiers": { + "enabled": true, +}, + +} +` + tpFiles := map[string]string{ + utils.RankingsCsv: `#Tenant[0],Id[1],Schedule[2],StatIDs[3],MetricIDs[4],Sorting[5],SortingParameters[6],Stored[7],ThresholdIDs[8] +cgrates.org,RANK1,@every 1s,Stats1;Stats2;Stats3;Stats4,,*asc,*acc;*pdd:false;*acd,,`, + utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12] +cgrates.org,Stats1,*string:~*req.Account:1001,,,,,*acc;*acd;*pdd,,,,, +cgrates.org,Stats2,*string:~*req.Account:1002,,,,,*acc;*acd;*pdd,,,,, +cgrates.org,Stats3,*string:~*req.Account:1003,,,,,*acc;*acd;*pdd,,,,, +cgrates.org,Stats4,*string:~*req.Account:1004,,,,,*acc;*acd;*pdd,,,,,`} + + ng := engine.TestEngine{ + ConfigJSON: content, + TpFiles: tpFiles, + DBCfg: dbConfig, + } + client, _ := ng.Run(t) + var lastUpdate time.Time + t.Run("RankingSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries, + &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + t.Run("ProcessStats", func(t *testing.T) { + var reply []string + for i := 1; i <= 4; i++ { + if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("event%d", i), + Event: map[string]any{ + utils.AccountField: fmt.Sprintf("100%d", i), + utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(1800+60) / time.Duration(i) * time.Second, + utils.Cost: 20.0 + float64((i*7)%10)/2, + utils.PDD: time.Duration(10+i*2) * time.Second, + }}, &reply); err != nil { + t.Error(err) + } + } + }) + + t.Run("GetRankingsAfterStoreInterval", func(t *testing.T) { + rankingsChan := make(chan *engine.Ranking, 1) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { + ticker := time.NewTicker(600 * time.Millisecond) + var rnk engine.Ranking + for { + select { + case <-ticker.C: + err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Ranking retrieval error: %v", err) + } + continue + } else if rnk.LastUpdate.IsZero() { + continue + } + rankingsChan <- &rnk + case <-ctx.Done(): + return + } + + } + }() + + select { + case rnk := <-rankingsChan: + lastUpdate = rnk.LastUpdate + sortedStatIDs := []string{"Stats3", "Stats2", "Stats1", "Stats4"} + if !slices.Equal(rnk.SortedStatIDs, sortedStatIDs) { + t.Error("should have sorted statids") + } + case <-ctx.Done(): + t.Error("Didn't get any ranking from db") + } + }) + + t.Run("ProcessStats", func(t *testing.T) { + var reply []string + j := 5 + for i := 1; i <= 4; i++ { + j-- + if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("event%d", i), + Event: map[string]any{ + utils.AccountField: fmt.Sprintf("100%d", i), + utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(1500+60) / time.Duration(j) * time.Second, + utils.Cost: 20.0 + float64((j*8)%10)/2, + utils.PDD: time.Duration(11+j*2) * time.Second, + }}, &reply); err != nil { + t.Error(err) + } + } + }) + + t.Run("RankingsSetConfig", func(t *testing.T) { + var reply string + // setting store interval to 0 + if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{ + Tenant: "cgrates.org", + Config: map[string]any{ + "rankings": map[string]any{ + "enabled": true, + "stats_conns": []string{"*localhost"}, + "store_interval": "0", + }, + }, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + }) + t.Run("RankingSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries, + &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + + t.Run("GetRankingsNotStored", func(t *testing.T) { + metricsChan := make(chan *engine.Ranking, 1) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { + for { + ticker := time.NewTicker(700 * time.Millisecond) + select { + case <-ticker.C: + var rnk engine.Ranking + err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Ranking retrieval error: %v", err) + } + continue + } else if rnk.LastUpdate.Equal(lastUpdate) { + continue + } + metricsChan <- &rnk + case <-ctx.Done(): + return + } + } + }() + + select { + case rnk := <-metricsChan: + lastUpdate = rnk.LastUpdate + case <-ctx.Done(): + t.Error("Didn't get any Ranking from db") + } + }) + t.Run("RankingsSetConfig", func(t *testing.T) { + var reply string + // setting store interval to -1 + if err := client.Call(context.Background(), utils.ConfigSv1SetConfig, &config.SetConfigArgs{ + Tenant: "cgrates.org", + Config: map[string]any{ + "rankings": map[string]any{ + "enabled": true, + "stats_conns": []string{"*localhost"}, + "store_interval": "-1", + }, + }, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + }) + t.Run("RankingsSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.RankingSv1ScheduleQueries, + &utils.ArgScheduleRankingQueries{RankingIDs: []string{"RANK1"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 1 { + t.Errorf("expected 1, got %d", scheduled) + } + }) + t.Run("GetRankingsStoredUnlimited", func(t *testing.T) { + rankingChan := make(chan *engine.Ranking, 1) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { + ticker := time.NewTicker(1000 * time.Millisecond) + for { + select { + case <-ticker.C: + var rnk engine.Ranking + err := client.Call(context.Background(), utils.RankingSv1GetRanking, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "RANK1"}}, &rnk) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Ranking retrieval error: %v", err) + } + continue + } else if rnk.LastUpdate.Equal(lastUpdate) { + continue + } + rankingChan <- &rnk + case <-ctx.Done(): + return + } + } + }() + select { + case <-rankingChan: + case <-ctx.Done(): + t.Error("Didn't get any Ranking from db") + } + }) +}