From 089dfc00ae25a7b98a0dd3002b2ca5416bfd0baf Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 9 Jan 2025 21:23:55 +0200 Subject: [PATCH] Use SyncedChan to handle shutdown --- apis/cores_test.go | 4 +-- cmd/cgr-engine/cgr-engine.go | 16 ++++++------ commonlisteners/commonlistener_it_test.go | 4 +-- commonlisteners/commonlisteners.go | 32 +++++++++++------------ commonlisteners/libcommonlisteners.go | 6 ++--- cores/core.go | 6 ++--- cores/core_test.go | 4 +-- engine/caches.go | 4 +-- engine/caches_test.go | 4 +-- services/accounts.go | 4 +-- services/actions.go | 4 +-- services/adminsv1.go | 4 +-- services/analyzers.go | 6 ++--- services/asteriskagent.go | 6 ++--- services/attributes.go | 4 +-- services/caches.go | 10 +++---- services/cdrs.go | 4 +-- services/chargers.go | 4 +-- services/commonlisteners.go | 4 +-- services/config.go | 4 +-- services/cores.go | 4 +-- services/datadb.go | 4 +-- services/diameteragent.go | 8 +++--- services/dispatchers.go | 4 +-- services/dnsagent.go | 8 +++--- services/ees.go | 4 +-- services/efs.go | 4 +-- services/ers.go | 8 +++--- services/filters.go | 4 +-- services/freeswitchagent.go | 8 +++--- services/globalvars.go | 4 +-- services/guardian.go | 4 +-- services/httpagent.go | 4 +-- services/janus.go | 4 +-- services/kamailioagent.go | 8 +++--- services/loaders.go | 4 +-- services/radiusagent.go | 8 +++--- services/rankings.go | 4 +-- services/rates.go | 4 +-- services/registrarc.go | 4 +-- services/resources.go | 4 +-- services/routes.go | 4 +-- services/sessions.go | 8 +++--- services/sipagent.go | 8 +++--- services/stats.go | 4 +-- services/stordb.go | 4 +-- services/thresholds.go | 4 +-- services/tpes.go | 4 +-- services/trends.go | 4 +-- servmanager/servmanager.go | 20 +++++++------- 50 files changed, 151 insertions(+), 151 deletions(-) diff --git a/apis/cores_test.go b/apis/cores_test.go index fb5b12d9e..39d50f761 100644 --- a/apis/cores_test.go +++ b/apis/cores_test.go @@ -60,7 +60,7 @@ func TestCoreSSleep(t *testing.T) { func TestCoreSShutdown(t *testing.T) { cfg := config.NewDefaultCGRConfig() caps := engine.NewCaps(2, utils.MetaTopUp) - shutdown := make(chan struct{}) + shutdown := utils.NewSyncedChan() coreService := cores.NewCoreService(cfg, caps, nil, make(chan struct{}), nil, shutdown) cS := NewCoreSv1(coreService) arg := &utils.CGREvent{} @@ -71,7 +71,7 @@ func TestCoreSShutdown(t *testing.T) { t.Errorf("Expected OK, received %+v", reply) } select { - case <-shutdown: + case <-shutdown.Done(): default: t.Error("engine did not shut down") } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 060c7c8e1..2431bf27c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -86,7 +86,7 @@ func runCGREngine(fs []string) (err error) { shdWg := new(sync.WaitGroup) shdWg.Add(1) - shutdown := make(chan struct{}) + shutdown := utils.NewSyncedChan() go handleSignals(shutdown, cfg, shdWg) if *flags.ScheduledShutdown != utils.EmptyString { @@ -100,8 +100,8 @@ func runCGREngine(fs []string) (err error) { tm := time.NewTimer(shtDwDur) select { case <-tm.C: - close(shutdown) - case <-shutdown: + shutdown.CloseOnce() + case <-shutdown.Done(): tm.Stop() } }() @@ -266,7 +266,7 @@ func runCGREngine(fs []string) (err error) { } } - <-shutdown + <-shutdown.Done() return } @@ -315,7 +315,7 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, cfg *config iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) } -func cgrStartRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, shutdown chan struct{}) { +func cgrStartRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, shutdown *utils.SyncedChan) { if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this if utils.StructChanTimeout( registry.Lookup(utils.DispatcherS).StateChan(utils.StateServiceUP), @@ -327,7 +327,7 @@ func cgrStartRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, s cl.StartServer(cfg, shutdown) } -func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.WaitGroup) { +func handleSignals(shutdown *utils.SyncedChan, cfg *config.CGRConfig, shdWg *sync.WaitGroup) { defer shdWg.Done() shutdownSignal := make(chan os.Signal, 1) reloadSignal := make(chan os.Signal, 1) @@ -336,10 +336,10 @@ func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.Wa signal.Notify(reloadSignal, syscall.SIGHUP) for { select { - case <-stopChan: + case <-shutdown.Done(): return case <-shutdownSignal: - close(stopChan) + shutdown.CloseOnce() case <-reloadSignal: // do it in its own goroutine in order to not block the signal handler with the reload functionality go func() { diff --git a/commonlisteners/commonlistener_it_test.go b/commonlisteners/commonlistener_it_test.go index 9c1e06510..4e8744197 100644 --- a/commonlisteners/commonlistener_it_test.go +++ b/commonlisteners/commonlistener_it_test.go @@ -123,8 +123,8 @@ func testServeJSON(t *testing.T) { buff := new(bytes.Buffer) log.SetOutput(buff) - shutdown := make(chan struct{}) - defer close(shutdown) + shutdown := utils.NewSyncedChan() + defer shutdown.CloseOnce() defer server.Stop() go server.ServeJSON(":88845", shutdown) runtime.Gosched() diff --git a/commonlisteners/commonlisteners.go b/commonlisteners/commonlisteners.go index 044274371..7d95ea8b0 100644 --- a/commonlisteners/commonlisteners.go +++ b/commonlisteners/commonlisteners.go @@ -133,10 +133,10 @@ func (c *CommonListenerS) handleWebSocket(ws *websocket.Conn) { c.rpcServer.ServeCodec(newCapsJSONCodec(ws, c.caps, c.anz)) } -func (c *CommonListenerS) ServeJSON(addr string, shutdown chan struct{}) (err error) { +func (c *CommonListenerS) ServeJSON(addr string, shutdown *utils.SyncedChan) (err error) { if c.rpcJSONl, err = net.Listen(utils.TCP, addr); err != nil { log.Printf("Serve%s listen error: %s", utils.JSONCaps, err) - close(shutdown) + shutdown.CloseOnce() return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.JSONCaps, addr)) @@ -145,10 +145,10 @@ func (c *CommonListenerS) ServeJSON(addr string, shutdown chan struct{}) (err er }) } -func (c *CommonListenerS) ServeGOB(addr string, shutdown chan struct{}) (err error) { +func (c *CommonListenerS) ServeGOB(addr string, shutdown *utils.SyncedChan) (err error) { if c.rpcGOBl, err = net.Listen(utils.TCP, addr); err != nil { log.Printf("Serve%s listen error: %s", utils.GOBCaps, err) - close(shutdown) + shutdown.CloseOnce() return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.GOBCaps, addr)) @@ -158,7 +158,7 @@ func (c *CommonListenerS) ServeGOB(addr string, shutdown chan struct{}) (err err } func (c *CommonListenerS) ServeHTTP(addr, jsonRPCURL, wsRPCURL, promURL, pprofPath string, - useBasicAuth bool, userList map[string]string, shutdown chan struct{}) { + useBasicAuth bool, userList map[string]string, shutdown *utils.SyncedChan) { c.mu.Lock() c.httpEnabled = c.httpEnabled || jsonRPCURL != "" || wsRPCURL != "" || pprofPath != "" enabled := c.httpEnabled @@ -218,7 +218,7 @@ func (c *CommonListenerS) ServeHTTP(addr, jsonRPCURL, wsRPCURL, promURL, pprofPa c.httpServer.Addr = addr if err := c.httpServer.ListenAndServe(); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) - close(shutdown) + shutdown.CloseOnce() } } @@ -249,16 +249,16 @@ func (c *CommonListenerS) ServeBiRPC(addrJSON, addrGOB string, onConn, onDis fun } func (c *CommonListenerS) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int, - serverName string, shutdown chan struct{}) (err error) { + serverName string, shutdown *utils.SyncedChan) (err error) { config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { - close(shutdown) + shutdown.CloseOnce() return } c.rpcGOBlTLS, err = tls.Listen(utils.TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) - close(shutdown) + shutdown.CloseOnce() return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.GOBCaps, addr)) @@ -269,16 +269,16 @@ func (c *CommonListenerS) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, } func (c *CommonListenerS) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int, - serverName string, shutdown chan struct{}) (err error) { + serverName string, shutdown *utils.SyncedChan) (err error) { config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { - close(shutdown) + shutdown.CloseOnce() return } c.rpcJSONlTLS, err = tls.Listen(utils.TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) - close(shutdown) + shutdown.CloseOnce() return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.JSONCaps, addr)) @@ -290,7 +290,7 @@ func (c *CommonListenerS) ServeJSONTLS(addr, serverCrt, serverKey, caCert string func (c *CommonListenerS) ServeHTTPS(addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName, jsonRPCURL, wsRPCURL, pprofPath string, useBasicAuth bool, userList map[string]string, - shutdown chan struct{}) { + shutdown *utils.SyncedChan) { c.mu.Lock() c.httpEnabled = c.httpEnabled || jsonRPCURL != "" || wsRPCURL != "" || pprofPath != "" enabled := c.httpEnabled @@ -339,7 +339,7 @@ func (c *CommonListenerS) ServeHTTPS(addr, serverCrt, serverKey, caCert string, } config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { - close(shutdown) + shutdown.CloseOnce() return } c.httpsServer.Addr = addr @@ -348,7 +348,7 @@ func (c *CommonListenerS) ServeHTTPS(addr, serverCrt, serverKey, caCert string, if err := c.httpsServer.ListenAndServeTLS(serverCrt, serverKey); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) - close(shutdown) + shutdown.CloseOnce() } } @@ -380,7 +380,7 @@ func (c *CommonListenerS) StopBiRPC() { c.birpcSrv = birpc.NewBirpcServer() } -func (c *CommonListenerS) StartServer(cfg *config.CGRConfig, shutdown chan struct{}) { +func (c *CommonListenerS) StartServer(cfg *config.CGRConfig, shutdown *utils.SyncedChan) { c.startSrv.Do(func() { go c.ServeJSON(cfg.ListenCfg().RPCJSONListen, shutdown) go c.ServeGOB(cfg.ListenCfg().RPCGOBListen, shutdown) diff --git a/commonlisteners/libcommonlisteners.go b/commonlisteners/libcommonlisteners.go index 6a222551b..4b7f0dee2 100644 --- a/commonlisteners/libcommonlisteners.go +++ b/commonlisteners/libcommonlisteners.go @@ -124,14 +124,14 @@ func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int, return } -func acceptRPC(shutdown chan struct{}, srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) { +func acceptRPC(shutdown *utils.SyncedChan, srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) { var errCnt int var lastErrorTime time.Time for { var conn net.Conn if conn, err = l.Accept(); err != nil { select { - case <-shutdown: + case <-shutdown.Done(): return default: } @@ -143,7 +143,7 @@ func acceptRPC(shutdown chan struct{}, srv *birpc.Server, l net.Listener, codecN lastErrorTime = time.Now() errCnt++ if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably - close(shutdown) + shutdown.CloseOnce() return } continue diff --git a/cores/core.go b/cores/core.go index 0ba4d0f39..4c01880c7 100644 --- a/cores/core.go +++ b/cores/core.go @@ -34,7 +34,7 @@ import ( ) func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU *os.File, stopChan chan struct{}, - shdWg *sync.WaitGroup, shutdown chan struct{}) *CoreS { + shdWg *sync.WaitGroup, shutdown *utils.SyncedChan) *CoreS { var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) @@ -53,7 +53,7 @@ type CoreS struct { cfg *config.CGRConfig CapsStats *engine.CapsStats shdWg *sync.WaitGroup - shutdown chan struct{} + shutdown *utils.SyncedChan memProfMux sync.Mutex finalMemProf string // full path of the final memory profile created on stop/shutdown @@ -66,7 +66,7 @@ type CoreS struct { } func (cS *CoreS) ShutdownEngine() { - close(cS.shutdown) + cS.shutdown.CloseOnce() } // Shutdown is called to shutdown the service diff --git a/cores/core_test.go b/cores/core_test.go index 43a5c09b3..23e9e8bf8 100644 --- a/cores/core_test.go +++ b/cores/core_test.go @@ -34,7 +34,7 @@ func TestNewCoreService(t *testing.T) { stopchan := make(chan struct{}, 1) caps := engine.NewCaps(1, utils.MetaBusy) sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan) - shutdown := make(chan struct{}) + shutdown := utils.NewSyncedChan() expected := &CoreS{ cfg: cfgDflt, CapsStats: sts, @@ -49,7 +49,7 @@ func TestNewCoreService(t *testing.T) { rcv.Shutdown() rcv.ShutdownEngine() select { - case <-shutdown: + case <-shutdown.Done(): default: t.Error("engine did not shut down") } diff --git a/engine/caches.go b/engine/caches.go index e7a33141a..2b7106078 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -265,7 +265,7 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} { } // Precache loads data from DataDB into cache at engine start -func (chS *CacheS) Precache(shutdown chan struct{}) { +func (chS *CacheS) Precache(shutdown *utils.SyncedChan) { for cacheID, cacheCfg := range chS.cfg.CacheCfg().Partitions { if !cacheCfg.Precache { close(chS.pcItems[cacheID]) // no need of precache @@ -278,7 +278,7 @@ func (chS *CacheS) Precache(shutdown chan struct{}) { false) if err != nil && err != context.Canceled { utils.Logger.Crit(fmt.Sprintf("<%s> precaching cacheID <%s>, got error: %s", utils.CacheS, cacheID, err)) - close(shutdown) + shutdown.CloseOnce() return } close(chS.pcItems[cacheID]) diff --git a/engine/caches_test.go b/engine/caches_test.go index a10043d85..873aab732 100644 --- a/engine/caches_test.go +++ b/engine/caches_test.go @@ -1373,7 +1373,7 @@ func TestCacheSPrecachePartitions(t *testing.T) { if _, err := dm.GetAttributeProfile(context.Background(), utils.CGRateSorg, "TEST_ATTRIBUTES_TEST", true, true, utils.NonTransactional); err != nil { t.Error(err) } - cacheS.Precache(make(chan struct{})) + cacheS.Precache(utils.NewSyncedChan()) time.Sleep(10 * time.Millisecond) if rcv, ok := Cache.Get(utils.CacheAttributeProfiles, "cgrates.org:TEST_ATTRIBUTES_TEST"); !ok { @@ -1410,7 +1410,7 @@ func TestCacheSPrecacheErr(t *testing.T) { cacheS := NewCacheS(cfg, nil, connMgr, nil) - cacheS.Precache(make(chan struct{})) + cacheS.Precache(utils.NewSyncedChan()) time.Sleep(10 * time.Millisecond) expErr := " precaching cacheID <*accounts>, got error: NO_DATABASE_CONNECTION" diff --git a/services/accounts.go b/services/accounts.go index 35469ba34..b04a60717 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -59,7 +59,7 @@ type AccountService struct { } // Start should handle the service start -func (acts *AccountService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -102,7 +102,7 @@ func (acts *AccountService) Start(shutdown chan struct{}, registry *servmanager. } // Reload handles the change of config -func (acts *AccountService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (acts *AccountService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { acts.rldChan <- struct{}{} return // for the moment nothing to reload } diff --git a/services/actions.go b/services/actions.go index e344d1a32..148950d35 100644 --- a/services/actions.go +++ b/services/actions.go @@ -60,7 +60,7 @@ type ActionService struct { } // Start should handle the service start -func (acts *ActionService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -103,7 +103,7 @@ func (acts *ActionService) Start(shutdown chan struct{}, registry *servmanager.S } // Reload handles the change of config -func (acts *ActionService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (acts *ActionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { acts.rldChan <- struct{}{} return // for the moment nothing to reload } diff --git a/services/adminsv1.go b/services/adminsv1.go index 1da692742..b0f7f1978 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -57,7 +57,7 @@ type AdminSv1Service struct { // Start should handle the sercive start // For this service the start should be called from RAL Service -func (apiService *AdminSv1Service) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -100,7 +100,7 @@ func (apiService *AdminSv1Service) Start(_ chan struct{}, registry *servmanager. } // Reload handles the change of config -func (apiService *AdminSv1Service) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (apiService *AdminSv1Service) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } diff --git a/services/analyzers.go b/services/analyzers.go index 7cc58a37a..50dc4790d 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -57,7 +57,7 @@ type AnalyzerService struct { } // Start should handle the sercive start -func (anz *AnalyzerService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry, anz.cfg.GeneralCfg().ConnectTimeout) if err != nil { @@ -75,7 +75,7 @@ func (anz *AnalyzerService) Start(shutdown chan struct{}, registry *servmanager. go func(a *analyzers.AnalyzerS) { if err := a.ListenAndServe(anzCtx); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error())) - close(shutdown) + shutdown.CloseOnce() } }(anz.anz) anz.cl.SetAnalyzer(anz.anz) @@ -103,7 +103,7 @@ func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry) { } // Reload handles the change of config -func (anz *AnalyzerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (anz *AnalyzerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return // for the momment nothing to reload } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 4ce71e969..54cd90579 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -55,14 +55,14 @@ type AsteriskAgent struct { } // Start should handle the sercive start -func (ast *AsteriskAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { ast.Lock() defer ast.Unlock() listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}) { if err := sma.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err)) - close(shutdown) + shutdown.CloseOnce() } } ast.stopChan = make(chan struct{}) @@ -75,7 +75,7 @@ func (ast *AsteriskAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRe } // Reload handles the change of config -func (ast *AsteriskAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (ast *AsteriskAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { ast.shutdown() return ast.Start(shutdown, registry) } diff --git a/services/attributes.go b/services/attributes.go index 77abb0118..e54e9d6f2 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -57,7 +57,7 @@ type AttributeService struct { } // Start should handle the service start -func (attrS *AttributeService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -110,7 +110,7 @@ func (attrS *AttributeService) Start(shutdown chan struct{}, registry *servmanag } // Reload handles the change of config -func (attrS *AttributeService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (attrS *AttributeService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return // for the moment nothing to reload } diff --git a/services/caches.go b/services/caches.go index b6f9ac728..24fac472e 100644 --- a/services/caches.go +++ b/services/caches.go @@ -53,7 +53,7 @@ type CacheService struct { } // Start should handle the sercive start -func (cS *CacheService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -90,7 +90,7 @@ func (cS *CacheService) Start(shutdown chan struct{}, registry *servmanager.Serv } // Reload handles the change of config -func (cS *CacheService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) { +func (cS *CacheService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) { return } @@ -115,17 +115,17 @@ func (cS *CacheService) GetCacheSChan() chan *engine.CacheS { return cS.cacheCh } -func (cS *CacheService) WaitToPrecache(shutdown chan struct{}, cacheIDs ...string) (err error) { +func (cS *CacheService) WaitToPrecache(shutdown *utils.SyncedChan, cacheIDs ...string) (err error) { var cacheS *engine.CacheS select { - case <-shutdown: + case <-shutdown.Done(): return case cacheS = <-cS.cacheCh: cS.cacheCh <- cacheS } for _, cacheID := range cacheIDs { select { - case <-shutdown: + case <-shutdown.Done(): return case <-cacheS.GetPrecacheChannel(cacheID): } diff --git a/services/cdrs.go b/services/cdrs.go index 33c776ba3..106ed2521 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -56,7 +56,7 @@ type CDRService struct { } // Start should handle the sercive start -func (cs *CDRService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -93,7 +93,7 @@ func (cs *CDRService) Start(_ chan struct{}, registry *servmanager.ServiceRegist } // Reload handles the change of config -func (cs *CDRService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (cs *CDRService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } diff --git a/services/chargers.go b/services/chargers.go index 69152f525..cc678ef1f 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -54,7 +54,7 @@ type ChargerService struct { } // Start should handle the service start -func (chrS *ChargerService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error { +func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -94,7 +94,7 @@ func (chrS *ChargerService) Start(shutdown chan struct{}, registry *servmanager. } // Reload handles the change of config -func (chrS *ChargerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (chrS *ChargerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } diff --git a/services/commonlisteners.go b/services/commonlisteners.go index e131ada6a..00cd920fa 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -53,7 +53,7 @@ type CommonListenerService struct { } // Start handles the service start. -func (cl *CommonListenerService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (cl *CommonListenerService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { cl.mu.Lock() defer cl.mu.Unlock() cl.cls = commonlisteners.NewCommonListenerS(cl.caps) @@ -67,7 +67,7 @@ func (cl *CommonListenerService) Start(_ chan struct{}, _ *servmanager.ServiceRe } // Reload handles the config changes. -func (cl *CommonListenerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (cl *CommonListenerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { return nil } diff --git a/services/config.go b/services/config.go index 54bf8fe0e..c9828179b 100644 --- a/services/config.go +++ b/services/config.go @@ -47,7 +47,7 @@ type ConfigService struct { } // Start handles the service start. -func (s *ConfigService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error { +func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -71,7 +71,7 @@ func (s *ConfigService) Start(_ chan struct{}, registry *servmanager.ServiceRegi } // Reload handles the config changes. -func (s *ConfigService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (s *ConfigService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { return nil } diff --git a/services/cores.go b/services/cores.go index 09ab664c9..71d328dcb 100644 --- a/services/cores.go +++ b/services/cores.go @@ -63,7 +63,7 @@ type CoreService struct { } // Start should handle the service start -func (cS *CoreService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error { +func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -96,7 +96,7 @@ func (cS *CoreService) Start(shutdown chan struct{}, registry *servmanager.Servi } // Reload handles the change of config -func (cS *CoreService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (cS *CoreService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { return nil } diff --git a/services/datadb.go b/services/datadb.go index 0c394f0b9..c5440c0c9 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -58,7 +58,7 @@ type DataDBService struct { } // Start handles the service start. -func (db *DataDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (db *DataDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { db.Lock() defer db.Unlock() db.oldDBCfg = db.cfg.DataDbCfg().Clone() @@ -86,7 +86,7 @@ func (db *DataDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) } // Reload handles the change of config -func (db *DataDBService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (db *DataDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { db.Lock() defer db.Unlock() if db.needsConnectionReload() { diff --git a/services/diameteragent.go b/services/diameteragent.go index 2c83bd8a2..84dd15de1 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -59,7 +59,7 @@ type DiameterAgent struct { } // Start should handle the sercive start -func (da *DiameterAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error { +func (da *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, da.cfg.GeneralCfg().ConnectTimeout) if err != nil { @@ -71,7 +71,7 @@ func (da *DiameterAgent) Start(shutdown chan struct{}, registry *servmanager.Ser return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown) } -func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown chan struct{}) error { +func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown *utils.SyncedChan) error { var err error da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr, caps) if err != nil { @@ -86,14 +86,14 @@ func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutd if err := d.ListenAndServe(da.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.DiameterAgent, err)) - close(shutdown) + shutdown.CloseOnce() } }(da.da) return nil } // Reload handles the change of config -func (da *DiameterAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (da *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { da.Lock() defer da.Unlock() if da.lnet == da.cfg.DiameterAgentCfg().ListenNet && diff --git a/services/dispatchers.go b/services/dispatchers.go index 889aba913..2223a69a4 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -57,7 +57,7 @@ type DispatcherService struct { } // Start should handle the sercive start -func (dspS *DispatcherService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -103,7 +103,7 @@ func (dspS *DispatcherService) Start(shutdown chan struct{}, registry *servmanag } // Reload handles the change of config -func (dspS *DispatcherService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (dspS *DispatcherService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return // for the momment nothing to reload } diff --git a/services/dnsagent.go b/services/dnsagent.go index a001b8b11..2b8bfba2e 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -55,7 +55,7 @@ type DNSAgent struct { } // Start should handle the service start -func (dns *DNSAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, dns.cfg.GeneralCfg().ConnectTimeout) if err != nil { @@ -76,7 +76,7 @@ func (dns *DNSAgent) Start(shutdown chan struct{}, registry *servmanager.Service } // Reload handles the change of config -func (dns *DNSAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, dns.cfg.GeneralCfg().ConnectTimeout) if err != nil { @@ -104,12 +104,12 @@ func (dns *DNSAgent) Reload(shutdown chan struct{}, registry *servmanager.Servic return } -func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shutdown chan struct{}) (err error) { +func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shutdown *utils.SyncedChan) (err error) { dns.dns.RLock() defer dns.dns.RUnlock() if err = dns.dns.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - close(shutdown) // stop the engine here + shutdown.CloseOnce() // stop the engine here } return } diff --git a/services/ees.go b/services/ees.go index 330c1e5ca..d50df80d3 100644 --- a/services/ees.go +++ b/services/ees.go @@ -65,7 +65,7 @@ func (es *EventExporterService) ShouldRun() (should bool) { } // Reload handles the change of config -func (es *EventExporterService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (es *EventExporterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { es.mu.Lock() defer es.mu.Unlock() es.eeS.ClearExporterCache() @@ -83,7 +83,7 @@ func (es *EventExporterService) Shutdown(_ *servmanager.ServiceRegistry) error { } // Start should handle the service start -func (es *EventExporterService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error { +func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, diff --git a/services/efs.go b/services/efs.go index e4c2f3363..174fc4714 100644 --- a/services/efs.go +++ b/services/efs.go @@ -56,7 +56,7 @@ func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager } // Start should handle the service start -func (efServ *ExportFailoverService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (efServ *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry, efServ.cfg.GeneralCfg().ConnectTimeout) if err != nil { @@ -75,7 +75,7 @@ func (efServ *ExportFailoverService) Start(_ chan struct{}, registry *servmanage } // Reload handles the change of config -func (efServ *ExportFailoverService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (efServ *ExportFailoverService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } diff --git a/services/ers.go b/services/ers.go index caa251247..b316a2e7e 100644 --- a/services/ers.go +++ b/services/ers.go @@ -60,7 +60,7 @@ type EventReaderService struct { } // Start should handle the sercive start -func (erS *EventReaderService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -96,16 +96,16 @@ func (erS *EventReaderService) Start(shutdown chan struct{}, registry *servmanag return } -func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldChan, shutdown chan struct{}) (err error) { +func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldChan chan struct{}, shutdown *utils.SyncedChan) (err error) { if err = ers.ListenAndServe(stopChan, rldChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%v>", utils.ERs, err)) - close(shutdown) + shutdown.CloseOnce() } return } // Reload handles the change of config -func (erS *EventReaderService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (erS *EventReaderService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { erS.RLock() erS.rldChan <- struct{}{} erS.RUnlock() diff --git a/services/filters.go b/services/filters.go index a59e8a185..67f8385a3 100644 --- a/services/filters.go +++ b/services/filters.go @@ -51,7 +51,7 @@ type FilterService struct { } // Start handles the service start. -func (s *FilterService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error { +func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CacheS, @@ -75,7 +75,7 @@ func (s *FilterService) Start(shutdown chan struct{}, registry *servmanager.Serv } // Reload handles the config changes. -func (s *FilterService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (s *FilterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { return nil } diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index b2d526820..07936ad7f 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -54,7 +54,7 @@ type FreeswitchAgent struct { } // Start should handle the sercive start -func (fS *FreeswitchAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (fS *FreeswitchAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { fS.Lock() defer fS.Unlock() @@ -65,7 +65,7 @@ func (fS *FreeswitchAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceR } // Reload handles the change of config -func (fS *FreeswitchAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (fS *FreeswitchAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { fS.Lock() defer fS.Unlock() if err = fS.fS.Shutdown(); err != nil { @@ -76,10 +76,10 @@ func (fS *FreeswitchAgent) Reload(shutdown chan struct{}, _ *servmanager.Service return } -func (fS *FreeswitchAgent) connect(shutdown chan struct{}) { +func (fS *FreeswitchAgent) connect(shutdown *utils.SyncedChan) { if err := fS.fS.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - close(shutdown) // stop the engine here + shutdown.CloseOnce() // stop the engine here } return } diff --git a/services/globalvars.go b/services/globalvars.go index 07a775326..d555cbae7 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -44,7 +44,7 @@ type GlobalVarS struct { } // Start should handle the sercive start -func (gv *GlobalVarS) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (gv *GlobalVarS) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { engine.SetHTTPPstrTransport(gv.cfg.HTTPCfg().ClientOpts) utils.DecimalContext.MaxScale = gv.cfg.GeneralCfg().DecimalMaxScale utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale @@ -54,7 +54,7 @@ func (gv *GlobalVarS) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) err } // Reload handles the change of config -func (gv *GlobalVarS) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (gv *GlobalVarS) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { engine.SetHTTPPstrTransport(gv.cfg.HTTPCfg().ClientOpts) utils.DecimalContext.MaxScale = gv.cfg.GeneralCfg().DecimalMaxScale utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale diff --git a/services/guardian.go b/services/guardian.go index 42d91ddc3..07aafdd77 100644 --- a/services/guardian.go +++ b/services/guardian.go @@ -48,7 +48,7 @@ type GuardianService struct { } // Start handles the service start. -func (s *GuardianService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error { +func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -75,7 +75,7 @@ func (s *GuardianService) Start(_ chan struct{}, registry *servmanager.ServiceRe } // Reload handles the config changes. -func (s *GuardianService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { +func (s *GuardianService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { return nil } diff --git a/services/httpagent.go b/services/httpagent.go index 636981dda..0d7dcb34f 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -58,7 +58,7 @@ type HTTPAgent struct { } // Start should handle the sercive start -func (ha *HTTPAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -85,7 +85,7 @@ func (ha *HTTPAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistr } // Reload handles the change of config -func (ha *HTTPAgent) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (ha *HTTPAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return // no reload } diff --git a/services/janus.go b/services/janus.go index af6c5848f..2ee7b18f4 100644 --- a/services/janus.go +++ b/services/janus.go @@ -59,7 +59,7 @@ type JanusAgent struct { } // Start should jandle the sercive start -func (ja *JanusAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -99,7 +99,7 @@ func (ja *JanusAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegist } // Reload jandles the change of config -func (ja *JanusAgent) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (ja *JanusAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return // no reload } diff --git a/services/kamailioagent.go b/services/kamailioagent.go index 20178960e..5476cfec3 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -54,7 +54,7 @@ type KamailioAgent struct { } // Start should handle the sercive start -func (kam *KamailioAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (kam *KamailioAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { kam.Lock() defer kam.Unlock() @@ -66,7 +66,7 @@ func (kam *KamailioAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRe } // Reload handles the change of config -func (kam *KamailioAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (kam *KamailioAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { kam.Lock() defer kam.Unlock() if err = kam.kam.Shutdown(); err != nil { @@ -77,13 +77,13 @@ func (kam *KamailioAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceR return } -func (kam *KamailioAgent) connect(k *agents.KamailioAgent, shutdown chan struct{}) (err error) { +func (kam *KamailioAgent) connect(k *agents.KamailioAgent, shutdown *utils.SyncedChan) (err error) { if err = k.Connect(); err != nil { if !strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log if !strings.Contains(err.Error(), "KamEvapi") { utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) } - close(shutdown) + shutdown.CloseOnce() } } return diff --git a/services/loaders.go b/services/loaders.go index a855f5c80..5c8cf7c36 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -57,7 +57,7 @@ type LoaderService struct { } // Start should handle the service start -func (ldrs *LoaderService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -97,7 +97,7 @@ func (ldrs *LoaderService) Start(_ chan struct{}, registry *servmanager.ServiceR } // Reload handles the change of config -func (ldrs *LoaderService) Reload(_ chan struct{}, registry *servmanager.ServiceRegistry) error { +func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.FilterS, diff --git a/services/radiusagent.go b/services/radiusagent.go index 045b9f9f2..5910e9557 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -58,7 +58,7 @@ type RadiusAgent struct { } // Start should handle the sercive start -func (rad *RadiusAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, rad.cfg.GeneralCfg().ConnectTimeout) if err != nil { @@ -81,16 +81,16 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}, registry *servmanager.Serv return } -func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent, shutdown chan struct{}) (err error) { +func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent, shutdown *utils.SyncedChan) (err error) { if err = r.ListenAndServe(rad.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) - close(shutdown) + shutdown.CloseOnce() } return } // Reload handles the change of config -func (rad *RadiusAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (rad *RadiusAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { if rad.lnet == rad.cfg.RadiusAgentCfg().ListenNet && rad.lauth == rad.cfg.RadiusAgentCfg().ListenAuth && rad.lacct == rad.cfg.RadiusAgentCfg().ListenAcct { diff --git a/services/rankings.go b/services/rankings.go index e13f6cb4d..5b9b7bec1 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -58,7 +58,7 @@ type RankingService struct { } // Start should handle the sercive start -func (ran *RankingService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { ran.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -104,7 +104,7 @@ func (ran *RankingService) Start(shutdown chan struct{}, registry *servmanager.S } // Reload handles the change of config -func (ran *RankingService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (ran *RankingService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { ran.Lock() ran.ran.Reload(context.TODO()) ran.Unlock() diff --git a/services/rates.go b/services/rates.go index 5789d4165..e6fecf3d8 100644 --- a/services/rates.go +++ b/services/rates.go @@ -65,7 +65,7 @@ func (rs *RateService) ShouldRun() (should bool) { } // Reload handles the change of config -func (rs *RateService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) { +func (rs *RateService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) { rs.rldChan <- struct{}{} return } @@ -81,7 +81,7 @@ func (rs *RateService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { } // Start should handle the service start -func (rs *RateService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, diff --git a/services/registrarc.go b/services/registrarc.go index ce81e27f5..000c2deed 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -54,7 +54,7 @@ type RegistrarCService struct { } // Start should handle the sercive start -func (dspS *RegistrarCService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { dspS.Lock() defer dspS.Unlock() @@ -66,7 +66,7 @@ func (dspS *RegistrarCService) Start(_ chan struct{}, _ *servmanager.ServiceRegi } // Reload handles the change of config -func (dspS *RegistrarCService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (dspS *RegistrarCService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { dspS.rldChan <- struct{}{} return // for the momment nothing to reload } diff --git a/services/resources.go b/services/resources.go index 87769197a..f5d4b4966 100644 --- a/services/resources.go +++ b/services/resources.go @@ -58,7 +58,7 @@ type ResourceService struct { } // Start should handle the service start -func (reS *ResourceService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { reS.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -102,7 +102,7 @@ func (reS *ResourceService) Start(shutdown chan struct{}, registry *servmanager. } // Reload handles the change of config -func (reS *ResourceService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (reS *ResourceService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { reS.Lock() reS.reS.Reload(context.TODO()) reS.Unlock() diff --git a/services/routes.go b/services/routes.go index eb59e4d2c..f8495055e 100644 --- a/services/routes.go +++ b/services/routes.go @@ -54,7 +54,7 @@ type RouteService struct { } // Start should handle the sercive start -func (routeS *RouteService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -93,7 +93,7 @@ func (routeS *RouteService) Start(shutdown chan struct{}, registry *servmanager. } // Reload handles the change of config -func (routeS *RouteService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (routeS *RouteService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } diff --git a/services/sessions.go b/services/sessions.go index 5954f624e..5219a74b4 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -59,7 +59,7 @@ type SessionService struct { } // Start should handle the service start -func (smg *SessionService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -106,20 +106,20 @@ func (smg *SessionService) Start(shutdown chan struct{}, registry *servmanager.S return } -func (smg *SessionService) start(shutdown chan struct{}) (err error) { +func (smg *SessionService) start(shutdown *utils.SyncedChan) (err error) { if err := smg.cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson, smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err)) smg.Lock() smg.bircpEnabled = false smg.Unlock() - close(shutdown) + shutdown.CloseOnce() } return } // Reload handles the change of config -func (smg *SessionService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (smg *SessionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } diff --git a/services/sipagent.go b/services/sipagent.go index a1d2ffd78..3d554ecf1 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -55,7 +55,7 @@ type SIPAgent struct { } // Start should handle the sercive start -func (sip *SIPAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (sip *SIPAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, sip.cfg.GeneralCfg().ConnectTimeout) if err != nil { @@ -75,15 +75,15 @@ func (sip *SIPAgent) Start(shutdown chan struct{}, registry *servmanager.Service return } -func (sip *SIPAgent) listenAndServe(shutdown chan struct{}) { +func (sip *SIPAgent) listenAndServe(shutdown *utils.SyncedChan) { if err := sip.sip.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) - close(shutdown) // stop the engine here + shutdown.CloseOnce() // stop the engine here } } // Reload handles the change of config -func (sip *SIPAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (sip *SIPAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { if sip.oldListen == sip.cfg.SIPAgentCfg().Listen { return } diff --git a/services/stats.go b/services/stats.go index 880454801..6a9b2a849 100644 --- a/services/stats.go +++ b/services/stats.go @@ -56,7 +56,7 @@ type StatService struct { } // Start should handle the sercive start -func (sts *StatService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { sts.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -99,7 +99,7 @@ func (sts *StatService) Start(shutdown chan struct{}, registry *servmanager.Serv } // Reload handles the change of config -func (sts *StatService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (sts *StatService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { sts.Lock() sts.sts.Reload(context.TODO()) sts.Unlock() diff --git a/services/stordb.go b/services/stordb.go index b9aa6deaf..060244354 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -52,7 +52,7 @@ type StorDBService struct { } // Start should handle the service start -func (db *StorDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (db *StorDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { db.Lock() defer db.Unlock() db.oldDBCfg = db.cfg.StorDbCfg().Clone() @@ -79,7 +79,7 @@ func (db *StorDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) } // Reload handles the change of config -func (db *StorDBService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (db *StorDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { db.Lock() defer db.Unlock() if db.needsConnectionReload() { diff --git a/services/thresholds.go b/services/thresholds.go index cf61ccdae..fe8ad10db 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -58,7 +58,7 @@ type ThresholdService struct { } // Start should handle the sercive start -func (thrs *ThresholdService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { thrs.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -101,7 +101,7 @@ func (thrs *ThresholdService) Start(shutdown chan struct{}, registry *servmanage } // Reload handles the change of config -func (thrs *ThresholdService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) { +func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) { thrs.Lock() thrs.thrs.Reload(context.TODO()) thrs.Unlock() diff --git a/services/tpes.go b/services/tpes.go index e67a12101..83fef43a3 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -56,7 +56,7 @@ type TPeService struct { } // Start should handle the service start -func (ts *TPeService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ @@ -78,7 +78,7 @@ func (ts *TPeService) Start(_ chan struct{}, registry *servmanager.ServiceRegist } // Reload handles the change of config -func (ts *TPeService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (ts *TPeService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } diff --git a/services/trends.go b/services/trends.go index da634971e..d5c71f414 100644 --- a/services/trends.go +++ b/services/trends.go @@ -57,7 +57,7 @@ type TrendService struct { } // Start should handle the sercive start -func (trs *TrendService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { +func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { trs.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -103,7 +103,7 @@ func (trs *TrendService) Start(shutdown chan struct{}, registry *servmanager.Ser } // Reload handles the change of config -func (trs *TrendService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { +func (trs *TrendService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { trs.Lock() trs.trs.Reload(context.TODO()) trs.Unlock() diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 8d716f5b9..fa9e6106f 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -55,7 +55,7 @@ type ServiceManager struct { } // StartServices starts all enabled services -func (m *ServiceManager) StartServices(shutdown chan struct{}) { +func (m *ServiceManager) StartServices(shutdown *utils.SyncedChan) { go m.handleReload(shutdown) for _, svc := range m.registry.List() { // TODO: verify if IsServiceInState check is needed. It should @@ -67,7 +67,7 @@ func (m *ServiceManager) StartServices(shutdown chan struct{}) { if err := svc.Start(shutdown, m.registry); err != nil && err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) - close(shutdown) + shutdown.CloseOnce() } close(svc.StateChan(utils.StateServiceUP)) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) @@ -104,11 +104,11 @@ func (m *ServiceManager) AddServices(services ...Service) { m.Unlock() } -func (m *ServiceManager) handleReload(shutdown chan struct{}) { +func (m *ServiceManager) handleReload(shutdown *utils.SyncedChan) { var serviceID string for { select { - case <-shutdown: + case <-shutdown.Done(): m.ShutdownServices() return case serviceID = <-m.rldChan: @@ -123,7 +123,7 @@ func (m *ServiceManager) handleReload(shutdown chan struct{}) { } } -func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err error) { +func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (err error) { svc := m.registry.Lookup(id) isUp := IsServiceInState(svc, utils.StateServiceUP) if svc.ShouldRun() { @@ -131,7 +131,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e // TODO: state channels must be reinitiated for both SERVICE_UP and SERVICE_DOWN. if err = svc.Reload(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) - close(shutdown) + shutdown.CloseOnce() return // stop if we encounter an error } utils.Logger.Info(fmt.Sprintf("<%s> reloaded <%s> service", utils.ServiceManager, svc.ServiceName())) @@ -139,7 +139,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e m.shdWg.Add(1) if err = svc.Start(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> serivce: %v", utils.ServiceManager, svc.ServiceName(), err)) - close(shutdown) + shutdown.CloseOnce() return // stop if we encounter an error } close(svc.StateChan(utils.StateServiceUP)) @@ -148,7 +148,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e } else if isUp { if err = svc.Shutdown(m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) - close(shutdown) + shutdown.CloseOnce() } close(svc.StateChan(utils.StateServiceDOWN)) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) @@ -178,9 +178,9 @@ func (m *ServiceManager) ShutdownServices() { // Service interface that describes what functions should a service implement type Service interface { // Start should handle the service start - Start(chan struct{}, *ServiceRegistry) error + Start(*utils.SyncedChan, *ServiceRegistry) error // Reload handles the change of config - Reload(chan struct{}, *ServiceRegistry) error + Reload(*utils.SyncedChan, *ServiceRegistry) error // Shutdown stops the service Shutdown(*ServiceRegistry) error // ShouldRun returns if the service should be running