From 712aeb0d4a36bf2cab951e757a09bc589f34fdac Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 4 Feb 2025 17:03:54 +0200 Subject: [PATCH] Revise StateDeps implementation Now tracks states by passing around an empty struct as a signal to states defined on the service, as opposed to signal state changes by closing the channel. This makes sure services can only be in one state at once and allows for multiple state changes which were not possible before. --- services/analyzers.go | 2 -- services/commonlisteners.go | 2 +- services/connmanager.go | 2 +- services/datadb.go | 2 +- services/statedeps.go | 18 +++++++++-- servmanager/servmanager.go | 60 +++++++++++++++++++++++++++++++------ utils/consts.go | 1 - 7 files changed, 69 insertions(+), 18 deletions(-) diff --git a/services/analyzers.go b/services/analyzers.go index 0d3ec6cae..1e622d9e6 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -37,7 +37,6 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService { anz := &AnalyzerService{ cfg: cfg, stateDeps: NewStateDependencies([]string{ - utils.StateServiceInit, utils.StateServiceUP, utils.StateServiceDOWN, }), @@ -69,7 +68,6 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil { return } - close(anz.stateDeps.StateChan(utils.StateServiceInit)) anzCtx, cancel := context.WithCancel(context.TODO()) anz.cancelFunc = cancel go func(a *analyzers.AnalyzerS) { diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 7c76cfead..514dff9e3 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -100,7 +100,7 @@ func (s *CommonListenerService) Shutdown(registry *servmanager.ServiceRegistry) utils.TrendS, } for _, svcID := range deps { - if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) { + if servmanager.State(registry.Lookup(svcID)) != utils.StateServiceUP { continue } _, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, s.cfg.GeneralCfg().ConnectTimeout) diff --git a/services/connmanager.go b/services/connmanager.go index 4c9dcdf96..05f58f3b1 100644 --- a/services/connmanager.go +++ b/services/connmanager.go @@ -50,7 +50,7 @@ type ConnManagerService struct { func (s *ConnManagerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { s.anz = registry.Lookup(utils.AnalyzerS).(*AnalyzerService) if s.anz.ShouldRun() { // wait for AnalyzerS only if it should run - if _, err := WaitForServiceState(utils.StateServiceInit, utils.AnalyzerS, registry, + if _, err := WaitForServiceState(utils.StateServiceUP, utils.AnalyzerS, registry, s.cfg.GeneralCfg().ConnectTimeout); err != nil { return err } diff --git a/services/datadb.go b/services/datadb.go index 23a747d16..4af0f7325 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -118,7 +118,7 @@ func (db *DataDBService) Shutdown(registry *servmanager.ServiceRegistry) error { utils.ThresholdS, } for _, svcID := range deps { - if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) { + if servmanager.State(registry.Lookup(svcID)) != utils.StateServiceUP { continue } _, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, db.cfg.GeneralCfg().ConnectTimeout) diff --git a/services/statedeps.go b/services/statedeps.go index 31bf50670..7db397dba 100644 --- a/services/statedeps.go +++ b/services/statedeps.go @@ -27,12 +27,22 @@ import ( "github.com/cgrates/cgrates/utils" ) -// NewStateDependencies constructs a StateDependencies struct +// NewStateDependencies sets up state tracking using buffered channels, where +// each service state has its own channel and they pass around a single signal. func NewStateDependencies(servStates []string) (stDeps *StateDependencies) { stDeps = &StateDependencies{stateDeps: make(map[string]chan struct{})} for _, stateID := range servStates { - stDeps.stateDeps[stateID] = make(chan struct{}) + stDeps.stateDeps[stateID] = make(chan struct{}, 1) // non-blocking } + + // A single state signal is shared between all state channels. + // Initially placed in SERVICE_DOWN during initialization. + c, has := stDeps.stateDeps[utils.StateServiceDOWN] + if !has { + panic(fmt.Sprintf("missing required initial state %q", utils.StateServiceDOWN)) + } + c <- struct{}{} + return } @@ -82,8 +92,10 @@ func WaitForServiceState(state, serviceID string, registry *servmanager.ServiceR return srv, nil } } + stateCh := srv.StateChan(state) select { - case <-srv.StateChan(state): + case <-stateCh: + stateCh <- struct{}{} return srv, nil case <-time.After(timeout): return nil, fmt.Errorf("timed out waiting for service %q state %q", serviceID, state) diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index da9cabd9e..b35ffb586 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -20,6 +20,7 @@ package servmanager import ( "fmt" + "slices" "sync" "github.com/cgrates/birpc/context" @@ -53,17 +54,17 @@ type ServiceManager struct { func (m *ServiceManager) StartServices(shutdown *utils.SyncedChan) { go m.handleReload(shutdown) for _, svc := range m.registry.List() { - // TODO: verify if IsServiceInState check is needed. It should + // TODO: verify if service state check is needed. It should // be redundant since ServManager manages all services and this // runs only at startup - if svc.ShouldRun() && !IsServiceInState(svc, utils.StateServiceUP) { + if svc.ShouldRun() && State(svc) == utils.StateServiceDOWN { m.shdWg.Add(1) go func() { if err := svc.Start(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) shutdown.CloseOnce() } - close(svc.StateChan(utils.StateServiceUP)) + MustSetState(svc, utils.StateServiceUP) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) }() } @@ -87,10 +88,13 @@ func (m *ServiceManager) handleReload(shutdown *utils.SyncedChan) { func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (err error) { svc := m.registry.Lookup(id) - isUp := IsServiceInState(svc, utils.StateServiceUP) + + // Consider services in pending states (not up/down) to be up. This assumes + // Start/Reload/Shutdown functions are handled synchronously. + isUp := State(svc) != utils.StateServiceDOWN + if svc.ShouldRun() { if isUp { - // TODO: state channels must be reinitiated for both SERVICE_UP and SERVICE_DOWN. if err = svc.Reload(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) shutdown.CloseOnce() @@ -104,7 +108,7 @@ func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (e shutdown.CloseOnce() return // stop if we encounter an error } - close(svc.StateChan(utils.StateServiceUP)) + MustSetState(svc, utils.StateServiceUP) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) } } else if isUp { @@ -112,7 +116,7 @@ func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (e utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) shutdown.CloseOnce() } - close(svc.StateChan(utils.StateServiceDOWN)) + MustSetState(svc, utils.StateServiceDOWN) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) m.shdWg.Done() } @@ -122,7 +126,7 @@ func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (e // ShutdownServices will stop all services func (m *ServiceManager) ShutdownServices() { for _, svc := range m.registry.List() { - if IsServiceInState(svc, utils.StateServiceUP) { + if State(svc) != utils.StateServiceDOWN { go func() { defer m.shdWg.Done() if err := svc.Shutdown(m.registry); err != nil { @@ -130,7 +134,7 @@ func (m *ServiceManager) ShutdownServices() { utils.ServiceManager, svc.ServiceName(), err)) return } - close(svc.StateChan(utils.StateServiceDOWN)) + MustSetState(svc, utils.StateServiceDOWN) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) }() } @@ -304,3 +308,41 @@ func IsServiceInState(svc Service, state string) bool { return false } } + +// MustSetState changes a service's state, panicking if it fails. +func MustSetState(svc Service, state string) { + if err := SetState(svc, state); err != nil { + panic(err) + } +} + +// SetServiceState moves the state signal to a new valid state. +// Returns an error if the state is invalid or if no current state was found. +func SetState(svc Service, state string) error { + if !slices.Contains([]string{utils.StateServiceUP, utils.StateServiceDOWN}, state) { + return fmt.Errorf("invalid service state: %q", state) + } + select { + case <-svc.StateChan(utils.StateServiceUP): + case <-svc.StateChan(utils.StateServiceDOWN): + default: + return fmt.Errorf("service %q in undefined state", svc.ServiceName()) + } + svc.StateChan(state) <- struct{}{} + return nil +} + +// State returns the current state of a service by checking which channel holds +// the state signal. Returns empty string if no valid state is found. +func State(svc Service) string { + select { + case <-svc.StateChan(utils.StateServiceUP): + svc.StateChan(utils.StateServiceUP) <- struct{}{} + return utils.StateServiceUP + case <-svc.StateChan(utils.StateServiceDOWN): + svc.StateChan(utils.StateServiceDOWN) <- struct{}{} + return utils.StateServiceDOWN + default: + return "" + } +} diff --git a/utils/consts.go b/utils/consts.go index b12f1a5ad..e418ad311 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2817,7 +2817,6 @@ const ( const ( StateServiceUP = "SERVICE_UP" StateServiceDOWN = "SERVICE_DOWN" - StateServiceInit = "SERVICE_INIT" ) func buildCacheInstRevPrefixes() {