Add reader id within the *vars in ERs

Author:    arberkatellari
Date:      2024-08-07 15:30:55 +02:00
Committer: Dan Christian Bogos
Parent:    13ad7f3603
Commit:    0bab7d5557
28 changed files with 85 additions and 22 deletions
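
Every reader touched below follows the same pattern: build a *vars DataNode that carries the reader's configured ID under *readerID (the file-based readers also keep the processed file name under *fileName) and pass it to agents.NewAgentRequest in place of nil, so field templates can reference ~*vars.*readerID. The sketch below is a hypothetical helper, not part of this commit; it assumes the cgrates utils package exactly as used in the hunks and only illustrates the shared construction.

// Hypothetical helper mirroring the reqVars construction repeated in each
// reader's processMessage/processFile below; not part of this commit.
package ers

import "github.com/cgrates/cgrates/utils"

// newReaderVars returns the *vars DataNode exposing the reader ID under
// utils.MetaReaderID; the file-based readers (CSV, FWV, JSON, XML) additionally
// keep the processed file name under utils.MetaFileName.
func newReaderVars(readerID, fileName string) *utils.DataNode {
    vars := &utils.DataNode{
        Type: utils.NMMapType,
        Map: map[string]*utils.DataNode{
            utils.MetaReaderID: utils.NewLeafNode(readerID),
        },
    }
    if fileName != "" {
        vars.Map[utils.MetaFileName] = utils.NewLeafNode(fileName)
    }
    return vars
}

With the value available in *vars, a reader's field template can copy it into the generated event, which is what the test configurations below do with {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}.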

@@ -121,9 +121,10 @@ func (rdr *AMQPER) processMessage(msg []byte) error {
     if err != nil {
         return err
     }
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     agReq := agents.NewAgentRequest(
-        utils.MapStorage(decodedMessage), nil,
+        utils.MapStorage(decodedMessage), reqVars,
         nil, nil, nil, rdr.Config().Tenant,
         rdr.cgrCfg.GeneralCfg().DefaultTenant,
         utils.FirstNonEmpty(rdr.Config().Timezone,

@@ -59,6 +59,7 @@ func TestAMQPER(t *testing.T) {
             "flags": [], // flags to influence the event processing
             "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
                 {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
+                {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
             ],
         },
     ],
@@ -76,7 +77,6 @@ func TestAMQPER(t *testing.T) {
     rdrEvents = make(chan *erEvent, 1)
     rdrErr = make(chan error, 1)
     rdrExit = make(chan struct{}, 1)
     if rdr, err = NewAMQPER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
         rdrErr, new(engine.FilterS), rdrExit); err != nil {
         t.Fatal(err)
@@ -120,7 +120,8 @@ func TestAMQPER(t *testing.T) {
         ID: ev.cgrEvent.ID,
         Time: ev.cgrEvent.Time,
         Event: map[string]any{
-            "CGRID": randomCGRID,
+            "CGRID":    randomCGRID,
+            "ReaderID": "amqp",
         },
         APIOpts: map[string]any{},
     }

@@ -155,8 +155,11 @@ func (rdr *AMQPv1ER) processMessage(msg []byte) (err error) {
     if err = json.Unmarshal(msg, &decodedMessage); err != nil {
         return
     }
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     agReq := agents.NewAgentRequest(
-        utils.MapStorage(decodedMessage), nil,
+        utils.MapStorage(decodedMessage), reqVars,
         nil, nil, nil, rdr.Config().Tenant,
         rdr.cgrCfg.GeneralCfg().DefaultTenant,
         utils.FirstNonEmpty(rdr.Config().Timezone,

@@ -64,6 +64,7 @@ func TestAMQPERv1(t *testing.T) {
             "flags": [], // flags to influence the event processing
             "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
                 {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
+                {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
             ],
         },
     ],
@@ -125,8 +126,10 @@ func TestAMQPERv1(t *testing.T) {
         ID: ev.cgrEvent.ID,
         Time: ev.cgrEvent.Time,
         Event: map[string]any{
-            "CGRID": randomCGRID,
+            "CGRID":    randomCGRID,
+            "ReaderID": "amqpv1",
         },
         APIOpts: map[string]any{},
     }
     if !reflect.DeepEqual(ev.cgrEvent, expected) {
         t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))

@@ -140,7 +140,7 @@ func (rdr *CSVFileER) processFile(fName string) (err error) {
     rowNr := 0 // This counts the rows in the file, not really number of CDRs
     evsPosted := 0
     timeStart := time.Now()
-    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}}
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     var hdrDefChar string
     if rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSV.HeaderDefineChar != nil {
         hdrDefChar = *rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSV.HeaderDefineChar

@@ -309,6 +309,10 @@ func testCsvITAnalyzeFilteredCDR(t *testing.T) {
         t.Errorf("Expecting: 1002, received: <%s> , <%s>", cdrs[0].Account, cdrs[1].Account)
     } else if cdrs[0].Tenant != "itsyscom.com" || cdrs[1].Tenant != "itsyscom.com" {
         t.Errorf("Expecting: itsyscom.com, received: <%s> , <%s>", cdrs[0].Tenant, cdrs[1].Tenant)
+    } else if cdrs[0].ExtraFields["ReaderID"] != "file_reader_with_filters" {
+        t.Errorf("Expected <%v>, received <%v>", "file_reader_with_filters", cdrs[0])
+    } else if cdrs[1].ExtraFields["ReaderID"] != "file_reader_with_filters" {
+        t.Errorf("Expected <%v>, received <%v>", "file_reader_with_filters", cdrs[1])
     }
 }

@@ -135,7 +135,7 @@ func (rdr *FWVFileER) processFile(fName string) (err error) {
     rowNr := 0 // This counts the rows in the file, not really number of CDRs
     evsPosted := 0
     timeStart := time.Now()
-    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}}
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     for {
         var hasHeader, hasTrailer bool

@@ -193,6 +193,11 @@ func testFWVITAnalyseCDRs(t *testing.T) {
     } else if len(reply) != 29 {
         t.Error("Unexpected number of CDRs returned: ", len(reply))
     }
+    for _, cdr := range reply {
+        if cdr.ExtraFields["ReaderID"] != "FWV1" {
+            t.Errorf("Expected <%v>, received <%v>", "FWV1", cdr.ExtraFields["ReaderID"])
+        }
+    }
     if err := fwvRPC.Call(context.Background(), utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{OriginIDs: []string{"CDR0000010"}}, &reply); err != nil {
         t.Error("Unexpected error: ", err.Error())
     } else if len(reply) != 1 {

@@ -137,7 +137,7 @@ func (rdr *JSONFileER) processFile(fName string) (err error) {
     }
     evsPosted := 0
-    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}}
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     agReq := agents.NewAgentRequest(
         utils.MapStorage(data), reqVars,

@@ -212,6 +212,8 @@ func testJSONVerify(t *testing.T) {
     } else {
         if cdrs[0].Usage != 2*time.Minute {
             t.Errorf("Unexpected usage for CDR: %d", cdrs[0].Usage)
+        } else if cdrs[0].ExtraFields["ReaderID"] != "JSONReader" {
+            t.Errorf("Expected readerID <%v>, received <%v>", "JSONReader", cdrs[0].ExtraFields["ReaderID"])
         }
     }

@@ -159,7 +159,7 @@ func (rdr *XMLFileER) processFile(fName string) error {
     rowNr := 0 // This counts the rows in the file, not really number of CDRs
     evsPosted := 0
     timeStart := time.Now()
-    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName)}}
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaFileName: utils.NewLeafNode(fName), utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     for _, xmlElmt := range xmlElmts {
         rowNr++ // increment the rowNr after checking if it's not the end of file
         agReq := agents.NewAgentRequest(

@@ -279,6 +279,11 @@ func testXmlITAnalyseCDRs(t *testing.T) {
     } else if len(reply) != 6 {
         t.Error("Unexpected number of CDRs returned: ", len(reply))
     }
+    for _, cdr := range reply {
+        if cdr.ExtraFields["ReaderID"] != "XmlDryRun" {
+            t.Errorf("Expected <%v>, received <%v>", "XmlDryRun", cdr.ExtraFields["ReaderID"])
+        }
+    }
     if err := xmlRPC.Call(context.Background(), utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4915117174963"}}, &reply); err != nil {
         t.Error("Unexpected error: ", err.Error())
     } else if len(reply) != 3 {

@@ -175,8 +175,10 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) {
         return
     }
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     agReq := agents.NewAgentRequest(
-        utils.MapStorage(decodedMessage), nil,
+        utils.MapStorage(decodedMessage), reqVars,
         nil, nil, nil, rdr.Config().Tenant,
         rdr.cgrCfg.GeneralCfg().DefaultTenant,
         utils.FirstNonEmpty(rdr.Config().Timezone,

@@ -91,7 +91,8 @@ func TestKafkaER(t *testing.T) {
             "filters": [],
             "flags": [],
             "fields":[
-                {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}
+                {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
+                {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
             ]
         }
     ]
@@ -146,7 +147,8 @@ func TestKafkaER(t *testing.T) {
         ID: ev.cgrEvent.ID,
         Time: ev.cgrEvent.Time,
         Event: map[string]any{
-            "CGRID": randomCGRID,
+            "CGRID":    randomCGRID,
+            "ReaderID": cfg.ERsCfg().Readers[1].ID,
         },
         APIOpts: map[string]any{},
     }

@@ -178,8 +178,11 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
     if err = json.Unmarshal(msg, &decodedMessage); err != nil {
         return
     }
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     agReq := agents.NewAgentRequest(
-        utils.MapStorage(decodedMessage), nil,
+        utils.MapStorage(decodedMessage), reqVars,
         nil, nil, nil, rdr.Config().Tenant,
         rdr.cgrCfg.GeneralCfg().DefaultTenant,
         utils.FirstNonEmpty(rdr.Config().Timezone,

@@ -172,7 +172,8 @@ var natsCfg string = `{
                 %s
             },
             "fields":[
-                {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}
+                {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
+                {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
             ]
         }
     ]

@@ -219,7 +219,7 @@ func TestNewAMQPReader(t *testing.T) {
     if err != nil {
         t.Error(err)
     } else if !reflect.DeepEqual(exp, rcv) {
-        t.Errorf("Expected %v but received %v", utils.ToJSON(exp), utils.ToJSON(rcv))
+        t.Errorf("Expected %+v but received %+v", exp, rcv)
     }
 }

@@ -116,8 +116,10 @@ func (rdr *S3ER) processMessage(body []byte) (err error) {
         return
     }
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     agReq := agents.NewAgentRequest(
-        utils.MapStorage(decodedMessage), nil,
+        utils.MapStorage(decodedMessage), reqVars,
         nil, nil, nil, rdr.Config().Tenant,
         rdr.cgrCfg.GeneralCfg().DefaultTenant,
         utils.FirstNonEmpty(rdr.Config().Timezone,

@@ -70,6 +70,7 @@ func TestS3ER(t *testing.T) {
             },
             "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
                 {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
+                {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
             ],
         },
     ],
@@ -123,8 +124,10 @@ func TestS3ER(t *testing.T) {
         ID: ev.cgrEvent.ID,
         Time: ev.cgrEvent.Time,
         Event: map[string]any{
-            "CGRID": randomCGRID,
+            "CGRID":    randomCGRID,
+            "ReaderID": "s3",
         },
         APIOpts: map[string]any{},
     }
     if !reflect.DeepEqual(ev.cgrEvent, expected) {
         t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))

@@ -222,8 +222,9 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
 }
 
 func (rdr *SQLEventReader) processMessage(msg map[string]any) (err error) {
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     agReq := agents.NewAgentRequest(
-        utils.MapStorage(msg), nil,
+        utils.MapStorage(msg), reqVars,
         nil, nil, nil, rdr.Config().Tenant,
         rdr.cgrCfg.GeneralCfg().DefaultTenant,
         utils.FirstNonEmpty(rdr.Config().Timezone,

@@ -110,6 +110,7 @@ func testSQLInitConfig(t *testing.T) {
             "flags": [], // flags to influence the event processing
             "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
                 {"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "path": "*cgreq.CGRID"},
+                {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
             ],
         },
     ],
@@ -252,7 +253,8 @@ func testSQLReader(t *testing.T) {
         ID: ev.cgrEvent.ID,
         Time: ev.cgrEvent.Time,
         Event: map[string]any{
-            "CGRID": cdr.CGRID,
+            "CGRID":    cdr.CGRID,
+            "ReaderID": "mysql",
         },
         APIOpts: map[string]any{},
     }
@@ -286,7 +288,8 @@ func testSQLReader2(t *testing.T) {
         ID: ev.cgrEvent.ID,
         Time: ev.cgrEvent.Time,
         Event: map[string]any{
-            "CGRID": cdr.CGRID,
+            "CGRID":    cdr.CGRID,
+            "ReaderID": "mysql",
         },
         APIOpts: map[string]any{},
     }

@@ -104,8 +104,10 @@ func (rdr *SQSER) processMessage(body []byte) (err error) {
         return
     }
+    reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
     agReq := agents.NewAgentRequest(
-        utils.MapStorage(decodedMessage), nil,
+        utils.MapStorage(decodedMessage), reqVars,
         nil, nil, nil, rdr.Config().Tenant,
         rdr.cgrCfg.GeneralCfg().DefaultTenant,
         utils.FirstNonEmpty(rdr.Config().Timezone,

@@ -69,6 +69,7 @@ func TestSQSER(t *testing.T) {
             },
             "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
                 {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
+                {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
             ],
         },
     ],
@@ -121,8 +122,10 @@ func TestSQSER(t *testing.T) {
         ID: ev.cgrEvent.ID,
         Time: ev.cgrEvent.Time,
         Event: map[string]any{
-            "CGRID": randomCGRID,
+            "CGRID":    randomCGRID,
+            "ReaderID": "sqs",
         },
         APIOpts: map[string]any{},
     }
     if !reflect.DeepEqual(ev.cgrEvent, expected) {
         t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))