From 4ea9cd3a48bd1610067b2daef575eef7b53485c8 Mon Sep 17 00:00:00 2001 From: TeoV Date: Sat, 11 Jan 2020 14:52:37 +0200 Subject: [PATCH] Add XML to EventReader and test for it --- cdrc/xml.go | 96 +------ cdrc/xml_it_test.go | 1 - cdrc/xml_test.go | 146 +++------- config/cdrccfg.go | 2 +- config/config.go | 3 +- config/config_it_test.go | 6 +- config/config_test.go | 4 +- config/configsanity.go | 16 +- config/erscfg.go | 4 +- config/erscfg_test.go | 4 +- config/xmldp.go | 122 ++++++++ config/xmldp_test.go | 440 +++++++++++++++++++++++++++++ data/conf/samples/ers/cgrates.json | 22 ++ ers/filecsv_it_test.go | 4 +- ers/filexml.go | 173 ++++++++++++ ers/filexml_it_test.go | 302 ++++++++++++++++++++ ers/reader.go | 2 + utils/consts.go | 2 +- 18 files changed, 1126 insertions(+), 223 deletions(-) create mode 100644 config/xmldp.go create mode 100644 config/xmldp_test.go create mode 100644 ers/filexml.go create mode 100644 ers/filexml_it_test.go diff --git a/cdrc/xml.go b/cdrc/xml.go index b3bb3565d..c011cad44 100644 --- a/cdrc/xml.go +++ b/cdrc/xml.go @@ -23,8 +23,6 @@ import ( "errors" "fmt" "io" - "net" - "strconv" "strings" "time" @@ -34,17 +32,6 @@ import ( "github.com/cgrates/cgrates/utils" ) -// getElementText will process the node to extract the elementName's text out of it (only first one found) -// returns utils.ErrNotFound if the element is not found in the node -func elementText(xmlElement *xmlquery.Node, elmntPath string) (string, error) { - elmnt := xmlquery.FindOne(xmlElement, elmntPath) - if elmnt == nil { - return "", utils.ErrNotFound - } - return elmnt.InnerText(), nil - -} - // handlerUsageDiff will calculate the usage as difference between timeEnd and timeStart // Expects the 2 arguments in template separated by | func handlerSubstractUsage(xmlElement *xmlquery.Node, argsTpl config.RSRParsers, @@ -57,7 +44,7 @@ func handlerSubstractUsage(xmlElement *xmlquery.Node, argsTpl config.RSRParsers, } absolutePath := utils.ParseHierarchyPath(rsrArg.Rules, "") relPath := utils.HierarchyPath(absolutePath[len(cdrPath)+1:]) // Need relative path to the xmlElmnt - argStr, _ := elementText(xmlElement, relPath.AsString("/", false)) + argStr, _ := config.ElementText(xmlElement, relPath.AsString("/", false)) argsStr += argStr } handlerArgs := strings.Split(argsStr, utils.HandlerArgSep) @@ -116,7 +103,7 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err cdrs = make([]*engine.CDR, 0) cdrXML := xmlProc.cdrXmlElmts[xmlProc.procItems] xmlProc.procItems += 1 - xmlProvider := newXmlProvider(cdrXML, xmlProc.cdrPath) + xmlProvider := config.NewXmlProvider(cdrXML, xmlProc.cdrPath, utils.MetaReq) for _, cdrcCfg := range xmlProc.cdrcCfgs { tenant, err := cdrcCfg.Tenant.ParseDataProvider(xmlProvider, utils.NestingSep) if err != nil { @@ -148,7 +135,7 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *xmlquery.Node, cdrcCf var lazyHttpFields []*config.FCTemplate var err error fldVals := make(map[string]string) - xmlProvider := newXmlProvider(xmlEntity, xmlProc.cdrPath) + xmlProvider := config.NewXmlProvider(xmlEntity, xmlProc.cdrPath, utils.MetaReq) for _, cdrFldCfg := range cdrcCfg.ContentFields { if len(cdrFldCfg.Filters) != 0 { if pass, err := xmlProc.filterS.Pass(tenant, @@ -207,80 +194,3 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *xmlquery.Node, cdrcCf } return cdr, nil } - -// newXmlProvider constructs a DataProvider -func newXmlProvider(req *xmlquery.Node, cdrPath utils.HierarchyPath) (dP config.DataProvider) { - dP = &xmlProvider{req: req, cdrPath: cdrPath, cache: config.NewNavigableMap(nil)} - return -} - -// xmlProvider implements engine.DataProvider so we can pass it to filters -type xmlProvider struct { - req *xmlquery.Node - cdrPath utils.HierarchyPath //used to compute relative path - cache *config.NavigableMap -} - -// String is part of engine.DataProvider interface -// when called, it will display the already parsed values out of cache -func (xP *xmlProvider) String() string { - return utils.ToJSON(xP) -} - -// FieldAsInterface is part of engine.DataProvider interface -func (xP *xmlProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) { - if len(fldPath) == 0 { - return nil, utils.ErrNotFound - } - if fldPath[0] != utils.MetaReq { - return "", utils.ErrPrefixNotFound(strings.Join(fldPath, utils.NestingSep)) - } - if data, err = xP.cache.FieldAsInterface(fldPath); err == nil || - err != utils.ErrNotFound { // item found in cache - return - } - err = nil // cancel previous err - relPath := utils.HierarchyPath(fldPath[len(xP.cdrPath)+1:]) // Need relative path to the xmlElmnt - var slctrStr string - for i := range relPath { - if sIdx := strings.Index(relPath[i], "["); sIdx != -1 { - slctrStr = relPath[i][sIdx:] - if slctrStr[len(slctrStr)-1:] != "]" { - return nil, fmt.Errorf("filter rule <%s> needs to end in ]", slctrStr) - } - relPath[i] = relPath[i][:sIdx] - if slctrStr[1:2] != "@" { - i, err := strconv.Atoi(slctrStr[1 : len(slctrStr)-1]) - if err != nil { - return nil, err - } - slctrStr = "[" + strconv.Itoa(i+1) + "]" - } - relPath[i] = relPath[i] + slctrStr - } - } - data, err = elementText(xP.req, relPath.AsString("/", false)) - xP.cache.Set(fldPath, data, false, false) - return -} - -// FieldAsString is part of engine.DataProvider interface -func (xP *xmlProvider) FieldAsString(fldPath []string) (data string, err error) { - var valIface interface{} - valIface, err = xP.FieldAsInterface(fldPath) - if err != nil { - return - } - return utils.IfaceAsString(valIface), nil -} - -// AsNavigableMap is part of engine.DataProvider interface -func (xP *xmlProvider) AsNavigableMap([]*config.FCTemplate) ( - nm *config.NavigableMap, err error) { - return nil, utils.ErrNotImplemented -} - -// RemoteHost is part of engine.DataProvider interface -func (xP *xmlProvider) RemoteHost() net.Addr { - return utils.LocalAddr() -} diff --git a/cdrc/xml_it_test.go b/cdrc/xml_it_test.go index 4c22535be..3faa4429f 100644 --- a/cdrc/xml_it_test.go +++ b/cdrc/xml_it_test.go @@ -132,7 +132,6 @@ func TestXmlITAnalyseCDRs(t *testing.T) { } else if len(reply) != 1 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } - } func TestXmlITKillEngine(t *testing.T) { diff --git a/cdrc/xml_test.go b/cdrc/xml_test.go index cdb382fa0..920faee6a 100644 --- a/cdrc/xml_test.go +++ b/cdrc/xml_test.go @@ -166,28 +166,40 @@ var cdrXmlBroadsoft = ` ` -func TestXMLElementText(t *testing.T) { - doc, err := xmlquery.Parse(strings.NewReader(cdrXmlBroadsoft)) - if err != nil { - t.Error(err) - } - cdrs := xmlquery.Find(doc, path.Join("/broadWorksCDR/cdrData/")) - cdrWithoutUserNr := cdrs[0] - if _, err := elementText(cdrWithoutUserNr, "basicModule/userNumber"); err != utils.ErrNotFound { - t.Error(err) - } - cdrWithUser := cdrs[1] - if val, err := elementText(cdrWithUser, "basicModule/userNumber"); err != nil { - t.Error(err) - } else if val != "1001" { - t.Errorf("Expecting: 1001, received: %s", val) - } - if val, err := elementText(cdrWithUser, "centrexModule/locationList/locationInformation/locationType"); err != nil { - t.Error(err) - } else if val != "Primary Device" { - t.Errorf("Expecting: , received: <%s>", val) - } -} +var xmlMultipleIndex = ` + 2005-08-26T14:16:42 + 2005-08-26T14:16:56 + 2005-08-26T14:17:34 + My Call Reference + 386 + sampleusername + 1 + Conecto LLC + US$0.21 + yes + no + US$0.13 + 44 + + +441624828505 + Isle of Man + 38 + US$0.0200 + US$0.0140 + US$0.0130 + US$0.0082 + + + +44 7624 494075 + Isle of Man + 37 + US$0.2700 + US$0.1890 + US$0.1880 + US$0.1159 + + +` func TestXMLHandlerSubstractUsage(t *testing.T) { doc, err := xmlquery.Parse(strings.NewReader(cdrXmlBroadsoft)) @@ -525,28 +537,6 @@ var xmlContent = ` ` -func TestXMLElementText3(t *testing.T) { - doc, err := xmlquery.Parse(strings.NewReader(xmlContent)) - if err != nil { - t.Error(err) - } - hPath2 := utils.ParseHierarchyPath("File.CDRs.Call", "") - cdrs := xmlquery.Find(doc, hPath2.AsString("/", true)) - if len(cdrs) != 3 { - t.Errorf("Expecting: 3, received: %+v", len(cdrs)) - } - - if _, err := elementText(cdrs[0], "SignalingInfo/PChargingVector/test"); err != utils.ErrNotFound { - t.Error(err) - } - - if val, err := elementText(cdrs[1], "SignalingInfo/PChargingVector/icidvalue"); err != nil { - t.Error(err) - } else if val != "46d7974398c2671016afccc3f2c428c7" { - t.Errorf("Expecting: 46d7974398c2671016afccc3f2c428c7, received: %s", val) - } -} - func TestXMLRPNestingSeparator(t *testing.T) { cdrcCfgs := []*config.CdrcCfg{ { @@ -627,71 +617,3 @@ func TestXMLRPNestingSeparator(t *testing.T) { t.Errorf("Expecting: %+v\n, received: %+v\n", expectedCDRs, cdrs) } } - -var xmlMultipleIndex = ` - 2005-08-26T14:16:42 - 2005-08-26T14:16:56 - 2005-08-26T14:17:34 - My Call Reference - 386 - sampleusername - 1 - Conecto LLC - US$0.21 - yes - no - US$0.13 - 44 - - +441624828505 - Isle of Man - 38 - US$0.0200 - US$0.0140 - US$0.0130 - US$0.0082 - - - +44 7624 494075 - Isle of Man - 37 - US$0.2700 - US$0.1890 - US$0.1880 - US$0.1159 - - -` - -func TestXMLIndexes(t *testing.T) { - doc, err := xmlquery.Parse(strings.NewReader(xmlMultipleIndex)) - if err != nil { - t.Error(err) - } - dP := newXmlProvider(doc, utils.HierarchyPath([]string{})) - if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "userid"}); err != nil { - t.Error(err) - } else if data != "386" { - t.Errorf("expecting: 386, received: <%s>", data) - } - if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "username"}); err != nil { - t.Error(err) - } else if data != "sampleusername" { - t.Errorf("expecting: sampleusername, received: <%s>", data) - } - if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg", "seconds"}); err != nil { - t.Error(err) - } else if data != "38" { - t.Errorf("expecting: 38, received: <%s>", data) - } - if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg[1]", "seconds"}); err != nil { - t.Error(err) - } else if data != "37" { - t.Errorf("expecting: 37, received: <%s>", data) - } - if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg[@calllegid='222147']", "seconds"}); err != nil { - t.Error(err) - } else if data != "37" { - t.Errorf("expecting: 37, received: <%s>", data) - } -} diff --git a/config/cdrccfg.go b/config/cdrccfg.go index 006aa60d8..245c055e4 100644 --- a/config/cdrccfg.go +++ b/config/cdrccfg.go @@ -101,7 +101,7 @@ func (self *CdrcCfg) loadFromJsonCfg(jsnCfg *CdrcJsonCfg, separator string) erro self.FailedCallsPrefix = *jsnCfg.Failed_calls_prefix } if jsnCfg.Cdr_root_path != nil { - self.CDRRootPath = utils.ParseHierarchyPath(*jsnCfg.Cdr_root_path, "") + self.CDRRootPath = utils.ParseHierarchyPath(*jsnCfg.Cdr_root_path, utils.EmptyString) } if jsnCfg.Cdr_source_id != nil { self.CdrSourceId = *jsnCfg.Cdr_source_id diff --git a/config/config.go b/config/config.go index ca6e9c134..0ea376a0d 100755 --- a/config/config.go +++ b/config/config.go @@ -301,7 +301,8 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes, utils.MetaSuppliers, utils.MetaThresholds, utils.MetaChargers, utils.MetaDispatchers, utils.MetaDispatcherHosts}) -var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap}) +var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, + utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL}) func (cfg *CGRConfig) LazySanityCheck() { for _, cdrePrfl := range cfg.cdrsCfg.OnlineCDRExports { diff --git a/config/config_it_test.go b/config/config_it_test.go index b9dddb4b4..13cc3a5e4 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -425,6 +425,7 @@ func TestCGRConfigReloadERs(t *testing.T) { HeaderFields: []*FCTemplate{}, ContentFields: content, TrailerFields: []*FCTemplate{}, + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, }, &EventReaderCfg{ ID: "file_reader1", @@ -438,6 +439,7 @@ func TestCGRConfigReloadERs(t *testing.T) { HeaderFields: []*FCTemplate{}, ContentFields: content, TrailerFields: []*FCTemplate{}, + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, }, }, } @@ -824,7 +826,7 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) { "Timezone": "", "TrailerFields": []interface{}{}, "Type": "*file_csv", - "XmlRootPath": "", + "XmlRootPath": []interface{}{utils.EmptyString}, }, map[string]interface{}{ "ConcurrentReqs": 1024, @@ -842,7 +844,7 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) { "Timezone": "", "TrailerFields": []interface{}{}, "Type": "*file_csv", - "XmlRootPath": "", + "XmlRootPath": []interface{}{utils.EmptyString}, "ContentFields": content, }, }, diff --git a/config/config_test.go b/config/config_test.go index d8b1f6314..9f0c67602 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1799,7 +1799,7 @@ func TestCgrCdfEventReader(t *testing.T) { ConcurrentReqs: 1024, SourcePath: "/var/spool/cgrates/cdrc/in", ProcessedPath: "/var/spool/cgrates/cdrc/out", - XmlRootPath: utils.EmptyString, + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, Tenant: nil, Timezone: utils.EmptyString, Filters: []string{}, @@ -1847,7 +1847,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) { ConcurrentReqs: 1024, SourcePath: "/var/spool/cgrates/cdrc/in", ProcessedPath: "/var/spool/cgrates/cdrc/out", - XmlRootPath: utils.EmptyString, + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, Tenant: nil, Timezone: utils.EmptyString, Filters: nil, diff --git a/config/configsanity.go b/config/configsanity.go index 15d213c42..abf95f00e 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -415,7 +415,8 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID) } - if rdr.Type == utils.MetaFileCSV { + switch rdr.Type { + case utils.MetaFileCSV: for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} { if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID) @@ -424,9 +425,16 @@ func (cfg *CGRConfig) checkConfigSanity() error { if rdr.FieldSep == utils.EmptyString { return fmt.Errorf("<%s> empty FieldSep for reader with ID: %s", utils.ERs, rdr.ID) } - } - if rdr.Type == utils.MetaKafkajsonMap && rdr.RunDelay > 0 { - return fmt.Errorf("<%s> RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID) + case utils.MetaKafkajsonMap: + if rdr.RunDelay > 0 { + return fmt.Errorf("<%s> RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID) + } + case utils.MetaFileXML: + for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} { + if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { + return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID) + } + } } } } diff --git a/config/erscfg.go b/config/erscfg.go index a0ea603d5..bdaa599fb 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -106,7 +106,7 @@ type EventReaderCfg struct { ConcurrentReqs int SourcePath string ProcessedPath string - XmlRootPath string + XmlRootPath utils.HierarchyPath Tenant RSRParsers Timezone string Filters []string @@ -142,7 +142,7 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string er.ProcessedPath = *jsnCfg.Processed_path } if jsnCfg.Xml_root_path != nil { - er.XmlRootPath = *jsnCfg.Xml_root_path + er.XmlRootPath = utils.ParseHierarchyPath(*jsnCfg.Xml_root_path, utils.EmptyString) } if jsnCfg.Tenant != nil { if er.Tenant, err = NewRSRParsers(*jsnCfg.Tenant, true, sep); err != nil { diff --git a/config/erscfg_test.go b/config/erscfg_test.go index 94300b18f..e7ce41a1b 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -108,7 +108,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) { ConcurrentReqs: 1024, SourcePath: "/var/spool/cgrates/cdrc/in", ProcessedPath: "/var/spool/cgrates/cdrc/out", - XmlRootPath: utils.EmptyString, + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, Tenant: nil, Timezone: utils.EmptyString, Filters: []string{}, @@ -148,7 +148,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) { ConcurrentReqs: 1024, SourcePath: "/tmp/ers/in", ProcessedPath: "/tmp/ers/out", - XmlRootPath: utils.EmptyString, + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, Tenant: nil, Timezone: utils.EmptyString, Filters: nil, diff --git a/config/xmldp.go b/config/xmldp.go new file mode 100644 index 000000000..8cd75577c --- /dev/null +++ b/config/xmldp.go @@ -0,0 +1,122 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/antchfx/xmlquery" + "github.com/cgrates/cgrates/utils" +) + +// NewXmlProvider constructs a DataProvider +func NewXmlProvider(req *xmlquery.Node, cdrPath utils.HierarchyPath, pathPrfx string) (dP DataProvider) { + dP = &XmlProvider{req: req, cdrPath: cdrPath, cache: NewNavigableMap(nil), pathPrfx: pathPrfx} + return +} + +// XmlProvider implements engine.DataProvider so we can pass it to filters +type XmlProvider struct { + req *xmlquery.Node + cdrPath utils.HierarchyPath //used to compute relative path + cache *NavigableMap + pathPrfx string // if this comes in path it will be ignored + // pathPrfx should be reviewed once the cdrc is removed +} + +// String is part of engine.DataProvider interface +// when called, it will display the already parsed values out of cache +func (xP *XmlProvider) String() string { + return utils.ToJSON(xP) +} + +// FieldAsInterface is part of engine.DataProvider interface +func (xP *XmlProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) { + if len(fldPath) == 0 { + return nil, utils.ErrNotFound + } + if xP.pathPrfx != utils.EmptyString && fldPath[0] != xP.pathPrfx { + return "", utils.ErrPrefixNotFound(strings.Join(fldPath, utils.NestingSep)) + } + if data, err = xP.cache.FieldAsInterface(fldPath); err == nil || + err != utils.ErrNotFound { // item found in cache + return + } + err = nil // cancel previous err + relPath := utils.HierarchyPath(fldPath[len(xP.cdrPath)+1:]) // Need relative path to the xmlElmnt + if xP.pathPrfx == utils.EmptyString { // temporary fix untile re remove cdrc + relPath = utils.HierarchyPath(fldPath[len(xP.cdrPath):]) + } + var slctrStr string + for i := range relPath { + if sIdx := strings.Index(relPath[i], "["); sIdx != -1 { + slctrStr = relPath[i][sIdx:] + if slctrStr[len(slctrStr)-1:] != "]" { + return nil, fmt.Errorf("filter rule <%s> needs to end in ]", slctrStr) + } + relPath[i] = relPath[i][:sIdx] + if slctrStr[1:2] != "@" { + i, err := strconv.Atoi(slctrStr[1 : len(slctrStr)-1]) + if err != nil { + return nil, err + } + slctrStr = "[" + strconv.Itoa(i+1) + "]" + } + relPath[i] = relPath[i] + slctrStr + } + } + data, err = ElementText(xP.req, relPath.AsString("/", false)) + xP.cache.Set(fldPath, data, false, false) + return +} + +// FieldAsString is part of engine.DataProvider interface +func (xP *XmlProvider) FieldAsString(fldPath []string) (data string, err error) { + var valIface interface{} + valIface, err = xP.FieldAsInterface(fldPath) + if err != nil { + return + } + return utils.IfaceAsString(valIface), nil +} + +// AsNavigableMap is part of engine.DataProvider interface +func (xP *XmlProvider) AsNavigableMap([]*FCTemplate) ( + nm *NavigableMap, err error) { + return nil, utils.ErrNotImplemented +} + +// RemoteHost is part of engine.DataProvider interface +func (xP *XmlProvider) RemoteHost() net.Addr { + return utils.LocalAddr() +} + +// ElementText will process the node to extract the elementName's text out of it (only first one found) +// returns utils.ErrNotFound if the element is not found in the node +// Make the method exportable until we remove the cdrc +func ElementText(xmlElement *xmlquery.Node, elmntPath string) (string, error) { + elmnt := xmlquery.FindOne(xmlElement, elmntPath) + if elmnt == nil { + return "", utils.ErrNotFound + } + return elmnt.InnerText(), nil +} diff --git a/config/xmldp_test.go b/config/xmldp_test.go new file mode 100644 index 000000000..76f93786d --- /dev/null +++ b/config/xmldp_test.go @@ -0,0 +1,440 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +import ( + "path" + "strings" + "testing" + + "github.com/antchfx/xmlquery" + "github.com/cgrates/cgrates/utils" +) + +var cdrXmlBroadsoft = ` + + + + + + 0002183384 + CGRateSaabb + 20160419210000.104 + 1+020000 + + Start + + + + + + 0002183385 + CGRateSaabb + 20160419210005.247 + 1+020000 + + MBC + Normal + + + 1001 + 2001 + Terminating + Network + 1001 + Public + +4986517174963 + 20160419210005.247 + 1+020000 + 25160047719:0 + Yes + 20160419210006.813 + 20160419210020.296 + 016 + y + local + 1001@cgrates.org + Yes + Yes + + + CGR_GROUP + CGR_GROUP/CGR_GROUP_TRUNK30 + Normal + + + 1001@cgrates.org + Primary Device + + + 31.882 + + + gw04.cgrates.org + 74122796919420162305@172.16.1.2 + PCMA/8000 + 172.16.1.4 + BW2300052501904161738474465@172.16.1.10 + 31.882 + OmniPCX Enterprise R11.0.1 k1.520.22.b + + + + + + 0002183386 + CGRateSaabb + 20160419210006.909 + 1+020000 + + MBC + Normal + + + 1002 + 2001 + Terminating + Network + +4986517174964 + Public + 1002 + 20160419210006.909 + 1+020000 + 27280048121:0 + Yes + 20160419210007.037 + 20160419210030.322 + 016 + y + local + 314028947650@cgrates.org + Yes + Yes + + + CGR_GROUP + CGR_GROUP/CGR_GROUP_TRUNK65 + Normal + + + 31403456100@cgrates.org + Primary Device + + + 26.244 + + + gw01.cgrates.org + 108352493719420162306@172.31.250.150 + PCMA/8000 + 172.16.1.4 + 2345300069121904161716512907@172.16.1.10 + 26.244 + Altitude vBox + + + + + + 0002183486 + CGRateSaabb + 20160419211500.104 + 1+020000 + + End + + +` + +func TestXMLElementText(t *testing.T) { + doc, err := xmlquery.Parse(strings.NewReader(cdrXmlBroadsoft)) + if err != nil { + t.Error(err) + } + cdrs := xmlquery.Find(doc, path.Join("/broadWorksCDR/cdrData/")) + cdrWithoutUserNr := cdrs[0] + if _, err := ElementText(cdrWithoutUserNr, "basicModule/userNumber"); err != utils.ErrNotFound { + t.Error(err) + } + cdrWithUser := cdrs[1] + if val, err := ElementText(cdrWithUser, "basicModule/userNumber"); err != nil { + t.Error(err) + } else if val != "1001" { + t.Errorf("Expecting: 1001, received: %s", val) + } + if val, err := ElementText(cdrWithUser, "centrexModule/locationList/locationInformation/locationType"); err != nil { + t.Error(err) + } else if val != "Primary Device" { + t.Errorf("Expecting: , received: <%s>", val) + } +} + +var xmlContent = ` + + + Metaswitch CFS + + 1510225200002 + + + + National + + Orig + 19 + 480 + No answer + 223007622 + + +27110493421 + +27110493421 + +27110493421 + Ro_test + 0gQAAC8WAAACBAAALxYAAD57SAEV7ekv/OSKkO7qmD82OmbfHO+Z7wIZJkXdCv8z@10.170.248.200 + + + + 1 + 0 + + 8824071D@10.170.248.140 + 19 + 480 + + + 0763371551 + +270763371551 + 0763371551 + 110493421 + 110493421 + + + False + NetworkDefault + 0 + + + Speech + + 13442698e525ad2c21251f76479ab2b4 + voice.lab.liquidtelecom.net + + + 1510225513055 + 1510225513304 + 1510225514836 + + 1510225516981 + 1510225516981 + 1510225516981 + + + Premium + + Orig + 16 + No error + 223007623 + + +27110493421 + +27110493421 + +27110493421 + Ro_test + 0gQAAC8WAAACBAAALxYAAPkyWDO29Do1SyxNi5UV71mJYEIEkfNa9wCFCCjY2asU@10.170.248.200 + + + + 1 + 0 + + 8E450FA1@10.170.248.140 + + + 0843073451 + +270843073451 + 0843073451 + 110493421 + 110493421 + + + False + NetworkDefault + 0 + + + Speech + + 46d7974398c2671016afccc3f2c428c7 + voice.lab.liquidtelecom.net + + + 1510225531933 + 1510225532183 + 1510225534973 + 1510225539364 + 1510225593101 + 1510225593101 + 1510225593101 + + + International + + Orig + 16 + No error + 223007624 + + +27110493421 + +27110493421 + +27110493421 + Ro_test + 0gQAAC8WAAACBAAALxYAAJrUscTicyU5GtjPyQnpAeuNmz9p/bdOoR/Mk9RXciOI@10.170.248.200 + + + + 1 + 0 + + BC8B2801@10.170.248.140 + + + 263772822306 + +263772822306 + 263772822306 + 110493421 + 110493421 + + + False + NetworkDefault + 0 + + + Speech + + 750b8b73e41ba7b59b21240758522268 + voice.lab.liquidtelecom.net + + + 1510225865894 + 1510225866144 + 1510225866756 + 1510225876243 + 1510225916144 + 1510225916144 + 1510225916144 + + + + 1510227591467 + 3 + 0 + 0 + + +` + +func TestXMLElementText3(t *testing.T) { + doc, err := xmlquery.Parse(strings.NewReader(xmlContent)) + if err != nil { + t.Error(err) + } + hPath2 := utils.ParseHierarchyPath("File.CDRs.Call", "") + cdrs := xmlquery.Find(doc, hPath2.AsString("/", true)) + if len(cdrs) != 3 { + t.Errorf("Expecting: 3, received: %+v", len(cdrs)) + } + if _, err := ElementText(cdrs[0], "SignalingInfo/PChargingVector/test"); err != utils.ErrNotFound { + t.Error(err) + } + + if val, err := ElementText(cdrs[1], "SignalingInfo/PChargingVector/icidvalue"); err != nil { + t.Error(err) + } else if val != "46d7974398c2671016afccc3f2c428c7" { + t.Errorf("Expecting: 46d7974398c2671016afccc3f2c428c7, received: %s", val) + } +} + +var xmlMultipleIndex = ` + 2005-08-26T14:16:42 + 2005-08-26T14:16:56 + 2005-08-26T14:17:34 + My Call Reference + 386 + sampleusername + 1 + Conecto LLC + US$0.21 + yes + no + US$0.13 + 44 + + +441624828505 + Isle of Man + 38 + US$0.0200 + US$0.0140 + US$0.0130 + US$0.0082 + + + +44 7624 494075 + Isle of Man + 37 + US$0.2700 + US$0.1890 + US$0.1880 + US$0.1159 + + +` + +func TestXMLIndexes(t *testing.T) { + doc, err := xmlquery.Parse(strings.NewReader(xmlMultipleIndex)) + if err != nil { + t.Error(err) + } + dP := NewXmlProvider(doc, utils.HierarchyPath([]string{}), utils.MetaReq) + if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "userid"}); err != nil { + t.Error(err) + } else if data != "386" { + t.Errorf("expecting: 386, received: <%s>", data) + } + if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "username"}); err != nil { + t.Error(err) + } else if data != "sampleusername" { + t.Errorf("expecting: sampleusername, received: <%s>", data) + } + if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg", "seconds"}); err != nil { + t.Error(err) + } else if data != "38" { + t.Errorf("expecting: 38, received: <%s>", data) + } + if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg[1]", "seconds"}); err != nil { + t.Error(err) + } else if data != "37" { + t.Errorf("expecting: 37, received: <%s>", data) + } + if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg[@calllegid='222147']", "seconds"}); err != nil { + t.Error(err) + } else if data != "37" { + t.Errorf("expecting: 37, received: <%s>", data) + } +} diff --git a/data/conf/samples/ers/cgrates.json b/data/conf/samples/ers/cgrates.json index 87576f31d..d9946cd2a 100644 --- a/data/conf/samples/ers/cgrates.json +++ b/data/conf/samples/ers/cgrates.json @@ -192,6 +192,28 @@ {"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "~*req.6", "mandatory": true}, ], }, + { + "id": "XmlDryRun", + "run_delay": -1, + "type": "*file_xml", + "source_path": "/tmp/xmlErs/in", + "flags": ["*cdrs","*log"], + "processed_path": "/tmp/xmlErs/out", + "xml_root_path": "broadWorksCDR.cdrData", + "content_fields":[ + {"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*constant", "value": "*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.userId:s/.*@(.*)/${1}/", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.userNumber", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.calledNumber", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.startTime", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.answerTime", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*usage_difference", "value": "~*req.broadWorksCDR.cdrData.basicModule.releaseTime;~*req.broadWorksCDR.cdrData.basicModule.answerTime", "mandatory": true} + ], + } ], }, diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 9ccfec32a..315ad4d26 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -112,7 +112,8 @@ func testCsvITCreateCdrDirs(t *testing.T) { for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out", "/tmp/ers2/in", "/tmp/ers2/out", "/tmp/init_session/in", "/tmp/init_session/out", "/tmp/terminate_session/in", "/tmp/terminate_session/out", "/tmp/cdrs/in", - "/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out"} { + "/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out", + "/tmp/xmlErs/in", "/tmp/xmlErs/out"} { if err := os.RemoveAll(dir); err != nil { t.Fatal("Error removing folder: ", dir, err) } @@ -120,7 +121,6 @@ func testCsvITCreateCdrDirs(t *testing.T) { t.Fatal("Error creating folder: ", dir, err) } } - time.Sleep(10 * time.Second) } func testCsvITStartEngine(t *testing.T) { diff --git a/ers/filexml.go b/ers/filexml.go new file mode 100644 index 000000000..63d821b2b --- /dev/null +++ b/ers/filexml.go @@ -0,0 +1,173 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/antchfx/xmlquery" + + "github.com/cgrates/cgrates/agents" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int, + rdrEvents chan *erEvent, rdrErr chan error, + fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath + if strings.HasSuffix(srcPath, utils.Slash) { + srcPath = srcPath[:len(srcPath)-1] + } + return &XMLFileER{ + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrDir: srcPath, + rdrEvents: rdrEvents, + rdrError: rdrErr, + rdrExit: rdrExit}, nil +} + +// XMLFileER implements EventReader interface for .xml files +type XMLFileER struct { + sync.RWMutex + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + rdrDir string + rdrEvents chan *erEvent // channel to dispatch the events created to + rdrError chan error + rdrExit chan struct{} + conReqs chan struct{} // limit number of opened files +} + +func (rdr *XMLFileER) Config() *config.EventReaderCfg { + return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] +} + +func (rdr *XMLFileER) Serve() (err error) { + switch rdr.Config().RunDelay { + case time.Duration(0): // 0 disables the automatic read, maybe done per API + return + case time.Duration(-1): + return watchDir(rdr.rdrDir, rdr.processFile, + utils.ERs, rdr.rdrExit) + default: + go func() { + for { + // Not automated, process and sleep approach + select { + case <-rdr.rdrExit: + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring path <%s>", + utils.ERs, rdr.rdrDir)) + return + default: + } + filesInDir, _ := ioutil.ReadDir(rdr.rdrDir) + for _, file := range filesInDir { + if !strings.HasSuffix(file.Name(), utils.XMLSuffix) { // hardcoded file extension for xml event reader + continue // used in order to filter the files from directory + } + go func(fileName string) { + if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing file %s, error: %s", + utils.ERs, fileName, err.Error())) + } + }(file.Name()) + } + time.Sleep(rdr.Config().RunDelay) + } + }() + } + return +} + +// processFile is called for each file in a directory and dispatches erEvents from it +func (rdr *XMLFileER) processFile(fPath, fName string) (err error) { + if cap(rdr.conReqs) != 0 { // 0 goes for no limit + processFile := <-rdr.conReqs // Queue here for maxOpenFiles + defer func() { rdr.conReqs <- processFile }() + } + absPath := path.Join(fPath, fName) + utils.Logger.Info( + fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) + var file *os.File + if file, err = os.Open(absPath); err != nil { + return + } + defer file.Close() + doc, err := xmlquery.Parse(file) + if err != nil { + return err + } + xmlElmts := xmlquery.Find(doc, rdr.Config().XmlRootPath.AsString("/", true)) + rowNr := 0 // This counts the rows in the file, not really number of CDRs + evsPosted := 0 + timeStart := time.Now() + reqVars := make(map[string]interface{}) + for _, xmlElmt := range xmlElmts { + rowNr++ // increment the rowNr after checking if it's not the end of file + agReq := agents.NewAgentRequest( + config.NewXmlProvider(xmlElmt, rdr.Config().XmlRootPath, utils.EmptyString), reqVars, + nil, nil, rdr.Config().Tenant, + rdr.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(rdr.Config().Timezone, + rdr.cgrCfg.GeneralCfg().DefaultTimezone), + rdr.fltrS) // create an AgentRequest + if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, + agReq); err != nil || !pass { + continue + } + navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", + utils.ERs, absPath, rowNr, err.Error())) + continue + } + rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent( + agReq.Tenant, utils.NestingSep), + rdrCfg: rdr.Config()} + evsPosted++ + } + + if rdr.Config().ProcessedPath != "" { + // Finished with file, move it to processed folder + outPath := path.Join(rdr.Config().ProcessedPath, fName) + if err = os.Rename(absPath, outPath); err != nil { + return + } + } + + utils.Logger.Info( + fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", + utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart))) + return +} diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go new file mode 100644 index 000000000..9fcbad22f --- /dev/null +++ b/ers/filexml_it_test.go @@ -0,0 +1,302 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package ers + +import ( + "io/ioutil" + "net/rpc" + "os" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/utils" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" +) + +var ( + xmlCfgPath string + xmlCfg *config.CGRConfig + xmlRPC *rpc.Client + + xmlTests = []func(t *testing.T){ + testXMLITCreateCdrDirs, + testXMLITInitConfig, + testXMLITInitCdrDb, + testXMLITResetDataDb, + testXMLITStartEngine, + testXMLITRpcConn, + testXMLITLoadTPFromFolder, + testXMLITHandleCdr1File, + testXmlITAnalyseCDRs, + testXMLITCleanupFiles, + testXMLITKillEngine, + } +) + +func TestXMLReadFile(t *testing.T) { + xmlCfgPath = path.Join(*dataDir, "conf", "samples", "ers") + for _, test := range xmlTests { + t.Run("TestXMLReadFile", test) + } +} + +func testXMLITCreateCdrDirs(t *testing.T) { + for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out", + "/tmp/ers2/in", "/tmp/ers2/out", "/tmp/init_session/in", "/tmp/init_session/out", + "/tmp/terminate_session/in", "/tmp/terminate_session/out", "/tmp/cdrs/in", + "/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out", + "/tmp/xmlErs/in", "/tmp/xmlErs/out"} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } +} + +func testXMLITInitConfig(t *testing.T) { + var err error + if xmlCfg, err = config.NewCGRConfigFromPath(xmlCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func testXMLITInitCdrDb(t *testing.T) { + if err := engine.InitStorDb(xmlCfg); err != nil { + t.Fatal(err) + } +} + +// Remove data in both rating and accounting db +func testXMLITResetDataDb(t *testing.T) { + if err := engine.InitDataDb(xmlCfg); err != nil { + t.Fatal(err) + } +} + +func testXMLITStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(xmlCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testXMLITRpcConn(t *testing.T) { + var err error + xmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func testXMLITLoadTPFromFolder(t *testing.T) { + attrs := &utils.AttrLoadTpFromFolder{ + FolderPath: path.Join(*dataDir, "tariffplans", "testit")} + var loadInst utils.LoadInstance + if err := xmlRPC.Call(utils.ApierV2LoadTariffPlanFromFolder, + attrs, &loadInst); err != nil { + t.Error(err) + } + time.Sleep(500 * time.Millisecond) +} + +var cdrXmlBroadsoft = ` + + + + + + 0002183384 + CGRateSaabb + 20160419210000.104 + 1+020000 + + Start + + + + + + 0002183385 + CGRateSaabb + 20160419210005.247 + 1+020000 + + MBC + Normal + + + 1001 + 2001 + Terminating + Network + 1001 + Public + +4915117174963 + 20160419210005.247 + 1+020000 + 25160047719:0 + Yes + 20160419210006.813 + 20160419210020.296 + 016 + y + local + 1001@cgrates.org + Yes + Yes + + + CGR_GROUP + CGR_GROUP/CGR_GROUP_TRUNK30 + Normal + + + 1001@cgrates.org + Primary Device + + + 31.882 + + + gw04.cgrates.org + 74122796919420162305@172.16.1.2 + PCMA/8000 + 172.16.1.4 + BW2300052501904161738474465@172.16.1.10 + 31.882 + OmniPCX Enterprise R11.0.1 k1.520.22.b + + + + + + 0002183386 + CGRateSaabb + 20160419210006.909 + 1+020000 + + MBC + Normal + + + 1002 + 2001 + Terminating + Network + +4986517174964 + Public + 1001 + 20160419210006.909 + 1+020000 + 27280048121:0 + Yes + 20160419210007.037 + 20160419210030.322 + 016 + y + local + 314028947650@cgrates.org + Yes + Yes + + + CGR_GROUP + CGR_GROUP/CGR_GROUP_TRUNK65 + Normal + + + 31403456100@cgrates.org + Primary Device + + + 26.244 + + + gw01.cgrates.org + 108352493719420162306@172.31.250.150 + PCMA/8000 + 172.16.1.4 + 2345300069121904161716512907@172.16.1.10 + 26.244 + Altitude vBox + + + + + + 0002183486 + CGRateSaabb + 20160419211500.104 + 1+020000 + + End + + +` + +// The default scenario, out of cdrc defined in .cfg file +func testXMLITHandleCdr1File(t *testing.T) { + fileName := "file1.xml" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(cdrXmlBroadsoft), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/xmlErs/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } + time.Sleep(100 * time.Millisecond) +} + +func testXmlITAnalyseCDRs(t *testing.T) { + var reply []*engine.ExternalCDR + if err := xmlRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 6 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } + if err := xmlRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4915117174963"}}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 3 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } +} + +func testXMLITCleanupFiles(t *testing.T) { + for _, dir := range []string{"/tmp/ers", + "/tmp/ers2", "/tmp/init_session", + "/tmp/terminate_session", "/tmp/cdrs", + "/tmp/ers_with_filters", "/tmp/xmlErs"} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + } +} + +func testXMLITKillEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/ers/reader.go b/ers/reader.go index 97800312d..185796b87 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -40,6 +40,8 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int, err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type) case utils.MetaFileCSV: return NewCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + case utils.MetaFileXML: + return NewXMLFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) case utils.MetaKafkajsonMap: return NewKafkaER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) case utils.MetaSQL: diff --git a/utils/consts.go b/utils/consts.go index cecf42894..9421bb3c8 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -407,10 +407,10 @@ const ( UnsupportedMigrationTask = "unsupported migration task" NoStorDBConnection = "not connected to StorDB" UndefinedVersion = "undefined version" - UnsupportedDB = "unsupported database" TxtSuffix = ".txt" JSNSuffix = ".json" FormSuffix = ".form" + XMLSuffix = ".xml" CSVSuffix = ".csv" FWVSuffix = ".fwv" CONTENT_JSON = "json"