From 7f9b23d4229cde20b9860139db21d3267d362964 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 8 Aug 2018 06:09:48 -0400 Subject: [PATCH] Add for cdrc xml new filters --- cdrc/csv.go | 6 +- cdrc/fwv_it_test.go | 1 + cdrc/xml.go | 87 ++++++++++++++++--- cdrc/xml_it_test.go | 86 ++++++++++++++++++ cdrc/xml_test.go | 74 ++++++++++++++++ data/conf/samples/cdrcfwv/cgrates.json | 2 +- .../samples/cdrcxmlwithfilter/cgrates.json | 47 ++++++++++ 7 files changed, 286 insertions(+), 17 deletions(-) create mode 100755 data/conf/samples/cdrcxmlwithfilter/cgrates.json diff --git a/cdrc/csv.go b/cdrc/csv.go index 1d8832e65..05058fe50 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -123,9 +123,9 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, continue } } else { - csvprovider, _ := newCsvProvider(record) + csvProvider, _ := newCsvProvider(record) if pass, err := self.filterS.Pass("cgrates.org", - cdrcCfg.Filters, csvprovider); err != nil || !pass { + cdrcCfg.Filters, csvProvider); err != nil || !pass { continue // Not passes filters, ignore this CDR } } @@ -249,7 +249,7 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con return storedCdr, nil } -// newRADataProvider constructs a DataProvider +// newCsvProvider constructs a DataProvider func newCsvProvider(record []string) (dP engine.DataProvider, err error) { dP = &csvProvider{req: record, cache: engine.NewNavigableMap(nil)} return diff --git a/cdrc/fwv_it_test.go b/cdrc/fwv_it_test.go index edcaa2778..c312c6222 100644 --- a/cdrc/fwv_it_test.go +++ b/cdrc/fwv_it_test.go @@ -19,6 +19,7 @@ along with this program. If not, see */ package cdrc +// import ( "io/ioutil" "net/rpc" diff --git a/cdrc/xml.go b/cdrc/xml.go index 47f3c16ce..fb01e518d 100644 --- a/cdrc/xml.go +++ b/cdrc/xml.go @@ -132,21 +132,29 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err cdrXML := xmlProc.cdrXmlElmts[xmlProc.procItems] xmlProc.procItems += 1 for _, cdrcCfg := range xmlProc.cdrcCfgs { - filtersPassing := true - for _, rsrFltr := range cdrcCfg.CdrFilter { - if rsrFltr == nil { - continue // Pass + if len(cdrcCfg.Filters) == 0 { + filtersPassing := true + for _, rsrFltr := range cdrcCfg.CdrFilter { + if rsrFltr == nil { + continue // Pass + } + absolutePath := utils.ParseHierarchyPath(rsrFltr.Id, "") + relPath := utils.HierarchyPath(absolutePath[len(xmlProc.cdrPath)-1:]) // Need relative path to the xmlElmnt + fieldVal, _ := elementText(cdrXML, relPath.AsString("/", true)) + if _, err := rsrFltr.Parse(fieldVal); err != nil { + filtersPassing = false + break + } } - absolutePath := utils.ParseHierarchyPath(rsrFltr.Id, "") - relPath := utils.HierarchyPath(absolutePath[len(xmlProc.cdrPath)-1:]) // Need relative path to the xmlElmnt - fieldVal, _ := elementText(cdrXML, relPath.AsString("/", true)) - if _, err := rsrFltr.Parse(fieldVal); err != nil { - filtersPassing = false - break + if !filtersPassing { + continue + } + } else { + xmlProvider, _ := newXmlProvider(cdrXML, xmlProc.cdrPath) + if pass, err := xmlProc.filterS.Pass("cgrates.org", + cdrcCfg.Filters, xmlProvider); err != nil || !pass { + continue // Not passes filters, ignore this CDR } - } - if !filtersPassing { - continue } if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg); err != nil { return nil, fmt.Errorf(" Failed converting to CDR, error: %s", err.Error()) @@ -236,3 +244,56 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *con } return cdr, nil } + +// newXmlProvider constructs a DataProvider +func newXmlProvider(req tree.Res, cdrPath utils.HierarchyPath) (dP engine.DataProvider, err error) { + dP = &xmlProvider{req: req, cdrPath: cdrPath, cache: engine.NewNavigableMap(nil)} + return +} + +// xmlProvider implements engine.DataProvider so we can pass it to filters +type xmlProvider struct { + req tree.Res + cdrPath utils.HierarchyPath //used to compute relative path + cache *engine.NavigableMap +} + +// String is part of engine.DataProvider interface +// when called, it will display the already parsed values out of cache +func (xP *xmlProvider) String() string { + return utils.ToJSON(xP) +} + +// FieldAsInterface is part of engine.DataProvider interface +func (xP *xmlProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) { + if len(fldPath) != 1 { + return nil, utils.ErrNotFound + } + if data, err = xP.cache.FieldAsInterface(fldPath); err == nil || + err != utils.ErrNotFound { // item found in cache + return + } + err = nil // cancel previous err + absolutePath := utils.ParseHierarchyPath(fldPath[0], "") + relPath := utils.HierarchyPath(absolutePath[len(xP.cdrPath)-1:]) // Need relative path to the xmlElmnt + data, err = elementText(xP.req, relPath.AsString("/", true)) + xP.cache.Set(fldPath, data, false) + return +} + +// FieldAsString is part of engine.DataProvider interface +func (xP *xmlProvider) FieldAsString(fldPath []string) (data string, err error) { + var valIface interface{} + valIface, err = xP.FieldAsInterface(fldPath) + if err != nil { + return + } + data, _ = utils.CastFieldIfToString(valIface) + return +} + +// AsNavigableMap is part of engine.DataProvider interface +func (xP *xmlProvider) AsNavigableMap([]*config.CfgCdrField) ( + nm *engine.NavigableMap, err error) { + return nil, utils.ErrNotImplemented +} diff --git a/cdrc/xml_it_test.go b/cdrc/xml_it_test.go index 80aef8231..7daa42bd2 100644 --- a/cdrc/xml_it_test.go +++ b/cdrc/xml_it_test.go @@ -130,3 +130,89 @@ func TestXmlITKillEngine(t *testing.T) { t.Error(err) } } + +// Begin tests for cdrc xml with new filters +func TestXmlIT2InitConfig(t *testing.T) { + var err error + xmlCfgPath = path.Join(*dataDir, "conf", "samples", "cdrcxmlwithfilter") + if xmlCfg, err = config.NewCGRConfigFromFolder(xmlCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func TestXmlIT2InitCdrDb(t *testing.T) { + if err := engine.InitStorDb(xmlCfg); err != nil { + t.Fatal(err) + } +} + +func TestXmlIT2CreateCdrDirs(t *testing.T) { + for _, cdrcProfiles := range xmlCfg.CdrcProfiles { + for i, cdrcInst := range cdrcProfiles { + for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + if i == 0 { // Initialize the folders to check later + xmlPathIn1 = cdrcInst.CdrInDir + xmlPathOut1 = cdrcInst.CdrOutDir + } + } + } +} + +func TestXmlIT2StartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(xmlCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestXmlIT2RpcConn(t *testing.T) { + var err error + cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +// The default scenario, out of cdrc defined in .cfg file +func TestXmlIT2HandleCdr1File(t *testing.T) { + fileName := "file1.xml" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(cdrXmlBroadsoft), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join(xmlPathIn1, fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +func TestXmlIT2ProcessedFiles(t *testing.T) { + time.Sleep(time.Duration(2**waitRater) * time.Millisecond) + if outContent1, err := ioutil.ReadFile(path.Join(xmlPathOut1, "file1.xml")); err != nil { + t.Error(err) + } else if cdrXmlBroadsoft != string(outContent1) { + t.Errorf("Expecting: %q, received: %q", cdrXmlBroadsoft, string(outContent1)) + } +} + +func TestXmlIT2AnalyseCDRs(t *testing.T) { + var reply []*engine.ExternalCDR + if err := cdrcXmlRPC.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } +} + +func TestXmlIT2KillEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/cdrc/xml_test.go b/cdrc/xml_test.go index 93510fa2d..17a38b5bb 100644 --- a/cdrc/xml_test.go +++ b/cdrc/xml_test.go @@ -271,3 +271,77 @@ func TestXMLRPProcess(t *testing.T) { t.Errorf("Expecting: %+v\n, received: %+v\n", expectedCDRs, cdrs) } } + +func TestXMLRPProcessWithNewFilters(t *testing.T) { + cdrcCfgs := []*config.CdrcConfig{ + &config.CdrcConfig{ + ID: "XMLWithFilters", + Enabled: true, + CdrFormat: "xml", + DataUsageMultiplyFactor: 1024, + CDRPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), + CdrSourceId: "XMLWithFilters", + Filters: []string{"*string:broadWorksCDR>cdrData>headerModule>type:Normal"}, + ContentFields: []*config.CfgCdrField{ + &config.CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.ToR, + Value: utils.ParseRSRFieldsMustCompile("^*voice", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "OriginID", Type: utils.META_COMPOSED, FieldId: utils.OriginID, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>localCallId", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "RequestType", Type: utils.META_COMPOSED, FieldId: utils.RequestType, + Value: utils.ParseRSRFieldsMustCompile("^*rated", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Tenant", Type: utils.META_COMPOSED, FieldId: utils.Tenant, + Value: utils.ParseRSRFieldsMustCompile("~broadWorksCDR>cdrData>basicModule>userId:s/.*@(.*)/${1}/", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Category", Type: utils.META_COMPOSED, FieldId: utils.Category, + Value: utils.ParseRSRFieldsMustCompile("^call", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Account", Type: utils.META_COMPOSED, FieldId: utils.Account, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>userNumber", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Destination", Type: utils.META_COMPOSED, FieldId: utils.Destination, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>calledNumber", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "SetupTime", Type: utils.META_COMPOSED, FieldId: utils.SetupTime, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>startTime", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "AnswerTime", Type: utils.META_COMPOSED, FieldId: utils.AnswerTime, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Usage", Type: utils.META_HANDLER, + FieldId: utils.Usage, HandlerId: utils.HandlerSubstractUsage, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime", + utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "UsageSeconds", Type: utils.META_COMPOSED, FieldId: utils.Usage, + Value: utils.ParseRSRFieldsMustCompile("^s", utils.INFIELD_SEP), Mandatory: true}, + }, + }, + } + data, _ := engine.NewMapStorage() + defaultCfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Errorf("Error: %+v", err) + } + xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), + utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, + cdrcCfgs, engine.NewFilterS(defaultCfg, nil, engine.NewDataManager(data))) + if err != nil { + t.Error(err) + } + var cdrs []*engine.CDR + for i := 0; i < 4; i++ { + cdrs, err = xmlRP.ProcessNextRecord() + if i == 1 { // Take second CDR since the first one cannot be processed + break + } + } + if err != nil { + t.Error(err) + } + expectedCDRs := []*engine.CDR{ + &engine.CDR{CGRID: "1f045359a0784d15e051d7e41ae30132b139d714", + OriginHost: "0.0.0.0", Source: "XMLWithFilters", OriginID: "25160047719:0", + ToR: "*voice", RequestType: "*rated", Tenant: "cgrates.org", + Category: "call", Account: "1001", Destination: "+4986517174963", + SetupTime: time.Date(2016, 4, 19, 21, 0, 5, 247000000, time.UTC), + AnswerTime: time.Date(2016, 4, 19, 21, 0, 6, 813000000, time.UTC), + Usage: time.Duration(13483000000), + ExtraFields: map[string]string{}, Cost: -1}, + } + if !reflect.DeepEqual(expectedCDRs, cdrs) { + t.Errorf("Expecting: %+v\n, received: %+v\n", expectedCDRs, cdrs) + } +} diff --git a/data/conf/samples/cdrcfwv/cgrates.json b/data/conf/samples/cdrcfwv/cgrates.json index 0ffba3f6c..4473fa6c9 100644 --- a/data/conf/samples/cdrcfwv/cgrates.json +++ b/data/conf/samples/cdrcfwv/cgrates.json @@ -37,7 +37,7 @@ "cdr_format": "fwv", // CDR file format "cdr_in_dir": "/tmp/cgr_fwv/cdrc/in", // absolute path towards the directory where the CDRs are stored "cdr_out_dir": "/tmp/cgr_fwv/cdrc/out", // absolute path towards the directory where processed CDRs will be moved - "cdr_source_id": "fwv_localtest", // free form field, tag identifying the source of the CDRs within CDRS database + "cdr_source_id": "cdrc", // free form field, tag identifying the source of the CDRs within CDRS database "cdr_filter": "", // filter CDR records to import "header_fields": [ {"tag": "FileName", "cdr_field_id": "CdrFileName", "type": "cdrfield", "value": "95", "width": 40, "padding":"right"}, diff --git a/data/conf/samples/cdrcxmlwithfilter/cgrates.json b/data/conf/samples/cdrcxmlwithfilter/cgrates.json new file mode 100755 index 000000000..736f8283f --- /dev/null +++ b/data/conf/samples/cdrcxmlwithfilter/cgrates.json @@ -0,0 +1,47 @@ +{ + +// Real-time Charging System for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + + "rals": { + "enabled": true + }, + + "cdrs": { + "enabled": true, + "rals_conns": [], +}, + + + "cdrc": [ + { + "id": "XMLWithFilter", + "enabled": true, + "cdr_format": "xml", + "cdr_in_dir": "/tmp/cdrcxmlwithfilters/xmlit1/in", + "cdr_out_dir": "/tmp/cdrcxmlwithfilters/xmlit1/out", + "cdr_path": "broadWorksCDR>cdrData", + "cdr_source_id": "xmlit1", + "filters": ["*string:broadWorksCDR>cdrData>basicModule>userNumber:1002","*string:broadWorksCDR>cdrData>headerModule>type:Normal"], + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>localCallId", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*rated", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "~broadWorksCDR>cdrData>basicModule>userId:s/.*@(.*)/${1}/", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>userNumber", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>calledNumber", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>startTime", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>answerTime", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*handler", "handler_id": "*substract_usage", "value": "broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime", "mandatory": true}, + ], + }, +], + + +} \ No newline at end of file