Use channel instead of context to handle shutdown

This commit is contained in:
ionutboangiu
2024-12-12 20:27:23 +02:00
committed by Dan Christian Bogos
parent d9359a4005
commit c8a3ebe5e8
50 changed files with 281 additions and 333 deletions

View File

@@ -72,10 +72,8 @@ func runCGREngine(fs []string) (err error) {
runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases
}
// Init config
ctx, cancel := context.WithCancel(context.Background())
var cfg *config.CGRConfig
if cfg, err = services.InitConfigFromPath(ctx, *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig {
if cfg, err = services.InitConfigFromPath(context.TODO(), *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig {
return
}
@@ -89,7 +87,8 @@ func runCGREngine(fs []string) (err error) {
shdWg := new(sync.WaitGroup)
shdWg.Add(1)
go handleSignals(ctx, cancel, cfg, shdWg)
shutdown := make(chan struct{})
go handleSignals(shutdown, cfg, shdWg)
if *flags.ScheduledShutdown != utils.EmptyString {
var shtDwDur time.Duration
@@ -101,8 +100,8 @@ func runCGREngine(fs []string) (err error) {
tm := time.NewTimer(shtDwDur)
select {
case <-tm.C:
cancel()
case <-ctx.Done():
close(shutdown)
case <-shutdown:
tm.Stop()
}
shdWg.Done()
@@ -111,7 +110,7 @@ func runCGREngine(fs []string) (err error) {
connMgr := engine.NewConnManager(cfg)
// init syslog
if utils.Logger, err = engine.NewLogger(ctx,
if utils.Logger, err = engine.NewLogger(context.TODO(),
utils.FirstNonEmpty(*flags.Logger, cfg.LoggerCfg().Type),
cfg.GeneralCfg().DefaultTenant,
cfg.GeneralCfg().NodeID,
@@ -245,14 +244,14 @@ func runCGREngine(fs []string) (err error) {
}()
shdWg.Add(1)
if err = gvS.Start(ctx, cancel); err != nil {
if err = gvS.Start(shutdown); err != nil {
shdWg.Done()
srvManager.ShutdownServices()
return
}
if cls.ShouldRun() {
shdWg.Add(1)
if err = cls.Start(ctx, cancel); err != nil {
if err = cls.Start(shutdown); err != nil {
shdWg.Done()
srvManager.ShutdownServices()
return
@@ -260,7 +259,7 @@ func runCGREngine(fs []string) (err error) {
}
if efs.ShouldRun() { // efs checking first because of loggers
shdWg.Add(1)
if err = efs.Start(ctx, cancel); err != nil {
if err = efs.Start(shutdown); err != nil {
shdWg.Done()
srvManager.ShutdownServices()
return
@@ -268,7 +267,7 @@ func runCGREngine(fs []string) (err error) {
}
if dmS.ShouldRun() { // Some services can run without db, ie: ERs
shdWg.Add(1)
if err = dmS.Start(ctx, cancel); err != nil {
if err = dmS.Start(shutdown); err != nil {
shdWg.Done()
srvManager.ShutdownServices()
return
@@ -276,7 +275,7 @@ func runCGREngine(fs []string) (err error) {
}
if sdbS.ShouldRun() {
shdWg.Add(1)
if err = sdbS.Start(ctx, cancel); err != nil {
if err = sdbS.Start(shutdown); err != nil {
shdWg.Done()
srvManager.ShutdownServices()
return
@@ -285,7 +284,7 @@ func runCGREngine(fs []string) (err error) {
if anzS.ShouldRun() {
shdWg.Add(1)
if err = anzS.Start(ctx, cancel); err != nil {
if err = anzS.Start(shutdown); err != nil {
shdWg.Done()
srvManager.ShutdownServices()
return
@@ -295,28 +294,28 @@ func runCGREngine(fs []string) (err error) {
}
shdWg.Add(1)
if err = coreS.Start(ctx, cancel); err != nil {
if err = coreS.Start(shutdown); err != nil {
shdWg.Done()
srvManager.ShutdownServices()
return
}
shdWg.Add(1)
if err = cacheS.Start(ctx, cancel); err != nil {
if err = cacheS.Start(shutdown); err != nil {
shdWg.Done()
srvManager.ShutdownServices()
return
}
srvManager.StartServices(ctx, cancel)
srvManager.StartServices(shutdown)
cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS)
if *flags.Preload != utils.EmptyString {
if err = cgrRunPreload(ctx, cfg, *flags.Preload, srvIdxr); err != nil {
if err = cgrRunPreload(cfg, *flags.Preload, srvIdxr); err != nil {
return
}
}
// Serve rpc connections
cgrStartRPC(ctx, cancel, cfg, srvIdxr)
cgrStartRPC(cfg, srvIdxr, shutdown)
// TODO: find a better location for this if block
if *flags.MemPrfDir != "" {
@@ -330,26 +329,24 @@ func runCGREngine(fs []string) (err error) {
}
}
<-ctx.Done()
<-shutdown
return
}
func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string,
// TODO: merge with LoaderService
func cgrRunPreload(cfg *config.CGRConfig, loaderIDs string,
sIdxr *servmanager.ServiceIndexer) (err error) {
if !cfg.LoaderCfg().Enabled() {
err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS)
return
}
loader := sIdxr.GetService(utils.LoaderS).(*services.LoaderService)
select {
case <-loader.StateChan(utils.StateServiceUP):
case <-ctx.Done():
return
if utils.StructChanTimeout(loader.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.PreloadCgr, utils.LoaderS, utils.StateServiceUP)
}
var reply string
for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) {
if err = loader.GetLoaderS().V1Run(ctx, &loaders.ArgsProcessFolder{
if err = loader.GetLoaderS().V1Run(context.TODO(), &loaders.ArgsProcessFolder{
APIOpts: map[string]any{
utils.MetaForceLock: true, // force lock will unlock the file in case is locked and return error
utils.MetaStopOnError: true,
@@ -373,21 +370,19 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager)
}
func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer) {
func cgrStartRPC(cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer, shutdown chan struct{}) {
if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this
select {
case <-sIdxr.GetService(utils.DispatcherS).StateChan(utils.StateServiceUP):
case <-ctx.Done():
if utils.StructChanTimeout(
sIdxr.GetService(utils.DispatcherS).StateChan(utils.StateServiceUP),
cfg.GeneralCfg().ConnectTimeout) {
return
}
}
cl := sIdxr.GetService(utils.CommonListenerS).(*services.CommonListenerService).CLS()
cl.StartServer(ctx, shtdwnEngine, cfg)
cl.StartServer(cfg, shutdown)
}
func handleSignals(ctx *context.Context, shutdown context.CancelFunc,
cfg *config.CGRConfig, shdWg *sync.WaitGroup) {
func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.WaitGroup) {
shutdownSignal := make(chan os.Signal, 1)
reloadSignal := make(chan os.Signal, 1)
signal.Notify(shutdownSignal, os.Interrupt,
@@ -395,18 +390,16 @@ func handleSignals(ctx *context.Context, shutdown context.CancelFunc,
signal.Notify(reloadSignal, syscall.SIGHUP)
for {
select {
case <-ctx.Done():
case <-stopChan:
shdWg.Done()
return
case <-shutdownSignal:
shutdown()
shdWg.Done()
return
close(stopChan)
case <-reloadSignal:
// do it in its own goroutine in order to not block the signal handler with the reload functionality
go func() {
var reply string
if err := cfg.V1ReloadConfig(ctx,
if err := cfg.V1ReloadConfig(context.TODO(),
new(config.ReloadArgs), &reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("Error reloading configuration: <%s>", err))