diff --git a/ers/filefwv.go b/ers/filefwv.go index 86799f7f3..055ceb069 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -49,6 +49,7 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int, rdrEvents: rdrEvents, rdrError: rdrErr, rdrExit: rdrExit, + files: make(map[string]*fileVars), conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} var processFile struct{} for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { @@ -60,21 +61,28 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int, // XMLFileER implements EventReader interface for .xml files type FWVFileER struct { sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS - rdrDir string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrError chan error - rdrExit chan struct{} - conReqs chan struct{} // limit number of opened files - lineLen int64 // Length of the line in the file - offset int64 // Index of the next byte to process + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + rdrDir string + rdrEvents chan *erEvent // channel to dispatch the events created to + rdrError chan error + rdrExit chan struct{} + conReqs chan struct{} // limit number of opened files + files map[string]*fileVars // map that contains the relevant variables for each file being processed + fileMutex sync.RWMutex // mutex used for the map with file variables +} + +type fileVars struct { + offset int64 // index of the next byte to process + lineLength int64 // length of a line in the file headerOffset int64 - trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs + trailerOffset int64 // index where trailer starts, to be used as boundary when reading cdrs trailerLength int64 + path string // absolute path of the file headerDP utils.DataProvider trailerDP utils.DataProvider + file *os.File } func (rdr *FWVFileER) Config() *config.EventReaderCfg { @@ -135,15 +143,20 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } defer file.Close() - rowNr := 0 // This counts the rows in the file, not really number of CDRs - evsPosted := 0 + rdr.fileMutex.Lock() + rdr.files[fName] = &fileVars{file: file} // add map entry containing variables related to the current file that's being processed + rdr.fileMutex.Unlock() + + rowNr := 0 // This counts the rows in the file, not really number of CDRs + evsPosted := 0 // Number of CDRs successfully processed timeStart := time.Now() reqVars := utils.NavigableMap2{utils.MetaFileName: utils.NewNMData(fName)} for { var hasHeader, hasTrailer bool var headerFields, trailerFields []*config.FCTemplate - if rdr.offset == 0 { // First time, set the necessary offsets + rdr.fileMutex.Lock() + if rdr.files[fName].offset == 0 { // First time, set the necessary offsets // preprocess the fields for header and trailer for _, fld := range rdr.Config().Fields { if strings.HasPrefix(fld.Value[0].Rules, utils.DynamicDataPrefix+utils.MetaHdr) { @@ -156,62 +169,77 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } } - if err = rdr.setLineLen(file, hasHeader, hasTrailer); err != nil { + if err = rdr.setLineLen(rdr.files[fName], hasHeader, hasTrailer); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot set lineLen: %s", utils.ERs, err.Error())) + rdr.fileMutex.Unlock() break } if hasTrailer { - - // process trailer here - if err = rdr.processTrailer(file, rowNr, evsPosted, absPath, trailerFields); err != nil { + if err = rdr.processTrailer(rdr.files[fName], trailerFields); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error())) + rdr.fileMutex.Unlock() return } } if hasHeader { - if err = rdr.processHeader(file, rowNr, evsPosted, absPath, headerFields); err != nil { + if err = rdr.processHeader(rdr.files[fName], headerFields); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error reading header: %s", utils.ERs, err.Error())) + rdr.fileMutex.Unlock() return } + rdr.fileMutex.Unlock() continue } } - if rdr.offset >= rdr.trailerOffset { + rdr.fileMutex.Unlock() + rdr.fileMutex.RLock() + if rdr.files[fName].offset >= rdr.files[fName].trailerOffset { + rdr.fileMutex.RUnlock() break } - buf := make([]byte, rdr.lineLen) + buf := make([]byte, rdr.files[fName].lineLength) if nRead, err := file.Read(buf); err != nil { if err == io.EOF { + rdr.fileMutex.RUnlock() break } + rdr.fileMutex.RUnlock() return err - } else if nRead != len(buf) && int64(nRead) != rdr.trailerLength { + } else if nRead != len(buf) && int64(nRead) != rdr.files[fName].trailerLength { utils.Logger.Err(fmt.Sprintf("<%s> Could not read complete line, have instead: %s", utils.ERs, string(buf))) - rdr.offset += rdr.lineLen // increase the offset when exit + rdr.fileMutex.RUnlock() + rdr.fileMutex.Lock() + rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset when exit + rdr.fileMutex.Unlock() continue } + rdr.fileMutex.RUnlock() rowNr++ // increment the rowNr after checking if it's not the end of file record := string(buf) + rdr.fileMutex.Lock() agReq := agents.NewAgentRequest( config.NewFWVProvider(record), reqVars, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, rdr.headerDP, rdr.trailerDP) // create an AgentRequest + rdr.fltrS, rdr.files[fName].headerDP, rdr.files[fName].trailerDP) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { + rdr.fileMutex.Unlock() continue } if err := agReq.SetFields(rdr.Config().Fields); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", utils.ERs, absPath, rowNr, err.Error())) - rdr.offset += rdr.lineLen // increase the offset when exit + rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset when exit + rdr.fileMutex.Unlock() continue } - rdr.offset += rdr.lineLen // increase the offset + rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset + rdr.fileMutex.Unlock() rdr.rdrEvents <- &erEvent{ cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep), rdrCfg: rdr.Config(), @@ -220,6 +248,10 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } + rdr.fileMutex.Lock() + delete(rdr.files, fName) // file done processing, remove corresponding entry + rdr.fileMutex.Unlock() + if rdr.Config().ProcessedPath != "" { // Finished with file, move it to processed folder outPath := path.Join(rdr.Config().ProcessedPath, fName) @@ -227,22 +259,15 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { return } } - rdr.offset = 0 - rdr.headerOffset = 0 - rdr.trailerOffset = 0 - rdr.trailerLength = 0 - rdr.lineLen = 0 - rdr.trailerDP = nil - rdr.headerDP = nil utils.Logger.Info( fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", - utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart))) + utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart))) return } // Sets the line length based on first line, sets offset back to initial after reading -func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader, hasTrailer bool) error { - buff := bufio.NewReader(file) +func (rdr *FWVFileER) setLineLen(fileVars *fileVars, hasHeader, hasTrailer bool) error { + buff := bufio.NewReader(fileVars.file) // in case we have header we take the length of first line and add it as headerOffset i := 0 lastLineSize := 0 @@ -252,102 +277,93 @@ func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader, hasTrailer bool) erro break } if hasHeader && i == 0 { - rdr.headerOffset = int64(len(readBytes)) + fileVars.headerOffset = int64(len(readBytes)) i++ continue } if (hasHeader && i == 1) || (!hasHeader && i == 0) { - rdr.lineLen = int64(len(readBytes)) + fileVars.lineLength = int64(len(readBytes)) i++ continue } lastLineSize = len(readBytes) } if hasTrailer { - if fi, err := file.Stat(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot get file stats: %s", utils.ERs, err.Error())) + if fi, err := fileVars.file.Stat(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> cannot retrieve stats for file: %s, %s", utils.ERs, fileVars.path, err.Error())) return err } else { - rdr.trailerOffset = fi.Size() - int64(lastLineSize) - rdr.trailerLength = int64(lastLineSize) + fileVars.trailerOffset = fi.Size() - int64(lastLineSize) + fileVars.trailerLength = int64(lastLineSize) } } - if _, err := file.Seek(0, 0); err != nil { + // reset the cursor + if _, err := fileVars.file.Seek(0, 0); err != nil { return err } return nil } -func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPath string, trailerFields []*config.FCTemplate) (err error) { - buf := make([]byte, rdr.trailerLength) - if nRead, err := file.ReadAt(buf, rdr.trailerOffset); err != nil && err != io.EOF { +func (rdr *FWVFileER) processTrailer(fileVars *fileVars, trailerFields []*config.FCTemplate) (err error) { + buf := make([]byte, fileVars.trailerLength) + if nRead, err := fileVars.file.ReadAt(buf, fileVars.trailerOffset); err != nil && err != io.EOF { return err } else if nRead != len(buf) { - return fmt.Errorf("In trailer, line len: %d, have read: %d instead of: %d", rdr.trailerOffset, nRead, len(buf)) + return fmt.Errorf("in trailer, offset: %d, have read: %d instead of: %d", fileVars.trailerOffset, nRead, len(buf)) } record := string(buf) - rdr.trailerDP = config.NewFWVProvider(record) + fileVars.trailerDP = config.NewFWVProvider(record) agReq := agents.NewAgentRequest( utils.NavigableMap2{}, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, rdr.trailerDP) // create an AgentRequest + rdr.fltrS, nil, fileVars.trailerDP) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { return nil } if err := agReq.SetFields(trailerFields); err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", - utils.ERs, absPath, rowNr, err.Error())) + fmt.Sprintf("<%s> reading file: <%s> trailer row, ignoring due to error: <%s>", + utils.ERs, fileVars.path, err.Error())) return err } - rdr.rdrEvents <- &erEvent{ - cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep), - rdrCfg: rdr.Config(), - } - evsPosted++ // reset the cursor after process the trailer - _, err = file.Seek(0, 0) + _, err = fileVars.file.Seek(0, 0) return } -func (rdr *FWVFileER) processHeader(file *os.File, rowNr, evsPosted int, absPath string, hdrFields []*config.FCTemplate) error { - buf := make([]byte, rdr.headerOffset) - if nRead, err := file.Read(buf); err != nil { +func (rdr *FWVFileER) processHeader(fileVars *fileVars, hdrFields []*config.FCTemplate) error { + buf := make([]byte, fileVars.headerOffset) + if nRead, err := fileVars.file.Read(buf); err != nil { return err } else if nRead != len(buf) { - return fmt.Errorf("In header, line len: %d, have read: %d", rdr.headerOffset, nRead) + return fmt.Errorf("in header, offset: %d, have read: %d", fileVars.headerOffset, nRead) } - return rdr.createHeaderMap(string(buf), rowNr, evsPosted, absPath, hdrFields) + return rdr.createHeaderMap(string(buf), fileVars, hdrFields) } -func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPath string, hdrFields []*config.FCTemplate) (err error) { - rdr.offset += rdr.headerOffset // increase the offset - rdr.headerDP = config.NewFWVProvider(record) +func (rdr *FWVFileER) createHeaderMap(record string, fileVars *fileVars, hdrFields []*config.FCTemplate) (err error) { + fileVars.offset += fileVars.headerOffset // increase the offset + fileVars.headerDP = config.NewFWVProvider(record) agReq := agents.NewAgentRequest( utils.NavigableMap2{}, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, rdr.headerDP, nil) // create an AgentRequest + rdr.fltrS, fileVars.headerDP, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { return nil } if err := agReq.SetFields(hdrFields); err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", - utils.ERs, absPath, rowNr, err.Error())) + fmt.Sprintf("<%s> reading file: <%s> header row, ignoring due to error: <%s>", + utils.ERs, fileVars.path, err.Error())) return err } - rdr.rdrEvents <- &erEvent{ - cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep), - rdrCfg: rdr.Config(), - } - evsPosted++ return }