diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index dffbe9f4d..5897c33e6 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -347,13 +347,13 @@ func startERs(sSChan, dspSChan chan rpcclient.RpcClientConnection, } var erS *ers.ERService - if erS, err = ers.NewERService(cfg, filterS, sS); err != nil { + if erS, err = ers.NewERService(cfg, filterS, sS, exitChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) exitChan <- true return } - if err = erS.ListenAndServe(cfgRld, exitChan); err != nil { + if err = erS.ListenAndServe(cfgRld); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) } diff --git a/data/conf/samples/ers/cgrates.json b/data/conf/samples/ers/cgrates.json index ae3b70fc9..80ca9fa31 100644 --- a/data/conf/samples/ers/cgrates.json +++ b/data/conf/samples/ers/cgrates.json @@ -47,6 +47,9 @@ { "id": "file_reader1", "run_delay": -1, + "type": "*file_csv", + "source_path": "/tmp/ers/in", + "processed_path": "/tmp/ers/out", }, ], }, diff --git a/ers/ers.go b/ers/ers.go index e98790f67..adc0cd57e 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -21,6 +21,8 @@ package ers import ( "fmt" "path" + "path/filepath" + "strings" "sync" "time" @@ -33,12 +35,13 @@ import ( // NewERService instantiates the ERService func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, - sS rpcclient.RpcClientConnection) (erS *ERService, err error) { + sS rpcclient.RpcClientConnection, exitChan chan bool) (erS *ERService, err error) { erS = &ERService{ - cfg: cfg, - rdrs: make(map[string][]EventReader), - stopLsn: make(map[string]chan struct{}), - sS: sS, + cfg: cfg, + rdrs: make(map[string][]EventReader), + stopLsn: make(map[string]chan struct{}), + sS: sS, + exitChan: exitChan, } return } @@ -46,39 +49,55 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, // ERService is managing the EventReaders type ERService struct { sync.RWMutex - cfg *config.CGRConfig - rdrs map[string][]EventReader // list of readers on specific paths map[path]reader - stopLsn map[string]chan struct{} // stops listening on paths - filterS *engine.FilterS - sS rpcclient.RpcClientConnection // connection towards SessionS - + cfg *config.CGRConfig + rdrs map[string][]EventReader // list of readers on specific paths map[path]reader + stopLsn map[string]chan struct{} // stops listening on paths + filterS *engine.FilterS + sS rpcclient.RpcClientConnection // connection towards SessionS + exitChan chan bool } // ListenAndServe loops keeps the service alive -func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}, - exitChan chan bool) (err error) { +func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { + var watchDirs []string for _, rdrCfg := range erS.cfg.ERsCfg().Readers { var rdr EventReader if rdr, err = NewEventReader(rdrCfg); err != nil { return } - if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath && + srcPath := rdrCfg.SourcePath + if strings.HasSuffix(srcPath, utils.Slash) { + srcPath = strings.TrimSuffix(srcPath, utils.Slash) + } + if _, hasPath := erS.rdrs[srcPath]; !hasPath && rdrCfg.Type == utils.MetaFileCSV && rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop - erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{}) - if err = erS.watchDir(rdrCfg.SourcePath); err != nil { - return - } + erS.stopLsn[srcPath] = make(chan struct{}) + watchDirs = append(watchDirs, srcPath) } - erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) + erS.rdrs[srcPath] = append(erS.rdrs[srcPath], rdr) } - //fmt.Printf("ERSService: %s\n", utils.ToIJSON(erS.rdrs)) - go erS.handleReloads(cfgRldChan, exitChan) - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request + go erS.handleReloads(cfgRldChan) + erS.setDirWatchers(watchDirs) + e := <-erS.exitChan + erS.exitChan <- e // put back for the others listening for shutdown request return } +// setDirWatchers sets up directory watchers +func (erS *ERService) setDirWatchers(dirPaths []string) { + for _, dirPath := range dirPaths { + go func() { + if err := erS.watchDir(dirPath); err != nil { + utils.Logger.Crit( + fmt.Sprintf("<%s> watching directory <%s> got error: <%s>", + utils.ERs, dirPath, err.Error())) + erS.exitChan <- true + } + }() + } +} + // erCfgRef will be used to reference a specific reader type erCfgRef struct { path string @@ -86,10 +105,10 @@ type erCfgRef struct { } // handleReloads will handle the config reloads which are signaled over cfgRldChan -func (erS *ERService) handleReloads(cfgRldChan chan struct{}, exitChan chan bool) { +func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { for { select { - case <-exitChan: + case <-erS.exitChan: return case <-cfgRldChan: cfgIDs := make(map[string]int) // IDs which are configured in EventReader profiles as map[id]cfgIdx @@ -119,10 +138,11 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}, exitChan chan bool addIDs[id] = struct{}{} } } + // remove the necessary ids for id := range remIDs { ref := inUseIDs[id] rdrSlc := erS.rdrs[ref.path] - // remove the ids + copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:]) rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected rdrSlc = rdrSlc[:len(rdrSlc)-1] @@ -134,28 +154,29 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}, exitChan chan bool } } // add new ids: + var watchDirs []string for id := range addIDs { rdrCfg := erS.cfg.ERsCfg().Readers[cfgIDs[id]] + srcPath := rdrCfg.SourcePath + if strings.HasSuffix(srcPath, utils.Slash) { + srcPath = strings.TrimSuffix(srcPath, utils.Slash) + } if rdr, err := NewEventReader(rdrCfg); err != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> error reloading config with ID: <%s>, err: <%s>", utils.ERs, id, err.Error())) } else { - if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath && + if _, hasPath := erS.rdrs[srcPath]; !hasPath && rdrCfg.Type == utils.MetaFileCSV && rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop - erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{}) - if err := erS.watchDir(rdrCfg.SourcePath); err != nil { - utils.Logger.Warning( - fmt.Sprintf( - "<%s> error scheduling dir watch for config: <%s>, err: <%s>", - utils.ERs, id, err.Error())) - } + erS.stopLsn[srcPath] = make(chan struct{}) + watchDirs = append(watchDirs, srcPath) } - erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) + erS.rdrs[srcPath] = append(erS.rdrs[srcPath], rdr) } } + erS.setDirWatchers(watchDirs) erS.Unlock() } } @@ -183,8 +204,8 @@ func (erS *ERService) watchDir(dirPath string) (err error) { if ev.Op&fsnotify.Create == fsnotify.Create && path.Ext(ev.Name) == utils.CSVSuffix { go func() { //Enable async processing here - if err = erS.processPath(ev.Name); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> processing path <%s>, error: <%s>", + if err = erS.processFPath(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>", utils.ERs, ev.Name, err.Error())) } }() @@ -195,7 +216,29 @@ func (erS *ERService) watchDir(dirPath string) (err error) { } } -func (erS *ERService) processPath(path string) (err error) { - fmt.Printf("ERService processPath: <%s>", path) +// processFPath is called by inotify or manual handler to process a file with path +func (erS *ERService) processFPath(dirPath string, fName string) (err error) { + rdrs, has := erS.rdrs[dirPath] + if !has { + return fmt.Errorf("no reader for path: <%s>", dirPath) + } + fPath := path.Join(dirPath, fName) + for _, rdr := range rdrs { + if errRdr := rdr.Init(fPath); errRdr != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> processing filePath <%s>, error: <%s>", + utils.ERs, fPath, errRdr.Error())) + continue + } + /*for { + if cdr, err := rdr.Read(); err != nil { + + } + } + */ + if errRdr := rdr.Close(); errRdr != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> closing filePath <%s>, error: <%s>", + utils.ERs, fPath, errRdr.Error())) + } + } return } diff --git a/ers/csv.go b/ers/filecsv.go similarity index 83% rename from ers/csv.go rename to ers/filecsv.go index a53c8f1cb..dc8959ad2 100644 --- a/ers/csv.go +++ b/ers/filecsv.go @@ -23,11 +23,11 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int) (er EventReader, err error) { - return +func NewCSVFileER(cfg *config.EventReaderCfg) (er EventReader, err error) { + return new(CSVFileER), nil } -// CSVer implements EventReader interface for .csv files +// CSVFileER implements EventReader interface for .csv files type CSVFileER struct { } @@ -35,6 +35,10 @@ func (csv *CSVFileER) ID() (id string) { return } +func (csv *CSVFileER) Config() (rdrCfg *config.EventReaderCfg) { + return +} + func (csv *CSVFileER) Init(args interface{}) (err error) { return } diff --git a/ers/reader.go b/ers/reader.go index 02d66904b..fc21e01ac 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -29,7 +29,7 @@ type EventReader interface { ID() string // configuration identifier Config() *config.EventReaderCfg // reader configuration Init(args interface{}) error // init will initialize the Reader, ie: open the file to read or http connection - Read() (*utils.CGREvent, error) // Process a single record in the events file + Read() (*utils.CGREvent, error) // process a single record in the events file Processed() int64 // number of records processed Close() error // called when the reader should release resources } @@ -40,7 +40,7 @@ func NewEventReader(rdrCfg *config.EventReaderCfg) (er EventReader, err error) { default: err = fmt.Errorf("unsupported reader type: <%s>", rdrCfg.Type) case utils.MetaFileCSV: - + return NewCSVFileER(rdrCfg) } return } diff --git a/utils/consts.go b/utils/consts.go index 3eb23749c..c26cfc1ac 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -567,6 +567,7 @@ const ( ERs = "ERs" Ratio = "Ratio" Load = "Load" + Slash = "/" ) // Migrator Action