From bee505b27837f3c1e6a9e16e1609a4f8442c6705 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 26 Aug 2019 22:05:02 +0200 Subject: [PATCH] Engine with startERs section supporting configuration reloads --- cmd/cgr-engine/cgr-engine.go | 46 +++++++++++ config/config.go | 15 ++++ ers/ers.go | 145 ++++++++++++++++++----------------- 3 files changed, 135 insertions(+), 71 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a3b254496..f4f74aa06 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -39,6 +39,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/servmanager" @@ -320,6 +321,46 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in } } +// startERs handles starting of the EventReader Service +func startERs(sSChan, dspSChan chan rpcclient.RpcClientConnection, + filterSChan chan *engine.FilterS, + cfgRld chan struct{}, exitChan chan bool) { + var err error + + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) + filterS := <-filterSChan + filterSChan <- filterS + // overwrite the session service channel with dispatcher one + if cfg.DispatcherSCfg().Enabled { + sSChan = dspSChan + } + var sS rpcclient.RpcClientConnection + if sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.ERsCfg().SessionSConns, sSChan, false); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", + utils.ERs, utils.SessionS, err.Error())) + exitChan <- true + return + } + + var erS *ers.ERService + if erS, err = ers.NewERService(cfg, filterS, sS); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) + exitChan <- true + return + } + + if err = erS.ListenAndServe(cfgRld, exitChan); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) + } + + exitChan <- true +} + func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { var err error var sS rpcclient.RpcClientConnection @@ -1712,6 +1753,11 @@ func main() { internalAttributeSChan, internalCdrSChan, internalChargerSChan, internalDispatcherSChan, server, dm, exitChan) } + + if cfg.ERsCfg().Enabled { + go startERs(internalSMGChan, internalDispatcherSChan, + filterSChan, cfg.GetReloadChan(config.ERsJson), exitChan) + } // Start FreeSWITCHAgent if cfg.FsAgentCfg().Enabled { go startFsAgent(internalSMGChan, internalDispatcherSChan, exitChan) diff --git a/config/config.go b/config/config.go index 83e8e9697..3d052aa84 100755 --- a/config/config.go +++ b/config/config.go @@ -27,6 +27,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "github.com/cgrates/cgrates/utils" @@ -128,6 +129,8 @@ func SetCgrConfig(cfg *CGRConfig) { func NewDefaultCGRConfig() (*CGRConfig, error) { cfg := new(CGRConfig) + cfg.lks = make(map[string]*sync.RWMutex) + cfg.lks[ERsJson] = new(sync.RWMutex) cfg.DataFolderPath = "/usr/share/cgrates/" cfg.MaxCallDuration = time.Duration(3) * time.Hour // Hardcoded for now @@ -182,6 +185,9 @@ func NewDefaultCGRConfig() (*CGRConfig, error) { cfg.ConfigReloads[utils.SMAsterisk] = make(chan struct{}, 1) cfg.ConfigReloads[utils.SMAsterisk] <- struct{}{} // Unlock the channel + cfg.rldChans = make(map[string]chan struct{}) + cfg.rldChans[ERsJson] = make(chan struct{}, 1) + cgrJsonCfg, err := NewCgrJsonCfgFromReader(strings.NewReader(CGRATES_CFG_JSON)) if err != nil { return nil, err @@ -311,6 +317,7 @@ func loadConfigFromHttp(cfg *CGRConfig, urlPaths string) (*CGRConfig, error) { // Holds system configuration, defaults are overwritten with values from config file if found type CGRConfig struct { + lks map[string]*sync.RWMutex MaxCallDuration time.Duration // The maximum call duration (used by responder when querying DerivedCharging) // ToDo: export it in configuration file DataFolderPath string // Path towards data folder, for tests internal usage, not loading out of .json options ConfigPath string // Path towards config @@ -326,6 +333,7 @@ type CGRConfig struct { httpAgentCfg HttpAgentCfgs // HttpAgent configs ConfigReloads map[string]chan struct{} // Signals to specific entities that a config reload should occur + rldChans map[string]chan struct{} // index here the channels used for reloads generalCfg *GeneralCfg // General config dataDbCfg *DataDbCfg // Database config @@ -1233,10 +1241,17 @@ func (cfg *CGRConfig) ApierCfg() *ApierCfg { return cfg.apier } +// ERsCfg reads the EventReader configuration func (cfg *CGRConfig) ERsCfg() *ERsCfg { + cfg.lks[ERsJson].RLock() + defer cfg.lks[ERsJson].RUnlock() return cfg.ersCfg } +func (cfg *CGRConfig) GetReloadChan(sectID string) chan struct{} { + return cfg.rldChans[sectID] +} + // Call implements rpcclient.RpcClientConnection interface for internal RPC func (cSv1 *CGRConfig) Call(serviceMethod string, args interface{}, reply interface{}) error { diff --git a/ers/ers.go b/ers/ers.go index 635cbdbb7..effbb0891 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -32,15 +32,12 @@ import ( ) // NewERService instantiates the ERService -func NewERService(cfg *config.CGRConfig, - filterS *engine.FilterS, - sS rpcclient.RpcClientConnection, - cfgRld chan struct{}) (erS *ERService, err error) { +func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, + sS rpcclient.RpcClientConnection) (erS *ERService, err error) { erS = &ERService{ cfg: cfg, rdrs: make(map[string][]EventReader), stopLsn: make(map[string]chan struct{}), - cfgRld: cfgRld, sS: sS, } return @@ -50,16 +47,16 @@ func NewERService(cfg *config.CGRConfig, type ERService struct { sync.RWMutex cfg *config.CGRConfig + rdrs map[string][]EventReader // list of readers on specific paths map[path]reader + stopLsn map[string]chan struct{} // stops listening on paths filterS *engine.FilterS - rdrs map[string][]EventReader // list of readers on specific paths map[path]reader - stopLsn map[string]chan struct{} // stops listening on paths - cfgRld chan struct{} // signal the need of config reloading - chan path / *any sS rpcclient.RpcClientConnection // connection towards SessionS } // ListenAndServe loops keeps the service alive -func (erS *ERService) ListenAndServe(exitChan chan bool) (err error) { +func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}, + exitChan chan bool) (err error) { for _, rdrCfg := range erS.cfg.ERsCfg().Readers { var rdr EventReader if rdr, err = NewEventReader(rdrCfg); err != nil { @@ -76,7 +73,7 @@ func (erS *ERService) ListenAndServe(exitChan chan bool) (err error) { erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) } - go erS.handleReloads() + go erS.handleReloads(cfgRldChan, exitChan) e := <-exitChan exitChan <- e // put back for the others listening for shutdown request return @@ -88,78 +85,83 @@ type erCfgRef struct { idx int } -func (erS *ERService) handleReloads() { +// handleReloads will handle the config reloads which are signaled over cfgRldChan +func (erS *ERService) handleReloads(cfgRldChan chan struct{}, exitChan chan bool) { for { - <-erS.cfgRld - cfgIDs := make(map[string]int) // IDs which are configured in EventReader profiles as map[id]cfgIdx - inUseIDs := make(map[string]*erCfgRef) // IDs which are running in ERService map[id]rdrIdx - addIDs := make(map[string]struct{}) // IDs which need to be added to ERService - remIDs := make(map[string]struct{}) // IDs which need to be removed from ERService - // index config IDs - for i, rdrCfg := range erS.cfg.ERsCfg().Readers { - cfgIDs[rdrCfg.ID] = i - } - erS.Lock() - // index in use IDs - for path, rdrs := range erS.rdrs { - for i, rdr := range rdrs { - inUseIDs[rdr.ID()] = &erCfgRef{path: path, idx: i} + select { + case <-exitChan: + return + case <-cfgRldChan: + cfgIDs := make(map[string]int) // IDs which are configured in EventReader profiles as map[id]cfgIdx + inUseIDs := make(map[string]*erCfgRef) // IDs which are running in ERService map[id]rdrIdx + addIDs := make(map[string]struct{}) // IDs which need to be added to ERService + remIDs := make(map[string]struct{}) // IDs which need to be removed from ERService + // index config IDs + for i, rdrCfg := range erS.cfg.ERsCfg().Readers { + cfgIDs[rdrCfg.ID] = i } - } - // find out removed ids - for id := range inUseIDs { - if _, has := cfgIDs[id]; !has { - remIDs[id] = struct{}{} - } - } - // find out added ids - for id := range cfgIDs { - if _, has := inUseIDs[id]; !has { - addIDs[id] = struct{}{} - } - } - for id := range remIDs { - ref := inUseIDs[id] - rdrSlc := erS.rdrs[ref.path] - // remove the ids - copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:]) - rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected - rdrSlc = rdrSlc[:len(rdrSlc)-1] - if len(rdrSlc) == 0 { // no more - delete(erS.rdrs, ref.path) - if chn, has := erS.stopLsn[ref.path]; has { - close(chn) + erS.Lock() + // index in use IDs + for path, rdrs := range erS.rdrs { + for i, rdr := range rdrs { + inUseIDs[rdr.ID()] = &erCfgRef{path: path, idx: i} } } - } - // add new ids: - for id := range addIDs { - rdrCfg := erS.cfg.ERsCfg().Readers[cfgIDs[id]] - if rdr, err := NewEventReader(rdrCfg); err != nil { - utils.Logger.Warning( - fmt.Sprintf( - "<%s> error reloading config with ID: <%s>, err: <%s>", - utils.ERs, id, err.Error())) - } else { - if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath && - rdrCfg.Type == utils.MetaFileCSV && - rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop - erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{}) - if err := erS.watchDir(rdrCfg.SourcePath); err != nil { - utils.Logger.Warning( - fmt.Sprintf( - "<%s> error scheduling dir watch for config: <%s>, err: <%s>", - utils.ERs, id, err.Error())) + // find out removed ids + for id := range inUseIDs { + if _, has := cfgIDs[id]; !has { + remIDs[id] = struct{}{} + } + } + // find out added ids + for id := range cfgIDs { + if _, has := inUseIDs[id]; !has { + addIDs[id] = struct{}{} + } + } + for id := range remIDs { + ref := inUseIDs[id] + rdrSlc := erS.rdrs[ref.path] + // remove the ids + copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:]) + rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected + rdrSlc = rdrSlc[:len(rdrSlc)-1] + if len(rdrSlc) == 0 { // no more + delete(erS.rdrs, ref.path) + if chn, has := erS.stopLsn[ref.path]; has { + close(chn) } } - erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) } + // add new ids: + for id := range addIDs { + rdrCfg := erS.cfg.ERsCfg().Readers[cfgIDs[id]] + if rdr, err := NewEventReader(rdrCfg); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> error reloading config with ID: <%s>, err: <%s>", + utils.ERs, id, err.Error())) + } else { + if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath && + rdrCfg.Type == utils.MetaFileCSV && + rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop + erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{}) + if err := erS.watchDir(rdrCfg.SourcePath); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> error scheduling dir watch for config: <%s>, err: <%s>", + utils.ERs, id, err.Error())) + } + } + erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) + } + } + erS.Unlock() } - erS.Unlock() } } -// trackFiles +// watchDir sets up a watcher via inotify to be triggered on new files func (erS *ERService) watchDir(dirPath string) (err error) { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -194,5 +196,6 @@ func (erS *ERService) watchDir(dirPath string) (err error) { } func (erS *ERService) processPath(path string) (err error) { + fmt.Printf("ERService processPath: <%s>", path) return }