CDRC XML recordToCDR

This commit is contained in:
DanB
2016-05-08 21:47:09 +02:00
parent 38a2c0f14d
commit 8a98fd261f
3 changed files with 177 additions and 15 deletions

View File

@@ -20,8 +20,14 @@ package cdrc
import (
"bytes"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/ChrisTrenkamp/goxpath"
"github.com/ChrisTrenkamp/goxpath/tree"
@@ -56,7 +62,36 @@ func elementText(xmlRes tree.Res, elmntPath string) (string, error) {
return elmnts[0].String(), nil
}
func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath, cdrcCfgs []*config.CdrcConfig) (*XMLRecordsProcessor, error) {
// handlerUsageDiff will calculate the usage as difference between timeEnd and timeStart
// Expects the 2 arguments in template separated by |
func handlerSubstractUsage(xmlElmnt tree.Res, argsTpl utils.RSRFields, cdrPath utils.HierarchyPath, timezone string) (time.Duration, error) {
var argsStr string
for _, rsrArg := range argsTpl {
if rsrArg.Id == utils.HandlerArgSep {
argsStr += rsrArg.Id
continue
}
absolutePath := utils.ParseHierarchyPath(rsrArg.Id, "")
relPath := utils.HierarchyPath(absolutePath[len(cdrPath)-1:]) // Need relative path to the xmlElmnt
argStr, _ := elementText(xmlElmnt, relPath.AsString("/", true))
argsStr += argStr
}
handlerArgs := strings.Split(argsStr, utils.HandlerArgSep)
if len(handlerArgs) != 2 {
return time.Duration(0), errors.New("Unexpected number of arguments")
}
tEnd, err := utils.ParseTimeDetectLayout(handlerArgs[0], timezone)
if err != nil {
return time.Duration(0), err
}
tStart, err := utils.ParseTimeDetectLayout(handlerArgs[1], timezone)
if err != nil {
return time.Duration(0), err
}
return tEnd.Sub(tStart), nil
}
func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath, timezone string, httpSkipTlsCheck bool, cdrcCfgs []*config.CdrcConfig) (*XMLRecordsProcessor, error) {
xp, err := goxpath.Parse(cdrPath.AsString("/", true))
if err != nil {
return nil, err
@@ -68,16 +103,19 @@ func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath
if err != nil {
return nil, err
}
xmlProc := &XMLRecordsProcessor{cdrPath: cdrPath, cdrcCfgs: cdrcCfgs}
xmlProc := &XMLRecordsProcessor{cdrPath: cdrPath, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs}
xmlProc.cdrXmlElmts = goxpath.MustExec(xp, xmlNode, nil)
return xmlProc, nil
}
type XMLRecordsProcessor struct {
cdrXmlElmts []tree.Res // result of splitting the XML doc into CDR elements
procItems int // current number of processed records from file
cdrPath utils.HierarchyPath // path towards one CDR element
cdrcCfgs []*config.CdrcConfig // individual configs for the folder CDRC is monitoring
cdrXmlElmts []tree.Res // result of splitting the XML doc into CDR elements
procItems int // current number of processed records from file
cdrPath utils.HierarchyPath // path towards one CDR element
timezone string
httpSkipTlsCheck bool
cdrcCfgs []*config.CdrcConfig // individual configs for the folder CDRC is monitoring
}
func (xmlProc *XMLRecordsProcessor) ProcessedRecordsNr() int64 {
@@ -106,10 +144,79 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err
if !filtersPassing {
continue
}
if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg); err != nil {
return nil, fmt.Errorf("<CDRC> Failed converting to CDR, error: %s", err.Error())
} else {
cdrs = append(cdrs, cdr)
}
if !cdrcCfg.ContinueOnSuccess {
break
}
}
return cdrs, nil
}
func (xmlProc *XMLRecordsProcessor) recordToStoredCdr(xmlEntity tree.Res, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) {
return nil, nil
func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) {
cdr := &engine.CDR{OriginHost: "0.0.0.0", Source: cdrcCfg.CdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
var lazyHttpFields []*config.CfgCdrField
var err error
for _, cdrFldCfg := range cdrcCfg.ContentFields {
var fieldVal string
if cdrFldCfg.Type == utils.META_COMPOSED {
for _, cfgFieldRSR := range cdrFldCfg.Value {
if cfgFieldRSR.IsStatic() {
fieldVal += cfgFieldRSR.ParseValue("")
} else { // Dynamic value extracted using path
absolutePath := utils.ParseHierarchyPath(cfgFieldRSR.Id, "")
relPath := utils.HierarchyPath(absolutePath[len(xmlProc.cdrPath)-1:]) // Need relative path to the xmlElmnt
if fieldVal, err := elementText(xmlEntity, relPath.AsString("/", true)); err != nil {
return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s, err: %s", xmlEntity, cdrFldCfg.Tag, err.Error())
} else {
fieldVal += cfgFieldRSR.ParseValue(fieldVal)
}
}
}
} else if cdrFldCfg.Type == utils.META_HTTP_POST {
lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of cdr to http server
} else if cdrFldCfg.Type == utils.META_HANDLER && cdrFldCfg.HandlerId == utils.HandlerSubstractUsage {
usage, err := handlerSubstractUsage(xmlEntity, cdrFldCfg.Value, xmlProc.cdrPath, xmlProc.timezone)
if err != nil {
return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s, err: %s", xmlEntity, cdrFldCfg.Tag, err.Error())
}
fieldVal += strconv.FormatFloat(usage.Seconds(), 'f', -1, 64)
} else {
return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type)
}
if err := cdr.ParseFieldValue(cdrFldCfg.FieldId, fieldVal, xmlProc.timezone); err != nil {
return nil, err
}
}
cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.SetupTime.UTC().String())
if cdr.ToR == utils.DATA && cdrcCfg.DataUsageMultiplyFactor != 0 {
cdr.Usage = time.Duration(float64(cdr.Usage.Nanoseconds()) * cdrcCfg.DataUsageMultiplyFactor)
}
for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields
var outValByte []byte
var fieldVal, httpAddr string
for _, rsrFld := range httpFieldCfg.Value {
httpAddr += rsrFld.ParseValue("")
}
var jsn []byte
jsn, err = json.Marshal(cdr)
if err != nil {
return nil, err
}
if outValByte, err = utils.HttpJsonPost(httpAddr, xmlProc.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
return nil, err
} else {
fieldVal = string(outValByte)
if len(fieldVal) == 0 && httpFieldCfg.Mandatory {
return nil, fmt.Errorf("MandatoryIeMissing: Empty result for http_post field: %s", httpFieldCfg.Tag)
}
if err := cdr.ParseFieldValue(httpFieldCfg.FieldId, fieldVal, xmlProc.timezone); err != nil {
return nil, err
}
}
}
return cdr, nil
}

View File

@@ -21,12 +21,14 @@ package cdrc
import (
"bytes"
"path"
"reflect"
//"reflect"
"testing"
"time"
"github.com/ChrisTrenkamp/goxpath"
"github.com/ChrisTrenkamp/goxpath/tree/xmltree"
"github.com/cgrates/cgrates/engine"
//"github.com/cgrates/cgrates/config"
//"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -169,7 +171,7 @@ func optsNotStrict(s *xmltree.ParseOptions) {
s.Strict = false
}
func TestElementText(t *testing.T) {
func TestXMLElementText(t *testing.T) {
xp := goxpath.MustParse(path.Join("/broadWorksCDR/cdrData/"))
xmlTree := xmltree.MustParseXML(bytes.NewBufferString(cdrXmlBroadsoft), optsNotStrict)
cdrs := goxpath.MustExec(xp, xmlTree, nil)
@@ -190,15 +192,66 @@ func TestElementText(t *testing.T) {
}
}
func TestXMLRPProcessNextRecord(t *testing.T) {
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), nil)
func TestXMLHandlerSubstractUsage(t *testing.T) {
xp := goxpath.MustParse(path.Join("/broadWorksCDR/cdrData/"))
xmlTree := xmltree.MustParseXML(bytes.NewBufferString(cdrXmlBroadsoft), optsNotStrict)
cdrs := goxpath.MustExec(xp, xmlTree, nil)
cdrWithUsage := cdrs[1]
if usage, err := handlerSubstractUsage(cdrWithUsage, utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP),
utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC"); err != nil {
t.Error(err)
} else if usage != time.Duration(13483000000) {
t.Errorf("Expected: 13.483s, received: %v", usage)
}
}
/*
func TestXMLRPProcess(t *testing.T) {
cdrcCfgs := []*config.CdrcConfig{
&config.CdrcConfig{
ID: "TestXML",
Enabled: true,
CdrFormat: "xml",
DataUsageMultiplyFactor: 1024,
CDRPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}),
CdrSourceId: "TestXML",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
ContentFields: []*config.CfgCdrField{
&config.CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.TOR,
Value: utils.ParseRSRFieldsMustCompile("^*voice", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "OriginID", Type: utils.META_COMPOSED, FieldId: utils.ACCID,
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>localCallId", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "RequestType", Type: utils.META_COMPOSED, FieldId: utils.REQTYPE,
Value: utils.ParseRSRFieldsMustCompile("^*rated", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "Direction", Type: utils.META_COMPOSED, FieldId: utils.DIRECTION,
Value: utils.ParseRSRFieldsMustCompile("^*out", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "Tenant", Type: utils.META_COMPOSED, FieldId: utils.TENANT,
Value: utils.ParseRSRFieldsMustCompile("~broadWorksCDR>cdrData>basicModule>userId:s/*.@(.*)/${1}/", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "Category", Type: utils.META_COMPOSED, FieldId: utils.CATEGORY,
Value: utils.ParseRSRFieldsMustCompile("^call", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "Account", Type: utils.META_COMPOSED, FieldId: utils.ACCOUNT,
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>userNumber", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "Destination", Type: utils.META_COMPOSED, FieldId: utils.DESTINATION,
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>calledNumber", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "SetupTime", Type: utils.META_COMPOSED, FieldId: utils.SETUP_TIME,
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>startTime", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "AnswerTime", Type: utils.META_COMPOSED, FieldId: utils.ANSWER_TIME,
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP), Mandatory: true},
&config.CfgCdrField{Tag: "Usage", Type: utils.META_HANDLER, FieldId: utils.USAGE, HandlerId: utils.HandlerSubstractUsage,
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP), Mandatory: true},
},
},
}
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, cdrcCfgs)
if err != nil {
t.Error(err)
}
var cdrs []*engine.CDR
for {
for i := 0; i < len(cdrs); i++ {
cdrs, err = xmlRP.ProcessNextRecord()
break
if i == 1 { // Take second CDR since the first one cannot be processed
break
}
}
if err != nil {
t.Error(err)
@@ -208,3 +261,4 @@ func TestXMLRPProcessNextRecord(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", expectedCDRs, cdrs)
}
}
*/

View File

@@ -285,6 +285,7 @@ const (
SessionTTL = "SessionTTL"
SessionTTLLastUsed = "SessionTTLLastUsed"
SessionTTLUsage = "SessionTTLUsage"
HandlerSubstractUsage = "*substract_usage"
)
var (