From bd6f5c661d920224d25414b1dbd0cb56995cd82d Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Fri, 20 Sep 2024 15:16:25 +0200 Subject: [PATCH] Add *eventTimestamp for EES --- .../samples/reprocess_cdrs_stats_ees_mongo/cgrates.json | 2 +- .../samples/reprocess_cdrs_stats_ees_mysql/cgrates.json | 2 +- ees/ees.go | 3 +++ engine/dynamicdp.go | 2 +- engine/filters_test.go | 9 +++++---- engine/stats.go | 1 + general_tests/reproc_cdrs_for_stats_it_test.go | 6 ++++-- utils/consts.go | 1 + utils/dataprovider.go | 3 --- utils/dataprovider_test.go | 9 +-------- 10 files changed, 18 insertions(+), 20 deletions(-) diff --git a/data/conf/samples/reprocess_cdrs_stats_ees_mongo/cgrates.json b/data/conf/samples/reprocess_cdrs_stats_ees_mongo/cgrates.json index 5184ae460..55ca38287 100644 --- a/data/conf/samples/reprocess_cdrs_stats_ees_mongo/cgrates.json +++ b/data/conf/samples/reprocess_cdrs_stats_ees_mongo/cgrates.json @@ -41,7 +41,6 @@ "stats": { "enabled": true, - "store_interval": "1s", "ees_conns": ["*localhost"], "ees_exporter_ids": ["exporter1"] }, @@ -59,6 +58,7 @@ "id": "exporter1", "type": "*file_csv", "export_path": "/tmp/testCSV", + "filters": ["*gt:~*eventTimestamp:2024-09-19T14:00:58+02:00"], "attempts": 1, "synchronous": true, "field_separator": ",", diff --git a/data/conf/samples/reprocess_cdrs_stats_ees_mysql/cgrates.json b/data/conf/samples/reprocess_cdrs_stats_ees_mysql/cgrates.json index e67b2ec37..333b23cba 100644 --- a/data/conf/samples/reprocess_cdrs_stats_ees_mysql/cgrates.json +++ b/data/conf/samples/reprocess_cdrs_stats_ees_mysql/cgrates.json @@ -38,7 +38,6 @@ "stats": { "enabled": true, - "store_interval": "1s", "ees_conns": ["*localhost"], "ees_exporter_ids": ["exporter1"] }, @@ -56,6 +55,7 @@ "id": "exporter1", "type": "*file_csv", "export_path": "/tmp/testCSV", + "filters": ["*gt:~*eventTimestamp:2024-09-19T14:00:58+02:00"], "attempts": 1, "synchronous": true, "field_separator": ",", diff --git a/ees/ees.go b/ees/ees.go index 56d153507..cec7c6820 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -159,6 +159,9 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG utils.MetaReq: cgrEv.Event, utils.MetaOpts: cgrEv.APIOpts, } + if cgrEv.CGREvent.Time != nil { + cgrDp[utils.MetaEventTimestamp] = *cgrEv.CGREvent.Time + } var wg sync.WaitGroup var withErr bool diff --git a/engine/dynamicdp.go b/engine/dynamicdp.go index 17cfddba5..f37cfbd2e 100644 --- a/engine/dynamicdp.go +++ b/engine/dynamicdp.go @@ -66,7 +66,7 @@ var initialDPPrefixes = utils.NewStringSet([]string{ utils.MetaCgrep, utils.MetaRep, utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts, utils.MetaHdr, utils.MetaTrl, utils.MetaCfg, - utils.MetaTenant}) + utils.MetaTenant, utils.MetaEventTimestamp}) func (dDP *dynamicDP) FieldAsInterface(fldPath []string) (val any, err error) { if len(fldPath) == 0 { diff --git a/engine/filters_test.go b/engine/filters_test.go index c80fb6d13..35ec05d25 100644 --- a/engine/filters_test.go +++ b/engine/filters_test.go @@ -24,6 +24,7 @@ import ( "net/http" "net/http/httptest" "reflect" + "strings" "testing" "time" @@ -2383,12 +2384,12 @@ func TestCheckFilterErr(t *testing.T) { Rules: []*FilterRule{ { Type: utils.MetaString, - Element: "~*reqCharger", + Element: "~.", Values: []string{"ChargerProfile2"}, }, }, } - if err := CheckFilter(fltr); err == nil { + if err := CheckFilter(fltr); err == nil || !strings.Contains(err.Error(), "Empty field path for filter <&{cgrates.org FLTR_CP_2 ") { t.Error(err) } fltr = &Filter{ @@ -2397,11 +2398,11 @@ func TestCheckFilterErr(t *testing.T) { Rules: []*FilterRule{{ Element: "~*req.Account", Type: utils.MetaString, - Values: []string{"~1001"}, + Values: []string{"~."}, }, }, } - if err := CheckFilter(fltr); err == nil { + if err := CheckFilter(fltr); err == nil || !strings.Contains(err.Error(), "Empty field path for filter <&{cgrates.org TestFilter ") { t.Error(err) } } diff --git a/engine/stats.go b/engine/stats.go index 71f8f9a3c..851f1dab3 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -334,6 +334,7 @@ func (sS *StatService) processEEs(sQs StatQueues, opts map[string]any) (err erro cgrEv := &utils.CGREvent{ Tenant: sq.Tenant, ID: utils.GenUUID(), + Time: utils.TimePointer(time.Now()), Event: map[string]any{ utils.EventType: utils.StatUpdate, utils.StatID: sq.ID, diff --git a/general_tests/reproc_cdrs_for_stats_it_test.go b/general_tests/reproc_cdrs_for_stats_it_test.go index dfdc1640b..61cc7ffad 100644 --- a/general_tests/reproc_cdrs_for_stats_it_test.go +++ b/general_tests/reproc_cdrs_for_stats_it_test.go @@ -66,6 +66,7 @@ var ( testRpcdrsCheckAccountBalancesAfterSecondProcessCDR, testRpcdrsGetQueueStringMetrics, testRpcdrsStopEngine, + testCsvVerifyExports, testRpcdrsRemoveDirectory, } ) @@ -177,6 +178,7 @@ func testRpcdrsProcessFirstCDR(t *testing.T) { CGREvent: utils.CGREvent{ Tenant: "cgrates.org", ID: "event1", + Time: utils.TimePointer(time.Now()), Event: map[string]any{ utils.RunID: "run_1", utils.CGRID: CGRID, @@ -254,6 +256,7 @@ func testRpcdrsProcessSecondCDR(t *testing.T) { CGREvent: utils.CGREvent{ Tenant: "cgrates.org", ID: "event2", + Time: utils.TimePointer(time.Now()), Event: map[string]any{ utils.RunID: "run_2", utils.CGRID: CGRID, @@ -334,7 +337,6 @@ func testRpcdrsGetCDRs(t *testing.T) { if err != nil { t.Fatal(err) } - // t.Log(utils.ToJSON(cdrs)) } func testRpcdrsStopEngine(t *testing.T) { @@ -434,7 +436,7 @@ func testCsvVerifyExports(t *testing.T) { if len(files) != 1 { t.Fatalf("Expected %+v, received: %+v", 1, len(files)) } - eCnt := "STAT_AGG,120000000000,1.2,1" + "\n" + "STAT_AGG,240000000000,1.8,2" + eCnt := "STAT_AGG,120000000000,1.2,1\nSTAT_AGG,240000000000,1.8,2\n" if outContent1, err := os.ReadFile(files[0]); err != nil { t.Error(err) } else if len(eCnt) != len(string(outContent1)) { diff --git a/utils/consts.go b/utils/consts.go index 40302470f..37aa0f6f5 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -680,6 +680,7 @@ const ( MetaCfg = "*cfg" MetaDynReq = "~*req" MetaScPrefix = "~*sc." // used for SMCostFilter + MetaEventTimestamp = "*eventTimestamp" CGROriginHost = "cgr_originhost" MetaInitiate = "*initiate" MetaUpdate = "*update" diff --git a/utils/dataprovider.go b/utils/dataprovider.go index dd4005133..a4f71aba7 100644 --- a/utils/dataprovider.go +++ b/utils/dataprovider.go @@ -79,9 +79,6 @@ func IsPathValid(path string) (err error) { return nil } paths := SplitPath(path, NestingSep[0], -1) - if len(paths) <= 1 { - return errors.New("Path is missing ") - } for _, path := range paths { if strings.TrimSpace(path) == EmptyString { return errors.New("Empty field path ") diff --git a/utils/dataprovider_test.go b/utils/dataprovider_test.go index a8b67b9bf..91ff858b3 100644 --- a/utils/dataprovider_test.go +++ b/utils/dataprovider_test.go @@ -141,16 +141,9 @@ func TestIsPathValid(t *testing.T) { t.Error(err) } - /// - path = "~Field1" - errExpect := "Path is missing " - if err := IsPathValid(path); err == nil || err.Error() != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } - /// path = "~Field1.\n\t.Field2[0]" - errExpect = "Empty field path " + errExpect := "Empty field path " if err := IsPathValid(path); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) }