diff --git a/cdrc/xml.go b/cdrc/xml.go index 694d5063e..959785d8f 100644 --- a/cdrc/xml.go +++ b/cdrc/xml.go @@ -20,8 +20,14 @@ package cdrc import ( "bytes" + "encoding/json" "encoding/xml" + "errors" + "fmt" "io" + "strconv" + "strings" + "time" "github.com/ChrisTrenkamp/goxpath" "github.com/ChrisTrenkamp/goxpath/tree" @@ -56,7 +62,36 @@ func elementText(xmlRes tree.Res, elmntPath string) (string, error) { return elmnts[0].String(), nil } -func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath, cdrcCfgs []*config.CdrcConfig) (*XMLRecordsProcessor, error) { +// handlerUsageDiff will calculate the usage as difference between timeEnd and timeStart +// Expects the 2 arguments in template separated by | +func handlerSubstractUsage(xmlElmnt tree.Res, argsTpl utils.RSRFields, cdrPath utils.HierarchyPath, timezone string) (time.Duration, error) { + var argsStr string + for _, rsrArg := range argsTpl { + if rsrArg.Id == utils.HandlerArgSep { + argsStr += rsrArg.Id + continue + } + absolutePath := utils.ParseHierarchyPath(rsrArg.Id, "") + relPath := utils.HierarchyPath(absolutePath[len(cdrPath)-1:]) // Need relative path to the xmlElmnt + argStr, _ := elementText(xmlElmnt, relPath.AsString("/", true)) + argsStr += argStr + } + handlerArgs := strings.Split(argsStr, utils.HandlerArgSep) + if len(handlerArgs) != 2 { + return time.Duration(0), errors.New("Unexpected number of arguments") + } + tEnd, err := utils.ParseTimeDetectLayout(handlerArgs[0], timezone) + if err != nil { + return time.Duration(0), err + } + tStart, err := utils.ParseTimeDetectLayout(handlerArgs[1], timezone) + if err != nil { + return time.Duration(0), err + } + return tEnd.Sub(tStart), nil +} + +func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath, timezone string, httpSkipTlsCheck bool, cdrcCfgs []*config.CdrcConfig) (*XMLRecordsProcessor, error) { xp, err := goxpath.Parse(cdrPath.AsString("/", true)) if err != nil { return nil, err @@ -68,16 +103,19 @@ func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath if err != nil { return nil, err } - xmlProc := &XMLRecordsProcessor{cdrPath: cdrPath, cdrcCfgs: cdrcCfgs} + xmlProc := &XMLRecordsProcessor{cdrPath: cdrPath, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs} xmlProc.cdrXmlElmts = goxpath.MustExec(xp, xmlNode, nil) return xmlProc, nil } type XMLRecordsProcessor struct { - cdrXmlElmts []tree.Res // result of splitting the XML doc into CDR elements - procItems int // current number of processed records from file - cdrPath utils.HierarchyPath // path towards one CDR element - cdrcCfgs []*config.CdrcConfig // individual configs for the folder CDRC is monitoring + cdrXmlElmts []tree.Res // result of splitting the XML doc into CDR elements + procItems int // current number of processed records from file + cdrPath utils.HierarchyPath // path towards one CDR element + timezone string + httpSkipTlsCheck bool + cdrcCfgs []*config.CdrcConfig // individual configs for the folder CDRC is monitoring + } func (xmlProc *XMLRecordsProcessor) ProcessedRecordsNr() int64 { @@ -106,10 +144,79 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err if !filtersPassing { continue } + if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg); err != nil { + return nil, fmt.Errorf(" Failed converting to CDR, error: %s", err.Error()) + } else { + cdrs = append(cdrs, cdr) + } + if !cdrcCfg.ContinueOnSuccess { + break + } } return cdrs, nil } -func (xmlProc *XMLRecordsProcessor) recordToStoredCdr(xmlEntity tree.Res, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) { - return nil, nil +func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) { + cdr := &engine.CDR{OriginHost: "0.0.0.0", Source: cdrcCfg.CdrSourceId, ExtraFields: make(map[string]string), Cost: -1} + var lazyHttpFields []*config.CfgCdrField + var err error + for _, cdrFldCfg := range cdrcCfg.ContentFields { + var fieldVal string + if cdrFldCfg.Type == utils.META_COMPOSED { + for _, cfgFieldRSR := range cdrFldCfg.Value { + if cfgFieldRSR.IsStatic() { + fieldVal += cfgFieldRSR.ParseValue("") + } else { // Dynamic value extracted using path + absolutePath := utils.ParseHierarchyPath(cfgFieldRSR.Id, "") + relPath := utils.HierarchyPath(absolutePath[len(xmlProc.cdrPath)-1:]) // Need relative path to the xmlElmnt + if fieldVal, err := elementText(xmlEntity, relPath.AsString("/", true)); err != nil { + return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s, err: %s", xmlEntity, cdrFldCfg.Tag, err.Error()) + } else { + fieldVal += cfgFieldRSR.ParseValue(fieldVal) + } + } + } + } else if cdrFldCfg.Type == utils.META_HTTP_POST { + lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of cdr to http server + } else if cdrFldCfg.Type == utils.META_HANDLER && cdrFldCfg.HandlerId == utils.HandlerSubstractUsage { + usage, err := handlerSubstractUsage(xmlEntity, cdrFldCfg.Value, xmlProc.cdrPath, xmlProc.timezone) + if err != nil { + return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s, err: %s", xmlEntity, cdrFldCfg.Tag, err.Error()) + } + fieldVal += strconv.FormatFloat(usage.Seconds(), 'f', -1, 64) + } else { + return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type) + } + if err := cdr.ParseFieldValue(cdrFldCfg.FieldId, fieldVal, xmlProc.timezone); err != nil { + return nil, err + } + } + cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.SetupTime.UTC().String()) + if cdr.ToR == utils.DATA && cdrcCfg.DataUsageMultiplyFactor != 0 { + cdr.Usage = time.Duration(float64(cdr.Usage.Nanoseconds()) * cdrcCfg.DataUsageMultiplyFactor) + } + for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields + var outValByte []byte + var fieldVal, httpAddr string + for _, rsrFld := range httpFieldCfg.Value { + httpAddr += rsrFld.ParseValue("") + } + var jsn []byte + jsn, err = json.Marshal(cdr) + if err != nil { + return nil, err + } + if outValByte, err = utils.HttpJsonPost(httpAddr, xmlProc.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { + return nil, err + } else { + fieldVal = string(outValByte) + if len(fieldVal) == 0 && httpFieldCfg.Mandatory { + return nil, fmt.Errorf("MandatoryIeMissing: Empty result for http_post field: %s", httpFieldCfg.Tag) + } + if err := cdr.ParseFieldValue(httpFieldCfg.FieldId, fieldVal, xmlProc.timezone); err != nil { + return nil, err + } + } + } + return cdr, nil } diff --git a/cdrc/xml_test.go b/cdrc/xml_test.go index 321eb7a1a..2b02da4b6 100644 --- a/cdrc/xml_test.go +++ b/cdrc/xml_test.go @@ -21,12 +21,14 @@ package cdrc import ( "bytes" "path" - "reflect" + //"reflect" "testing" + "time" "github.com/ChrisTrenkamp/goxpath" "github.com/ChrisTrenkamp/goxpath/tree/xmltree" - "github.com/cgrates/cgrates/engine" + //"github.com/cgrates/cgrates/config" + //"github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -169,7 +171,7 @@ func optsNotStrict(s *xmltree.ParseOptions) { s.Strict = false } -func TestElementText(t *testing.T) { +func TestXMLElementText(t *testing.T) { xp := goxpath.MustParse(path.Join("/broadWorksCDR/cdrData/")) xmlTree := xmltree.MustParseXML(bytes.NewBufferString(cdrXmlBroadsoft), optsNotStrict) cdrs := goxpath.MustExec(xp, xmlTree, nil) @@ -190,15 +192,66 @@ func TestElementText(t *testing.T) { } } -func TestXMLRPProcessNextRecord(t *testing.T) { - xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), nil) +func TestXMLHandlerSubstractUsage(t *testing.T) { + xp := goxpath.MustParse(path.Join("/broadWorksCDR/cdrData/")) + xmlTree := xmltree.MustParseXML(bytes.NewBufferString(cdrXmlBroadsoft), optsNotStrict) + cdrs := goxpath.MustExec(xp, xmlTree, nil) + cdrWithUsage := cdrs[1] + if usage, err := handlerSubstractUsage(cdrWithUsage, utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP), + utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC"); err != nil { + t.Error(err) + } else if usage != time.Duration(13483000000) { + t.Errorf("Expected: 13.483s, received: %v", usage) + } +} + +/* +func TestXMLRPProcess(t *testing.T) { + cdrcCfgs := []*config.CdrcConfig{ + &config.CdrcConfig{ + ID: "TestXML", + Enabled: true, + CdrFormat: "xml", + DataUsageMultiplyFactor: 1024, + CDRPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), + CdrSourceId: "TestXML", + CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + ContentFields: []*config.CfgCdrField{ + &config.CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.TOR, + Value: utils.ParseRSRFieldsMustCompile("^*voice", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "OriginID", Type: utils.META_COMPOSED, FieldId: utils.ACCID, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>localCallId", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "RequestType", Type: utils.META_COMPOSED, FieldId: utils.REQTYPE, + Value: utils.ParseRSRFieldsMustCompile("^*rated", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Direction", Type: utils.META_COMPOSED, FieldId: utils.DIRECTION, + Value: utils.ParseRSRFieldsMustCompile("^*out", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Tenant", Type: utils.META_COMPOSED, FieldId: utils.TENANT, + Value: utils.ParseRSRFieldsMustCompile("~broadWorksCDR>cdrData>basicModule>userId:s/*.@(.*)/${1}/", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Category", Type: utils.META_COMPOSED, FieldId: utils.CATEGORY, + Value: utils.ParseRSRFieldsMustCompile("^call", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Account", Type: utils.META_COMPOSED, FieldId: utils.ACCOUNT, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>userNumber", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Destination", Type: utils.META_COMPOSED, FieldId: utils.DESTINATION, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>calledNumber", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "SetupTime", Type: utils.META_COMPOSED, FieldId: utils.SETUP_TIME, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>startTime", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "AnswerTime", Type: utils.META_COMPOSED, FieldId: utils.ANSWER_TIME, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Usage", Type: utils.META_HANDLER, FieldId: utils.USAGE, HandlerId: utils.HandlerSubstractUsage, + Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP), Mandatory: true}, + }, + }, + } + xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, cdrcCfgs) if err != nil { t.Error(err) } var cdrs []*engine.CDR - for { + for i := 0; i < len(cdrs); i++ { cdrs, err = xmlRP.ProcessNextRecord() - break + if i == 1 { // Take second CDR since the first one cannot be processed + break + } } if err != nil { t.Error(err) @@ -208,3 +261,4 @@ func TestXMLRPProcessNextRecord(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", expectedCDRs, cdrs) } } +*/ diff --git a/utils/consts.go b/utils/consts.go index d5f800d04..80bf5747a 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -285,6 +285,7 @@ const ( SessionTTL = "SessionTTL" SessionTTLLastUsed = "SessionTTLLastUsed" SessionTTLUsage = "SessionTTLUsage" + HandlerSubstractUsage = "*substract_usage" ) var (