Remove srvDep map

Now syncing on shutdown based on states
This commit is contained in:
ionutboangiu
2025-01-16 11:50:11 +02:00
committed by Dan Christian Bogos
parent 7ffe0ae2f5
commit 3d693aefe5
7 changed files with 31 additions and 53 deletions

View File

@@ -124,10 +124,6 @@ func runCGREngine(fs []string) (err error) {
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
srvDep := map[string]*sync.WaitGroup{
utils.DataDB: new(sync.WaitGroup),
}
// ServiceIndexer will share service references to all services
registry := servmanager.NewServiceRegistry()
gvS := services.NewGlobalVarS(cfg)
@@ -136,7 +132,7 @@ func runCGREngine(fs []string) (err error) {
anzS := services.NewAnalyzerService(cfg)
cms := services.NewConnManagerService(cfg)
lgs := services.NewLoggerService(cfg, *flags.Logger)
dmS := services.NewDataDBService(cfg, *flags.SetVersions, srvDep)
dmS := services.NewDataDBService(cfg, *flags.SetVersions)
sdbS := services.NewStorDBService(cfg, *flags.SetVersions)
configS := services.NewConfigService(cfg)
guardianS := services.NewGuardianService(cfg)
@@ -151,11 +147,11 @@ func runCGREngine(fs []string) (err error) {
attrS := services.NewAttributeService(cfg, dspS)
chrgS := services.NewChargerService(cfg)
routeS := services.NewRouteService(cfg)
resourceS := services.NewResourceService(cfg, srvDep)
trendS := services.NewTrendService(cfg, srvDep)
rankingS := services.NewRankingService(cfg, srvDep)
thS := services.NewThresholdService(cfg, srvDep)
stS := services.NewStatService(cfg, srvDep)
resourceS := services.NewResourceService(cfg)
trendS := services.NewTrendService(cfg)
rankingS := services.NewRankingService(cfg)
thS := services.NewThresholdService(cfg)
stS := services.NewStatService(cfg)
erS := services.NewEventReaderService(cfg)
dnsAgent := services.NewDNSAgent(cfg)
fsAgent := services.NewFreeswitchAgent(cfg)

View File

@@ -29,12 +29,10 @@ import (
)
// NewDataDBService returns the DataDB Service
func NewDataDBService(cfg *config.CGRConfig, setVersions bool,
srvDep map[string]*sync.WaitGroup) *DataDBService {
func NewDataDBService(cfg *config.CGRConfig, setVersions bool) *DataDBService {
return &DataDBService{
cfg: cfg,
setVersions: setVersions,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -46,7 +44,6 @@ type DataDBService struct {
oldDBCfg *config.DataDbCfg
dm *engine.DataManager
setVersions bool
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -112,13 +109,27 @@ func (db *DataDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegis
}
// Shutdown stops the service
func (db *DataDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
db.srvDep[utils.DataDB].Wait()
func (db *DataDBService) Shutdown(registry *servmanager.ServiceRegistry) error {
deps := []string{
utils.ResourceS,
utils.TrendS,
utils.RankingS,
utils.StatS,
utils.ThresholdS,
}
for _, svcID := range deps {
if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) {
continue
}
_, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, db.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
}
db.Lock()
defer db.Unlock()
db.dm.DataDB().Close()
db.dm = nil
db.Unlock()
return
return nil
}
// ServiceName returns the service name

View File

@@ -31,11 +31,9 @@ import (
)
// NewRankingService returns the RankingS Service
func NewRankingService(cfg *config.CGRConfig,
srvDep map[string]*sync.WaitGroup) *RankingService {
func NewRankingService(cfg *config.CGRConfig) *RankingService {
return &RankingService{
cfg: cfg,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -47,14 +45,11 @@ type RankingService struct {
ran *engine.RankingS
cl *commonlisteners.CommonListenerS
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
ran.srvDep[utils.DataDB].Add(1)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -107,7 +102,6 @@ func (ran *RankingService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceReg
// Shutdown stops the service
func (ran *RankingService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
defer ran.srvDep[utils.DataDB].Done()
ran.Lock()
defer ran.Unlock()
ran.ran.StopRankingS()

View File

@@ -30,11 +30,9 @@ import (
)
// NewResourceService returns the Resource Service
func NewResourceService(cfg *config.CGRConfig,
srvDep map[string]*sync.WaitGroup) *ResourceService {
func NewResourceService(cfg *config.CGRConfig) *ResourceService {
return &ResourceService{
cfg: cfg,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -47,14 +45,11 @@ type ResourceService struct {
reS *engine.ResourceS
cl *commonlisteners.CommonListenerS
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
reS.srvDep[utils.DataDB].Add(1)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -104,7 +99,6 @@ func (reS *ResourceService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
// Shutdown stops the service
func (reS *ResourceService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
defer reS.srvDep[utils.DataDB].Done()
reS.Lock()
defer reS.Unlock()
reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error

View File

@@ -30,10 +30,9 @@ import (
)
// NewStatService returns the Stat Service
func NewStatService(cfg *config.CGRConfig, srvDep map[string]*sync.WaitGroup) *StatService {
func NewStatService(cfg *config.CGRConfig) *StatService {
return &StatService{
cfg: cfg,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -46,14 +45,11 @@ type StatService struct {
sts *engine.StatS
cl *commonlisteners.CommonListenerS
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
sts.srvDep[utils.DataDB].Add(1)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -103,7 +99,6 @@ func (sts *StatService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegist
// Shutdown stops the service
func (sts *StatService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
defer sts.srvDep[utils.DataDB].Done()
sts.Lock()
defer sts.Unlock()
sts.sts.Shutdown(context.TODO())

View File

@@ -30,11 +30,9 @@ import (
)
// NewThresholdService returns the Threshold Service
func NewThresholdService(cfg *config.CGRConfig,
srvDep map[string]*sync.WaitGroup) *ThresholdService {
func NewThresholdService(cfg *config.CGRConfig) *ThresholdService {
return &ThresholdService{
cfg: cfg,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -47,14 +45,11 @@ type ThresholdService struct {
thrs *engine.ThresholdS
cl *commonlisteners.CommonListenerS
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
thrs.srvDep[utils.DataDB].Add(1)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -104,7 +99,6 @@ func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.Service
// Shutdown stops the service
func (thrs *ThresholdService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
defer thrs.srvDep[utils.DataDB].Done()
thrs.Lock()
defer thrs.Unlock()
thrs.thrs.Shutdown(context.TODO())

View File

@@ -30,11 +30,9 @@ import (
)
// NewTrendsService returns the TrendS Service
func NewTrendService(cfg *config.CGRConfig,
srvDep map[string]*sync.WaitGroup) *TrendService {
func NewTrendService(cfg *config.CGRConfig) *TrendService {
return &TrendService{
cfg: cfg,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -46,14 +44,11 @@ type TrendService struct {
trs *engine.TrendS
cl *commonlisteners.CommonListenerS
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
trs.srvDep[utils.DataDB].Add(1)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -106,7 +101,6 @@ func (trs *TrendService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegis
// Shutdown stops the service
func (trs *TrendService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
defer trs.srvDep[utils.DataDB].Done()
trs.Lock()
defer trs.Unlock()
trs.trs.StopTrendS()