Allow concurrent processing when reading fwv files

This commit is contained in:
ionutboangiu
2023-01-20 18:51:41 +02:00
committed by Dan Christian Bogos
parent 7777e46758
commit d20fa06201

View File

@@ -49,7 +49,6 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents: rdrEvents,
rdrError: rdrErr,
rdrExit: rdrExit,
files: make(map[string]*fileVars),
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
@@ -68,9 +67,7 @@ type FWVFileER struct {
rdrEvents chan *erEvent // channel to dispatch the events created to
rdrError chan error
rdrExit chan struct{}
conReqs chan struct{} // limit number of opened files
files map[string]*fileVars // map that contains the relevant variables for each file being processed
fileMutex sync.RWMutex // mutex used for the map with file variables
conReqs chan struct{} // limit number of opened files
}
type fileVars struct {
@@ -143,10 +140,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
defer file.Close()
rdr.fileMutex.Lock()
rdr.files[fName] = &fileVars{file: file} // add map entry containing variables related to the current file that's being processed
rdr.fileMutex.Unlock()
fileVars := &fileVars{file: file, path: absPath}
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0 // Number of CDRs successfully processed
timeStart := time.Now()
@@ -155,8 +149,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
for {
var hasHeader, hasTrailer bool
var headerFields, trailerFields []*config.FCTemplate
rdr.fileMutex.Lock()
if rdr.files[fName].offset == 0 { // First time, set the necessary offsets
if fileVars.offset == 0 { // First time, set the necessary offsets
// preprocess the fields for header and trailer
for _, fld := range rdr.Config().Fields {
if strings.HasPrefix(fld.Value[0].Rules, utils.DynamicDataPrefix+utils.MetaHdr) {
@@ -169,77 +162,60 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
}
if err = rdr.setLineLen(rdr.files[fName], hasHeader, hasTrailer); err != nil {
if err = rdr.setLineLen(fileVars, hasHeader, hasTrailer); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot set lineLen: %s", utils.ERs, err.Error()))
rdr.fileMutex.Unlock()
break
}
if hasTrailer {
if err = rdr.processTrailer(rdr.files[fName], trailerFields); err != nil {
if err = rdr.processTrailer(fileVars, trailerFields); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error()))
rdr.fileMutex.Unlock()
return
}
}
if hasHeader {
if err = rdr.processHeader(rdr.files[fName], headerFields); err != nil {
if err = rdr.processHeader(fileVars, headerFields); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error reading header: %s", utils.ERs, err.Error()))
rdr.fileMutex.Unlock()
return
}
rdr.fileMutex.Unlock()
continue
}
}
rdr.fileMutex.Unlock()
rdr.fileMutex.RLock()
if rdr.files[fName].offset >= rdr.files[fName].trailerOffset {
rdr.fileMutex.RUnlock()
if fileVars.offset >= fileVars.trailerOffset {
break
}
buf := make([]byte, rdr.files[fName].lineLength)
buf := make([]byte, fileVars.lineLength)
if nRead, err := file.Read(buf); err != nil {
if err == io.EOF {
rdr.fileMutex.RUnlock()
break
}
rdr.fileMutex.RUnlock()
return err
} else if nRead != len(buf) && int64(nRead) != rdr.files[fName].trailerLength {
} else if nRead != len(buf) && int64(nRead) != fileVars.trailerLength {
utils.Logger.Err(fmt.Sprintf("<%s> Could not read complete line, have instead: %s", utils.ERs, string(buf)))
rdr.fileMutex.RUnlock()
rdr.fileMutex.Lock()
rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset when exit
rdr.fileMutex.Unlock()
fileVars.offset += fileVars.lineLength // increase the offset when exit
continue
}
rdr.fileMutex.RUnlock()
rowNr++ // increment the rowNr after checking if it's not the end of file
record := string(buf)
rdr.fileMutex.Lock()
agReq := agents.NewAgentRequest(
config.NewFWVProvider(record), reqVars,
nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS, rdr.files[fName].headerDP, rdr.files[fName].trailerDP) // create an AgentRequest
rdr.fltrS, fileVars.headerDP, fileVars.trailerDP) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
rdr.fileMutex.Unlock()
continue
}
if err := agReq.SetFields(rdr.Config().Fields); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
utils.ERs, absPath, rowNr, err.Error()))
rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset when exit
rdr.fileMutex.Unlock()
fileVars.offset += fileVars.lineLength // increase the offset when exit
continue
}
rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset
rdr.fileMutex.Unlock()
fileVars.offset += fileVars.lineLength // increase the offset
rdr.rdrEvents <- &erEvent{
cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config(),
@@ -248,10 +224,6 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
rdr.fileMutex.Lock()
delete(rdr.files, fName) // file done processing, remove corresponding entry
rdr.fileMutex.Unlock()
if rdr.Config().ProcessedPath != "" {
// Finished with file, move it to processed folder
outPath := path.Join(rdr.Config().ProcessedPath, fName)