From e7c24c9d0f20f32f19f84c67f922ba8a56c59732 Mon Sep 17 00:00:00 2001 From: TeoV Date: Fri, 31 Jul 2020 17:09:36 +0300 Subject: [PATCH] Add support for inflate templates in ERs ( + integration test) --- config/erscfg.go | 18 ++++++++--- data/conf/samples/ers_internal/cgrates.json | 30 +++++++++++++++++ data/conf/samples/ers_mongo/cgrates.json | 30 +++++++++++++++++ data/conf/samples/ers_mysql/cgrates.json | 30 +++++++++++++++++ data/conf/samples/ers_postgres/cgrates.json | 30 +++++++++++++++++ ers/filecsv_it_test.go | 36 +++++++++++++++++++++ ers/lib_test.go | 5 +-- 7 files changed, 173 insertions(+), 6 deletions(-) diff --git a/config/erscfg.go b/config/erscfg.go index 38d9d7fae..6ce875ee4 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -60,10 +60,10 @@ func (erS *ERsCfg) loadFromJsonCfg(jsnCfg *ERsJsonCfg, sep string, dfltRdrCfg *E } } } - return erS.appendERsReaders(jsnCfg.Readers, sep, dfltRdrCfg) + return erS.appendERsReaders(jsnCfg.Readers, erS.Templates, sep, dfltRdrCfg) } -func (ers *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, sep string, +func (ers *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string, dfltRdrCfg *EventReaderCfg) (err error) { if jsnReaders == nil { return @@ -84,7 +84,7 @@ func (ers *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, sep strin } } - if err := rdr.loadFromJsonCfg(jsnReader, sep); err != nil { + if err := rdr.loadFromJsonCfg(jsnReader, msgTemplates, sep); err != nil { return err } if !haveID { @@ -144,7 +144,7 @@ type EventReaderCfg struct { CacheDumpFields []*FCTemplate } -func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string) (err error) { +func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string) (err error) { if jsnCfg == nil { return } @@ -212,11 +212,21 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string if er.Fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Fields, sep); err != nil { return err } + if tpls, err := InflateTemplates(er.Fields, msgTemplates); err != nil { + return err + } else if tpls != nil { + er.Fields = tpls + } } if jsnCfg.Cache_dump_fields != nil { if er.CacheDumpFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Cache_dump_fields, sep); err != nil { return err } + if tpls, err := InflateTemplates(er.CacheDumpFields, msgTemplates); err != nil { + return err + } else if tpls != nil { + er.CacheDumpFields = tpls + } } return } diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index e357dd53f..a6f93afd5 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -77,6 +77,22 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "templates": { + "reqFields": [ + {"tag": "Source", "path": "*cgreq.Source", "type": "*constant", "value": "ers_template_combined", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.10", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.11", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true} + ], + }, "readers": [ { "id": "file_reader1", @@ -370,6 +386,20 @@ {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}, ], + }, + { + "id": "readerWithTemplate", + "run_delay": "-1", + "field_separator": ",", + "type": "*file_csv", + "source_path": "/tmp/readerWithTemplate/in", + "flags": ["*cdrs","*log"], + "processed_path": "/tmp/readerWithTemplate/out", + "fields":[ + {"tag": "reqFields","type": "*template", "value": "reqFields"}, + {"tag": "ExtraInfo1", "path": "*cgreq.ExtraInfo1", "type": "*constant", "value": "ExtraInfo1"}, + {"tag": "ExtraInfo2", "path": "*cgreq.ExtraInfo2", "type": "*constant", "value": "ExtraInfo2"}, + ], } ], }, diff --git a/data/conf/samples/ers_mongo/cgrates.json b/data/conf/samples/ers_mongo/cgrates.json index 28d20009f..da34efe55 100644 --- a/data/conf/samples/ers_mongo/cgrates.json +++ b/data/conf/samples/ers_mongo/cgrates.json @@ -78,6 +78,22 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "templates": { + "reqFields": [ + {"tag": "Source", "path": "*cgreq.Source", "type": "*constant", "value": "ers_template_combined", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.10", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.11", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true} + ], + }, "readers": [ { "id": "file_reader1", @@ -371,6 +387,20 @@ {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}, ], + }, + { + "id": "readerWithTemplate", + "run_delay": "-1", + "field_separator": ",", + "type": "*file_csv", + "source_path": "/tmp/readerWithTemplate/in", + "flags": ["*cdrs","*log"], + "processed_path": "/tmp/readerWithTemplate/out", + "fields":[ + {"tag": "reqFields","type": "*template", "value": "reqFields"}, + {"tag": "ExtraInfo1", "path": "*cgreq.ExtraInfo1", "type": "*constant", "value": "ExtraInfo1"}, + {"tag": "ExtraInfo2", "path": "*cgreq.ExtraInfo2", "type": "*constant", "value": "ExtraInfo2"}, + ], } ] }, diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index ba0f86c32..4e31e172b 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -75,6 +75,22 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "templates": { + "reqFields": [ + {"tag": "Source", "path": "*cgreq.Source", "type": "*constant", "value": "ers_template_combined", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.10", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.11", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true} + ], + }, "readers": [ { "id": "file_reader1", @@ -368,6 +384,20 @@ {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}, ], + }, + { + "id": "readerWithTemplate", + "run_delay": "-1", + "field_separator": ",", + "type": "*file_csv", + "source_path": "/tmp/readerWithTemplate/in", + "flags": ["*cdrs","*log"], + "processed_path": "/tmp/readerWithTemplate/out", + "fields":[ + {"tag": "reqFields","type": "*template", "value": "reqFields"}, + {"tag": "ExtraInfo1", "path": "*cgreq.ExtraInfo1", "type": "*constant", "value": "ExtraInfo1"}, + {"tag": "ExtraInfo2", "path": "*cgreq.ExtraInfo2", "type": "*constant", "value": "ExtraInfo2"}, + ], } ] }, diff --git a/data/conf/samples/ers_postgres/cgrates.json b/data/conf/samples/ers_postgres/cgrates.json index ba7f0b2c9..488c07cb8 100644 --- a/data/conf/samples/ers_postgres/cgrates.json +++ b/data/conf/samples/ers_postgres/cgrates.json @@ -72,6 +72,22 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "templates": { + "reqFields": [ + {"tag": "Source", "path": "*cgreq.Source", "type": "*constant", "value": "ers_template_combined", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.10", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.11", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true} + ], + }, "readers": [ { "id": "file_reader1", @@ -365,6 +381,20 @@ {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}, ], + }, + { + "id": "readerWithTemplate", + "run_delay": "-1", + "field_separator": ",", + "type": "*file_csv", + "source_path": "/tmp/readerWithTemplate/in", + "flags": ["*cdrs","*log"], + "processed_path": "/tmp/readerWithTemplate/out", + "fields":[ + {"tag": "reqFields","type": "*template", "value": "reqFields"}, + {"tag": "ExtraInfo1", "path": "*cgreq.ExtraInfo1", "type": "*constant", "value": "ExtraInfo1"}, + {"tag": "ExtraInfo2", "path": "*cgreq.ExtraInfo2", "type": "*constant", "value": "ExtraInfo2"}, + ], } ] }, diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 9a1638862..25c2d2fe1 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -74,6 +74,8 @@ accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;" testCsvITProcessFilteredCDR, testCsvITAnalyzeFilteredCDR, testCsvITProcessedFiles, + testCsvITReaderWithFilter, + testCsvITAnalyzeReaderWithFilter, testCleanupFiles, testCsvITKillEngine, } @@ -330,6 +332,40 @@ func testCsvITProcessedFiles(t *testing.T) { } } +func testCsvITReaderWithFilter(t *testing.T) { + fileName := "file1.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/readerWithTemplate/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +func testCsvITAnalyzeReaderWithFilter(t *testing.T) { + time.Sleep(500 * time.Millisecond) + + var cdrs []*engine.CDR + args := &utils.RPCCDRsFilterWithOpts{ + RPCCDRsFilter: &utils.RPCCDRsFilter{ + NotRunIDs: []string{"CustomerCharges", "SupplierCharges"}, + Sources: []string{"ers_template_combined"}, + }, + } + if err := csvRPC.Call(utils.CDRsV1GetCDRs, args, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 2 { + t.Error("Unexpected number of CDRs returned: ", utils.ToJSON(cdrs)) + } else if cdrs[0].Account != "1001" || cdrs[1].Account != "1001" { + t.Errorf("Expecting: 1001, received: <%s> , <%s>", cdrs[0].Account, cdrs[1].Account) + } else if cdrs[0].Tenant != "cgrates.org" || cdrs[1].Tenant != "cgrates.org" { + t.Errorf("Expecting: itsyscom.com, received: <%s> , <%s>", cdrs[0].Tenant, cdrs[1].Tenant) + } else if cdrs[0].ExtraFields["ExtraInfo1"] != "ExtraInfo1" || cdrs[1].ExtraFields["ExtraInfo1"] != "ExtraInfo1" { + t.Errorf("Expecting: itsyscom.com, received: <%s> , <%s>", cdrs[1].ExtraFields["ExtraInfo1"], cdrs[1].ExtraFields["ExtraInfo1"]) + } +} + func testCsvITKillEngine(t *testing.T) { if err := engine.KillEngine(*waitRater); err != nil { t.Error(err) diff --git a/ers/lib_test.go b/ers/lib_test.go index a91e0062a..8b9f18a6f 100644 --- a/ers/lib_test.go +++ b/ers/lib_test.go @@ -55,7 +55,8 @@ func testCreateDirs(t *testing.T) { "/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out", "/tmp/xmlErs/in", "/tmp/xmlErs/out", "/tmp/fwvErs/in", "/tmp/fwvErs/out", "/tmp/partErs1/in", "/tmp/partErs1/out", "/tmp/partErs2/in", "/tmp/partErs2/out", - "/tmp/flatstoreErs/in", "/tmp/flatstoreErs/out", "/tmp/ErsJSON/in", "/tmp/ErsJSON/out"} { + "/tmp/flatstoreErs/in", "/tmp/flatstoreErs/out", "/tmp/ErsJSON/in", "/tmp/ErsJSON/out", + "/tmp/readerWithTemplate/in", "/tmp/readerWithTemplate/out"} { if err := os.RemoveAll(dir); err != nil { t.Fatal("Error removing folder: ", dir, err) } @@ -70,7 +71,7 @@ func testCleanupFiles(t *testing.T) { "/tmp/ers2", "/tmp/init_session", "/tmp/terminate_session", "/tmp/cdrs", "/tmp/ers_with_filters", "/tmp/xmlErs", "/tmp/fwvErs", "/tmp/partErs1", "/tmp/partErs2", "tmp/flatstoreErs", - "/tmp/ErsJSON"} { + "/tmp/ErsJSON", "/tmp/readerWithTemplate"} { if err := os.RemoveAll(dir); err != nil { t.Fatal("Error removing folder: ", dir, err) }