Change processing method when reading fwv files

This commit is contained in:
ionutboangiu
2023-01-19 19:27:33 +02:00
committed by Dan Christian Bogos
parent 6687955519
commit 7777e46758

View File

@@ -49,6 +49,7 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents: rdrEvents,
rdrError: rdrErr,
rdrExit: rdrExit,
files: make(map[string]*fileVars),
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
@@ -60,21 +61,28 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int,
// FWVFileER implements EventReader interface for fwv files
type FWVFileER struct {
sync.RWMutex
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
rdrDir string
rdrEvents chan *erEvent // channel to dispatch the events created to
rdrError chan error
rdrExit chan struct{}
conReqs chan struct{} // limit number of opened files
lineLen int64 // Length of the line in the file
offset int64 // Index of the next byte to process
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
rdrDir string
rdrEvents chan *erEvent // channel to dispatch the events created to
rdrError chan error
rdrExit chan struct{}
conReqs chan struct{} // limit number of opened files
files map[string]*fileVars // map that contains the relevant variables for each file being processed
fileMutex sync.RWMutex // mutex used for the map with file variables
}
type fileVars struct {
offset int64 // index of the next byte to process
lineLength int64 // length of a line in the file
headerOffset int64
trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs
trailerOffset int64 // index where trailer starts, to be used as boundary when reading cdrs
trailerLength int64
path string // absolute path of the file
headerDP utils.DataProvider
trailerDP utils.DataProvider
file *os.File
}
func (rdr *FWVFileER) Config() *config.EventReaderCfg {
@@ -135,15 +143,20 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
defer file.Close()
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0
rdr.fileMutex.Lock()
rdr.files[fName] = &fileVars{file: file} // add map entry containing variables related to the current file that's being processed
rdr.fileMutex.Unlock()
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0 // Number of CDRs successfully processed
timeStart := time.Now()
reqVars := utils.NavigableMap2{utils.MetaFileName: utils.NewNMData(fName)}
for {
var hasHeader, hasTrailer bool
var headerFields, trailerFields []*config.FCTemplate
if rdr.offset == 0 { // First time, set the necessary offsets
rdr.fileMutex.Lock()
if rdr.files[fName].offset == 0 { // First time, set the necessary offsets
// preprocess the fields for header and trailer
for _, fld := range rdr.Config().Fields {
if strings.HasPrefix(fld.Value[0].Rules, utils.DynamicDataPrefix+utils.MetaHdr) {
@@ -156,62 +169,77 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
}
if err = rdr.setLineLen(file, hasHeader, hasTrailer); err != nil {
if err = rdr.setLineLen(rdr.files[fName], hasHeader, hasTrailer); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot set lineLen: %s", utils.ERs, err.Error()))
rdr.fileMutex.Unlock()
break
}
if hasTrailer {
// process trailer here
if err = rdr.processTrailer(file, rowNr, evsPosted, absPath, trailerFields); err != nil {
if err = rdr.processTrailer(rdr.files[fName], trailerFields); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error()))
rdr.fileMutex.Unlock()
return
}
}
if hasHeader {
if err = rdr.processHeader(file, rowNr, evsPosted, absPath, headerFields); err != nil {
if err = rdr.processHeader(rdr.files[fName], headerFields); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error reading header: %s", utils.ERs, err.Error()))
rdr.fileMutex.Unlock()
return
}
rdr.fileMutex.Unlock()
continue
}
}
if rdr.offset >= rdr.trailerOffset {
rdr.fileMutex.Unlock()
rdr.fileMutex.RLock()
if rdr.files[fName].offset >= rdr.files[fName].trailerOffset {
rdr.fileMutex.RUnlock()
break
}
buf := make([]byte, rdr.lineLen)
buf := make([]byte, rdr.files[fName].lineLength)
if nRead, err := file.Read(buf); err != nil {
if err == io.EOF {
rdr.fileMutex.RUnlock()
break
}
rdr.fileMutex.RUnlock()
return err
} else if nRead != len(buf) && int64(nRead) != rdr.trailerLength {
} else if nRead != len(buf) && int64(nRead) != rdr.files[fName].trailerLength {
utils.Logger.Err(fmt.Sprintf("<%s> Could not read complete line, have instead: %s", utils.ERs, string(buf)))
rdr.offset += rdr.lineLen // increase the offset when exit
rdr.fileMutex.RUnlock()
rdr.fileMutex.Lock()
rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset when exit
rdr.fileMutex.Unlock()
continue
}
rdr.fileMutex.RUnlock()
rowNr++ // increment the rowNr after checking if it's not the end of file
record := string(buf)
rdr.fileMutex.Lock()
agReq := agents.NewAgentRequest(
config.NewFWVProvider(record), reqVars,
nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS, rdr.headerDP, rdr.trailerDP) // create an AgentRequest
rdr.fltrS, rdr.files[fName].headerDP, rdr.files[fName].trailerDP) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
rdr.fileMutex.Unlock()
continue
}
if err := agReq.SetFields(rdr.Config().Fields); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
utils.ERs, absPath, rowNr, err.Error()))
rdr.offset += rdr.lineLen // increase the offset when exit
rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset when exit
rdr.fileMutex.Unlock()
continue
}
rdr.offset += rdr.lineLen // increase the offset
rdr.files[fName].offset += rdr.files[fName].lineLength // increase the offset
rdr.fileMutex.Unlock()
rdr.rdrEvents <- &erEvent{
cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config(),
@@ -220,6 +248,10 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
rdr.fileMutex.Lock()
delete(rdr.files, fName) // file done processing, remove corresponding entry
rdr.fileMutex.Unlock()
if rdr.Config().ProcessedPath != "" {
// Finished with file, move it to processed folder
outPath := path.Join(rdr.Config().ProcessedPath, fName)
@@ -227,22 +259,15 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
return
}
}
rdr.offset = 0
rdr.headerOffset = 0
rdr.trailerOffset = 0
rdr.trailerLength = 0
rdr.lineLen = 0
rdr.trailerDP = nil
rdr.headerDP = nil
utils.Logger.Info(
fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s",
utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart)))
utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart)))
return
}
// Sets the line length based on first line, sets offset back to initial after reading
func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader, hasTrailer bool) error {
buff := bufio.NewReader(file)
func (rdr *FWVFileER) setLineLen(fileVars *fileVars, hasHeader, hasTrailer bool) error {
buff := bufio.NewReader(fileVars.file)
// in case we have header we take the length of first line and add it as headerOffset
i := 0
lastLineSize := 0
@@ -252,102 +277,93 @@ func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader, hasTrailer bool) erro
break
}
if hasHeader && i == 0 {
rdr.headerOffset = int64(len(readBytes))
fileVars.headerOffset = int64(len(readBytes))
i++
continue
}
if (hasHeader && i == 1) || (!hasHeader && i == 0) {
rdr.lineLen = int64(len(readBytes))
fileVars.lineLength = int64(len(readBytes))
i++
continue
}
lastLineSize = len(readBytes)
}
if hasTrailer {
if fi, err := file.Stat(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot get file stats: %s", utils.ERs, err.Error()))
if fi, err := fileVars.file.Stat(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> cannot retrieve stats for file: %s, %s", utils.ERs, fileVars.path, err.Error()))
return err
} else {
rdr.trailerOffset = fi.Size() - int64(lastLineSize)
rdr.trailerLength = int64(lastLineSize)
fileVars.trailerOffset = fi.Size() - int64(lastLineSize)
fileVars.trailerLength = int64(lastLineSize)
}
}
if _, err := file.Seek(0, 0); err != nil {
// reset the cursor
if _, err := fileVars.file.Seek(0, 0); err != nil {
return err
}
return nil
}
func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPath string, trailerFields []*config.FCTemplate) (err error) {
buf := make([]byte, rdr.trailerLength)
if nRead, err := file.ReadAt(buf, rdr.trailerOffset); err != nil && err != io.EOF {
func (rdr *FWVFileER) processTrailer(fileVars *fileVars, trailerFields []*config.FCTemplate) (err error) {
buf := make([]byte, fileVars.trailerLength)
if nRead, err := fileVars.file.ReadAt(buf, fileVars.trailerOffset); err != nil && err != io.EOF {
return err
} else if nRead != len(buf) {
return fmt.Errorf("In trailer, line len: %d, have read: %d instead of: %d", rdr.trailerOffset, nRead, len(buf))
return fmt.Errorf("in trailer, offset: %d, have read: %d instead of: %d", fileVars.trailerOffset, nRead, len(buf))
}
record := string(buf)
rdr.trailerDP = config.NewFWVProvider(record)
fileVars.trailerDP = config.NewFWVProvider(record)
agReq := agents.NewAgentRequest(
utils.NavigableMap2{}, nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS, nil, rdr.trailerDP) // create an AgentRequest
rdr.fltrS, nil, fileVars.trailerDP) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
return nil
}
if err := agReq.SetFields(trailerFields); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
utils.ERs, absPath, rowNr, err.Error()))
fmt.Sprintf("<%s> reading file: <%s> trailer row, ignoring due to error: <%s>",
utils.ERs, fileVars.path, err.Error()))
return err
}
rdr.rdrEvents <- &erEvent{
cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config(),
}
evsPosted++
// reset the cursor after processing the trailer
_, err = file.Seek(0, 0)
_, err = fileVars.file.Seek(0, 0)
return
}
func (rdr *FWVFileER) processHeader(file *os.File, rowNr, evsPosted int, absPath string, hdrFields []*config.FCTemplate) error {
buf := make([]byte, rdr.headerOffset)
if nRead, err := file.Read(buf); err != nil {
func (rdr *FWVFileER) processHeader(fileVars *fileVars, hdrFields []*config.FCTemplate) error {
buf := make([]byte, fileVars.headerOffset)
if nRead, err := fileVars.file.Read(buf); err != nil {
return err
} else if nRead != len(buf) {
return fmt.Errorf("In header, line len: %d, have read: %d", rdr.headerOffset, nRead)
return fmt.Errorf("in header, offset: %d, have read: %d", fileVars.headerOffset, nRead)
}
return rdr.createHeaderMap(string(buf), rowNr, evsPosted, absPath, hdrFields)
return rdr.createHeaderMap(string(buf), fileVars, hdrFields)
}
func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPath string, hdrFields []*config.FCTemplate) (err error) {
rdr.offset += rdr.headerOffset // increase the offset
rdr.headerDP = config.NewFWVProvider(record)
func (rdr *FWVFileER) createHeaderMap(record string, fileVars *fileVars, hdrFields []*config.FCTemplate) (err error) {
fileVars.offset += fileVars.headerOffset // increase the offset
fileVars.headerDP = config.NewFWVProvider(record)
agReq := agents.NewAgentRequest(
utils.NavigableMap2{}, nil, nil, nil,
rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS, rdr.headerDP, nil) // create an AgentRequest
rdr.fltrS, fileVars.headerDP, nil) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
return nil
}
if err := agReq.SetFields(hdrFields); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
utils.ERs, absPath, rowNr, err.Error()))
fmt.Sprintf("<%s> reading file: <%s> header row, ignoring due to error: <%s>",
utils.ERs, fileVars.path, err.Error()))
return err
}
rdr.rdrEvents <- &erEvent{
cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config(),
}
evsPosted++
return
}