Add reader id within the *vars in ERs

This commit is contained in:
arberkatellari
2024-08-12 13:33:35 +02:00
committed by Dan Christian Bogos
parent 73a7590f1c
commit c7221f0d1e
25 changed files with 75 additions and 16 deletions

View File

@@ -167,6 +167,7 @@
"flags": ["*cdrs","*log"],
"filters":["*string:~*req.3:1002"],
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.1", "mandatory": true},
@@ -195,6 +196,7 @@
"xmlRootPath": "broadWorksCDR.cdrData"
},
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "*vars.*readerID"},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
@@ -240,6 +242,7 @@
"flags": ["*cdrs"],
"processed_path": "/tmp/fwvErs/out",
"fields": [
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"},
{"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"},
{"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"},
{"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"},
@@ -463,6 +466,7 @@
"flags": ["*cdrs"],
"processed_path": "/tmp/ErsJSON/out",
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" },
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true},

View File

@@ -176,6 +176,7 @@
"flags": ["*cdrs","*log"],
"filters":["*string:~*req.3:1002"],
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.1", "mandatory": true},
@@ -204,6 +205,7 @@
"xmlRootPath": "broadWorksCDR.cdrData"
},
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "*vars.*readerID"},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
@@ -249,6 +251,7 @@
"flags": ["*cdrs"],
"processed_path": "/tmp/fwvErs/out",
"fields": [
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"},
{"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"},
{"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"},
{"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"},
@@ -472,6 +475,7 @@
"flags": ["*cdrs"],
"processed_path": "/tmp/ErsJSON/out",
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" },
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true},

View File

@@ -168,6 +168,7 @@
"flags": ["*cdrs","*log"],
"filters":["*string:~*req.3:1002"],
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" },
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.1", "mandatory": true},
@@ -196,6 +197,7 @@
"xmlRootPath": "broadWorksCDR.cdrData"
},
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "*vars.*readerID"},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
@@ -241,6 +243,7 @@
"flags": ["*cdrs"],
"processed_path": "/tmp/fwvErs/out",
"fields": [
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" },
{"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"},
{"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"},
{"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"},
@@ -463,6 +466,7 @@
"flags": ["*cdrs"],
"processed_path": "/tmp/ErsJSON/out",
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID" },
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true},

View File

@@ -158,6 +158,7 @@
"flags": ["*cdrs","*log"],
"filters":["*string:~*req.3:1002"],
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.1", "mandatory": true},
@@ -186,6 +187,7 @@
"xmlRootPath": "broadWorksCDR.cdrData"
},
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "*vars.*readerID"},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
@@ -231,6 +233,7 @@
"flags": ["*cdrs"],
"processed_path": "/tmp/fwvErs/out",
"fields": [
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"},
{"tag": "FileName", "path": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"*right"},
{"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"},
{"tag": "AccId1", "path": "*cgreq.AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"*zeroleft"},
@@ -453,6 +456,7 @@
"flags": ["*cdrs"],
"processed_path": "/tmp/ErsJSON/out",
"fields":[
{"tag": "ReaderId", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID"},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true},

View File

@@ -216,8 +216,9 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) {
if err = json.Unmarshal(msg, &decodedMessage); err != nil {
return
}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
agReq := agents.NewAgentRequest(
utils.MapStorage(decodedMessage), nil,
utils.MapStorage(decodedMessage), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -59,6 +59,7 @@ func TestAMQPER(t *testing.T) {
"flags": [], // flags to influence the event processing
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
},
],
@@ -118,6 +119,7 @@ func TestAMQPER(t *testing.T) {
ID: ev.cgrEvent.ID,
Event: map[string]any{
"OriginID": randomOriginID,
"ReaderID": "amqp",
},
APIOpts: map[string]any{},
}

View File

@@ -171,8 +171,9 @@ func (rdr *AMQPv1ER) processMessage(msg []byte) (err error) {
if err = json.Unmarshal(msg, &decodedMessage); err != nil {
return
}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
agReq := agents.NewAgentRequest(
utils.MapStorage(decodedMessage), nil,
utils.MapStorage(decodedMessage), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -63,6 +63,7 @@ func TestAMQPERv1(t *testing.T) {
"flags": [], // flags to influence the event processing
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
},
],
@@ -121,7 +122,9 @@ func TestAMQPERv1(t *testing.T) {
ID: ev.cgrEvent.ID,
Event: map[string]any{
"OriginID": randomOriginID,
"ReaderID": "amqpv1",
},
APIOpts: map[string]any{},
}
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))

View File

@@ -149,7 +149,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0
timeStart := time.Now()
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
var hdrDefChar string
if rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSVHeaderDefineChar != nil {
hdrDefChar = *rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSVHeaderDefineChar

View File

@@ -299,7 +299,11 @@ package ers
// t.Errorf("Expecting: 1002, received: <%s> , <%s>", cdrs[0].Account, cdrs[1].Account)
// } else if cdrs[0].Tenant != "itsyscom.com" || cdrs[1].Tenant != "itsyscom.com" {
// t.Errorf("Expecting: itsyscom.com, received: <%s> , <%s>", cdrs[0].Tenant, cdrs[1].Tenant)
// }
// } else if cdrs[0].ExtraFields["ReaderID"] != "file_reader_with_filters" {
// t.Errorf("Expected <%v>, received <%v>", "file_reader_with_filters", cdrs[0])
// } else if cdrs[1].ExtraFields["ReaderID"] != "file_reader_with_filters" {
// t.Errorf("Expected <%v>, received <%v>", "file_reader_with_filters", cdrs[1])
// }
// }
// func testCsvITProcessedFiles(t *testing.T) {

View File

@@ -143,7 +143,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0
timeStart := time.Now()
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
for {
var hasHeader, hasTrailer bool

View File

@@ -186,6 +186,11 @@ func testFWVITAnalyseCDRs(t *testing.T) {
} else if len(reply) != 34 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
for _, cdr := range reply {
if cdr.ExtraFields["ReaderID"] != "FWV1" {
t.Errorf("Expected <%v>, received <%v>", "FWV1", cdr.ExtraFields["ReaderID"])
}
}
if err := fwvRPC.Call(utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{OriginIDs: []string{"CDR0000010"}}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 1 {

View File

@@ -144,7 +144,7 @@ func (rdr *JSONFileER) processFile(fPath, fName string) (err error) {
}
evsPosted := 0
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
agReq := agents.NewAgentRequest(
utils.MapStorage(data), reqVars,

View File

@@ -168,7 +168,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) error {
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0
timeStart := time.Now()
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
for _, xmlElmt := range xmlElmts {
rowNr++ // increment the rowNr after checking if it's not the end of file
agReq := agents.NewAgentRequest(

View File

@@ -304,6 +304,11 @@ var cdrXmlBroadsoft = `<?xml version="1.0" encoding="ISO-8859-1"?>
} else if len(reply) != 6 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
for _, cdr := range reply {
if cdr.ExtraFields["ReaderID"] != "XmlDryRun" {
t.Errorf("Expected <%v>, received <%v>", "XmlDryRun", cdr.ExtraFields["ReaderID"])
}
}
if err := xmlRPC.Call(utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4915117174963"}}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 3 {

View File

@@ -196,9 +196,9 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) {
if err = json.Unmarshal(msg, &decodedMessage); err != nil {
return
}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
agReq := agents.NewAgentRequest(
utils.MapStorage(decodedMessage), nil,
utils.MapStorage(decodedMessage), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -90,7 +90,8 @@ func TestKafkaER(t *testing.T) {
"filters": [],
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
]
}
]
@@ -145,6 +146,7 @@ func TestKafkaER(t *testing.T) {
ID: ev.cgrEvent.ID,
Event: map[string]any{
"OriginID": randomOriginID,
"ReaderID": cfg.ERsCfg().Readers[1].ID,
},
APIOpts: map[string]any{},
}

View File

@@ -204,8 +204,9 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
if err = json.Unmarshal(msg, &decodedMessage); err != nil {
return
}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
agReq := agents.NewAgentRequest(
utils.MapStorage(decodedMessage), nil,
utils.MapStorage(decodedMessage), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -137,6 +137,7 @@ func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan str
ID: ev.cgrEvent.ID,
Event: map[string]any{
"OriginID": randomOriginID,
"ReaderID": "nats",
},
APIOpts: map[string]any{},
}
@@ -312,6 +313,7 @@ func TestNatsERJetStream(t *testing.T) {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsJetStream": true,
@@ -360,6 +362,7 @@ func TestNatsER(t *testing.T) {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsSubjectProcessed": "processed_cdrs",
@@ -408,6 +411,7 @@ func TestNatsERJetStreamUser(t *testing.T) {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsJetStream": true,
@@ -458,6 +462,7 @@ func TestNatsERUser(t *testing.T) {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsSubjectProcessed": "processed_cdrs",
@@ -505,6 +510,7 @@ func TestNatsERJetStreamToken(t *testing.T) {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsJetStream": true,
@@ -555,6 +561,7 @@ func TestNatsERToken(t *testing.T) {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsSubjectProcessed": "processed_cdrs",
@@ -617,6 +624,7 @@ func TestNatsERNkey(t *testing.T) {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsSubjectProcessed": "processed_cdrs",
@@ -682,6 +690,7 @@ func TestNatsERJetStreamNKey(t *testing.T) {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsJetStream": true,
@@ -770,6 +779,7 @@ resolver_preload: {
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsSubjectProcessed": "processed_cdrs",
@@ -860,6 +870,7 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F
"flags": [],
"fields":[
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
"opts": {
"natsJetStream": true,

View File

@@ -119,9 +119,9 @@ func (rdr *S3ER) processMessage(body []byte) (err error) {
if err = json.Unmarshal(body, &decodedMessage); err != nil {
return
}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
agReq := agents.NewAgentRequest(
utils.MapStorage(decodedMessage), nil,
utils.MapStorage(decodedMessage), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -70,6 +70,7 @@ func TestS3ER(t *testing.T) {
},
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
},
],
@@ -123,7 +124,9 @@ func TestS3ER(t *testing.T) {
ID: ev.cgrEvent.ID,
Event: map[string]any{
"OriginID": randomOriginID,
"ReaderID": "s3",
},
APIOpts: map[string]any{},
}
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))

View File

@@ -237,8 +237,9 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
}
func (rdr *SQLEventReader) processMessage(msg map[string]any) (err error) {
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
agReq := agents.NewAgentRequest(
utils.MapStorage(msg), nil,
utils.MapStorage(msg), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -112,9 +112,9 @@ func (rdr *SQSER) processMessage(body []byte) (err error) {
if err = json.Unmarshal(body, &decodedMessage); err != nil {
return
}
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
agReq := agents.NewAgentRequest(
utils.MapStorage(decodedMessage), nil,
utils.MapStorage(decodedMessage), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -69,6 +69,7 @@ func TestSQSER(t *testing.T) {
},
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
},
],
@@ -121,7 +122,9 @@ func TestSQSER(t *testing.T) {
ID: ev.cgrEvent.ID,
Event: map[string]any{
"OriginID": randomOriginID,
"ReaderID": "sqs",
},
APIOpts: map[string]any{},
}
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))

View File

@@ -753,6 +753,7 @@ const (
MetaGroup = "*group"
InternalRPCSet = "InternalRPCSet"
MetaFileName = "*fileName"
MetaReaderID = "*readerID"
MetaRadauth = "*radauth"
UserPassword = "UserPassword"
RadauthFailed = "RADAUTH_FAILED"