EventReader.processFPath, improvement for file reloads

This commit is contained in:
DanB
2019-08-28 19:50:29 +02:00
parent d48dc38497
commit cf51ef5362
6 changed files with 97 additions and 46 deletions

View File

@@ -347,13 +347,13 @@ func startERs(sSChan, dspSChan chan rpcclient.RpcClientConnection,
}
var erS *ers.ERService
if erS, err = ers.NewERService(cfg, filterS, sS); err != nil {
if erS, err = ers.NewERService(cfg, filterS, sS, exitChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
exitChan <- true
return
}
if err = erS.ListenAndServe(cfgRld, exitChan); err != nil {
if err = erS.ListenAndServe(cfgRld); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
}

View File

@@ -47,6 +47,9 @@
{
"id": "file_reader1",
"run_delay": -1,
"type": "*file_csv",
"source_path": "/tmp/ers/in",
"processed_path": "/tmp/ers/out",
},
],
},

View File

@@ -21,6 +21,8 @@ package ers
import (
"fmt"
"path"
"path/filepath"
"strings"
"sync"
"time"
@@ -33,12 +35,13 @@ import (
// NewERService instantiates the ERService
func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
sS rpcclient.RpcClientConnection) (erS *ERService, err error) {
sS rpcclient.RpcClientConnection, exitChan chan bool) (erS *ERService, err error) {
erS = &ERService{
cfg: cfg,
rdrs: make(map[string][]EventReader),
stopLsn: make(map[string]chan struct{}),
sS: sS,
cfg: cfg,
rdrs: make(map[string][]EventReader),
stopLsn: make(map[string]chan struct{}),
sS: sS,
exitChan: exitChan,
}
return
}
@@ -46,39 +49,55 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
// ERService is managing the EventReaders
type ERService struct {
sync.RWMutex
cfg *config.CGRConfig
rdrs map[string][]EventReader // list of readers on specific paths map[path]reader
stopLsn map[string]chan struct{} // stops listening on paths
filterS *engine.FilterS
sS rpcclient.RpcClientConnection // connection towards SessionS
cfg *config.CGRConfig
rdrs map[string][]EventReader // list of readers on specific paths map[path]reader
stopLsn map[string]chan struct{} // stops listening on paths
filterS *engine.FilterS
sS rpcclient.RpcClientConnection // connection towards SessionS
exitChan chan bool
}
// ListenAndServe loops keeps the service alive
func (erS *ERService) ListenAndServe(cfgRldChan chan struct{},
exitChan chan bool) (err error) {
func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
var watchDirs []string
for _, rdrCfg := range erS.cfg.ERsCfg().Readers {
var rdr EventReader
if rdr, err = NewEventReader(rdrCfg); err != nil {
return
}
if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath &&
srcPath := rdrCfg.SourcePath
if strings.HasSuffix(srcPath, utils.Slash) {
srcPath = strings.TrimSuffix(srcPath, utils.Slash)
}
if _, hasPath := erS.rdrs[srcPath]; !hasPath &&
rdrCfg.Type == utils.MetaFileCSV &&
rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop
erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{})
if err = erS.watchDir(rdrCfg.SourcePath); err != nil {
return
}
erS.stopLsn[srcPath] = make(chan struct{})
watchDirs = append(watchDirs, srcPath)
}
erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr)
erS.rdrs[srcPath] = append(erS.rdrs[srcPath], rdr)
}
//fmt.Printf("ERSService: %s\n", utils.ToIJSON(erS.rdrs))
go erS.handleReloads(cfgRldChan, exitChan)
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
go erS.handleReloads(cfgRldChan)
erS.setDirWatchers(watchDirs)
e := <-erS.exitChan
erS.exitChan <- e // put back for the others listening for shutdown request
return
}
// setDirWatchers sets up directory watchers
func (erS *ERService) setDirWatchers(dirPaths []string) {
for _, dirPath := range dirPaths {
go func() {
if err := erS.watchDir(dirPath); err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> watching directory <%s> got error: <%s>",
utils.ERs, dirPath, err.Error()))
erS.exitChan <- true
}
}()
}
}
// erCfgRef will be used to reference a specific reader
type erCfgRef struct {
path string
@@ -86,10 +105,10 @@ type erCfgRef struct {
}
// handleReloads will handle the config reloads which are signaled over cfgRldChan
func (erS *ERService) handleReloads(cfgRldChan chan struct{}, exitChan chan bool) {
func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
for {
select {
case <-exitChan:
case <-erS.exitChan:
return
case <-cfgRldChan:
cfgIDs := make(map[string]int) // IDs which are configured in EventReader profiles as map[id]cfgIdx
@@ -119,10 +138,11 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}, exitChan chan bool
addIDs[id] = struct{}{}
}
}
// remove the necessary ids
for id := range remIDs {
ref := inUseIDs[id]
rdrSlc := erS.rdrs[ref.path]
// remove the ids
copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:])
rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected
rdrSlc = rdrSlc[:len(rdrSlc)-1]
@@ -134,28 +154,29 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}, exitChan chan bool
}
}
// add new ids:
var watchDirs []string
for id := range addIDs {
rdrCfg := erS.cfg.ERsCfg().Readers[cfgIDs[id]]
srcPath := rdrCfg.SourcePath
if strings.HasSuffix(srcPath, utils.Slash) {
srcPath = strings.TrimSuffix(srcPath, utils.Slash)
}
if rdr, err := NewEventReader(rdrCfg); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> error reloading config with ID: <%s>, err: <%s>",
utils.ERs, id, err.Error()))
} else {
if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath &&
if _, hasPath := erS.rdrs[srcPath]; !hasPath &&
rdrCfg.Type == utils.MetaFileCSV &&
rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop
erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{})
if err := erS.watchDir(rdrCfg.SourcePath); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> error scheduling dir watch for config: <%s>, err: <%s>",
utils.ERs, id, err.Error()))
}
erS.stopLsn[srcPath] = make(chan struct{})
watchDirs = append(watchDirs, srcPath)
}
erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr)
erS.rdrs[srcPath] = append(erS.rdrs[srcPath], rdr)
}
}
erS.setDirWatchers(watchDirs)
erS.Unlock()
}
}
@@ -183,8 +204,8 @@ func (erS *ERService) watchDir(dirPath string) (err error) {
if ev.Op&fsnotify.Create == fsnotify.Create &&
path.Ext(ev.Name) == utils.CSVSuffix {
go func() { //Enable async processing here
if err = erS.processPath(ev.Name); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
if err = erS.processFPath(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
utils.ERs, ev.Name, err.Error()))
}
}()
@@ -195,7 +216,29 @@ func (erS *ERService) watchDir(dirPath string) (err error) {
}
}
func (erS *ERService) processPath(path string) (err error) {
fmt.Printf("ERService processPath: <%s>", path)
// processFPath is called by inotify or manual handler to process a file with path
func (erS *ERService) processFPath(dirPath string, fName string) (err error) {
rdrs, has := erS.rdrs[dirPath]
if !has {
return fmt.Errorf("no reader for path: <%s>", dirPath)
}
fPath := path.Join(dirPath, fName)
for _, rdr := range rdrs {
if errRdr := rdr.Init(fPath); errRdr != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> processing filePath <%s>, error: <%s>",
utils.ERs, fPath, errRdr.Error()))
continue
}
/*for {
if cdr, err := rdr.Read(); err != nil {
}
}
*/
if errRdr := rdr.Close(); errRdr != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> closing filePath <%s>, error: <%s>",
utils.ERs, fPath, errRdr.Error()))
}
}
return
}

View File

@@ -23,11 +23,11 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int) (er EventReader, err error) {
return
func NewCSVFileER(cfg *config.EventReaderCfg) (er EventReader, err error) {
return new(CSVFileER), nil
}
// CSVer implements EventReader interface for .csv files
// CSVFileER implements EventReader interface for .csv files
type CSVFileER struct {
}
@@ -35,6 +35,10 @@ func (csv *CSVFileER) ID() (id string) {
return
}
func (csv *CSVFileER) Config() (rdrCfg *config.EventReaderCfg) {
return
}
func (csv *CSVFileER) Init(args interface{}) (err error) {
return
}

View File

@@ -29,7 +29,7 @@ type EventReader interface {
ID() string // configuration identifier
Config() *config.EventReaderCfg // reader configuration
Init(args interface{}) error // init will initialize the Reader, ie: open the file to read or http connection
Read() (*utils.CGREvent, error) // Process a single record in the events file
Read() (*utils.CGREvent, error) // process a single record in the events file
Processed() int64 // number of records processed
Close() error // called when the reader should release resources
}
@@ -40,7 +40,7 @@ func NewEventReader(rdrCfg *config.EventReaderCfg) (er EventReader, err error) {
default:
err = fmt.Errorf("unsupported reader type: <%s>", rdrCfg.Type)
case utils.MetaFileCSV:
return NewCSVFileER(rdrCfg)
}
return
}

View File

@@ -567,6 +567,7 @@ const (
ERs = "ERs"
Ratio = "Ratio"
Load = "Load"
Slash = "/"
)
// Migrator Action