diff --git a/engine/rankings_test.go b/engine/rankings_test.go index aa152d5c4..84e775bd8 100644 --- a/engine/rankings_test.go +++ b/engine/rankings_test.go @@ -20,6 +20,7 @@ package engine import ( "reflect" + "slices" "sync" "testing" "time" @@ -295,28 +296,82 @@ func TestStoreRanking(t *testing.T) { } -func TestRankingsStartRankings(t *testing.T) { +func TestRankingsStoreRankings(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.RankingSCfg().Enabled = true - cfg.RankingSCfg().StoreInterval = time.Second + cfg.RankingSCfg().StoreInterval = time.Millisecond * 1300 + cfg.RankingSCfg().StatSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)} dataDB := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := NewDataManager(dataDB, cfg.CacheCfg(), nil) - rkg := NewRankingS(dm, nil, nil, cfg) - dm.SetRanking(&Ranking{}) + conn := make(chan context.ClientConnector, 1) + conn <- &ccMock{ + calls: map[string]func(ctx *context.Context, args any, reply any) error{ + utils.StatSv1GetQueueFloatMetrics: func(ctx *context.Context, args, reply any) error { + if args.(*utils.TenantIDWithAPIOpts).ID == "stat1" { + *reply.(*map[string]float64) = map[string]float64{ + utils.MetaTCD: float64(20 * time.Second), + utils.MetaACC: 22.2, + } + } else if args.(*utils.TenantIDWithAPIOpts).ID == "stat2" { + *reply.(*map[string]float64) = map[string]float64{ + utils.MetaTCD: float64(23 * time.Second), + utils.MetaACC: 22.2, + } + } + return nil + }, + }, + } + connMgr = NewConnManager(config.NewDefaultCGRConfig(), map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): conn, + }) + rkg := NewRankingS(dm, connMgr, nil, cfg) - dm.SetRankingProfile(&RankingProfile{ + rankingProfile := &RankingProfile{ Tenant: "cgrates.org", ID: "ID1", Schedule: "@every 1s", StatIDs: []string{"stat1", "stat2"}, - MetricIDs: []string{"metric1", "metric2"}, - Sorting: "asc", - SortingParameters: []string{"metric1:true"}, + MetricIDs: []string{}, + Sorting: "*desc", + SortingParameters: []string{utils.MetaTCD, utils.MetaACC}, Stored: true, - ThresholdIDs: []string{"threshold1"}}) + } + dm.SetRankingProfile(rankingProfile) if err := rkg.StartRankingS(); err != nil { - t.Error(err) + t.Fatalf("Unexpected error when starting rankings: %v", err) + } + + time.Sleep(1200 * time.Millisecond) + + profile, err := dm.GetRanking("cgrates.org", "ID1", false, false, "") + if err != nil { + t.Errorf("Error retrieving ranking profile: %v", err) + } + if profile == nil { + t.Fatal("Expected ranking profile to be present, but it was not found") + } + + if profile.ID != "ID1" { + t.Errorf("Expected profile ID to be 'ID1', but got %v", profile.ID) + } + if profile.Tenant != "cgrates.org" { + t.Errorf("Expected tenant to be 'cgrates.org', but got %v", profile.Tenant) + } + + if profile.Sorting != "*desc" { + t.Errorf("Expected sorting to be 'desc', but got %v", profile.Sorting) + } + if !slices.Equal(profile.SortedStatIDs, []string{"stat2", "stat1"}) { + t.Errorf("Expected SortedStatIDs to be [stat2, stat1], but got %v", profile.SortedStatIDs) + + } + if !slices.Equal(profile.SortingParameters, profile.SortingParameters) { + t.Errorf("Expected SortingParameters to be ['metric1:true'], but got %v", profile.SortingParameters) + } + if !reflect.DeepEqual(profile.Metrics, map[string]map[string]float64{"stat1": {"*acc": 22.2, "*tcd": 20000000000}, "stat2": {"*acc": 22.2, "*tcd": 23000000000}}) { + t.Errorf("Expected sorting to be 'asc', but got %v", profile.Metrics) } } diff --git a/engine/trends_test.go b/engine/trends_test.go index 1e05d92d9..31d6a1198 100644 --- a/engine/trends_test.go +++ b/engine/trends_test.go @@ -59,6 +59,7 @@ func TestStoreTrend(t *testing.T) { cfg := config.NewDefaultCGRConfig() dataDB := NewInternalDB(nil, nil, true, nil) dm := NewDataManager(dataDB, cfg.CacheCfg(), nil) + tS := &TrendS{ cgrcfg: cfg, dm: dm, @@ -85,10 +86,10 @@ func TestStoreTrend(t *testing.T) { ID: "trendID1", tPrfl: trendProfile, Metrics: make(map[time.Time]map[string]*MetricWithTrend), - RunTimes: []time.Time{}, - mLast: make(map[string]time.Time), - mCounts: make(map[string]int), - mTotals: make(map[string]float64), + RunTimes: []time.Time{time.Now().Add(-time.Second)}, + mLast: map[string]time.Time{"metric1": time.Now().Add(-time.Minute), "metric2": time.Now().Add(-2 * time.Minute)}, + mCounts: map[string]int{"metric1": 5, "metric2": 3}, + mTotals: map[string]float64{"metric1": 100.5, "metric2": 60.3}, } cfg.TrendSCfg().StoreInterval = 0 @@ -98,37 +99,36 @@ func TestStoreTrend(t *testing.T) { if len(tS.storedTrends) != 0 { t.Error("Expected storedTrends to be empty when StoreInterval is 0") } + cfg.TrendSCfg().StoreInterval = -1 if err := tS.storeTrend(trend); err != nil { t.Errorf("Expected no error when StoreInterval is -1, but got: %v", err) } + cfg.TrendSCfg().StoreInterval = time.Second if err := tS.storeTrend(trend); err != nil { t.Errorf("Expected no error when StoreInterval is positive, but got: %v", err) } -} - -func TestStoreTrends(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - dataDB := NewInternalDB(nil, nil, true, nil) - dm := NewDataManager(dataDB, cfg.CacheCfg(), nil) - - tS := &TrendS{ - cgrcfg: cfg, - dm: dm, - storedTrends: make(utils.StringSet), + if _, exists := tS.storedTrends[trend.TenantID()]; !exists { + t.Errorf("Expected trendID %v to be stored in storedTrends for positive StoreInterval", trend.ID) } - trendID := "trendID1" - tS.storedTrends.Add(trendID) + retrievedTrend, err := dm.GetTrend(trend.Tenant, trend.ID, true, true, "") + if err != nil { + t.Errorf("Error retrieving trend from data manager: %v", err) + } + if retrievedTrend == nil { + t.Error("Expected a stored trend to be found in data manager, but got nil") + } else { + if retrievedTrend.Tenant != trend.Tenant { + t.Errorf("Expected retrieved trend Tenant to be %s, but got %s", trend.Tenant, retrievedTrend.Tenant) + } + if retrievedTrend.ID != trend.ID { + t.Errorf("Expected retrieved trend ID to be %s, but got %s", trend.ID, retrievedTrend.ID) + } - tS.storeTrends() - - if len(tS.storedTrends) == 0 { - t.Error("Expected storedTrends to not be empty") } } - func TestV1GetTrendSummary(t *testing.T) { cfg := config.NewDefaultCGRConfig() dataDB := NewInternalDB(nil, nil, true, nil) @@ -299,3 +299,155 @@ func TestProcessThresholds_OptsInitialization(t *testing.T) { } } + +func TestTrendsStoreTrends(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.TrendSCfg().Enabled = true + cfg.TrendSCfg().StoreInterval = time.Millisecond * 1500 + cfg.TrendSCfg().StatSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)} + dataDB := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + dm := NewDataManager(dataDB, cfg.CacheCfg(), nil) + conn := make(chan context.ClientConnector, 1) + conn <- &ccMock{ + calls: map[string]func(ctx *context.Context, args any, reply any) error{ + utils.StatSv1GetQueueFloatMetrics: func(ctx *context.Context, args, reply any) error { + if args.(*utils.TenantIDWithAPIOpts).ID == "stat1" { + *reply.(*map[string]float64) = map[string]float64{ + utils.MetaTCD: float64(20 * time.Second), + utils.MetaACC: 22.2, + } + } else if args.(*utils.TenantIDWithAPIOpts).ID == "stat2" { + *reply.(*map[string]float64) = map[string]float64{ + utils.MetaTCD: float64(23 * time.Second), + utils.MetaACC: 22.2, + } + } + return nil + }, + }, + } + connMgr = NewConnManager(config.NewDefaultCGRConfig(), map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): conn, + }) + trnd := NewTrendS(dm, connMgr, nil, cfg) + trendProfile := &TrendProfile{ + Tenant: "cgrates.org", + StatID: "stat1", + ID: "ID1", + Schedule: "@every 1s", + Stored: true, + } + + dm.SetTrendProfile(trendProfile) + if err := trnd.StartTrendS(); err != nil { + t.Fatalf("Unexpected error when starting trends: %v", err) + } + profile, err := dm.GetTrend("cgrates.org", "ID1", false, false, "") + if err != nil { + t.Errorf("Error retrieving trend profile: %v", err) + } + if profile == nil { + t.Fatal("Expected trend profile to be present, but it was not found") + } + if profile.ID != "ID1" { + t.Errorf("Expected profile ID to be 'ID1', but got %v", profile.ID) + } + if profile.Tenant != "cgrates.org" { + t.Errorf("Expected tenant to be 'cgrates.org', but got %v", profile.Tenant) + } + +} + +func TestTrendReload(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.TrendSCfg().Enabled = true + cfg.TrendSCfg().StoreInterval = 0 + dataDB := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + dm := NewDataManager(dataDB, cfg.CacheCfg(), nil) + conn := make(chan context.ClientConnector, 1) + conn <- &ccMock{ + calls: map[string]func(ctx *context.Context, args any, reply any) error{ + utils.StatSv1GetQueueFloatMetrics: func(ctx *context.Context, args, reply any) error { + *reply.(*map[string]float64) = map[string]float64{ + utils.MetaTCD: float64(20 * time.Second), + utils.MetaACC: 22.2, + } + return nil + }, + }, + } + connMgr := NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): conn, + }) + trnd := NewTrendS(dm, connMgr, nil, cfg) + trendProfile := &TrendProfile{ + Tenant: "cgrates.org", + StatID: "stat1", + ID: "ID1", + Schedule: "@every 1s", + Stored: true, + } + dm.SetTrendProfile(trendProfile) + go trnd.asyncStoreTrends() + trnd.Reload() + select { + case <-trnd.trendStop: + t.Fatal("Expected trendStop channel to be closed after Reload") + default: + } + if trnd.trendStop == nil { + t.Fatal("Expected trendStop channel to be re-initialized after Reload") + } + if trnd.storingStopped == nil { + t.Fatal("Expected storingStopped channel to be re-initialized after Reload") + } +} + +func TestV1GetTrendStoreIntervalZero(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.TrendSCfg().Enabled = true + cfg.TrendSCfg().StoreInterval = 0 + dataDB := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + dm := NewDataManager(dataDB, cfg.CacheCfg(), nil) + conn := make(chan context.ClientConnector, 1) + conn <- &ccMock{ + calls: map[string]func(ctx *context.Context, args any, reply any) error{ + utils.StatSv1GetQueueFloatMetrics: func(ctx *context.Context, args, reply any) error { + if args.(*utils.TenantIDWithAPIOpts).ID == "stat1" { + *reply.(*map[string]float64) = map[string]float64{ + utils.MetaTCD: float64(20 * time.Second), + utils.MetaACC: 22.2, + } + } + return nil + }, + }, + } + connMgr := NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): conn, + }) + trnd := NewTrendS(dm, connMgr, nil, cfg) + trendProfile := &TrendProfile{ + Tenant: "cgrates.org", + StatID: "stat1", + ID: "ID1", + Schedule: "@every 1s", + Stored: true, + } + dm.SetTrendProfile(trendProfile) + if err := trnd.StartTrendS(); err != nil { + t.Fatalf("Unexpected error when starting trends: %v", err) + } + time.Sleep(1 * time.Second) + ctx := context.Background() + arg := &utils.ArgGetTrend{ID: "nonexistent"} + var retTrend Trend + + err := trnd.V1GetTrend(ctx, arg, &retTrend) + if err == nil { + t.Fatal("Expected error, got nil") + } + if !errors.Is(err, utils.ErrNotFound) { + t.Fatalf("Expected error type 'ErrNotFound', got: %v", err) + } +}