diff --git a/cdrc/csv.go b/cdrc/csv.go
index 1d8832e65..05058fe50 100644
--- a/cdrc/csv.go
+++ b/cdrc/csv.go
@@ -123,9 +123,9 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
continue
}
} else {
- csvprovider, _ := newCsvProvider(record)
+ csvProvider, _ := newCsvProvider(record)
if pass, err := self.filterS.Pass("cgrates.org",
- cdrcCfg.Filters, csvprovider); err != nil || !pass {
+ cdrcCfg.Filters, csvProvider); err != nil || !pass {
continue // Not passes filters, ignore this CDR
}
}
@@ -249,7 +249,7 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con
return storedCdr, nil
}
-// newRADataProvider constructs a DataProvider
+// newCsvProvider constructs a DataProvider
func newCsvProvider(record []string) (dP engine.DataProvider, err error) {
dP = &csvProvider{req: record, cache: engine.NewNavigableMap(nil)}
return
diff --git a/cdrc/fwv_it_test.go b/cdrc/fwv_it_test.go
index edcaa2778..c312c6222 100644
--- a/cdrc/fwv_it_test.go
+++ b/cdrc/fwv_it_test.go
@@ -19,6 +19,7 @@ along with this program. If not, see
*/
package cdrc
+//
import (
"io/ioutil"
"net/rpc"
diff --git a/cdrc/xml.go b/cdrc/xml.go
index 47f3c16ce..fb01e518d 100644
--- a/cdrc/xml.go
+++ b/cdrc/xml.go
@@ -132,21 +132,29 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err
cdrXML := xmlProc.cdrXmlElmts[xmlProc.procItems]
xmlProc.procItems += 1
for _, cdrcCfg := range xmlProc.cdrcCfgs {
- filtersPassing := true
- for _, rsrFltr := range cdrcCfg.CdrFilter {
- if rsrFltr == nil {
- continue // Pass
+ if len(cdrcCfg.Filters) == 0 {
+ filtersPassing := true
+ for _, rsrFltr := range cdrcCfg.CdrFilter {
+ if rsrFltr == nil {
+ continue // Pass
+ }
+ absolutePath := utils.ParseHierarchyPath(rsrFltr.Id, "")
+ relPath := utils.HierarchyPath(absolutePath[len(xmlProc.cdrPath)-1:]) // Need relative path to the xmlElmnt
+ fieldVal, _ := elementText(cdrXML, relPath.AsString("/", true))
+ if _, err := rsrFltr.Parse(fieldVal); err != nil {
+ filtersPassing = false
+ break
+ }
}
- absolutePath := utils.ParseHierarchyPath(rsrFltr.Id, "")
- relPath := utils.HierarchyPath(absolutePath[len(xmlProc.cdrPath)-1:]) // Need relative path to the xmlElmnt
- fieldVal, _ := elementText(cdrXML, relPath.AsString("/", true))
- if _, err := rsrFltr.Parse(fieldVal); err != nil {
- filtersPassing = false
- break
+ if !filtersPassing {
+ continue
+ }
+ } else {
+ xmlProvider, _ := newXmlProvider(cdrXML, xmlProc.cdrPath)
+ if pass, err := xmlProc.filterS.Pass("cgrates.org",
+ cdrcCfg.Filters, xmlProvider); err != nil || !pass {
+ continue // Not passes filters, ignore this CDR
}
- }
- if !filtersPassing {
- continue
}
if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg); err != nil {
return nil, fmt.Errorf(" Failed converting to CDR, error: %s", err.Error())
@@ -236,3 +244,56 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *con
}
return cdr, nil
}
+
+// newXmlProvider constructs a DataProvider
+func newXmlProvider(req tree.Res, cdrPath utils.HierarchyPath) (dP engine.DataProvider, err error) {
+ dP = &xmlProvider{req: req, cdrPath: cdrPath, cache: engine.NewNavigableMap(nil)}
+ return
+}
+
+// xmlProvider implements engine.DataProvider so we can pass it to filters
+type xmlProvider struct {
+ req tree.Res
+ cdrPath utils.HierarchyPath //used to compute relative path
+ cache *engine.NavigableMap
+}
+
+// String is part of engine.DataProvider interface
+// when called, it will display the already parsed values out of cache
+func (xP *xmlProvider) String() string {
+ return utils.ToJSON(xP)
+}
+
+// FieldAsInterface is part of engine.DataProvider interface
+func (xP *xmlProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) {
+ if len(fldPath) != 1 {
+ return nil, utils.ErrNotFound
+ }
+ if data, err = xP.cache.FieldAsInterface(fldPath); err == nil ||
+ err != utils.ErrNotFound { // item found in cache
+ return
+ }
+ err = nil // cancel previous err
+ absolutePath := utils.ParseHierarchyPath(fldPath[0], "")
+ relPath := utils.HierarchyPath(absolutePath[len(xP.cdrPath)-1:]) // Need relative path to the xmlElmnt
+ data, err = elementText(xP.req, relPath.AsString("/", true))
+ xP.cache.Set(fldPath, data, false)
+ return
+}
+
+// FieldAsString is part of engine.DataProvider interface
+func (xP *xmlProvider) FieldAsString(fldPath []string) (data string, err error) {
+ var valIface interface{}
+ valIface, err = xP.FieldAsInterface(fldPath)
+ if err != nil {
+ return
+ }
+ data, _ = utils.CastFieldIfToString(valIface)
+ return
+}
+
+// AsNavigableMap is part of engine.DataProvider interface
+func (xP *xmlProvider) AsNavigableMap([]*config.CfgCdrField) (
+ nm *engine.NavigableMap, err error) {
+ return nil, utils.ErrNotImplemented
+}
diff --git a/cdrc/xml_it_test.go b/cdrc/xml_it_test.go
index 80aef8231..7daa42bd2 100644
--- a/cdrc/xml_it_test.go
+++ b/cdrc/xml_it_test.go
@@ -130,3 +130,89 @@ func TestXmlITKillEngine(t *testing.T) {
t.Error(err)
}
}
+
+// Begin tests for cdrc xml with new filters
+func TestXmlIT2InitConfig(t *testing.T) {
+ var err error
+ xmlCfgPath = path.Join(*dataDir, "conf", "samples", "cdrcxmlwithfilter")
+ if xmlCfg, err = config.NewCGRConfigFromFolder(xmlCfgPath); err != nil {
+ t.Fatal("Got config error: ", err.Error())
+ }
+}
+
+// InitDb so we can rely on count
+func TestXmlIT2InitCdrDb(t *testing.T) {
+ if err := engine.InitStorDb(xmlCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestXmlIT2CreateCdrDirs(t *testing.T) {
+ for _, cdrcProfiles := range xmlCfg.CdrcProfiles {
+ for i, cdrcInst := range cdrcProfiles {
+ for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
+ if err := os.RemoveAll(dir); err != nil {
+ t.Fatal("Error removing folder: ", dir, err)
+ }
+ if err := os.MkdirAll(dir, 0755); err != nil {
+ t.Fatal("Error creating folder: ", dir, err)
+ }
+ }
+ if i == 0 { // Initialize the folders to check later
+ xmlPathIn1 = cdrcInst.CdrInDir
+ xmlPathOut1 = cdrcInst.CdrOutDir
+ }
+ }
+ }
+}
+
+func TestXmlIT2StartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(xmlCfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Connect rpc client to rater
+func TestXmlIT2RpcConn(t *testing.T) {
+ var err error
+ cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal("Could not connect to rater: ", err.Error())
+ }
+}
+
+// The default scenario, out of cdrc defined in .cfg file
+func TestXmlIT2HandleCdr1File(t *testing.T) {
+ fileName := "file1.xml"
+ tmpFilePath := path.Join("/tmp", fileName)
+ if err := ioutil.WriteFile(tmpFilePath, []byte(cdrXmlBroadsoft), 0644); err != nil {
+ t.Fatal(err.Error())
+ }
+ if err := os.Rename(tmpFilePath, path.Join(xmlPathIn1, fileName)); err != nil {
+ t.Fatal("Error moving file to processing directory: ", err)
+ }
+}
+
+func TestXmlIT2ProcessedFiles(t *testing.T) {
+ time.Sleep(time.Duration(2**waitRater) * time.Millisecond)
+ if outContent1, err := ioutil.ReadFile(path.Join(xmlPathOut1, "file1.xml")); err != nil {
+ t.Error(err)
+ } else if cdrXmlBroadsoft != string(outContent1) {
+ t.Errorf("Expecting: %q, received: %q", cdrXmlBroadsoft, string(outContent1))
+ }
+}
+
+func TestXmlIT2AnalyseCDRs(t *testing.T) {
+ var reply []*engine.ExternalCDR
+ if err := cdrcXmlRPC.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil {
+ t.Error("Unexpected error: ", err.Error())
+ } else if len(reply) != 1 {
+ t.Error("Unexpected number of CDRs returned: ", len(reply))
+ }
+}
+
+func TestXmlIT2KillEngine(t *testing.T) {
+ if err := engine.KillEngine(*waitRater); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/cdrc/xml_test.go b/cdrc/xml_test.go
index 93510fa2d..17a38b5bb 100644
--- a/cdrc/xml_test.go
+++ b/cdrc/xml_test.go
@@ -271,3 +271,77 @@ func TestXMLRPProcess(t *testing.T) {
t.Errorf("Expecting: %+v\n, received: %+v\n", expectedCDRs, cdrs)
}
}
+
+func TestXMLRPProcessWithNewFilters(t *testing.T) {
+ cdrcCfgs := []*config.CdrcConfig{
+ &config.CdrcConfig{
+ ID: "XMLWithFilters",
+ Enabled: true,
+ CdrFormat: "xml",
+ DataUsageMultiplyFactor: 1024,
+ CDRPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}),
+ CdrSourceId: "XMLWithFilters",
+ Filters: []string{"*string:broadWorksCDR>cdrData>headerModule>type:Normal"},
+ ContentFields: []*config.CfgCdrField{
+ &config.CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.ToR,
+ Value: utils.ParseRSRFieldsMustCompile("^*voice", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "OriginID", Type: utils.META_COMPOSED, FieldId: utils.OriginID,
+ Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>localCallId", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "RequestType", Type: utils.META_COMPOSED, FieldId: utils.RequestType,
+ Value: utils.ParseRSRFieldsMustCompile("^*rated", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "Tenant", Type: utils.META_COMPOSED, FieldId: utils.Tenant,
+ Value: utils.ParseRSRFieldsMustCompile("~broadWorksCDR>cdrData>basicModule>userId:s/.*@(.*)/${1}/", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "Category", Type: utils.META_COMPOSED, FieldId: utils.Category,
+ Value: utils.ParseRSRFieldsMustCompile("^call", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "Account", Type: utils.META_COMPOSED, FieldId: utils.Account,
+ Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>userNumber", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "Destination", Type: utils.META_COMPOSED, FieldId: utils.Destination,
+ Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>calledNumber", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "SetupTime", Type: utils.META_COMPOSED, FieldId: utils.SetupTime,
+ Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>startTime", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "AnswerTime", Type: utils.META_COMPOSED, FieldId: utils.AnswerTime,
+ Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "Usage", Type: utils.META_HANDLER,
+ FieldId: utils.Usage, HandlerId: utils.HandlerSubstractUsage,
+ Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime",
+ utils.INFIELD_SEP), Mandatory: true},
+ &config.CfgCdrField{Tag: "UsageSeconds", Type: utils.META_COMPOSED, FieldId: utils.Usage,
+ Value: utils.ParseRSRFieldsMustCompile("^s", utils.INFIELD_SEP), Mandatory: true},
+ },
+ },
+ }
+ data, _ := engine.NewMapStorage()
+ defaultCfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Errorf("Error: %+v", err)
+ }
+ xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft),
+ utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true,
+ cdrcCfgs, engine.NewFilterS(defaultCfg, nil, engine.NewDataManager(data)))
+ if err != nil {
+ t.Error(err)
+ }
+ var cdrs []*engine.CDR
+ for i := 0; i < 4; i++ {
+ cdrs, err = xmlRP.ProcessNextRecord()
+ if i == 1 { // Take second CDR since the first one cannot be processed
+ break
+ }
+ }
+ if err != nil {
+ t.Error(err)
+ }
+ expectedCDRs := []*engine.CDR{
+ &engine.CDR{CGRID: "1f045359a0784d15e051d7e41ae30132b139d714",
+ OriginHost: "0.0.0.0", Source: "XMLWithFilters", OriginID: "25160047719:0",
+ ToR: "*voice", RequestType: "*rated", Tenant: "cgrates.org",
+ Category: "call", Account: "1001", Destination: "+4986517174963",
+ SetupTime: time.Date(2016, 4, 19, 21, 0, 5, 247000000, time.UTC),
+ AnswerTime: time.Date(2016, 4, 19, 21, 0, 6, 813000000, time.UTC),
+ Usage: time.Duration(13483000000),
+ ExtraFields: map[string]string{}, Cost: -1},
+ }
+ if !reflect.DeepEqual(expectedCDRs, cdrs) {
+ t.Errorf("Expecting: %+v\n, received: %+v\n", expectedCDRs, cdrs)
+ }
+}
diff --git a/data/conf/samples/cdrcfwv/cgrates.json b/data/conf/samples/cdrcfwv/cgrates.json
index 0ffba3f6c..4473fa6c9 100644
--- a/data/conf/samples/cdrcfwv/cgrates.json
+++ b/data/conf/samples/cdrcfwv/cgrates.json
@@ -37,7 +37,7 @@
"cdr_format": "fwv", // CDR file format
"cdr_in_dir": "/tmp/cgr_fwv/cdrc/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgr_fwv/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
- "cdr_source_id": "fwv_localtest", // free form field, tag identifying the source of the CDRs within CDRS database
+ "cdr_source_id": "cdrc", // free form field, tag identifying the source of the CDRs within CDRS database
"cdr_filter": "", // filter CDR records to import
"header_fields": [
{"tag": "FileName", "cdr_field_id": "CdrFileName", "type": "cdrfield", "value": "95", "width": 40, "padding":"right"},
diff --git a/data/conf/samples/cdrcxmlwithfilter/cgrates.json b/data/conf/samples/cdrcxmlwithfilter/cgrates.json
new file mode 100755
index 000000000..736f8283f
--- /dev/null
+++ b/data/conf/samples/cdrcxmlwithfilter/cgrates.json
@@ -0,0 +1,47 @@
+{
+
+// Real-time Charging System for Telecom & ISP environments
+// Copyright (C) ITsysCOM GmbH
+
+"stor_db": { // database used to store offline tariff plans and CDRs
+ "db_password": "CGRateS.org", // password to use when connecting to stordb
+},
+
+ "rals": {
+ "enabled": true
+ },
+
+ "cdrs": {
+ "enabled": true,
+ "rals_conns": [],
+},
+
+
+ "cdrc": [
+ {
+ "id": "XMLWithFilter",
+ "enabled": true,
+ "cdr_format": "xml",
+ "cdr_in_dir": "/tmp/cdrcxmlwithfilters/xmlit1/in",
+ "cdr_out_dir": "/tmp/cdrcxmlwithfilters/xmlit1/out",
+ "cdr_path": "broadWorksCDR>cdrData",
+ "cdr_source_id": "xmlit1",
+ "filters": ["*string:broadWorksCDR>cdrData>basicModule>userNumber:1002","*string:broadWorksCDR>cdrData>headerModule>type:Normal"],
+ "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
+ {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true},
+ {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>localCallId", "mandatory": true},
+ {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*rated", "mandatory": true},
+ {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true},
+ {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "~broadWorksCDR>cdrData>basicModule>userId:s/.*@(.*)/${1}/", "mandatory": true},
+ {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true},
+ {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>userNumber", "mandatory": true},
+ {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>calledNumber", "mandatory": true},
+ {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>startTime", "mandatory": true},
+ {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>answerTime", "mandatory": true},
+ {"tag": "Usage", "field_id": "Usage", "type": "*handler", "handler_id": "*substract_usage", "value": "broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime", "mandatory": true},
+ ],
+ },
+],
+
+
+}
\ No newline at end of file