diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index 6691af337..67a9385ee 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -167,6 +167,7 @@ "flags": ["*cdrs","*log"], "filters":["*string:~*req.3:1002"], "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.1", "mandatory": true}, @@ -195,6 +196,7 @@ "xmlRootPath": "broadWorksCDR.cdrData" }, "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "*vars.*readerID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, @@ -240,6 +242,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/fwvErs/out", "fields": [ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"}, {"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"}, {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, {"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, @@ -463,6 +466,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/ErsJSON/out", "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" }, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, diff --git a/data/conf/samples/ers_mongo/cgrates.json b/data/conf/samples/ers_mongo/cgrates.json index cea058706..821a054e3 100644 --- a/data/conf/samples/ers_mongo/cgrates.json +++ b/data/conf/samples/ers_mongo/cgrates.json @@ -176,6 +176,7 @@ "flags": ["*cdrs","*log"], "filters":["*string:~*req.3:1002"], "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.1", "mandatory": true}, @@ -204,6 +205,7 @@ "xmlRootPath": "broadWorksCDR.cdrData" }, "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "*vars.*readerID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, @@ -249,6 +251,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/fwvErs/out", "fields": [ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"}, {"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"}, {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, {"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, @@ -472,6 +475,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/ErsJSON/out", "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" }, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index abf39d287..e034aa9e1 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -168,6 +168,7 @@ "flags": ["*cdrs","*log"], "filters":["*string:~*req.3:1002"], "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" }, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.1", "mandatory": true}, @@ -196,6 +197,7 @@ "xmlRootPath": "broadWorksCDR.cdrData" }, "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "*vars.*readerID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, @@ -241,6 +243,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/fwvErs/out", "fields": [ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" }, {"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"}, {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, {"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, @@ -463,6 +466,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/ErsJSON/out", "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" }, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, diff --git a/data/conf/samples/ers_postgres/cgrates.json b/data/conf/samples/ers_postgres/cgrates.json index b52249c34..c1e7e70c2 100644 --- a/data/conf/samples/ers_postgres/cgrates.json +++ b/data/conf/samples/ers_postgres/cgrates.json @@ -158,6 +158,7 @@ "flags": ["*cdrs","*log"], "filters":["*string:~*req.3:1002"], "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.1", "mandatory": true}, @@ -186,6 +187,7 @@ "xmlRootPath": "broadWorksCDR.cdrData" }, "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "*vars.*readerID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, @@ -231,6 +233,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/fwvErs/out", "fields": [ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"}, {"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"}, {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, {"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"}, @@ -453,6 +456,7 @@ "flags": ["*cdrs"], "processed_path": "/tmp/ErsJSON/out", "fields":[ + {"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, diff --git a/ers/amqp.go b/ers/amqp.go index 028b6132d..a4a1fb111 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -216,8 +216,9 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) { if err = json.Unmarshal(msg, &decodedMessage); err != nil { return } + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/amqp_it_test.go b/ers/amqp_it_test.go index 3ba4c0ba3..abb4ebc6c 100644 --- a/ers/amqp_it_test.go +++ b/ers/amqp_it_test.go @@ -59,6 +59,7 @@ func TestAMQPER(t *testing.T) { "flags": [], // flags to influence the event processing "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -118,6 +119,7 @@ func TestAMQPER(t *testing.T) { ID: ev.cgrEvent.ID, Event: map[string]any{ "OriginID": randomOriginID, + "ReaderID": "amqp", }, APIOpts: map[string]any{}, } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index b6702f990..ebcf8320c 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -171,8 +171,9 @@ func (rdr *AMQPv1ER) processMessage(msg []byte) (err error) { if err = json.Unmarshal(msg, &decodedMessage); err != nil { return } + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go index a59ebcca6..6244abe32 100644 --- a/ers/amqpv1_it_test.go +++ b/ers/amqpv1_it_test.go @@ -63,6 +63,7 @@ func TestAMQPERv1(t *testing.T) { "flags": [], // flags to influence the event processing "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -121,7 +122,9 @@ func TestAMQPERv1(t *testing.T) { ID: ev.cgrEvent.ID, Event: map[string]any{ "OriginID": randomOriginID, + "ReaderID": "amqpv1", }, + APIOpts: map[string]any{}, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) diff --git a/ers/filecsv.go b/ers/filecsv.go index 25df40a1b..d7e768037 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -149,7 +149,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} var hdrDefChar string if rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSVHeaderDefineChar != nil { hdrDefChar = *rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSVHeaderDefineChar diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 39da15338..0e34a52f5 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -299,7 +299,11 @@ package ers // t.Errorf("Expecting: 1002, received: <%s> , <%s>", cdrs[0].Account, cdrs[1].Account) // } else if cdrs[0].Tenant != "itsyscom.com" || cdrs[1].Tenant != "itsyscom.com" { // t.Errorf("Expecting: itsyscom.com, received: <%s> , <%s>", cdrs[0].Tenant, cdrs[1].Tenant) -// } +// } else if cdrs[0].ExtraFields["ReaderID"] != "file_reader_with_filters" { +// t.Errorf("Expected <%v>, received <%v>", "file_reader_with_filters", cdrs[0]) +// } else if cdrs[1].ExtraFields["ReaderID"] != "file_reader_with_filters" { +// t.Errorf("Expected <%v>, received <%v>", "file_reader_with_filters", cdrs[1]) +// } // } // func testCsvITProcessedFiles(t *testing.T) { diff --git a/ers/filefwv.go b/ers/filefwv.go index 61562ec99..d0e925254 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -143,7 +143,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} for { var hasHeader, hasTrailer bool diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index e101e4d8a..dfaef8166 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -186,6 +186,11 @@ func testFWVITAnalyseCDRs(t *testing.T) { } else if len(reply) != 34 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } + for _, cdr := range reply { + if cdr.ExtraFields["ReaderID"] != "FWV1" { + t.Errorf("Expected <%v>, received <%v>", "FWV1", cdr.ExtraFields["ReaderID"]) + } + } if err := fwvRPC.Call(utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{OriginIDs: []string{"CDR0000010"}}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 1 { diff --git a/ers/filejson.go b/ers/filejson.go index dce311c05..439654e06 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -144,7 +144,7 @@ func (rdr *JSONFileER) processFile(fPath, fName string) (err error) { } evsPosted := 0 - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( utils.MapStorage(data), reqVars, diff --git a/ers/filexml.go b/ers/filexml.go index ab50e53cc..fb0fc2af6 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -168,7 +168,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) error { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} for _, xmlElmt := range xmlElmts { rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go index 5c3fc08d1..0e23a77a1 100644 --- a/ers/filexml_it_test.go +++ b/ers/filexml_it_test.go @@ -304,6 +304,11 @@ var cdrXmlBroadsoft = ` } else if len(reply) != 6 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } + for _, cdr := range reply { + if cdr.ExtraFields["ReaderID"] != "XmlDryRun" { + t.Errorf("Expected <%v>, received <%v>", "XmlDryRun", cdr.ExtraFields["ReaderID"]) + } + } if err := xmlRPC.Call(utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4915117174963"}}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 3 { diff --git a/ers/kafka.go b/ers/kafka.go index e91698214..cbb948477 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -196,9 +196,9 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { if err = json.Unmarshal(msg, &decodedMessage); err != nil { return } - + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index d92ccfc58..04a130f64 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -90,7 +90,8 @@ func TestKafkaER(t *testing.T) { "filters": [], "flags": [], "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"} + {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ] } ] @@ -145,6 +146,7 @@ func TestKafkaER(t *testing.T) { ID: ev.cgrEvent.ID, Event: map[string]any{ "OriginID": randomOriginID, + "ReaderID": cfg.ERsCfg().Readers[1].ID, }, APIOpts: map[string]any{}, } diff --git a/ers/nats.go b/ers/nats.go index b130af932..ac06bd3fd 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -204,8 +204,9 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) { if err = json.Unmarshal(msg, &decodedMessage); err != nil { return } + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index d0333e0f6..ff541eba4 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -137,6 +137,7 @@ func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan str ID: ev.cgrEvent.ID, Event: map[string]any{ "OriginID": randomOriginID, + "ReaderID": "nats", }, APIOpts: map[string]any{}, } @@ -312,6 +313,7 @@ func TestNatsERJetStream(t *testing.T) { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsJetStream": true, @@ -360,6 +362,7 @@ func TestNatsER(t *testing.T) { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsSubjectProcessed": "processed_cdrs", @@ -408,6 +411,7 @@ func TestNatsERJetStreamUser(t *testing.T) { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsJetStream": true, @@ -458,6 +462,7 @@ func TestNatsERUser(t *testing.T) { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsSubjectProcessed": "processed_cdrs", @@ -505,6 +510,7 @@ func TestNatsERJetStreamToken(t *testing.T) { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsJetStream": true, @@ -555,6 +561,7 @@ func TestNatsERToken(t *testing.T) { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsSubjectProcessed": "processed_cdrs", @@ -617,6 +624,7 @@ func TestNatsERNkey(t *testing.T) { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsSubjectProcessed": "processed_cdrs", @@ -682,6 +690,7 @@ func TestNatsERJetStreamNKey(t *testing.T) { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsJetStream": true, @@ -770,6 +779,7 @@ resolver_preload: { "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsSubjectProcessed": "processed_cdrs", @@ -860,6 +870,7 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F "flags": [], "fields":[ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], "opts": { "natsJetStream": true, diff --git a/ers/s3.go b/ers/s3.go index 1387513e2..cba9d3c8b 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -119,9 +119,9 @@ func (rdr *S3ER) processMessage(body []byte) (err error) { if err = json.Unmarshal(body, &decodedMessage); err != nil { return } - + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go index 939c00536..41019507e 100644 --- a/ers/s3_it_test.go +++ b/ers/s3_it_test.go @@ -70,6 +70,7 @@ func TestS3ER(t *testing.T) { }, "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -123,7 +124,9 @@ func TestS3ER(t *testing.T) { ID: ev.cgrEvent.ID, Event: map[string]any{ "OriginID": randomOriginID, + "ReaderID": "s3", }, + APIOpts: map[string]any{}, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) diff --git a/ers/sql.go b/ers/sql.go index 5d8813474..014ae8ac5 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -237,8 +237,9 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { } func (rdr *SQLEventReader) processMessage(msg map[string]any) (err error) { + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(msg), nil, + utils.MapStorage(msg), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/sqs.go b/ers/sqs.go index d8ae6e736..6262fed06 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -112,9 +112,9 @@ func (rdr *SQSER) processMessage(body []byte) (err error) { if err = json.Unmarshal(body, &decodedMessage); err != nil { return } - + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, + utils.MapStorage(decodedMessage), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/sqs_it_test.go b/ers/sqs_it_test.go index 8d29feb13..443a8a144 100644 --- a/ers/sqs_it_test.go +++ b/ers/sqs_it_test.go @@ -69,6 +69,7 @@ func TestSQSER(t *testing.T) { }, "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], }, ], @@ -121,7 +122,9 @@ func TestSQSER(t *testing.T) { ID: ev.cgrEvent.ID, Event: map[string]any{ "OriginID": randomOriginID, + "ReaderID": "sqs", }, + APIOpts: map[string]any{}, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) diff --git a/utils/consts.go b/utils/consts.go index f6a5acc44..a8d33abfa 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -753,6 +753,7 @@ const ( MetaGroup = "*group" InternalRPCSet = "InternalRPCSet" MetaFileName = "*fileName" + MetaReaderID = "*readerID" MetaRadauth = "*radauth" UserPassword = "UserPassword" RadauthFailed = "RADAUTH_FAILED"