From 3d693aefe59ab1c74cbcfc86545248b2351c5b49 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 16 Jan 2025 11:50:11 +0200 Subject: [PATCH] Remove srvDep map Now syncing on shutdown based on states --- cmd/cgr-engine/cgr-engine.go | 16 ++++++---------- services/datadb.go | 29 ++++++++++++++++++++--------- services/rankings.go | 8 +------- services/resources.go | 8 +------- services/stats.go | 7 +------ services/thresholds.go | 8 +------- services/trends.go | 8 +------- 7 files changed, 31 insertions(+), 53 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a0548183a..b76c354f5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -124,10 +124,6 @@ func runCGREngine(fs []string) (err error) { utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) - srvDep := map[string]*sync.WaitGroup{ - utils.DataDB: new(sync.WaitGroup), - } - // ServiceIndexer will share service references to all services registry := servmanager.NewServiceRegistry() gvS := services.NewGlobalVarS(cfg) @@ -136,7 +132,7 @@ func runCGREngine(fs []string) (err error) { anzS := services.NewAnalyzerService(cfg) cms := services.NewConnManagerService(cfg) lgs := services.NewLoggerService(cfg, *flags.Logger) - dmS := services.NewDataDBService(cfg, *flags.SetVersions, srvDep) + dmS := services.NewDataDBService(cfg, *flags.SetVersions) sdbS := services.NewStorDBService(cfg, *flags.SetVersions) configS := services.NewConfigService(cfg) guardianS := services.NewGuardianService(cfg) @@ -151,11 +147,11 @@ func runCGREngine(fs []string) (err error) { attrS := services.NewAttributeService(cfg, dspS) chrgS := services.NewChargerService(cfg) routeS := services.NewRouteService(cfg) - resourceS := services.NewResourceService(cfg, srvDep) - trendS := services.NewTrendService(cfg, srvDep) - rankingS := services.NewRankingService(cfg, srvDep) - thS := services.NewThresholdService(cfg, srvDep) - stS := services.NewStatService(cfg, srvDep) + resourceS := services.NewResourceService(cfg) + trendS := services.NewTrendService(cfg) + rankingS := services.NewRankingService(cfg) + thS := services.NewThresholdService(cfg) + stS := services.NewStatService(cfg) erS := services.NewEventReaderService(cfg) dnsAgent := services.NewDNSAgent(cfg) fsAgent := services.NewFreeswitchAgent(cfg) diff --git a/services/datadb.go b/services/datadb.go index 697d8c33b..61c313ed3 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -29,12 +29,10 @@ import ( ) // NewDataDBService returns the DataDB Service -func NewDataDBService(cfg *config.CGRConfig, setVersions bool, - srvDep map[string]*sync.WaitGroup) *DataDBService { +func NewDataDBService(cfg *config.CGRConfig, setVersions bool) *DataDBService { return &DataDBService{ cfg: cfg, setVersions: setVersions, - srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -46,7 +44,6 @@ type DataDBService struct { oldDBCfg *config.DataDbCfg dm *engine.DataManager setVersions bool - srvDep map[string]*sync.WaitGroup stateDeps *StateDependencies // channel subscriptions for state changes } @@ -112,13 +109,27 @@ func (db *DataDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegis } // Shutdown stops the service -func (db *DataDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { - db.srvDep[utils.DataDB].Wait() +func (db *DataDBService) Shutdown(registry *servmanager.ServiceRegistry) error { + deps := []string{ + utils.ResourceS, + utils.TrendS, + utils.RankingS, + utils.StatS, + utils.ThresholdS, + } + for _, svcID := range deps { + if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) { + continue + } + _, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, db.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err + } + } db.Lock() + defer db.Unlock() db.dm.DataDB().Close() - db.dm = nil - db.Unlock() - return + return nil } // ServiceName returns the service name diff --git a/services/rankings.go b/services/rankings.go index bc5143ef0..c917292f7 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -31,11 +31,9 @@ import ( ) // NewRankingService returns the RankingS Service -func NewRankingService(cfg *config.CGRConfig, - srvDep map[string]*sync.WaitGroup) *RankingService { +func NewRankingService(cfg *config.CGRConfig) *RankingService { return &RankingService{ cfg: cfg, - srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -47,14 +45,11 @@ type RankingService struct { ran *engine.RankingS cl *commonlisteners.CommonListenerS - srvDep map[string]*sync.WaitGroup stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - ran.srvDep[utils.DataDB].Add(1) - srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -107,7 +102,6 @@ func (ran *RankingService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceReg // Shutdown stops the service func (ran *RankingService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - defer ran.srvDep[utils.DataDB].Done() ran.Lock() defer ran.Unlock() ran.ran.StopRankingS() diff --git a/services/resources.go b/services/resources.go index a972a97f4..28e025581 100644 --- a/services/resources.go +++ b/services/resources.go @@ -30,11 +30,9 @@ import ( ) // NewResourceService returns the Resource Service -func NewResourceService(cfg *config.CGRConfig, - srvDep map[string]*sync.WaitGroup) *ResourceService { +func NewResourceService(cfg *config.CGRConfig) *ResourceService { return &ResourceService{ cfg: cfg, - srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -47,14 +45,11 @@ type ResourceService struct { reS *engine.ResourceS cl *commonlisteners.CommonListenerS - srvDep map[string]*sync.WaitGroup stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - reS.srvDep[utils.DataDB].Add(1) - srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -104,7 +99,6 @@ func (reS *ResourceService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe // Shutdown stops the service func (reS *ResourceService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - defer reS.srvDep[utils.DataDB].Done() reS.Lock() defer reS.Unlock() reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error diff --git a/services/stats.go b/services/stats.go index e7f54ce9e..e89969431 100644 --- a/services/stats.go +++ b/services/stats.go @@ -30,10 +30,9 @@ import ( ) // NewStatService returns the Stat Service -func NewStatService(cfg *config.CGRConfig, srvDep map[string]*sync.WaitGroup) *StatService { +func NewStatService(cfg *config.CGRConfig) *StatService { return &StatService{ cfg: cfg, - srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -46,14 +45,11 @@ type StatService struct { sts *engine.StatS cl *commonlisteners.CommonListenerS - srvDep map[string]*sync.WaitGroup stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - sts.srvDep[utils.DataDB].Add(1) - srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -103,7 +99,6 @@ func (sts *StatService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegist // Shutdown stops the service func (sts *StatService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - defer sts.srvDep[utils.DataDB].Done() sts.Lock() defer sts.Unlock() sts.sts.Shutdown(context.TODO()) diff --git a/services/thresholds.go b/services/thresholds.go index dfc137dbb..29b2ea729 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -30,11 +30,9 @@ import ( ) // NewThresholdService returns the Threshold Service -func NewThresholdService(cfg *config.CGRConfig, - srvDep map[string]*sync.WaitGroup) *ThresholdService { +func NewThresholdService(cfg *config.CGRConfig) *ThresholdService { return &ThresholdService{ cfg: cfg, - srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -47,14 +45,11 @@ type ThresholdService struct { thrs *engine.ThresholdS cl *commonlisteners.CommonListenerS - srvDep map[string]*sync.WaitGroup stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - thrs.srvDep[utils.DataDB].Add(1) - srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -104,7 +99,6 @@ func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.Service // Shutdown stops the service func (thrs *ThresholdService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { - defer thrs.srvDep[utils.DataDB].Done() thrs.Lock() defer thrs.Unlock() thrs.thrs.Shutdown(context.TODO()) diff --git a/services/trends.go b/services/trends.go index 7df080f19..728992c93 100644 --- a/services/trends.go +++ b/services/trends.go @@ -30,11 +30,9 @@ import ( ) // NewTrendsService returns the TrendS Service -func NewTrendService(cfg *config.CGRConfig, - srvDep map[string]*sync.WaitGroup) *TrendService { +func NewTrendService(cfg *config.CGRConfig) *TrendService { return &TrendService{ cfg: cfg, - srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -46,14 +44,11 @@ type TrendService struct { trs *engine.TrendS cl *commonlisteners.CommonListenerS - srvDep map[string]*sync.WaitGroup stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - trs.srvDep[utils.DataDB].Add(1) - srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -106,7 +101,6 @@ func (trs *TrendService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegis // Shutdown stops the service func (trs *TrendService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - defer trs.srvDep[utils.DataDB].Done() trs.Lock() defer trs.Unlock() trs.trs.StopTrendS()