From a81fce8341adac002446d563318221b809ae4d03 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 12 Nov 2020 17:29:48 +0200 Subject: [PATCH] Updated services shutdown --- agents/astagent.go | 3 +- analyzers/analyzers.go | 5 +- analyzers/analyzers_test.go | 8 +- apier/v1/apier.go | 2 +- cmd/cgr-engine/cgr-engine.go | 192 +++++++++++++--------------- cores/caps.go | 9 +- cores/caps_test.go | 8 +- cores/core.go | 14 +- cores/server.go | 61 ++++----- dispatcherh/dispatcherh.go | 12 +- dispatcherh/dispatcherh_test.go | 10 +- dispatchers/dispatchers.go | 11 +- ees/ees.go | 6 +- engine/attributes.go | 8 -- engine/cdrs.go | 2 +- engine/chargers.go | 8 -- engine/resources.go | 9 -- engine/responder.go | 4 +- engine/routes.go | 8 -- engine/stats.go | 9 -- engine/thresholds.go | 9 -- ers/ers.go | 12 +- ers/ers_test.go | 6 +- general_tests/sessions_race_test.go | 2 +- guardian/guardian.go | 2 +- loaders/loaders.go | 2 +- rates/rates.go | 6 +- rates/rates_test.go | 8 +- scheduler/scheduler.go | 14 +- services/analyzers.go | 13 +- services/apiers_it_test.go | 7 +- services/apierv1.go | 14 +- services/apierv2.go | 2 +- services/asteriskagent.go | 16 ++- services/attributes_it_test.go | 7 +- services/cdrs.go | 13 +- services/cdrs_it_test.go | 7 +- services/chargers_it_test.go | 7 +- services/cores.go | 110 ++++++++++++++++ services/datadb_it_test.go | 7 +- services/diameteragent.go | 6 +- services/dispatcherh.go | 28 ++-- services/dispatchers.go | 4 +- services/dispatchers_it_test.go | 7 +- services/dnsagent.go | 8 +- services/dnsagent_it_test.go | 8 +- services/ees.go | 15 +-- services/ees_it_test.go | 9 +- services/ers.go | 18 +-- services/ers_it_test.go | 7 +- services/freeswitchagent.go | 8 +- services/kamailioagent.go | 8 +- services/loaders.go | 7 +- services/radiusagent.go | 6 +- services/rals.go | 10 +- services/rals_it_test.go | 7 +- services/rates.go | 17 +-- services/rates_it_test.go | 9 +- services/resources_it_test.go | 7 +- services/responders.go | 5 +- services/routes_it_test.go | 7 +- services/schedulers_it_test.go | 7 +- services/sessions.go | 11 +- services/sessions_it_test.go | 7 +- services/sipagent.go | 8 +- services/sipagent_it_test.go | 8 +- services/stats_it_test.go | 7 +- services/thresholds_it_test.go | 7 +- servmanager/servmanager.go | 58 ++++++--- sessions/sessions.go | 22 ++-- 70 files changed, 492 insertions(+), 497 deletions(-) create mode 100644 services/cores.go diff --git a/agents/astagent.go b/agents/astagent.go index a74116fe0..408d5d556 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -89,7 +89,7 @@ func (sma *AsteriskAgent) connectAsterisk() (err error) { } // ListenAndServe is called to start the service -func (sma *AsteriskAgent) ListenAndServe() (err error) { +func (sma *AsteriskAgent) ListenAndServe(stopChan <-chan struct{}) (err error) { if err = sma.connectAsterisk(); err != nil { return } @@ -97,6 +97,7 @@ func (sma *AsteriskAgent) ListenAndServe() (err error) { utils.AsteriskAgent, sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address)) for { select { + case <-stopChan: case err = <-sma.astErrChan: return case astRawEv := <-sma.astEvChan: diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index 647c1cf8b..85eb9ce84 100755 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -82,15 +82,14 @@ func (aS *AnalyzerService) deleteHits(hits search.DocumentMatchCollection) (err } // ListenAndServe will initialize the service -func (aS *AnalyzerService) ListenAndServe(exitChan chan bool) (err error) { +func (aS *AnalyzerService) ListenAndServe(exitChan <-chan struct{}) (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AnalyzerS)) if err = aS.clenaUp(); err != nil { // clean up the data at the system start return } for { select { - case e := <-exitChan: - exitChan <- e // put back for the others listening for shutdown request + case <-exitChan: return case <-time.After(aS.cfg.AnalyzerSCfg().CleanupInterval): if err = aS.clenaUp(); err != nil { diff --git a/analyzers/analyzers_test.go b/analyzers/analyzers_test.go index 6565dfea5..d8e18bb5b 100644 --- a/analyzers/analyzers_test.go +++ b/analyzers/analyzers_test.go @@ -57,8 +57,8 @@ func TestNewAnalyzerService(t *testing.T) { if err = anz.initDB(); err != nil { t.Fatal(err) } - exitChan := make(chan bool, 1) - exitChan <- true + exitChan := make(chan struct{}, 1) + exitChan <- struct{}{} if err := anz.ListenAndServe(exitChan); err != nil { t.Fatal(err) } @@ -169,7 +169,7 @@ func TestAnalyzersListenAndServe(t *testing.T) { if err := anz.db.Close(); err != nil { t.Fatal(err) } - anz.ListenAndServe(make(chan bool)) + anz.ListenAndServe(make(chan struct{})) cfg.AnalyzerSCfg().CleanupInterval = 1 anz, err = NewAnalyzerService(cfg) @@ -181,7 +181,7 @@ func TestAnalyzersListenAndServe(t *testing.T) { runtime.Gosched() anz.db.Close() }() - anz.ListenAndServe(make(chan bool)) + anz.ListenAndServe(make(chan struct{})) if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { t.Fatal(err) } diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 31896fce9..cb56fc278 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1439,7 +1439,7 @@ func (apierSv1 *APIerSv1) GetRatingPlanIDs(args *utils.PaginatorWithTenant, attr } // ListenAndServe listen for storbd reload -func (apierSv1 *APIerSv1) ListenAndServe(stopChan chan struct{}) (err error) { +func (apierSv1 *APIerSv1) ListenAndServe(stopChan chan struct{}) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ApierS)) for { select { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 88413e46a..2d70087a9 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -68,20 +68,20 @@ var ( // startFilterService fires up the FilterS func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, connMgr *engine.ConnManager, cfg *config.CGRConfig, - dm *engine.DataManager, exitChan chan bool) { + dm *engine.DataManager) { <-cacheS.GetPrecacheChannel(utils.CacheFilters) filterSChan <- engine.NewFilterS(cfg, connMgr, dm) } // initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns func initCacheS(internalCacheSChan chan rpcclient.ClientConnector, - server *cores.Server, dm *engine.DataManager, exitChan chan bool, + server *cores.Server, dm *engine.DataManager, exitChan chan<- struct{}, anz *services.AnalyzerService) (chS *engine.CacheS) { chS = engine.NewCacheS(cfg, dm) go func() { if err := chS.Precache(); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error())) - exitChan <- true + close(exitChan) } }() @@ -110,19 +110,6 @@ func initGuardianSv1(internalGuardianSChan chan rpcclient.ClientConnector, serve internalGuardianSChan <- rpc } -func initCoreSv1(internalCoreSv1Chan chan rpcclient.ClientConnector, server *cores.Server, - anz *services.AnalyzerService, cfg *config.CGRConfig, caps *cores.Caps, exitChan chan bool) { - cSv1 := v1.NewCoreSv1(cores.NewCoreService(cfg, caps, exitChan)) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(cSv1) - } - var rpc rpcclient.ClientConnector = cSv1 - if anz.IsRunning() { - rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CoreS) - } - internalCoreSv1Chan <- rpc -} - func initServiceManagerV1(internalServiceManagerChan chan rpcclient.ClientConnector, srvMngr *servmanager.ServiceManager, server *cores.Server, anz *services.AnalyzerService) { @@ -136,15 +123,6 @@ func initServiceManagerV1(internalServiceManagerChan chan rpcclient.ClientConnec internalServiceManagerChan <- rpc } -// initLogger will initialize syslog writter, needs to be called after config init -func initLogger(cfg *config.CGRConfig) error { - sylogger := cfg.GeneralCfg().Logger - if *syslogger != "" { // Modify the log level if provided by command arguments - sylogger = *syslogger - } - return utils.Newlogger(sylogger, cfg.GeneralCfg().NodeID) -} - func initConfigSv1(internalConfigChan chan rpcclient.ClientConnector, server *cores.Server, anz *services.AnalyzerService) { cfgSv1 := v1.NewConfigSv1(cfg) @@ -164,8 +142,7 @@ func startRPC(server *cores.Server, internalRaterChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, internalEEsChan, internalRateSChan chan rpcclient.ClientConnector, - internalPreloadChan chan struct{}, - exitChan chan bool) { + stopChan <-chan struct{}, exitChan chan<- struct{}) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests case resp := <-internalRaterChan: @@ -198,13 +175,15 @@ func startRPC(server *cores.Server, internalRaterChan, internalEEsChan <- eeS case rateS := <-internalRateSChan: internalRateSChan <- rateS - case preload := <-internalPreloadChan: - internalPreloadChan <- preload + case <-stopChan: + return } } else { select { case dispatcherS := <-internalDispatcherSChan: internalDispatcherSChan <- dispatcherS + case <-stopChan: + return } } @@ -218,54 +197,50 @@ func startRPC(server *cores.Server, internalRaterChan, cfg.HTTPCfg().HTTPAuthUsers, exitChan, ) + if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 || + len(cfg.ListenCfg().RPCJSONTLSListen) != 0 || + len(cfg.ListenCfg().HTTPTLSListen) != 0) && + (len(cfg.TlsCfg().ServerCerificate) == 0 || + len(cfg.TlsCfg().ServerKey) == 0) { + utils.Logger.Warning("WARNING: missing TLS certificate/key file!") + return + } if cfg.ListenCfg().RPCGOBTLSListen != "" { - if cfg.TlsCfg().ServerCerificate == "" || cfg.TlsCfg().ServerKey == "" { - utils.Logger.Warning("WARNING: missing TLS certificate/key file!") - } else { - go server.ServeGOBTLS( - cfg.ListenCfg().RPCGOBTLSListen, - cfg.TlsCfg().ServerCerificate, - cfg.TlsCfg().ServerKey, - cfg.TlsCfg().CaCertificate, - cfg.TlsCfg().ServerPolicy, - cfg.TlsCfg().ServerName, - exitChan, - ) - } + go server.ServeGOBTLS( + cfg.ListenCfg().RPCGOBTLSListen, + cfg.TlsCfg().ServerCerificate, + cfg.TlsCfg().ServerKey, + cfg.TlsCfg().CaCertificate, + cfg.TlsCfg().ServerPolicy, + cfg.TlsCfg().ServerName, + exitChan, + ) } if cfg.ListenCfg().RPCJSONTLSListen != "" { - if cfg.TlsCfg().ServerCerificate == "" || cfg.TlsCfg().ServerKey == "" { - utils.Logger.Warning("WARNING: missing TLS certificate/key file!") - } else { - go server.ServeJSONTLS( - cfg.ListenCfg().RPCJSONTLSListen, - cfg.TlsCfg().ServerCerificate, - cfg.TlsCfg().ServerKey, - cfg.TlsCfg().CaCertificate, - cfg.TlsCfg().ServerPolicy, - cfg.TlsCfg().ServerName, - exitChan, - ) - } + go server.ServeJSONTLS( + cfg.ListenCfg().RPCJSONTLSListen, + cfg.TlsCfg().ServerCerificate, + cfg.TlsCfg().ServerKey, + cfg.TlsCfg().CaCertificate, + cfg.TlsCfg().ServerPolicy, + cfg.TlsCfg().ServerName, + exitChan, + ) } if cfg.ListenCfg().HTTPTLSListen != "" { - if cfg.TlsCfg().ServerCerificate == "" || cfg.TlsCfg().ServerKey == "" { - utils.Logger.Warning("WARNING: missing TLS certificate/key file!") - } else { - go server.ServeHTTPTLS( - cfg.ListenCfg().HTTPTLSListen, - cfg.TlsCfg().ServerCerificate, - cfg.TlsCfg().ServerKey, - cfg.TlsCfg().CaCertificate, - cfg.TlsCfg().ServerPolicy, - cfg.TlsCfg().ServerName, - cfg.HTTPCfg().HTTPJsonRPCURL, - cfg.HTTPCfg().HTTPWSURL, - cfg.HTTPCfg().HTTPUseBasicAuth, - cfg.HTTPCfg().HTTPAuthUsers, - exitChan, - ) - } + go server.ServeHTTPTLS( + cfg.ListenCfg().HTTPTLSListen, + cfg.TlsCfg().ServerCerificate, + cfg.TlsCfg().ServerKey, + cfg.TlsCfg().CaCertificate, + cfg.TlsCfg().ServerPolicy, + cfg.TlsCfg().ServerName, + cfg.HTTPCfg().HTTPJsonRPCURL, + cfg.HTTPCfg().HTTPWSURL, + cfg.HTTPCfg().HTTPUseBasicAuth, + cfg.HTTPCfg().HTTPAuthUsers, + exitChan, + ) } } @@ -297,12 +272,12 @@ func memProfFile(memProfPath string) bool { return true } -func memProfiling(memProfDir string, interval time.Duration, nrFiles int, exitChan chan bool) { +func memProfiling(memProfDir string, interval time.Duration, nrFiles int, exitChan chan<- struct{}) { for i := 1; ; i++ { time.Sleep(interval) memPath := path.Join(memProfDir, fmt.Sprintf("mem%v.prof", i)) if !memProfFile(memPath) { - exitChan <- true + close(exitChan) } if i%nrFiles == 0 { i = 0 // reset the counting @@ -310,22 +285,23 @@ func memProfiling(memProfDir string, interval time.Duration, nrFiles int, exitCh } } -func cpuProfiling(cpuProfDir string, stopChan, doneChan chan struct{}, exitChan chan bool) { +func cpuProfiling(cpuProfDir string, stopChan, doneChan chan struct{}, exitChan chan<- struct{}) { cpuPath := path.Join(cpuProfDir, "cpu.prof") f, err := os.Create(cpuPath) + defer func() { close(doneChan) }() if err != nil { utils.Logger.Crit(fmt.Sprintf("could not create cpu profile file: %s", err)) - exitChan <- true + close(exitChan) return } pprof.StartCPUProfile(f) <-stopChan pprof.StopCPUProfile() f.Close() - doneChan <- struct{}{} + } -func singnalHandler(exitChan chan bool) { +func singnalHandler(stopChan <-chan struct{}, exitChan chan<- struct{}) { shutdownSignal := make(chan os.Signal) reloadSignal := make(chan os.Signal) signal.Notify(shutdownSignal, os.Interrupt, @@ -333,8 +309,11 @@ func singnalHandler(exitChan chan bool) { signal.Notify(reloadSignal, syscall.SIGHUP) for { select { + case <-stopChan: + return case <-shutdownSignal: - exitChan <- true + close(exitChan) + return case <-reloadSignal: // do it in it's own gorutine in order to not block the signal handler with the reload functionality go func() { @@ -353,16 +332,14 @@ func singnalHandler(exitChan chan bool) { } func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclient.ClientConnector, - internalPreloadChan chan struct{}, exitChan chan bool) { - + exitChan chan<- struct{}) { if !cfg.LoaderCfg().Enabled() { utils.Logger.Err(fmt.Sprintf("<%s> not enabled but required by preload mechanism", utils.LoaderS)) - exitChan <- true + close(exitChan) return } - ldr := <-internalLoaderSChan - internalLoaderSChan <- ldr + internalLoaderSChan <- <-internalLoaderSChan var reply string for _, loaderID := range strings.Split(*preload, utils.FIELDS_SEP) { @@ -372,11 +349,10 @@ func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclie StopOnError: true, }, &reply); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err.Error())) - exitChan <- true + close(exitChan) return } } - internalPreloadChan <- struct{}{} } func main() { @@ -400,8 +376,10 @@ func main() { runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases } - exitChan := make(chan bool) - go singnalHandler(exitChan) + exitChan := make(chan struct{}) + signStop := make(chan struct{}) + rpcStop := make(chan struct{}) + go singnalHandler(signStop, exitChan) if *memProfDir != utils.EmptyString { go memProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, exitChan) @@ -419,7 +397,7 @@ func main() { } go func() { // Schedule shutdown time.Sleep(shutdownDur) - exitChan <- true + close(exitChan) return }() } @@ -441,7 +419,8 @@ func main() { config.SetCgrConfig(cfg) // Share the config object // init syslog - if err = initLogger(cfg); err != nil { + if err = utils.Newlogger(utils.FirstNonEmpty(*syslogger, + cfg.GeneralCfg().Logger), cfg.GeneralCfg().NodeID); err != nil { log.Fatalf("Could not initialize syslog connection, err: <%s>", err.Error()) return } @@ -473,7 +452,6 @@ func main() { internalCDRServerChan := make(chan rpcclient.ClientConnector, 1) internalAttributeSChan := make(chan rpcclient.ClientConnector, 1) internalDispatcherSChan := make(chan rpcclient.ClientConnector, 1) - internalDispatcherHChan := make(chan rpcclient.ClientConnector, 1) internalSessionSChan := make(chan rpcclient.ClientConnector, 1) internalChargerSChan := make(chan rpcclient.ClientConnector, 1) internalThresholdSChan := make(chan rpcclient.ClientConnector, 1) @@ -488,7 +466,6 @@ func main() { internalLoaderSChan := make(chan rpcclient.ClientConnector, 1) internalEEsChan := make(chan rpcclient.ClientConnector, 1) internalRateSChan := make(chan rpcclient.ClientConnector, 1) - internalPreloadChan := make(chan struct{}, 1) // initialize the connManager before creating the DMService // because we need to pass the connection to it @@ -515,7 +492,6 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatcherh): internalDispatcherHChan, }) gvService := services.NewGlobalVarS(cfg) if err = gvService.Start(); err != nil { @@ -572,13 +548,17 @@ func main() { initGuardianSv1(internalGuardianSChan, server, anz) // init CoreSv1 - initCoreSv1(internalCoreSv1Chan, server, anz, cfg, caps, exitChan) + coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz) + if err := coreS.Start(); err != nil { + fmt.Println(err) + return + } // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, exitChan) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz) dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz) - dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager, exitChan, anz) + dspH := services.NewDispatcherHostsService(cfg, server, connManager, anz) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, internalChargerSChan, connManager, anz) tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan, anz) @@ -596,7 +576,7 @@ func main() { internalRALsChan, internalResponderChan, exitChan, connManager, anz) - apiSv1 := services.NewAPIerSv1Service(cfg, dmService, storDBService, filterSChan, server, schS, rals.GetResponderService(), + apiSv1 := services.NewAPIerSv1Service(cfg, dmService, storDBService, filterSChan, server, schS, rals.GetResponder(), internalAPIerSv1Chan, connManager, anz) apiSv2 := services.NewAPIerSv2Service(apiSv1, cfg, server, internalAPIerSv2Chan, anz) @@ -606,11 +586,11 @@ func main() { smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, exitChan, connManager, caps, anz) - ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, exitChan, + ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalLoaderSChan, connManager, anz) srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals, - rals.GetResponder(), apiSv1, apiSv2, cdrS, smg, + apiSv1, apiSv2, cdrS, smg, coreS, services.NewEventReaderService(cfg, filterSChan, exitChan, connManager), services.NewDNSAgent(cfg, filterSChan, exitChan, connManager), services.NewFreeswitchAgent(cfg, exitChan, connManager), @@ -621,15 +601,15 @@ func main() { services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload ldrs, anz, dspS, dspH, dmService, storDBService, services.NewEventExporterService(cfg, filterSChan, - connManager, server, exitChan, internalEEsChan, anz), + connManager, server, internalEEsChan, anz), services.NewRateService(cfg, cacheS, filterSChan, dmService, - server, exitChan, internalRateSChan, anz), + server, internalRateSChan, anz), services.NewSIPAgent(cfg, filterSChan, exitChan, connManager), ) srvManager.StartServices() // Start FilterS go startFilterService(filterSChan, cacheS, connManager, - cfg, dmService.GetDM(), exitChan) + cfg, dmService.GetDM()) initServiceManagerV1(internalServeManagerChan, srvManager, server, anz) @@ -659,12 +639,11 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan) engine.IntRPC.AddInternalRPCClient(utils.EventExporterSv1, internalEEsChan) engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan) - // engine.IntRPC.AddInternalRPCClient(utils.DispatcherHv1, internalDispatcherHChan) initConfigSv1(internalConfigChan, server, anz) if *preload != utils.EmptyString { - runPreload(ldrs, internalLoaderSChan, internalPreloadChan, exitChan) + runPreload(ldrs, internalLoaderSChan, exitChan) } // Serve rpc connections @@ -673,11 +652,14 @@ func main() { internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsChan, - internalCacheSChan, internalEEsChan, internalRateSChan, internalPreloadChan, exitChan) + internalCacheSChan, internalEEsChan, internalRateSChan, rpcStop, exitChan) <-exitChan + close(rpcStop) + close(signStop) + srvManager.ShutdownServices(time.Second) if *cpuProfDir != "" { // wait to end cpuProfiling - cpuProfChanStop <- struct{}{} + close(cpuProfChanStop) <-cpuProfChanDone } if *memProfDir != "" { // write last memory profiling diff --git a/cores/caps.go b/cores/caps.go index 4fb5fade4..f3e56b7e0 100644 --- a/cores/caps.go +++ b/cores/caps.go @@ -155,10 +155,10 @@ func (c *capsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { func (c *capsServerCodec) Close() error { return c.sc.Close() } // NewCapsStats returns the stats for the caps -func NewCapsStats(sampleinterval time.Duration, caps *Caps, exitChan chan bool) (cs *CapsStats) { +func NewCapsStats(sampleinterval time.Duration, caps *Caps, stopChan chan struct{}) (cs *CapsStats) { st, _ := engine.NewStatAverage(1, utils.MetaDynReq, nil) cs = &CapsStats{st: st} - go cs.loop(sampleinterval, exitChan, caps) + go cs.loop(sampleinterval, stopChan, caps) return } @@ -174,11 +174,10 @@ func (cs *CapsStats) OnEvict(itmID string, value interface{}) { cs.st.RemEvent(itmID) } -func (cs *CapsStats) loop(intr time.Duration, exitChan chan bool, caps *Caps) { +func (cs *CapsStats) loop(intr time.Duration, stopChan chan struct{}, caps *Caps) { for { select { - case v := <-exitChan: - exitChan <- v + case <-stopChan: return case <-time.After(intr): evID := time.Now().String() diff --git a/cores/caps_test.go b/cores/caps_test.go index d84f8e858..c263820d8 100644 --- a/cores/caps_test.go +++ b/cores/caps_test.go @@ -67,18 +67,18 @@ func TestCapsStats(t *testing.T) { } exp := &CapsStats{st: st} cr := NewCaps(0, utils.MetaBusy) - exitChan := make(chan bool, 1) - exitChan <- true + exitChan := make(chan struct{}, 1) + close(exitChan) cs := NewCapsStats(1, cr, exitChan) if !reflect.DeepEqual(exp, cs) { t.Errorf("Expected: %v ,received: %v", exp, cs) } <-exitChan - exitChan = make(chan bool, 1) + exitChan = make(chan struct{}, 1) go func() { runtime.Gosched() time.Sleep(100) - exitChan <- true + close(exitChan) }() cr = NewCaps(10, utils.MetaBusy) cr.Allocate() diff --git a/cores/core.go b/cores/core.go index 1df533503..466b72b65 100644 --- a/cores/core.go +++ b/cores/core.go @@ -26,10 +26,10 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewCoreService(cfg *config.CGRConfig, caps *Caps, exitChan chan bool) *CoreService { +func NewCoreService(cfg *config.CGRConfig, caps *Caps, stopChan chan struct{}) *CoreService { var st *CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { - st = NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, exitChan) + st = NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) } return &CoreService{ cfg: cfg, @@ -42,16 +42,8 @@ type CoreService struct { capsStats *CapsStats } -// ListenAndServe will initialize the service -func (cS *CoreService) ListenAndServe(exitChan chan bool) (err error) { - utils.Logger.Info("Starting Core service") - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return -} - // Shutdown is called to shutdown the service -func (cS *CoreService) Shutdown() (err error) { +func (cS *CoreService) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.CoreS)) utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.CoreS)) return diff --git a/cores/server.go b/cores/server.go index bce8b9742..0fc7234c9 100644 --- a/cores/server.go +++ b/cores/server.go @@ -149,7 +149,7 @@ func (s *Server) BiRPCRegister(rcvr interface{}) { } } -func (s *Server) ServeJSON(addr string, exitChan chan bool) { +func (s *Server) ServeJSON(addr string, exitChan chan<- struct{}) { s.RLock() enabled := s.rpcEnabled s.RUnlock() @@ -157,10 +157,10 @@ func (s *Server) ServeJSON(addr string, exitChan chan bool) { return } + defer func() { close(exitChan) }() lJSON, e := net.Listen(utils.TCP, addr) if e != nil { log.Println("ServeJSON listen error:", e) - exitChan <- true return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS JSON server at <%s>.", addr)) @@ -186,17 +186,17 @@ func (s *Server) ServeJSON(addr string, exitChan chan bool) { } -func (s *Server) ServeGOB(addr string, exitChan chan bool) { +func (s *Server) ServeGOB(addr string, exitChan chan<- struct{}) { s.RLock() enabled := s.rpcEnabled s.RUnlock() if !enabled { return } + defer func() { close(exitChan) }() lGOB, e := net.Listen(utils.TCP, addr) if e != nil { log.Println("ServeGOB listen error:", e) - exitChan <- true return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS GOB server at <%s>.", addr)) @@ -253,7 +253,7 @@ func (s *Server) RegisterProfiler(addr string) { } func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, - useBasicAuth bool, userList map[string]string, exitChan chan bool) { + useBasicAuth bool, userList map[string]string, exitChan chan<- struct{}) { s.RLock() enabled := s.rpcEnabled s.RUnlock() @@ -261,7 +261,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, return } // s.httpMux = http.NewServeMux() - if enabled && jsonRPCURL != "" { + if jsonRPCURL != "" { s.Lock() s.httpEnabled = true s.Unlock() @@ -273,7 +273,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, s.httpMux.HandleFunc(jsonRPCURL, s.handleRequest) } } - if enabled && wsRPCURL != "" { + if wsRPCURL != "" { s.Lock() s.httpEnabled = true s.Unlock() @@ -282,9 +282,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, rpc.ServeCodec(newCapsJSONCodec(ws, s.caps, s.anz)) }) if useBasicAuth { - s.httpMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) { - wsHandler.ServeHTTP(w, r) - }, basicAuth(userList))) + s.httpMux.HandleFunc(wsRPCURL, use(wsHandler.ServeHTTP, basicAuth(userList))) } else { s.httpMux.Handle(wsRPCURL, wsHandler) } @@ -299,7 +297,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, if err := http.ListenAndServe(addr, s.httpMux); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) } - exitChan <- true + close(exitChan) } // ServeBiJSON create a gorutine to listen and serve as BiRPC server @@ -346,22 +344,18 @@ func (s *Server) StopBiRPC() { // rpcRequest represents a RPC request. // rpcRequest implements the io.ReadWriteCloser interface. type rpcRequest struct { - r io.Reader // holds the JSON formated RPC request + r io.ReadCloser // holds the JSON formated RPC request rw io.ReadWriter // holds the JSON formated RPC response - done chan bool // signals then end of the RPC request remoteAddr net.Addr caps *Caps anzWarpper *analyzers.AnalyzerService } // newRPCRequest returns a new rpcRequest. -func newRPCRequest(r io.Reader, remoteAddr net.Addr, caps *Caps, anz *analyzers.AnalyzerService) *rpcRequest { - var buf bytes.Buffer - done := make(chan bool) +func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *Caps, anz *analyzers.AnalyzerService) *rpcRequest { return &rpcRequest{ r: r, - rw: &buf, - done: done, + rw: new(bytes.Buffer), remoteAddr: remoteAddr, caps: caps, anzWarpper: anz, @@ -373,9 +367,7 @@ func (r *rpcRequest) Read(p []byte) (n int, err error) { } func (r *rpcRequest) Write(p []byte) (n int, err error) { - n, err = r.rw.Write(p) - r.done <- true - return + return r.rw.Write(p) } func (r *rpcRequest) LocalAddr() net.Addr { @@ -386,14 +378,12 @@ func (r *rpcRequest) RemoteAddr() net.Addr { } func (r *rpcRequest) Close() error { - //r.done <- true // seem to be called sometimes before the write command finishes! - return nil + return r.r.Close() } // Call invokes the RPC request, waits for it to complete, and returns the results. func (r *rpcRequest) Call() io.Reader { - go rpc.ServeCodec(newCapsJSONCodec(r, r.caps, r.anzWarpper)) - <-r.done + rpc.ServeCodec(newCapsJSONCodec(r, r.caps, r.anzWarpper)) return r.rw } @@ -437,13 +427,14 @@ func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int, } func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, - serverPolicy int, serverName string, exitChan chan bool) { + serverPolicy int, serverName string, exitChan chan<- struct{}) { s.RLock() enabled := s.rpcEnabled s.RUnlock() if !enabled { return } + defer func() { close(exitChan) }() config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { return @@ -451,7 +442,6 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, listener, err := tls.Listen(utils.TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) - exitChan <- true return } @@ -479,13 +469,14 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, } func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, - serverPolicy int, serverName string, exitChan chan bool) { + serverPolicy int, serverName string, exitChan chan<- struct{}) { s.RLock() enabled := s.rpcEnabled s.RUnlock() if !enabled { return } + defer func() { close(exitChan) }() config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { return @@ -493,7 +484,6 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, listener, err := tls.Listen(utils.TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) - exitChan <- true return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS JSON TLS server at <%s>.", addr)) @@ -521,15 +511,14 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string, jsonRPCURL string, wsRPCURL string, - useBasicAuth bool, userList map[string]string, exitChan chan bool) { + useBasicAuth bool, userList map[string]string, exitChan chan<- struct{}) { s.RLock() enabled := s.rpcEnabled s.RUnlock() if !enabled { return } - // s.httpsMux = http.NewServeMux() - if enabled && jsonRPCURL != "" { + if jsonRPCURL != "" { s.Lock() s.httpEnabled = true s.Unlock() @@ -540,7 +529,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP s.httpsMux.HandleFunc(jsonRPCURL, s.handleRequest) } } - if enabled && wsRPCURL != "" { + if wsRPCURL != "" { s.Lock() s.httpEnabled = true s.Unlock() @@ -549,9 +538,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP rpc.ServeCodec(newCapsJSONCodec(ws, s.caps, s.anz)) }) if useBasicAuth { - s.httpsMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) { - wsHandler.ServeHTTP(w, r) - }, basicAuth(userList))) + s.httpsMux.HandleFunc(wsRPCURL, use(wsHandler.ServeHTTP, basicAuth(userList))) } else { s.httpsMux.Handle(wsRPCURL, wsHandler) } @@ -562,6 +549,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP if useBasicAuth { utils.Logger.Info(" enabling basic auth") } + defer func() { close(exitChan) }() config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { return @@ -575,6 +563,5 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP if err := httpSrv.ListenAndServeTLS(serverCrt, serverKey); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) } - exitChan <- true return } diff --git a/dispatcherh/dispatcherh.go b/dispatcherh/dispatcherh.go index b19b968b6..c277f2271 100644 --- a/dispatcherh/dispatcherh.go +++ b/dispatcherh/dispatcherh.go @@ -33,7 +33,6 @@ func NewDispatcherHService(cfg *config.CGRConfig, return &DispatcherHostsService{ cfg: cfg, connMgr: connMgr, - stop: make(chan struct{}), } } @@ -42,16 +41,15 @@ func NewDispatcherHService(cfg *config.CGRConfig, type DispatcherHostsService struct { cfg *config.CGRConfig connMgr *engine.ConnManager - stop chan struct{} } // ListenAndServe will initialize the service -func (dhS *DispatcherHostsService) ListenAndServe() { +func (dhS *DispatcherHostsService) ListenAndServe(stopChan chan struct{}) { utils.Logger.Info("Starting DispatcherH service") for { dhS.registerHosts() select { - case <-dhS.stop: + case <-stopChan: return case <-time.After(dhS.cfg.DispatcherHCfg().RegisterInterval): } @@ -62,7 +60,6 @@ func (dhS *DispatcherHostsService) ListenAndServe() { func (dhS *DispatcherHostsService) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH)) dhS.unregisterHosts() - close(dhS.stop) utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH)) return } @@ -103,8 +100,3 @@ func (dhS *DispatcherHostsService) unregisterHosts() { } } } - -// Call only to implement rpcclient.ClientConnector interface -func (*DispatcherHostsService) Call(_ string, _, _ interface{}) error { - return utils.ErrNotImplemented -} diff --git a/dispatcherh/dispatcherh_test.go b/dispatcherh/dispatcherh_test.go index 7db8a2d08..9a82dcf3f 100644 --- a/dispatcherh/dispatcherh_test.go +++ b/dispatcherh/dispatcherh_test.go @@ -31,12 +31,6 @@ import ( "github.com/cgrates/rpcclient" ) -func TestDispatcherHostsServiceCall(t *testing.T) { - if err := new(DispatcherHostsService).Call("", nil, nil); err != utils.ErrNotImplemented { - t.Errorf("Expected error: %s ,received: %v", utils.ErrNotImplemented, err) - } -} - func TestDispatcherHostsService(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(Registar)) defer ts.Close() @@ -128,5 +122,7 @@ func TestDispatcherHostsService(t *testing.T) { ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) ds.Shutdown() - ds.ListenAndServe() + stopChan := make(chan struct{}) + close(stopChan) + ds.ListenAndServe(stopChan) } diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index e664251f7..b9b061aa1 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -51,19 +51,10 @@ type DispatcherService struct { connMgr *engine.ConnManager } -// ListenAndServe will initialize the service -func (dS *DispatcherService) ListenAndServe(exitChan chan bool) error { - utils.Logger.Info("Starting Dispatcher service") - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return nil -} - // Shutdown is called to shutdown the service -func (dS *DispatcherService) Shutdown() error { +func (dS *DispatcherService) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherS)) utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherS)) - return nil } func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent, diff --git a/ees/ees.go b/ees/ees.go index 22ee5a512..318300cfa 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -59,14 +59,12 @@ type EventExporterS struct { } // ListenAndServe keeps the service alive -func (eeS *EventExporterS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) { +func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>", utils.CoreS, utils.EventExporterS)) for { select { - case e := <-exitChan: // global exit - eeS.Shutdown() - exitChan <- e // put back for the others listening for shutdown request + case <-stopChan: // global exit return case rld := <-cfgRld: // configuration was reloaded, destroy the cache cfgRld <- rld diff --git a/engine/attributes.go b/engine/attributes.go index 0f01de126..08c335788 100644 --- a/engine/attributes.go +++ b/engine/attributes.go @@ -46,14 +46,6 @@ type AttributeService struct { cgrcfg *config.CGRConfig } -// ListenAndServe will initialize the service -func (alS *AttributeService) ListenAndServe(exitChan chan bool) (err error) { - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS)) - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return -} - // Shutdown is called to shutdown the service func (alS *AttributeService) Shutdown() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.AttributeS)) diff --git a/engine/cdrs.go b/engine/cdrs.go index d2a6037be..01b9aa81f 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -113,7 +113,7 @@ type CDRServer struct { } // ListenAndServe listen for storbd reload -func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) (err error) { +func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) { for { select { case <-stopChan: diff --git a/engine/chargers.go b/engine/chargers.go index fd55047b3..cd02ef8d8 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -40,14 +40,6 @@ type ChargerService struct { connMgr *ConnManager } -// ListenAndServe will initialize the service -func (cS *ChargerService) ListenAndServe(exitChan chan bool) (err error) { - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS)) - e := <-exitChan - exitChan <- e - return -} - // Shutdown is called to shutdown the service func (cS *ChargerService) Shutdown() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.ChargerS)) diff --git a/engine/resources.go b/engine/resources.go index 60805a677..dbf450987 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -336,15 +336,6 @@ type ResourceService struct { connMgr *ConnManager } -// ListenAndServe is called to start the service -func (rS *ResourceService) ListenAndServe(exitChan chan bool) error { - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS)) - go rS.runBackup() // start backup loop - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return nil -} - // Shutdown is called to shutdown the service func (rS *ResourceService) Shutdown() error { utils.Logger.Info(" service shutdown initialized") diff --git a/engine/responder.go b/engine/responder.go index 2b75addaa..c5b9b2e03 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -31,7 +31,7 @@ import ( ) type Responder struct { - ExitChan chan bool + ExitChan chan<- struct{} Timeout time.Duration Timezone string MaxComputedUsage map[string]time.Duration @@ -369,7 +369,7 @@ func (rs *Responder) GetMaxSessionTimeOnAccounts(arg *utils.GetMaxSessionTimeOnA func (rs *Responder) Shutdown(arg *utils.TenantWithOpts, reply *string) (err error) { dm.DataDB().Close() cdrStorage.Close() - defer func() { rs.ExitChan <- true }() + defer func() { close(rs.ExitChan) }() *reply = "Done!" return } diff --git a/engine/routes.go b/engine/routes.go index 91051078b..885e30b09 100644 --- a/engine/routes.go +++ b/engine/routes.go @@ -137,14 +137,6 @@ type RouteService struct { connMgr *ConnManager } -// ListenAndServe will initialize the service -func (rpS *RouteService) ListenAndServe(exitChan chan bool) error { - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RouteS)) - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return nil -} - // Shutdown is called to shutdown the service func (rpS *RouteService) Shutdown() error { utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RouteS)) diff --git a/engine/stats.go b/engine/stats.go index e32c64901..77e3890d9 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -55,15 +55,6 @@ type StatService struct { ssqMux sync.RWMutex // protects storedStatQueues } -// ListenAndServe loops keeps the service alive -func (sS *StatService) ListenAndServe(exitChan chan bool) error { - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) - go sS.runBackup() // start backup loop - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return nil -} - // Shutdown is called to shutdown the service func (sS *StatService) Shutdown() error { utils.Logger.Info(" service shutdown initialized") diff --git a/engine/thresholds.go b/engine/thresholds.go index 2fd1053d3..93c91abeb 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -156,15 +156,6 @@ type ThresholdService struct { stMux sync.RWMutex // protects storedTdIDs } -// ListenAndServe is called to start the service -func (tS *ThresholdService) ListenAndServe(exitChan chan bool) error { - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS)) - go tS.runBackup() // start backup loop - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return nil -} - // Shutdown is called to shutdown the service func (tS *ThresholdService) Shutdown() error { utils.Logger.Info(" shutdown initialized") diff --git a/ers/ers.go b/ers/ers.go index 55111061a..43b4c0c4c 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -36,7 +36,7 @@ type erEvent struct { } // NewERService instantiates the ERService -func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, stopChan chan struct{}, connMgr *engine.ConnManager) *ERService { +func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) *ERService { return &ERService{ cfg: cfg, rdrs: make(map[string]EventReader), @@ -45,7 +45,6 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, stopChan chan rdrEvents: make(chan *erEvent), rdrErr: make(chan error), filterS: filterS, - stopChan: stopChan, connMgr: connMgr, } } @@ -60,13 +59,12 @@ type ERService struct { rdrEvents chan *erEvent // receive here the events from readers rdrErr chan error // receive here errors which should stop the app - filterS *engine.FilterS - stopChan chan struct{} - connMgr *engine.ConnManager + filterS *engine.FilterS + connMgr *engine.ConnManager } // ListenAndServe keeps the service alive -func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { +func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) (err error) { for cfgIdx, rdrCfg := range erS.cfg.ERsCfg().Readers { if rdrCfg.Type == utils.META_NONE { // ignore *default reader continue @@ -86,7 +84,7 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { fmt.Sprintf("<%s> running reader got error: <%s>", utils.ERs, err.Error())) return - case <-erS.stopChan: + case <-stopChan: erS.closeAllRdrs() return case erEv := <-erS.rdrEvents: diff --git a/ers/ers_test.go b/ers/ers_test.go index 38c14d31e..454d9d84c 100644 --- a/ers/ers_test.go +++ b/ers/ers_test.go @@ -37,8 +37,8 @@ func TestERsNewERService(t *testing.T) { stopLsn: make(map[string]chan struct{}), rdrEvents: make(chan *erEvent), rdrErr: make(chan error), - stopChan: nil} - rcv := NewERService(cfg, fltrS, nil, nil) + } + rcv := NewERService(cfg, fltrS, nil) if !reflect.DeepEqual(expected.cfg, rcv.cfg) { t.Errorf("Expecting: <%+v>, received: <%+v>", expected.cfg, rcv.cfg) @@ -50,7 +50,7 @@ func TestERsNewERService(t *testing.T) { func TestERsAddReader(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() fltrS := &engine.FilterS{} - erS := NewERService(cfg, fltrS, nil, nil) + erS := NewERService(cfg, fltrS, nil) reader := cfg.ERsCfg().Readers[0] reader.Type = utils.MetaFileCSV reader.ID = "file_reader" diff --git a/general_tests/sessions_race_test.go b/general_tests/sessions_race_test.go index 681ad535d..ca005084a 100644 --- a/general_tests/sessions_race_test.go +++ b/general_tests/sessions_race_test.go @@ -95,7 +95,7 @@ func TestSessionSRace(t *testing.T) { // resp resp = &engine.Responder{ - ExitChan: make(chan bool, 1), + ExitChan: make(chan struct{}, 1), MaxComputedUsage: cfg.RalsCfg().MaxComputedUsage, } respChan <- resp diff --git a/guardian/guardian.go b/guardian/guardian.go index d4489fa9e..244d8bae0 100644 --- a/guardian/guardian.go +++ b/guardian/guardian.go @@ -26,7 +26,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -// global package variable +// Guardian is the global package variable var Guardian = &GuardianLocker{ locks: make(map[string]*itemLock), refs: make(map[string][]string)} diff --git a/loaders/loaders.go b/loaders/loaders.go index 8f7220f3c..34e656b1c 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -29,7 +29,7 @@ import ( ) func NewLoaderService(dm *engine.DataManager, ldrsCfg []*config.LoaderSCfg, - timezone string, exitChan chan bool, filterS *engine.FilterS, + timezone string, filterS *engine.FilterS, connMgr *engine.ConnManager) (ldrS *LoaderService) { ldrS = &LoaderService{ldrs: make(map[string]*Loader)} for _, ldrCfg := range ldrsCfg { diff --git a/rates/rates.go b/rates/rates.go index f2d3f5576..2cf52848b 100644 --- a/rates/rates.go +++ b/rates/rates.go @@ -44,14 +44,12 @@ type RateS struct { } // ListenAndServe keeps the service alive -func (rS *RateS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) { +func (rS *RateS) ListenAndServe(stopChan, cfgRld chan struct{}) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>", utils.CoreS, utils.RateS)) for { select { - case e := <-exitChan: // global exit - rS.Shutdown() - exitChan <- e // put back for the others listening for shutdown request + case <-stopChan: // global exit return case rld := <-cfgRld: // configuration was reloaded cfgRld <- rld diff --git a/rates/rates_test.go b/rates/rates_test.go index c62d68b6a..e5dc6620c 100644 --- a/rates/rates_test.go +++ b/rates/rates_test.go @@ -32,15 +32,13 @@ import ( func TestListenAndServe(t *testing.T) { newRates := &RateS{} cfgRld := make(chan struct{}, 1) - exitChan := make(chan bool, 1) + exitChan := make(chan struct{}, 1) cfgRld <- struct{}{} go func() { time.Sleep(10) - exitChan <- true + exitChan <- struct{}{} }() - if err := newRates.ListenAndServe(exitChan, cfgRld); err != nil { - t.Error(err) - } + newRates.ListenAndServe(exitChan, cfgRld) } func TestNewRateS(t *testing.T) { diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index c824475c9..2c8afa271 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -34,7 +34,7 @@ type Scheduler struct { sync.RWMutex queue engine.ActionTimingPriorityList timer *time.Timer - restartLoop chan bool + restartLoop chan struct{} dm *engine.DataManager cfg *config.CGRConfig fltrS *engine.FilterS @@ -48,7 +48,7 @@ type Scheduler struct { func NewScheduler(dm *engine.DataManager, cfg *config.CGRConfig, fltrS *engine.FilterS) (s *Scheduler) { s = &Scheduler{ - restartLoop: make(chan bool), + restartLoop: make(chan struct{}), dm: dm, cfg: cfg, fltrS: fltrS, @@ -141,7 +141,7 @@ func (s *Scheduler) loadActionPlans() { s.Lock() defer s.Unlock() // limit the number of concurrent tasks - limit := make(chan bool, 10) + limit := make(chan struct{}, 10) // execute existing tasks for { task, err := s.dm.DataDB().PopTask() @@ -167,7 +167,7 @@ func (s *Scheduler) loadActionPlans() { } continue } - limit <- true + limit <- struct{}{} go func() { utils.Logger.Info(fmt.Sprintf("<%s> executing task %s on account %s", utils.SchedulerS, task.ActionsID, task.AccountID)) @@ -222,7 +222,7 @@ func (s *Scheduler) loadActionPlans() { func (s *Scheduler) restart() { if s.schedulerStarted { - s.restartLoop <- true + s.restartLoop <- struct{}{} } if s.timer != nil { s.timer.Stop() @@ -290,8 +290,8 @@ func (s *Scheduler) GetScheduledActions(fltr ArgsGetScheduledActions) (schedActi } func (s *Scheduler) Shutdown() { - s.schedulerStarted = false // disable loop on next run - s.restartLoop <- true // cancel waiting tasks + s.schedulerStarted = false // disable loop on next run + s.restartLoop <- struct{}{} // cancel waiting tasks if s.timer != nil { s.timer.Stop() } diff --git a/services/analyzers.go b/services/analyzers.go index 0366ecb76..79b3aaf07 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -31,7 +31,7 @@ import ( ) // NewAnalyzerService returns the Analyzer Service -func NewAnalyzerService(cfg *config.CGRConfig, server *cores.Server, exitChan chan bool, +func NewAnalyzerService(cfg *config.CGRConfig, server *cores.Server, exitChan chan<- struct{}, internalAnalyzerSChan chan rpcclient.ClientConnector) *AnalyzerService { return &AnalyzerService{ connChan: internalAnalyzerSChan, @@ -46,7 +46,8 @@ type AnalyzerService struct { sync.RWMutex cfg *config.CGRConfig server *cores.Server - exitChan chan bool + stopChan chan struct{} + exitChan chan<- struct{} anz *analyzers.AnalyzerService rpc *v1.AnalyzerSv1 @@ -62,12 +63,12 @@ func (anz *AnalyzerService) Start() (err error) { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) return } + anz.stopChan = make(chan struct{}) go func() { - if err := anz.anz.ListenAndServe(anz.exitChan); err != nil { + if err := anz.anz.ListenAndServe(anz.stopChan); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error())) + close(anz.exitChan) } - anz.anz.Shutdown() - anz.exitChan <- true return }() anz.server.SetAnalyzer(anz.anz) @@ -76,7 +77,6 @@ func (anz *AnalyzerService) Start() (err error) { anz.server.RpcRegister(anz.rpc) } anz.connChan <- anz.rpc - return } @@ -88,6 +88,7 @@ func (anz *AnalyzerService) Reload() (err error) { // Shutdown stops the service func (anz *AnalyzerService) Shutdown() (err error) { anz.Lock() + close(anz.stopChan) anz.server.SetAnalyzer(nil) anz.anz.Shutdown() anz.anz = nil diff --git a/services/apiers_it_test.go b/services/apiers_it_test.go index 2debae343..1fadbc470 100644 --- a/services/apiers_it_test.go +++ b/services/apiers_it_test.go @@ -41,7 +41,7 @@ func TestApiersReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 2) + engineShutdown := make(chan struct{}, 2) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) @@ -63,7 +63,7 @@ func TestApiersReload(t *testing.T) { apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1), anz) srvMngr.AddServices(apiSv1, apiSv2, schS, tS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -110,5 +110,6 @@ func TestApiersReload(t *testing.T) { if apiSv2.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/apierv1.go b/services/apierv1.go index 0a5ec3559..c76ed8ae6 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -19,7 +19,6 @@ along with this program. If not, see package services import ( - "fmt" "runtime" "sync" @@ -70,7 +69,7 @@ type APIerSv1Service struct { api *v1.APIerSv1 connChan chan rpcclient.ClientConnector - syncStop chan struct{} + stopChan chan struct{} APIerSv1Chan chan *v1.APIerSv1 anz *AnalyzerService @@ -90,7 +89,7 @@ func (apiService *APIerSv1Service) Start() (err error) { dbchan <- datadb storDBChan := make(chan engine.StorDB, 1) - apiService.syncStop = make(chan struct{}) + apiService.stopChan = make(chan struct{}) apiService.storDB.RegisterSyncChan(storDBChan) stordb := <-storDBChan apiService.Lock() @@ -108,12 +107,7 @@ func (apiService *APIerSv1Service) Start() (err error) { StorDBChan: storDBChan, } - go func(api *v1.APIerSv1, stopChan chan struct{}) { - if err := api.ListenAndServe(stopChan); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.CDRServer, err.Error())) - // erS.exitChan <- true - } - }(apiService.api, apiService.syncStop) + go apiService.api.ListenAndServe(apiService.stopChan) runtime.Gosched() if !apiService.cfg.DispatcherSCfg().Enabled { @@ -137,7 +131,7 @@ func (apiService *APIerSv1Service) Reload() (err error) { // Shutdown stops the service func (apiService *APIerSv1Service) Shutdown() (err error) { apiService.Lock() - close(apiService.syncStop) + close(apiService.stopChan) apiService.api = nil <-apiService.connChan apiService.Unlock() diff --git a/services/apierv2.go b/services/apierv2.go index 0ed473958..588009c8c 100644 --- a/services/apierv2.go +++ b/services/apierv2.go @@ -89,9 +89,9 @@ func (api *APIerSv2Service) Reload() (err error) { // Shutdown stops the service func (api *APIerSv2Service) Shutdown() (err error) { api.Lock() - defer api.Unlock() api.api = nil <-api.connChan + api.Unlock() return } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 3a97df469..8331786df 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -32,7 +32,7 @@ import ( // NewAsteriskAgent returns the Asterisk Agent func NewAsteriskAgent(cfg *config.CGRConfig, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { return &AsteriskAgent{ cfg: cfg, exitChan: exitChan, @@ -44,7 +44,8 @@ func NewAsteriskAgent(cfg *config.CGRConfig, type AsteriskAgent struct { sync.RWMutex cfg *config.CGRConfig - exitChan chan bool + exitChan chan<- struct{} + stopChan chan struct{} smas []*agents.AsteriskAgent connMgr *engine.ConnManager @@ -59,19 +60,20 @@ func (ast *AsteriskAgent) Start() (err error) { ast.Lock() defer ast.Unlock() - listenAndServe := func(sma *agents.AsteriskAgent, exitChan chan bool) { - if err := sma.ListenAndServe(); err != nil { + listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}, exitChan chan<- struct{}) { + if err := sma.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err)) + close(exitChan) } - exitChan <- true } + ast.stopChan = make(chan struct{}) ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns)) for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers if ast.smas[connIdx], err = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) return } - go listenAndServe(ast.smas[connIdx], ast.exitChan) + go listenAndServe(ast.smas[connIdx], ast.stopChan, ast.exitChan) } return } @@ -83,6 +85,8 @@ func (ast *AsteriskAgent) Reload() (err error) { // Shutdown stops the service func (ast *AsteriskAgent) Shutdown() (err error) { + close(ast.stopChan) + ast.smas = nil return // no shutdown for the momment } diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index fdec64f08..2c7863bcb 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -41,7 +41,7 @@ func TestAttributeSReload(t *testing.T) { utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil @@ -57,7 +57,7 @@ func TestAttributeSReload(t *testing.T) { anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -95,5 +95,6 @@ func TestAttributeSReload(t *testing.T) { if attrS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/cdrs.go b/services/cdrs.go index 1bb975e0d..f0c2771ad 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -66,7 +66,7 @@ type CDRServer struct { connChan chan rpcclient.ClientConnector connMgr *engine.ConnManager - syncStop chan struct{} + stopChan chan struct{} anz *AnalyzerService // storDBChan chan engine.StorDB } @@ -86,19 +86,14 @@ func (cdrService *CDRServer) Start() (err error) { dbchan <- datadb storDBChan := make(chan engine.StorDB, 1) - cdrService.syncStop = make(chan struct{}) + cdrService.stopChan = make(chan struct{}) cdrService.storDB.RegisterSyncChan(storDBChan) cdrService.Lock() defer cdrService.Unlock() cdrService.cdrS = engine.NewCDRServer(cdrService.cfg, storDBChan, datadb, filterS, cdrService.connMgr) - go func(cdrS *engine.CDRServer, stopChan chan struct{}) { - if err := cdrS.ListenAndServe(stopChan); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.CDRServer, err.Error())) - // erS.exitChan <- true - } - }(cdrService.cdrS, cdrService.syncStop) + go cdrService.cdrS.ListenAndServe(cdrService.stopChan) runtime.Gosched() utils.Logger.Info("Registering CDRS HTTP Handlers.") cdrService.cdrS.RegisterHandlersToServer(cdrService.server) @@ -123,7 +118,7 @@ func (cdrService *CDRServer) Reload() (err error) { // Shutdown stops the service func (cdrService *CDRServer) Shutdown() (err error) { cdrService.Lock() - close(cdrService.syncStop) + close(cdrService.stopChan) cdrService.cdrS = nil cdrService.rpcv1 = nil cdrService.rpcv2 = nil diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index 1691784c9..26b3beac6 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -41,7 +41,7 @@ func TestCdrsReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) @@ -75,7 +75,7 @@ func TestCdrsReload(t *testing.T) { cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, cdrsRPC, nil, anz) srvMngr.AddServices(cdrS, ralS, schS, chrS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) @@ -120,5 +120,6 @@ func TestCdrsReload(t *testing.T) { if cdrS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index 033f3a9e2..aceb03475 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -41,7 +41,7 @@ func TestChargerSReload(t *testing.T) { utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) cfg.AttributeSCfg().Enabled = true - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) @@ -58,7 +58,7 @@ func TestChargerSReload(t *testing.T) { engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, db, filterSChan, server, - engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -90,5 +90,6 @@ func TestChargerSReload(t *testing.T) { if chrS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/cores.go b/services/cores.go new file mode 100644 index 000000000..9deb741b7 --- /dev/null +++ b/services/cores.go @@ -0,0 +1,110 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewCoreService returns the Core Service +func NewCoreService(cfg *config.CGRConfig, caps *cores.Caps, server *cores.Server, + internalCoreSChan chan rpcclient.ClientConnector, anz *AnalyzerService) servmanager.Service { + return &CoreService{ + connChan: internalCoreSChan, + cfg: cfg, + caps: caps, + server: server, + anz: anz, + } +} + +// CoreService implements Service interface +type CoreService struct { + sync.RWMutex + cfg *config.CGRConfig + server *cores.Server + caps *cores.Caps + stopChan chan struct{} + + cS *cores.CoreService + rpc *v1.CoreSv1 + connChan chan rpcclient.ClientConnector + anz *AnalyzerService +} + +// Start should handle the sercive start +func (cS *CoreService) Start() (err error) { + if cS.IsRunning() { + return utils.ErrServiceAlreadyRunning + } + + cS.Lock() + defer cS.Unlock() + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) + cS.stopChan = make(chan struct{}) + cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.stopChan) + cS.rpc = v1.NewCoreSv1(cS.cS) + if !cS.cfg.DispatcherSCfg().Enabled { + cS.server.RpcRegister(cS.rpc) + } + cS.connChan <- cS.anz.GetInternalCodec(cS.rpc, utils.CoreS) + return +} + +// Reload handles the change of config +func (cS *CoreService) Reload() (err error) { + return +} + +// Shutdown stops the service +func (cS *CoreService) Shutdown() (err error) { + cS.Lock() + defer cS.Unlock() + cS.cS.Shutdown() + close(cS.stopChan) + cS.cS = nil + cS.rpc = nil + <-cS.connChan + return +} + +// IsRunning returns if the service is running +func (cS *CoreService) IsRunning() bool { + cS.RLock() + defer cS.RUnlock() + return cS != nil && cS.cS != nil +} + +// ServiceName returns the service name +func (cS *CoreService) ServiceName() string { + return utils.CoreS +} + +// ShouldRun returns if the service should be running +func (cS *CoreService) ShouldRun() bool { + return true +} diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index 728af7051..8509c401f 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -42,7 +42,7 @@ func TestDataDBReload(t *testing.T) { utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil @@ -55,7 +55,7 @@ func TestDataDBReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) srvMngr.AddServices(NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz), - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -185,5 +185,6 @@ func TestDataDBReload(t *testing.T) { if db.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/diameteragent.go b/services/diameteragent.go index 9f6d037d5..a89fba17d 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -31,7 +31,7 @@ import ( // NewDiameterAgent returns the Diameter Agent func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { return &DiameterAgent{ cfg: cfg, filterSChan: filterSChan, @@ -45,7 +45,7 @@ type DiameterAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan bool + exitChan chan<- struct{} da *agents.DiameterAgent connMgr *engine.ConnManager @@ -75,7 +75,7 @@ func (da *DiameterAgent) Start() (err error) { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.DiameterAgent, err)) } - da.exitChan <- true + close(da.exitChan) }() return } diff --git a/services/dispatcherh.go b/services/dispatcherh.go index 56f2067ad..9f36d1b90 100644 --- a/services/dispatcherh.go +++ b/services/dispatcherh.go @@ -27,21 +27,16 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // NewDispatcherHostsService returns the Dispatcher Service func NewDispatcherHostsService(cfg *config.CGRConfig, server *cores.Server, - internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager, - exitChan chan bool, - anz *AnalyzerService) servmanager.Service { + connMgr *engine.ConnManager, anz *AnalyzerService) servmanager.Service { return &DispatcherHostsService{ - connChan: internalChan, - cfg: cfg, - server: server, - connMgr: connMgr, - exitChan: exitChan, - anz: anz, + cfg: cfg, + server: server, + connMgr: connMgr, + anz: anz, } } @@ -51,12 +46,10 @@ type DispatcherHostsService struct { cfg *config.CGRConfig server *cores.Server connMgr *engine.ConnManager - exitChan chan bool + stopChan chan struct{} dspS *dispatcherh.DispatcherHostsService - // rpc *v1.DispatcherHSv1 - connChan chan rpcclient.ClientConnector - anz *AnalyzerService + anz *AnalyzerService } // Start should handle the sercive start @@ -68,9 +61,9 @@ func (dspS *DispatcherHostsService) Start() (err error) { dspS.Lock() defer dspS.Unlock() + dspS.stopChan = make(chan struct{}) dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr) - go dspS.dspS.ListenAndServe() - dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherH) + go dspS.dspS.ListenAndServe(dspS.stopChan) return } @@ -83,10 +76,9 @@ func (dspS *DispatcherHostsService) Reload() (err error) { // Shutdown stops the service func (dspS *DispatcherHostsService) Shutdown() (err error) { dspS.Lock() + close(dspS.stopChan) dspS.dspS.Shutdown() dspS.dspS = nil - // dspS.rpc = nil - <-dspS.connChan dspS.Unlock() return } diff --git a/services/dispatchers.go b/services/dispatchers.go index 7a845a3b7..ff6806906 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -164,9 +164,7 @@ func (dspS *DispatcherService) Reload() (err error) { func (dspS *DispatcherService) Shutdown() (err error) { dspS.Lock() defer dspS.Unlock() - if err = dspS.dspS.Shutdown(); err != nil { - return - } + dspS.dspS.Shutdown() dspS.dspS = nil dspS.rpc = nil <-dspS.connChan diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index c4ca98d1c..824b977b9 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -41,7 +41,7 @@ func TestDispatcherSReload(t *testing.T) { utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) cfg.AttributeSCfg().Enabled = true - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) @@ -60,7 +60,7 @@ func TestDispatcherSReload(t *testing.T) { engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, db, filterSChan, server, - engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -93,5 +93,6 @@ func TestDispatcherSReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/dnsagent.go b/services/dnsagent.go index f2c70366d..982f746bf 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -31,7 +31,7 @@ import ( // NewDNSAgent returns the DNS Agent func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { return &DNSAgent{ cfg: cfg, filterSChan: filterSChan, @@ -45,7 +45,7 @@ type DNSAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan bool + exitChan chan<- struct{} dns *agents.DNSAgent connMgr *engine.ConnManager @@ -73,7 +73,7 @@ func (dns *DNSAgent) Start() (err error) { go func() { if err = dns.dns.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - dns.exitChan <- true // stop the engine here + close(dns.exitChan) // stop the engine here } }() return @@ -96,7 +96,7 @@ func (dns *DNSAgent) Reload() (err error) { go func() { if err := dns.dns.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - dns.exitChan <- true // stop the engine here + close(dns.exitChan) // stop the engine here } }() return diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index e186d24ab..0d763ef95 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -42,7 +42,7 @@ func TestDNSAgentReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) @@ -57,7 +57,7 @@ func TestDNSAgentReload(t *testing.T) { srv := NewDNSAgent(cfg, filterSChan, engineShutdown, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Fatal(err) } @@ -83,6 +83,6 @@ func TestDNSAgentReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true - time.Sleep(10 * time.Millisecond) // wait to stop session bijsonrpc server + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/ees.go b/services/ees.go index 922dfb5f1..96fa02b91 100644 --- a/services/ees.go +++ b/services/ees.go @@ -34,7 +34,7 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, server *cores.Server, exitChan chan bool, + connMgr *engine.ConnManager, server *cores.Server, intConnChan chan rpcclient.ClientConnector, anz *AnalyzerService) servmanager.Service { return &EventExporterService{ @@ -42,7 +42,6 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil filterSChan: filterSChan, connMgr: connMgr, server: server, - exitChan: exitChan, intConnChan: intConnChan, rldChan: make(chan struct{}), anz: anz, @@ -57,9 +56,9 @@ type EventExporterService struct { filterSChan chan *engine.FilterS connMgr *engine.ConnManager server *cores.Server - exitChan chan bool intConnChan chan rpcclient.ClientConnector rldChan chan struct{} + stopChan chan struct{} eeS *ees.EventExporterS rpc *v1.EventExporterSv1 @@ -92,8 +91,8 @@ func (es *EventExporterService) Reload() (err error) { // Shutdown stops the service func (es *EventExporterService) Shutdown() (err error) { es.Lock() - defer es.Unlock() + close(es.stopChan) if err = es.eeS.Shutdown(); err != nil { return } @@ -122,12 +121,8 @@ func (es *EventExporterService) Start() (err error) { utils.EventExporterS, err)) return } - go func(eeS *ees.EventExporterS, exitChan chan bool, rldChan chan struct{}) { - if err := eeS.ListenAndServe(exitChan, rldChan); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error())) - exitChan <- true - } - }(es.eeS, es.exitChan, es.rldChan) + es.stopChan = make(chan struct{}) + go es.eeS.ListenAndServe(es.stopChan, es.rldChan) es.rpc = v1.NewEventExporterSv1(es.eeS) if !es.cfg.DispatcherSCfg().Enabled { diff --git a/services/ees_it_test.go b/services/ees_it_test.go index 737c763ec..2375d4667 100644 --- a/services/ees_it_test.go +++ b/services/ees_it_test.go @@ -53,7 +53,7 @@ func TestEventExporterSReload(t *testing.T) { cfg.AttributeSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) @@ -64,9 +64,9 @@ func TestEventExporterSReload(t *testing.T) { attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) - ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), server, engineShutdown, make(chan rpcclient.ClientConnector, 1), anz) + ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), server, make(chan rpcclient.ClientConnector, 1), anz) srvMngr.AddServices(ees, attrS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -101,5 +101,6 @@ func TestEventExporterSReload(t *testing.T) { if ees.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/ers.go b/services/ers.go index bb2ea947b..8e8ff3182 100644 --- a/services/ers.go +++ b/services/ers.go @@ -31,7 +31,7 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, @@ -46,7 +46,7 @@ type EventReaderService struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan bool + exitChan chan<- struct{} ers *ers.ERService rldChan chan struct{} @@ -66,19 +66,19 @@ func (erS *EventReaderService) Start() (err error) { filterS := <-erS.filterSChan erS.filterSChan <- filterS - // remake tht stop chan - erS.stopChan = make(chan struct{}, 1) + // remake the stop chan + erS.stopChan = make(chan struct{}) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) // build the service - erS.ers = ers.NewERService(erS.cfg, filterS, erS.stopChan, erS.connMgr) - go func(ers *ers.ERService, rldChan chan struct{}) { - if err := ers.ListenAndServe(rldChan); err != nil { + erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr) + go func(ers *ers.ERService, stopChan, rldChan chan struct{}) { + if err := ers.ListenAndServe(stopChan, rldChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) - erS.exitChan <- true + close(erS.exitChan) } - }(erS.ers, erS.rldChan) + }(erS.ers, erS.stopChan, erS.rldChan) return } diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 542a6e805..422050461 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -52,7 +52,7 @@ func TestEventReaderSReload(t *testing.T) { cfg.SessionSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) @@ -61,7 +61,7 @@ func TestEventReaderSReload(t *testing.T) { attrS := NewEventReaderService(cfg, filterSChan, engineShutdown, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, sS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -87,5 +87,6 @@ func TestEventReaderSReload(t *testing.T) { if attrS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index 8e23235b0..6f012b583 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -32,7 +32,7 @@ import ( // NewFreeswitchAgent returns the Freeswitch Agent func NewFreeswitchAgent(cfg *config.CGRConfig, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { return &FreeswitchAgent{ cfg: cfg, exitChan: exitChan, @@ -44,7 +44,7 @@ func NewFreeswitchAgent(cfg *config.CGRConfig, type FreeswitchAgent struct { sync.RWMutex cfg *config.CGRConfig - exitChan chan bool + exitChan chan<- struct{} fS *agents.FSsessions connMgr *engine.ConnManager @@ -64,7 +64,7 @@ func (fS *FreeswitchAgent) Start() (err error) { go func() { if err := fS.fS.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - fS.exitChan <- true // stop the engine here + close(fS.exitChan) // stop the engine here } }() return @@ -81,7 +81,7 @@ func (fS *FreeswitchAgent) Reload() (err error) { go func() { if err := fS.fS.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - fS.exitChan <- true // stop the engine here + close(fS.exitChan) // stop the engine here } }() return diff --git a/services/kamailioagent.go b/services/kamailioagent.go index d5c38b589..6fe3908fd 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -33,7 +33,7 @@ import ( // NewKamailioAgent returns the Kamailio Agent func NewKamailioAgent(cfg *config.CGRConfig, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { return &KamailioAgent{ cfg: cfg, exitChan: exitChan, @@ -45,7 +45,7 @@ func NewKamailioAgent(cfg *config.CGRConfig, type KamailioAgent struct { sync.RWMutex cfg *config.CGRConfig - exitChan chan bool + exitChan chan<- struct{} kam *agents.KamailioAgent connMgr *engine.ConnManager @@ -69,7 +69,7 @@ func (kam *KamailioAgent) Start() (err error) { return } utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) - kam.exitChan <- true + close(kam.exitChan) } }() return @@ -90,7 +90,7 @@ func (kam *KamailioAgent) Reload() (err error) { return } utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) - kam.exitChan <- true + close(kam.exitChan) } }() return diff --git a/services/loaders.go b/services/loaders.go index b1878f548..c696769aa 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -33,7 +33,7 @@ import ( // NewLoaderService returns the Loader Service func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, server *cores.Server, - exitChan chan bool, internalLoaderSChan chan rpcclient.ClientConnector, + internalLoaderSChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService) *LoaderService { return &LoaderService{ @@ -42,7 +42,6 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, filterSChan: filterSChan, server: server, - exitChan: exitChan, connMgr: connMgr, stopChan: make(chan struct{}), anz: anz, @@ -56,7 +55,6 @@ type LoaderService struct { dm *DataDBService filterSChan chan *engine.FilterS server *cores.Server - exitChan chan bool stopChan chan struct{} ldrs *loaders.LoaderService @@ -82,7 +80,7 @@ func (ldrs *LoaderService) Start() (err error) { defer ldrs.Unlock() ldrs.ldrs = loaders.NewLoaderService(datadb, ldrs.cfg.LoaderCfg(), - ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, ldrs.connMgr) + ldrs.cfg.GeneralCfg().DefaultTimezone, filterS, ldrs.connMgr) if !ldrs.ldrs.Enabled() { return @@ -147,6 +145,7 @@ func (ldrs *LoaderService) ShouldRun() bool { return ldrs.cfg.LoaderCfg().Enabled() } +// GetLoaderS returns the initialized LoaderService func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderService { return ldrs.ldrs } diff --git a/services/radiusagent.go b/services/radiusagent.go index 5dc663da6..fa8340b6e 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -31,7 +31,7 @@ import ( // NewRadiusAgent returns the Radius Agent func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { return &RadiusAgent{ cfg: cfg, filterSChan: filterSChan, @@ -45,7 +45,7 @@ type RadiusAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan bool + exitChan chan<- struct{} rad *agents.RadiusAgent connMgr *engine.ConnManager @@ -72,7 +72,7 @@ func (rad *RadiusAgent) Start() (err error) { if err = rad.rad.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) } - rad.exitChan <- true + close(rad.exitChan) }() return } diff --git a/services/rals.go b/services/rals.go index 64576c82e..d381c779b 100644 --- a/services/rals.go +++ b/services/rals.go @@ -25,14 +25,13 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) // NewRalService returns the Ral Service func NewRalService(cfg *config.CGRConfig, cacheS *engine.CacheS, server *cores.Server, - internalRALsChan, internalResponderChan chan rpcclient.ClientConnector, exitChan chan bool, + internalRALsChan, internalResponderChan chan rpcclient.ClientConnector, exitChan chan<- struct{}, connMgr *engine.ConnManager, anz *AnalyzerService) *RalService { resp := NewResponderService(cfg, server, internalResponderChan, exitChan, anz) @@ -138,11 +137,6 @@ func (rals *RalService) ShouldRun() bool { } // GetResponder returns the responder service -func (rals *RalService) GetResponder() servmanager.Service { - return rals.responder -} - -// GetResponderService returns the responder service -func (rals *RalService) GetResponderService() *ResponderService { +func (rals *RalService) GetResponder() *ResponderService { return rals.responder } diff --git a/services/rals_it_test.go b/services/rals_it_test.go index 7afc09c5e..09610b0c2 100644 --- a/services/rals_it_test.go +++ b/services/rals_it_test.go @@ -41,7 +41,7 @@ func TestRalsReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) @@ -72,7 +72,7 @@ func TestRalsReload(t *testing.T) { make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, anz) srvMngr.AddServices(ralS, schS, tS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -119,5 +119,6 @@ func TestRalsReload(t *testing.T) { if resp := ralS.GetResponder(); resp.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/rates.go b/services/rates.go index 14b8bf668..ef3472b36 100644 --- a/services/rates.go +++ b/services/rates.go @@ -19,7 +19,6 @@ along with this program. If not, see package services import ( - "fmt" "sync" v1 "github.com/cgrates/cgrates/apier/v1" @@ -40,7 +39,7 @@ func NewRateService( cfg *config.CGRConfig, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, dmS *DataDBService, - server *cores.Server, exitChan chan bool, + server *cores.Server, intConnChan chan rpcclient.ClientConnector, anz *AnalyzerService) servmanager.Service { return &RateService{ @@ -49,7 +48,6 @@ func NewRateService( filterSChan: filterSChan, dmS: dmS, server: server, - exitChan: exitChan, intConnChan: intConnChan, rldChan: make(chan struct{}), anz: anz, @@ -65,9 +63,9 @@ type RateService struct { dmS *DataDBService cacheS *engine.CacheS server *cores.Server - exitChan chan bool - rldChan chan struct{} + rldChan chan struct{} + stopChan chan struct{} rateS *rates.RateS rpc *v1.RateSv1 @@ -102,6 +100,7 @@ func (rs *RateService) Reload() (err error) { func (rs *RateService) Shutdown() (err error) { rs.Lock() defer rs.Unlock() + close(rs.stopChan) if err = rs.rateS.Shutdown(); err != nil { return } @@ -130,12 +129,8 @@ func (rs *RateService) Start() (err error) { rs.rateS = rates.NewRateS(rs.cfg, fltrS, dm) rs.Unlock() - go func(rtS *rates.RateS, exitChan chan bool, rldChan chan struct{}) { - if err := rtS.ListenAndServe(exitChan, rldChan); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error())) - exitChan <- true - } - }(rs.rateS, rs.exitChan, rs.rldChan) + rs.stopChan = make(chan struct{}) + go rs.rateS.ListenAndServe(rs.stopChan, rs.rldChan) rs.rpc = v1.NewRateSv1(rs.rateS) if !rs.cfg.DispatcherSCfg().Enabled { diff --git a/services/rates_it_test.go b/services/rates_it_test.go index c598af80a..83cc26537 100644 --- a/services/rates_it_test.go +++ b/services/rates_it_test.go @@ -42,7 +42,7 @@ func TestRateSReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) @@ -51,9 +51,9 @@ func TestRateSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheRateProfilesFilterIndexes)) close(chS.GetPrecacheChannel(utils.CacheRateFilterIndexes)) anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) - rS := NewRateService(cfg, chS, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), anz) + rS := NewRateService(cfg, chS, filterSChan, db, server, make(chan rpcclient.ClientConnector, 1), anz) srvMngr.AddServices(rS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -79,5 +79,6 @@ func TestRateSReload(t *testing.T) { if rS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/resources_it_test.go b/services/resources_it_test.go index 7fed5e2d7..f6a3631b1 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -44,7 +44,7 @@ func TestResourceSReload(t *testing.T) { cfg.ThresholdSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) @@ -60,7 +60,7 @@ func TestResourceSReload(t *testing.T) { reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(tS, reS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -92,5 +92,6 @@ func TestResourceSReload(t *testing.T) { if reS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/responders.go b/services/responders.go index ae12a5203..d815cb410 100644 --- a/services/responders.go +++ b/services/responders.go @@ -31,7 +31,7 @@ import ( // NewResponderService returns the Resonder Service func NewResponderService(cfg *config.CGRConfig, server *cores.Server, internalRALsChan chan rpcclient.ClientConnector, - exitChan chan bool, anz *AnalyzerService) *ResponderService { + exitChan chan<- struct{}, anz *AnalyzerService) *ResponderService { return &ResponderService{ connChan: internalRALsChan, cfg: cfg, @@ -42,11 +42,12 @@ func NewResponderService(cfg *config.CGRConfig, server *cores.Server, } // ResponderService implements Service interface +// this service is manged by the RALs as a component type ResponderService struct { sync.RWMutex cfg *config.CGRConfig server *cores.Server - exitChan chan bool + exitChan chan<- struct{} resp *engine.Responder connChan chan rpcclient.ClientConnector diff --git a/services/routes_it_test.go b/services/routes_it_test.go index 8ac6d4fc7..a821887cf 100644 --- a/services/routes_it_test.go +++ b/services/routes_it_test.go @@ -43,7 +43,7 @@ func TestSupplierSReload(t *testing.T) { cfg.StatSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheRouteProfiles)) close(chS.GetPrecacheChannel(utils.CacheRouteFilterIndexes)) @@ -58,7 +58,7 @@ func TestSupplierSReload(t *testing.T) { supS := NewRouteService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(supS, sts, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -90,5 +90,6 @@ func TestSupplierSReload(t *testing.T) { if supS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index 0d8c7390a..592e4ed63 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -39,7 +39,7 @@ func TestSchedulerSReload(t *testing.T) { } utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil @@ -51,7 +51,7 @@ func TestSchedulerSReload(t *testing.T) { schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(schS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -83,5 +83,6 @@ func TestSchedulerSReload(t *testing.T) { if schS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/sessions.go b/services/sessions.go index 098029dca..754e08026 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -36,7 +36,7 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *cores.Server, internalChan chan rpcclient.ClientConnector, - exitChan chan bool, connMgr *engine.ConnManager, + exitChan chan<- struct{}, connMgr *engine.ConnManager, caps *cores.Caps, anz *AnalyzerService) servmanager.Service { return &SessionService{ @@ -57,7 +57,8 @@ type SessionService struct { cfg *config.CGRConfig dm *DataDBService server *cores.Server - exitChan chan bool + exitChan chan<- struct{} + stopChan chan struct{} sm *sessions.SessionS rpc *v1.SMGenericV1 @@ -87,7 +88,8 @@ func (smg *SessionService) Start() (err error) { smg.sm = sessions.NewSessionS(smg.cfg, datadb, smg.connMgr) //start sync session in a separate gorutine - go smg.sm.ListenAndServe(smg.exitChan) + smg.stopChan = make(chan struct{}) + go smg.sm.ListenAndServe(smg.stopChan) // Pass internal connection via BiRPCClient smg.connChan <- smg.anz.GetInternalCodec(smg.sm, utils.SessionS) // Register RPC handler @@ -114,7 +116,7 @@ func (smg *SessionService) Start() (err error) { smg.Lock() smg.bircpEnabled = false smg.Unlock() - smg.exitChan <- true + close(smg.exitChan) } }() } @@ -130,6 +132,7 @@ func (smg *SessionService) Reload() (err error) { func (smg *SessionService) Shutdown() (err error) { smg.Lock() defer smg.Unlock() + close(smg.stopChan) if err = smg.sm.Shutdown(); err != nil { return } diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index 31f1b29c5..dc9d28a58 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -44,7 +44,7 @@ func TestSessionSReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) @@ -83,7 +83,7 @@ func TestSessionSReload(t *testing.T) { srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -118,5 +118,6 @@ func TestSessionSReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/sipagent.go b/services/sipagent.go index 328fc7605..94e7bef88 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -31,7 +31,7 @@ import ( // NewSIPAgent returns the sip Agent func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { return &SIPAgent{ cfg: cfg, filterSChan: filterSChan, @@ -45,7 +45,7 @@ type SIPAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan bool + exitChan chan<- struct{} sip *agents.SIPAgent connMgr *engine.ConnManager @@ -74,7 +74,7 @@ func (sip *SIPAgent) Start() (err error) { go func() { if err = sip.sip.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) - sip.exitChan <- true // stop the engine here + close(sip.exitChan) // stop the engine here } }() return @@ -94,7 +94,7 @@ func (sip *SIPAgent) Reload() (err error) { go func() { if err := sip.sip.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) - sip.exitChan <- true // stop the engine here + close(sip.exitChan) // stop the engine here } }() return diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index 0997dc2bf..f3094e608 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -42,7 +42,7 @@ func TestSIPAgentReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) @@ -57,7 +57,7 @@ func TestSIPAgentReload(t *testing.T) { srv := NewSIPAgent(cfg, filterSChan, engineShutdown, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Fatal(err) } @@ -83,6 +83,6 @@ func TestSIPAgentReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true - time.Sleep(10 * time.Millisecond) // wait to stop session bijsonrpc server + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/stats_it_test.go b/services/stats_it_test.go index 81fc69f03..ff5fc19ce 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -44,7 +44,7 @@ func TestStatSReload(t *testing.T) { cfg.ThresholdSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) @@ -60,7 +60,7 @@ func TestStatSReload(t *testing.T) { sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(tS, sS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -92,5 +92,6 @@ func TestStatSReload(t *testing.T) { if sS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index 02c7ad665..766895cce 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -42,7 +42,7 @@ func TestThresholdSReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan bool, 1) + engineShutdown := make(chan struct{}, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) @@ -54,7 +54,7 @@ func TestThresholdSReload(t *testing.T) { tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(tS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -86,5 +86,6 @@ func TestThresholdSReload(t *testing.T) { if tS.IsRunning() { t.Errorf("Expected service to be down") } - engineShutdown <- true + close(engineShutdown) + srvMngr.ShutdownServices(10 * time.Millisecond) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index dab6f7983..49cf9a318 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -24,6 +24,7 @@ import ( "reflect" "strings" "sync" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -32,7 +33,7 @@ import ( ) // NewServiceManager returns a service manager -func NewServiceManager(cfg *config.CGRConfig, engineShutdown chan bool) *ServiceManager { +func NewServiceManager(cfg *config.CGRConfig, engineShutdown chan<- struct{}) *ServiceManager { sm := &ServiceManager{ cfg: cfg, engineShutdown: engineShutdown, @@ -45,10 +46,12 @@ func NewServiceManager(cfg *config.CGRConfig, engineShutdown chan bool) *Service type ServiceManager struct { sync.RWMutex // lock access to any shared data cfg *config.CGRConfig - engineShutdown chan bool + engineShutdown chan<- struct{} + stopReload chan struct{} subsystems map[string]Service } +// Call . func (srvMngr *ServiceManager) Call(serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") if len(parts) != 2 { @@ -146,6 +149,7 @@ func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig { // StartServices starts all enabled services func (srvMngr *ServiceManager) StartServices() (err error) { + srvMngr.stopReload = make(chan struct{}) go srvMngr.handleReload() for _, service := range srvMngr.subsystems { if service.ShouldRun() && !service.IsRunning() { @@ -155,7 +159,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) { return } utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srv.ServiceName(), err)) - srvMngr.engineShutdown <- true + close(srvMngr.engineShutdown) } }(service) } @@ -178,17 +182,7 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) { func (srvMngr *ServiceManager) handleReload() { for { select { - case ext := <-srvMngr.engineShutdown: - srvMngr.engineShutdown <- ext - for srviceName, srv := range srvMngr.subsystems { // gracefully stop all running subsystems - if !srv.IsRunning() { - continue - } - if err := srv.Shutdown(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown subsystem <%s> because: %s", - utils.ServiceManager, srviceName, err)) - } - } + case <-srvMngr.stopReload: return case <-srvMngr.GetConfig().GetReloadChan(config.ATTRIBUTE_JSN): go srvMngr.reloadService(utils.AttributeS) @@ -264,20 +258,20 @@ func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) { if srv.IsRunning() { if err = srv.Reload(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - srvMngr.engineShutdown <- true + close(srvMngr.engineShutdown) return // stop if we encounter an error } } else { if err = srv.Start(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - srvMngr.engineShutdown <- true + close(srvMngr.engineShutdown) return // stop if we encounter an error } } } else if srv.IsRunning() { if err = srv.Shutdown(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to stop service <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - srvMngr.engineShutdown <- true + close(srvMngr.engineShutdown) return // stop if we encounter an error } } @@ -292,6 +286,36 @@ func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service) { return } +// ShutdownServices will stop all services +func (srvMngr *ServiceManager) ShutdownServices(timeout time.Duration) { + close(srvMngr.stopReload) + var wg sync.WaitGroup + for _, srv := range srvMngr.subsystems { // gracefully stop all running subsystems + if !srv.IsRunning() { + continue + } + wg.Add(1) + go func(srv Service) { + if err := srv.Shutdown(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown subsystem <%s> because: %s", + utils.ServiceManager, srv.ServiceName(), err)) + } + wg.Done() + }(srv) + } + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + case <-time.After(timeout): + utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown all subsystems in the given time", + utils.ServiceManager)) + } +} + // Service interface that describes what functions should a service implement type Service interface { // Start should handle the sercive start diff --git a/sessions/sessions.go b/sessions/sessions.go index e9f5c466b..ec64590be 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -95,24 +95,18 @@ type SessionS struct { } // ListenAndServe starts the service and binds it to the listen loop -func (sS *SessionS) ListenAndServe(exitChan chan bool) (err error) { +func (sS *SessionS) ListenAndServe(stopChan chan struct{}) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.SessionS)) if sS.cgrCfg.SessionSCfg().ChannelSyncInterval != 0 { - go func() { - for { // Schedule sync channels to run repeately - select { - case e := <-exitChan: - exitChan <- e - break - case <-time.After(sS.cgrCfg.SessionSCfg().ChannelSyncInterval): - sS.syncSessions() - } + for { // Schedule sync channels to run repeately + select { + case <-stopChan: + return + case <-time.After(sS.cgrCfg.SessionSCfg().ChannelSyncInterval): + sS.syncSessions() } - - }() + } } - e := <-exitChan // block here until shutdown request - exitChan <- e // put back for the others listening for shutdown request return }