diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 61934de03..5bd186bf2 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -44,10 +44,6 @@ const ( UNPAIRED_SUFFIX = ".unpaired" ) -type RawRecordsReader interface { - Read() ([]string, error) // Reads one record as slice -} - // Populates the func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal string) error { var err error @@ -170,9 +166,9 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()} - var processFile struct{} + var processCsvFile struct{} for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { - cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop + cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop } cdrc.cdrSourceIds = make([]string, len(cdrcCfgs)) cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs)) @@ -262,7 +258,7 @@ func (self *Cdrc) trackCDRFiles() (err error) { case ev := <-watcher.Events: if ev.Op&fsnotify.Create == fsnotify.Create && (self.CdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") { go func() { //Enable async processing here - if err = self.processFile(ev.Name); err != nil { + if err = self.processCsvFile(ev.Name); err != nil { engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error())) } }() @@ -280,7 +276,7 @@ func (self *Cdrc) processCdrDir() error { for _, file := range filesInDir { if self.CdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" { go func() { //Enable async processing here - if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil { + if err := self.processCsvFile(path.Join(self.cdrInDir, file.Name())); err != nil { engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error())) } }() @@ -290,10 +286,10 @@ func (self *Cdrc) processCdrDir() error { } // Processe file at filePath and posts the valid cdr rows out of it -func (self *Cdrc) processFile(filePath string) error { +func (self *Cdrc) processCsvFile(filePath string) error { if cap(self.maxOpenFiles) != 0 { // 0 goes for no limit - processFile := <-self.maxOpenFiles // Queue here for maxOpenFiles - defer func() { self.maxOpenFiles <- processFile }() + processCsvFile := <-self.maxOpenFiles // Queue here for maxOpenFiles + defer func() { self.maxOpenFiles <- processCsvFile }() } _, fn := path.Split(filePath) engine.Logger.Info(fmt.Sprintf(" Parsing: %s", filePath)) @@ -303,33 +299,12 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Crit(err.Error()) return err } - var rdr RawRecordsReader - if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { - csvReader := csv.NewReader(bufio.NewReader(file)) - csvReader.Comma = self.csvSep - rdr = csvReader - } - timeStart := time.Now() - rowsProcessed, err := self.processRecords(rdr, fn) - if err != nil { - return err - } - // Finished with file, move it to processed folder - newPath := path.Join(self.cdrOutDir, fn) - if err := os.Rename(filePath, newPath); err != nil { - engine.Logger.Err(err.Error()) - return err - } - engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, run duration: %s", - fn, newPath, rowsProcessed, time.Now().Sub(timeStart))) - return nil -} - -// Process all records in reader -func (self *Cdrc) processRecords(rdr RawRecordsReader, fileName string) (int, error) { + csvReader := csv.NewReader(bufio.NewReader(file)) + csvReader.Comma = self.csvSep procRowNr := 0 + timeStart := time.Now() for { - record, err := rdr.Read() + record, err := csvReader.Read() if err != nil && err == io.EOF { break // End of file } @@ -339,7 +314,7 @@ func (self *Cdrc) processRecords(rdr RawRecordsReader, fileName string) (int, er continue // Other csv related errors, ignore } if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // partial records for flatstore CDRs - if record, err = self.processPartialRecord(record, fileName); err != nil { + if record, err = self.processPartialRecord(record, fn); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed processing partial record, row: %d, error: %s", procRowNr, err.Error())) continue } else if record == nil { @@ -352,7 +327,15 @@ func (self *Cdrc) processRecords(rdr RawRecordsReader, fileName string) (int, er continue } } - return procRowNr, nil + // Finished with file, move it to processed folder + newPath := path.Join(self.cdrOutDir, fn) + if err := os.Rename(filePath, newPath); err != nil { + engine.Logger.Err(err.Error()) + return err + } + engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, run duration: %s", + fn, newPath, procRowNr, time.Now().Sub(timeStart))) + return nil } // Processes a single partial record for flatstore CDRs