diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index ee6e5bfd5..d55b72ef8 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -95,6 +95,7 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal, timezone s // Understands and processes a specific format of cdr (eg: .csv or .fwv) type RecordsProcessor interface { ProcessNextRecord() ([]*engine.StoredCdr, error) // Process a single record in the CDR file, return a slice of CDRs since based on configuration we can have more templates + ProcessedRecordsNr() int64 } /* @@ -260,7 +261,7 @@ func (self *Cdrc) processFile(filePath string) error { default: return fmt.Errorf("Unsupported CDR format: %s", self.cdrFormat) } - procRowNr := 0 + rowNr := 0 // This counts the rows in the file, not really number of CDRs cdrsPosted := 0 timeStart := time.Now() for { @@ -268,9 +269,8 @@ func (self *Cdrc) processFile(filePath string) error { if err != nil && err == io.EOF { break } - procRowNr += 1 if err != nil { - engine.Logger.Err(fmt.Sprintf(" Row %d, error: %s", procRowNr, err.Error())) + engine.Logger.Err(fmt.Sprintf(" Row %d, error: %s", rowNr, err.Error())) continue } for _, storedCdr := range cdrs { // Send CDRs to CDRS @@ -283,9 +283,8 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(fmt.Sprintf(" Failed sending CDR, %+v, error: %s", storedCdr, err.Error())) } else if reply != "OK" { engine.Logger.Err(fmt.Sprintf(" Received unexpected reply for CDR, %+v, reply: %s", storedCdr, reply)) - } else { - cdrsPosted += 1 } + cdrsPosted += 1 } } // Finished with file, move it to processed folder @@ -295,6 +294,6 @@ func (self *Cdrc) processFile(filePath string) error { return err } engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, CDRs posted: %d, run duration: %s", - fn, newPath, procRowNr, cdrsPosted, time.Now().Sub(timeStart))) + fn, newPath, recordsProcessor.ProcessedRecordsNr(), cdrsPosted, time.Now().Sub(timeStart))) return nil } diff --git a/cdrc/cdrc_local_test.go b/cdrc/cdrc_local_test.go index 8e8566d99..5e0521d73 100644 --- a/cdrc/cdrc_local_test.go +++ b/cdrc/cdrc_local_test.go @@ -93,7 +93,7 @@ func stopEngine() error { } // Need it here and not in init since Travis has no possibility to load local file -func TestLoadConfigt(*testing.T) { +func TestCsvLclLoadConfigt(*testing.T) { if !*testLocal { return } @@ -104,7 +104,7 @@ func TestLoadConfigt(*testing.T) { } } -func TestEmptyTables(t *testing.T) { +func TestCsvLclEmptyTables(t *testing.T) { if !*testLocal { return } @@ -131,7 +131,7 @@ func TestEmptyTables(t *testing.T) { } // Creates cdr files and starts the engine -func TestCreateCdrFiles(t *testing.T) { +func TestCsvLclCreateCdrFiles(t *testing.T) { if !*testLocal { return } @@ -162,7 +162,7 @@ func TestCreateCdrFiles(t *testing.T) { } -func TestProcessCdrDir(t *testing.T) { +func TestCsvLclProcessCdrDir(t *testing.T) { if !*testLocal { return } @@ -187,7 +187,7 @@ func TestProcessCdrDir(t *testing.T) { } // Creates cdr files and starts the engine -func TestCreateCdr3File(t *testing.T) { +func TestCsvLclCreateCdr3File(t *testing.T) { if !*testLocal { return } @@ -202,7 +202,7 @@ func TestCreateCdr3File(t *testing.T) { } } -func TestProcessCdr3Dir(t *testing.T) { +func TestCsvLclProcessCdr3Dir(t *testing.T) { if !*testLocal { return } diff --git a/cdrc/csv.go b/cdrc/csv.go index fa8cac9d5..9d08644a1 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -195,6 +195,7 @@ type CsvRecordsProcessor struct { timezone string // Timezone for CDRs which are not clearly specifying it fileName string failedCallsPrefix string + processedRecordsNr int64 // Number of content records in file cdrSourceIds []string // Should be in sync with cdrFields on indexes duMultiplyFactors []float64 cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes @@ -203,11 +204,16 @@ type CsvRecordsProcessor struct { partialRecordsCache *PartialRecordsCache // Shared by cdrc so we can cache for all files in a folder } +func (self *CsvRecordsProcessor) ProcessedRecordsNr() int64 { + return self.processedRecordsNr +} + func (self *CsvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error) { record, err := self.csvReader.Read() if err != nil { return nil, err } + self.processedRecordsNr += 1 if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.cdrFormat) { if record, err = self.processPartialRecord(record); err != nil { return nil, err diff --git a/cdrc/fwv.go b/cdrc/fwv.go index a7cf89cfa..984f6a665 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -52,16 +52,17 @@ func NewFwvRecordsProcessor(file *os.File, cdrcCfgs map[string]*config.CdrcConfi } type FwvRecordsProcessor struct { - file *os.File - cdrcCfgs map[string]*config.CdrcConfig - dfltCfg *config.CdrcConfig // General parameters - httpClient *http.Client - httpSkipTlsCheck bool - timezone string - lineLen int64 // Length of the line in the file - offset int64 // Index of the next byte to process - trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs - headerCdr *engine.StoredCdr // Cache here the general purpose stored CDR + file *os.File + cdrcCfgs map[string]*config.CdrcConfig + dfltCfg *config.CdrcConfig // General parameters + httpClient *http.Client + httpSkipTlsCheck bool + timezone string + lineLen int64 // Length of the line in the file + offset int64 // Index of the next byte to process + processedRecordsNr int64 // Number of content records in file + trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs + headerCdr *engine.StoredCdr // Cache here the general purpose stored CDR } // Sets the line length based on first line, sets offset back to initial after reading @@ -78,6 +79,10 @@ func (self *FwvRecordsProcessor) setLineLen() error { return nil } +func (self *FwvRecordsProcessor) ProcessedRecordsNr() int64 { + return self.processedRecordsNr +} + func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error) { defer func() { self.offset += self.lineLen }() // Schedule increasing the offset once we are out from processing the record if self.offset == 0 { // First time, set the necessary offsets @@ -116,6 +121,7 @@ func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error engine.Logger.Err(fmt.Sprintf(" Could not read complete line, have instead: %s", string(buf))) return nil, io.EOF } + self.processedRecordsNr += 1 record := string(buf) for cfgKey := range self.cdrcCfgs { if passes := self.recordPassesCfgFilter(record, cfgKey); !passes {