mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Extending CDRC XML, adding HierarchyPath to configs
This commit is contained in:
36
cdrc/xml.go
36
cdrc/xml.go
@@ -22,11 +22,11 @@ import (
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
"path"
|
||||
|
||||
"github.com/ChrisTrenkamp/goxpath"
|
||||
"github.com/ChrisTrenkamp/goxpath/tree"
|
||||
"github.com/ChrisTrenkamp/goxpath/tree/xmltree"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -56,8 +56,8 @@ func elementText(xmlRes tree.Res, elmntPath string) (string, error) {
|
||||
return elmnts[0].String(), nil
|
||||
}
|
||||
|
||||
func NewXMLRecordsProcessor(recordsReader io.Reader) (*XMLRecordsProcessor, error) {
|
||||
xp, err := goxpath.Parse(path.Join("/broadWorksCDR/cdrData/"))
|
||||
func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath, cdrcCfgs []*config.CdrcConfig) (*XMLRecordsProcessor, error) {
|
||||
xp, err := goxpath.Parse(cdrPath.AsString("/", true))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -68,14 +68,16 @@ func NewXMLRecordsProcessor(recordsReader io.Reader) (*XMLRecordsProcessor, erro
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
xmlProc := new(XMLRecordsProcessor)
|
||||
xmlProc := &XMLRecordsProcessor{cdrPath: cdrPath, cdrcCfgs: cdrcCfgs}
|
||||
xmlProc.cdrXmlElmts = goxpath.MustExec(xp, xmlNode, nil)
|
||||
return xmlProc, nil
|
||||
}
|
||||
|
||||
type XMLRecordsProcessor struct {
|
||||
cdrXmlElmts []tree.Res // result of splitting the XML doc into CDR elements
|
||||
procItems int // current number of processed records from file
|
||||
cdrXmlElmts []tree.Res // result of splitting the XML doc into CDR elements
|
||||
procItems int // current number of processed records from file
|
||||
cdrPath utils.HierarchyPath // path towards one CDR element
|
||||
cdrcCfgs []*config.CdrcConfig // individual configs for the folder CDRC is monitoring
|
||||
}
|
||||
|
||||
func (xmlProc *XMLRecordsProcessor) ProcessedRecordsNr() int64 {
|
||||
@@ -87,7 +89,27 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err
|
||||
return nil, io.EOF // have processed all items
|
||||
}
|
||||
cdrs = make([]*engine.CDR, 0)
|
||||
//cdrXml := xmlProc.cdrXmlElmts[xmlProc.procItems]
|
||||
cdrXML := xmlProc.cdrXmlElmts[xmlProc.procItems]
|
||||
xmlProc.procItems += 1
|
||||
for _, cdrcCfg := range xmlProc.cdrcCfgs {
|
||||
filtersPassing := true
|
||||
for _, rsrFltr := range cdrcCfg.CdrFilter {
|
||||
if rsrFltr == nil {
|
||||
continue // Pass
|
||||
}
|
||||
fieldVal, _ := elementText(cdrXML, rsrFltr.Id)
|
||||
if !rsrFltr.FilterPasses(fieldVal) {
|
||||
filtersPassing = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !filtersPassing {
|
||||
continue
|
||||
}
|
||||
}
|
||||
return cdrs, nil
|
||||
}
|
||||
|
||||
func (xmlProc *XMLRecordsProcessor) recordToStoredCdr(xmlEntity tree.Res, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ func TestElementText(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestXMLRPProcessNextRecord(t *testing.T) {
|
||||
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft))
|
||||
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -25,23 +25,24 @@ import (
|
||||
)
|
||||
|
||||
type CdrcConfig struct {
|
||||
ID string // free-form text identifying this CDRC instance
|
||||
Enabled bool // Enable/Disable the profile
|
||||
DryRun bool // Do not post CDRs to the server
|
||||
CdrsConns []*HaPoolConfig // The address where CDRs can be reached
|
||||
CdrFormat string // The type of CDR file to process <csv|opensips_flatstore>
|
||||
FieldSeparator rune // The separator to use when reading csvs
|
||||
DataUsageMultiplyFactor float64 // Conversion factor for data usage
|
||||
Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
|
||||
MaxOpenFiles int // Maximum number of files opened simultaneously
|
||||
CdrInDir string // Folder to process CDRs from
|
||||
CdrOutDir string // Folder to move processed CDRs to
|
||||
FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records
|
||||
CdrSourceId string // Source identifier for the processed CDRs
|
||||
CdrFilter utils.RSRFields // Filter CDR records to import
|
||||
ContinueOnSuccess bool // Continue after execution
|
||||
PartialRecordCache time.Duration // Duration to cache partial records when not pairing
|
||||
ID string // free-form text identifying this CDRC instance
|
||||
Enabled bool // Enable/Disable the profile
|
||||
DryRun bool // Do not post CDRs to the server
|
||||
CdrsConns []*HaPoolConfig // The address where CDRs can be reached
|
||||
CdrFormat string // The type of CDR file to process <csv|opensips_flatstore>
|
||||
FieldSeparator rune // The separator to use when reading csvs
|
||||
DataUsageMultiplyFactor float64 // Conversion factor for data usage
|
||||
Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
|
||||
MaxOpenFiles int // Maximum number of files opened simultaneously
|
||||
CdrInDir string // Folder to process CDRs from
|
||||
CdrOutDir string // Folder to move processed CDRs to
|
||||
FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records
|
||||
CDRPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements
|
||||
CdrSourceId string // Source identifier for the processed CDRs
|
||||
CdrFilter utils.RSRFields // Filter CDR records to import
|
||||
ContinueOnSuccess bool // Continue after execution
|
||||
PartialRecordCache time.Duration // Duration to cache partial records when not pairing
|
||||
HeaderFields []*CfgCdrField
|
||||
ContentFields []*CfgCdrField
|
||||
TrailerFields []*CfgCdrField
|
||||
@@ -96,6 +97,9 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
|
||||
if jsnCfg.Failed_calls_prefix != nil {
|
||||
self.FailedCallsPrefix = *jsnCfg.Failed_calls_prefix
|
||||
}
|
||||
if jsnCfg.Cdr_path != nil {
|
||||
self.CDRPath = utils.ParseHierarchyPath(*jsnCfg.Cdr_path, "")
|
||||
}
|
||||
if jsnCfg.Cdr_source_id != nil {
|
||||
self.CdrSourceId = *jsnCfg.Cdr_source_id
|
||||
}
|
||||
|
||||
@@ -183,6 +183,7 @@ const CGRATES_CFG_JSON = `
|
||||
"cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
|
||||
"cdr_path": "", // path towards one CDR element in case of XML CDRs
|
||||
"cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"cdr_filter": "", // filter CDR records to import
|
||||
"continue_on_success": false, // continue to the next template if executed
|
||||
|
||||
@@ -319,6 +319,7 @@ func TestDfCdrcJsonCfg(t *testing.T) {
|
||||
Cdr_in_dir: utils.StringPointer("/var/log/cgrates/cdrc/in"),
|
||||
Cdr_out_dir: utils.StringPointer("/var/log/cgrates/cdrc/out"),
|
||||
Failed_calls_prefix: utils.StringPointer("missed_calls"),
|
||||
Cdr_path: utils.StringPointer(""),
|
||||
Cdr_source_id: utils.StringPointer("freeswitch_csv"),
|
||||
Cdr_filter: utils.StringPointer(""),
|
||||
Continue_on_success: utils.BoolPointer(false),
|
||||
|
||||
@@ -47,6 +47,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
CdrInDir: "/var/log/cgrates/cdrc/in",
|
||||
CdrOutDir: "/var/log/cgrates/cdrc/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CDRPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "freeswitch_csv",
|
||||
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
|
||||
PartialRecordCache: time.Duration(10) * time.Second,
|
||||
@@ -92,6 +93,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/tmp/cgrates/cdrc1/in",
|
||||
CdrOutDir: "/tmp/cgrates/cdrc1/out",
|
||||
CDRPath: nil,
|
||||
CdrSourceId: "csv1",
|
||||
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
|
||||
HeaderFields: make([]*CfgCdrField, 0),
|
||||
@@ -136,6 +138,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/tmp/cgrates/cdrc2/in",
|
||||
CdrOutDir: "/tmp/cgrates/cdrc2/out",
|
||||
CDRPath: nil,
|
||||
CdrSourceId: "csv2",
|
||||
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
|
||||
HeaderFields: make([]*CfgCdrField, 0),
|
||||
@@ -162,6 +165,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/tmp/cgrates/cdrc3/in",
|
||||
CdrOutDir: "/tmp/cgrates/cdrc3/out",
|
||||
CDRPath: nil,
|
||||
CdrSourceId: "csv3",
|
||||
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
|
||||
HeaderFields: make([]*CfgCdrField, 0),
|
||||
|
||||
@@ -155,6 +155,7 @@ type CdrcJsonCfg struct {
|
||||
Cdr_in_dir *string
|
||||
Cdr_out_dir *string
|
||||
Failed_calls_prefix *string
|
||||
Cdr_path *string
|
||||
Cdr_source_id *string
|
||||
Cdr_filter *string
|
||||
Continue_on_success *bool
|
||||
|
||||
@@ -552,3 +552,36 @@ func SizeFmt(num float64, suffix string) string {
|
||||
func TimeIs0h(t time.Time) bool {
|
||||
return t.Hour() == 0 && t.Minute() == 0 && t.Second() == 0
|
||||
}
|
||||
|
||||
func ParseHierarchyPath(path string, sep string) HierarchyPath {
|
||||
if sep == "" {
|
||||
for _, sep = range []string{HIERARCHY_SEP, "/"} {
|
||||
if idx := strings.Index(path, sep); idx != -1 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
path = strings.Trim(path, sep) // Need to strip if prefix of suffiy (eg: paths with /) so we can properly split
|
||||
return HierarchyPath(strings.Split(path, sep))
|
||||
}
|
||||
|
||||
// HierarchyPath is used in various places to represent various path hierarchies (eg: in Diameter groups, XML trees)
|
||||
type HierarchyPath []string
|
||||
|
||||
func (h HierarchyPath) AsString(sep string, prefix bool) string {
|
||||
if len(h) == 0 {
|
||||
return ""
|
||||
}
|
||||
retStr := ""
|
||||
for idx, itm := range h {
|
||||
if idx == 0 {
|
||||
if prefix {
|
||||
retStr += sep
|
||||
}
|
||||
} else {
|
||||
retStr += sep
|
||||
}
|
||||
retStr += itm
|
||||
}
|
||||
return retStr
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -659,3 +660,21 @@ func TestEndOfMonth(t *testing.T) {
|
||||
t.Errorf("Expected %v was %v", expected, eom)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseHierarchyPath(t *testing.T) {
|
||||
eHP := HierarchyPath([]string{"Root", "CGRateS"})
|
||||
if hp := ParseHierarchyPath("Root>CGRateS", ""); !reflect.DeepEqual(hp, eHP) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eHP, hp)
|
||||
}
|
||||
if hp := ParseHierarchyPath("/Root/CGRateS/", ""); !reflect.DeepEqual(hp, eHP) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eHP, hp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHierarchyPathAsString(t *testing.T) {
|
||||
eStr := "/Root/CGRateS"
|
||||
hp := HierarchyPath([]string{"Root", "CGRateS"})
|
||||
if hpStr := hp.AsString("/", true); hpStr != eStr {
|
||||
t.Errorf("Expecting: %q, received: %q", eStr, hpStr)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user