From 9de6a2d172206960c0bc08641d73e0b2f1dec196 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 27 Nov 2020 15:35:27 +0200 Subject: [PATCH] Updated shutdown channel handling --- analyzers/analyzers.go | 4 +- analyzers/analyzers_test.go | 6 +- cmd/cgr-engine/cgr-engine.go | 174 ++++++++++++++++------------ cores/server.go | 42 ++++--- engine/caps_test.go | 16 +-- engine/responder.go | 4 +- general_tests/sessions_race_test.go | 2 +- loaders/loader.go | 14 +-- loaders/loaders.go | 4 +- rates/rates_test.go | 6 +- services/analyzers.go | 8 +- services/apiers_it_test.go | 12 +- services/asteriskagent.go | 16 +-- services/asteriskagent_it_test.go | 16 +-- services/attributes_it_test.go | 12 +- services/cdrs_it_test.go | 14 ++- services/chargers_it_test.go | 12 +- services/datadb_it_test.go | 12 +- services/diameteragent.go | 8 +- services/diameteragent_it_test.go | 16 +-- services/dispatchers_it_test.go | 12 +- services/dnsagent.go | 10 +- services/dnsagent_it_test.go | 16 +-- services/ees_it_test.go | 12 +- services/ers.go | 8 +- services/ers_it_test.go | 16 +-- services/freeswitchagent.go | 16 +-- services/freeswitchagent_it_test.go | 32 +++-- services/globalvars.go | 2 + services/httpagent.go | 10 +- services/kamailioagent.go | 22 ++-- services/kamailioagent_it_test.go | 31 +++-- services/radiusagent.go | 8 +- services/radiusagent_it_test.go | 18 +-- services/rals.go | 4 +- services/rals_it_test.go | 16 +-- services/rates_it_test.go | 14 ++- services/resources_it_test.go | 14 ++- services/responders.go | 12 +- services/routes_it_test.go | 14 ++- services/schedulers_it_test.go | 14 ++- services/sessions.go | 8 +- services/sessions_it_test.go | 18 +-- services/sipagent.go | 10 +- services/sipagent_it_test.go | 18 +-- services/stats_it_test.go | 14 ++- services/thresholds_it_test.go | 14 ++- servmanager/servmanager.go | 76 +++++------- utils/syncedchan.go | 43 +++++++ utils/syncedchan_test.go | 41 +++++++ 50 files changed, 543 insertions(+), 398 deletions(-) create mode 
100644 utils/syncedchan.go create mode 100644 utils/syncedchan_test.go diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index 8db5a5887..16ccb5797 100755 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -95,14 +95,14 @@ func (aS *AnalyzerService) deleteHits(hits search.DocumentMatchCollection) (err } // ListenAndServe will initialize the service -func (aS *AnalyzerService) ListenAndServe(exitChan <-chan struct{}) (err error) { +func (aS *AnalyzerService) ListenAndServe(stopChan <-chan struct{}) (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AnalyzerS)) if err = aS.clenaUp(); err != nil { // clean up the data at the system start return } for { select { - case <-exitChan: + case <-stopChan: return case <-time.After(aS.cfg.AnalyzerSCfg().CleanupInterval): if err = aS.clenaUp(); err != nil { diff --git a/analyzers/analyzers_test.go b/analyzers/analyzers_test.go index 35e5a710e..100992f3f 100644 --- a/analyzers/analyzers_test.go +++ b/analyzers/analyzers_test.go @@ -57,9 +57,9 @@ func TestNewAnalyzerService(t *testing.T) { if err = anz.initDB(); err != nil { t.Fatal(err) } - exitChan := make(chan struct{}, 1) - exitChan <- struct{}{} - if err := anz.ListenAndServe(exitChan); err != nil { + shdChan := make(chan struct{}, 1) + shdChan <- struct{}{} + if err := anz.ListenAndServe(shdChan); err != nil { t.Fatal(err) } anz.db.Close() diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index cae408bcb..a6d1e35a2 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -21,6 +21,7 @@ package main import ( "flag" "fmt" + "io" "log" "os" "os/signal" @@ -29,6 +30,7 @@ import ( "runtime/pprof" "strconv" "strings" + "sync" "syscall" "time" @@ -75,14 +77,14 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, // initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns func 
initCacheS(internalCacheSChan chan rpcclient.ClientConnector, - server *cores.Server, dm *engine.DataManager, exitChan chan<- struct{}, + server *cores.Server, dm *engine.DataManager, shdChan *utils.SyncedChan, anz *services.AnalyzerService, cpS *engine.CapsStats) (chS *engine.CacheS) { chS = engine.NewCacheS(cfg, dm, cpS) go func() { if err := chS.Precache(); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error())) - close(exitChan) + shdChan.CloseOnce() } }() @@ -143,7 +145,7 @@ func startRPC(server *cores.Server, internalRaterChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, internalEEsChan, internalRateSChan chan rpcclient.ClientConnector, - stopChan <-chan struct{}, exitChan chan<- struct{}) { + shdChan *utils.SyncedChan) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests case resp := <-internalRaterChan: @@ -176,27 +178,27 @@ func startRPC(server *cores.Server, internalRaterChan, internalEEsChan <- eeS case rateS := <-internalRateSChan: internalRateSChan <- rateS - case <-stopChan: + case <-shdChan.Done(): return } } else { select { case dispatcherS := <-internalDispatcherSChan: internalDispatcherSChan <- dispatcherS - case <-stopChan: + case <-shdChan.Done(): return } } - go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, exitChan) - go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, exitChan) + go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, shdChan) + go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, shdChan) go server.ServeHTTP( cfg.ListenCfg().HTTPListen, cfg.HTTPCfg().HTTPJsonRPCURL, cfg.HTTPCfg().HTTPWSURL, cfg.HTTPCfg().HTTPUseBasicAuth, cfg.HTTPCfg().HTTPAuthUsers, - exitChan, + shdChan, ) if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 || len(cfg.ListenCfg().RPCJSONTLSListen) != 0 || @@ -214,7 +216,7 @@ func startRPC(server *cores.Server, internalRaterChan, 
cfg.TLSCfg().CaCertificate, cfg.TLSCfg().ServerPolicy, cfg.TLSCfg().ServerName, - exitChan, + shdChan, ) } if cfg.ListenCfg().RPCJSONTLSListen != "" { @@ -225,7 +227,7 @@ func startRPC(server *cores.Server, internalRaterChan, cfg.TLSCfg().CaCertificate, cfg.TLSCfg().ServerPolicy, cfg.TLSCfg().ServerName, - exitChan, + shdChan, ) } if cfg.ListenCfg().HTTPTLSListen != "" { @@ -240,7 +242,7 @@ func startRPC(server *cores.Server, internalRaterChan, cfg.HTTPCfg().HTTPWSURL, cfg.HTTPCfg().HTTPUseBasicAuth, cfg.HTTPCfg().HTTPAuthUsers, - exitChan, + shdChan, ) } } @@ -273,36 +275,46 @@ func memProfFile(memProfPath string) bool { return true } -func memProfiling(memProfDir string, interval time.Duration, nrFiles int, exitChan chan<- struct{}) { +func memProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) { + tm := time.NewTimer(interval) for i := 1; ; i++ { + select { + case <-shdChan.Done(): + tm.Stop() + shdWg.Done() + return + case <-tm.C: + } time.Sleep(interval) memPath := path.Join(memProfDir, fmt.Sprintf("mem%v.prof", i)) if !memProfFile(memPath) { - close(exitChan) + shdChan.CloseOnce() + shdWg.Done() + return } if i%nrFiles == 0 { i = 0 // reset the counting } + tm.Reset(interval) } } -func cpuProfiling(cpuProfDir string, stopChan, doneChan chan struct{}, exitChan chan<- struct{}) { +func startCPUProfiling(cpuProfDir string) (f io.WriteCloser, err error) { cpuPath := path.Join(cpuProfDir, "cpu.prof") - f, err := os.Create(cpuPath) - defer func() { close(doneChan) }() - if err != nil { + if f, err = os.Create(cpuPath); err != nil { utils.Logger.Crit(fmt.Sprintf("could not create cpu profile file: %s", err)) - close(exitChan) return } pprof.StartCPUProfile(f) - <-stopChan - pprof.StopCPUProfile() - f.Close() - + return } -func singnalHandler(stopChan <-chan struct{}, exitChan chan<- struct{}) { +func stopCPUProfiling(f io.Closer) { + pprof.StopCPUProfile() + f.Close() +} + +func singnalHandler(shdWg 
*sync.WaitGroup, shdChan *utils.SyncedChan) { shutdownSignal := make(chan os.Signal) reloadSignal := make(chan os.Signal) signal.Notify(shutdownSignal, os.Interrupt, @@ -310,10 +322,12 @@ func singnalHandler(stopChan <-chan struct{}, exitChan chan<- struct{}) { signal.Notify(reloadSignal, syscall.SIGHUP) for { select { - case <-stopChan: + case <-shdChan.Done(): + shdWg.Done() return case <-shutdownSignal: - close(exitChan) + shdChan.CloseOnce() + shdWg.Done() return case <-reloadSignal: // do it in it's own gorutine in order to not block the signal handler with the reload functionality @@ -333,10 +347,10 @@ func singnalHandler(stopChan <-chan struct{}, exitChan chan<- struct{}) { } func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclient.ClientConnector, - exitChan chan<- struct{}) { + shdChan *utils.SyncedChan) { if !cfg.LoaderCfg().Enabled() { utils.Logger.Err(fmt.Sprintf("<%s> not enabled but required by preload mechanism", utils.LoaderS)) - close(exitChan) + shdChan.CloseOnce() return } @@ -351,7 +365,7 @@ func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclie StopOnError: true, }, &reply); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err.Error())) - close(exitChan) + shdChan.CloseOnce() return } } @@ -378,18 +392,22 @@ func main() { runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases } - exitChan := make(chan struct{}) - signStop := make(chan struct{}) - rpcStop := make(chan struct{}) - go singnalHandler(signStop, exitChan) + shdWg := new(sync.WaitGroup) + shdChan := utils.NewSyncedChan() + + shdWg.Add(1) + go singnalHandler(shdWg, shdChan) if *memProfDir != utils.EmptyString { - go memProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, exitChan) + shdWg.Add(1) + go memProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, shdChan) } - 
cpuProfChanStop := make(chan struct{}) - cpuProfChanDone := make(chan struct{}) if *cpuProfDir != utils.EmptyString { - go cpuProfiling(*cpuProfDir, cpuProfChanStop, cpuProfChanDone, exitChan) + f, err := startCPUProfiling(*cpuProfDir) + if err != nil { + return + } + defer stopCPUProfiling(f) } if *scheduledShutdown != utils.EmptyString { @@ -397,27 +415,36 @@ func main() { if err != nil { log.Fatal(err) } + shdWg.Add(1) go func() { // Schedule shutdown - time.Sleep(shutdownDur) - close(exitChan) - return + tm := time.NewTimer(shutdownDur) + select { + case <-tm.C: + shdChan.CloseOnce() + case <-shdChan.Done(): + tm.Stop() + } + shdWg.Done() }() } + // Init config cfg, err = config.NewCGRConfigFromPath(*cfgPath) if err != nil { log.Fatalf("Could not parse config: <%s>", err.Error()) return } - if *nodeID != utils.EmptyString { - cfg.GeneralCfg().NodeID = *nodeID - } if *checkConfig { if err := cfg.CheckConfigSanity(); err != nil { fmt.Println(err) } return } + + if *nodeID != utils.EmptyString { + cfg.GeneralCfg().NodeID = *nodeID + } + config.SetCgrConfig(cfg) // Share the config object // init syslog @@ -496,11 +523,13 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, }) gvService := services.NewGlobalVarS(cfg) + shdWg.Add(1) if err = gvService.Start(); err != nil { return } dmService := services.NewDataDBService(cfg, connManager) if dmService.ShouldRun() { // Some services can run without db, ie: ERs + shdWg.Add(1) if err = dmService.Start(); err != nil { return } @@ -508,15 +537,12 @@ func main() { storDBService := services.NewStorDBService(cfg) if storDBService.ShouldRun() { // Some services can run without db, ie: ERs + shdWg.Add(1) if err = storDBService.Start(); err != nil { return } } - // Done initing DBs - engine.SetRoundingDecimals(cfg.GeneralCfg().RoundingDecimals) - engine.SetFailedPostCacheTTL(cfg.GeneralCfg().FailedPostsTTL) - // Rpc/http server server := cores.NewServer(caps) if 
len(cfg.HTTPCfg().DispatchersRegistrarURL) != 0 { @@ -526,16 +552,16 @@ func main() { server.RegisterHttpFunc(cfg.ConfigSCfg().URL, config.HandlerConfigS) } if *httpPprofPath != "" { - go server.RegisterProfiler(*httpPprofPath) + server.RegisterProfiler(*httpPprofPath) } - // Async starts here, will follow cgrates.json start order // Define internal connections via channels filterSChan := make(chan *engine.FilterS, 1) // init AnalyzerS - anz := services.NewAnalyzerService(cfg, server, filterSChan, exitChan, internalAnalyzerSChan) + anz := services.NewAnalyzerService(cfg, server, filterSChan, shdChan, internalAnalyzerSChan) if anz.ShouldRun() { + shdWg.Add(1) if err := anz.Start(); err != nil { fmt.Println(err) return @@ -544,20 +570,21 @@ func main() { // init CoreSv1 coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz) + shdWg.Add(1) if err := coreS.Start(); err != nil { fmt.Println(err) return } // init CacheS - cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan, anz, coreS.GetCoreS().CapsStats) + cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), shdChan, anz, coreS.GetCoreS().CapsStats) engine.SetCache(cacheS) // init GuardianSv1 initGuardianSv1(internalGuardianSChan, server, anz) // Start ServiceManager - srvManager := servmanager.NewServiceManager(cfg, exitChan) + srvManager := servmanager.NewServiceManager(cfg, shdChan, shdWg) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz) dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz) dspH := services.NewDispatcherHostsService(cfg, server, connManager, anz) @@ -576,7 +603,7 @@ func main() { rals := services.NewRalService(cfg, cacheS, server, internalRALsChan, internalResponderChan, - exitChan, connManager, anz) + shdChan, connManager, anz) apiSv1 := services.NewAPIerSv1Service(cfg, dmService, storDBService, 
filterSChan, server, schS, rals.GetResponder(), internalAPIerSv1Chan, connManager, anz) @@ -586,27 +613,27 @@ func main() { cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan, connManager, anz) - smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, exitChan, connManager, caps, anz) + smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, caps, anz) ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalLoaderSChan, connManager, anz) srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals, apiSv1, apiSv2, cdrS, smg, coreS, - services.NewEventReaderService(cfg, filterSChan, exitChan, connManager), - services.NewDNSAgent(cfg, filterSChan, exitChan, connManager), - services.NewFreeswitchAgent(cfg, exitChan, connManager), - services.NewKamailioAgent(cfg, exitChan, connManager), - services.NewAsteriskAgent(cfg, exitChan, connManager), // partial reload - services.NewRadiusAgent(cfg, filterSChan, exitChan, connManager), // partial reload - services.NewDiameterAgent(cfg, filterSChan, exitChan, connManager), // partial reload - services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload + services.NewEventReaderService(cfg, filterSChan, shdChan, connManager), + services.NewDNSAgent(cfg, filterSChan, shdChan, connManager), + services.NewFreeswitchAgent(cfg, shdChan, connManager), + services.NewKamailioAgent(cfg, shdChan, connManager), + services.NewAsteriskAgent(cfg, shdChan, connManager), // partial reload + services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager), // partial reload + services.NewDiameterAgent(cfg, filterSChan, shdChan, connManager), // partial reload + services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload ldrs, anz, dspS, dspH, dmService, storDBService, services.NewEventExporterService(cfg, filterSChan, connManager, server, internalEEsChan, 
anz), services.NewRateService(cfg, cacheS, filterSChan, dmService, server, internalRateSChan, anz), - services.NewSIPAgent(cfg, filterSChan, exitChan, connManager), + services.NewSIPAgent(cfg, filterSChan, shdChan, connManager), ) srvManager.StartServices() // Start FilterS @@ -645,7 +672,7 @@ func main() { initConfigSv1(internalConfigChan, server, anz) if *preload != utils.EmptyString { - runPreload(ldrs, internalLoaderSChan, exitChan) + runPreload(ldrs, internalLoaderSChan, shdChan) } // Serve rpc connections @@ -654,16 +681,21 @@ func main() { internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsChan, - internalCacheSChan, internalEEsChan, internalRateSChan, rpcStop, exitChan) - <-exitChan - close(rpcStop) - close(signStop) - srvManager.ShutdownServices(cfg.CoreSCfg().ShutdownTimeout) + internalCacheSChan, internalEEsChan, internalRateSChan, shdChan) - if *cpuProfDir != "" { // wait to end cpuProfiling - close(cpuProfChanStop) - <-cpuProfChanDone + <-shdChan.Done() + shtdDone := make(chan struct{}) + go func() { + shdWg.Wait() + close(shtdDone) + }() + select { + case <-shtdDone: + case <-time.After(10 * time.Second): //cfg.CoreSCfg().ShutdownTimeout): + utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown all subsystems in the given time", + utils.ServiceManager)) } + if *memProfDir != "" { // write last memory profiling memProfFile(path.Join(*memProfDir, "mem_final.prof")) } diff --git a/cores/server.go b/cores/server.go index a2bb0b283..bce455097 100644 --- a/cores/server.go +++ b/cores/server.go @@ -150,7 +150,7 @@ func (s *Server) BiRPCRegister(rcvr interface{}) { } } -func (s *Server) ServeJSON(addr string, exitChan chan<- struct{}) { +func (s *Server) ServeJSON(addr string, shdChan *utils.SyncedChan) { s.RLock() enabled := s.rpcEnabled s.RUnlock() @@ -158,10 +158,10 @@ func (s *Server) ServeJSON(addr string, exitChan 
chan<- struct{}) { return } - defer func() { close(exitChan) }() lJSON, e := net.Listen(utils.TCP, addr) if e != nil { log.Println("ServeJSON listen error:", e) + shdChan.CloseOnce() return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS JSON server at <%s>.", addr)) @@ -178,26 +178,26 @@ func (s *Server) ServeJSON(addr string, exitChan chan<- struct{}) { lastErrorTime = time.Now() errCnt++ if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably - break + shdChan.CloseOnce() + return } continue } go rpc.ServeCodec(newCapsJSONCodec(conn, s.caps, s.anz)) } - } -func (s *Server) ServeGOB(addr string, exitChan chan<- struct{}) { +func (s *Server) ServeGOB(addr string, shdChan *utils.SyncedChan) { s.RLock() enabled := s.rpcEnabled s.RUnlock() if !enabled { return } - defer func() { close(exitChan) }() lGOB, e := net.Listen(utils.TCP, addr) if e != nil { log.Println("ServeGOB listen error:", e) + shdChan.CloseOnce() return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS GOB server at <%s>.", addr)) @@ -214,7 +214,8 @@ func (s *Server) ServeGOB(addr string, exitChan chan<- struct{}) { lastErrorTime = time.Now() errCnt++ if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably - break + shdChan.CloseOnce() + return } continue } @@ -254,14 +255,13 @@ func (s *Server) RegisterProfiler(addr string) { } func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, - useBasicAuth bool, userList map[string]string, exitChan chan<- struct{}) { + useBasicAuth bool, userList map[string]string, shdChan *utils.SyncedChan) { s.RLock() enabled := s.rpcEnabled s.RUnlock() if !enabled { return } - // s.httpMux = http.NewServeMux() if jsonRPCURL != "" { s.Lock() s.httpEnabled = true @@ -297,8 +297,8 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, utils.Logger.Info(fmt.Sprintf(" start listening at <%s>", addr)) if err := http.ListenAndServe(addr, s.httpMux); err != nil { 
log.Println(fmt.Sprintf("Error: %s when listening ", err)) + shdChan.CloseOnce() } - close(exitChan) } // ServeBiJSON create a gorutine to listen and serve as BiRPC server @@ -428,21 +428,22 @@ func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int, } func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, - serverPolicy int, serverName string, exitChan chan<- struct{}) { + serverPolicy int, serverName string, shdChan *utils.SyncedChan) { s.RLock() enabled := s.rpcEnabled s.RUnlock() if !enabled { return } - defer func() { close(exitChan) }() config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { + shdChan.CloseOnce() return } listener, err := tls.Listen(utils.TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) + shdChan.CloseOnce() return } @@ -461,7 +462,8 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, lastErrorTime = time.Now() errCnt++ if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably - break + shdChan.CloseOnce() + return } continue } @@ -470,21 +472,22 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, } func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, - serverPolicy int, serverName string, exitChan chan<- struct{}) { + serverPolicy int, serverName string, shdChan *utils.SyncedChan) { s.RLock() enabled := s.rpcEnabled s.RUnlock() if !enabled { return } - defer func() { close(exitChan) }() config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { + shdChan.CloseOnce() return } listener, err := tls.Listen(utils.TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) + shdChan.CloseOnce() return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS JSON TLS server at <%s>.", addr)) @@ -502,7 +505,8 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, 
serverKey, caCert string, lastErrorTime = time.Now() errCnt++ if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably - break + shdChan.CloseOnce() + return } continue } @@ -512,7 +516,7 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string, jsonRPCURL string, wsRPCURL string, - useBasicAuth bool, userList map[string]string, exitChan chan<- struct{}) { + useBasicAuth bool, userList map[string]string, shdChan *utils.SyncedChan) { s.RLock() enabled := s.rpcEnabled s.RUnlock() @@ -550,9 +554,9 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP if useBasicAuth { utils.Logger.Info(" enabling basic auth") } - defer func() { close(exitChan) }() config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { + shdChan.CloseOnce() return } httpSrv := http.Server{ @@ -563,6 +567,6 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP utils.Logger.Info(fmt.Sprintf(" start listening at <%s>", addr)) if err := httpSrv.ListenAndServeTLS(serverCrt, serverKey); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) + shdChan.CloseOnce() } - return } diff --git a/engine/caps_test.go b/engine/caps_test.go index c319d062e..6410c79df 100644 --- a/engine/caps_test.go +++ b/engine/caps_test.go @@ -64,30 +64,30 @@ func TestCapsStats(t *testing.T) { } exp := &CapsStats{st: st} cr := NewCaps(0, utils.MetaBusy) - exitChan := make(chan struct{}, 1) - close(exitChan) - cs := NewCapsStats(1, cr, exitChan) + stopChan := make(chan struct{}, 1) + close(stopChan) + cs := NewCapsStats(1, cr, stopChan) if !reflect.DeepEqual(exp, cs) { t.Errorf("Expected: %v ,received: %v", exp, cs) } - <-exitChan - exitChan = make(chan struct{}, 1) + <-stopChan + stopChan = make(chan struct{}, 1) go func() { runtime.Gosched() 
time.Sleep(100) - close(exitChan) + close(stopChan) }() cr = NewCaps(10, utils.MetaBusy) cr.Allocate() cr.Allocate() - cs.loop(1, exitChan, cr) + cs.loop(1, stopChan, cr) if avg := cs.GetAverage(2); avg <= 0 { t.Errorf("Expected at least an event to be processed: %v", avg) } if pk := cs.GetPeak(); pk != 2 { t.Errorf("Expected the peak to be 2 received: %v", pk) } - <-exitChan + <-stopChan } func TestCapsStatsGetAverage(t *testing.T) { diff --git a/engine/responder.go b/engine/responder.go index c5b9b2e03..9d2534d4a 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -31,7 +31,7 @@ import ( ) type Responder struct { - ExitChan chan<- struct{} + ShdChan *utils.SyncedChan Timeout time.Duration Timezone string MaxComputedUsage map[string]time.Duration @@ -369,7 +369,7 @@ func (rs *Responder) GetMaxSessionTimeOnAccounts(arg *utils.GetMaxSessionTimeOnA func (rs *Responder) Shutdown(arg *utils.TenantWithOpts, reply *string) (err error) { dm.DataDB().Close() cdrStorage.Close() - defer func() { close(rs.ExitChan) }() + defer rs.ShdChan.CloseOnce() *reply = "Done!" 
return } diff --git a/general_tests/sessions_race_test.go b/general_tests/sessions_race_test.go index ca005084a..fac7718a2 100644 --- a/general_tests/sessions_race_test.go +++ b/general_tests/sessions_race_test.go @@ -95,7 +95,7 @@ func TestSessionSRace(t *testing.T) { // resp resp = &engine.Responder{ - ExitChan: make(chan struct{}, 1), + ShdChan: utils.NewSyncedChan(), MaxComputedUsage: cfg.RalsCfg().MaxComputedUsage, } respChan <- resp diff --git a/loaders/loader.go b/loaders/loader.go index 206fc51af..9767f2dd8 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -107,9 +107,9 @@ type Loader struct { cacheConns []string } -func (ldr *Loader) ListenAndServe(exitChan chan struct{}) (err error) { +func (ldr *Loader) ListenAndServe(stopChan chan struct{}) (err error) { utils.Logger.Info(fmt.Sprintf("Starting <%s-%s>", utils.LoaderS, ldr.ldrID)) - return ldr.serve(exitChan) + return ldr.serve(stopChan) } // ProcessFolder will process the content in the folder with locking @@ -971,26 +971,26 @@ func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderDa return } -func (ldr *Loader) serve(exitChan chan struct{}) (err error) { +func (ldr *Loader) serve(stopChan chan struct{}) (err error) { fmt.Println(ldr.runDelay) switch ldr.runDelay { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): return utils.WatchDir(ldr.tpInDir, ldr.processFile, - utils.LoaderS+"-"+ldr.ldrID, exitChan) + utils.LoaderS+"-"+ldr.ldrID, stopChan) default: - go ldr.handleFolder(exitChan) + go ldr.handleFolder(stopChan) } return } -func (ldr *Loader) handleFolder(exitChan chan struct{}) { +func (ldr *Loader) handleFolder(stopChan chan struct{}) { for { go ldr.ProcessFolder(config.CgrConfig().GeneralCfg().DefaultCaching, utils.MetaStore, false) timer := time.NewTimer(ldr.runDelay) select { - case <-exitChan: + case <-stopChan: utils.Logger.Info( fmt.Sprintf("<%s-%s> stop monitoring path <%s>", utils.LoaderS, ldr.ldrID, 
ldr.tpInDir)) diff --git a/loaders/loaders.go b/loaders/loaders.go index ace113f50..47b024ebd 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -51,9 +51,9 @@ func (ldrS *LoaderService) Enabled() bool { return len(ldrS.ldrs) != 0 } -func (ldrS *LoaderService) ListenAndServe(exitChan chan struct{}) (err error) { +func (ldrS *LoaderService) ListenAndServe(stopChan chan struct{}) (err error) { for _, ldr := range ldrS.ldrs { - if err = ldr.ListenAndServe(exitChan); err != nil { + if err = ldr.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s-%s> error: <%s>", utils.LoaderS, ldr.ldrID, err.Error())) return } diff --git a/rates/rates_test.go b/rates/rates_test.go index 13241b856..042f9331e 100644 --- a/rates/rates_test.go +++ b/rates/rates_test.go @@ -32,13 +32,13 @@ import ( func TestListenAndServe(t *testing.T) { newRates := &RateS{} cfgRld := make(chan struct{}, 1) - exitChan := make(chan struct{}, 1) + stopChan := make(chan struct{}, 1) cfgRld <- struct{}{} go func() { time.Sleep(10) - exitChan <- struct{}{} + stopChan <- struct{}{} }() - newRates.ListenAndServe(exitChan, cfgRld) + newRates.ListenAndServe(stopChan, cfgRld) } func TestNewRateS(t *testing.T) { diff --git a/services/analyzers.go b/services/analyzers.go index 1a3c3b528..fd54861c1 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -33,14 +33,14 @@ import ( // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig, server *cores.Server, - filterSChan chan *engine.FilterS, exitChan chan<- struct{}, + filterSChan chan *engine.FilterS, shdChan *utils.SyncedChan, internalAnalyzerSChan chan rpcclient.ClientConnector) *AnalyzerService { return &AnalyzerService{ connChan: internalAnalyzerSChan, cfg: cfg, server: server, filterSChan: filterSChan, - exitChan: exitChan, + shdChan: shdChan, } } @@ -51,7 +51,7 @@ type AnalyzerService struct { server *cores.Server filterSChan chan *engine.FilterS stopChan chan struct{} - exitChan 
chan<- struct{} + shdChan *utils.SyncedChan anz *analyzers.AnalyzerService rpc *v1.AnalyzerSv1 @@ -74,7 +74,7 @@ func (anz *AnalyzerService) Start() (err error) { go func() { if err := anz.anz.ListenAndServe(anz.stopChan); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error())) - close(anz.exitChan) + anz.shdChan.CloseOnce() } return }() diff --git a/services/apiers_it_test.go b/services/apiers_it_test.go index fbacb0426..c0c60ea6f 100644 --- a/services/apiers_it_test.go +++ b/services/apiers_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -41,7 +42,8 @@ func TestApiersReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 2) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) @@ -51,11 +53,11 @@ func TestApiersReload(t *testing.T) { cfg.ThresholdSCfg().Enabled = true cfg.SchedulerCfg().Enabled = true server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL stordb := NewStorDBService(cfg) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, new(ResponderService), @@ -110,6 +112,6 @@ func 
TestApiersReload(t *testing.T) { if apiSv2.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index a75ff8c3a..508019da2 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -32,11 +32,11 @@ import ( // NewAsteriskAgent returns the Asterisk Agent func NewAsteriskAgent(cfg *config.CGRConfig, - exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { + shdChan *utils.SyncedChan, connMgr *engine.ConnManager) servmanager.Service { return &AsteriskAgent{ - cfg: cfg, - exitChan: exitChan, - connMgr: connMgr, + cfg: cfg, + shdChan: shdChan, + connMgr: connMgr, } } @@ -44,7 +44,7 @@ func NewAsteriskAgent(cfg *config.CGRConfig, type AsteriskAgent struct { sync.RWMutex cfg *config.CGRConfig - exitChan chan<- struct{} + shdChan *utils.SyncedChan stopChan chan struct{} smas []*agents.AsteriskAgent @@ -60,10 +60,10 @@ func (ast *AsteriskAgent) Start() (err error) { ast.Lock() defer ast.Unlock() - listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}, exitChan chan<- struct{}) { + listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}, shdChan *utils.SyncedChan) { if err := sma.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err)) - close(exitChan) + shdChan.CloseOnce() } } ast.stopChan = make(chan struct{}) @@ -73,7 +73,7 @@ func (ast *AsteriskAgent) Start() (err error) { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) return } - go listenAndServe(ast.smas[connIdx], ast.stopChan, ast.exitChan) + go listenAndServe(ast.smas[connIdx], ast.stopChan, ast.shdChan) } return } diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index 7c0cbf672..dce4d213c 100644 --- 
a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -42,19 +43,20 @@ func TestAsteriskAgentReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, nil, anz) - srv := NewAsteriskAgent(cfg, engineShutdown, nil) + shdChan, nil, nil, anz) + srv := NewAsteriskAgent(cfg, shdChan, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -83,6 +85,6 @@ func TestAsteriskAgentReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index 5fe50c9b3..bb0ba2232 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -41,17 +42,18 @@ func TestAttributeSReload(t *testing.T) { utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) 
utils.Logger.SetLogLevel(7) - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) attrRPC := make(chan rpcclient.ClientConnector, 1) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, attrRPC, anz) @@ -95,6 +97,6 @@ func TestAttributeSReload(t *testing.T) { if attrS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index 0bc4dfdb6..ceed4772d 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -41,7 +42,8 @@ func TestCdrsReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) @@ -60,17 +62,17 @@ func TestCdrsReload(t *testing.T) { cfg.ChargerSCfg().Enabled = true server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) 
db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL stordb := NewStorDBService(cfg) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) ralS := NewRalService(cfg, chS, server, make(chan rpcclient.ClientConnector, 1), make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, anz) + shdChan, nil, anz) cdrsRPC := make(chan rpcclient.ClientConnector, 1) cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, cdrsRPC, nil, anz) @@ -120,6 +122,6 @@ func TestCdrsReload(t *testing.T) { if cdrS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index bb536aa72..ac1df7381 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -41,7 +42,8 @@ func TestChargerSReload(t *testing.T) { utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) cfg.AttributeSCfg().Enabled = true - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) @@ -50,9 +52,9 @@ func TestChargerSReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil server := cores.NewServer(nil) - srvMngr := 
servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) @@ -90,6 +92,6 @@ func TestChargerSReload(t *testing.T) { if chrS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index c64cf6476..d0390073b 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -22,6 +22,7 @@ package services import ( "path" "reflect" + "sync" "testing" "time" @@ -42,17 +43,18 @@ func TestDataDBReload(t *testing.T) { utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) cM := engine.NewConnManager(cfg, nil) db := NewDataDBService(cfg, cM) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, 
filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) srvMngr.AddServices(NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz), NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -185,6 +187,6 @@ func TestDataDBReload(t *testing.T) { if db.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/diameteragent.go b/services/diameteragent.go index d247cb54b..3ea65b39c 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -31,11 +31,11 @@ import ( // NewDiameterAgent returns the Diameter Agent func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { + shdChan *utils.SyncedChan, connMgr *engine.ConnManager) servmanager.Service { return &DiameterAgent{ cfg: cfg, filterSChan: filterSChan, - exitChan: exitChan, + shdChan: shdChan, connMgr: connMgr, } } @@ -45,7 +45,7 @@ type DiameterAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan<- struct{} + shdChan *utils.SyncedChan stopChan chan struct{} da *agents.DiameterAgent @@ -80,7 +80,7 @@ func (da *DiameterAgent) Start() (err error) { if err = d.ListenAndServe(da.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.DiameterAgent, err)) - close(da.exitChan) + da.shdChan.CloseOnce() } }(da.da) return diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index 852fc3f2d..306e87607 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -42,19 +43,20 @@ func TestDiameterAgentReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := 
make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, nil, anz) - srv := NewDiameterAgent(cfg, filterSChan, engineShutdown, nil) + shdChan, nil, nil, anz) + srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -83,6 +85,6 @@ func TestDiameterAgentReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index d3ffafed7..b090e201d 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -41,7 +42,8 @@ func TestDispatcherSReload(t *testing.T) { utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) cfg.AttributeSCfg().Enabled = true - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) 
close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) @@ -51,9 +53,9 @@ func TestDispatcherSReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) srv := NewDispatcherService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) @@ -93,6 +95,6 @@ func TestDispatcherSReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/dnsagent.go b/services/dnsagent.go index 982f746bf..b47065e1a 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -31,11 +31,11 @@ import ( // NewDNSAgent returns the DNS Agent func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { + shdChan *utils.SyncedChan, connMgr *engine.ConnManager) servmanager.Service { return &DNSAgent{ cfg: cfg, filterSChan: filterSChan, - exitChan: exitChan, + shdChan: shdChan, connMgr: connMgr, } } @@ -45,7 +45,7 @@ type DNSAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan<- struct{} + shdChan *utils.SyncedChan dns *agents.DNSAgent connMgr *engine.ConnManager @@ -73,7 +73,7 @@ func (dns *DNSAgent) Start() (err error) { go func() { if err = 
dns.dns.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - close(dns.exitChan) // stop the engine here + dns.shdChan.CloseOnce() // stop the engine here } }() return @@ -96,7 +96,7 @@ func (dns *DNSAgent) Reload() (err error) { go func() { if err := dns.dns.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - close(dns.exitChan) // stop the engine here + dns.shdChan.CloseOnce() // stop the engine here } }() return diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index e0d65cc96..3e9f46dd8 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -42,19 +43,20 @@ func TestDNSAgentReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, nil, anz) - srv := NewDNSAgent(cfg, filterSChan, engineShutdown, nil) + shdChan, nil, nil, anz) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -83,6 
+85,6 @@ func TestDNSAgentReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/ees_it_test.go b/services/ees_it_test.go index de9fd6dc9..1771fe5b2 100644 --- a/services/ees_it_test.go +++ b/services/ees_it_test.go @@ -22,6 +22,7 @@ package services import ( "os" "path" + "sync" "testing" "time" @@ -53,14 +54,15 @@ func TestEventExporterSReload(t *testing.T) { cfg.AttributeSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) @@ -101,6 +103,6 @@ func TestEventExporterSReload(t *testing.T) { if ees.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/ers.go b/services/ers.go index 8e8ff3182..7c1c1dd57 100644 --- a/services/ers.go +++ b/services/ers.go @@ -31,12 +31,12 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - 
exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { + shdChan *utils.SyncedChan, connMgr *engine.ConnManager) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, filterSChan: filterSChan, - exitChan: exitChan, + shdChan: shdChan, connMgr: connMgr, } } @@ -46,7 +46,7 @@ type EventReaderService struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan<- struct{} + shdChan *utils.SyncedChan ers *ers.ERService rldChan chan struct{} @@ -76,7 +76,7 @@ func (erS *EventReaderService) Start() (err error) { go func(ers *ers.ERService, stopChan, rldChan chan struct{}) { if err := ers.ListenAndServe(stopChan, rldChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) - close(erS.exitChan) + erS.shdChan.CloseOnce() } }(erS.ers, erS.stopChan, erS.rldChan) return diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 0bfd37403..d52579aa3 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -22,6 +22,7 @@ package services import ( "os" "path" + "sync" "testing" "time" @@ -52,13 +53,14 @@ func TestEventReaderSReload(t *testing.T) { cfg.SessionSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) - sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) - attrS := NewEventReaderService(cfg, filterSChan, engineShutdown, 
nil) + sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz) + attrS := NewEventReaderService(cfg, filterSChan, shdChan, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -87,6 +89,6 @@ func TestEventReaderSReload(t *testing.T) { if attrS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index 162a2c1f4..b2c6a5f79 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -32,19 +32,19 @@ import ( // NewFreeswitchAgent returns the Freeswitch Agent func NewFreeswitchAgent(cfg *config.CGRConfig, - exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { + shdChan *utils.SyncedChan, connMgr *engine.ConnManager) servmanager.Service { return &FreeswitchAgent{ - cfg: cfg, - exitChan: exitChan, - connMgr: connMgr, + cfg: cfg, + shdChan: shdChan, + connMgr: connMgr, } } // FreeswitchAgent implements Agent interface type FreeswitchAgent struct { sync.RWMutex - cfg *config.CGRConfig - exitChan chan<- struct{} + cfg *config.CGRConfig + shdChan *utils.SyncedChan fS *agents.FSsessions connMgr *engine.ConnManager @@ -64,7 +64,7 @@ func (fS *FreeswitchAgent) Start() (err error) { go func(f *agents.FSsessions) { if err := f.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - close(fS.exitChan) // stop the engine here + fS.shdChan.CloseOnce() // stop the engine here } }(fS.fS) return @@ -81,7 +81,7 @@ func (fS *FreeswitchAgent) Reload() (err error) { go func(f *agents.FSsessions) { if err := fS.fS.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - 
close(fS.exitChan) // stop the engine here + fS.shdChan.CloseOnce() // stop the engine here } }(fS.fS) return diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index c06fbf709..ae3c8b0d6 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -21,6 +21,8 @@ package services import ( "path" + "runtime" + "sync" "testing" "time" @@ -38,23 +40,25 @@ func TestFreeSwitchAgentReload(t *testing.T) { t.Fatal(err) } cfg.SessionSCfg().Enabled = true + cfg.SessionSCfg().ListenBijson = "" utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, nil, anz) - srv := NewFreeswitchAgent(cfg, engineShutdown, nil) + shdChan, nil, nil, anz) + srv := NewFreeswitchAgent(cfg, shdChan, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -74,20 +78,12 @@ func TestFreeSwitchAgentReload(t *testing.T) { t.Errorf("Expecting OK ,received %s", reply) } time.Sleep(10 * time.Millisecond) //need to switch to gorutine - if !srv.IsRunning() { - t.Errorf("Expected service to be running") - } - 
cfg.FsAgentCfg().Enabled = false - cfg.GetReloadChan(config.FreeSWITCHAgentJSN) <- struct{}{} - time.Sleep(10 * time.Millisecond) + // the engine should be stoped as we could not connect to freeswich if srv.IsRunning() { t.Errorf("Expected service to be down") } - select { - case <-engineShutdown: - // if the chanel was closed by the connect error (we did not start the freeswitch for this tests) - default: - close(engineShutdown) - } - srvMngr.ShutdownServices(10 * time.Millisecond) + + shdChan.CloseOnce() + runtime.Gosched() + time.Sleep(10 * time.Millisecond) } diff --git a/services/globalvars.go b/services/globalvars.go index d035336cb..772091578 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -43,6 +43,8 @@ type GlobalVarS struct { // Start should handle the sercive start func (gv *GlobalVarS) Start() (err error) { + engine.SetRoundingDecimals(gv.cfg.GeneralCfg().RoundingDecimals) + engine.SetFailedPostCacheTTL(gv.cfg.GeneralCfg().FailedPostsTTL) return gv.initHTTPTransport() } diff --git a/services/httpagent.go b/services/httpagent.go index f90440d41..f03877fb9 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -48,7 +48,9 @@ type HTTPAgent struct { filterSChan chan *engine.FilterS server *cores.Server - ha *agents.HTTPAgent + // we can realy stop the HTTPAgent so keep a flag + // if we registerd the handlers + started bool connMgr *engine.ConnManager } @@ -62,6 +64,7 @@ func (ha *HTTPAgent) Start() (err error) { ha.filterSChan <- filterS ha.Lock() + ha.started = true utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent)) for _, agntCfg := range ha.cfg.HTTPAgentCfg() { ha.server.RegisterHttpHandler(agntCfg.URL, @@ -80,6 +83,9 @@ func (ha *HTTPAgent) Reload() (err error) { // Shutdown stops the service func (ha *HTTPAgent) Shutdown() (err error) { + ha.Lock() + ha.started = false + ha.Unlock() return // no shutdown for the momment } @@ -87,7 +93,7 @@ func (ha *HTTPAgent) Shutdown() (err error) { 
func (ha *HTTPAgent) IsRunning() bool { ha.RLock() defer ha.RUnlock() - return ha != nil && ha.ha != nil + return ha != nil && ha.started } // ServiceName returns the service name diff --git a/services/kamailioagent.go b/services/kamailioagent.go index f4b3af9a8..956e1a26e 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -33,19 +33,19 @@ import ( // NewKamailioAgent returns the Kamailio Agent func NewKamailioAgent(cfg *config.CGRConfig, - exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { + shdChan *utils.SyncedChan, connMgr *engine.ConnManager) servmanager.Service { return &KamailioAgent{ - cfg: cfg, - exitChan: exitChan, - connMgr: connMgr, + cfg: cfg, + shdChan: shdChan, + connMgr: connMgr, } } // KamailioAgent implements Agent interface type KamailioAgent struct { sync.RWMutex - cfg *config.CGRConfig - exitChan chan<- struct{} + cfg *config.CGRConfig + shdChan *utils.SyncedChan kam *agents.KamailioAgent connMgr *engine.ConnManager @@ -64,12 +64,10 @@ func (kam *KamailioAgent) Start() (err error) { utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone)) go func(k *agents.KamailioAgent) { - if err = k.Connect(); err != nil { - if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log - return - } + if err = k.Connect(); err != nil && + !strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) - close(kam.exitChan) + kam.shdChan.CloseOnce() } }(kam.kam) return @@ -90,7 +88,7 @@ func (kam *KamailioAgent) Reload() (err error) { return } utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) - close(kam.exitChan) + kam.shdChan.CloseOnce() } }(kam.kam) return diff --git a/services/kamailioagent_it_test.go b/services/kamailioagent_it_test.go index 457665bac..92aafcd9c 100644 --- 
a/services/kamailioagent_it_test.go +++ b/services/kamailioagent_it_test.go @@ -21,6 +21,8 @@ package services import ( "path" + "runtime" + "sync" "testing" "time" @@ -38,23 +40,25 @@ func TestKamailioAgentReload(t *testing.T) { t.Fatal(err) } cfg.SessionSCfg().Enabled = true + cfg.SessionSCfg().ListenBijson = "" utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, nil, anz) - srv := NewKamailioAgent(cfg, engineShutdown, nil) + shdChan, nil, nil, anz) + srv := NewKamailioAgent(cfg, shdChan, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -73,21 +77,12 @@ func TestKamailioAgentReload(t *testing.T) { } else if reply != utils.OK { t.Errorf("Expecting OK ,received %s", reply) } + runtime.Gosched() time.Sleep(10 * time.Millisecond) //need to switch to gorutine - if !srv.IsRunning() { - t.Errorf("Expected service to be running") - } - cfg.KamAgentCfg().Enabled = false - cfg.GetReloadChan(config.KamailioAgentJSN) <- struct{}{} - time.Sleep(10 * time.Millisecond) + // the engine should be stoped as we could not connect to kamailio if 
srv.IsRunning() { t.Errorf("Expected service to be down") } - select { - case <-engineShutdown: - // if the chanel was closed by the connect error (we did not start the kamailio for this tests) - default: - close(engineShutdown) - } - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/radiusagent.go b/services/radiusagent.go index b3cd87f80..58ea63fe1 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -31,11 +31,11 @@ import ( // NewRadiusAgent returns the Radius Agent func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { + shdChan *utils.SyncedChan, connMgr *engine.ConnManager) servmanager.Service { return &RadiusAgent{ cfg: cfg, filterSChan: filterSChan, - exitChan: exitChan, + shdChan: shdChan, connMgr: connMgr, } } @@ -45,7 +45,7 @@ type RadiusAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan<- struct{} + shdChan *utils.SyncedChan stopChan chan struct{} rad *agents.RadiusAgent @@ -80,7 +80,7 @@ func (rad *RadiusAgent) Start() (err error) { go func(r *agents.RadiusAgent) { if err = r.ListenAndServe(rad.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) - close(rad.exitChan) + rad.shdChan.CloseOnce() } }(rad.rad) return diff --git a/services/radiusagent_it_test.go b/services/radiusagent_it_test.go index ae6753f4b..0186e2d85 100644 --- a/services/radiusagent_it_test.go +++ b/services/radiusagent_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -42,19 +43,20 @@ func TestRadiusAgentReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := 
new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, nil, anz) - srv := NewRadiusAgent(cfg, filterSChan, engineShutdown, nil) + shdChan, nil, nil, anz) + srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -83,6 +85,6 @@ func TestRadiusAgentReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/rals.go b/services/rals.go index d381c779b..c2a814ea1 100644 --- a/services/rals.go +++ b/services/rals.go @@ -31,10 +31,10 @@ import ( // NewRalService returns the Ral Service func NewRalService(cfg *config.CGRConfig, cacheS *engine.CacheS, server *cores.Server, - internalRALsChan, internalResponderChan chan rpcclient.ClientConnector, exitChan chan<- struct{}, + internalRALsChan, internalResponderChan chan rpcclient.ClientConnector, shdChan *utils.SyncedChan, connMgr *engine.ConnManager, anz *AnalyzerService) *RalService { - resp := NewResponderService(cfg, server, internalResponderChan, exitChan, anz) + resp := NewResponderService(cfg, server, internalResponderChan, shdChan, anz) return &RalService{ connChan: internalRALsChan, diff --git 
a/services/rals_it_test.go b/services/rals_it_test.go index e27576c44..49ae0f613 100644 --- a/services/rals_it_test.go +++ b/services/rals_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -41,8 +42,9 @@ func TestRalsReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) @@ -60,17 +62,17 @@ func TestRalsReload(t *testing.T) { cfg.ThresholdSCfg().Enabled = true server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) stordb := NewStorDBService(cfg) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) ralS := NewRalService(cfg, chS, server, make(chan rpcclient.ClientConnector, 1), make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, anz) + shdChan, nil, anz) srvMngr.AddServices(ralS, schS, tS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { @@ -119,6 +121,6 @@ func TestRalsReload(t *testing.T) { if resp := ralS.GetResponder(); resp.IsRunning() { 
t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/rates_it_test.go b/services/rates_it_test.go index 9109ba265..bc665fc10 100644 --- a/services/rates_it_test.go +++ b/services/rates_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -42,15 +43,16 @@ func TestRateSReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheRateProfiles)) close(chS.GetPrecacheChannel(utils.CacheRateProfilesFilterIndexes)) close(chS.GetPrecacheChannel(utils.CacheRateFilterIndexes)) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) rS := NewRateService(cfg, chS, filterSChan, db, server, make(chan rpcclient.ClientConnector, 1), anz) srvMngr.AddServices(rS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -79,6 +81,6 @@ func TestRateSReload(t *testing.T) { if rS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/resources_it_test.go b/services/resources_it_test.go index c610fe99b..11f7e31c5 100644 --- a/services/resources_it_test.go +++ 
b/services/resources_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -44,8 +45,9 @@ func TestResourceSReload(t *testing.T) { cfg.ThresholdSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) @@ -53,8 +55,8 @@ func TestResourceSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheResources)) close(chS.GetPrecacheChannel(utils.CacheResourceFilterIndexes)) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) @@ -92,6 +94,6 @@ func TestResourceSReload(t *testing.T) { if reS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/responders.go b/services/responders.go index d815cb410..6e28a3701 100644 --- a/services/responders.go +++ b/services/responders.go @@ -31,12 +31,12 @@ import ( // NewResponderService returns the Resonder Service func NewResponderService(cfg *config.CGRConfig, server *cores.Server, 
internalRALsChan chan rpcclient.ClientConnector, - exitChan chan<- struct{}, anz *AnalyzerService) *ResponderService { + shdChan *utils.SyncedChan, anz *AnalyzerService) *ResponderService { return &ResponderService{ connChan: internalRALsChan, cfg: cfg, server: server, - exitChan: exitChan, + shdChan: shdChan, anz: anz, } } @@ -45,9 +45,9 @@ func NewResponderService(cfg *config.CGRConfig, server *cores.Server, // this service is manged by the RALs as a component type ResponderService struct { sync.RWMutex - cfg *config.CGRConfig - server *cores.Server - exitChan chan<- struct{} + cfg *config.CGRConfig + server *cores.Server + shdChan *utils.SyncedChan resp *engine.Responder connChan chan rpcclient.ClientConnector @@ -64,7 +64,7 @@ func (resp *ResponderService) Start() (err error) { resp.Lock() defer resp.Unlock() resp.resp = &engine.Responder{ - ExitChan: resp.exitChan, + ShdChan: resp.shdChan, MaxComputedUsage: resp.cfg.RalsCfg().MaxComputedUsage, } diff --git a/services/routes_it_test.go b/services/routes_it_test.go index 881d095f4..0d1782a6d 100644 --- a/services/routes_it_test.go +++ b/services/routes_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -43,17 +44,18 @@ func TestSupplierSReload(t *testing.T) { cfg.StatSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheRouteProfiles)) close(chS.GetPrecacheChannel(utils.CacheRouteFilterIndexes)) close(chS.GetPrecacheChannel(utils.CacheStatQueueProfiles)) close(chS.GetPrecacheChannel(utils.CacheStatQueues)) close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, 
shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) supS := NewRouteService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) @@ -90,6 +92,6 @@ func TestSupplierSReload(t *testing.T) { if supS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index 6d8331a22..29160c3ad 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -39,15 +40,16 @@ func TestSchedulerSReload(t *testing.T) { } utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil close(chS.GetPrecacheChannel(utils.CacheActionPlans)) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) 
engine.NewConnManager(cfg, nil) srvMngr.AddServices(schS, @@ -83,6 +85,6 @@ func TestSchedulerSReload(t *testing.T) { if schS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/sessions.go b/services/sessions.go index 8b3589014..0a895791f 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -36,7 +36,7 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *cores.Server, internalChan chan rpcclient.ClientConnector, - exitChan chan<- struct{}, connMgr *engine.ConnManager, + shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps, anz *AnalyzerService) servmanager.Service { return &SessionService{ @@ -44,7 +44,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, cfg: cfg, dm: dm, server: server, - exitChan: exitChan, + shdChan: shdChan, connMgr: connMgr, caps: caps, anz: anz, @@ -57,7 +57,7 @@ type SessionService struct { cfg *config.CGRConfig dm *DataDBService server *cores.Server - exitChan chan<- struct{} + shdChan *utils.SyncedChan stopChan chan struct{} sm *sessions.SessionS @@ -116,7 +116,7 @@ func (smg *SessionService) Start() (err error) { smg.Lock() smg.bircpEnabled = false smg.Unlock() - close(smg.exitChan) + smg.shdChan.CloseOnce() } }() } diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index de1bb9ddb..23c742007 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -44,8 +45,9 @@ func TestSessionSReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := 
new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)) @@ -67,20 +69,20 @@ func TestSessionSReload(t *testing.T) { cacheSChan <- chS server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) stordb := NewStorDBService(cfg) chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) ralS := NewRalService(cfg, chS, server, make(chan rpcclient.ClientConnector, 1), make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, anz) + shdChan, nil, anz) cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) - srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) + srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) @@ -118,6 +120,6 @@ func TestSessionSReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/sipagent.go b/services/sipagent.go index 
94e7bef88..e4d8b4e17 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -31,11 +31,11 @@ import ( // NewSIPAgent returns the sip Agent func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - exitChan chan<- struct{}, connMgr *engine.ConnManager) servmanager.Service { + shdChan *utils.SyncedChan, connMgr *engine.ConnManager) servmanager.Service { return &SIPAgent{ cfg: cfg, filterSChan: filterSChan, - exitChan: exitChan, + shdChan: shdChan, connMgr: connMgr, } } @@ -45,7 +45,7 @@ type SIPAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - exitChan chan<- struct{} + shdChan *utils.SyncedChan sip *agents.SIPAgent connMgr *engine.ConnManager @@ -74,7 +74,7 @@ func (sip *SIPAgent) Start() (err error) { go func() { if err = sip.sip.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) - close(sip.exitChan) // stop the engine here + sip.shdChan.CloseOnce() // stop the engine here } }() return @@ -94,7 +94,7 @@ func (sip *SIPAgent) Reload() (err error) { go func() { if err := sip.sip.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) - close(sip.exitChan) // stop the engine here + sip.shdChan.CloseOnce() // stop the engine here } }() return diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index 27bb661ba..80d8f100c 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -42,19 +43,20 @@ func TestSIPAgentReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS 
server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil, nil, anz) - srv := NewSIPAgent(cfg, filterSChan, engineShutdown, nil) + shdChan, nil, nil, anz) + srv := NewSIPAgent(cfg, filterSChan, shdChan, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) @@ -83,6 +85,6 @@ func TestSIPAgentReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/stats_it_test.go b/services/stats_it_test.go index 5c5cd3e9a..998514de1 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -44,8 +45,9 @@ func TestStatSReload(t *testing.T) { cfg.ThresholdSCfg().Enabled = true filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) @@ -53,8 +55,8 @@ func TestStatSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheStatQueues)) 
close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) @@ -92,6 +94,6 @@ func TestStatSReload(t *testing.T) { if sS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index 552e448fc..144fa4c94 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -21,6 +21,7 @@ package services import ( "path" + "sync" "testing" "time" @@ -42,14 +43,15 @@ func TestThresholdSReload(t *testing.T) { utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil - engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + anz := 
NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) engine.NewConnManager(cfg, nil) @@ -86,6 +88,6 @@ func TestThresholdSReload(t *testing.T) { if tS.IsRunning() { t.Errorf("Expected service to be down") } - close(engineShutdown) - srvMngr.ShutdownServices(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index bc5912641..e7264ffbd 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -24,7 +24,6 @@ import ( "reflect" "strings" "sync" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -33,22 +32,24 @@ import ( ) // NewServiceManager returns a service manager -func NewServiceManager(cfg *config.CGRConfig, engineShutdown chan<- struct{}) *ServiceManager { +func NewServiceManager(cfg *config.CGRConfig, shdChan *utils.SyncedChan, shdWg *sync.WaitGroup) *ServiceManager { sm := &ServiceManager{ - cfg: cfg, - engineShutdown: engineShutdown, - subsystems: make(map[string]Service), + cfg: cfg, + subsystems: make(map[string]Service), + shdChan: shdChan, + shdWg: shdWg, } return sm } // ServiceManager handles service management ran by the engine type ServiceManager struct { - sync.RWMutex // lock access to any shared data - cfg *config.CGRConfig - engineShutdown chan<- struct{} - stopReload chan struct{} - subsystems map[string]Service + sync.RWMutex // lock access to any shared data + cfg *config.CGRConfig + subsystems map[string]Service + + shdChan *utils.SyncedChan + shdWg *sync.WaitGroup } // Call . 
@@ -149,17 +150,15 @@ func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig { // StartServices starts all enabled services func (srvMngr *ServiceManager) StartServices() (err error) { - srvMngr.stopReload = make(chan struct{}) go srvMngr.handleReload() for _, service := range srvMngr.subsystems { if service.ShouldRun() && !service.IsRunning() { + srvMngr.shdWg.Add(1) go func(srv Service) { - if err := srv.Start(); err != nil { - if err == utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine - return - } + if err := srv.Start(); err != nil && + err != utils.ErrServiceAlreadyRunning { // in case the service was started in another goroutine utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srv.ServiceName(), err)) - close(srvMngr.engineShutdown) + srvMngr.shdChan.CloseOnce() } }(service) } @@ -182,7 +181,8 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) { func (srvMngr *ServiceManager) handleReload() { for { select { - case <-srvMngr.stopReload: + case <-srvMngr.shdChan.Done(): + srvMngr.ShutdownServices() return case <-srvMngr.GetConfig().GetReloadChan(config.ATTRIBUTE_JSN): go srvMngr.reloadService(utils.AttributeS) @@ -258,22 +258,23 @@ func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) { if srv.IsRunning() { if err = srv.Reload(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - close(srvMngr.engineShutdown) + srvMngr.shdChan.CloseOnce() return // stop if we encounter an error } } else { + srvMngr.shdWg.Add(1) if err = srv.Start(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - close(srvMngr.engineShutdown) + srvMngr.shdChan.CloseOnce() return // stop if we encounter an error } } } else if srv.IsRunning() { if err = srv.Shutdown(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s>
failed to stop service <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - close(srvMngr.engineShutdown) - return // stop if we encounter an error + srvMngr.shdChan.CloseOnce() } + srvMngr.shdWg.Done() } return } @@ -287,32 +288,17 @@ func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service) { } // ShutdownServices will stop all services -func (srvMngr *ServiceManager) ShutdownServices(timeout time.Duration) { - close(srvMngr.stopReload) - var wg sync.WaitGroup +func (srvMngr *ServiceManager) ShutdownServices() { for _, srv := range srvMngr.subsystems { // gracefully stop all running subsystems - if !srv.IsRunning() { - continue + if srv.IsRunning() { + go func(srv Service) { + if err := srv.Shutdown(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown subsystem <%s> because: %s", + utils.ServiceManager, srv.ServiceName(), err)) + } + srvMngr.shdWg.Done() + }(srv) } - wg.Add(1) - go func(srv Service) { - if err := srv.Shutdown(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown subsystem <%s> because: %s", - utils.ServiceManager, srv.ServiceName(), err)) - } - wg.Done() - }(srv) - } - c := make(chan struct{}) - go func() { - wg.Wait() - close(c) - }() - select { - case <-c: - case <-time.After(timeout): - utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown all subsystems in the given time", - utils.ServiceManager)) } } diff --git a/utils/syncedchan.go b/utils/syncedchan.go new file mode 100644 index 000000000..1207eded3 --- /dev/null +++ b/utils/syncedchan.go @@ -0,0 +1,43 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/> +*/ + +package utils + +import "sync" + +func NewSyncedChan() *SyncedChan { + return &SyncedChan{ + c: make(chan struct{}), + d: new(sync.Once), + } +} + +type SyncedChan struct { + c chan struct{} + d *sync.Once +} + +func (s *SyncedChan) CloseOnce() { + s.d.Do(func() { + close(s.c) + }) +} + +func (s *SyncedChan) Done() <-chan struct{} { + return s.c +} diff --git a/utils/syncedchan_test.go b/utils/syncedchan_test.go new file mode 100644 index 000000000..c782ff841 --- /dev/null +++ b/utils/syncedchan_test.go @@ -0,0 +1,41 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/> +*/ + +package utils + +import ( + "testing" + "time" +) + +func TestSyncedChan(t *testing.T) { + defer func() { + if v := recover(); v != nil { + t.Error("Expected to not panic") + } + }() + sc := NewSyncedChan() + sc.CloseOnce() + sc.CloseOnce() + sc.CloseOnce() + select { + case <-sc.Done(): + case <-time.After(10 * time.Millisecond): + t.Error("Timeout") + } +}