diff --git a/services/analyzers.go b/services/analyzers.go index 0d3ec6cae..1e622d9e6 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -37,7 +37,6 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService { anz := &AnalyzerService{ cfg: cfg, stateDeps: NewStateDependencies([]string{ - utils.StateServiceInit, utils.StateServiceUP, utils.StateServiceDOWN, }), @@ -69,7 +68,6 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil { return } - close(anz.stateDeps.StateChan(utils.StateServiceInit)) anzCtx, cancel := context.WithCancel(context.TODO()) anz.cancelFunc = cancel go func(a *analyzers.AnalyzerS) { diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 7c76cfead..514dff9e3 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -100,7 +100,7 @@ func (s *CommonListenerService) Shutdown(registry *servmanager.ServiceRegistry) utils.TrendS, } for _, svcID := range deps { - if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) { + if servmanager.State(registry.Lookup(svcID)) != utils.StateServiceUP { continue } _, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, s.cfg.GeneralCfg().ConnectTimeout) diff --git a/services/connmanager.go b/services/connmanager.go index 4c9dcdf96..05f58f3b1 100644 --- a/services/connmanager.go +++ b/services/connmanager.go @@ -50,7 +50,7 @@ type ConnManagerService struct { func (s *ConnManagerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { s.anz = registry.Lookup(utils.AnalyzerS).(*AnalyzerService) if s.anz.ShouldRun() { // wait for AnalyzerS only if it should run - if _, err := WaitForServiceState(utils.StateServiceInit, utils.AnalyzerS, registry, + if _, err := WaitForServiceState(utils.StateServiceUP, utils.AnalyzerS, registry, s.cfg.GeneralCfg().ConnectTimeout); err != nil { return err } diff --git 
a/services/datadb.go b/services/datadb.go index 23a747d16..4af0f7325 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -118,7 +118,7 @@ func (db *DataDBService) Shutdown(registry *servmanager.ServiceRegistry) error { utils.ThresholdS, } for _, svcID := range deps { - if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) { + if servmanager.State(registry.Lookup(svcID)) != utils.StateServiceUP { continue } _, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, db.cfg.GeneralCfg().ConnectTimeout) diff --git a/services/statedeps.go b/services/statedeps.go index 31bf50670..7db397dba 100644 --- a/services/statedeps.go +++ b/services/statedeps.go @@ -27,12 +27,22 @@ import ( "github.com/cgrates/cgrates/utils" ) -// NewStateDependencies constructs a StateDependencies struct +// NewStateDependencies sets up state tracking using buffered channels, where +// each service state has its own channel and they pass around a single signal. func NewStateDependencies(servStates []string) (stDeps *StateDependencies) { stDeps = &StateDependencies{stateDeps: make(map[string]chan struct{})} for _, stateID := range servStates { - stDeps.stateDeps[stateID] = make(chan struct{}) + stDeps.stateDeps[stateID] = make(chan struct{}, 1) // non-blocking } + + // A single state signal is shared between all state channels. + // Initially placed in SERVICE_DOWN during initialization. 
+ c, has := stDeps.stateDeps[utils.StateServiceDOWN] + if !has { + panic(fmt.Sprintf("missing required initial state %q", utils.StateServiceDOWN)) + } + c <- struct{}{} + return } @@ -82,8 +92,10 @@ func WaitForServiceState(state, serviceID string, registry *servmanager.ServiceR return srv, nil } } + stateCh := srv.StateChan(state) select { - case <-srv.StateChan(state): + case <-stateCh: + stateCh <- struct{}{} return srv, nil case <-time.After(timeout): return nil, fmt.Errorf("timed out waiting for service %q state %q", serviceID, state) diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index da9cabd9e..b35ffb586 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -20,6 +20,7 @@ package servmanager import ( "fmt" + "slices" "sync" "github.com/cgrates/birpc/context" @@ -53,17 +54,17 @@ type ServiceManager struct { func (m *ServiceManager) StartServices(shutdown *utils.SyncedChan) { go m.handleReload(shutdown) for _, svc := range m.registry.List() { - // TODO: verify if IsServiceInState check is needed. It should + // TODO: verify if service state check is needed. 
It should // be redundant since ServManager manages all services and this // runs only at startup - if svc.ShouldRun() && !IsServiceInState(svc, utils.StateServiceUP) { + if svc.ShouldRun() && State(svc) == utils.StateServiceDOWN { m.shdWg.Add(1) go func() { if err := svc.Start(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) shutdown.CloseOnce() } - close(svc.StateChan(utils.StateServiceUP)) + MustSetState(svc, utils.StateServiceUP) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) }() } @@ -87,10 +88,13 @@ func (m *ServiceManager) handleReload(shutdown *utils.SyncedChan) { func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (err error) { svc := m.registry.Lookup(id) - isUp := IsServiceInState(svc, utils.StateServiceUP) + + // Consider services in pending states (not up/down) to be up. This assumes + // Start/Reload/Shutdown functions are handled synchronously. + isUp := State(svc) != utils.StateServiceDOWN + if svc.ShouldRun() { if isUp { - // TODO: state channels must be reinitiated for both SERVICE_UP and SERVICE_DOWN. 
if err = svc.Reload(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) shutdown.CloseOnce() @@ -104,7 +108,7 @@ func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (e shutdown.CloseOnce() return // stop if we encounter an error } - close(svc.StateChan(utils.StateServiceUP)) + MustSetState(svc, utils.StateServiceUP) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) } } else if isUp { @@ -112,7 +116,7 @@ func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (e utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) shutdown.CloseOnce() } - close(svc.StateChan(utils.StateServiceDOWN)) + MustSetState(svc, utils.StateServiceDOWN) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) m.shdWg.Done() } @@ -122,7 +126,7 @@ func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (e // ShutdownServices will stop all services func (m *ServiceManager) ShutdownServices() { for _, svc := range m.registry.List() { - if IsServiceInState(svc, utils.StateServiceUP) { + if State(svc) != utils.StateServiceDOWN { go func() { defer m.shdWg.Done() if err := svc.Shutdown(m.registry); err != nil { @@ -130,7 +134,7 @@ func (m *ServiceManager) ShutdownServices() { utils.ServiceManager, svc.ServiceName(), err)) return } - close(svc.StateChan(utils.StateServiceDOWN)) + MustSetState(svc, utils.StateServiceDOWN) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) }() } @@ -304,3 +308,41 @@ func IsServiceInState(svc Service, state string) bool { return false } } + +// MustSetState changes a service's state, panicking if it fails. 
+func MustSetState(svc Service, state string) { + if err := SetState(svc, state); err != nil { + panic(err) + } +} + +// SetState moves the state signal to a new valid state. +// Returns an error if the state is invalid or if no current state was found. +func SetState(svc Service, state string) error { + if !slices.Contains([]string{utils.StateServiceUP, utils.StateServiceDOWN}, state) { + return fmt.Errorf("invalid service state: %q", state) + } + select { + case <-svc.StateChan(utils.StateServiceUP): + case <-svc.StateChan(utils.StateServiceDOWN): + default: + return fmt.Errorf("service %q in undefined state", svc.ServiceName()) + } + svc.StateChan(state) <- struct{}{} + return nil +} + +// State returns the current state of a service by checking which channel holds +// the state signal. Returns empty string if no valid state is found. +func State(svc Service) string { + select { + case <-svc.StateChan(utils.StateServiceUP): + svc.StateChan(utils.StateServiceUP) <- struct{}{} + return utils.StateServiceUP + case <-svc.StateChan(utils.StateServiceDOWN): + svc.StateChan(utils.StateServiceDOWN) <- struct{}{} + return utils.StateServiceDOWN + default: + return "" + } +} diff --git a/utils/consts.go b/utils/consts.go index b12f1a5ad..e418ad311 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2817,7 +2817,6 @@ const ( const ( StateServiceUP = "SERVICE_UP" StateServiceDOWN = "SERVICE_DOWN" - StateServiceInit = "SERVICE_INIT" ) func buildCacheInstRevPrefixes() {