From 4bcd7c9028925e5a57863df577cd6d25f3701550 Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 17 Sep 2018 06:49:49 -0400 Subject: [PATCH] Fixes #1202 #1204 --- cdrc/csv.go | 18 ++++++--------- cdrc/csv_test.go | 10 ++++----- cdrc/fwv.go | 1 - cdrc/xml.go | 16 +++++--------- config/fctemplate.go | 22 +++++++++++++++++++ data/conf/samples/cdrewithfilter/cgrates.json | 3 ++- engine/cdr.go | 2 ++ engine/cdre.go | 20 ++++++++++++++++- engine/cdrs.go | 1 + 9 files changed, 64 insertions(+), 29 deletions(-) diff --git a/cdrc/csv.go b/cdrc/csv.go index 54c2e29a6..5c3045a38 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -102,21 +102,21 @@ func (self *CsvRecordsProcessor) processFlatstoreRecord(record []string) ([]stri // Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, error) { + csvProvider := newCsvProvider(record) recordCdrs := make([]*engine.CDR, 0) // More CDRs based on the number of filters and field templates for _, cdrcCfg := range self.cdrcCfgs { // cdrFields coming from more templates will produce individual storCdr records + tenant, err := cdrcCfg.Tenant.ParseValue("") // each profile of cdrc can have different tenant + if err != nil { + return nil, err + } // Make sure filters are matching if len(cdrcCfg.Filters) != 0 { - csvProvider := newCsvProvider(record) - tenant, err := cdrcCfg.Tenant.ParseValue("") - if err != nil { - return nil, err - } if pass, err := self.filterS.Pass(tenant, cdrcCfg.Filters, csvProvider); err != nil || !pass { continue // Not passes filters, ignore this CDR } } - storedCdr, err := self.recordToStoredCdr(record, cdrcCfg) + storedCdr, err := self.recordToStoredCdr(record, cdrcCfg, tenant) if err != nil { return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error()) } else if self.dfltCdrcCfg.CdrFormat == utils.PartialCSV { @@ -135,17 +135,13 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, } // Takes the record out of csv and turns it into storedCdr which can be processed by CDRS -func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) { +func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *config.CdrcConfig, tenant string) (*engine.CDR, error) { storedCdr := &engine.CDR{OriginHost: "0.0.0.0", Source: cdrcCfg.CdrSourceId, ExtraFields: make(map[string]string), Cost: -1} var err error csvProvider := newCsvProvider(record) // used for filterS and for RSRParsers var lazyHttpFields []*config.FCTemplate for _, cdrFldCfg := range cdrcCfg.ContentFields { if len(cdrFldCfg.Filters) != 0 { - tenant, err := cdrcCfg.Tenant.ParseValue("") - if err != nil { - return nil, err - } if pass, err := self.filterS.Pass(tenant, cdrFldCfg.Filters, csvProvider); err != nil { return nil, err diff --git a/cdrc/csv_test.go b/cdrc/csv_test.go index b1eab92ab..b450273df 100644 --- a/cdrc/csv_test.go +++ b/cdrc/csv_test.go @@ -36,14 +36,14 @@ func TestCsvRecordToCDR(t *testing.T) { Value: config.NewRSRParsersMustCompile("*default", true)}) csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: []*config.CdrcConfig{cdrcConfig}} cdrRow := []string{"firstField", "secondField"} - _, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig) + _, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig, "cgrates.org") if err == nil { t.Error("Failed to corectly detect missing fields from record") } cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:50:00", "2013-02-03 19:54:00", "62s", "supplier1", "172.16.1.1", "NORMAL_DISCONNECT"} - rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig) + rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig, "cgrates.org") if err != nil { t.Error("Failed to parse CDR in rated cdr", err) } @@ -84,7 +84,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) { csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: []*config.CdrcConfig{cdrcConfig}} csvProcessor.cdrcCfgs[0].DataUsageMultiplyFactor = 0 cdrRow := []string{"*data", "1"} - rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig) + rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig, "cgrates.org") if err != nil { t.Error("Failed to parse CDR in rated cdr", err) } @@ -112,7 +112,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) { Cost: -1, } if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, - cdrcConfig); !reflect.DeepEqual(expectedCdr, rtCdr) { + cdrcConfig, "cgrates.org"); !reflect.DeepEqual(expectedCdr, rtCdr) { t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } cdrRow = []string{"*voice", "1s"} @@ -126,7 +126,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) { Cost: -1, } if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, - cdrcConfig); !reflect.DeepEqual(expectedCdr, rtCdr) { + cdrcConfig, "cgrates.org"); !reflect.DeepEqual(expectedCdr, rtCdr) { t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } } diff --git a/cdrc/fwv.go b/cdrc/fwv.go index 1464a697e..a28904e24 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -192,7 +192,6 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cdrcCfg *confi if err := storedCdr.ParseFieldValue(cdrFldCfg.FieldId, fieldVal, self.timezone); err != nil { return nil, err } - } if storedCdr.CGRID == "" && storedCdr.OriginID != "" && cfgKey != "*header" { storedCdr.CGRID = utils.Sha1(storedCdr.OriginID, storedCdr.SetupTime.UTC().String()) diff --git a/cdrc/xml.go b/cdrc/xml.go index 8a78c2362..b838fdd1b 100644 --- a/cdrc/xml.go +++ b/cdrc/xml.go @@ -115,18 +115,18 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err cdrXML := xmlProc.cdrXmlElmts[xmlProc.procItems] xmlProc.procItems += 1 for _, cdrcCfg := range xmlProc.cdrcCfgs { + tenant, err := cdrcCfg.Tenant.ParseValue("") + if err != nil { + return nil, err + } if len(cdrcCfg.Filters) != 0 { xmlProvider := newXmlProvider(cdrXML, xmlProc.cdrPath) - tenant, err := cdrcCfg.Tenant.ParseValue("") - if err != nil { - return nil, err - } if pass, err := xmlProc.filterS.Pass(tenant, cdrcCfg.Filters, xmlProvider); err != nil || !pass { continue // Not passes filters, ignore this CDR } } - if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg); err != nil { + if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg, tenant); err != nil { return nil, fmt.Errorf(" Failed converting to CDR, error: %s", err.Error()) } else { cdrs = append(cdrs, cdr) @@ -138,7 +138,7 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err return cdrs, nil } -func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *etree.Element, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) { +func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *etree.Element, cdrcCfg *config.CdrcConfig, tenant string) (*engine.CDR, error) { cdr := &engine.CDR{OriginHost: "0.0.0.0", Source: cdrcCfg.CdrSourceId, ExtraFields: make(map[string]string), Cost: -1} var lazyHttpFields []*config.FCTemplate var err error @@ -146,10 +146,6 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *etree.Element, cdrcCf xmlProvider := newXmlProvider(xmlEntity, xmlProc.cdrPath) for _, cdrFldCfg := range cdrcCfg.ContentFields { if len(cdrFldCfg.Filters) != 0 { - tenant, err := cdrcCfg.Tenant.ParseValue("") - if err != nil { - return nil, err - } if pass, err := xmlProc.filterS.Pass(tenant, cdrFldCfg.Filters, xmlProvider); err != nil || !pass { continue // Not passes filters, ignore this CDR diff --git a/config/fctemplate.go b/config/fctemplate.go index 56547ee21..7edc31976 100755 --- a/config/fctemplate.go +++ b/config/fctemplate.go @@ -18,6 +18,11 @@ along with this program. If not, see package config +// import ( +// "github.com/cgrates/cgrates/engine" +// "github.com/cgrates/cgrates/utils" +// ) + func NewFCTemplateFromFCTemplateJsonCfg(jsnCfg *FcTemplateJsonCfg) *FCTemplate { fcTmp := new(FCTemplate) if jsnCfg.Tag != nil { @@ -116,3 +121,20 @@ func FCTemplatesFromFCTemapltesJsonCfg(jsnCfgFlds []*FcTemplateJsonCfg) []*FCTem } return retFields } + +// type FCTemplates []*FCTemplate + +// func (tmps *FCTemplates) AsNavigableMap(tenant, timezone string, dp DataProvider, filterS *engine.FilterS) (*NavigableMap, error) { +// nM := NewNavigableMap(nil) +// for _, tmp := range tmps { +// if len(tmp.Filters) != 0 { +// if pass, err := filterS.Pass(tenant, +// tmp.Filters, dp); err != nil || !pass { +// continue // Not passes filters, ignore this CDR +// } +// } +// switch tmp.Type { +// case utils.META_COMPOSED: +// } +// } +// } diff --git a/data/conf/samples/cdrewithfilter/cgrates.json b/data/conf/samples/cdrewithfilter/cgrates.json index 015aeba0e..394bd8d6e 100755 --- a/data/conf/samples/cdrewithfilter/cgrates.json +++ b/data/conf/samples/cdrewithfilter/cgrates.json @@ -78,13 +78,14 @@ "cdrs": { "enabled": true, + //"online_cdr_exports":["TemplateWithFilter"], }, "cdre": { "TemplateWithFilter": { "export_format": "*file_csv", - "export_path": "/tmp", + "export_path": "/tmp/", "filters" :["*string:Source:test2"], "content_fields": [ {"tag": "CGRID", "type": "*composed", "value": "~CGRID"}, diff --git a/engine/cdr.go b/engine/cdr.go index d34704d09..319094f4f 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -206,6 +206,8 @@ func (cdr *CDR) ParseFieldValue(fieldId, fieldVal, timezone string) error { if cdr.OrderID, err = strconv.ParseInt(fieldVal, 10, 64); err != nil { return err } + case utils.OriginHost: // overwrite if originHost is given from template + cdr.OriginHost = fieldVal case utils.ToR: cdr.ToR += fieldVal case utils.RunID: diff --git a/engine/cdre.go b/engine/cdre.go index c1e06558a..bc4849ea4 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -26,6 +26,7 @@ import ( "net/url" "os" "path" + "path/filepath" "strconv" "sync" "time" @@ -479,7 +480,24 @@ func (cdre *CDRExporter) ExportCDRs() (err error) { if contLen == 0 { return } - fileOut, err := os.Create(cdre.exportPath) + var expFormat string + switch cdre.exportFormat { + case utils.MetaFileFWV: + expFormat = "fwv" + case utils.MetaFileCSV: + expFormat = "csv" + default: + expFormat = cdre.exportFormat + } + utils.Logger.Debug(fmt.Sprintf("CDRS : %+v", cdre.cdrs)) + expPath := cdre.exportPath + if len(filepath.Ext(expPath)) == 0 { // verify extension from exportPath (if have extension is file else is directory) + fileName := fmt.Sprintf("cdre_%s.%s", utils.UUIDSha1Prefix(), expFormat) + expPath = path.Join(expPath, fileName) + } + utils.Logger.Debug(fmt.Sprintf("expPath : %+v", expPath)) + utils.Logger.Debug(fmt.Sprintf("-------------------------------------------------------------")) + fileOut, err := os.Create(expPath) if err != nil { return err } diff --git a/engine/cdrs.go b/engine/cdrs.go index 015e2b6ad..73180d403 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -487,6 +487,7 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) { return cc, nil } +// replicateCDRs used by online exports func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) { for _, exportID := range self.cgrCfg.CDRSOnlineCDRExports { expTpl := self.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer