Add XML to EventReader and test for it

This commit is contained in:
TeoV
2020-01-11 14:52:37 +02:00
parent 8ed5fc3956
commit 4ea9cd3a48
18 changed files with 1126 additions and 223 deletions

View File

@@ -23,8 +23,6 @@ import (
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
@@ -34,17 +32,6 @@ import (
"github.com/cgrates/cgrates/utils"
)
// getElementText will process the node to extract the elementName's text out of it (only first one found)
// returns utils.ErrNotFound if the element is not found in the node
func elementText(xmlElement *xmlquery.Node, elmntPath string) (string, error) {
elmnt := xmlquery.FindOne(xmlElement, elmntPath)
if elmnt == nil {
return "", utils.ErrNotFound
}
return elmnt.InnerText(), nil
}
// handlerUsageDiff will calculate the usage as difference between timeEnd and timeStart
// Expects the 2 arguments in template separated by |
func handlerSubstractUsage(xmlElement *xmlquery.Node, argsTpl config.RSRParsers,
@@ -57,7 +44,7 @@ func handlerSubstractUsage(xmlElement *xmlquery.Node, argsTpl config.RSRParsers,
}
absolutePath := utils.ParseHierarchyPath(rsrArg.Rules, "")
relPath := utils.HierarchyPath(absolutePath[len(cdrPath)+1:]) // Need relative path to the xmlElmnt
argStr, _ := elementText(xmlElement, relPath.AsString("/", false))
argStr, _ := config.ElementText(xmlElement, relPath.AsString("/", false))
argsStr += argStr
}
handlerArgs := strings.Split(argsStr, utils.HandlerArgSep)
@@ -116,7 +103,7 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err
cdrs = make([]*engine.CDR, 0)
cdrXML := xmlProc.cdrXmlElmts[xmlProc.procItems]
xmlProc.procItems += 1
xmlProvider := newXmlProvider(cdrXML, xmlProc.cdrPath)
xmlProvider := config.NewXmlProvider(cdrXML, xmlProc.cdrPath, utils.MetaReq)
for _, cdrcCfg := range xmlProc.cdrcCfgs {
tenant, err := cdrcCfg.Tenant.ParseDataProvider(xmlProvider, utils.NestingSep)
if err != nil {
@@ -148,7 +135,7 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *xmlquery.Node, cdrcCf
var lazyHttpFields []*config.FCTemplate
var err error
fldVals := make(map[string]string)
xmlProvider := newXmlProvider(xmlEntity, xmlProc.cdrPath)
xmlProvider := config.NewXmlProvider(xmlEntity, xmlProc.cdrPath, utils.MetaReq)
for _, cdrFldCfg := range cdrcCfg.ContentFields {
if len(cdrFldCfg.Filters) != 0 {
if pass, err := xmlProc.filterS.Pass(tenant,
@@ -207,80 +194,3 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *xmlquery.Node, cdrcCf
}
return cdr, nil
}
// newXmlProvider constructs a DataProvider
func newXmlProvider(req *xmlquery.Node, cdrPath utils.HierarchyPath) (dP config.DataProvider) {
dP = &xmlProvider{req: req, cdrPath: cdrPath, cache: config.NewNavigableMap(nil)}
return
}
// xmlProvider implements engine.DataProvider so we can pass it to filters
type xmlProvider struct {
req *xmlquery.Node
cdrPath utils.HierarchyPath //used to compute relative path
cache *config.NavigableMap
}
// String is part of engine.DataProvider interface
// when called, it will display the already parsed values out of cache
func (xP *xmlProvider) String() string {
return utils.ToJSON(xP)
}
// FieldAsInterface is part of engine.DataProvider interface
func (xP *xmlProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) {
if len(fldPath) == 0 {
return nil, utils.ErrNotFound
}
if fldPath[0] != utils.MetaReq {
return "", utils.ErrPrefixNotFound(strings.Join(fldPath, utils.NestingSep))
}
if data, err = xP.cache.FieldAsInterface(fldPath); err == nil ||
err != utils.ErrNotFound { // item found in cache
return
}
err = nil // cancel previous err
relPath := utils.HierarchyPath(fldPath[len(xP.cdrPath)+1:]) // Need relative path to the xmlElmnt
var slctrStr string
for i := range relPath {
if sIdx := strings.Index(relPath[i], "["); sIdx != -1 {
slctrStr = relPath[i][sIdx:]
if slctrStr[len(slctrStr)-1:] != "]" {
return nil, fmt.Errorf("filter rule <%s> needs to end in ]", slctrStr)
}
relPath[i] = relPath[i][:sIdx]
if slctrStr[1:2] != "@" {
i, err := strconv.Atoi(slctrStr[1 : len(slctrStr)-1])
if err != nil {
return nil, err
}
slctrStr = "[" + strconv.Itoa(i+1) + "]"
}
relPath[i] = relPath[i] + slctrStr
}
}
data, err = elementText(xP.req, relPath.AsString("/", false))
xP.cache.Set(fldPath, data, false, false)
return
}
// FieldAsString is part of engine.DataProvider interface
func (xP *xmlProvider) FieldAsString(fldPath []string) (data string, err error) {
var valIface interface{}
valIface, err = xP.FieldAsInterface(fldPath)
if err != nil {
return
}
return utils.IfaceAsString(valIface), nil
}
// AsNavigableMap is part of engine.DataProvider interface
func (xP *xmlProvider) AsNavigableMap([]*config.FCTemplate) (
nm *config.NavigableMap, err error) {
return nil, utils.ErrNotImplemented
}
// RemoteHost is part of engine.DataProvider interface
func (xP *xmlProvider) RemoteHost() net.Addr {
return utils.LocalAddr()
}

View File

@@ -132,7 +132,6 @@ func TestXmlITAnalyseCDRs(t *testing.T) {
} else if len(reply) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
}
func TestXmlITKillEngine(t *testing.T) {

View File

@@ -166,28 +166,40 @@ var cdrXmlBroadsoft = `<?xml version="1.0" encoding="ISO-8859-1"?>
</cdrData>
</broadWorksCDR>`
func TestXMLElementText(t *testing.T) {
doc, err := xmlquery.Parse(strings.NewReader(cdrXmlBroadsoft))
if err != nil {
t.Error(err)
}
cdrs := xmlquery.Find(doc, path.Join("/broadWorksCDR/cdrData/"))
cdrWithoutUserNr := cdrs[0]
if _, err := elementText(cdrWithoutUserNr, "basicModule/userNumber"); err != utils.ErrNotFound {
t.Error(err)
}
cdrWithUser := cdrs[1]
if val, err := elementText(cdrWithUser, "basicModule/userNumber"); err != nil {
t.Error(err)
} else if val != "1001" {
t.Errorf("Expecting: 1001, received: %s", val)
}
if val, err := elementText(cdrWithUser, "centrexModule/locationList/locationInformation/locationType"); err != nil {
t.Error(err)
} else if val != "Primary Device" {
t.Errorf("Expecting: <Primary Device>, received: <%s>", val)
}
}
var xmlMultipleIndex = `<complete-success-notification callid="109870">
<createtime>2005-08-26T14:16:42</createtime>
<connecttime>2005-08-26T14:16:56</connecttime>
<endtime>2005-08-26T14:17:34</endtime>
<reference>My Call Reference</reference>
<userid>386</userid>
<username>sampleusername</username>
<customerid>1</customerid>
<companyname>Conecto LLC</companyname>
<totalcost amount="0.21" currency="USD">US$0.21</totalcost>
<hasrecording>yes</hasrecording>
<hasvoicemail>no</hasvoicemail>
<agenttotalcost amount="0.13" currency="USD">US$0.13</agenttotalcost>
<agentid>44</agentid>
<callleg calllegid="222146">
<number>+441624828505</number>
<description>Isle of Man</description>
<seconds>38</seconds>
<perminuterate amount="0.0200" currency="USD">US$0.0200</perminuterate>
<cost amount="0.0140" currency="USD">US$0.0140</cost>
<agentperminuterate amount="0.0130" currency="USD">US$0.0130</agentperminuterate>
<agentcost amount="0.0082" currency="USD">US$0.0082</agentcost>
</callleg>
<callleg calllegid="222147">
<number>+44 7624 494075</number>
<description>Isle of Man</description>
<seconds>37</seconds>
<perminuterate amount="0.2700" currency="USD">US$0.2700</perminuterate>
<cost amount="0.1890" currency="USD">US$0.1890</cost>
<agentperminuterate amount="0.1880" currency="USD">US$0.1880</agentperminuterate>
<agentcost amount="0.1159" currency="USD">US$0.1159</agentcost>
</callleg>
</complete-success-notification>
`
func TestXMLHandlerSubstractUsage(t *testing.T) {
doc, err := xmlquery.Parse(strings.NewReader(cdrXmlBroadsoft))
@@ -525,28 +537,6 @@ var xmlContent = `<?xml version="1.0" encoding="UTF-8"?>
</File>
`
func TestXMLElementText3(t *testing.T) {
doc, err := xmlquery.Parse(strings.NewReader(xmlContent))
if err != nil {
t.Error(err)
}
hPath2 := utils.ParseHierarchyPath("File.CDRs.Call", "")
cdrs := xmlquery.Find(doc, hPath2.AsString("/", true))
if len(cdrs) != 3 {
t.Errorf("Expecting: 3, received: %+v", len(cdrs))
}
if _, err := elementText(cdrs[0], "SignalingInfo/PChargingVector/test"); err != utils.ErrNotFound {
t.Error(err)
}
if val, err := elementText(cdrs[1], "SignalingInfo/PChargingVector/icidvalue"); err != nil {
t.Error(err)
} else if val != "46d7974398c2671016afccc3f2c428c7" {
t.Errorf("Expecting: 46d7974398c2671016afccc3f2c428c7, received: %s", val)
}
}
func TestXMLRPNestingSeparator(t *testing.T) {
cdrcCfgs := []*config.CdrcCfg{
{
@@ -627,71 +617,3 @@ func TestXMLRPNestingSeparator(t *testing.T) {
t.Errorf("Expecting: %+v\n, received: %+v\n", expectedCDRs, cdrs)
}
}
var xmlMultipleIndex = `<complete-success-notification callid="109870">
<createtime>2005-08-26T14:16:42</createtime>
<connecttime>2005-08-26T14:16:56</connecttime>
<endtime>2005-08-26T14:17:34</endtime>
<reference>My Call Reference</reference>
<userid>386</userid>
<username>sampleusername</username>
<customerid>1</customerid>
<companyname>Conecto LLC</companyname>
<totalcost amount="0.21" currency="USD">US$0.21</totalcost>
<hasrecording>yes</hasrecording>
<hasvoicemail>no</hasvoicemail>
<agenttotalcost amount="0.13" currency="USD">US$0.13</agenttotalcost>
<agentid>44</agentid>
<callleg calllegid="222146">
<number>+441624828505</number>
<description>Isle of Man</description>
<seconds>38</seconds>
<perminuterate amount="0.0200" currency="USD">US$0.0200</perminuterate>
<cost amount="0.0140" currency="USD">US$0.0140</cost>
<agentperminuterate amount="0.0130" currency="USD">US$0.0130</agentperminuterate>
<agentcost amount="0.0082" currency="USD">US$0.0082</agentcost>
</callleg>
<callleg calllegid="222147">
<number>+44 7624 494075</number>
<description>Isle of Man</description>
<seconds>37</seconds>
<perminuterate amount="0.2700" currency="USD">US$0.2700</perminuterate>
<cost amount="0.1890" currency="USD">US$0.1890</cost>
<agentperminuterate amount="0.1880" currency="USD">US$0.1880</agentperminuterate>
<agentcost amount="0.1159" currency="USD">US$0.1159</agentcost>
</callleg>
</complete-success-notification>
`
func TestXMLIndexes(t *testing.T) {
doc, err := xmlquery.Parse(strings.NewReader(xmlMultipleIndex))
if err != nil {
t.Error(err)
}
dP := newXmlProvider(doc, utils.HierarchyPath([]string{}))
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "userid"}); err != nil {
t.Error(err)
} else if data != "386" {
t.Errorf("expecting: 386, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "username"}); err != nil {
t.Error(err)
} else if data != "sampleusername" {
t.Errorf("expecting: sampleusername, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg", "seconds"}); err != nil {
t.Error(err)
} else if data != "38" {
t.Errorf("expecting: 38, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg[1]", "seconds"}); err != nil {
t.Error(err)
} else if data != "37" {
t.Errorf("expecting: 37, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg[@calllegid='222147']", "seconds"}); err != nil {
t.Error(err)
} else if data != "37" {
t.Errorf("expecting: 37, received: <%s>", data)
}
}

View File

@@ -101,7 +101,7 @@ func (self *CdrcCfg) loadFromJsonCfg(jsnCfg *CdrcJsonCfg, separator string) erro
self.FailedCallsPrefix = *jsnCfg.Failed_calls_prefix
}
if jsnCfg.Cdr_root_path != nil {
self.CDRRootPath = utils.ParseHierarchyPath(*jsnCfg.Cdr_root_path, "")
self.CDRRootPath = utils.ParseHierarchyPath(*jsnCfg.Cdr_root_path, utils.EmptyString)
}
if jsnCfg.Cdr_source_id != nil {
self.CdrSourceId = *jsnCfg.Cdr_source_id

View File

@@ -301,7 +301,8 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
utils.MetaSuppliers, utils.MetaThresholds, utils.MetaChargers,
utils.MetaDispatchers, utils.MetaDispatcherHosts})
var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap})
var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV,
utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL})
func (cfg *CGRConfig) LazySanityCheck() {
for _, cdrePrfl := range cfg.cdrsCfg.OnlineCDRExports {

View File

@@ -425,6 +425,7 @@ func TestCGRConfigReloadERs(t *testing.T) {
HeaderFields: []*FCTemplate{},
ContentFields: content,
TrailerFields: []*FCTemplate{},
XmlRootPath: utils.HierarchyPath{utils.EmptyString},
},
&EventReaderCfg{
ID: "file_reader1",
@@ -438,6 +439,7 @@ func TestCGRConfigReloadERs(t *testing.T) {
HeaderFields: []*FCTemplate{},
ContentFields: content,
TrailerFields: []*FCTemplate{},
XmlRootPath: utils.HierarchyPath{utils.EmptyString},
},
},
}
@@ -824,7 +826,7 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
"Timezone": "",
"TrailerFields": []interface{}{},
"Type": "*file_csv",
"XmlRootPath": "",
"XmlRootPath": []interface{}{utils.EmptyString},
},
map[string]interface{}{
"ConcurrentReqs": 1024,
@@ -842,7 +844,7 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
"Timezone": "",
"TrailerFields": []interface{}{},
"Type": "*file_csv",
"XmlRootPath": "",
"XmlRootPath": []interface{}{utils.EmptyString},
"ContentFields": content,
},
},

View File

@@ -1799,7 +1799,7 @@ func TestCgrCdfEventReader(t *testing.T) {
ConcurrentReqs: 1024,
SourcePath: "/var/spool/cgrates/cdrc/in",
ProcessedPath: "/var/spool/cgrates/cdrc/out",
XmlRootPath: utils.EmptyString,
XmlRootPath: utils.HierarchyPath{utils.EmptyString},
Tenant: nil,
Timezone: utils.EmptyString,
Filters: []string{},
@@ -1847,7 +1847,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) {
ConcurrentReqs: 1024,
SourcePath: "/var/spool/cgrates/cdrc/in",
ProcessedPath: "/var/spool/cgrates/cdrc/out",
XmlRootPath: utils.EmptyString,
XmlRootPath: utils.HierarchyPath{utils.EmptyString},
Tenant: nil,
Timezone: utils.EmptyString,
Filters: nil,

View File

@@ -415,7 +415,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID)
}
if rdr.Type == utils.MetaFileCSV {
switch rdr.Type {
case utils.MetaFileCSV:
for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID)
@@ -424,9 +425,16 @@ func (cfg *CGRConfig) checkConfigSanity() error {
if rdr.FieldSep == utils.EmptyString {
return fmt.Errorf("<%s> empty FieldSep for reader with ID: %s", utils.ERs, rdr.ID)
}
}
if rdr.Type == utils.MetaKafkajsonMap && rdr.RunDelay > 0 {
return fmt.Errorf("<%s> RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID)
case utils.MetaKafkajsonMap:
if rdr.RunDelay > 0 {
return fmt.Errorf("<%s> RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID)
}
case utils.MetaFileXML:
for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID)
}
}
}
}
}

View File

@@ -106,7 +106,7 @@ type EventReaderCfg struct {
ConcurrentReqs int
SourcePath string
ProcessedPath string
XmlRootPath string
XmlRootPath utils.HierarchyPath
Tenant RSRParsers
Timezone string
Filters []string
@@ -142,7 +142,7 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string
er.ProcessedPath = *jsnCfg.Processed_path
}
if jsnCfg.Xml_root_path != nil {
er.XmlRootPath = *jsnCfg.Xml_root_path
er.XmlRootPath = utils.ParseHierarchyPath(*jsnCfg.Xml_root_path, utils.EmptyString)
}
if jsnCfg.Tenant != nil {
if er.Tenant, err = NewRSRParsers(*jsnCfg.Tenant, true, sep); err != nil {

View File

@@ -108,7 +108,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
ConcurrentReqs: 1024,
SourcePath: "/var/spool/cgrates/cdrc/in",
ProcessedPath: "/var/spool/cgrates/cdrc/out",
XmlRootPath: utils.EmptyString,
XmlRootPath: utils.HierarchyPath{utils.EmptyString},
Tenant: nil,
Timezone: utils.EmptyString,
Filters: []string{},
@@ -148,7 +148,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
ConcurrentReqs: 1024,
SourcePath: "/tmp/ers/in",
ProcessedPath: "/tmp/ers/out",
XmlRootPath: utils.EmptyString,
XmlRootPath: utils.HierarchyPath{utils.EmptyString},
Tenant: nil,
Timezone: utils.EmptyString,
Filters: nil,

122
config/xmldp.go Normal file
View File

@@ -0,0 +1,122 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
import (
"fmt"
"net"
"strconv"
"strings"
"github.com/antchfx/xmlquery"
"github.com/cgrates/cgrates/utils"
)
// NewXmlProvider constructs a DataProvider
func NewXmlProvider(req *xmlquery.Node, cdrPath utils.HierarchyPath, pathPrfx string) (dP DataProvider) {
dP = &XmlProvider{req: req, cdrPath: cdrPath, cache: NewNavigableMap(nil), pathPrfx: pathPrfx}
return
}
// XmlProvider implements engine.DataProvider so we can pass it to filters
type XmlProvider struct {
req *xmlquery.Node
cdrPath utils.HierarchyPath //used to compute relative path
cache *NavigableMap
pathPrfx string // if this comes in path it will be ignored
// pathPrfx should be reviewed once the cdrc is removed
}
// String is part of engine.DataProvider interface
// when called, it will display the already parsed values out of cache
func (xP *XmlProvider) String() string {
return utils.ToJSON(xP)
}
// FieldAsInterface is part of engine.DataProvider interface
func (xP *XmlProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) {
if len(fldPath) == 0 {
return nil, utils.ErrNotFound
}
if xP.pathPrfx != utils.EmptyString && fldPath[0] != xP.pathPrfx {
return "", utils.ErrPrefixNotFound(strings.Join(fldPath, utils.NestingSep))
}
if data, err = xP.cache.FieldAsInterface(fldPath); err == nil ||
err != utils.ErrNotFound { // item found in cache
return
}
err = nil // cancel previous err
relPath := utils.HierarchyPath(fldPath[len(xP.cdrPath)+1:]) // Need relative path to the xmlElmnt
if xP.pathPrfx == utils.EmptyString { // temporary fix untile re remove cdrc
relPath = utils.HierarchyPath(fldPath[len(xP.cdrPath):])
}
var slctrStr string
for i := range relPath {
if sIdx := strings.Index(relPath[i], "["); sIdx != -1 {
slctrStr = relPath[i][sIdx:]
if slctrStr[len(slctrStr)-1:] != "]" {
return nil, fmt.Errorf("filter rule <%s> needs to end in ]", slctrStr)
}
relPath[i] = relPath[i][:sIdx]
if slctrStr[1:2] != "@" {
i, err := strconv.Atoi(slctrStr[1 : len(slctrStr)-1])
if err != nil {
return nil, err
}
slctrStr = "[" + strconv.Itoa(i+1) + "]"
}
relPath[i] = relPath[i] + slctrStr
}
}
data, err = ElementText(xP.req, relPath.AsString("/", false))
xP.cache.Set(fldPath, data, false, false)
return
}
// FieldAsString is part of engine.DataProvider interface
func (xP *XmlProvider) FieldAsString(fldPath []string) (data string, err error) {
var valIface interface{}
valIface, err = xP.FieldAsInterface(fldPath)
if err != nil {
return
}
return utils.IfaceAsString(valIface), nil
}
// AsNavigableMap is part of engine.DataProvider interface
func (xP *XmlProvider) AsNavigableMap([]*FCTemplate) (
nm *NavigableMap, err error) {
return nil, utils.ErrNotImplemented
}
// RemoteHost is part of engine.DataProvider interface
func (xP *XmlProvider) RemoteHost() net.Addr {
return utils.LocalAddr()
}
// ElementText will process the node to extract the elementName's text out of it (only first one found)
// returns utils.ErrNotFound if the element is not found in the node
// Make the method exportable until we remove the cdrc
func ElementText(xmlElement *xmlquery.Node, elmntPath string) (string, error) {
elmnt := xmlquery.FindOne(xmlElement, elmntPath)
if elmnt == nil {
return "", utils.ErrNotFound
}
return elmnt.InnerText(), nil
}

440
config/xmldp_test.go Normal file
View File

@@ -0,0 +1,440 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
import (
"path"
"strings"
"testing"
"github.com/antchfx/xmlquery"
"github.com/cgrates/cgrates/utils"
)
var cdrXmlBroadsoft = `<?xml version="1.0" encoding="ISO-8859-1"?>
<!DOCTYPE broadWorksCDR>
<broadWorksCDR version="19.0">
<cdrData>
<headerModule>
<recordId>
<eventCounter>0002183384</eventCounter>
<systemId>CGRateSaabb</systemId>
<date>20160419210000.104</date>
<systemTimeZone>1+020000</systemTimeZone>
</recordId>
<type>Start</type>
</headerModule>
</cdrData>
<cdrData>
<headerModule>
<recordId>
<eventCounter>0002183385</eventCounter>
<systemId>CGRateSaabb</systemId>
<date>20160419210005.247</date>
<systemTimeZone>1+020000</systemTimeZone>
</recordId>
<serviceProvider>MBC</serviceProvider>
<type>Normal</type>
</headerModule>
<basicModule>
<userNumber>1001</userNumber>
<groupNumber>2001</groupNumber>
<direction>Terminating</direction>
<asCallType>Network</asCallType>
<callingNumber>1001</callingNumber>
<callingPresentationIndicator>Public</callingPresentationIndicator>
<calledNumber>+4986517174963</calledNumber>
<startTime>20160419210005.247</startTime>
<userTimeZone>1+020000</userTimeZone>
<localCallId>25160047719:0</localCallId>
<answerIndicator>Yes</answerIndicator>
<answerTime>20160419210006.813</answerTime>
<releaseTime>20160419210020.296</releaseTime>
<terminationCause>016</terminationCause>
<chargeIndicator>y</chargeIndicator>
<releasingParty>local</releasingParty>
<userId>1001@cgrates.org</userId>
<clidPermitted>Yes</clidPermitted>
<namePermitted>Yes</namePermitted>
</basicModule>
<centrexModule>
<group>CGR_GROUP</group>
<trunkGroupName>CGR_GROUP/CGR_GROUP_TRUNK30</trunkGroupName>
<trunkGroupInfo>Normal</trunkGroupInfo>
<locationList>
<locationInformation>
<location>1001@cgrates.org</location>
<locationType>Primary Device</locationType>
</locationInformation>
</locationList>
<locationUsage>31.882</locationUsage>
</centrexModule>
<ipModule>
<route>gw04.cgrates.org</route>
<networkCallID>74122796919420162305@172.16.1.2</networkCallID>
<codec>PCMA/8000</codec>
<accessDeviceAddress>172.16.1.4</accessDeviceAddress>
<accessCallID>BW2300052501904161738474465@172.16.1.10</accessCallID>
<codecUsage>31.882</codecUsage>
<userAgent>OmniPCX Enterprise R11.0.1 k1.520.22.b</userAgent>
</ipModule>
</cdrData>
<cdrData>
<headerModule>
<recordId>
<eventCounter>0002183386</eventCounter>
<systemId>CGRateSaabb</systemId>
<date>20160419210006.909</date>
<systemTimeZone>1+020000</systemTimeZone>
</recordId>
<serviceProvider>MBC</serviceProvider>
<type>Normal</type>
</headerModule>
<basicModule>
<userNumber>1002</userNumber>
<groupNumber>2001</groupNumber>
<direction>Terminating</direction>
<asCallType>Network</asCallType>
<callingNumber>+4986517174964</callingNumber>
<callingPresentationIndicator>Public</callingPresentationIndicator>
<calledNumber>1002</calledNumber>
<startTime>20160419210006.909</startTime>
<userTimeZone>1+020000</userTimeZone>
<localCallId>27280048121:0</localCallId>
<answerIndicator>Yes</answerIndicator>
<answerTime>20160419210007.037</answerTime>
<releaseTime>20160419210030.322</releaseTime>
<terminationCause>016</terminationCause>
<chargeIndicator>y</chargeIndicator>
<releasingParty>local</releasingParty>
<userId>314028947650@cgrates.org</userId>
<clidPermitted>Yes</clidPermitted>
<namePermitted>Yes</namePermitted>
</basicModule>
<centrexModule>
<group>CGR_GROUP</group>
<trunkGroupName>CGR_GROUP/CGR_GROUP_TRUNK65</trunkGroupName>
<trunkGroupInfo>Normal</trunkGroupInfo>
<locationList>
<locationInformation>
<location>31403456100@cgrates.org</location>
<locationType>Primary Device</locationType>
</locationInformation>
</locationList>
<locationUsage>26.244</locationUsage>
</centrexModule>
<ipModule>
<route>gw01.cgrates.org</route>
<networkCallID>108352493719420162306@172.31.250.150</networkCallID>
<codec>PCMA/8000</codec>
<accessDeviceAddress>172.16.1.4</accessDeviceAddress>
<accessCallID>2345300069121904161716512907@172.16.1.10</accessCallID>
<codecUsage>26.244</codecUsage>
<userAgent>Altitude vBox</userAgent>
</ipModule>
</cdrData>
<cdrData>
<headerModule>
<recordId>
<eventCounter>0002183486</eventCounter>
<systemId>CGRateSaabb</systemId>
<date>20160419211500.104</date>
<systemTimeZone>1+020000</systemTimeZone>
</recordId>
<type>End</type>
</headerModule>
</cdrData>
</broadWorksCDR>`
func TestXMLElementText(t *testing.T) {
doc, err := xmlquery.Parse(strings.NewReader(cdrXmlBroadsoft))
if err != nil {
t.Error(err)
}
cdrs := xmlquery.Find(doc, path.Join("/broadWorksCDR/cdrData/"))
cdrWithoutUserNr := cdrs[0]
if _, err := ElementText(cdrWithoutUserNr, "basicModule/userNumber"); err != utils.ErrNotFound {
t.Error(err)
}
cdrWithUser := cdrs[1]
if val, err := ElementText(cdrWithUser, "basicModule/userNumber"); err != nil {
t.Error(err)
} else if val != "1001" {
t.Errorf("Expecting: 1001, received: %s", val)
}
if val, err := ElementText(cdrWithUser, "centrexModule/locationList/locationInformation/locationType"); err != nil {
t.Error(err)
} else if val != "Primary Device" {
t.Errorf("Expecting: <Primary Device>, received: <%s>", val)
}
}
var xmlContent = `<?xml version="1.0" encoding="UTF-8"?>
<File xmlns="http://www.metaswitch.com/cfs/billing/V1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" compatibility="9">
<FileHeader seqnum="169">
<EquipmentType>Metaswitch CFS</EquipmentType>
<EquipmentId></EquipmentId>
<CreateTime>1510225200002</CreateTime>
</FileHeader>
<CDRs>
<Call seqnum="0000000001" error="no" longcall="false" testcall="false" class="0" operator="false" correlator="397828983391" connected="false">
<CallType>National</CallType>
<Features/>
<ReleasingParty>Orig</ReleasingParty>
<ReleaseReason type="q850" loc="u">19</ReleaseReason>
<ReleaseReason type="sip">480</ReleaseReason>
<ReleaseReason type="internal">No answer</ReleaseReason>
<InternalIndex>223007622</InternalIndex>
<OrigParty xsi:type="BusinessLinePartyType" subscribergroup="Subscribers in Guernsey, NJ" billingtype="flat rate" privacy="false" cpc="normal" ani-ii="00">
<SubscriberAddr type="e164">+27110493421</SubscriberAddr>
<CallingPartyAddr type="e164">+27110493421</CallingPartyAddr>
<ChargeAddr type="e164">+27110493421</ChargeAddr>
<BusinessGroupName>Ro_test</BusinessGroupName>
<SIPCallId>0gQAAC8WAAACBAAALxYAAD57SAEV7ekv/OSKkO7qmD82OmbfHO+Z7wIZJkXdCv8z@10.170.248.200</SIPCallId>
</OrigParty>
<TermParty xsi:type="NetworkTrunkPartyType">
<TrunkGroup type="sip" trunkname="IMS Core">
<TrunkGroupId>1</TrunkGroupId>
<TrunkMemberId>0</TrunkMemberId>
</TrunkGroup>
<SIPCallId>8824071D@10.170.248.140</SIPCallId>
<Reason type="q850">19</Reason>
<Reason type="sip">480</Reason>
</TermParty>
<RoutingInfo>
<RequestedAddr type="unknown">0763371551</RequestedAddr>
<DestAddr type="e164">+270763371551</DestAddr>
<RoutedAddr type="national">0763371551</RoutedAddr>
<CallingPartyRoutedAddr type="national">110493421</CallingPartyRoutedAddr>
<CallingPartyOrigAddr type="national">110493421</CallingPartyOrigAddr>
</RoutingInfo>
<CarrierSelectInfo>
<CarrierOperatorInvolved>False</CarrierOperatorInvolved>
<SelectionMethod>NetworkDefault</SelectionMethod>
<NetworkId>0</NetworkId>
</CarrierSelectInfo>
<SignalingInfo>
<MediaCapabilityRequested>Speech</MediaCapabilityRequested>
<PChargingVector>
<icidvalue>13442698e525ad2c21251f76479ab2b4</icidvalue>
<origioi>voice.lab.liquidtelecom.net</origioi>
</PChargingVector>
</SignalingInfo>
<IcSeizeTime>1510225513055</IcSeizeTime>
<OgSeizeTime>1510225513304</OgSeizeTime>
<RingingTime>1510225514836</RingingTime>
<ConnectTime/>
<DisconnectTime>1510225516981</DisconnectTime>
<ReleaseTime>1510225516981</ReleaseTime>
<CompleteTime>1510225516981</CompleteTime>
</Call>
<Call seqnum="0000000002" error="no" longcall="false" testcall="false" class="0" operator="false" correlator="402123969565" connected="true">
<CallType>Premium</CallType>
<Features/>
<ReleasingParty>Orig</ReleasingParty>
<ReleaseReason type="q850" loc="u">16</ReleaseReason>
<ReleaseReason type="internal">No error</ReleaseReason>
<InternalIndex>223007623</InternalIndex>
<OrigParty xsi:type="BusinessLinePartyType" subscribergroup="Subscribers in Guernsey, NJ" billingtype="flat rate" privacy="false" cpc="normal" ani-ii="00">
<SubscriberAddr type="e164">+27110493421</SubscriberAddr>
<CallingPartyAddr type="e164">+27110493421</CallingPartyAddr>
<ChargeAddr type="e164">+27110493421</ChargeAddr>
<BusinessGroupName>Ro_test</BusinessGroupName>
<SIPCallId>0gQAAC8WAAACBAAALxYAAPkyWDO29Do1SyxNi5UV71mJYEIEkfNa9wCFCCjY2asU@10.170.248.200</SIPCallId>
</OrigParty>
<TermParty xsi:type="NetworkTrunkPartyType">
<TrunkGroup type="sip" trunkname="IMS Core">
<TrunkGroupId>1</TrunkGroupId>
<TrunkMemberId>0</TrunkMemberId>
</TrunkGroup>
<SIPCallId>8E450FA1@10.170.248.140</SIPCallId>
</TermParty>
<RoutingInfo>
<RequestedAddr type="unknown">0843073451</RequestedAddr>
<DestAddr type="e164">+270843073451</DestAddr>
<RoutedAddr type="national">0843073451</RoutedAddr>
<CallingPartyRoutedAddr type="national">110493421</CallingPartyRoutedAddr>
<CallingPartyOrigAddr type="national">110493421</CallingPartyOrigAddr>
</RoutingInfo>
<CarrierSelectInfo>
<CarrierOperatorInvolved>False</CarrierOperatorInvolved>
<SelectionMethod>NetworkDefault</SelectionMethod>
<NetworkId>0</NetworkId>
</CarrierSelectInfo>
<SignalingInfo>
<MediaCapabilityRequested>Speech</MediaCapabilityRequested>
<PChargingVector>
<icidvalue>46d7974398c2671016afccc3f2c428c7</icidvalue>
<origioi>voice.lab.liquidtelecom.net</origioi>
</PChargingVector>
</SignalingInfo>
<IcSeizeTime>1510225531933</IcSeizeTime>
<OgSeizeTime>1510225532183</OgSeizeTime>
<RingingTime>1510225534973</RingingTime>
<ConnectTime>1510225539364</ConnectTime>
<DisconnectTime>1510225593101</DisconnectTime>
<ReleaseTime>1510225593101</ReleaseTime>
<CompleteTime>1510225593101</CompleteTime>
</Call>
<Call seqnum="0000000003" error="no" longcall="false" testcall="false" class="0" operator="false" correlator="406419270822" connected="true">
<CallType>International</CallType>
<Features/>
<ReleasingParty>Orig</ReleasingParty>
<ReleaseReason type="q850" loc="u">16</ReleaseReason>
<ReleaseReason type="internal">No error</ReleaseReason>
<InternalIndex>223007624</InternalIndex>
<OrigParty xsi:type="BusinessLinePartyType" subscribergroup="Subscribers in Guernsey, NJ" billingtype="flat rate" privacy="false" cpc="normal" ani-ii="00">
<SubscriberAddr type="e164">+27110493421</SubscriberAddr>
<CallingPartyAddr type="e164">+27110493421</CallingPartyAddr>
<ChargeAddr type="e164">+27110493421</ChargeAddr>
<BusinessGroupName>Ro_test</BusinessGroupName>
<SIPCallId>0gQAAC8WAAACBAAALxYAAJrUscTicyU5GtjPyQnpAeuNmz9p/bdOoR/Mk9RXciOI@10.170.248.200</SIPCallId>
</OrigParty>
<TermParty xsi:type="NetworkTrunkPartyType">
<TrunkGroup type="sip" trunkname="IMS Core">
<TrunkGroupId>1</TrunkGroupId>
<TrunkMemberId>0</TrunkMemberId>
</TrunkGroup>
<SIPCallId>BC8B2801@10.170.248.140</SIPCallId>
</TermParty>
<RoutingInfo>
<RequestedAddr type="unknown">263772822306</RequestedAddr>
<DestAddr type="e164">+263772822306</DestAddr>
<RoutedAddr type="e164">263772822306</RoutedAddr>
<CallingPartyRoutedAddr type="national">110493421</CallingPartyRoutedAddr>
<CallingPartyOrigAddr type="national">110493421</CallingPartyOrigAddr>
</RoutingInfo>
<CarrierSelectInfo>
<CarrierOperatorInvolved>False</CarrierOperatorInvolved>
<SelectionMethod>NetworkDefault</SelectionMethod>
<NetworkId>0</NetworkId>
</CarrierSelectInfo>
<SignalingInfo>
<MediaCapabilityRequested>Speech</MediaCapabilityRequested>
<PChargingVector>
<icidvalue>750b8b73e41ba7b59b21240758522268</icidvalue>
<origioi>voice.lab.liquidtelecom.net</origioi>
</PChargingVector>
</SignalingInfo>
<IcSeizeTime>1510225865894</IcSeizeTime>
<OgSeizeTime>1510225866144</OgSeizeTime>
<RingingTime>1510225866756</RingingTime>
<ConnectTime>1510225876243</ConnectTime>
<DisconnectTime>1510225916144</DisconnectTime>
<ReleaseTime>1510225916144</ReleaseTime>
<CompleteTime>1510225916144</CompleteTime>
</Call>
</CDRs>
<FileFooter>
<LastModTime>1510227591467</LastModTime>
<NumCDRs>3</NumCDRs>
<DataErroredCDRs>0</DataErroredCDRs>
<WriteErroredCDRs>0</WriteErroredCDRs>
</FileFooter>
</File>
`
func TestXMLElementText3(t *testing.T) {
doc, err := xmlquery.Parse(strings.NewReader(xmlContent))
if err != nil {
t.Error(err)
}
hPath2 := utils.ParseHierarchyPath("File.CDRs.Call", "")
cdrs := xmlquery.Find(doc, hPath2.AsString("/", true))
if len(cdrs) != 3 {
t.Errorf("Expecting: 3, received: %+v", len(cdrs))
}
if _, err := ElementText(cdrs[0], "SignalingInfo/PChargingVector/test"); err != utils.ErrNotFound {
t.Error(err)
}
if val, err := ElementText(cdrs[1], "SignalingInfo/PChargingVector/icidvalue"); err != nil {
t.Error(err)
} else if val != "46d7974398c2671016afccc3f2c428c7" {
t.Errorf("Expecting: 46d7974398c2671016afccc3f2c428c7, received: %s", val)
}
}
var xmlMultipleIndex = `<complete-success-notification callid="109870">
<createtime>2005-08-26T14:16:42</createtime>
<connecttime>2005-08-26T14:16:56</connecttime>
<endtime>2005-08-26T14:17:34</endtime>
<reference>My Call Reference</reference>
<userid>386</userid>
<username>sampleusername</username>
<customerid>1</customerid>
<companyname>Conecto LLC</companyname>
<totalcost amount="0.21" currency="USD">US$0.21</totalcost>
<hasrecording>yes</hasrecording>
<hasvoicemail>no</hasvoicemail>
<agenttotalcost amount="0.13" currency="USD">US$0.13</agenttotalcost>
<agentid>44</agentid>
<callleg calllegid="222146">
<number>+441624828505</number>
<description>Isle of Man</description>
<seconds>38</seconds>
<perminuterate amount="0.0200" currency="USD">US$0.0200</perminuterate>
<cost amount="0.0140" currency="USD">US$0.0140</cost>
<agentperminuterate amount="0.0130" currency="USD">US$0.0130</agentperminuterate>
<agentcost amount="0.0082" currency="USD">US$0.0082</agentcost>
</callleg>
<callleg calllegid="222147">
<number>+44 7624 494075</number>
<description>Isle of Man</description>
<seconds>37</seconds>
<perminuterate amount="0.2700" currency="USD">US$0.2700</perminuterate>
<cost amount="0.1890" currency="USD">US$0.1890</cost>
<agentperminuterate amount="0.1880" currency="USD">US$0.1880</agentperminuterate>
<agentcost amount="0.1159" currency="USD">US$0.1159</agentcost>
</callleg>
</complete-success-notification>
`
func TestXMLIndexes(t *testing.T) {
doc, err := xmlquery.Parse(strings.NewReader(xmlMultipleIndex))
if err != nil {
t.Error(err)
}
dP := NewXmlProvider(doc, utils.HierarchyPath([]string{}), utils.MetaReq)
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "userid"}); err != nil {
t.Error(err)
} else if data != "386" {
t.Errorf("expecting: 386, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "username"}); err != nil {
t.Error(err)
} else if data != "sampleusername" {
t.Errorf("expecting: sampleusername, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg", "seconds"}); err != nil {
t.Error(err)
} else if data != "38" {
t.Errorf("expecting: 38, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg[1]", "seconds"}); err != nil {
t.Error(err)
} else if data != "37" {
t.Errorf("expecting: 37, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"*req", "complete-success-notification", "callleg[@calllegid='222147']", "seconds"}); err != nil {
t.Error(err)
} else if data != "37" {
t.Errorf("expecting: 37, received: <%s>", data)
}
}

View File

@@ -192,6 +192,28 @@
{"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "~*req.6", "mandatory": true},
],
},
{
"id": "XmlDryRun",
"run_delay": -1,
"type": "*file_xml",
"source_path": "/tmp/xmlErs/in",
"flags": ["*cdrs","*log"],
"processed_path": "/tmp/xmlErs/out",
"xml_root_path": "broadWorksCDR.cdrData",
"content_fields":[
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Direction", "field_id": "Direction", "type": "*constant", "value": "*out", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.userId:s/.*@(.*)/${1}/", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.userNumber", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.calledNumber", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.startTime", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.answerTime", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*usage_difference", "value": "~*req.broadWorksCDR.cdrData.basicModule.releaseTime;~*req.broadWorksCDR.cdrData.basicModule.answerTime", "mandatory": true}
],
}
],
},

View File

@@ -112,7 +112,8 @@ func testCsvITCreateCdrDirs(t *testing.T) {
for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out",
"/tmp/ers2/in", "/tmp/ers2/out", "/tmp/init_session/in", "/tmp/init_session/out",
"/tmp/terminate_session/in", "/tmp/terminate_session/out", "/tmp/cdrs/in",
"/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out"} {
"/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out",
"/tmp/xmlErs/in", "/tmp/xmlErs/out"} {
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
@@ -120,7 +121,6 @@ func testCsvITCreateCdrDirs(t *testing.T) {
t.Fatal("Error creating folder: ", dir, err)
}
}
time.Sleep(10 * time.Second)
}
func testCsvITStartEngine(t *testing.T) {

173
ers/filexml.go Normal file
View File

@@ -0,0 +1,173 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ers
import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"time"
"github.com/antchfx/xmlquery"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents chan *erEvent, rdrErr chan error,
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath
if strings.HasSuffix(srcPath, utils.Slash) {
srcPath = srcPath[:len(srcPath)-1]
}
return &XMLFileER{
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrDir: srcPath,
rdrEvents: rdrEvents,
rdrError: rdrErr,
rdrExit: rdrExit}, nil
}
// XMLFileER implements EventReader interface for .xml files
type XMLFileER struct {
sync.RWMutex
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
rdrDir string
rdrEvents chan *erEvent // channel to dispatch the events created to
rdrError chan error
rdrExit chan struct{}
conReqs chan struct{} // limit number of opened files
}
func (rdr *XMLFileER) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
func (rdr *XMLFileER) Serve() (err error) {
switch rdr.Config().RunDelay {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
return
case time.Duration(-1):
return watchDir(rdr.rdrDir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go func() {
for {
// Not automated, process and sleep approach
select {
case <-rdr.rdrExit:
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
return
default:
}
filesInDir, _ := ioutil.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.XMLSuffix) { // hardcoded file extension for xml event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
time.Sleep(rdr.Config().RunDelay)
}
}()
}
return
}
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *XMLFileER) processFile(fPath, fName string) (err error) {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
}
absPath := path.Join(fPath, fName)
utils.Logger.Info(
fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath))
var file *os.File
if file, err = os.Open(absPath); err != nil {
return
}
defer file.Close()
doc, err := xmlquery.Parse(file)
if err != nil {
return err
}
xmlElmts := xmlquery.Find(doc, rdr.Config().XmlRootPath.AsString("/", true))
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0
timeStart := time.Now()
reqVars := make(map[string]interface{})
for _, xmlElmt := range xmlElmts {
rowNr++ // increment the rowNr after checking if it's not the end of file
agReq := agents.NewAgentRequest(
config.NewXmlProvider(xmlElmt, rdr.Config().XmlRootPath, utils.EmptyString), reqVars,
nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
continue
}
navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
utils.ERs, absPath, rowNr, err.Error()))
continue
}
rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent(
agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config()}
evsPosted++
}
if rdr.Config().ProcessedPath != "" {
// Finished with file, move it to processed folder
outPath := path.Join(rdr.Config().ProcessedPath, fName)
if err = os.Rename(absPath, outPath); err != nil {
return
}
}
utils.Logger.Info(
fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s",
utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart)))
return
}

302
ers/filexml_it_test.go Normal file
View File

@@ -0,0 +1,302 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ers
import (
"io/ioutil"
"net/rpc"
"os"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var (
xmlCfgPath string
xmlCfg *config.CGRConfig
xmlRPC *rpc.Client
xmlTests = []func(t *testing.T){
testXMLITCreateCdrDirs,
testXMLITInitConfig,
testXMLITInitCdrDb,
testXMLITResetDataDb,
testXMLITStartEngine,
testXMLITRpcConn,
testXMLITLoadTPFromFolder,
testXMLITHandleCdr1File,
testXmlITAnalyseCDRs,
testXMLITCleanupFiles,
testXMLITKillEngine,
}
)
func TestXMLReadFile(t *testing.T) {
xmlCfgPath = path.Join(*dataDir, "conf", "samples", "ers")
for _, test := range xmlTests {
t.Run("TestXMLReadFile", test)
}
}
func testXMLITCreateCdrDirs(t *testing.T) {
for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out",
"/tmp/ers2/in", "/tmp/ers2/out", "/tmp/init_session/in", "/tmp/init_session/out",
"/tmp/terminate_session/in", "/tmp/terminate_session/out", "/tmp/cdrs/in",
"/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out",
"/tmp/xmlErs/in", "/tmp/xmlErs/out"} {
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatal("Error creating folder: ", dir, err)
}
}
}
func testXMLITInitConfig(t *testing.T) {
var err error
if xmlCfg, err = config.NewCGRConfigFromPath(xmlCfgPath); err != nil {
t.Fatal("Got config error: ", err.Error())
}
}
// InitDb so we can rely on count
func testXMLITInitCdrDb(t *testing.T) {
if err := engine.InitStorDb(xmlCfg); err != nil {
t.Fatal(err)
}
}
// Remove data in both rating and accounting db
func testXMLITResetDataDb(t *testing.T) {
if err := engine.InitDataDb(xmlCfg); err != nil {
t.Fatal(err)
}
}
func testXMLITStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(xmlCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func testXMLITRpcConn(t *testing.T) {
var err error
xmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func testXMLITLoadTPFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{
FolderPath: path.Join(*dataDir, "tariffplans", "testit")}
var loadInst utils.LoadInstance
if err := xmlRPC.Call(utils.ApierV2LoadTariffPlanFromFolder,
attrs, &loadInst); err != nil {
t.Error(err)
}
time.Sleep(500 * time.Millisecond)
}
var cdrXmlBroadsoft = `<?xml version="1.0" encoding="ISO-8859-1"?>
<!DOCTYPE broadWorksCDR>
<broadWorksCDR version="19.0">
<cdrData>
<headerModule>
<recordId>
<eventCounter>0002183384</eventCounter>
<systemId>CGRateSaabb</systemId>
<date>20160419210000.104</date>
<systemTimeZone>1+020000</systemTimeZone>
</recordId>
<type>Start</type>
</headerModule>
</cdrData>
<cdrData>
<headerModule>
<recordId>
<eventCounter>0002183385</eventCounter>
<systemId>CGRateSaabb</systemId>
<date>20160419210005.247</date>
<systemTimeZone>1+020000</systemTimeZone>
</recordId>
<serviceProvider>MBC</serviceProvider>
<type>Normal</type>
</headerModule>
<basicModule>
<userNumber>1001</userNumber>
<groupNumber>2001</groupNumber>
<direction>Terminating</direction>
<asCallType>Network</asCallType>
<callingNumber>1001</callingNumber>
<callingPresentationIndicator>Public</callingPresentationIndicator>
<calledNumber>+4915117174963</calledNumber>
<startTime>20160419210005.247</startTime>
<userTimeZone>1+020000</userTimeZone>
<localCallId>25160047719:0</localCallId>
<answerIndicator>Yes</answerIndicator>
<answerTime>20160419210006.813</answerTime>
<releaseTime>20160419210020.296</releaseTime>
<terminationCause>016</terminationCause>
<chargeIndicator>y</chargeIndicator>
<releasingParty>local</releasingParty>
<userId>1001@cgrates.org</userId>
<clidPermitted>Yes</clidPermitted>
<namePermitted>Yes</namePermitted>
</basicModule>
<centrexModule>
<group>CGR_GROUP</group>
<trunkGroupName>CGR_GROUP/CGR_GROUP_TRUNK30</trunkGroupName>
<trunkGroupInfo>Normal</trunkGroupInfo>
<locationList>
<locationInformation>
<location>1001@cgrates.org</location>
<locationType>Primary Device</locationType>
</locationInformation>
</locationList>
<locationUsage>31.882</locationUsage>
</centrexModule>
<ipModule>
<route>gw04.cgrates.org</route>
<networkCallID>74122796919420162305@172.16.1.2</networkCallID>
<codec>PCMA/8000</codec>
<accessDeviceAddress>172.16.1.4</accessDeviceAddress>
<accessCallID>BW2300052501904161738474465@172.16.1.10</accessCallID>
<codecUsage>31.882</codecUsage>
<userAgent>OmniPCX Enterprise R11.0.1 k1.520.22.b</userAgent>
</ipModule>
</cdrData>
<cdrData>
<headerModule>
<recordId>
<eventCounter>0002183386</eventCounter>
<systemId>CGRateSaabb</systemId>
<date>20160419210006.909</date>
<systemTimeZone>1+020000</systemTimeZone>
</recordId>
<serviceProvider>MBC</serviceProvider>
<type>Normal</type>
</headerModule>
<basicModule>
<userNumber>1002</userNumber>
<groupNumber>2001</groupNumber>
<direction>Terminating</direction>
<asCallType>Network</asCallType>
<callingNumber>+4986517174964</callingNumber>
<callingPresentationIndicator>Public</callingPresentationIndicator>
<calledNumber>1001</calledNumber>
<startTime>20160419210006.909</startTime>
<userTimeZone>1+020000</userTimeZone>
<localCallId>27280048121:0</localCallId>
<answerIndicator>Yes</answerIndicator>
<answerTime>20160419210007.037</answerTime>
<releaseTime>20160419210030.322</releaseTime>
<terminationCause>016</terminationCause>
<chargeIndicator>y</chargeIndicator>
<releasingParty>local</releasingParty>
<userId>314028947650@cgrates.org</userId>
<clidPermitted>Yes</clidPermitted>
<namePermitted>Yes</namePermitted>
</basicModule>
<centrexModule>
<group>CGR_GROUP</group>
<trunkGroupName>CGR_GROUP/CGR_GROUP_TRUNK65</trunkGroupName>
<trunkGroupInfo>Normal</trunkGroupInfo>
<locationList>
<locationInformation>
<location>31403456100@cgrates.org</location>
<locationType>Primary Device</locationType>
</locationInformation>
</locationList>
<locationUsage>26.244</locationUsage>
</centrexModule>
<ipModule>
<route>gw01.cgrates.org</route>
<networkCallID>108352493719420162306@172.31.250.150</networkCallID>
<codec>PCMA/8000</codec>
<accessDeviceAddress>172.16.1.4</accessDeviceAddress>
<accessCallID>2345300069121904161716512907@172.16.1.10</accessCallID>
<codecUsage>26.244</codecUsage>
<userAgent>Altitude vBox</userAgent>
</ipModule>
</cdrData>
<cdrData>
<headerModule>
<recordId>
<eventCounter>0002183486</eventCounter>
<systemId>CGRateSaabb</systemId>
<date>20160419211500.104</date>
<systemTimeZone>1+020000</systemTimeZone>
</recordId>
<type>End</type>
</headerModule>
</cdrData>
</broadWorksCDR>`
// The default scenario, out of cdrc defined in .cfg file
func testXMLITHandleCdr1File(t *testing.T) {
fileName := "file1.xml"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(cdrXmlBroadsoft), 0644); err != nil {
t.Fatal(err.Error())
}
if err := os.Rename(tmpFilePath, path.Join("/tmp/xmlErs/in", fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
time.Sleep(100 * time.Millisecond)
}
func testXmlITAnalyseCDRs(t *testing.T) {
var reply []*engine.ExternalCDR
if err := xmlRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 6 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
if err := xmlRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4915117174963"}}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 3 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
}
func testXMLITCleanupFiles(t *testing.T) {
for _, dir := range []string{"/tmp/ers",
"/tmp/ers2", "/tmp/init_session",
"/tmp/terminate_session", "/tmp/cdrs",
"/tmp/ers_with_filters", "/tmp/xmlErs"} {
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
}
}
func testXMLITKillEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}

View File

@@ -40,6 +40,8 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type)
case utils.MetaFileCSV:
return NewCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
case utils.MetaFileXML:
return NewXMLFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
case utils.MetaKafkajsonMap:
return NewKafkaER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
case utils.MetaSQL:

View File

@@ -407,10 +407,10 @@ const (
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"
UndefinedVersion = "undefined version"
UnsupportedDB = "unsupported database"
TxtSuffix = ".txt"
JSNSuffix = ".json"
FormSuffix = ".form"
XMLSuffix = ".xml"
CSVSuffix = ".csv"
FWVSuffix = ".fwv"
CONTENT_JSON = "json"