From 0bab7d555713fa1bce4a2eca70e89e35fe77f4fa Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Wed, 7 Aug 2024 15:30:55 +0200 Subject: [PATCH] Add reader id within the *vars in ERs --- data/conf/samples/ers_internal/cgrates.json | 4 ++++ data/conf/samples/ers_mongo/cgrates.json | 4 ++++ data/conf/samples/ers_mysql/cgrates.json | 4 ++++ data/conf/samples/ers_postgres/cgrates.json | 4 ++++ ers/amqp.go | 3 ++- ers/amqp_it_test.go | 5 +++-- ers/amqpv1.go | 5 ++++- ers/amqpv1_it_test.go | 5 ++++- ers/filecsv.go | 2 +- ers/filecsv_it_test.go | 4 ++++ ers/filefwv.go | 2 +- ers/filefwv_it_test.go | 5 +++++ ers/filejson.go | 2 +- ers/filejson_it_test.go | 2 ++ ers/filexml.go | 2 +- ers/filexml_it_test.go | 5 +++++ ers/kafka.go | 4 +++- ers/kafka_it_test.go | 6 ++++-- ers/nats.go | 5 ++++- ers/nats_it_test.go | 3 ++- ers/readers_test.go | 2 +- ers/s3.go | 4 +++- ers/s3_it_test.go | 5 ++++- ers/sql.go | 3 ++- ers/sql_it_test.go | 7 +++++-- ers/sqs.go | 4 +++- ers/sqs_it_test.go | 5 ++++- utils/consts.go | 1 + 28 files changed, 85 insertions(+), 22 deletions(-) diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index f4f47aaa5..22f5934ba 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -200,6 +200,7 @@ {"tag": "HDRExtra3", "path": "*cgreq.HDRExtra3", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "HDRExtra2", "path": "*cgreq.HDRExtra2", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "HDRExtra1", "path": "*cgreq.HDRExtra1", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, { @@ -213,6 +214,7 @@ "xmlRootPath": "broadWorksCDR.cdrData", }, "fields":[ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, @@ -259,6 +261,7 @@ "flags": ["*cdrs"], "filters": ["*regex:~*req.0-10:^CDR00000|^CDR00001|^CDR00002"], "fields": [ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"}, {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, {"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, @@ -482,6 +485,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/ErsJSON/out", "fields":[ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, diff --git a/data/conf/samples/ers_mongo/cgrates.json b/data/conf/samples/ers_mongo/cgrates.json index bd941b782..1dc2587e9 100644 --- a/data/conf/samples/ers_mongo/cgrates.json +++ b/data/conf/samples/ers_mongo/cgrates.json @@ -203,6 +203,7 @@ {"tag": "HDRExtra3", "path": "*cgreq.HDRExtra3", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "HDRExtra2", "path": "*cgreq.HDRExtra2", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "HDRExtra1", "path": "*cgreq.HDRExtra1", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, { @@ -216,6 +217,7 @@ "xmlRootPath": "broadWorksCDR.cdrData", }, "fields":[ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, @@ -262,6 +264,7 @@ "flags": ["*cdrs"], "filters": ["*regex:~*req.0-10:^CDR00000|^CDR00001|^CDR00002"], "fields": [ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"}, {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, {"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, @@ -485,6 +488,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/ErsJSON/out", "fields":[ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index 5612731c1..35ced5374 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -199,6 +199,7 @@ {"tag": "HDRExtra3", "path": "*cgreq.HDRExtra3", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "HDRExtra2", "path": "*cgreq.HDRExtra2", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "HDRExtra1", "path": "*cgreq.HDRExtra1", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID", "mandatory": true}, ], }, { @@ -212,6 +213,7 @@ "xmlRootPath": "broadWorksCDR.cdrData", }, "fields":[ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, @@ -258,6 +260,7 @@ "flags": ["*cdrs"], "filters": ["*regex:~*req.0-10:^CDR00000|^CDR00001|^CDR00002"], "fields": [ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"}, {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, {"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, @@ -481,6 +484,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/ErsJSON/out", "fields":[ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, diff --git a/data/conf/samples/ers_postgres/cgrates.json b/data/conf/samples/ers_postgres/cgrates.json index 871c72ddf..b171c576a 100644 --- a/data/conf/samples/ers_postgres/cgrates.json +++ b/data/conf/samples/ers_postgres/cgrates.json @@ -196,6 +196,7 @@ {"tag": "HDRExtra3", "path": "*cgreq.HDRExtra3", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "HDRExtra2", "path": "*cgreq.HDRExtra2", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "HDRExtra1", "path": "*cgreq.HDRExtra1", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, { @@ -209,6 +210,7 @@ "xmlRootPath": "broadWorksCDR.cdrData", }, "fields":[ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, @@ -255,6 +257,7 @@ "flags": ["*cdrs"], "filters": ["*regex:~*req.0-10:^CDR00000|^CDR00001|^CDR00002"], "fields": [ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"}, {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, {"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, @@ -478,6 +481,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/ErsJSON/out", "fields":[ + {"tag": "ReaderId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, diff --git a/ers/amqp.go b/ers/amqp.go index 7ed9b7cab..4073df121 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -121,9 +121,10 @@ func (rdr *AMQPER) processMessage(msg []byte) error { if err != nil { return err } + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/amqp_it_test.go b/ers/amqp_it_test.go index 441b8bd29..5aae15cc2 100644 --- a/ers/amqp_it_test.go +++ b/ers/amqp_it_test.go @@ -59,6 +59,7 @@ func TestAMQPER(t *testing.T) { "flags": [], // flags to influence the event processing "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -76,7 +77,6 @@ func TestAMQPER(t *testing.T) { rdrEvents = make(chan *erEvent, 1) rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - if rdr, err = NewAMQPER(cfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) @@ -120,7 +120,8 @@ func TestAMQPER(t *testing.T) { ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]any{ - "CGRID": randomCGRID, + "CGRID": randomCGRID, + "ReaderID": "amqp", }, APIOpts: map[string]any{}, } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index d972a7540..a4a8fd5bc 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -155,8 +155,11 @@ func (rdr *AMQPv1ER) processMessage(msg []byte) (err error) { if err = json.Unmarshal(msg, &decodedMessage); err != nil { return } + + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} + agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go index 410791d61..66a800036 100644 --- a/ers/amqpv1_it_test.go +++ b/ers/amqpv1_it_test.go @@ -64,6 +64,7 @@ func TestAMQPERv1(t *testing.T) { "flags": [], // flags to influence the event processing "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -125,8 +126,10 @@ func TestAMQPERv1(t *testing.T) { ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]any{ - "CGRID": randomCGRID, + "CGRID": randomCGRID, + "ReaderID": "amqpv1", }, + APIOpts: map[string]any{}, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) diff --git a/ers/filecsv.go b/ers/filecsv.go index 90a9a1a80..ec5602f59 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -140,7 +140,7 @@ func (rdr *CSVFileER) processFile(fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} var hdrDefChar string if rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSV.HeaderDefineChar != nil { hdrDefChar = *rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSV.HeaderDefineChar diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 5998a4e31..56da10ff7 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -309,6 +309,10 @@ func testCsvITAnalyzeFilteredCDR(t *testing.T) { t.Errorf("Expecting: 1002, received: <%s> , <%s>", cdrs[0].Account, cdrs[1].Account) } else if cdrs[0].Tenant != "itsyscom.com" || cdrs[1].Tenant != "itsyscom.com" { t.Errorf("Expecting: itsyscom.com, received: <%s> , <%s>", cdrs[0].Tenant, cdrs[1].Tenant) + } else if cdrs[0].ExtraFields["ReaderID"] != "file_reader_with_filters" { + t.Errorf("Expected <%v>, received <%v>", "file_reader_with_filters", cdrs[0]) + } else if cdrs[1].ExtraFields["ReaderID"] != "file_reader_with_filters" { + t.Errorf("Expected <%v>, received <%v>", "file_reader_with_filters", cdrs[1]) } } diff --git a/ers/filefwv.go b/ers/filefwv.go index 34ea726d7..bb9affca9 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -135,7 +135,7 @@ func (rdr *FWVFileER) processFile(fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} for { var hasHeader, hasTrailer bool diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index 1fe713ab2..e46f4d292 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -193,6 +193,11 @@ func testFWVITAnalyseCDRs(t *testing.T) { } else if len(reply) != 29 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } + for _, cdr := range reply { + if cdr.ExtraFields["ReaderID"] != "FWV1" { + t.Errorf("Expected <%v>, received <%v>", "FWV1", cdr.ExtraFields["ReaderID"]) + } + } if err := fwvRPC.Call(context.Background(), utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{OriginIDs: []string{"CDR0000010"}}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 1 { diff --git a/ers/filejson.go b/ers/filejson.go index 2562d8ec8..5d406d97b 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -137,7 +137,7 @@ func (rdr *JSONFileER) processFile(fName string) (err error) { } evsPosted := 0 - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( utils.MapStorage(data), reqVars, diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index 3358a0d05..746747b23 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -212,6 +212,8 @@ func testJSONVerify(t *testing.T) { } else { if cdrs[0].Usage != 2*time.Minute { t.Errorf("Unexpected usage for CDR: %d", cdrs[0].Usage) + } else if cdrs[0].ExtraFields["ReaderID"] != "JSONReader" { + t.Errorf("Expected readerID <%v>, received <%v>", "JSONReader", cdrs[0].ExtraFields["ReaderID"]) } } diff --git a/ers/filexml.go b/ers/filexml.go index f828f5f3c..b5f0a1a4a 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -159,7 +159,7 @@ func (rdr *XMLFileER) processFile(fName string) error { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} for _, xmlElmt := range xmlElmts { rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go index ec4740c52..108142e6f 100644 --- a/ers/filexml_it_test.go +++ b/ers/filexml_it_test.go @@ -279,6 +279,11 @@ func testXmlITAnalyseCDRs(t *testing.T) { } else if len(reply) != 6 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } + for _, cdr := range reply { + if cdr.ExtraFields["ReaderID"] != "XmlDryRun" { + t.Errorf("Expected <%v>, received <%v>", "XmlDryRun", cdr.ExtraFields["ReaderID"]) + } + } if err := xmlRPC.Call(context.Background(), utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4915117174963"}}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 3 { diff --git a/ers/kafka.go b/ers/kafka.go index 3c66a7db1..2b2f04c12 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -175,8 +175,10 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { return } + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} + agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index aa7326bc7..486199de3 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -91,7 +91,8 @@ func TestKafkaER(t *testing.T) { "filters": [], "flags": [], "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"} + {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ] } ] @@ -146,7 +147,8 @@ func TestKafkaER(t *testing.T) { ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]any{ - "CGRID": randomCGRID, + "CGRID": randomCGRID, + "ReaderID": cfg.ERsCfg().Readers[1].ID, }, APIOpts: map[string]any{}, } diff --git a/ers/nats.go b/ers/nats.go index b7f6008dc..b09c724e5 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -178,8 +178,11 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) { if err = json.Unmarshal(msg, &decodedMessage); err != nil { return } + + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} + agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index d85c03365..4a322e1fe 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -172,7 +172,8 @@ var natsCfg string = `{ %s }, "fields":[ - {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*cgreq.CGRID"} + {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ] } ] diff --git a/ers/readers_test.go b/ers/readers_test.go index 7d4e671e4..5ca464a73 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -219,7 +219,7 @@ func TestNewAMQPReader(t *testing.T) { if err != nil { t.Error(err) } else if !reflect.DeepEqual(exp, rcv) { - t.Errorf("Expected %v but received %v", utils.ToJSON(exp), utils.ToJSON(rcv)) + t.Errorf("Expected %+v but received %+v", exp, rcv) } } diff --git a/ers/s3.go b/ers/s3.go index 8aaf76c12..8c93979e9 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -116,8 +116,10 @@ func (rdr *S3ER) processMessage(body []byte) (err error) { return } + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} + agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go index a8d1be888..0f36d6820 100644 --- a/ers/s3_it_test.go +++ b/ers/s3_it_test.go @@ -70,6 +70,7 @@ func TestS3ER(t *testing.T) { }, "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -123,8 +124,10 @@ func TestS3ER(t *testing.T) { ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]any{ - "CGRID": randomCGRID, + "CGRID": randomCGRID, + "ReaderID": "s3", }, + APIOpts: map[string]any{}, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) diff --git a/ers/sql.go b/ers/sql.go index f40e2af1b..273535c7b 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -222,8 +222,9 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { } func (rdr *SQLEventReader) processMessage(msg map[string]any) (err error) { + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(msg), nil, + utils.MapStorage(msg), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index 9b42a697e..bf4e5299b 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -110,6 +110,7 @@ func testSQLInitConfig(t *testing.T) { "flags": [], // flags to influence the event processing "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "path": "*cgreq.CGRID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -252,7 +253,8 @@ func testSQLReader(t *testing.T) { ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]any{ - "CGRID": cdr.CGRID, + "CGRID": cdr.CGRID, + "ReaderID": "mysql", }, APIOpts: map[string]any{}, } @@ -286,7 +288,8 @@ func testSQLReader2(t *testing.T) { ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]any{ - "CGRID": cdr.CGRID, + "CGRID": cdr.CGRID, + "ReaderID": "mysql", }, APIOpts: map[string]any{}, } diff --git a/ers/sqs.go b/ers/sqs.go index 3e8164c4e..cd6791b1a 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -104,8 +104,10 @@ func (rdr *SQSER) processMessage(body []byte) (err error) { return } + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} + agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/sqs_it_test.go b/ers/sqs_it_test.go index 0dc912955..33a3ed540 100644 --- a/ers/sqs_it_test.go +++ b/ers/sqs_it_test.go @@ -69,6 +69,7 @@ func TestSQSER(t *testing.T) { }, "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -121,8 +122,10 @@ func TestSQSER(t *testing.T) { ID: ev.cgrEvent.ID, Time: ev.cgrEvent.Time, Event: map[string]any{ - "CGRID": randomCGRID, + "CGRID": randomCGRID, + "ReaderID": "sqs", }, + APIOpts: map[string]any{}, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) diff --git a/utils/consts.go b/utils/consts.go index e7c98b80f..faca069a4 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -824,6 +824,7 @@ const ( MetaGroup = "*group" InternalRPCSet = "InternalRPCSet" MetaFileName = "*fileName" + MetaReaderID = "*readerID" MetaRadauth = "*radauth" UserPassword = "UserPassword" RadauthFailed = "RADAUTH_FAILED"