diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 45d1450a1..771e04ab0 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -53,7 +53,7 @@ Parameters specific per config instance: * cdrSourceId, cdrFilter, cdrFields */ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, - closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (cdrc *Cdrc, err error) { + closeChan chan struct{}, dfltTimezone string, filterS *engine.FilterS) (cdrc *Cdrc, err error) { cdrcCfg := cdrcCfgs[0] cdrc = &Cdrc{ httpSkipTlsCheck: httpSkipTlsCheck, @@ -63,17 +63,16 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R cdrs: cdrs, closeChan: closeChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), - roundDecimals: roundDecimals, } // Before processing, make sure in and out folders exist - if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) { + if utils.IsSliceMember(utils.CDRCFileFormats, cdrcCfg.CdrFormat) { for _, dir := range []string{cdrcCfg.CDRInPath, cdrcCfg.CDROutPath} { if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { return nil, fmt.Errorf(" nonexistent folder: %s", dir) } } } - if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) { + if utils.IsSliceMember(utils.CDRCFileFormats, cdrcCfg.CdrFormat) { var processFile struct{} for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop @@ -92,7 +91,6 @@ type Cdrc struct { closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client maxOpenFiles chan struct{} // Maximum number of simultaneous files processed filterS *engine.FilterS - roundDecimals int } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -182,7 +180,7 @@ func (self *Cdrc) processFile(filePath string) error { csvReader.Comment = '#' recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg, self.cdrcCfgs, self.httpSkipTlsCheck, - self.dfltCdrcCfg.CacheDumpFields, self.filterS, self.cdrs, self.roundDecimals) + self.dfltCdrcCfg.CacheDumpFields, self.filterS, self.cdrs) case utils.MetaFileFWV: recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs, self.httpSkipTlsCheck, self.timezone, self.filterS) diff --git a/cdrc/csv.go b/cdrc/csv.go index 864a57210..34a6fca73 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -35,7 +35,7 @@ import ( func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string, dfltCdrcCfg *config.CdrcCfg, cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cacheDumpFields []*config.FCTemplate, - filterS *engine.FilterS, cdrs rpcclient.RpcClientConnection, roundDecimals int) *CsvRecordsProcessor { + filterS *engine.FilterS, cdrs rpcclient.RpcClientConnection) *CsvRecordsProcessor { return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName, dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, @@ -44,8 +44,7 @@ func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string, dfltCdrcCfg.CDROutPath, dfltCdrcCfg.FieldSeparator), partialRecordsCache: NewPartialRecordsCache(dfltCdrcCfg.PartialRecordCache, dfltCdrcCfg.PartialCacheExpiryAction, dfltCdrcCfg.CDROutPath, - dfltCdrcCfg.FieldSeparator, roundDecimals, - timezone, httpSkipTlsCheck, cdrs, filterS), + dfltCdrcCfg.FieldSeparator, timezone, httpSkipTlsCheck, cdrs, filterS), partialCacheDumpFields: cacheDumpFields, filterS: filterS} } @@ -132,12 +131,18 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, if err != nil { return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error()) } else if self.dfltCdrcCfg.CdrFormat == utils.MetaPartialCSV { + fmt.Println("===Teo===") + fmt.Println(utils.ToJSON(record)) + fmt.Println(utils.ToJSON(storedCdr)) if storedCdr, err = self.partialRecordsCache.MergePartialCDRRecord(NewPartialCDRRecord(storedCdr, self.partialCacheDumpFields)); err != nil { return nil, fmt.Errorf("Failed merging PartialCDR, error: %s", err.Error()) } else if storedCdr == nil { // CDR was absorbed by cache since it was partial + fmt.Println("===CDR ABSORBED===") continue } } + fmt.Println("=== storedCdr to save : ===") + fmt.Println(utils.ToJSON(storedCdr)) recordCdrs = append(recordCdrs, storedCdr) if !cdrcCfg.ContinueOnSuccess { break diff --git a/cdrc/csv_it_test.go b/cdrc/csv_it_test.go index a7639cc7c..aa9b079b8 100644 --- a/cdrc/csv_it_test.go +++ b/cdrc/csv_it_test.go @@ -166,7 +166,8 @@ func TestCsvITAnalyseCDRs(t *testing.T) { } else if len(reply) != 5 { // 1 injected, 1 rated, 1 *raw and it's pair in *default run t.Error("Unexpected number of CDRs returned: ", len(reply)) } - if err := cdrcRpc.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"08651"}}, &reply); err == nil || err.Error() != utils.NotFoundCaps { + if err := cdrcRpc.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"08651"}}, + &reply); err == nil || err.Error() != utils.NotFoundCaps { t.Error("Unexpected error: ", err) // Original 08651 was converted } } diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index 2ab3b3f4f..fe3453998 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -39,10 +39,10 @@ const ( ) func NewPartialRecordsCache(ttl time.Duration, expiryAction string, cdrOutDir string, csvSep rune, - roundDecimals int, timezone string, httpSkipTlsCheck bool, + timezone string, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, filterS *engine.FilterS) *PartialRecordsCache { return &PartialRecordsCache{ttl: ttl, expiryAction: expiryAction, cdrOutDir: cdrOutDir, - csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, + csvSep: csvSep, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs, partialRecords: make(map[string]*PartialCDRRecord), dumpTimers: make(map[string]*time.Timer), @@ -54,7 +54,6 @@ type PartialRecordsCache struct { expiryAction string cdrOutDir string csvSep rune - roundDecimals int timezone string httpSkipTlsCheck bool cdrs rpcclient.RpcClientConnection diff --git a/cdrc/partialcsv_it_test.go b/cdrc/partialcsv_it_test.go index 63bc4dc8d..1ca0432f6 100644 --- a/cdrc/partialcsv_it_test.go +++ b/cdrc/partialcsv_it_test.go @@ -20,6 +20,7 @@ along with this program. If not, see package cdrc import ( + "fmt" "io/ioutil" "net/rpc" "net/rpc/jsonrpc" @@ -96,9 +97,11 @@ func TestPartcsvITCreateCdrDirs(t *testing.T) { } func TestPartcsvITStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(partpartcsvCfgPath, *waitRater); err != nil { - t.Fatal(err) - } + // if _, err := engine.StopStartEngine(partpartcsvCfgPath, *waitRater); err != nil { + // t.Fatal(err) + // } + fmt.Println("START THE ENGINE MANUAL ") + time.Sleep(10 * time.Second) } // Connect rpc client to rater @@ -151,7 +154,7 @@ func TestPartcsvITProcessedFiles(t *testing.T) { if outContent1, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, "file1.csv")); err != nil { t.Error(err) } else if partCsvFileContent1 != string(outContent1) { - t.Errorf("Expecting: %q, received: %q", partCsvFileContent1, string(outContent1)) + t.Errorf("Expecting: %q, \n received: %q", partCsvFileContent1, string(outContent1)) } if outContent2, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, "file2.csv")); err != nil { t.Error(err) @@ -172,7 +175,7 @@ func TestPartcsvITProcessedFiles(t *testing.T) { if contentCacheDump, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, fileName)); err != nil { t.Error(err) } else if len(eCacheDumpFile1) != len(string(contentCacheDump)) { - t.Errorf("Expecting: %q, received: %q", eCacheDumpFile1, string(contentCacheDump)) + t.Errorf("Expecting: %q, \n received: %q", eCacheDumpFile1, string(contentCacheDump)) } if outContent3, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut2, "file3.csv")); err != nil { t.Error(err) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 2daa06702..c45f8beb6 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -144,8 +144,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } } cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, - cfg.GeneralCfg().DefaultTimezone, cfg.GeneralCfg().RoundingDecimals, - filterS) + cfg.GeneralCfg().DefaultTimezone, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error())) exitChan <- true diff --git a/data/conf/samples/cdrc_partcsv/cgrates.json b/data/conf/samples/cdrc_partcsv/cgrates.json index 853690799..6ee20eee7 100644 --- a/data/conf/samples/cdrc_partcsv/cgrates.json +++ b/data/conf/samples/cdrc_partcsv/cgrates.json @@ -40,8 +40,6 @@ {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~0"}, {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~1"}, - {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, - {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~4"}, {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~3"}, {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "*rated", "mandatory": true}, {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true}, @@ -80,8 +78,6 @@ {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~0"}, {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~1"}, - {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, - {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~4"}, {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~3"}, {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "*rated", "mandatory": true}, {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true}, diff --git a/data/conf/samples/cdrcflatstore/cgrates.json b/data/conf/samples/cdrcflatstore/cgrates.json index c5621a118..7f2a23e8c 100644 --- a/data/conf/samples/cdrcflatstore/cgrates.json +++ b/data/conf/samples/cdrcflatstore/cgrates.json @@ -44,7 +44,6 @@ "field_separator": "|", // separator used in case of csv files "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify "max_open_files": 1024, // maximum simultaneous files to process - "data_usage_multiply_factor": 1024, // conversion factor for data usage "cdr_in_path": "/tmp/cgr_flatstore/cdrc/in", // absolute path towards the directory where the CDRs are stored "cdr_out_path": "/tmp/cgr_flatstore/cdrc/out", // absolute path towards the directory where processed CDRs will be moved "failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records diff --git a/data/conf/samples/cdrcxmlwithfilter/cgrates.json b/data/conf/samples/cdrcxmlwithfilter/cgrates.json index 0e9761655..777c8f49a 100755 --- a/data/conf/samples/cdrcxmlwithfilter/cgrates.json +++ b/data/conf/samples/cdrcxmlwithfilter/cgrates.json @@ -51,7 +51,7 @@ "cdr_format": "*file_xml", // CDR file format