From ba91a3dbbd2845373b114cd34d58b286baf2afda Mon Sep 17 00:00:00 2001 From: TeoV Date: Fri, 31 Jul 2020 13:30:31 +0300 Subject: [PATCH] Add support for EEs for inflate templates ( + integration test ) --- config/eescfg.go | 13 ++- data/conf/samples/ees/cgrates.json | 32 +++++++ ees/ees.go | 2 +- ees/filecsv_it_test.go | 138 +++++++++++++++++++++++++++++ ees/lib_test.go | 3 +- services/ees.go | 11 ++- 6 files changed, 191 insertions(+), 8 deletions(-) diff --git a/config/eescfg.go b/config/eescfg.go index 38fa42028..ed5a83bec 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -69,10 +69,10 @@ func (eeS *EEsCfg) loadFromJsonCfg(jsnCfg *EEsJsonCfg, sep string, dfltExpCfg *E } } } - return eeS.appendEEsExporters(jsnCfg.Exporters, sep, dfltExpCfg) + return eeS.appendEEsExporters(jsnCfg.Exporters, eeS.Templates, sep, dfltExpCfg) } -func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, separator string, dfltExpCfg *EventExporterCfg) (err error) { +func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string, dfltExpCfg *EventExporterCfg) (err error) { if exporters == nil { return } @@ -92,7 +92,7 @@ func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, separa } } - if err := exp.loadFromJsonCfg(jsnExp, separator); err != nil { + if err := exp.loadFromJsonCfg(jsnExp, msgTemplates, separator); err != nil { return err } if !haveID { @@ -149,7 +149,7 @@ type EventExporterCfg struct { trailerFields []*FCTemplate } -func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separator string) (err error) { +func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string) (err error) { if jsnEec == nil { return } @@ -205,6 +205,11 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separ if err != nil { return } + if tpls, err := InflateTemplates(eeC.Fields, msgTemplates); err != nil { + return err + } else if tpls != nil { + eeC.Fields = tpls + } for _, field := range eeC.Fields { switch field.GetPathSlice()[0] { case utils.MetaHdr: diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 258024e1f..7ac69b6c0 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -61,6 +61,24 @@ "*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false}, "*file_fwv": {"limit": -1, "ttl": "500ms", "static_ttl": false} }, + "templates": { + "requiredFields": [ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"} + ], + }, "exporters": [ { "id": "CSVExporter", @@ -337,6 +355,20 @@ {"tag": "SupplierRun","filters": ["*exists:~*uch.<~*req.CGRID;~*req.RunID;-RunID>:"], "path": "*exp.SupplierRun", "type": "*variable", "value": "~*uch.<`~*req.CGRID;~*req.RunID;-RunID`>"}, ], + }, + { + "id": "CSVExporterWIthTemplate", + "type": "*file_csv", + "export_path": "/tmp/testCSVExpTemp", + "tenant": "cgrates.org", + "flags": ["*attributes"], + "attribute_context": "customContext", + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.ExporterUsed:CSVExporterWIthTemplate"], + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], } ] }, diff --git a/ees/ees.go b/ees/ees.go index 1624a4899..21010b616 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -37,7 +37,7 @@ func onCacheEvicted(itmID string, value interface{}) { // NewEventExporterS instantiates the EventExporterS func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS, - connMgr *engine.ConnManager) (eeS *EventExporterS) { + connMgr *engine.ConnManager) (eeS *EventExporterS, err error) { eeS = &EventExporterS{ cfg: cfg, filterS: filterS, diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index 90f0c0ee7..283fe895e 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -56,6 +56,8 @@ var ( testCsvVerifyComposedExports, testCsvExportMaskedDestination, testCsvVerifyMaskedDestination, + testCsvExportEventWithInflateTemplate, + testCsvVerifyExportsWithInflateTemplate, testStopCgrEngine, testCleanDirectory, } @@ -405,3 +407,139 @@ func testCsvVerifyMaskedDestination(t *testing.T) { t.Errorf("Expecting: \n<%q>, \nreceived: \n<%q>", eCnt, string(outContent1)) } } + +func testCsvExportEventWithInflateTemplate(t *testing.T) { + eventVoice := &utils.CGREventWithIDs{ + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.VOICE, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(10) * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + "ExporterUsed": "CSVExporterWIthTemplate", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + }, + } + + eventData := &utils.CGREventWithIDs{ + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "dataEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.DATA, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "AnotherTenant", + utils.Category: "call", //for data CDR use different Tenant + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(10) * time.Nanosecond, + utils.RunID: utils.MetaDefault, + utils.Cost: 0.012, + "ExporterUsed": "CSVExporterWIthTemplate", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + }, + } + + eventSMS := &utils.CGREventWithIDs{ + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "SMSEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.SMS, + utils.OriginID: "sdfwer", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(1), + utils.RunID: utils.MetaDefault, + utils.Cost: 0.15, + "ExporterUsed": "CSVExporterWIthTemplate", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + }, + } + var reply string + if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventVoice, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected %+v, received: %+v", utils.OK, reply) + } + if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventData, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected %+v, received: %+v", utils.OK, reply) + } + if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventSMS, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected %+v, received: %+v", utils.OK, reply) + } + time.Sleep(1 * time.Second) +} + +func testCsvVerifyExportsWithInflateTemplate(t *testing.T) { + var files []string + err := filepath.Walk("/tmp/testCSVExpTemp/", func(path string, info os.FileInfo, err error) error { + if strings.HasSuffix(path, utils.CSVSuffix) { + files = append(files, path) + } + return nil + }) + if err != nil { + t.Error(err) + } + if len(files) != 1 { + t.Errorf("Expected %+v, received: %+v", 1, len(files)) + } + eCnt := "dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.01" + + "\n" + + "ea1f1968cc207859672c332364fc7614c86b04c5,*default,*data,abcdef,*rated,AnotherTenant,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,0.012" + + "\n" + + "2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4,*default,*sms,sdfwer,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,1,0.15" + + "\n" + if outContent1, err := ioutil.ReadFile(files[0]); err != nil { + t.Error(err) + } else if eCnt != string(outContent1) { + t.Errorf("Expecting: \n<%q>, \nreceived: \n<%q>", eCnt, string(outContent1)) + } +} diff --git a/ees/lib_test.go b/ees/lib_test.go index a78730eea..0e504ccd3 100644 --- a/ees/lib_test.go +++ b/ees/lib_test.go @@ -49,7 +49,8 @@ func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { } } -var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV", "/tmp/testFWV", "/tmp/testCSVMasked", "/tmp/testCSVfromVirt"} +var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV", "/tmp/testFWV", "/tmp/testCSVMasked", + "/tmp/testCSVfromVirt", "/tmp/testCSVExpTemp"} func testCreateDirectory(t *testing.T) { for _, dir := range exportPath { diff --git a/services/ees.go b/services/ees.go index 60d4dc87c..624c700d2 100644 --- a/services/ees.go +++ b/services/ees.go @@ -110,8 +110,15 @@ func (es *EventExporterService) Start() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EventExporterS)) es.Lock() - es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) - es.Unlock() + defer es.Unlock() + + es.eeS, err = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", + utils.EventExporterS, err)) + return + } + es.rpc = v1.NewEventExporterSv1(es.eeS) if !es.cfg.DispatcherSCfg().Enabled { es.server.RpcRegister(es.rpc)