From cf44898b98fa1996fa909fdb794351676275f7cd Mon Sep 17 00:00:00 2001 From: gezimbll Date: Tue, 26 Nov 2024 11:22:02 +0100 Subject: [PATCH] revise tests for trends storeinterval && file readers --- engine/datamanager.go | 3 +- engine/libtrends.go | 37 +++-- engine/libtrends_test.go | 14 +- engine/trends.go | 7 +- ers/filecsv_it_test.go | 42 +++--- ers/filefwv_it_test.go | 36 +++-- ers/filejson_it_test.go | 36 +++-- general_tests/trends_stored_it_test.go | 185 ++++++++++++++----------- 8 files changed, 215 insertions(+), 145 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index fd7bb9fe2..daba35bff 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1302,7 +1302,6 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id string, withIndex bool) func (dm *DataManager) GetTrend(tenant, id string, cacheRead, cacheWrite bool, transactionID string) (tr *Trend, err error) { tntID := utils.ConcatenatedKey(tenant, id) - if cacheRead { if x, ok := Cache.Get(utils.CacheTrends, tntID); ok { if x == nil { @@ -1367,7 +1366,7 @@ func (dm *DataManager) SetTrend(tr *Trend) (err error) { return utils.ErrNoDatabaseConn } if dm.dataDB.GetStorageType() != utils.MetaInternal { - if err = tr.compress(dm.ms); err != nil { + if tr, err = tr.compress(dm.ms); err != nil { return } } diff --git a/engine/libtrends.go b/engine/libtrends.go index e7398b13c..de7540009 100644 --- a/engine/libtrends.go +++ b/engine/libtrends.go @@ -119,6 +119,25 @@ type Trend struct { } func (t *Trend) Clone() (tC *Trend) { + tC = &Trend{ + Tenant: t.Tenant, + ID: t.ID, + } + if t.RunTimes != nil { + tC.RunTimes = make([]time.Time, len(t.RunTimes)) + copy(tC.RunTimes, t.RunTimes) + } + if t.CompressedMetrics != nil { + tC.CompressedMetrics = make([]byte, len(t.CompressedMetrics)) + copy(tC.CompressedMetrics, t.CompressedMetrics) + } + if t.Metrics != nil { + tC.Metrics = make(map[time.Time]map[string]*MetricWithTrend) + for key, val := range t.Metrics { + tC.Metrics[key] = val + } + } + return } @@ -143,17 +162,19 @@ func (t *Trend) asTrendSummary() (ts *TrendSummary) { return } -func (t *Trend) compress(ms Marshaler) (err error) { +func (t *Trend) compress(ms Marshaler) (tr *Trend, err error) { if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) { return } - t.CompressedMetrics, err = ms.Marshal(t.Metrics) - if err != nil { - return + tr = &Trend{ + Tenant: t.Tenant, + ID: t.ID, } - t.Metrics = nil - t.RunTimes = nil - return nil + tr.CompressedMetrics, err = ms.Marshal(t.Metrics) + if err != nil { + return nil, err + } + return tr, nil } func (t *Trend) uncompress(ms Marshaler) (err error) { @@ -164,7 +185,7 @@ func (t *Trend) uncompress(ms Marshaler) (err error) { if err != nil { return } - t.CompressedMetrics = []byte{} + t.CompressedMetrics = nil t.RunTimes = make([]time.Time, len(t.Metrics)) i := 0 for key := range t.Metrics { diff --git a/engine/libtrends_test.go b/engine/libtrends_test.go index 32b341a57..974fc1adb 100644 --- a/engine/libtrends_test.go +++ b/engine/libtrends_test.go @@ -442,20 +442,20 @@ func TestTrendCompressSuccess(t *testing.T) { RunTimes: []time.Time{time.Now()}, } - err := trend.compress(marshaler) + tr, err := trend.compress(marshaler) if err != nil { t.Errorf("Expected no error, got <%+v>", err) } - if trend.CompressedMetrics == nil { - t.Errorf("Expected CompressedMetrics to be populated, got: %+v", trend.CompressedMetrics) + if tr.CompressedMetrics == nil { + t.Errorf("Expected CompressedMetrics to be populated, got: %+v", tr.CompressedMetrics) } - if trend.Metrics != nil { - t.Errorf("Expected Metrics to be nil after compression, got: %+v", trend.Metrics) + if tr.Metrics != nil { + t.Errorf("Expected Metrics to be nil after compression, got: %+v", tr.Metrics) } - if trend.RunTimes != nil { - t.Errorf("Expected RunTimes to be nil after compression, got: %+v", trend.RunTimes) + if tr.RunTimes != nil { + t.Errorf("Expected RunTimes to be nil after compression, got: %+v", tr.RunTimes) } } diff --git a/engine/trends.go b/engine/trends.go index 23efb41a2..d2d87ba42 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -287,14 +287,14 @@ func (tS *TrendS) storeTrends() { continue } trnd := trndIf.(*Trend) - trnd.tMux.RLock() + trnd.tMux.Lock() if err := tS.dm.SetTrend(trnd); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q", utils.TrendS, trndID, err)) failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup } - trnd.tMux.RUnlock() + trnd.tMux.Unlock() // randomize the CPU load and give up thread control runtime.Gosched() } @@ -467,8 +467,11 @@ func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTr if trnd, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { return } + trnd.tMux.RLock() + defer trnd.tMux.RUnlock() retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying retTrend.ID = trnd.ID + startIdx := arg.RunIndexStart if startIdx > len(trnd.RunTimes) { startIdx = len(trnd.RunTimes) diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 7e209e76f..c6e55e6f6 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -635,26 +635,28 @@ func TestFileCSVProcessEventError3(t *testing.T) { } } -// func TestFileCSVDirErr(t *testing.T) { -// cfg := config.NewDefaultCGRConfig() -// fltrs := &engine.FilterS{} -// eR := &CSVFileER{ -// cgrCfg: cfg, -// cfgIdx: 0, -// fltrS: fltrs, -// sourceDir: "/tmp/ers/out/", -// rdrEvents: make(chan *erEvent, 1), -// rdrError: make(chan error, 1), -// rdrExit: make(chan struct{}), -// conReqs: make(chan struct{}, 1), -// } -// eR.conReqs <- struct{}{} -// eR.Config().RunDelay = -1 -// errExpect := "no such file or directory" -// if err := eR.Serve(); err == nil || err.Error() != errExpect { -// t.Errorf("Expected %v but received %v", errExpect, err) -// } -// } +func TestFileCSVDirErr(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrS := &engine.FilterS{} + cfg.ERsCfg().Readers = []*config.EventReaderCfg{ + { + Type: utils.MetaFileCSV, + RunDelay: -1, + ID: "csv_reader", + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/out", + }, + } + srv := NewERService(cfg, nil, fltrS, nil) + stopChan := make(chan struct{}, 1) + cfgRldChan := make(chan struct{}, 1) + err := srv.ListenAndServe(stopChan, cfgRldChan) + + expected := "no such file or directory" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} func TestFileCSV(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index a7d140b30..b2ccc4c9c 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -332,20 +332,28 @@ func TestFileFWVServeErrTimeDuration0(t *testing.T) { } } -// func TestFileFWVServeErrTimeDurationNeg1(t *testing.T) { -// cfg := config.NewDefaultCGRConfig() -// cfgIdx := 0 -// rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) -// if err != nil { -// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) -// } -// rdr.Config().RunDelay = time.Duration(-1) -// expected := "no such file or directory" -// err = rdr.Serve() -// if err == nil || err.Error() != expected { -// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) -// } -// } +func TestFileFWVServeErrTimeDurationNeg1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrS := &engine.FilterS{} + cfg.ERsCfg().Readers = []*config.EventReaderCfg{ + { + Type: utils.MetaFileFWV, + RunDelay: -1, + ID: "fwv_reader", + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/out", + }, + } + srv := NewERService(cfg, nil, fltrS, nil) + stopChan := make(chan struct{}, 1) + cfgRldChan := make(chan struct{}, 1) + err := srv.ListenAndServe(stopChan, cfgRldChan) + + expected := "no such file or directory" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} func TestFileFWV(t *testing.T) { cfg := config.NewDefaultCGRConfig() diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index ef513c531..c9b517bca 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -246,20 +246,28 @@ func TestFileJSONServeErrTimeDuration0(t *testing.T) { } } -// func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) { -// cfg := config.NewDefaultCGRConfig() -// cfgIdx := 0 -// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) -// if err != nil { -// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) -// } -// rdr.Config().RunDelay = time.Duration(-1) -// expected := "no such file or directory" -// err = rdr.Serve() -// if err == nil || err.Error() != expected { -// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) -// } -// } +func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrS := &engine.FilterS{} + cfg.ERsCfg().Readers = []*config.EventReaderCfg{ + { + Type: utils.MetaFileJSON, + RunDelay: -1, + ID: "json_reader", + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/out", + }, + } + srv := NewERService(cfg, nil, fltrS, nil) + stopChan := make(chan struct{}, 1) + cfgRldChan := make(chan struct{}, 1) + err := srv.ListenAndServe(stopChan, cfgRldChan) + + expected := "no such file or directory" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} // func TestFileJSONServeTimeDefault(t *testing.T) { // cfg := config.NewDefaultCGRConfig() diff --git a/general_tests/trends_stored_it_test.go b/general_tests/trends_stored_it_test.go index f2630f59c..255a31727 100644 --- a/general_tests/trends_stored_it_test.go +++ b/general_tests/trends_stored_it_test.go @@ -31,33 +31,23 @@ import ( ) func TestTrendStore(t *testing.T) { - var dbConfig string + var dbConfig engine.DBCfg switch *utils.DBType { case utils.MetaMySQL: - dbConfig = ` -"data_db": { - "db_type": "redis", - "db_port": 6379, - "db_name": "10", -}, - -"stor_db": { - "db_password": "CGRateS.org", -},` case utils.MetaMongo: - dbConfig = ` -"data_db": { - "db_type": "mongo", - "db_name": "10", - "db_port": 27017, -}, - -"stor_db": { - "db_type": "mongo", - "db_name": "cgrates", - "db_port": 27017, - "db_password": "", -},` + dbConfig = engine.DBCfg{ + DataDB: &engine.DBParams{ + Type: utils.StringPointer("mongo"), + Port: utils.IntPointer(27017), + Name: utils.StringPointer("10"), + }, + StorDB: &engine.DBParams{ + Type: utils.StringPointer("mongo"), + Name: utils.StringPointer("cgrates"), + Port: utils.IntPointer(27017), + Password: utils.StringPointer(""), + }, + } case utils.MetaInternal, utils.MetaPostgres: t.SkipNow() default: @@ -68,10 +58,9 @@ func TestTrendStore(t *testing.T) { "general": { "log_level": 7, }, -` + dbConfig + ` "trends": { "enabled": true, - "store_interval": "2100ms", + "store_interval": "1500ms", "stats_conns":["*localhost"], }, @@ -95,9 +84,11 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,,`} ng := engine.TestEngine{ ConfigJSON: content, TpFiles: tpFiles, + DBCfg: dbConfig, } client, _ := ng.Run(t) + t.Run("TrendSchedule", func(t *testing.T) { var scheduled int if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries, @@ -123,35 +114,40 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,,`} } }) - time.Sleep(1 * time.Second) - t.Run("GetTrendBeforeStoreInterval", func(t *testing.T) { - var reply string - if err := client.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{}, &reply); err != nil { - t.Error(err) - } else if reply != utils.OK { - t.Error("Calling CacheSv1.ReloadCache got reply: ", reply) - } - var tr *engine.Trend - if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - }) - time.Sleep(1200 * time.Millisecond) t.Run("GetTrendsAfterStoreInterval", func(t *testing.T) { - var reply string - if err := client.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{}, &reply); err != nil { - t.Error(err) - } else if reply != utils.OK { - t.Error("Calling CacheSv1.ReloadCache got reply: ", reply) - } - var trnd *engine.Trend - if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd); err != nil { - t.Error(err) - } else if len(trnd.RunTimes) != 1 || len(trnd.Metrics) != 1 { - t.Errorf("expected to 1 runtimes, got %d", len(trnd.RunTimes)) - } + metricsChan := make(chan *engine.Trend, 1) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { + ticker := time.NewTicker(600 * time.Millisecond) + var trnd engine.Trend + for { + select { + case <-ticker.C: + err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Trend retrieval error: %v", err) + } + continue + } + metricsChan <- &trnd + case <-ctx.Done(): + return + } + } + }() + + select { + case trnd := <-metricsChan: + if len(trnd.RunTimes) < 1 || len(trnd.Metrics) < 1 { + t.Errorf("expected at least 1 runtime, got %d", len(trnd.RunTimes)) + } + case <-ctx.Done(): + t.Error("Didn't get any trend from db") + } }) t.Run("TrendsSetConfig", func(t *testing.T) { @@ -182,21 +178,38 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,,`} } }) - time.Sleep(1 * time.Second) t.Run("GetTrendsNotStored", func(t *testing.T) { - var reply string - if err := client.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{}, &reply); err != nil { - t.Error(err) - } else if reply != utils.OK { - t.Error("Calling CacheSv1.ReloadCache got reply: ", reply) - } + metricsChan := make(chan *engine.Trend, 1) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { + for { + ticker := time.NewTicker(700 * time.Millisecond) + select { + case <-ticker.C: + var trnd engine.Trend + // the trend will be not updated since storeinterval is set to 0 + err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Trend retrieval error: %v", err) + } + continue + } + metricsChan <- &trnd + case <-ctx.Done(): + return + } + } + }() - var tr *engine.Trend - // the trend will be not updated since storeinterval is set to 0 - if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { - t.Error(err) - } else if len(tr.RunTimes) != 1 || len(tr.Metrics) != 1 { - t.Errorf("expected to 1 runtimes, got %d", len(tr.RunTimes)) + select { + case trnd := <-metricsChan: + if len(trnd.RunTimes) < 1 && len(trnd.Metrics) < 1 { + t.Errorf("expected 1 runtime, got %d", len(trnd.RunTimes)) + } + case <-ctx.Done(): + t.Error("Didn't get any trend from db") } }) t.Run("TrendsSetConfig", func(t *testing.T) { @@ -226,20 +239,36 @@ cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,,`} t.Errorf("expected 1, got %d", scheduled) } }) - time.Sleep(1 * time.Second) t.Run("GetTrendsStoredUnlimited", func(t *testing.T) { - var reply string - if err := client.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{}, &reply); err != nil { - t.Error(err) - } else if reply != utils.OK { - t.Error("Calling CacheSv1.ReloadCache got reply: ", reply) - } - var tr *engine.Trend - // the trend will be updated since storeinterval is set to -1 - if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { - t.Error(err) - } else if len(tr.RunTimes) != 2 || len(tr.Metrics) != 2 { - t.Errorf("expected to 2 runtimes, got %d", len(tr.RunTimes)) + metricsChan := make(chan *engine.Trend, 1) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { + ticker := time.NewTicker(1000 * time.Millisecond) + for { + select { + case <-ticker.C: + var trnd engine.Trend + err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &trnd) + if err != nil { + if err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Trend retrieval error: %v", err) + } + continue + } + metricsChan <- &trnd + case <-ctx.Done(): + return + } + } + }() + select { + case trnd := <-metricsChan: + if len(trnd.RunTimes) < 2 || len(trnd.Metrics) < 2 { + t.Errorf("expected at least 2 runtimes, got %d", len(trnd.RunTimes)) + } + case <-ctx.Done(): + t.Error("Didn't get any trend from db") } }) }