From c8a3ebe5e8656a41cc9e2ed0f5d223b5fcc29e28 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 12 Dec 2024 20:27:23 +0200 Subject: [PATCH] Use channel instead of context to handle shutdown --- apis/cores_test.go | 10 ++-- cmd/cgr-engine/cgr-engine.go | 71 ++++++++++------------- commonlisteners/commonlistener_it_test.go | 13 ++--- commonlisteners/commonlisteners.go | 64 ++++++++++---------- commonlisteners/libcommonlisteners.go | 8 +-- cores/core.go | 8 +-- cores/core_test.go | 10 +++- engine/caches.go | 6 +- engine/caches_test.go | 8 +-- services/accounts.go | 10 ++-- services/actions.go | 10 ++-- services/adminsv1.go | 7 +-- services/analyzers.go | 8 +-- services/asteriskagent.go | 11 ++-- services/attributes.go | 9 ++- services/caches.go | 17 +++--- services/cdrs.go | 7 +-- services/chargers.go | 10 ++-- services/commonlisteners.go | 5 +- services/config.go | 5 +- services/cores.go | 7 +-- services/datadb.go | 5 +- services/diameteragent.go | 15 +++-- services/dispatchers.go | 7 +-- services/dnsagent.go | 15 +++-- services/ees.go | 7 +-- services/efs.go | 6 +- services/ers.go | 13 ++--- services/filters.go | 7 +-- services/freeswitchagent.go | 15 +++-- services/globalvars.go | 5 +- services/guardian.go | 5 +- services/httpagent.go | 7 +-- services/janus.go | 7 +-- services/kamailioagent.go | 18 +++--- services/loaders.go | 6 +- services/radiusagent.go | 15 +++-- services/rankings.go | 12 ++-- services/rates.go | 9 ++- services/registrarc.go | 7 +-- services/resources.go | 12 ++-- services/routes.go | 10 ++-- services/sessions.go | 14 ++--- services/sipagent.go | 15 +++-- services/stats.go | 12 ++-- services/stordb.go | 5 +- services/thresholds.go | 12 ++-- services/tpes.go | 7 +-- services/trends.go | 12 ++-- servmanager/servmanager.go | 30 +++++----- 50 files changed, 281 insertions(+), 333 deletions(-) diff --git a/apis/cores_test.go b/apis/cores_test.go index 44d745b92..fb5b12d9e 100644 --- a/apis/cores_test.go +++ b/apis/cores_test.go @@ -60,8 +60,8 @@ func TestCoreSSleep(t *testing.T) { func TestCoreSShutdown(t *testing.T) { cfg := config.NewDefaultCGRConfig() caps := engine.NewCaps(2, utils.MetaTopUp) - var closed bool - coreService := cores.NewCoreService(cfg, caps, nil, make(chan struct{}), nil, func() { closed = true }) + shutdown := make(chan struct{}) + coreService := cores.NewCoreService(cfg, caps, nil, make(chan struct{}), nil, shutdown) cS := NewCoreSv1(coreService) arg := &utils.CGREvent{} var reply string @@ -70,8 +70,10 @@ func TestCoreSShutdown(t *testing.T) { } else if reply != "OK" { t.Errorf("Expected OK, received %+v", reply) } - if !closed { - t.Error("Did not stop the engine") + select { + case <-shutdown: + default: + t.Error("engine did not shut down") } } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 95ad91592..bd75a25a9 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -72,10 +72,8 @@ func runCGREngine(fs []string) (err error) { runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases } - // Init config - ctx, cancel := context.WithCancel(context.Background()) var cfg *config.CGRConfig - if cfg, err = services.InitConfigFromPath(ctx, *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig { + if cfg, err = services.InitConfigFromPath(context.TODO(), *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig { return } @@ -89,7 +87,8 @@ func runCGREngine(fs []string) (err 
error) { shdWg := new(sync.WaitGroup) shdWg.Add(1) - go handleSignals(ctx, cancel, cfg, shdWg) + shutdown := make(chan struct{}) + go handleSignals(shutdown, cfg, shdWg) if *flags.ScheduledShutdown != utils.EmptyString { var shtDwDur time.Duration @@ -101,8 +100,8 @@ func runCGREngine(fs []string) (err error) { tm := time.NewTimer(shtDwDur) select { case <-tm.C: - cancel() - case <-ctx.Done(): + close(shutdown) + case <-shutdown: tm.Stop() } shdWg.Done() @@ -111,7 +110,7 @@ func runCGREngine(fs []string) (err error) { connMgr := engine.NewConnManager(cfg) // init syslog - if utils.Logger, err = engine.NewLogger(ctx, + if utils.Logger, err = engine.NewLogger(context.TODO(), utils.FirstNonEmpty(*flags.Logger, cfg.LoggerCfg().Type), cfg.GeneralCfg().DefaultTenant, cfg.GeneralCfg().NodeID, @@ -245,14 +244,14 @@ func runCGREngine(fs []string) (err error) { }() shdWg.Add(1) - if err = gvS.Start(ctx, cancel); err != nil { + if err = gvS.Start(shutdown); err != nil { shdWg.Done() srvManager.ShutdownServices() return } if cls.ShouldRun() { shdWg.Add(1) - if err = cls.Start(ctx, cancel); err != nil { + if err = cls.Start(shutdown); err != nil { shdWg.Done() srvManager.ShutdownServices() return @@ -260,7 +259,7 @@ func runCGREngine(fs []string) (err error) { } if efs.ShouldRun() { // efs checking first because of loggers shdWg.Add(1) - if err = efs.Start(ctx, cancel); err != nil { + if err = efs.Start(shutdown); err != nil { shdWg.Done() srvManager.ShutdownServices() return @@ -268,7 +267,7 @@ func runCGREngine(fs []string) (err error) { } if dmS.ShouldRun() { // Some services can run without db, ie: ERs shdWg.Add(1) - if err = dmS.Start(ctx, cancel); err != nil { + if err = dmS.Start(shutdown); err != nil { shdWg.Done() srvManager.ShutdownServices() return @@ -276,7 +275,7 @@ func runCGREngine(fs []string) (err error) { } if sdbS.ShouldRun() { shdWg.Add(1) - if err = sdbS.Start(ctx, cancel); err != nil { + if err = sdbS.Start(shutdown); err != nil { shdWg.Done() srvManager.ShutdownServices() return @@ -285,7 +284,7 @@ func runCGREngine(fs []string) (err error) { if anzS.ShouldRun() { shdWg.Add(1) - if err = anzS.Start(ctx, cancel); err != nil { + if err = anzS.Start(shutdown); err != nil { shdWg.Done() srvManager.ShutdownServices() return @@ -295,28 +294,28 @@ func runCGREngine(fs []string) (err error) { } shdWg.Add(1) - if err = coreS.Start(ctx, cancel); err != nil { + if err = coreS.Start(shutdown); err != nil { shdWg.Done() srvManager.ShutdownServices() return } shdWg.Add(1) - if err = cacheS.Start(ctx, cancel); err != nil { + if err = cacheS.Start(shutdown); err != nil { shdWg.Done() srvManager.ShutdownServices() return } - srvManager.StartServices(ctx, cancel) + srvManager.StartServices(shutdown) cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS) if *flags.Preload != utils.EmptyString { - if err = cgrRunPreload(ctx, cfg, *flags.Preload, srvIdxr); err != nil { + if err = cgrRunPreload(cfg, *flags.Preload, srvIdxr); err != nil { return } } // Serve rpc connections - cgrStartRPC(ctx, cancel, cfg, srvIdxr) + cgrStartRPC(cfg, srvIdxr, shutdown) // TODO: find a better location for this if block if *flags.MemPrfDir != "" { @@ -330,26 +329,24 @@ func runCGREngine(fs []string) (err error) { } } - <-ctx.Done() + <-shutdown return } -func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string, +// TODO: merge with LoaderService +func cgrRunPreload(cfg *config.CGRConfig, loaderIDs string, sIdxr *servmanager.ServiceIndexer) (err error) { if 
!cfg.LoaderCfg().Enabled() { err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS) return } loader := sIdxr.GetService(utils.LoaderS).(*services.LoaderService) - select { - case <-loader.StateChan(utils.StateServiceUP): - case <-ctx.Done(): - return + if utils.StructChanTimeout(loader.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.PreloadCgr, utils.LoaderS, utils.StateServiceUP) } - var reply string for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) { - if err = loader.GetLoaderS().V1Run(ctx, &loaders.ArgsProcessFolder{ + if err = loader.GetLoaderS().V1Run(context.TODO(), &loaders.ArgsProcessFolder{ APIOpts: map[string]any{ utils.MetaForceLock: true, // force lock will unlock the file in case is locked and return error utils.MetaStopOnError: true, @@ -373,21 +370,19 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) } -func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, - cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer) { +func cgrStartRPC(cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer, shutdown chan struct{}) { if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this - select { - case <-sIdxr.GetService(utils.DispatcherS).StateChan(utils.StateServiceUP): - case <-ctx.Done(): + if utils.StructChanTimeout( + sIdxr.GetService(utils.DispatcherS).StateChan(utils.StateServiceUP), + cfg.GeneralCfg().ConnectTimeout) { return } } cl := sIdxr.GetService(utils.CommonListenerS).(*services.CommonListenerService).CLS() - cl.StartServer(ctx, shtdwnEngine, cfg) + cl.StartServer(cfg, shutdown) } -func handleSignals(ctx *context.Context, shutdown context.CancelFunc, - cfg *config.CGRConfig, shdWg *sync.WaitGroup) { +func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.WaitGroup) { shutdownSignal := make(chan os.Signal, 1) reloadSignal := make(chan os.Signal, 1) signal.Notify(shutdownSignal, os.Interrupt, @@ -395,18 +390,16 @@ func handleSignals(ctx *context.Context, shutdown context.CancelFunc, signal.Notify(reloadSignal, syscall.SIGHUP) for { select { - case <-ctx.Done(): + case <-stopChan: shdWg.Done() return case <-shutdownSignal: - shutdown() - shdWg.Done() - return + close(stopChan) case <-reloadSignal: // do it in its own goroutine in order to not block the signal handler with the reload functionality go func() { var reply string - if err := cfg.V1ReloadConfig(ctx, + if err := cfg.V1ReloadConfig(context.TODO(), new(config.ReloadArgs), &reply); err != nil { utils.Logger.Warning( fmt.Sprintf("Error reloading configuration: <%s>", err)) diff --git a/commonlisteners/commonlistener_it_test.go b/commonlisteners/commonlistener_it_test.go index 6b1bb6baf..9c1e06510 100644 --- a/commonlisteners/commonlistener_it_test.go +++ b/commonlisteners/commonlistener_it_test.go @@ -123,10 +123,10 @@ func testServeJSON(t *testing.T) { buff := new(bytes.Buffer) log.SetOutput(buff) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + shutdown := make(chan struct{}) + defer close(shutdown) defer server.Stop() - go server.ServeJSON(ctx, cancel, ":88845") + go server.ServeJSON(":88845", shutdown) runtime.Gosched() expected := "listen tcp: address 88845: invalid port" @@ -143,17 +143,14 @@ func testServeHHTPFail(t *testing.T) { server.RpcRegister(new(mockRegister)) var closed bool ch := 
make(chan struct{}) - go server.ServeHTTP(func() { - closed = true - close(ch) - }, - "invalid_port_format", + go server.ServeHTTP("invalid_port_format", cfgDflt.HTTPCfg().JsonRPCURL, cfgDflt.HTTPCfg().WSURL, cfgDflt.HTTPCfg().PrometheusURL, cfgDflt.HTTPCfg().PprofPath, cfgDflt.HTTPCfg().UseBasicAuth, cfgDflt.HTTPCfg().AuthUsers, + nil, ) runtime.Gosched() diff --git a/commonlisteners/commonlisteners.go b/commonlisteners/commonlisteners.go index 87d7fce32..044274371 100644 --- a/commonlisteners/commonlisteners.go +++ b/commonlisteners/commonlisteners.go @@ -133,33 +133,32 @@ func (c *CommonListenerS) handleWebSocket(ws *websocket.Conn) { c.rpcServer.ServeCodec(newCapsJSONCodec(ws, c.caps, c.anz)) } -func (c *CommonListenerS) ServeJSON(ctx *context.Context, shtdwnEngine context.CancelFunc, addr string) (err error) { +func (c *CommonListenerS) ServeJSON(addr string, shutdown chan struct{}) (err error) { if c.rpcJSONl, err = net.Listen(utils.TCP, addr); err != nil { log.Printf("Serve%s listen error: %s", utils.JSONCaps, err) - shtdwnEngine() + close(shutdown) return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.JSONCaps, addr)) - return acceptRPC(ctx, shtdwnEngine, c.rpcServer, c.rpcJSONl, utils.JSONCaps, func(conn conn) birpc.ServerCodec { + return acceptRPC(shutdown, c.rpcServer, c.rpcJSONl, utils.JSONCaps, func(conn conn) birpc.ServerCodec { return newCapsJSONCodec(conn, c.caps, c.anz) }) } -func (c *CommonListenerS) ServeGOB(ctx *context.Context, shtdwnEngine context.CancelFunc, addr string) (err error) { +func (c *CommonListenerS) ServeGOB(addr string, shutdown chan struct{}) (err error) { if c.rpcGOBl, err = net.Listen(utils.TCP, addr); err != nil { log.Printf("Serve%s listen error: %s", utils.GOBCaps, err) - shtdwnEngine() + close(shutdown) return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.GOBCaps, addr)) - return acceptRPC(ctx, shtdwnEngine, c.rpcServer, c.rpcGOBl, utils.GOBCaps, func(conn conn) birpc.ServerCodec { + return acceptRPC(shutdown, c.rpcServer, c.rpcGOBl, utils.GOBCaps, func(conn conn) birpc.ServerCodec { return newCapsGOBCodec(conn, c.caps, c.anz) }) } -func (c *CommonListenerS) ServeHTTP(shtdwnEngine context.CancelFunc, - addr, jsonRPCURL, wsRPCURL, promURL, pprofPath string, - useBasicAuth bool, userList map[string]string) { +func (c *CommonListenerS) ServeHTTP(addr, jsonRPCURL, wsRPCURL, promURL, pprofPath string, + useBasicAuth bool, userList map[string]string, shutdown chan struct{}) { c.mu.Lock() c.httpEnabled = c.httpEnabled || jsonRPCURL != "" || wsRPCURL != "" || pprofPath != "" enabled := c.httpEnabled @@ -219,7 +218,7 @@ func (c *CommonListenerS) ServeHTTP(shtdwnEngine context.CancelFunc, c.httpServer.Addr = addr if err := c.httpServer.ListenAndServe(); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) - shtdwnEngine() + close(shutdown) } } @@ -249,50 +248,49 @@ func (c *CommonListenerS) ServeBiRPC(addrJSON, addrGOB string, onConn, onDis fun return } -func (c *CommonListenerS) ServeGOBTLS(ctx *context.Context, shtdwnEngine context.CancelFunc, - addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string) (err error) { +func (c *CommonListenerS) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int, + serverName string, shutdown chan struct{}) (err error) { config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { - shtdwnEngine() + close(shutdown) return } c.rpcGOBlTLS, err = tls.Listen(utils.TCP, 
addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) - shtdwnEngine() + close(shutdown) return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.GOBCaps, addr)) - return acceptRPC(ctx, shtdwnEngine, c.rpcServer, c.rpcGOBlTLS, utils.GOBCaps, func(conn conn) birpc.ServerCodec { + return acceptRPC(shutdown, c.rpcServer, c.rpcGOBlTLS, utils.GOBCaps, func(conn conn) birpc.ServerCodec { return newCapsGOBCodec(conn, c.caps, c.anz) }) } -func (c *CommonListenerS) ServeJSONTLS(ctx *context.Context, shtdwnEngine context.CancelFunc, - addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string) (err error) { +func (c *CommonListenerS) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int, + serverName string, shutdown chan struct{}) (err error) { config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { - shtdwnEngine() + close(shutdown) return } c.rpcJSONlTLS, err = tls.Listen(utils.TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) - shtdwnEngine() + close(shutdown) return } utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.JSONCaps, addr)) - return acceptRPC(ctx, shtdwnEngine, c.rpcServer, c.rpcJSONlTLS, utils.JSONCaps, func(conn conn) birpc.ServerCodec { + return acceptRPC(shutdown, c.rpcServer, c.rpcJSONlTLS, utils.JSONCaps, func(conn conn) birpc.ServerCodec { return newCapsGOBCodec(conn, c.caps, c.anz) }) } -func (c *CommonListenerS) ServeHTTPS(shtdwnEngine context.CancelFunc, - addr, serverCrt, serverKey, caCert string, serverPolicy int, - serverName, jsonRPCURL, wsRPCURL, pprofPath string, - useBasicAuth bool, userList map[string]string) { +func (c *CommonListenerS) ServeHTTPS(addr, serverCrt, serverKey, caCert string, serverPolicy int, + serverName, jsonRPCURL, wsRPCURL, pprofPath string, useBasicAuth bool, userList map[string]string, + shutdown chan struct{}) { c.mu.Lock() c.httpEnabled = c.httpEnabled || jsonRPCURL != "" || wsRPCURL != "" || pprofPath != "" enabled := c.httpEnabled @@ -341,7 +339,7 @@ func (c *CommonListenerS) ServeHTTPS(shtdwnEngine context.CancelFunc, } config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { - shtdwnEngine() + close(shutdown) return } c.httpsServer.Addr = addr @@ -350,7 +348,7 @@ func (c *CommonListenerS) ServeHTTPS(shtdwnEngine context.CancelFunc, if err := c.httpsServer.ListenAndServeTLS(serverCrt, serverKey); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) - shtdwnEngine() + close(shutdown) } } @@ -382,12 +380,11 @@ func (c *CommonListenerS) StopBiRPC() { c.birpcSrv = birpc.NewBirpcServer() } -func (c *CommonListenerS) StartServer(ctx *context.Context, shtdwnEngine context.CancelFunc, cfg *config.CGRConfig) { +func (c *CommonListenerS) StartServer(cfg *config.CGRConfig, shutdown chan struct{}) { c.startSrv.Do(func() { - go c.ServeJSON(ctx, shtdwnEngine, cfg.ListenCfg().RPCJSONListen) - go c.ServeGOB(ctx, shtdwnEngine, cfg.ListenCfg().RPCGOBListen) + go c.ServeJSON(cfg.ListenCfg().RPCJSONListen, shutdown) + go c.ServeGOB(cfg.ListenCfg().RPCGOBListen, shutdown) go c.ServeHTTP( - shtdwnEngine, cfg.ListenCfg().HTTPListen, cfg.HTTPCfg().JsonRPCURL, cfg.HTTPCfg().WSURL, @@ -395,6 +392,7 @@ func (c *CommonListenerS) StartServer(ctx *context.Context, shtdwnEngine context cfg.HTTPCfg().PprofPath, cfg.HTTPCfg().UseBasicAuth, cfg.HTTPCfg().AuthUsers, + shutdown, ) if 
(len(cfg.ListenCfg().RPCGOBTLSListen) != 0 || len(cfg.ListenCfg().RPCJSONTLSListen) != 0 || @@ -406,29 +404,28 @@ func (c *CommonListenerS) StartServer(ctx *context.Context, shtdwnEngine context } if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString { go c.ServeGOBTLS( - ctx, shtdwnEngine, cfg.ListenCfg().RPCGOBTLSListen, cfg.TLSCfg().ServerCerificate, cfg.TLSCfg().ServerKey, cfg.TLSCfg().CaCertificate, cfg.TLSCfg().ServerPolicy, cfg.TLSCfg().ServerName, + shutdown, ) } if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString { go c.ServeJSONTLS( - ctx, shtdwnEngine, cfg.ListenCfg().RPCJSONTLSListen, cfg.TLSCfg().ServerCerificate, cfg.TLSCfg().ServerKey, cfg.TLSCfg().CaCertificate, cfg.TLSCfg().ServerPolicy, cfg.TLSCfg().ServerName, + shutdown, ) } if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString { go c.ServeHTTPS( - shtdwnEngine, cfg.ListenCfg().HTTPTLSListen, cfg.TLSCfg().ServerCerificate, cfg.TLSCfg().ServerKey, @@ -440,6 +437,7 @@ func (c *CommonListenerS) StartServer(ctx *context.Context, shtdwnEngine context cfg.HTTPCfg().PprofPath, cfg.HTTPCfg().UseBasicAuth, cfg.HTTPCfg().AuthUsers, + shutdown, ) } }) diff --git a/commonlisteners/libcommonlisteners.go b/commonlisteners/libcommonlisteners.go index f1d9f6e0d..6a222551b 100644 --- a/commonlisteners/libcommonlisteners.go +++ b/commonlisteners/libcommonlisteners.go @@ -32,7 +32,6 @@ import ( "time" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -125,15 +124,14 @@ func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int, return } -func acceptRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, - srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) { +func acceptRPC(shutdown chan struct{}, srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) { var errCnt int var lastErrorTime time.Time for { var conn net.Conn if conn, err = l.Accept(); err != nil { select { - case <-ctx.Done(): + case <-shutdown: return default: } @@ -145,7 +143,7 @@ func acceptRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, lastErrorTime = time.Now() errCnt++ if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably - shtdwnEngine() + close(shutdown) return } continue diff --git a/cores/core.go b/cores/core.go index 62018213f..b7e9ea6c2 100644 --- a/cores/core.go +++ b/cores/core.go @@ -34,14 +34,14 @@ import ( ) func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU *os.File, stopChan chan struct{}, - shdWg *sync.WaitGroup, shtDw context.CancelFunc) *CoreS { + shdWg *sync.WaitGroup, shutdown chan struct{}) *CoreS { var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) } return &CoreS{ shdWg: shdWg, - shtDw: shtDw, + shutdown: shutdown, cfg: cfg, CapsStats: st, fileCPU: fileCPU, @@ -53,7 +53,7 @@ type CoreS struct { cfg *config.CGRConfig CapsStats *engine.CapsStats shdWg *sync.WaitGroup - shtDw context.CancelFunc + shutdown chan struct{} memProfMux sync.Mutex finalMemProf string // full path of the final memory profile created on stop/shutdown @@ -66,7 +66,7 @@ type CoreS struct { } func (cS *CoreS) ShutdownEngine() { - cS.shtDw() + close(cS.shutdown) } // Shutdown is called to shutdown the service diff --git a/cores/core_test.go 
b/cores/core_test.go index 209673336..43a5c09b3 100644 --- a/cores/core_test.go +++ b/cores/core_test.go @@ -34,17 +34,23 @@ func TestNewCoreService(t *testing.T) { stopchan := make(chan struct{}, 1) caps := engine.NewCaps(1, utils.MetaBusy) sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan) + shutdown := make(chan struct{}) expected := &CoreS{ cfg: cfgDflt, CapsStats: sts, caps: caps, + shutdown: shutdown, } - rcv := NewCoreService(cfgDflt, caps, nil, stopchan, nil, nil) + rcv := NewCoreService(cfgDflt, caps, nil, stopchan, nil, shutdown) if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expected %+v, received %+v", expected, rcv) } //shut down the service - rcv.shtDw = func() {} rcv.Shutdown() rcv.ShutdownEngine() + select { + case <-shutdown: + default: + t.Error("engine did not shut down") + } } diff --git a/engine/caches.go b/engine/caches.go index 6da64c8f1..e7a33141a 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -265,20 +265,20 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} { } // Precache loads data from DataDB into cache at engine start -func (chS *CacheS) Precache(ctx *context.Context, shutdown context.CancelFunc) { +func (chS *CacheS) Precache(shutdown chan struct{}) { for cacheID, cacheCfg := range chS.cfg.CacheCfg().Partitions { if !cacheCfg.Precache { close(chS.pcItems[cacheID]) // no need of precache continue } go func(cacheID string) { - err := chS.dm.CacheDataFromDB(ctx, + err := chS.dm.CacheDataFromDB(context.TODO(), utils.CacheInstanceToPrefix[cacheID], []string{utils.MetaAny}, false) if err != nil && err != context.Canceled { utils.Logger.Crit(fmt.Sprintf("<%s> precaching cacheID <%s>, got error: %s", utils.CacheS, cacheID, err)) - shutdown() + close(shutdown) return } close(chS.pcItems[cacheID]) diff --git a/engine/caches_test.go b/engine/caches_test.go index 364d7a02c..a10043d85 100644 --- a/engine/caches_test.go +++ b/engine/caches_test.go @@ -1373,7 +1373,7 @@ func TestCacheSPrecachePartitions(t *testing.T) { if _, err := dm.GetAttributeProfile(context.Background(), utils.CGRateSorg, "TEST_ATTRIBUTES_TEST", true, true, utils.NonTransactional); err != nil { t.Error(err) } - cacheS.Precache(context.Background(), func() {}) + cacheS.Precache(make(chan struct{})) time.Sleep(10 * time.Millisecond) if rcv, ok := Cache.Get(utils.CacheAttributeProfiles, "cgrates.org:TEST_ATTRIBUTES_TEST"); !ok { @@ -1400,10 +1400,6 @@ func TestCacheSPrecacheErr(t *testing.T) { args := &utils.ArgCacheReplicateSet{ CacheID: utils.CacheAccounts, - ItemID: "itemID", - Value: &utils.CachedRPCResponse{ - Result: "reply", - Error: nil}, } cfg := config.NewDefaultCGRConfig() cfg.CacheCfg().Partitions = map[string]*config.CacheParamCfg{ @@ -1414,7 +1410,7 @@ func TestCacheSPrecacheErr(t *testing.T) { cacheS := NewCacheS(cfg, nil, connMgr, nil) - cacheS.Precache(context.Background(), func() {}) + cacheS.Precache(make(chan struct{})) time.Sleep(10 * time.Millisecond) expErr := " precaching cacheID <*accounts>, got error: NO_DATABASE_CONNECTION" diff --git a/services/accounts.go b/services/accounts.go index 35e15d469..d68c4f31d 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -22,8 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc/context" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/accounts" "github.com/cgrates/cgrates/commonlisteners" @@ -37,7 +35,7 @@ import ( // NewAccountService returns the Account Service func NewAccountService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer 
*servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *AccountService { return &AccountService{ cfg: cfg, connMgr: connMgr, @@ -65,7 +63,7 @@ type AccountService struct { } // Start should handle the service start -func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (acts *AccountService) Start(shutdown chan struct{}) (err error) { if acts.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -78,7 +76,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AccountS, utils.CacheS, utils.StateServiceUP) } - if err = cacheS.WaitToPrecache(ctx, + if err = cacheS.WaitToPrecache(shutdown, utils.CacheAccounts, utils.CacheAccountsFilterIndexes); err != nil { return @@ -119,7 +117,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e } // Reload handles the change of config -func (acts *AccountService) Reload(*context.Context, context.CancelFunc) (err error) { +func (acts *AccountService) Reload(_ chan struct{}) (err error) { acts.rldChan <- struct{}{} return // for the moment nothing to reload } diff --git a/services/actions.go b/services/actions.go index 207e98c37..8301fc7e3 100644 --- a/services/actions.go +++ b/services/actions.go @@ -22,8 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc/context" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/actions" "github.com/cgrates/cgrates/commonlisteners" @@ -37,7 +35,7 @@ import ( // NewActionService returns the Action Service func NewActionService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *ActionService { return &ActionService{ connMgr: connMgr, cfg: cfg, @@ -66,7 +64,7 @@ type ActionService struct { } // Start should handle the service start -func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (acts *ActionService) Start(shutdown chan struct{}) (err error) { if acts.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -80,7 +78,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CacheS, utils.StateServiceUP) } - if err = cacheS.WaitToPrecache(ctx, + if err = cacheS.WaitToPrecache(shutdown, utils.CacheActionProfiles, utils.CacheActionProfilesFilterIndexes); err != nil { return @@ -120,7 +118,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er } // Reload handles the change of config -func (acts *ActionService) Reload(*context.Context, context.CancelFunc) (err error) { +func (acts *ActionService) Reload(_ chan struct{}) (err error) { acts.rldChan <- struct{}{} return // for the moment nothing to reload } diff --git a/services/adminsv1.go b/services/adminsv1.go index d10a6e22f..193be5b5f 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -34,7 +33,7 @@ import ( // NewAPIerSv1Service returns the APIerSv1 Service func 
NewAdminSv1Service(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *AdminSv1Service { return &AdminSv1Service{ cfg: cfg, connMgr: connMgr, @@ -61,7 +60,7 @@ type AdminSv1Service struct { // Start should handle the sercive start // For this service the start should be called from RAL Service -func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (apiService *AdminSv1Service) Start(_ chan struct{}) (err error) { if apiService.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -113,7 +112,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF } // Reload handles the change of config -func (apiService *AdminSv1Service) Reload(*context.Context, context.CancelFunc) (err error) { +func (apiService *AdminSv1Service) Reload(_ chan struct{}) (err error) { return } diff --git a/services/analyzers.go b/services/analyzers.go index 1fda1a913..106764c3d 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -59,7 +59,7 @@ type AnalyzerService struct { } // Start should handle the sercive start -func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFunc) (err error) { +func (anz *AnalyzerService) Start(shutdown chan struct{}) (err error) { if anz.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -76,12 +76,12 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) return } - anzCtx, cancel := context.WithCancel(ctx) + anzCtx, cancel := context.WithCancel(context.TODO()) anz.cancelFunc = cancel go func(a *analyzers.AnalyzerS) { if err := a.ListenAndServe(anzCtx); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error())) - shtDwn() + close(shutdown) } }(anz.anz) anz.cl.SetAnalyzer(anz.anz) @@ -114,7 +114,7 @@ func (anz *AnalyzerService) start() { } // Reload handles the change of config -func (anz *AnalyzerService) Reload(*context.Context, context.CancelFunc) (err error) { +func (anz *AnalyzerService) Reload(_ chan struct{}) (err error) { return // for the momment nothing to reload } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index e233f66b9..876d1c218 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/agents" @@ -35,7 +34,7 @@ import ( // NewAsteriskAgent returns the Asterisk Agent func NewAsteriskAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *AsteriskAgent { return &AsteriskAgent{ cfg: cfg, connMgr: connMgr, @@ -59,7 +58,7 @@ type AsteriskAgent struct { } // Start should handle the sercive start -func (ast *AsteriskAgent) Start(_ *context.Context, shtDwn context.CancelFunc) (err error) { +func (ast *AsteriskAgent) Start(shutdown chan struct{}) (err error) { if ast.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -70,7 +69,7 @@ func (ast *AsteriskAgent) Start(_ *context.Context, shtDwn context.CancelFunc) ( listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}) { if err := sma.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> 
runtime error: %s!", utils.AsteriskAgent, err)) - shtDwn() + close(shutdown) } } ast.stopChan = make(chan struct{}) @@ -84,9 +83,9 @@ func (ast *AsteriskAgent) Start(_ *context.Context, shtDwn context.CancelFunc) ( } // Reload handles the change of config -func (ast *AsteriskAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (err error) { +func (ast *AsteriskAgent) Reload(shutdown chan struct{}) (err error) { ast.shutdown() - return ast.Start(ctx, shtDwn) + return ast.Start(shutdown) } // Shutdown stops the service diff --git a/services/attributes.go b/services/attributes.go index b74e7b2c1..401a71bdb 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -35,7 +34,7 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, dspS *DispatcherService, - sIndxr *servmanager.ServiceIndexer) servmanager.Service { + sIndxr *servmanager.ServiceIndexer) *AttributeService { return &AttributeService{ cfg: cfg, dspS: dspS, @@ -62,7 +61,7 @@ type AttributeService struct { } // Start should handle the service start -func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (attrS *AttributeService) Start(shutdown chan struct{}) (err error) { if attrS.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -80,7 +79,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CacheS, utils.StateServiceUP) } - if err = cacheS.WaitToPrecache(ctx, + if err = cacheS.WaitToPrecache(shutdown, utils.CacheAttributeProfiles, utils.CacheAttributeFilterIndexes); err != nil { return @@ -129,7 +128,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) } // Reload handles the change of config -func (attrS *AttributeService) Reload(*context.Context, context.CancelFunc) (err error) { +func (attrS *AttributeService) Reload(_ chan struct{}) (err error) { return // for the moment nothing to reload } diff --git a/services/caches.go b/services/caches.go index 26d399d0d..2886602a4 100644 --- a/services/caches.go +++ b/services/caches.go @@ -20,7 +20,6 @@ package services import ( "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -54,7 +53,7 @@ type CacheService struct { } // Start should handle the sercive start -func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (err error) { +func (cS *CacheService) Start(shutdown chan struct{}) (err error) { cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CommonListenerS, utils.StateServiceUP) @@ -73,7 +72,7 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CoreS, utils.StateServiceUP) } engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CoreS().CapsStats) - go 
engine.Cache.Precache(ctx, shtDw) + go engine.Cache.Precache(shutdown) cS.cacheCh <- engine.Cache @@ -90,7 +89,7 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e } // Reload handles the change of config -func (cS *CacheService) Reload(*context.Context, context.CancelFunc) (_ error) { +func (cS *CacheService) Reload(_ chan struct{}) (_ error) { return } @@ -120,18 +119,18 @@ func (cS *CacheService) GetCacheSChan() chan *engine.CacheS { return cS.cacheCh } -func (cS *CacheService) WaitToPrecache(ctx *context.Context, cacheIDs ...string) (err error) { +func (cS *CacheService) WaitToPrecache(shutdown chan struct{}, cacheIDs ...string) (err error) { var cacheS *engine.CacheS select { - case <-ctx.Done(): - return ctx.Err() + case <-shutdown: + return case cacheS = <-cS.cacheCh: cS.cacheCh <- cacheS } for _, cacheID := range cacheIDs { select { - case <-ctx.Done(): - return ctx.Err() + case <-shutdown: + return case <-cacheS.GetPrecacheChannel(cacheID): } } diff --git a/services/cdrs.go b/services/cdrs.go index 7ce0387ee..034801d3f 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -24,7 +24,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/cdrs" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -36,7 +35,7 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *CDRService { return &CDRService{ cfg: cfg, connMgr: connMgr, @@ -61,7 +60,7 @@ type CDRService struct { } // Start should handle the sercive start -func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (cs *CDRService) Start(_ chan struct{}) (err error) { if cs.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -110,7 +109,7 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err } // Reload handles the change of config -func (cs *CDRService) Reload(*context.Context, context.CancelFunc) (err error) { +func (cs *CDRService) Reload(_ chan struct{}) (err error) { return } diff --git a/services/chargers.go b/services/chargers.go index ecf63c2c5..6c3e2a88b 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -22,8 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc/context" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -35,7 +33,7 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *ChargerService { return &ChargerService{ cfg: cfg, connMgr: connMgr, @@ -60,7 +58,7 @@ type ChargerService struct { } // Start should handle the service start -func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (chrS *ChargerService) Start(shutdown chan struct{}) (err error) { if chrS.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -74,7 +72,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CacheS, utils.StateServiceUP) } - if err = cacheS.WaitToPrecache(ctx, + if 
err = cacheS.WaitToPrecache(shutdown, utils.CacheChargerProfiles, utils.CacheChargerFilterIndexes); err != nil { return @@ -110,7 +108,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e } // Reload handles the change of config -func (chrS *ChargerService) Reload(ctx *context.Context, _ context.CancelFunc) (err error) { +func (chrS *ChargerService) Reload(_ chan struct{}) (err error) { return } diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 83ac17b22..aad3c4618 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -57,7 +56,7 @@ type CommonListenerService struct { } // Start handles the service start. -func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) error { +func (cl *CommonListenerService) Start(_ chan struct{}) error { if cl.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -75,7 +74,7 @@ func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) err } // Reload handles the config changes. -func (cl *CommonListenerService) Reload(*context.Context, context.CancelFunc) error { +func (cl *CommonListenerService) Reload(_ chan struct{}) error { return nil } diff --git a/services/config.go b/services/config.go index 665e3f6c3..4b5eaec64 100644 --- a/services/config.go +++ b/services/config.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -48,7 +47,7 @@ type ConfigService struct { } // Start handles the service start. -func (s *ConfigService) Start(_ *context.Context, _ context.CancelFunc) error { +func (s *ConfigService) Start(_ chan struct{}) error { cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP) @@ -70,7 +69,7 @@ func (s *ConfigService) Start(_ *context.Context, _ context.CancelFunc) error { } // Reload handles the config changes. 
-func (s *ConfigService) Reload(*context.Context, context.CancelFunc) error { +func (s *ConfigService) Reload(_ chan struct{}) error { return nil } diff --git a/services/cores.go b/services/cores.go index 4f5218477..a67429b2f 100644 --- a/services/cores.go +++ b/services/cores.go @@ -24,7 +24,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" @@ -68,7 +67,7 @@ type CoreService struct { } // Start should handle the service start -func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) error { +func (cS *CoreService) Start(shutdown chan struct{}) error { if cS.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -87,7 +86,7 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err defer cS.mu.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) cS.stopChan = make(chan struct{}) - cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCPU, cS.stopChan, cS.shdWg, shtDw) + cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCPU, cS.stopChan, cS.shdWg, shutdown) cS.csCh <- cS.cS srv, err := engine.NewService(cS.cS) if err != nil { @@ -105,7 +104,7 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err } // Reload handles the change of config -func (cS *CoreService) Reload(*context.Context, context.CancelFunc) error { +func (cS *CoreService) Reload(_ chan struct{}) error { return nil } diff --git a/services/datadb.go b/services/datadb.go index 643ba0132..0718e1f17 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -62,7 +61,7 @@ type DataDBService struct { } // Start handles the service start. 
-func (db *DataDBService) Start(*context.Context, context.CancelFunc) (err error) { +func (db *DataDBService) Start(_ chan struct{}) (err error) { if db.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -94,7 +93,7 @@ func (db *DataDBService) Start(*context.Context, context.CancelFunc) (err error) } // Reload handles the change of config -func (db *DataDBService) Reload(*context.Context, context.CancelFunc) (err error) { +func (db *DataDBService) Reload(_ chan struct{}) (err error) { db.Lock() defer db.Unlock() if db.needsConnectionReload() { diff --git a/services/diameteragent.go b/services/diameteragent.go index 1c0b27f62..ed4b2cd93 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -34,7 +33,7 @@ import ( // NewDiameterAgent returns the Diameter Agent func NewDiameterAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, caps *engine.Caps, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *DiameterAgent { return &DiameterAgent{ cfg: cfg, connMgr: connMgr, @@ -63,7 +62,7 @@ type DiameterAgent struct { } // Start should handle the sercive start -func (da *DiameterAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) error { +func (da *DiameterAgent) Start(shutdown chan struct{}) error { if da.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -74,10 +73,10 @@ func (da *DiameterAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) } da.Lock() defer da.Unlock() - return da.start(fs.FilterS(), shtDwn, da.caps) + return da.start(fs.FilterS(), da.caps, shutdown) } -func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFunc, caps *engine.Caps) error { +func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown chan struct{}) error { var err error da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr, caps) if err != nil { @@ -92,7 +91,7 @@ func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFun if err := d.ListenAndServe(da.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.DiameterAgent, err)) - shtDwn() + close(shutdown) } }(da.da) close(da.stateDeps.StateChan(utils.StateServiceUP)) @@ -100,7 +99,7 @@ func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFun } // Reload handles the change of config -func (da *DiameterAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (err error) { +func (da *DiameterAgent) Reload(shutdown chan struct{}) (err error) { da.Lock() defer da.Unlock() if da.lnet == da.cfg.DiameterAgentCfg().ListenNet && @@ -112,7 +111,7 @@ func (da *DiameterAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP) } - return da.start(fs.FilterS(), shtDwn, da.caps) + return da.start(fs.FilterS(), da.caps, shutdown) } // Shutdown stops the service diff --git a/services/dispatchers.go b/services/dispatchers.go index 290f674f6..c159e49de 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" 
"github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/dispatchers" @@ -61,7 +60,7 @@ type DispatcherService struct { } // Start should handle the sercive start -func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (dspS *DispatcherService) Start(shutdown chan struct{}) (err error) { if dspS.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -75,7 +74,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CacheS, utils.StateServiceUP) } - if err = cacheS.WaitToPrecache(ctx, + if err = cacheS.WaitToPrecache(shutdown, utils.CacheDispatcherProfiles, utils.CacheDispatcherHosts, utils.CacheDispatcherFilterIndexes); err != nil { @@ -117,7 +116,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) } // Reload handles the change of config -func (dspS *DispatcherService) Reload(*context.Context, context.CancelFunc) (err error) { +func (dspS *DispatcherService) Reload(_ chan struct{}) (err error) { return // for the momment nothing to reload } diff --git a/services/dnsagent.go b/services/dnsagent.go index 519e6d1a1..9b3f5a851 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -34,7 +33,7 @@ import ( // NewDNSAgent returns the DNS Agent func NewDNSAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *DNSAgent { return &DNSAgent{ cfg: cfg, connMgr: connMgr, @@ -59,7 +58,7 @@ type DNSAgent struct { } // Start should handle the service start -func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err error) { +func (dns *DNSAgent) Start(shutdown chan struct{}) (err error) { if dns.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -77,13 +76,13 @@ func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err return } dns.stopChan = make(chan struct{}) - go dns.listenAndServe(dns.stopChan, shtDwn) + go dns.listenAndServe(dns.stopChan, shutdown) close(dns.stateDeps.StateChan(utils.StateServiceUP)) return } // Reload handles the change of config -func (dns *DNSAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (err error) { +func (dns *DNSAgent) Reload(shutdown chan struct{}) (err error) { fs := dns.srvIndexer.GetService(utils.FilterS).(*FilterService) if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP) @@ -106,16 +105,16 @@ func (dns *DNSAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (er dns.dns.Lock() defer dns.dns.Unlock() dns.stopChan = make(chan struct{}) - go dns.listenAndServe(dns.stopChan, shtDwn) + go dns.listenAndServe(dns.stopChan, shutdown) return } -func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shtDwn context.CancelFunc) (err error) { +func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shutdown chan struct{}) (err error) { dns.dns.RLock() defer dns.dns.RUnlock() if err = 
dns.dns.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - shtDwn() // stop the engine here + close(shutdown) // stop the engine here } return } diff --git a/services/ees.go b/services/ees.go index c7df40485..1ea1fa357 100644 --- a/services/ees.go +++ b/services/ees.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/ees" @@ -35,7 +34,7 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *EventExporterService { return &EventExporterService{ cfg: cfg, connMgr: connMgr, @@ -77,7 +76,7 @@ func (es *EventExporterService) IsRunning() bool { } // Reload handles the change of config -func (es *EventExporterService) Reload(*context.Context, context.CancelFunc) error { +func (es *EventExporterService) Reload(_ chan struct{}) error { es.mu.Lock() defer es.mu.Unlock() es.eeS.ClearExporterCache() @@ -96,7 +95,7 @@ func (es *EventExporterService) Shutdown() error { } // Start should handle the service start -func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc) error { +func (es *EventExporterService) Start(_ chan struct{}) error { if es.IsRunning() { return utils.ErrServiceAlreadyRunning } diff --git a/services/efs.go b/services/efs.go index 813bee5ea..b30896af7 100644 --- a/services/efs.go +++ b/services/efs.go @@ -22,8 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc/context" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -62,7 +60,7 @@ func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager } // Start should handle the service start -func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (efServ *ExportFailoverService) Start(_ chan struct{}) (err error) { if efServ.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -83,7 +81,7 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance } // Reload handles the change of config -func (efServ *ExportFailoverService) Reload(ctx *context.Context, _ context.CancelFunc) (err error) { +func (efServ *ExportFailoverService) Reload(_ chan struct{}) (err error) { return } diff --git a/services/ers.go b/services/ers.go index 089e32310..808ca98e5 100644 --- a/services/ers.go +++ b/services/ers.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -36,7 +35,7 @@ import ( func NewEventReaderService( cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceIndexer) servmanager.Service { + srvIndexer *servmanager.ServiceIndexer) *EventReaderService { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, @@ -64,7 +63,7 @@ type EventReaderService struct { } // Start should handle the sercive start -func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.CancelFunc) (err error) { +func (erS *EventReaderService) Start(shutdown chan struct{}) (err error) { if erS.IsRunning() { return utils.ErrServiceAlreadyRunning 
 }
@@ -93,7 +92,7 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
 	// build the service
 	erS.ers = ers.NewERService(erS.cfg, fs.FilterS(), erS.connMgr)
-	go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan, shtDwn)
+	go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan, shutdown)
 	srv, err := engine.NewServiceWithPing(erS.ers, utils.ErSv1, utils.V1Prfx)
 	if err != nil {
@@ -107,16 +106,16 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
 	return
 }
-func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan chan struct{}, rldChan chan struct{}, shtDwn context.CancelFunc) (err error) {
+func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldChan, shutdown chan struct{}) (err error) {
 	if err = ers.ListenAndServe(stopChan, rldChan); err != nil {
 		utils.Logger.Err(fmt.Sprintf("<%s> error: <%v>", utils.ERs, err))
-		shtDwn()
+		close(shutdown)
 	}
 	return
 }
 // Reload handles the change of config
-func (erS *EventReaderService) Reload(*context.Context, context.CancelFunc) (err error) {
+func (erS *EventReaderService) Reload(_ chan struct{}) (err error) {
 	erS.RLock()
 	erS.rldChan <- struct{}{}
 	erS.RUnlock()
diff --git a/services/filters.go b/services/filters.go
index 1b2f8de09..17f1fce36 100644
--- a/services/filters.go
+++ b/services/filters.go
@@ -22,7 +22,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/servmanager"
@@ -55,12 +54,12 @@ type FilterService struct {
 }
 // Start handles the service start.
-func (s *FilterService) Start(ctx *context.Context, _ context.CancelFunc) error {
+func (s *FilterService) Start(shutdown chan struct{}) error {
 	cacheS := s.srvIndexer.GetService(utils.CacheS).(*CacheService)
 	if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.FilterS, utils.CacheS, utils.StateServiceUP)
 	}
-	if err := cacheS.WaitToPrecache(ctx, utils.CacheFilters); err != nil {
+	if err := cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil {
 		return err
 	}
 	dbs := s.srvIndexer.GetService(utils.DataDB).(*DataDBService)
@@ -73,7 +72,7 @@ func (s *FilterService) Start(ctx *context.Context, _ context.CancelFunc) error
 }
 // Reload handles the config changes.
-func (s *FilterService) Reload(*context.Context, context.CancelFunc) error {
+func (s *FilterService) Reload(_ chan struct{}) error {
 	return nil
 }
diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go
index 847607e5f..bdea1fe36 100644
--- a/services/freeswitchagent.go
+++ b/services/freeswitchagent.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/agents"
@@ -35,7 +34,7 @@ import (
 // NewFreeswitchAgent returns the Freeswitch Agent
 func NewFreeswitchAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *FreeswitchAgent {
 	return &FreeswitchAgent{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -58,7 +57,7 @@ type FreeswitchAgent struct {
 }
 // Start should handle the sercive start
-func (fS *FreeswitchAgent) Start(_ *context.Context, shtDwn context.CancelFunc) (err error) {
+func (fS *FreeswitchAgent) Start(shutdown chan struct{}) (err error) {
 	if fS.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -68,27 +67,27 @@ func (fS *FreeswitchAgent) Start(_ *context.Context, shtDwn context.CancelFunc)
 	fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, fS.connMgr)
-	go fS.connect(shtDwn)
+	go fS.connect(shutdown)
 	close(fS.stateDeps.StateChan(utils.StateServiceUP))
 	return
 }
 // Reload handles the change of config
-func (fS *FreeswitchAgent) Reload(_ *context.Context, shtDwn context.CancelFunc) (err error) {
+func (fS *FreeswitchAgent) Reload(shutdown chan struct{}) (err error) {
 	fS.Lock()
 	defer fS.Unlock()
 	if err = fS.fS.Shutdown(); err != nil {
 		return
 	}
 	fS.fS.Reload()
-	go fS.connect(shtDwn)
+	go fS.connect(shutdown)
 	return
 }
-func (fS *FreeswitchAgent) connect(shtDwn context.CancelFunc) {
+func (fS *FreeswitchAgent) connect(shutdown chan struct{}) {
 	if err := fS.fS.Connect(); err != nil {
 		utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err))
-		shtDwn() // stop the engine here
+		close(shutdown) // stop the engine here
 	}
 	return
 }
diff --git a/services/globalvars.go b/services/globalvars.go
index f980bdab8..988d9a575 100644
--- a/services/globalvars.go
+++ b/services/globalvars.go
@@ -20,7 +20,6 @@ package services
 import (
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/config"
@@ -48,7 +47,7 @@ type GlobalVarS struct {
 }
 // Start should handle the sercive start
-func (gv *GlobalVarS) Start(*context.Context, context.CancelFunc) error {
+func (gv *GlobalVarS) Start(_ chan struct{}) error {
 	engine.SetHTTPPstrTransport(gv.cfg.HTTPCfg().ClientOpts)
 	utils.DecimalContext.MaxScale = gv.cfg.GeneralCfg().DecimalMaxScale
 	utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale
@@ -59,7 +58,7 @@ func (gv *GlobalVarS) Start(*context.Context, context.CancelFunc) error {
 }
 // Reload handles the change of config
-func (gv *GlobalVarS) Reload(*context.Context, context.CancelFunc) error {
+func (gv *GlobalVarS) Reload(_ chan struct{}) error {
 	engine.SetHTTPPstrTransport(gv.cfg.HTTPCfg().ClientOpts)
 	utils.DecimalContext.MaxScale = gv.cfg.GeneralCfg().DecimalMaxScale
 	utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale
diff --git a/services/guardian.go b/services/guardian.go
index df5b15d34..d6ab40865 100644
--- a/services/guardian.go
+++ b/services/guardian.go
@@ -22,7 +22,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/guardian"
@@ -49,7 +48,7 @@ type GuardianService struct {
 }
 // Start handles the service start.
-func (s *GuardianService) Start(_ *context.Context, _ context.CancelFunc) error {
+func (s *GuardianService) Start(_ chan struct{}) error {
 	cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
 	if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP)
@@ -70,7 +69,7 @@ func (s *GuardianService) Start(_ *context.Context, _ context.CancelFunc) error
 }
 // Reload handles the config changes.
-func (s *GuardianService) Reload(*context.Context, context.CancelFunc) error {
+func (s *GuardianService) Reload(_ chan struct{}) error {
 	return nil
 }
diff --git a/services/httpagent.go b/services/httpagent.go
index 1c5dd33ae..aaa76c21e 100644
--- a/services/httpagent.go
+++ b/services/httpagent.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/agents"
 	"github.com/cgrates/cgrates/commonlisteners"
 	"github.com/cgrates/cgrates/config"
@@ -35,7 +34,7 @@ import (
 // NewHTTPAgent returns the HTTP Agent
 func NewHTTPAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *HTTPAgent {
 	return &HTTPAgent{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -63,7 +62,7 @@ type HTTPAgent struct {
 }
 // Start should handle the sercive start
-func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (ha *HTTPAgent) Start(_ chan struct{}) (err error) {
 	if ha.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -93,7 +92,7 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro
 }
 // Reload handles the change of config
-func (ha *HTTPAgent) Reload(*context.Context, context.CancelFunc) (err error) {
+func (ha *HTTPAgent) Reload(_ chan struct{}) (err error) {
 	return // no reload
 }
diff --git a/services/janus.go b/services/janus.go
index e0a358bdf..60679ff6a 100644
--- a/services/janus.go
+++ b/services/janus.go
@@ -24,7 +24,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/agents"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
@@ -35,7 +34,7 @@ import (
 // NewJanusAgent returns the Janus Agent
 func NewJanusAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *JanusAgent {
 	return &JanusAgent{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -63,7 +62,7 @@ type JanusAgent struct {
 }
 // Start should jandle the sercive start
-func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (ja *JanusAgent) Start(_ chan struct{}) (err error) {
 	cls := ja.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
 	if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.CommonListenerS, utils.StateServiceUP)
@@ -103,7 +102,7 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err
 }
 // Reload jandles the change of config
-func (ja *JanusAgent) Reload(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (ja *JanusAgent) Reload(_ chan struct{}) (err error) {
 	return // no reload
 }
diff --git a/services/kamailioagent.go b/services/kamailioagent.go
index 5b9faa606..d1dbda192 100644
--- a/services/kamailioagent.go
+++ b/services/kamailioagent.go
@@ -24,11 +24,9 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
-
-	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/agents"
 	"github.com/cgrates/cgrates/config"
+	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/servmanager"
 	"github.com/cgrates/cgrates/utils"
 )
@@ -36,7 +34,7 @@ import (
 // NewKamailioAgent returns the Kamailio Agent
 func NewKamailioAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *KamailioAgent {
 	return &KamailioAgent{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -59,7 +57,7 @@ type KamailioAgent struct {
 }
 // Start should handle the sercive start
-func (kam *KamailioAgent) Start(_ *context.Context, shtDwn context.CancelFunc) (err error) {
+func (kam *KamailioAgent) Start(shutdown chan struct{}) (err error) {
 	if kam.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -70,30 +68,30 @@ func (kam *KamailioAgent) Start(_ *context.Context, shtDwn context.CancelFunc) (
 	kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), kam.connMgr, utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone))
-	go kam.connect(kam.kam, shtDwn)
+	go kam.connect(kam.kam, shutdown)
 	close(kam.stateDeps.StateChan(utils.StateServiceUP))
 	return
 }
 // Reload handles the change of config
-func (kam *KamailioAgent) Reload(_ *context.Context, shtDwn context.CancelFunc) (err error) {
+func (kam *KamailioAgent) Reload(shutdown chan struct{}) (err error) {
 	kam.Lock()
 	defer kam.Unlock()
 	if err = kam.kam.Shutdown(); err != nil {
 		return
 	}
 	kam.kam.Reload()
-	go kam.connect(kam.kam, shtDwn)
+	go kam.connect(kam.kam, shutdown)
 	return
 }
-func (kam *KamailioAgent) connect(k *agents.KamailioAgent, shtDwn context.CancelFunc) (err error) {
+func (kam *KamailioAgent) connect(k *agents.KamailioAgent, shutdown chan struct{}) (err error) {
 	if err = k.Connect(); err != nil {
 		if !strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
 			if !strings.Contains(err.Error(), "KamEvapi") {
 				utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
 			}
-			shtDwn()
+			close(shutdown)
 		}
 	}
 	return
diff --git a/services/loaders.go b/services/loaders.go
index cf6b975a4..22207ea4c 100644
--- a/services/loaders.go
+++ b/services/loaders.go
@@ -21,8 +21,6 @@ package services
 import (
 	"sync"
-	"github.com/cgrates/birpc/context"
-
 	"github.com/cgrates/birpc"
 	"github.com/cgrates/cgrates/commonlisteners"
 	"github.com/cgrates/cgrates/config"
@@ -62,7 +60,7 @@ type LoaderService struct {
 }
 // Start should handle the service start
-func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (ldrs *LoaderService) Start(_ chan struct{}) (err error) {
 	if ldrs.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -110,7 +108,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
 }
 // Reload handles the change of config
-func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) error {
+func (ldrs *LoaderService) Reload(_ chan struct{}) error {
 	fs := ldrs.srvIndexer.GetService(utils.FilterS).(*FilterService)
 	if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP)
diff --git a/services/radiusagent.go b/services/radiusagent.go
index 9365adcbb..714d0f33a 100644
--- a/services/radiusagent.go
+++ b/services/radiusagent.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/agents"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
@@ -34,7 +33,7 @@ import (
 // NewRadiusAgent returns the Radius Agent
 func NewRadiusAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *RadiusAgent {
 	return &RadiusAgent{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -62,7 +61,7 @@ type RadiusAgent struct {
 }
 // Start should handle the sercive start
-func (rad *RadiusAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err error) {
+func (rad *RadiusAgent) Start(shutdown chan struct{}) (err error) {
 	if rad.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -85,21 +84,21 @@ func (rad *RadiusAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (
 	}
 	rad.stopChan = make(chan struct{})
-	go rad.listenAndServe(rad.rad, shtDwn)
+	go rad.listenAndServe(rad.rad, shutdown)
 	close(rad.stateDeps.StateChan(utils.StateServiceUP))
 	return
 }
-func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent, shtDwn context.CancelFunc) (err error) {
+func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent, shutdown chan struct{}) (err error) {
 	if err = r.ListenAndServe(rad.stopChan); err != nil {
 		utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error()))
-		shtDwn()
+		close(shutdown)
 	}
 	return
 }
 // Reload handles the change of config
-func (rad *RadiusAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (err error) {
+func (rad *RadiusAgent) Reload(shutdown chan struct{}) (err error) {
 	if rad.lnet == rad.cfg.RadiusAgentCfg().ListenNet &&
 		rad.lauth == rad.cfg.RadiusAgentCfg().ListenAuth &&
 		rad.lacct == rad.cfg.RadiusAgentCfg().ListenAcct {
@@ -107,7 +106,7 @@ func (rad *RadiusAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc)
 	}
 	rad.shutdown()
-	return rad.Start(ctx, shtDwn)
+	return rad.Start(shutdown)
 }
 // Shutdown stops the service
diff --git a/services/rankings.go b/services/rankings.go
index e518bfdee..82c421d2f 100644
--- a/services/rankings.go
+++ b/services/rankings.go
@@ -36,7 +36,7 @@ import (
 func NewRankingService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
 	srvDep map[string]*sync.WaitGroup,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *RankingService {
 	return &RankingService{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -62,7 +62,7 @@ type RankingService struct {
 }
 // Start should handle the sercive start
-func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (ran *RankingService) Start(shutdown chan struct{}) (err error) {
 	if ran.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -77,7 +77,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
 	if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CacheS, utils.StateServiceUP)
 	}
-	if err = cacheS.WaitToPrecache(ctx,
+	if err = cacheS.WaitToPrecache(shutdown,
 		utils.CacheRankingProfiles,
 		utils.CacheRankings,
 	); err != nil {
@@ -102,7 +102,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
 	utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RankingS))
-	if err := ran.ran.StartRankingS(ctx); err != nil {
+	if err := ran.ran.StartRankingS(context.TODO()); err != nil {
 		return err
 	}
 	srv, err := engine.NewService(ran.ran)
@@ -120,9 +120,9 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
 }
 // Reload handles the change of config
-func (ran *RankingService) Reload(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (ran *RankingService) Reload(_ chan struct{}) (err error) {
 	ran.Lock()
-	ran.ran.Reload(ctx)
+	ran.ran.Reload(context.TODO())
 	ran.Unlock()
 	return
 }
diff --git a/services/rates.go b/services/rates.go
index b75191220..b49d3d45a 100644
--- a/services/rates.go
+++ b/services/rates.go
@@ -22,7 +22,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/commonlisteners"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
@@ -33,7 +32,7 @@ import (
 // NewRateService constructs RateService
 func NewRateService(cfg *config.CGRConfig,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *RateService {
 	return &RateService{
 		cfg:     cfg,
 		rldChan: make(chan struct{}),
@@ -76,7 +75,7 @@ func (rs *RateService) IsRunning() bool {
 }
 // Reload handles the change of config
-func (rs *RateService) Reload(*context.Context, context.CancelFunc) (_ error) {
+func (rs *RateService) Reload(_ chan struct{}) (_ error) {
 	rs.rldChan <- struct{}{}
 	return
 }
@@ -93,7 +92,7 @@ func (rs *RateService) Shutdown() (err error) {
 }
 // Start should handle the service start
-func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (rs *RateService) Start(shutdown chan struct{}) (err error) {
 	if rs.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -107,7 +106,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
 	if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.RateS, utils.CacheS, utils.StateServiceUP)
 	}
-	if err = cacheS.WaitToPrecache(ctx,
+	if err = cacheS.WaitToPrecache(shutdown,
 		utils.CacheRateProfiles,
 		utils.CacheRateProfilesFilterIndexes,
 		utils.CacheRateFilterIndexes); err != nil {
diff --git a/services/registrarc.go b/services/registrarc.go
index 4ad2d4f0c..07b89a019 100644
--- a/services/registrarc.go
+++ b/services/registrarc.go
@@ -22,7 +22,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/registrarc"
@@ -32,7 +31,7 @@ import (
 // NewRegistrarCService returns the Dispatcher Service
 func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *RegistrarCService {
 	return &RegistrarCService{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -58,7 +57,7 @@ type RegistrarCService struct {
 }
 // Start should handle the sercive start
-func (dspS *RegistrarCService) Start(*context.Context, context.CancelFunc) (err error) {
+func (dspS *RegistrarCService) Start(_ chan struct{}) (err error) {
 	if dspS.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -75,7 +74,7 @@ func (dspS *RegistrarCService) Start(*context.Context, context.CancelFunc) (err
 }
 // Reload handles the change of config
-func (dspS *RegistrarCService) Reload(*context.Context, context.CancelFunc) (err error) {
+func (dspS *RegistrarCService) Reload(_ chan struct{}) (err error) {
 	dspS.rldChan <- struct{}{}
 	return // for the momment nothing to reload
 }
diff --git a/services/resources.go b/services/resources.go
index 34259a6a4..009302621 100644
--- a/services/resources.go
+++ b/services/resources.go
@@ -35,7 +35,7 @@ import (
 func NewResourceService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
 	srvDep map[string]*sync.WaitGroup,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *ResourceService {
 	return &ResourceService{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -62,7 +62,7 @@ type ResourceService struct {
 }
 // Start should handle the service start
-func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (reS *ResourceService) Start(shutdown chan struct{}) (err error) {
 	if reS.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -77,7 +77,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
 	if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CacheS, utils.StateServiceUP)
 	}
-	if err = cacheS.WaitToPrecache(ctx,
+	if err = cacheS.WaitToPrecache(shutdown,
 		utils.CacheResourceProfiles,
 		utils.CacheResources,
 		utils.CacheResourceFilterIndexes); err != nil {
@@ -100,7 +100,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
 	defer reS.Unlock()
 	reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), reS.connMgr)
 	utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS))
-	reS.reS.StartLoop(ctx)
+	reS.reS.StartLoop(context.TODO())
 	srv, _ := engine.NewService(reS.reS)
 	// srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false)
 	if !reS.cfg.DispatcherSCfg().Enabled {
@@ -115,9 +115,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
 }
 // Reload handles the change of config
-func (reS *ResourceService) Reload(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (reS *ResourceService) Reload(_ chan struct{}) (err error) {
 	reS.Lock()
-	reS.reS.Reload(ctx)
+	reS.reS.Reload(context.TODO())
 	reS.Unlock()
 	return
 }
diff --git a/services/routes.go b/services/routes.go
index 586849fda..39eb10ac8 100644
--- a/services/routes.go
+++ b/services/routes.go
@@ -22,8 +22,6 @@ import (
 	"fmt"
 	"sync"
-	"github.com/cgrates/birpc/context"
-
 	"github.com/cgrates/birpc"
 	"github.com/cgrates/cgrates/commonlisteners"
 	"github.com/cgrates/cgrates/config"
@@ -35,7 +33,7 @@ import (
 // NewRouteService returns the Route Service
 func NewRouteService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *RouteService {
 	return &RouteService{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -60,7 +58,7 @@ type RouteService struct {
 }
 // Start should handle the sercive start
-func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (routeS *RouteService) Start(shutdown chan struct{}) (err error) {
 	if routeS.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -74,7 +72,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
 	if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CacheS, utils.StateServiceUP)
 	}
-	if err = cacheS.WaitToPrecache(ctx,
+	if err = cacheS.WaitToPrecache(shutdown,
 		utils.CacheRouteProfiles,
 		utils.CacheRouteFilterIndexes); err != nil {
 		return
@@ -110,7 +108,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
 }
 // Reload handles the change of config
-func (routeS *RouteService) Reload(*context.Context, context.CancelFunc) (err error) {
+func (routeS *RouteService) Reload(_ chan struct{}) (err error) {
 	return
 }
diff --git a/services/sessions.go b/services/sessions.go
index 61721e197..45355b009 100644
--- a/services/sessions.go
+++ b/services/sessions.go
@@ -22,8 +22,6 @@ import (
 	"fmt"
 	"sync"
-	"github.com/cgrates/birpc/context"
-
 	"github.com/cgrates/birpc"
 	"github.com/cgrates/cgrates/commonlisteners"
 	"github.com/cgrates/cgrates/engine"
@@ -37,7 +35,7 @@ import (
 // NewSessionService returns the Session Service
 func NewSessionService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *SessionService {
 	return &SessionService{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -64,7 +62,7 @@ type SessionService struct {
 }
 // Start should handle the service start
-func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) (err error) {
+func (smg *SessionService) Start(shutdown chan struct{}) (err error) {
 	if smg.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -111,26 +109,26 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
 			smg.cl.BiRPCRegisterName(n, s)
 		}
 		// run this in it's own goroutine
-		go smg.start(shtDw)
+		go smg.start(shutdown)
 	}
 	close(smg.stateDeps.StateChan(utils.StateServiceUP))
 	return
 }
-func (smg *SessionService) start(shtDw context.CancelFunc) (err error) {
+func (smg *SessionService) start(shutdown chan struct{}) (err error) {
 	if err := smg.cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson, smg.cfg.SessionSCfg().ListenBigob,
 		smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {
 		utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err))
 		smg.Lock()
 		smg.bircpEnabled = false
 		smg.Unlock()
-		shtDw()
+		close(shutdown)
 	}
 	return
 }
 // Reload handles the change of config
-func (smg *SessionService) Reload(*context.Context, context.CancelFunc) (err error) {
+func (smg *SessionService) Reload(_ chan struct{}) (err error) {
 	return
 }
diff --git a/services/sipagent.go b/services/sipagent.go
index fa5410a7d..03621692d 100644
--- a/services/sipagent.go
+++ b/services/sipagent.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/agents"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
@@ -34,7 +33,7 @@ import (
 // NewSIPAgent returns the sip Agent
 func NewSIPAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *SIPAgent {
 	return &SIPAgent{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -59,7 +58,7 @@ type SIPAgent struct {
 }
 // Start should handle the sercive start
-func (sip *SIPAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err error) {
+func (sip *SIPAgent) Start(shutdown chan struct{}) (err error) {
 	if sip.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -78,20 +77,20 @@ func (sip *SIPAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err
 			utils.SIPAgent, err))
 		return
 	}
-	go sip.listenAndServe(shtDwn)
+	go sip.listenAndServe(shutdown)
 	close(sip.stateDeps.StateChan(utils.StateServiceUP))
 	return
 }
-func (sip *SIPAgent) listenAndServe(shtDwn context.CancelFunc) {
+func (sip *SIPAgent) listenAndServe(shutdown chan struct{}) {
 	if err := sip.sip.ListenAndServe(); err != nil {
 		utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error()))
-		shtDwn() // stop the engine here
+		close(shutdown) // stop the engine here
 	}
 }
 // Reload handles the change of config
-func (sip *SIPAgent) Reload(_ *context.Context, shtDwn context.CancelFunc) (err error) {
+func (sip *SIPAgent) Reload(shutdown chan struct{}) (err error) {
 	if sip.oldListen == sip.cfg.SIPAgentCfg().Listen {
 		return
 	}
@@ -100,7 +99,7 @@ func (sip *SIPAgent) Reload(_ *context.Context, shtDwn context.CancelFunc) (err
 	sip.oldListen = sip.cfg.SIPAgentCfg().Listen
 	sip.sip.InitStopChan()
 	sip.Unlock()
-	go sip.listenAndServe(shtDwn)
+	go sip.listenAndServe(shutdown)
 	return
 }
diff --git a/services/stats.go b/services/stats.go
index e40606fe2..8a2a2b03f 100644
--- a/services/stats.go
+++ b/services/stats.go
@@ -35,7 +35,7 @@ import (
 func NewStatService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
 	srvDep map[string]*sync.WaitGroup,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *StatService {
 	return &StatService{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -62,7 +62,7 @@ type StatService struct {
 }
 // Start should handle the sercive start
-func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (sts *StatService) Start(shutdown chan struct{}) (err error) {
 	if sts.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -77,7 +77,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
 	if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.StatS, utils.CacheS, utils.StateServiceUP)
 	}
-	if err = cacheS.WaitToPrecache(ctx,
+	if err = cacheS.WaitToPrecache(shutdown,
 		utils.CacheStatQueueProfiles,
 		utils.CacheStatQueues,
 		utils.CacheStatFilterIndexes); err != nil {
@@ -102,7 +102,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
 	utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS))
-	sts.sts.StartLoop(ctx)
+	sts.sts.StartLoop(context.TODO())
 	srv, _ := engine.NewService(sts.sts)
 	// srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false)
 	if !sts.cfg.DispatcherSCfg().Enabled {
@@ -116,9 +116,9 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
 }
 // Reload handles the change of config
-func (sts *StatService) Reload(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (sts *StatService) Reload(_ chan struct{}) (err error) {
 	sts.Lock()
-	sts.sts.Reload(ctx)
+	sts.sts.Reload(context.TODO())
 	sts.Unlock()
 	return
 }
diff --git a/services/stordb.go b/services/stordb.go
index d8edf3333..57fa4cef5 100644
--- a/services/stordb.go
+++ b/services/stordb.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/servmanager"
@@ -56,7 +55,7 @@ type StorDBService struct {
 }
 // Start should handle the service start
-func (db *StorDBService) Start(*context.Context, context.CancelFunc) (err error) {
+func (db *StorDBService) Start(_ chan struct{}) (err error) {
 	if db.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -87,7 +86,7 @@ func (db *StorDBService) Start(*context.Context, context.CancelFunc) (err error)
 }
 // Reload handles the change of config
-func (db *StorDBService) Reload(*context.Context, context.CancelFunc) (err error) {
+func (db *StorDBService) Reload(_ chan struct{}) (err error) {
 	db.Lock()
 	defer db.Unlock()
 	if db.needsConnectionReload() {
diff --git a/services/thresholds.go b/services/thresholds.go
index f17285659..7e335fce2 100644
--- a/services/thresholds.go
+++ b/services/thresholds.go
@@ -35,7 +35,7 @@ import (
 func NewThresholdService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
 	srvDep map[string]*sync.WaitGroup,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *ThresholdService {
 	return &ThresholdService{
 		cfg:    cfg,
 		srvDep: srvDep,
@@ -62,7 +62,7 @@ type ThresholdService struct {
 }
 // Start should handle the sercive start
-func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (thrs *ThresholdService) Start(shutdown chan struct{}) (err error) {
 	if thrs.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -77,7 +77,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
 	if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CacheS, utils.StateServiceUP)
 	}
-	if err = cacheS.WaitToPrecache(ctx,
+	if err = cacheS.WaitToPrecache(shutdown,
 		utils.CacheThresholdProfiles,
 		utils.CacheThresholds,
 		utils.CacheThresholdFilterIndexes); err != nil {
@@ -101,7 +101,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
 	thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), thrs.connMgr)
 	utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS))
-	thrs.thrs.StartLoop(ctx)
+	thrs.thrs.StartLoop(context.TODO())
 	srv, _ := engine.NewService(thrs.thrs)
 	// srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false)
 	if !thrs.cfg.DispatcherSCfg().Enabled {
@@ -115,9 +115,9 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
 }
 // Reload handles the change of config
-func (thrs *ThresholdService) Reload(ctx *context.Context, _ context.CancelFunc) (_ error) {
+func (thrs *ThresholdService) Reload(_ chan struct{}) (_ error) {
 	thrs.Lock()
-	thrs.thrs.Reload(ctx)
+	thrs.thrs.Reload(context.TODO())
 	thrs.Unlock()
 	return
 }
diff --git a/services/tpes.go b/services/tpes.go
index 296bae308..96dc45732 100644
--- a/services/tpes.go
+++ b/services/tpes.go
@@ -22,7 +22,6 @@ import (
 	"sync"
 	"github.com/cgrates/birpc"
-	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/apis"
 	"github.com/cgrates/cgrates/commonlisteners"
 	"github.com/cgrates/cgrates/config"
@@ -34,7 +33,7 @@ import (
 // NewTPeService is the constructor for the TpeService
 func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *TPeService {
 	return &TPeService{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -61,7 +60,7 @@ type TPeService struct {
 }
 // Start should handle the service start
-func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (ts *TPeService) Start(_ chan struct{}) (err error) {
 	cls := ts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
 	if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.TPeS, utils.CommonListenerS, utils.StateServiceUP)
@@ -82,7 +81,7 @@ func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err err
 }
 // Reload handles the change of config
-func (ts *TPeService) Reload(*context.Context, context.CancelFunc) (err error) {
+func (ts *TPeService) Reload(_ chan struct{}) (err error) {
 	return
 }
diff --git a/services/trends.go b/services/trends.go
index cf088b0d6..e349ca0d3 100644
--- a/services/trends.go
+++ b/services/trends.go
@@ -35,7 +35,7 @@ import (
 func NewTrendService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
 	srvDep map[string]*sync.WaitGroup,
-	srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
+	srvIndexer *servmanager.ServiceIndexer) *TrendService {
 	return &TrendService{
 		cfg:     cfg,
 		connMgr: connMgr,
@@ -61,7 +61,7 @@ type TrendService struct {
 }
 // Start should handle the sercive start
-func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (trs *TrendService) Start(shutdown chan struct{}) (err error) {
 	if trs.IsRunning() {
 		return utils.ErrServiceAlreadyRunning
 	}
@@ -76,7 +76,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
 	if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
 		return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CacheS, utils.StateServiceUP)
 	}
-	if err = cacheS.WaitToPrecache(ctx,
+	if err = cacheS.WaitToPrecache(shutdown,
 		utils.CacheTrendProfiles,
 		utils.CacheTrends,
 	); err != nil {
@@ -99,7 +99,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
 	defer trs.Unlock()
 	trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), trs.connMgr)
 	utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS))
-	if err := trs.trs.StartTrendS(ctx); err != nil {
+	if err := trs.trs.StartTrendS(context.TODO()); err != nil {
 		return err
 	}
 	srv, err := engine.NewService(trs.trs)
@@ -117,9 +117,9 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
 }
 // Reload handles the change of config
-func (trs *TrendService) Reload(ctx *context.Context, _ context.CancelFunc) (err error) {
+func (trs *TrendService) Reload(_ chan struct{}) (err error) {
 	trs.Lock()
-	trs.trs.Reload(ctx)
+	trs.trs.Reload(context.TODO())
 	trs.Unlock()
 	return
 }
diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go
index d92c2de98..52d890e60 100644
--- a/servmanager/servmanager.go
+++ b/servmanager/servmanager.go
@@ -55,16 +55,16 @@ type ServiceManager struct {
 }
 // StartServices starts all enabled services
-func (srvMngr *ServiceManager) StartServices(ctx *context.Context, shtDwn context.CancelFunc) {
-	go srvMngr.handleReload(ctx, shtDwn)
+func (srvMngr *ServiceManager) StartServices(shutdown chan struct{}) {
+	go srvMngr.handleReload(shutdown)
 	for _, service := range srvMngr.serviceIndexer.GetServices() {
 		if service.ShouldRun() && !service.IsRunning() {
 			srvMngr.shdWg.Add(1)
 			go func() {
-				if err := service.Start(ctx, shtDwn); err != nil &&
+				if err := service.Start(shutdown); err != nil &&
 					err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine
 					utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, service.ServiceName(), err))
-					shtDwn()
+					close(shutdown)
 				}
 			}()
 		}
@@ -99,11 +99,11 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) {
 	srvMngr.Unlock()
 }
-func (srvMngr *ServiceManager) handleReload(ctx *context.Context, shtDwn context.CancelFunc) {
+func (srvMngr *ServiceManager) handleReload(shutdown chan struct{}) {
 	var srvName string
 	for {
 		select {
-		case <-ctx.Done():
+		case <-shutdown:
 			srvMngr.ShutdownServices()
 			return
 		case srvName = <-srvMngr.rldChan:
@@ -111,34 +111,34 @@ func (srvMngr *ServiceManager) handleReload(ctx *context.Context, shtDwn context
 			if srvName == config.RPCConnsJSON {
 				go srvMngr.connMgr.Reload()
 			} else {
-				go srvMngr.reloadService(srvName, ctx, shtDwn)
+				go srvMngr.reloadService(srvName, shutdown)
 			}
 			// handle RPC server
 		}
 	}
 }
-func (srvMngr *ServiceManager) reloadService(srvName string, ctx *context.Context, shtDwn context.CancelFunc) (err error) {
+func (srvMngr *ServiceManager) reloadService(srvName string, shutdown chan struct{}) (err error) {
 	srv := srvMngr.serviceIndexer.GetService(srvName)
 	if srv.ShouldRun() {
 		if srv.IsRunning() {
-			if err = srv.Reload(ctx, shtDwn); err != nil {
+			if err = srv.Reload(shutdown); err != nil {
 				utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err))
-				shtDwn()
+				close(shutdown)
 				return // stop if we encounter an error
 			}
 		} else {
 			srvMngr.shdWg.Add(1)
-			if err = srv.Start(ctx, shtDwn); err != nil {
+			if err = srv.Start(shutdown); err != nil {
 				utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err))
-				shtDwn()
+				close(shutdown)
 				return // stop if we encounter an error
 			}
 		}
 	} else if srv.IsRunning() {
 		if err = srv.Shutdown(); err != nil {
			utils.Logger.Err(fmt.Sprintf("<%s> failed to stop service <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err))
-			shtDwn()
+			close(shutdown)
 		}
 		srvMngr.shdWg.Done()
 	}
@@ -163,9 +163,9 @@ func (srvMngr *ServiceManager) ShutdownServices() {
 // Service interface that describes what functions should a service implement
 type Service interface {
 	// Start should handle the service start
-	Start(*context.Context, context.CancelFunc) error
+	Start(chan struct{}) error
 	// Reload handles the change of config
-	Reload(*context.Context, context.CancelFunc) error
+	Reload(chan struct{}) error
 	// Shutdown stops the service
 	Shutdown() error
 	// IsRunning returns if the service is running