Revise StateDeps implementation

Now tracks states by passing around an empty struct as a signal to
states defined on the service, as opposed to signal state changes by
closing the channel. This makes sure services can only be in one
state at once and allows for multiple state changes which were not
possible before.
This commit is contained in:
ionutboangiu
2025-02-04 17:03:54 +02:00
committed by Dan Christian Bogos
parent df3dcfb840
commit 712aeb0d4a
7 changed files with 69 additions and 18 deletions

View File

@@ -37,7 +37,6 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService {
anz := &AnalyzerService{
cfg: cfg,
stateDeps: NewStateDependencies([]string{
utils.StateServiceInit,
utils.StateServiceUP,
utils.StateServiceDOWN,
}),
@@ -69,7 +68,6 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana
if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil {
return
}
close(anz.stateDeps.StateChan(utils.StateServiceInit))
anzCtx, cancel := context.WithCancel(context.TODO())
anz.cancelFunc = cancel
go func(a *analyzers.AnalyzerS) {

View File

@@ -100,7 +100,7 @@ func (s *CommonListenerService) Shutdown(registry *servmanager.ServiceRegistry)
utils.TrendS,
}
for _, svcID := range deps {
if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) {
if servmanager.State(registry.Lookup(svcID)) != utils.StateServiceUP {
continue
}
_, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, s.cfg.GeneralCfg().ConnectTimeout)

View File

@@ -50,7 +50,7 @@ type ConnManagerService struct {
func (s *ConnManagerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
s.anz = registry.Lookup(utils.AnalyzerS).(*AnalyzerService)
if s.anz.ShouldRun() { // wait for AnalyzerS only if it should run
if _, err := WaitForServiceState(utils.StateServiceInit, utils.AnalyzerS, registry,
if _, err := WaitForServiceState(utils.StateServiceUP, utils.AnalyzerS, registry,
s.cfg.GeneralCfg().ConnectTimeout); err != nil {
return err
}

View File

@@ -118,7 +118,7 @@ func (db *DataDBService) Shutdown(registry *servmanager.ServiceRegistry) error {
utils.ThresholdS,
}
for _, svcID := range deps {
if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) {
if servmanager.State(registry.Lookup(svcID)) != utils.StateServiceUP {
continue
}
_, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, db.cfg.GeneralCfg().ConnectTimeout)

View File

@@ -27,12 +27,22 @@ import (
"github.com/cgrates/cgrates/utils"
)
// NewStateDependencies constructs a StateDependencies struct
// NewStateDependencies sets up state tracking using buffered channels, where
// each service state has its own channel and they pass around a single signal.
func NewStateDependencies(servStates []string) (stDeps *StateDependencies) {
stDeps = &StateDependencies{stateDeps: make(map[string]chan struct{})}
for _, stateID := range servStates {
stDeps.stateDeps[stateID] = make(chan struct{})
stDeps.stateDeps[stateID] = make(chan struct{}, 1) // non-blocking
}
// A single state signal is shared between all state channels.
// Initially placed in SERVICE_DOWN during initialization.
c, has := stDeps.stateDeps[utils.StateServiceDOWN]
if !has {
panic(fmt.Sprintf("missing required initial state %q", utils.StateServiceDOWN))
}
c <- struct{}{}
return
}
@@ -82,8 +92,10 @@ func WaitForServiceState(state, serviceID string, registry *servmanager.ServiceR
return srv, nil
}
}
stateCh := srv.StateChan(state)
select {
case <-srv.StateChan(state):
case <-stateCh:
stateCh <- struct{}{}
return srv, nil
case <-time.After(timeout):
return nil, fmt.Errorf("timed out waiting for service %q state %q", serviceID, state)