Add *eventTimestamp for EES

This commit is contained in:
arberkatellari
2024-09-20 15:16:25 +02:00
committed by Dan Christian Bogos
parent c7e7634189
commit bd6f5c661d
10 changed files with 18 additions and 20 deletions

View File

@@ -41,7 +41,6 @@
"stats": {
"enabled": true,
"store_interval": "1s",
"ees_conns": ["*localhost"],
"ees_exporter_ids": ["exporter1"]
},
@@ -59,6 +58,7 @@
"id": "exporter1",
"type": "*file_csv",
"export_path": "/tmp/testCSV",
"filters": ["*gt:~*eventTimestamp:2024-09-19T14:00:58+02:00"],
"attempts": 1,
"synchronous": true,
"field_separator": ",",

View File

@@ -38,7 +38,6 @@
"stats": {
"enabled": true,
"store_interval": "1s",
"ees_conns": ["*localhost"],
"ees_exporter_ids": ["exporter1"]
},
@@ -56,6 +55,7 @@
"id": "exporter1",
"type": "*file_csv",
"export_path": "/tmp/testCSV",
"filters": ["*gt:~*eventTimestamp:2024-09-19T14:00:58+02:00"],
"attempts": 1,
"synchronous": true,
"field_separator": ",",

View File

@@ -159,6 +159,9 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG
utils.MetaReq: cgrEv.Event,
utils.MetaOpts: cgrEv.APIOpts,
}
if cgrEv.CGREvent.Time != nil {
cgrDp[utils.MetaEventTimestamp] = *cgrEv.CGREvent.Time
}
var wg sync.WaitGroup
var withErr bool

View File

@@ -66,7 +66,7 @@ var initialDPPrefixes = utils.NewStringSet([]string{
utils.MetaCgrep, utils.MetaRep, utils.MetaAct,
utils.MetaEC, utils.MetaUCH, utils.MetaOpts,
utils.MetaHdr, utils.MetaTrl, utils.MetaCfg,
utils.MetaTenant})
utils.MetaTenant, utils.MetaEventTimestamp})
func (dDP *dynamicDP) FieldAsInterface(fldPath []string) (val any, err error) {
if len(fldPath) == 0 {

View File

@@ -24,6 +24,7 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"
@@ -2383,12 +2384,12 @@ func TestCheckFilterErr(t *testing.T) {
Rules: []*FilterRule{
{
Type: utils.MetaString,
Element: "~*reqCharger",
Element: "~.",
Values: []string{"ChargerProfile2"},
},
},
}
if err := CheckFilter(fltr); err == nil {
if err := CheckFilter(fltr); err == nil || !strings.Contains(err.Error(), "Empty field path for filter <&{cgrates.org FLTR_CP_2 ") {
t.Error(err)
}
fltr = &Filter{
@@ -2397,11 +2398,11 @@ func TestCheckFilterErr(t *testing.T) {
Rules: []*FilterRule{{
Element: "~*req.Account",
Type: utils.MetaString,
Values: []string{"~1001"},
Values: []string{"~."},
},
},
}
if err := CheckFilter(fltr); err == nil {
if err := CheckFilter(fltr); err == nil || !strings.Contains(err.Error(), "Empty field path for filter <&{cgrates.org TestFilter ") {
t.Error(err)
}
}

View File

@@ -334,6 +334,7 @@ func (sS *StatService) processEEs(sQs StatQueues, opts map[string]any) (err erro
cgrEv := &utils.CGREvent{
Tenant: sq.Tenant,
ID: utils.GenUUID(),
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.EventType: utils.StatUpdate,
utils.StatID: sq.ID,

View File

@@ -66,6 +66,7 @@ var (
testRpcdrsCheckAccountBalancesAfterSecondProcessCDR,
testRpcdrsGetQueueStringMetrics,
testRpcdrsStopEngine,
testCsvVerifyExports,
testRpcdrsRemoveDirectory,
}
)
@@ -177,6 +178,7 @@ func testRpcdrsProcessFirstCDR(t *testing.T) {
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1",
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.RunID: "run_1",
utils.CGRID: CGRID,
@@ -254,6 +256,7 @@ func testRpcdrsProcessSecondCDR(t *testing.T) {
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "event2",
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.RunID: "run_2",
utils.CGRID: CGRID,
@@ -334,7 +337,6 @@ func testRpcdrsGetCDRs(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// t.Log(utils.ToJSON(cdrs))
}
func testRpcdrsStopEngine(t *testing.T) {
@@ -434,7 +436,7 @@ func testCsvVerifyExports(t *testing.T) {
if len(files) != 1 {
t.Fatalf("Expected %+v, received: %+v", 1, len(files))
}
eCnt := "STAT_AGG,120000000000,1.2,1" + "\n" + "STAT_AGG,240000000000,1.8,2"
eCnt := "STAT_AGG,120000000000,1.2,1\nSTAT_AGG,240000000000,1.8,2\n"
if outContent1, err := os.ReadFile(files[0]); err != nil {
t.Error(err)
} else if len(eCnt) != len(string(outContent1)) {

View File

@@ -680,6 +680,7 @@ const (
MetaCfg = "*cfg"
MetaDynReq = "~*req"
MetaScPrefix = "~*sc." // used for SMCostFilter
MetaEventTimestamp = "*eventTimestamp"
CGROriginHost = "cgr_originhost"
MetaInitiate = "*initiate"
MetaUpdate = "*update"

View File

@@ -79,9 +79,6 @@ func IsPathValid(path string) (err error) {
return nil
}
paths := SplitPath(path, NestingSep[0], -1)
if len(paths) <= 1 {
return errors.New("Path is missing ")
}
for _, path := range paths {
if strings.TrimSpace(path) == EmptyString {
return errors.New("Empty field path ")

View File

@@ -141,16 +141,9 @@ func TestIsPathValid(t *testing.T) {
t.Error(err)
}
///
path = "~Field1"
errExpect := "Path is missing "
if err := IsPathValid(path); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
///
path = "~Field1.\n\t.Field2[0]"
errExpect = "Empty field path "
errExpect := "Empty field path "
if err := IsPathValid(path); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}