diff --git a/ers/ers.go b/ers/ers.go index 44a85c450..635cbdbb7 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -20,26 +20,28 @@ package ers import ( "fmt" + "path" "sync" + "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" + "github.com/fsnotify/fsnotify" ) // NewERService instantiates the ERService -func NewERService(cfg *config.CGRConfig, cdrS rpcclient.RpcClientConnection, +func NewERService(cfg *config.CGRConfig, + filterS *engine.FilterS, + sS rpcclient.RpcClientConnection, cfgRld chan struct{}) (erS *ERService, err error) { erS = &ERService{ - rdrs: make(map[string][]EventReader), - cfgRld: cfgRld, - } - for _, rdrCfg := range cfg.ERsCfg().Readers { - if rdr, err := NewEventReader(rdrCfg); err != nil { - return nil, err - } else { - erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) - } + cfg: cfg, + rdrs: make(map[string][]EventReader), + stopLsn: make(map[string]chan struct{}), + cfgRld: cfgRld, + sS: sS, } return } @@ -47,18 +49,37 @@ func NewERService(cfg *config.CGRConfig, cdrS rpcclient.RpcClientConnection, // ERService is managing the EventReaders type ERService struct { sync.RWMutex - cfg *config.CGRConfig - rdrs map[string][]EventReader // list of readers on specific paths map[path]reader - cfgRld chan struct{} // signal the need of config reloading - chan path / *any - sS rpcclient.RpcClientConnection + cfg *config.CGRConfig + filterS *engine.FilterS + rdrs map[string][]EventReader // list of readers on specific paths map[path]reader + stopLsn map[string]chan struct{} // stops listening on paths + cfgRld chan struct{} // signal the need of config reloading - chan path / *any + sS rpcclient.RpcClientConnection // connection towards SessionS + } // ListenAndServe loops keeps the service alive -func (erS *ERService) ListenAndServe(exitChan chan bool) error { - go erS.handleReloads() // start backup loop +func (erS *ERService) ListenAndServe(exitChan chan bool) (err error) { + for _, rdrCfg := range erS.cfg.ERsCfg().Readers { + var rdr EventReader + if rdr, err = NewEventReader(rdrCfg); err != nil { + return + } + if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath && + rdrCfg.Type == utils.MetaFileCSV && + rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop + erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{}) + if err = erS.watchDir(rdrCfg.SourcePath); err != nil { + return + } + } + erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) + + } + go erS.handleReloads() e := <-exitChan exitChan <- e // put back for the others listening for shutdown request - return nil + return } // erCfgRef will be used to reference a specific reader @@ -70,13 +91,13 @@ type erCfgRef struct { func (erS *ERService) handleReloads() { for { <-erS.cfgRld - cfgIDs := make(map[string]*erCfgRef) // IDs which are configured in EventReader profiles - inUseIDs := make(map[string]*erCfgRef) // IDs which are running in ERService indexed on path + cfgIDs := make(map[string]int) // IDs which are configured in EventReader profiles as map[id]cfgIdx + inUseIDs := make(map[string]*erCfgRef) // IDs which are running in ERService map[id]rdrIdx addIDs := make(map[string]struct{}) // IDs which need to be added to ERService remIDs := make(map[string]struct{}) // IDs which need to be removed from ERService // index config IDs for i, rdrCfg := range erS.cfg.ERsCfg().Readers { - cfgIDs[rdrCfg.ID] = &erCfgRef{path: rdrCfg.SourcePath, idx: i} + cfgIDs[rdrCfg.ID] = i } erS.Lock() // index in use IDs @@ -104,20 +125,74 @@ func (erS *ERService) handleReloads() { copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:]) rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected rdrSlc = rdrSlc[:len(rdrSlc)-1] + if len(rdrSlc) == 0 { // no more + delete(erS.rdrs, ref.path) + if chn, has := erS.stopLsn[ref.path]; has { + close(chn) + } + } } // add new ids: for id := range addIDs { - cfgRef := cfgIDs[id] - if newRdr, err := NewEventReader(erS.cfg.ERsCfg().Readers[cfgRef.idx]); err != nil { + rdrCfg := erS.cfg.ERsCfg().Readers[cfgIDs[id]] + if rdr, err := NewEventReader(rdrCfg); err != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> error reloading config with ID: <%s>, err: <%s>", utils.ERs, id, err.Error())) } else { - erS.rdrs[cfgRef.path] = append(erS.rdrs[cfgRef.path], newRdr) + if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath && + rdrCfg.Type == utils.MetaFileCSV && + rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop + erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{}) + if err := erS.watchDir(rdrCfg.SourcePath); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> error scheduling dir watch for config: <%s>, err: <%s>", + utils.ERs, id, err.Error())) + } + } + erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) } - } erS.Unlock() } } + +// trackFiles +func (erS *ERService) watchDir(dirPath string) (err error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return + } + defer watcher.Close() + err = watcher.Add(dirPath) + if err != nil { + return + } + utils.Logger.Info(fmt.Sprintf("<%s> monitoring <%s> for file moves.", utils.ERs, dirPath)) + stopLsnChan := erS.stopLsn[dirPath] + for { + select { + case <-stopLsnChan: // stop listening + utils.Logger.Info(fmt.Sprintf("<%s> stop listening on path <%s>", utils.ERs, dirPath)) + return + case ev := <-watcher.Events: + if ev.Op&fsnotify.Create == fsnotify.Create && + path.Ext(ev.Name) == utils.CSVSuffix { + go func() { //Enable async processing here + if err = erS.processPath(ev.Name); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> processing path <%s>, error: <%s>", + utils.ERs, ev.Name, err.Error())) + } + }() + } + case err := <-watcher.Errors: + utils.Logger.Err(fmt.Sprintf("<%s> inotify error: <%s>", utils.ERs, err.Error())) + } + } +} + +func (erS *ERService) processPath(path string) (err error) { + return +} diff --git a/ers/reader.go b/ers/reader.go index 331059cab..8e3e6ceff 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -25,6 +25,7 @@ import ( type EventReader interface { ID() string // configuration identifier + Config() *config.EventReaderCfg // reader configuration Init(args interface{}) error // init will initialize the Reader, ie: open the file to read or http connection Read() (*utils.CGREvent, error) // Process a single record in the events file Processed() int64 // number of records processed