Revert "Modifying CDRC file processing to accept sources other than .csv"

This reverts commit 51cb6c388f.
This commit is contained in:
DanB
2015-07-24 15:06:19 +02:00
parent 3b2668b22c
commit df92a1453d

View File

@@ -44,10 +44,6 @@ const (
UNPAIRED_SUFFIX = ".unpaired"
)
type RawRecordsReader interface {
Read() ([]string, error) // Reads one record as slice
}
// Populates the
func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal string) error {
var err error
@@ -170,9 +166,9 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS
runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator,
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()}
var processFile struct{}
var processCsvFile struct{}
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop
cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop
}
cdrc.cdrSourceIds = make([]string, len(cdrcCfgs))
cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs))
@@ -262,7 +258,7 @@ func (self *Cdrc) trackCDRFiles() (err error) {
case ev := <-watcher.Events:
if ev.Op&fsnotify.Create == fsnotify.Create && (self.CdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") {
go func() { //Enable async processing here
if err = self.processFile(ev.Name); err != nil {
if err = self.processCsvFile(ev.Name); err != nil {
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error()))
}
}()
@@ -280,7 +276,7 @@ func (self *Cdrc) processCdrDir() error {
for _, file := range filesInDir {
if self.CdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" {
go func() { //Enable async processing here
if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil {
if err := self.processCsvFile(path.Join(self.cdrInDir, file.Name())); err != nil {
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error()))
}
}()
@@ -290,10 +286,10 @@ func (self *Cdrc) processCdrDir() error {
}
// Processe file at filePath and posts the valid cdr rows out of it
func (self *Cdrc) processFile(filePath string) error {
func (self *Cdrc) processCsvFile(filePath string) error {
if cap(self.maxOpenFiles) != 0 { // 0 goes for no limit
processFile := <-self.maxOpenFiles // Queue here for maxOpenFiles
defer func() { self.maxOpenFiles <- processFile }()
processCsvFile := <-self.maxOpenFiles // Queue here for maxOpenFiles
defer func() { self.maxOpenFiles <- processCsvFile }()
}
_, fn := path.Split(filePath)
engine.Logger.Info(fmt.Sprintf("<Cdrc> Parsing: %s", filePath))
@@ -303,33 +299,12 @@ func (self *Cdrc) processFile(filePath string) error {
engine.Logger.Crit(err.Error())
return err
}
var rdr RawRecordsReader
if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) {
csvReader := csv.NewReader(bufio.NewReader(file))
csvReader.Comma = self.csvSep
rdr = csvReader
}
timeStart := time.Now()
rowsProcessed, err := self.processRecords(rdr, fn)
if err != nil {
return err
}
// Finished with file, move it to processed folder
newPath := path.Join(self.cdrOutDir, fn)
if err := os.Rename(filePath, newPath); err != nil {
engine.Logger.Err(err.Error())
return err
}
engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, run duration: %s",
fn, newPath, rowsProcessed, time.Now().Sub(timeStart)))
return nil
}
// Process all records in reader
func (self *Cdrc) processRecords(rdr RawRecordsReader, fileName string) (int, error) {
csvReader := csv.NewReader(bufio.NewReader(file))
csvReader.Comma = self.csvSep
procRowNr := 0
timeStart := time.Now()
for {
record, err := rdr.Read()
record, err := csvReader.Read()
if err != nil && err == io.EOF {
break // End of file
}
@@ -339,7 +314,7 @@ func (self *Cdrc) processRecords(rdr RawRecordsReader, fileName string) (int, er
continue // Other csv related errors, ignore
}
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // partial records for flatstore CDRs
if record, err = self.processPartialRecord(record, fileName); err != nil {
if record, err = self.processPartialRecord(record, fn); err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed processing partial record, row: %d, error: %s", procRowNr, err.Error()))
continue
} else if record == nil {
@@ -352,7 +327,15 @@ func (self *Cdrc) processRecords(rdr RawRecordsReader, fileName string) (int, er
continue
}
}
return procRowNr, nil
// Finished with file, move it to processed folder
newPath := path.Join(self.cdrOutDir, fn)
if err := os.Rename(filePath, newPath); err != nil {
engine.Logger.Err(err.Error())
return err
}
engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, run duration: %s",
fn, newPath, procRowNr, time.Now().Sub(timeStart)))
return nil
}
// Processes a single partial record for flatstore CDRs