diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 0ee31ca31..0d988cfc3 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -81,18 +81,29 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal string) err return nil } +/* +One instance of CDRC will act on one folder. +Common parameters within configs processed: + * cdrS, cdrFormat, cdrInDir, cdrOutDir, runDelay +Parameters specific per config instance: + * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields +*/ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CdrServer, exitChan chan struct{}) (*Cdrc, error) { var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break } cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, - cdrSourceId: cdrcCfg.CdrSourceId, runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, duMultiplyFactor: cdrcCfg.DataUsageMultiplyFactor, + runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan} + cdrc.cdrSourceIds = make([]string, len(cdrcCfgs)) + cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs)) cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs)) cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs)) idx := 0 for _, cfg := range cdrcCfgs { + cdrc.cdrSourceIds[idx] = cfg.CdrSourceId + cdrc.duMultiplyFactors[idx] = cfg.DataUsageMultiplyFactor cdrc.cdrFilters[idx] = cfg.CdrFilter cdrc.cdrFields[idx] = cfg.CdrFields idx += 1 @@ -111,17 +122,17 @@ type Cdrc struct { cdrsAddress, CdrFormat, cdrInDir, - cdrOutDir, - cdrSourceId string - runDelay time.Duration - csvSep rune - duMultiplyFactor float64 - cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes - cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters - httpSkipTlsCheck bool - cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case - httpClient *http.Client - exitChan chan struct{} + cdrOutDir string + cdrSourceIds []string // Should be in sync with cdrFields on indexes + runDelay time.Duration + csvSep rune + duMultiplyFactors []float64 + cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes + cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters + httpSkipTlsCheck bool + cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case + httpClient *http.Client + exitChan chan struct{} } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -216,7 +227,7 @@ func (self *Cdrc) processFile(filePath string) error { continue // Other csv related errors, ignore } recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates - for idx, cdrFieldsInst := range self.cdrFields { + for idx := range self.cdrFields { // Make sure filters are matching filterBreak := false for _, rsrFilter := range self.cdrFilters[idx] { @@ -233,7 +244,7 @@ func (self *Cdrc) processFile(filePath string) error { if filterBreak { // Stop importing cdrc fields profile due to non matching filter continue } - if storedCdr, err := self.recordToStoredCdr(record, cdrFieldsInst); err != nil { + if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil { engine.Logger.Err(fmt.Sprintf(" Row %d - failed converting to StoredCdr, error: %s", procRowNr, err.Error())) continue } else { @@ -265,12 +276,12 @@ func (self *Cdrc) processFile(filePath string) error { return nil } -// Takes the record out of csv and turns it into http form which can be posted -func (self *Cdrc) recordToStoredCdr(record []string, cdrFields []*config.CfgCdrField) (*engine.StoredCdr, error) { - storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceId, ExtraFields: make(map[string]string), Cost: -1} +// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS +func (self *Cdrc) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) { + storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1} var err error var lazyHttpFields []*config.CfgCdrField - for _, cdrFldCfg := range cdrFields { + for _, cdrFldCfg := range self.cdrFields[cfgIdx] { var fieldVal string if utils.IsSliceMember([]string{CSV, FS_CSV}, self.CdrFormat) { if cdrFldCfg.Type == utils.CDRFIELD { @@ -298,8 +309,8 @@ func (self *Cdrc) recordToStoredCdr(record []string, cdrFields []*config.CfgCdrF } } storedCdr.CgrId = utils.Sha1(storedCdr.AccId, storedCdr.SetupTime.String()) - if storedCdr.TOR == utils.DATA && self.duMultiplyFactor != 0 { - storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactor) + if storedCdr.TOR == utils.DATA && self.duMultiplyFactors[cfgIdx] != 0 { + storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactors[cfgIdx]) } for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields var outValByte []byte diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index 104be6fd0..0a19b16ae 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -32,15 +32,15 @@ func TestRecordForkCdr(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT] cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}}) - cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}} + cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}} cdrRow := []string{"firstField", "secondField"} - _, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]) + _, err := cdrc.recordToStoredCdr(cdrRow, 0) if err == nil { t.Error("Failed to corectly detect missing fields from record") } cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"} - rtCdr, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]) + rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0) if err != nil { t.Error("Failed to parse CDR in rated cdr", err) } @@ -71,9 +71,9 @@ func TestRecordForkCdr(t *testing.T) { func TestDataMultiplyFactor(t *testing.T) { cdrFields := []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.CDRFIELD, CdrFieldId: "tor", Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}}, &config.CfgCdrField{Tag: "UsageField", Type: utils.CDRFIELD, CdrFieldId: "usage", Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}} - cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: [][]*config.CfgCdrField{cdrFields}} + cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, duMultiplyFactors: []float64{0}, cdrFields: [][]*config.CfgCdrField{cdrFields}} cdrRow := []string{"*data", "1"} - rtCdr, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]) + rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0) if err != nil { t.Error("Failed to parse CDR in rated cdr", err) } @@ -90,7 +90,7 @@ func TestDataMultiplyFactor(t *testing.T) { if !reflect.DeepEqual(expectedCdr, rtCdr) { t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } - cdrc.duMultiplyFactor = 1024 + cdrc.duMultiplyFactors = []float64{1024} expectedCdr = &engine.StoredCdr{ CgrId: utils.Sha1("", sTime.String()), TOR: cdrRow[0], @@ -100,7 +100,7 @@ func TestDataMultiplyFactor(t *testing.T) { ExtraFields: map[string]string{}, Cost: -1, } - if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]); !reflect.DeepEqual(expectedCdr, rtCdr) { + if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) { t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } cdrRow = []string{"*voice", "1"} @@ -113,7 +113,7 @@ func TestDataMultiplyFactor(t *testing.T) { ExtraFields: map[string]string{}, Cost: -1, } - if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]); !reflect.DeepEqual(expectedCdr, rtCdr) { + if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) { t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } }