diff --git a/ers/filefwv.go b/ers/filefwv.go index 055ceb069..847ad5913 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -49,7 +49,6 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int, rdrEvents: rdrEvents, rdrError: rdrErr, rdrExit: rdrExit, - files: make(map[string]*fileVars), conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} var processFile struct{} for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { @@ -68,9 +67,7 @@ type FWVFileER struct { rdrEvents chan *erEvent // channel to dispatch the events created to rdrError chan error rdrExit chan struct{} - conReqs chan struct{} // limit number of opened files - files map[string]*fileVars // map that contains the relevant variables for each file being processed - fileMutex sync.RWMutex // mutex used for the map with file variables + conReqs chan struct{} // limit number of opened files } type fileVars struct { @@ -143,10 +140,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } defer file.Close() - rdr.fileMutex.Lock() - rdr.files[fName] = &fileVars{file: file} // add map entry containing variables related to the current file that's being processed - rdr.fileMutex.Unlock() - + fileVars := &fileVars{file: file, path: absPath} rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 // Number of CDRs successfully processed timeStart := time.Now() @@ -155,8 +149,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { for { var hasHeader, hasTrailer bool var headerFields, trailerFields []*config.FCTemplate - rdr.fileMutex.Lock() - if rdr.files[fName].offset == 0 { // First time, set the necessary offsets + if fileVars.offset == 0 { // First time, set the necessary offsets // preprocess the fields for header and trailer for _, fld := range rdr.Config().Fields { if strings.HasPrefix(fld.Value[0].Rules, utils.DynamicDataPrefix+utils.MetaHdr) { @@ -169,77 +162,60 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } } - if err = rdr.setLineLen(rdr.files[fName], hasHeader, hasTrailer); err != nil { + if err = rdr.setLineLen(fileVars, hasHeader, hasTrailer); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot set lineLen: %s", utils.ERs, err.Error())) - rdr.fileMutex.Unlock() break } if hasTrailer { - if err = rdr.processTrailer(rdr.files[fName], trailerFields); err != nil { + if err = rdr.processTrailer(fileVars, trailerFields); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error())) - rdr.fileMutex.Unlock() return } } if hasHeader { - if err = rdr.processHeader(rdr.files[fName], headerFields); err != nil { + if err = rdr.processHeader(fileVars, headerFields); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error reading header: %s", utils.ERs, err.Error())) - rdr.fileMutex.Unlock() return } - rdr.fileMutex.Unlock() continue } } - rdr.fileMutex.Unlock() - rdr.fileMutex.RLock() - if rdr.files[fName].offset >= rdr.files[fName].trailerOffset { - rdr.fileMutex.RUnlock() + if fileVars.offset >= fileVars.trailerOffset { break } - buf := make([]byte, rdr.files[fName].lineLength) + buf := make([]byte, fileVars.lineLength) if nRead, err := file.Read(buf); err != nil { if err == io.EOF { - rdr.fileMutex.RUnlock() break } - rdr.fileMutex.RUnlock() return err - } else if nRead != len(buf) && int64(nRead) != rdr.files[fName].trailerLength { + } else if nRead != len(buf) && int64(nRead) != fileVars.trailerLength { utils.Logger.Err(fmt.Sprintf("<%s> Could not read complete line, have instead: %s", utils.ERs, string(buf))) - rdr.fileMutex.RUnlock() - rdr.fileMutex.Lock() - rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset when exit - rdr.fileMutex.Unlock() + fileVars.offset += fileVars.lineLength // increase the offset when exit continue } - rdr.fileMutex.RUnlock() rowNr++ // increment the rowNr after checking if it's not the end of file record := string(buf) - rdr.fileMutex.Lock() agReq := agents.NewAgentRequest( config.NewFWVProvider(record), reqVars, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, rdr.files[fName].headerDP, rdr.files[fName].trailerDP) // create an AgentRequest + rdr.fltrS, fileVars.headerDP, fileVars.trailerDP) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { - rdr.fileMutex.Unlock() continue } if err := agReq.SetFields(rdr.Config().Fields); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", utils.ERs, absPath, rowNr, err.Error())) - rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset when exit - rdr.fileMutex.Unlock() + fileVars.offset += fileVars.lineLength // increase the offset when exit continue } - rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset - rdr.fileMutex.Unlock() + fileVars.offset += fileVars.lineLength // increase the offset rdr.rdrEvents <- &erEvent{ cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep), rdrCfg: rdr.Config(), @@ -248,10 +224,6 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } - rdr.fileMutex.Lock() - delete(rdr.files, fName) // file done processing, remove corresponding entry - rdr.fileMutex.Unlock() - if rdr.Config().ProcessedPath != "" { // Finished with file, move it to processed folder outPath := path.Join(rdr.Config().ProcessedPath, fName)