Better implementation of processedRecords in CDRC, fixes #212

This commit is contained in:
DanB
2015-09-28 16:33:22 +02:00
parent 011d662bb8
commit 43be74f509
4 changed files with 33 additions and 22 deletions

View File

@@ -95,6 +95,7 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal, timezone s
// Understands and processes a specific format of cdr (eg: .csv or .fwv)
type RecordsProcessor interface {
ProcessNextRecord() ([]*engine.StoredCdr, error) // Process a single record in the CDR file, return a slice of CDRs since based on configuration we can have more templates
ProcessedRecordsNr() int64
}
/*
@@ -260,7 +261,7 @@ func (self *Cdrc) processFile(filePath string) error {
default:
return fmt.Errorf("Unsupported CDR format: %s", self.cdrFormat)
}
procRowNr := 0
rowNr := 0 // This counts the rows in the file, not really number of CDRs
cdrsPosted := 0
timeStart := time.Now()
for {
@@ -268,9 +269,8 @@ func (self *Cdrc) processFile(filePath string) error {
if err != nil && err == io.EOF {
break
}
procRowNr += 1
if err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d, error: %s", procRowNr, err.Error()))
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d, error: %s", rowNr, err.Error()))
continue
}
for _, storedCdr := range cdrs { // Send CDRs to CDRS
@@ -283,9 +283,8 @@ func (self *Cdrc) processFile(filePath string) error {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed sending CDR, %+v, error: %s", storedCdr, err.Error()))
} else if reply != "OK" {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Received unexpected reply for CDR, %+v, reply: %s", storedCdr, reply))
} else {
cdrsPosted += 1
}
cdrsPosted += 1
}
}
// Finished with file, move it to processed folder
@@ -295,6 +294,6 @@ func (self *Cdrc) processFile(filePath string) error {
return err
}
engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, CDRs posted: %d, run duration: %s",
fn, newPath, procRowNr, cdrsPosted, time.Now().Sub(timeStart)))
fn, newPath, recordsProcessor.ProcessedRecordsNr(), cdrsPosted, time.Now().Sub(timeStart)))
return nil
}

View File

@@ -93,7 +93,7 @@ func stopEngine() error {
}
// Need it here and not in init since Travis has no possibility to load local file
func TestLoadConfigt(*testing.T) {
func TestCsvLclLoadConfigt(*testing.T) {
if !*testLocal {
return
}
@@ -104,7 +104,7 @@ func TestLoadConfigt(*testing.T) {
}
}
func TestEmptyTables(t *testing.T) {
func TestCsvLclEmptyTables(t *testing.T) {
if !*testLocal {
return
}
@@ -131,7 +131,7 @@ func TestEmptyTables(t *testing.T) {
}
// Creates cdr files and starts the engine
func TestCreateCdrFiles(t *testing.T) {
func TestCsvLclCreateCdrFiles(t *testing.T) {
if !*testLocal {
return
}
@@ -162,7 +162,7 @@ func TestCreateCdrFiles(t *testing.T) {
}
func TestProcessCdrDir(t *testing.T) {
func TestCsvLclProcessCdrDir(t *testing.T) {
if !*testLocal {
return
}
@@ -187,7 +187,7 @@ func TestProcessCdrDir(t *testing.T) {
}
// Creates cdr files and starts the engine
func TestCreateCdr3File(t *testing.T) {
func TestCsvLclCreateCdr3File(t *testing.T) {
if !*testLocal {
return
}
@@ -202,7 +202,7 @@ func TestCreateCdr3File(t *testing.T) {
}
}
func TestProcessCdr3Dir(t *testing.T) {
func TestCsvLclProcessCdr3Dir(t *testing.T) {
if !*testLocal {
return
}

View File

@@ -195,6 +195,7 @@ type CsvRecordsProcessor struct {
timezone string // Timezone for CDRs which are not clearly specifying it
fileName string
failedCallsPrefix string
processedRecordsNr int64 // Number of content records in file
cdrSourceIds []string // Should be in sync with cdrFields on indexes
duMultiplyFactors []float64
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
@@ -203,11 +204,16 @@ type CsvRecordsProcessor struct {
partialRecordsCache *PartialRecordsCache // Shared by cdrc so we can cache for all files in a folder
}
func (self *CsvRecordsProcessor) ProcessedRecordsNr() int64 {
return self.processedRecordsNr
}
func (self *CsvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error) {
record, err := self.csvReader.Read()
if err != nil {
return nil, err
}
self.processedRecordsNr += 1
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.cdrFormat) {
if record, err = self.processPartialRecord(record); err != nil {
return nil, err

View File

@@ -52,16 +52,17 @@ func NewFwvRecordsProcessor(file *os.File, cdrcCfgs map[string]*config.CdrcConfi
}
type FwvRecordsProcessor struct {
file *os.File
cdrcCfgs map[string]*config.CdrcConfig
dfltCfg *config.CdrcConfig // General parameters
httpClient *http.Client
httpSkipTlsCheck bool
timezone string
lineLen int64 // Length of the line in the file
offset int64 // Index of the next byte to process
trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs
headerCdr *engine.StoredCdr // Cache here the general purpose stored CDR
file *os.File
cdrcCfgs map[string]*config.CdrcConfig
dfltCfg *config.CdrcConfig // General parameters
httpClient *http.Client
httpSkipTlsCheck bool
timezone string
lineLen int64 // Length of the line in the file
offset int64 // Index of the next byte to process
processedRecordsNr int64 // Number of content records in file
trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs
headerCdr *engine.StoredCdr // Cache here the general purpose stored CDR
}
// Sets the line length based on first line, sets offset back to initial after reading
@@ -78,6 +79,10 @@ func (self *FwvRecordsProcessor) setLineLen() error {
return nil
}
func (self *FwvRecordsProcessor) ProcessedRecordsNr() int64 {
return self.processedRecordsNr
}
func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error) {
defer func() { self.offset += self.lineLen }() // Schedule increasing the offset once we are out from processing the record
if self.offset == 0 { // First time, set the necessary offsets
@@ -116,6 +121,7 @@ func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error
engine.Logger.Err(fmt.Sprintf("<Cdrc> Could not read complete line, have instead: %s", string(buf)))
return nil, io.EOF
}
self.processedRecordsNr += 1
record := string(buf)
for cfgKey := range self.cdrcCfgs {
if passes := self.recordPassesCfgFilter(record, cfgKey); !passes {