Add support for EEs for inflate templates ( + integration test )

This commit is contained in:
TeoV
2020-07-31 13:30:31 +03:00
committed by Dan Christian Bogos
parent 35fc45645d
commit ba91a3dbbd
6 changed files with 191 additions and 8 deletions

View File

@@ -69,10 +69,10 @@ func (eeS *EEsCfg) loadFromJsonCfg(jsnCfg *EEsJsonCfg, sep string, dfltExpCfg *E
}
}
}
return eeS.appendEEsExporters(jsnCfg.Exporters, sep, dfltExpCfg)
return eeS.appendEEsExporters(jsnCfg.Exporters, eeS.Templates, sep, dfltExpCfg)
}
func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, separator string, dfltExpCfg *EventExporterCfg) (err error) {
func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string, dfltExpCfg *EventExporterCfg) (err error) {
if exporters == nil {
return
}
@@ -92,7 +92,7 @@ func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, separa
}
}
if err := exp.loadFromJsonCfg(jsnExp, separator); err != nil {
if err := exp.loadFromJsonCfg(jsnExp, msgTemplates, separator); err != nil {
return err
}
if !haveID {
@@ -149,7 +149,7 @@ type EventExporterCfg struct {
trailerFields []*FCTemplate
}
func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separator string) (err error) {
func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string) (err error) {
if jsnEec == nil {
return
}
@@ -205,6 +205,11 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separ
if err != nil {
return
}
if tpls, err := InflateTemplates(eeC.Fields, msgTemplates); err != nil {
return err
} else if tpls != nil {
eeC.Fields = tpls
}
for _, field := range eeC.Fields {
switch field.GetPathSlice()[0] {
case utils.MetaHdr:

View File

@@ -61,6 +61,24 @@
"*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false},
"*file_fwv": {"limit": -1, "ttl": "500ms", "static_ttl": false}
},
"templates": {
"requiredFields": [
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
{"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
{"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"},
{"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}
],
},
"exporters": [
{
"id": "CSVExporter",
@@ -337,6 +355,20 @@
{"tag": "SupplierRun","filters": ["*exists:~*uch.<~*req.CGRID;~*req.RunID;-RunID>:"],
"path": "*exp.SupplierRun", "type": "*variable", "value": "~*uch.<`~*req.CGRID;~*req.RunID;-RunID`>"},
],
},
{
"id": "CSVExporterWIthTemplate",
"type": "*file_csv",
"export_path": "/tmp/testCSVExpTemp",
"tenant": "cgrates.org",
"flags": ["*attributes"],
"attribute_context": "customContext",
"attempts": 1,
"field_separator": ",",
"filters": ["*string:~*req.ExporterUsed:CSVExporterWIthTemplate"],
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
],
}
]
},

View File

@@ -37,7 +37,7 @@ func onCacheEvicted(itmID string, value interface{}) {
// NewEventExporterS instantiates the EventExporterS
func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS,
connMgr *engine.ConnManager) (eeS *EventExporterS) {
connMgr *engine.ConnManager) (eeS *EventExporterS, err error) {
eeS = &EventExporterS{
cfg: cfg,
filterS: filterS,

View File

@@ -56,6 +56,8 @@ var (
testCsvVerifyComposedExports,
testCsvExportMaskedDestination,
testCsvVerifyMaskedDestination,
testCsvExportEventWithInflateTemplate,
testCsvVerifyExportsWithInflateTemplate,
testStopCgrEngine,
testCleanDirectory,
}
@@ -405,3 +407,139 @@ func testCsvVerifyMaskedDestination(t *testing.T) {
t.Errorf("Expecting: \n<%q>, \nreceived: \n<%q>", eCnt, string(outContent1))
}
}
func testCsvExportEventWithInflateTemplate(t *testing.T) {
eventVoice := &utils.CGREventWithIDs{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "voiceEvent",
Time: utils.TimePointer(time.Now()),
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
utils.ToR: utils.VOICE,
utils.OriginID: "dsafdsaf",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.META_RATED,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: time.Duration(10) * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 1.01,
"ExporterUsed": "CSVExporterWIthTemplate",
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},
},
},
},
}
eventData := &utils.CGREventWithIDs{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "dataEvent",
Time: utils.TimePointer(time.Now()),
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()),
utils.ToR: utils.DATA,
utils.OriginID: "abcdef",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.META_RATED,
utils.Tenant: "AnotherTenant",
utils.Category: "call", //for data CDR use different Tenant
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: time.Duration(10) * time.Nanosecond,
utils.RunID: utils.MetaDefault,
utils.Cost: 0.012,
"ExporterUsed": "CSVExporterWIthTemplate",
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},
},
},
},
}
eventSMS := &utils.CGREventWithIDs{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "SMSEvent",
Time: utils.TimePointer(time.Now()),
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()),
utils.ToR: utils.SMS,
utils.OriginID: "sdfwer",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.META_RATED,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: time.Duration(1),
utils.RunID: utils.MetaDefault,
utils.Cost: 0.15,
"ExporterUsed": "CSVExporterWIthTemplate",
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},
},
},
},
}
var reply string
if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventVoice, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
}
if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventData, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
}
if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventSMS, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
}
time.Sleep(1 * time.Second)
}
func testCsvVerifyExportsWithInflateTemplate(t *testing.T) {
var files []string
err := filepath.Walk("/tmp/testCSVExpTemp/", func(path string, info os.FileInfo, err error) error {
if strings.HasSuffix(path, utils.CSVSuffix) {
files = append(files, path)
}
return nil
})
if err != nil {
t.Error(err)
}
if len(files) != 1 {
t.Errorf("Expected %+v, received: %+v", 1, len(files))
}
eCnt := "dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.01" +
"\n" +
"ea1f1968cc207859672c332364fc7614c86b04c5,*default,*data,abcdef,*rated,AnotherTenant,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,0.012" +
"\n" +
"2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4,*default,*sms,sdfwer,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,1,0.15" +
"\n"
if outContent1, err := ioutil.ReadFile(files[0]); err != nil {
t.Error(err)
} else if eCnt != string(outContent1) {
t.Errorf("Expecting: \n<%q>, \nreceived: \n<%q>", eCnt, string(outContent1))
}
}

View File

@@ -49,7 +49,8 @@ func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
}
}
var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV", "/tmp/testFWV", "/tmp/testCSVMasked", "/tmp/testCSVfromVirt"}
var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV", "/tmp/testFWV", "/tmp/testCSVMasked",
"/tmp/testCSVfromVirt", "/tmp/testCSVExpTemp"}
func testCreateDirectory(t *testing.T) {
for _, dir := range exportPath {

View File

@@ -110,8 +110,15 @@ func (es *EventExporterService) Start() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EventExporterS))
es.Lock()
es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr)
es.Unlock()
defer es.Unlock()
es.eeS, err = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.EventExporterS, err))
return
}
es.rpc = v1.NewEventExporterSv1(es.eeS)
if !es.cfg.DispatcherSCfg().Enabled {
es.server.RpcRegister(es.rpc)