diff --git a/ers/filecsv.go b/ers/filecsv.go index dd4b1a3c1..4cdd2bf11 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -138,7 +138,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := make(map[string]interface{}) + reqVars := map[string]interface{}{utils.FileName: fName} for { var record []string if record, err = csvReader.Read(); err != nil { diff --git a/ers/filefwv.go b/ers/filefwv.go index 2aede3bed..0f7101e28 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -58,7 +58,7 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int, return fwvER, nil } -// XMLFileER implements EventReader interface for .xml files +// FWVFileER implements EventReader interface for .fwv files type FWVFileER struct { sync.RWMutex cgrCfg *config.CGRConfig @@ -139,7 +139,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := make(map[string]interface{}) + reqVars := map[string]interface{}{utils.FileName: fName} for { var hasHeader, hasTrailer bool @@ -255,13 +255,13 @@ func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader, hasTrailer bool) erro lastLineSize = len(readBytes) } if hasTrailer { - if fi, err := file.Stat(); err != nil { + fi, err := file.Stat() + if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot get file stats: %s", utils.ERs, err.Error())) return err - } else { - rdr.trailerOffset = fi.Size() - int64(lastLineSize) - rdr.trailerLenght = int64(lastLineSize) } + rdr.trailerOffset = fi.Size() - int64(lastLineSize) + rdr.trailerLenght = int64(lastLineSize) } if _, err := file.Seek(0, 0); err != nil { diff --git a/ers/filexml.go b/ers/filexml.go index b926948bf..9ab3a21d1 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -137,7 +137,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := make(map[string]interface{}) + reqVars := map[string]interface{}{utils.FileName: fName} for _, xmlElmt := range xmlElmts { rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( diff --git a/ers/flatstore.go b/ers/flatstore.go index e42f8fabc..f85d824cb 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -145,7 +145,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := make(map[string]interface{}) + reqVars := map[string]interface{}{utils.FileName: fName} for { var record []string if record, err = csvReader.Read(); err != nil { @@ -159,8 +159,9 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { } else { pr, err := NewUnpairedRecord(record, rdr.Config().Timezone, fName) if err != nil { - fmt.Sprintf("<%s> Converting row : <%s> to unpairedRecord , ignoring due to error: <%s>", - utils.ERs, record, err.Error()) + utils.Logger.Warning( + fmt.Sprintf("<%s> Converting row : <%s> to unpairedRecord , ignoring due to error: <%s>", + utils.ERs, record, err.Error())) continue } if val, has := rdr.cache.Get(pr.OriginID); !has { @@ -170,8 +171,9 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { pair := val.(*UnpairedRecord) record, err = pairToRecord(pair, pr) if err != nil { - fmt.Sprintf("<%s> Merging unpairedRecords : <%s> and <%s> to record , ignoring due to error: <%s>", - utils.ERs, utils.ToJSON(pair), utils.ToJSON(pr), err.Error()) + utils.Logger.Warning( + fmt.Sprintf("<%s> Merging unpairedRecords : <%s> and <%s> to record , ignoring due to error: <%s>", + utils.ERs, utils.ToJSON(pair), utils.ToJSON(pr), err.Error())) continue } rdr.cache.Remove(pr.OriginID) @@ -234,7 +236,7 @@ func NewUnpairedRecord(record []string, timezone string, fileName string) (*Unpa return pr, nil } -// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration +// UnpairedRecord is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration type UnpairedRecord struct { Method string // INVITE or BYE OriginID string // Copute here the OriginID diff --git a/ers/partial_csv.go b/ers/partial_csv.go index 3622aa0e3..455fa8b30 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -152,7 +152,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() - reqVars := make(map[string]interface{}) + reqVars := map[string]interface{}{utils.FileName: fName} for { var record []string if record, err = csvReader.Read(); err != nil { @@ -181,7 +181,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { } // take OriginID and OriginHost to compose CGRID - orgId, err := agReq.CGRRequest.FieldAsString([]string{utils.OriginID}) + orgID, err := agReq.CGRRequest.FieldAsString([]string{utils.OriginID}) if err == utils.ErrNotFound { utils.Logger.Warning( fmt.Sprintf("<%s> Missing field for row <%d> , <%s>", @@ -195,7 +195,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { utils.ERs, rowNr, record)) continue } - cgrID := utils.Sha1(orgId, orgHost) + cgrID := utils.Sha1(orgID, orgHost) // take Partial field from NavigableMap partial, _ := agReq.CGRRequest.FieldAsString([]string{utils.Partial}) if val, has := rdr.cache.Get(cgrID); !has { @@ -218,10 +218,9 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { sTime, _ := origCgrEvs[i].FieldAsTime(utils.SetupTime, agReq.Timezone) sTime2, _ := origCgrEvs[j].FieldAsTime(utils.SetupTime, agReq.Timezone) return sTime.Before(sTime2) - } else { - aTime2, _ := origCgrEvs[j].FieldAsTime(utils.AnswerTime, agReq.Timezone) - return aTime.Before(aTime2) } + aTime2, _ := origCgrEvs[j].FieldAsTime(utils.AnswerTime, agReq.Timezone) + return aTime.Before(aTime2) }) // compose the CGREvent from slice cgrEv := new(utils.CGREvent) @@ -261,10 +260,6 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { return } -const ( - PartialRecordsSuffix = "partial" -) - func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) { origCgrEvs := value.([]*utils.CGREvent) for _, origCgrEv := range origCgrEvs { @@ -348,10 +343,9 @@ func (rdr *PartialCSVFileER) postCDR(itmID string, value interface{}) { sTime, _ := origCgrEvs[i].FieldAsTime(utils.SetupTime, rdr.Config().Timezone) sTime2, _ := origCgrEvs[j].FieldAsTime(utils.SetupTime, rdr.Config().Timezone) return sTime.Before(sTime2) - } else { - aTime2, _ := origCgrEvs[j].FieldAsTime(utils.AnswerTime, rdr.Config().Timezone) - return aTime.Before(aTime2) } + aTime2, _ := origCgrEvs[j].FieldAsTime(utils.AnswerTime, rdr.Config().Timezone) + return aTime.Before(aTime2) }) // compose the CGREvent from slice cgrEv := &utils.CGREvent{ diff --git a/utils/consts.go b/utils/consts.go index 62cdc411f..aac8aab8a 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -705,6 +705,7 @@ const ( MetaCost = "*cost" MetaGroup = "*group" InternalRPCSet = "InternalRPCSet" + FileName = "FileName" ) // Migrator Action