From 0229c6e90e4349751f4fad7ebbbd2d1d4d533d2d Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 1 Sep 2019 18:45:49 +0200 Subject: [PATCH] ERs with one id per path implementation --- ers/ers.go | 170 +++++++++++++------------------------------------ ers/filecsv.go | 4 ++ ers/libers.go | 60 +++++++++++++++++ ers/reader.go | 1 + 4 files changed, 109 insertions(+), 126 deletions(-) create mode 100644 ers/libers.go diff --git a/ers/ers.go b/ers/ers.go index ebe20db1e..e746cfcca 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -20,18 +20,12 @@ package ers import ( "fmt" - "path" - "path/filepath" - "strings" "sync" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" - "github.com/fsnotify/fsnotify" ) // NewERService instantiates the ERService @@ -39,7 +33,7 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, sS rpcclient.RpcClientConnection, exitChan chan bool) (erS *ERService, err error) { erS = &ERService{ cfg: cfg, - rdrs: make(map[string][]EventReader), + rdrs: make(map[string]EventReader), stopLsn: make(map[string]chan struct{}), sS: sS, exitChan: exitChan, @@ -51,42 +45,42 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, type ERService struct { sync.RWMutex cfg *config.CGRConfig - rdrs map[string][]EventReader // list of readers on specific paths map[path]reader - stopLsn map[string]chan struct{} // stops listening on paths + rdrs map[string]EventReader // map[rdrID]EventReader + stopLsn map[string]chan struct{} // map[rdrID] chan struct{} filterS *engine.FilterS sS rpcclient.RpcClientConnection // connection towards SessionS exitChan chan bool } -// ListenAndServe loops keeps the service alive +// ListenAndServe keeps the service alive func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { - var watchDirs []string for _, rdrCfg := range erS.cfg.ERsCfg().Readers { - var rdr EventReader - if rdr, err = NewEventReader(rdrCfg); err != nil { + if err = erS.addReader(rdrCfg); err != nil { + utils.Logger.Crit( + fmt.Sprintf("<%s> adding reader <%s> got error: <%s>", + utils.ERs, rdrCfg.ID, err.Error())) return } - srcPath := rdrCfg.SourcePath - if strings.HasSuffix(srcPath, utils.Slash) { - srcPath = strings.TrimSuffix(srcPath, utils.Slash) - } - if _, hasPath := erS.rdrs[srcPath]; !hasPath && - rdrCfg.Type == utils.MetaFileCSV && - rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop - erS.stopLsn[srcPath] = make(chan struct{}) - watchDirs = append(watchDirs, srcPath) - } - erS.rdrs[srcPath] = append(erS.rdrs[srcPath], rdr) } go erS.handleReloads(cfgRldChan) - erS.setDirWatchers(watchDirs) e := <-erS.exitChan erS.exitChan <- e // put back for the others listening for shutdown request return } +// addReader will add a new reader to the service +func (erS *ERService) addReader(rdrCfg *config.EventReaderCfg) (err error) { + var rdr EventReader + if rdr, err = NewEventReader(rdrCfg); err != nil { + return + } + erS.rdrs[rdrCfg.ID] = rdr + erS.stopLsn[rdrCfg.ID] = make(chan struct{}) + return rdr.Subscribe() +} + // setDirWatchers sets up directory watchers -func (erS *ERService) setDirWatchers(dirPaths []string) { +/*func (erS *ERService) setDirWatchers(dirPaths []string) { for _, dirPath := range dirPaths { go func() { if err := erS.watchDir(dirPath); err != nil { @@ -98,12 +92,7 @@ func (erS *ERService) setDirWatchers(dirPaths []string) { }() } } - -// erCfgRef will be used to reference a specific reader -type erCfgRef struct { - path string - idx int -} +*/ // handleReloads will handle the config reloads which are signaled over cfgRldChan func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { @@ -112,111 +101,39 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { case <-erS.exitChan: return case <-cfgRldChan: - cfgIDs := make(map[string]int) // IDs which are configured in EventReader profiles as map[id]cfgIdx - inUseIDs := make(map[string]*erCfgRef) // IDs which are running in ERService map[id]rdrIdx - addIDs := make(map[string]struct{}) // IDs which need to be added to ERService - remIDs := make(map[string]struct{}) // IDs which need to be removed from ERService + cfgIDs := make(map[string]*config.EventReaderCfg) // index config IDs - for i, rdrCfg := range erS.cfg.ERsCfg().Readers { - cfgIDs[rdrCfg.ID] = i + for _, rdrCfg := range erS.cfg.ERsCfg().Readers { + cfgIDs[rdrCfg.ID] = rdrCfg } erS.Lock() - // index in use IDs - for path, rdrs := range erS.rdrs { - for i, rdr := range rdrs { - inUseIDs[rdr.Config().ID] = &erCfgRef{path: path, idx: i} - } - } - // find out removed ids - for id := range inUseIDs { - if _, has := cfgIDs[id]; !has { - remIDs[id] = struct{}{} - } - } - // find out added ids - for id := range cfgIDs { - if _, has := inUseIDs[id]; !has { - addIDs[id] = struct{}{} - } - } // remove the necessary ids - for id := range remIDs { - ref := inUseIDs[id] - rdrSlc := erS.rdrs[ref.path] - - copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:]) - rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected - rdrSlc = rdrSlc[:len(rdrSlc)-1] - if len(rdrSlc) == 0 { // no more - delete(erS.rdrs, ref.path) - if chn, has := erS.stopLsn[ref.path]; has { - close(chn) - } + for id := range erS.rdrs { + if _, has := cfgIDs[id]; has { // still present + continue + } + delete(erS.rdrs, id) + close(erS.stopLsn[id]) + delete(erS.stopLsn, id) + } + // add new ids + for id, rdrCfg := range cfgIDs { + if _, has := erS.rdrs[id]; has { + continue + } + if err := erS.addReader(rdrCfg); err != nil { + utils.Logger.Crit( + fmt.Sprintf("<%s> adding reader <%s> got error: <%s>", + utils.ERs, rdrCfg.ID, err.Error())) + erS.exitChan <- true } } - // add new ids: - var watchDirs []string - for id := range addIDs { - rdrCfg := erS.cfg.ERsCfg().Readers[cfgIDs[id]] - srcPath := rdrCfg.SourcePath - if strings.HasSuffix(srcPath, utils.Slash) { - srcPath = strings.TrimSuffix(srcPath, utils.Slash) - } - if rdr, err := NewEventReader(rdrCfg); err != nil { - utils.Logger.Warning( - fmt.Sprintf( - "<%s> error reloading config with ID: <%s>, err: <%s>", - utils.ERs, id, err.Error())) - } else { - if _, hasPath := erS.rdrs[srcPath]; !hasPath && - rdrCfg.Type == utils.MetaFileCSV && - rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop - erS.stopLsn[srcPath] = make(chan struct{}) - watchDirs = append(watchDirs, srcPath) - } - erS.rdrs[srcPath] = append(erS.rdrs[srcPath], rdr) - } - } - erS.setDirWatchers(watchDirs) erS.Unlock() } } } -// watchDir sets up a watcher via inotify to be triggered on new files -func (erS *ERService) watchDir(dirPath string) (err error) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return - } - defer watcher.Close() - err = watcher.Add(dirPath) - if err != nil { - return - } - utils.Logger.Info(fmt.Sprintf("<%s> monitoring <%s> for file moves.", utils.ERs, dirPath)) - stopLsnChan := erS.stopLsn[dirPath] - for { - select { - case <-stopLsnChan: // stop listening - utils.Logger.Info(fmt.Sprintf("<%s> stop listening on path <%s>", utils.ERs, dirPath)) - return - case ev := <-watcher.Events: - if ev.Op&fsnotify.Create == fsnotify.Create && - path.Ext(ev.Name) == utils.CSVSuffix { - go func() { //Enable async processing here - if err = erS.processPath(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>", - utils.ERs, ev.Name, err.Error())) - } - }() - } - case err := <-watcher.Errors: - utils.Logger.Err(fmt.Sprintf("<%s> inotify error: <%s>", utils.ERs, err.Error())) - } - } -} - +/* // processPath will be called each time a new run should be triggered func (erS *ERService) processPath(itmPath string, itmID string) error { rdrs, has := erS.rdrs[itmPath] @@ -388,3 +305,4 @@ func (erS *ERService) processPath(itmPath string, itmID string) error { } return nil } +*/ diff --git a/ers/filecsv.go b/ers/filecsv.go index 4629ae368..5200efb22 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -39,6 +39,10 @@ func (csv *CSVFileER) Init(itmPath, itmID string) (err error) { return } +func (csv *CSVFileER) Subscribe() (err error) { + return +} + func (csv *CSVFileER) Read() (ev *utils.CGREvent, err error) { return } diff --git a/ers/libers.go b/ers/libers.go new file mode 100644 index 000000000..00bcb5757 --- /dev/null +++ b/ers/libers.go @@ -0,0 +1,60 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "fmt" + "path/filepath" + + "github.com/cgrates/cgrates/utils" + "github.com/fsnotify/fsnotify" +) + +// watchDir sets up a watcher via inotify to be triggered on new files +// sysID is the subsystem ID, f will be triggered on match +func watchDir(dirPath string, f func(itmPath, itmID string) error, + sysID string, stopWatching chan struct{}) (err error) { + var watcher *fsnotify.Watcher + if watcher, err = fsnotify.NewWatcher(); err != nil { + return + } + defer watcher.Close() + if err = watcher.Add(dirPath); err != nil { + return + } + utils.Logger.Info(fmt.Sprintf("<%s> monitoring <%s> for file moves.", sysID, dirPath)) + for { + select { + case <-stopWatching: + utils.Logger.Info(fmt.Sprintf("<%s> stop watching path <%s>", sysID, dirPath)) + return + case ev := <-watcher.Events: + if ev.Op&fsnotify.Create == fsnotify.Create { + go func() { //Enable async processing here + if err = f(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>", + sysID, ev.Name, err.Error())) + } + }() + } + case err = <-watcher.Errors: + return + } + } +} diff --git a/ers/reader.go b/ers/reader.go index 3c81d5852..4230c3d0b 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -28,6 +28,7 @@ import ( type EventReader interface { Config() *config.EventReaderCfg // reader configuration Init(itmPath, itmName string) error // init will initialize the Reader, ie: open the file to read or http connection + Subscribe() error // subscribe the reader on the path Read() (*utils.CGREvent, error) // process a single record in the events file Processed() int64 // number of records processed Close() error // called when the reader should release resources