diff --git a/services/accounts.go b/services/accounts.go
index d988a94bc..a13d2959f 100644
--- a/services/accounts.go
+++ b/services/accounts.go
@@ -66,32 +66,29 @@ func (acts *AccountService) Start(shutdown chan struct{}) (err error) {
if acts.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
- cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP)
- }
- acts.cl = cls.CLS()
- cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AccountS, utils.CacheS, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ acts.srvIndexer, acts.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheAccounts,
utils.CacheAccountsFilterIndexes); err != nil {
- return
- }
- fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AccountS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AccountS, utils.DataDB, utils.StateServiceUP)
- }
- anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AccountS, utils.AnalyzerS, utils.StateServiceUP)
+ return err
}
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
acts.Lock()
defer acts.Unlock()
diff --git a/services/actions.go b/services/actions.go
index 2c0575a17..891c6ec2e 100644
--- a/services/actions.go
+++ b/services/actions.go
@@ -68,32 +68,28 @@ func (acts *ActionService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP)
- }
- acts.cl = cls.CLS()
- cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CacheS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ acts.srvIndexer, acts.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheActionProfiles,
utils.CacheActionProfilesFilterIndexes); err != nil {
- return
- }
- fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ActionS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ActionS, utils.DataDB, utils.StateServiceUP)
- }
- anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ActionS, utils.AnalyzerS, utils.StateServiceUP)
+ return err
}
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
acts.Lock()
defer acts.Unlock()
diff --git a/services/adminsv1.go b/services/adminsv1.go
index 193be5b5f..bbaffabe6 100644
--- a/services/adminsv1.go
+++ b/services/adminsv1.go
@@ -65,27 +65,23 @@ func (apiService *AdminSv1Service) Start(_ chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := apiService.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AdminS, utils.CommonListenerS, utils.StateServiceUP)
- }
- apiService.cl = cls.CLS()
- fs := apiService.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AdminS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := apiService.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AdminS, utils.DataDB, utils.StateServiceUP)
- }
- anz := apiService.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AdminS, utils.AnalyzerS, utils.StateServiceUP)
- }
- sdbs := apiService.srvIndexer.GetService(utils.StorDB).(*StorDBService)
- if utils.StructChanTimeout(sdbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AdminS, utils.StorDB, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ utils.StorDB,
+ },
+ apiService.srvIndexer, apiService.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ apiService.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
+ sdbs := srvDeps[utils.StorDB].(*StorDBService)
apiService.Lock()
defer apiService.Unlock()
diff --git a/services/analyzers.go b/services/analyzers.go
index e31dc436b..b4a570177 100644
--- a/services/analyzers.go
+++ b/services/analyzers.go
@@ -71,11 +71,12 @@ func (anz *AnalyzerService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := anz.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), anz.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AnalyzerS, utils.CommonListenerS, utils.StateServiceUP)
+ cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, anz.srvIndexer,
+ anz.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return
}
- anz.cl = cls.CLS()
+ anz.cl = cls.(*CommonListenerService).CLS()
anz.Lock()
defer anz.Unlock()
diff --git a/services/attributes.go b/services/attributes.go
index f2d4de397..7b2add4ee 100644
--- a/services/attributes.go
+++ b/services/attributes.go
@@ -64,37 +64,29 @@ func (attrS *AttributeService) Start(shutdown chan struct{}) (err error) {
if attrS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
- if utils.StructChanTimeout(
- attrS.srvIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP),
- attrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
- }
- cls := attrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
- }
- attrS.cl = cls.CLS()
- cacheS := attrS.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CacheS, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ attrS.srvIndexer, attrS.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return
}
+ attrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheAttributeProfiles,
utils.CacheAttributeFilterIndexes); err != nil {
return
}
- fs := attrS.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := attrS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.DataDB, utils.StateServiceUP)
- }
- anz := attrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.AnalyzerS, utils.StateServiceUP)
- }
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
attrS.Lock()
defer attrS.Unlock()
diff --git a/services/caches.go b/services/caches.go
index bd5161948..dbd55fe9b 100644
--- a/services/caches.go
+++ b/services/caches.go
@@ -19,6 +19,8 @@ along with this program. If not, see
package services
import (
+ "sync"
+
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -41,6 +43,7 @@ func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
// CacheService implements Agent interface
type CacheService struct {
+ mu sync.Mutex
cl *commonlisteners.CommonListenerS
cacheCh chan *engine.CacheS
@@ -54,23 +57,29 @@ type CacheService struct {
// Start should handle the sercive start
func (cS *CacheService) Start(shutdown chan struct{}) (err error) {
- cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CommonListenerS, utils.StateServiceUP)
+ if cS.IsRunning() {
+ return utils.ErrServiceAlreadyRunning
}
- cS.cl = cls.CLS()
- dbs := cS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CacheS, utils.DataDB, utils.StateServiceUP)
- }
- anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP)
- }
- cs := cS.srvIndexer.GetService(utils.CoreS).(*CoreService)
- if utils.StructChanTimeout(cs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CoreS, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ utils.CoreS,
+ },
+ cS.srvIndexer, cS.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
+ cs := srvDeps[utils.CoreS].(*CoreService)
+
+ cS.mu.Lock()
+ defer cS.mu.Unlock()
+
engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CoreS().CapsStats)
go engine.Cache.Precache(shutdown)
diff --git a/services/cdrs.go b/services/cdrs.go
index 1d966b55b..5ddf80451 100644
--- a/services/cdrs.go
+++ b/services/cdrs.go
@@ -64,34 +64,29 @@ func (cs *CDRService) Start(_ chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := cs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CDRs, utils.CommonListenerS, utils.StateServiceUP)
- }
- cs.cl = cls.CLS()
- fs := cs.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CDRs, utils.FilterS, utils.StateServiceUP)
- }
- dbs := cs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CDRs, utils.DataDB, utils.StateServiceUP)
- }
- anz := cs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CDRs, utils.AnalyzerS, utils.StateServiceUP)
- }
- sdbs := cs.srvIndexer.GetService(utils.StorDB).(*StorDBService)
- if utils.StructChanTimeout(sdbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CDRs, utils.StorDB, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ utils.StorDB,
+ },
+ cs.srvIndexer, cs.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ cs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
+ sdbs := srvDeps[utils.StorDB].(*StorDBService)
cs.Lock()
defer cs.Unlock()
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs.FilterS(), cs.connMgr, sdbs.DB())
runtime.Gosched()
- utils.Logger.Info("Registering CDRS RPC service.")
srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx)
if err != nil {
return err
diff --git a/services/chargers.go b/services/chargers.go
index 18d0d6805..ed3cf8888 100644
--- a/services/chargers.go
+++ b/services/chargers.go
@@ -57,37 +57,33 @@ type ChargerService struct {
}
// Start should handle the service start
-func (chrS *ChargerService) Start(shutdown chan struct{}) (err error) {
+func (chrS *ChargerService) Start(shutdown chan struct{}) error {
if chrS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
- cls := chrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CommonListenerS, utils.StateServiceUP)
- }
- chrS.cl = cls.CLS()
- cacheS := chrS.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CacheS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ chrS.srvIndexer, chrS.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ chrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheChargerProfiles,
utils.CacheChargerFilterIndexes); err != nil {
- return
- }
- fs := chrS.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := chrS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.DataDB, utils.StateServiceUP)
- }
- anz := chrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.AnalyzerS, utils.StateServiceUP)
+ return err
}
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
chrS.Lock()
defer chrS.Unlock()
@@ -102,7 +98,7 @@ func (chrS *ChargerService) Start(shutdown chan struct{}) (err error) {
chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS)
close(chrS.stateDeps.StateChan(utils.StateServiceUP))
- return
+ return nil
}
// Reload handles the change of config
diff --git a/services/config.go b/services/config.go
index 5eee72fa2..decfa0f79 100644
--- a/services/config.go
+++ b/services/config.go
@@ -22,6 +22,7 @@ import (
"sync"
"github.com/cgrates/birpc"
+ "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -41,6 +42,7 @@ func NewConfigService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceInde
type ConfigService struct {
mu sync.RWMutex
cfg *config.CGRConfig
+ cl *commonlisteners.CommonListenerS
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -48,22 +50,29 @@ type ConfigService struct {
// Start handles the service start.
func (s *ConfigService) Start(_ chan struct{}) error {
- cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP)
- }
- anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP)
+ if s.IsRunning() {
+ return utils.ErrServiceAlreadyRunning
}
- srv, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.AnalyzerS,
+ },
+ s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
+ }
+ s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
+
+ svcs, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true)
if !s.cfg.DispatcherSCfg().Enabled {
- for _, s := range srv {
- cls.CLS().RpcRegister(s)
+ for _, svc := range svcs {
+ s.cl.RpcRegister(svc)
}
}
- s.intRPCconn = anz.GetInternalCodec(srv, utils.ConfigSv1)
+ s.intRPCconn = anz.GetInternalCodec(svcs, utils.ConfigSv1)
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil
}
@@ -75,6 +84,7 @@ func (s *ConfigService) Reload(_ chan struct{}) error {
// Shutdown stops the service.
func (s *ConfigService) Shutdown() error {
+ s.cl.RpcUnregisterName(utils.ConfigSv1)
return nil
}
diff --git a/services/cores.go b/services/cores.go
index 441635259..957693590 100644
--- a/services/cores.go
+++ b/services/cores.go
@@ -71,15 +71,17 @@ func (cS *CoreService) Start(shutdown chan struct{}) error {
return utils.ErrServiceAlreadyRunning
}
- cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CoreS, utils.CommonListenerS, utils.StateServiceUP)
- }
- cS.cl = cls.CLS()
- anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.CoreS, utils.AnalyzerS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.AnalyzerS,
+ },
+ cS.srvIndexer, cS.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
cS.mu.Lock()
defer cS.mu.Unlock()
diff --git a/services/diameteragent.go b/services/diameteragent.go
index ed4b2cd93..f87aa3206 100644
--- a/services/diameteragent.go
+++ b/services/diameteragent.go
@@ -67,13 +67,15 @@ func (da *DiameterAgent) Start(shutdown chan struct{}) error {
return utils.ErrServiceAlreadyRunning
}
- fs := da.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP)
+ fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, da.srvIndexer,
+ da.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+
da.Lock()
defer da.Unlock()
- return da.start(fs.FilterS(), da.caps, shutdown)
+ return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown)
}
func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown chan struct{}) error {
diff --git a/services/dispatchers.go b/services/dispatchers.go
index 8457e7943..83ce35164 100644
--- a/services/dispatchers.go
+++ b/services/dispatchers.go
@@ -64,35 +64,30 @@ func (dspS *DispatcherService) Start(shutdown chan struct{}) (err error) {
if dspS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
- utils.Logger.Info("Starting CGRateS DispatcherS service.")
- cls := dspS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CommonListenerS, utils.StateServiceUP)
- }
- dspS.cl = cls.CLS()
- cacheS := dspS.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CacheS, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ dspS.srvIndexer, dspS.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ dspS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheDispatcherProfiles,
utils.CacheDispatcherHosts,
utils.CacheDispatcherFilterIndexes); err != nil {
return
}
-
- fs := dspS.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := dspS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.DataDB, utils.StateServiceUP)
- }
- anz := dspS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.AnalyzerS, utils.StateServiceUP)
- }
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
dspS.Lock()
defer dspS.Unlock()
diff --git a/services/dnsagent.go b/services/dnsagent.go
index 9b3f5a851..9144c1adb 100644
--- a/services/dnsagent.go
+++ b/services/dnsagent.go
@@ -62,14 +62,16 @@ func (dns *DNSAgent) Start(shutdown chan struct{}) (err error) {
if dns.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
- fs := dns.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP)
+
+ fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, dns.srvIndexer,
+ dns.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return
}
dns.Lock()
defer dns.Unlock()
- dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), dns.connMgr)
+ dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.(*FilterService).FilterS(), dns.connMgr)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.dns = nil
diff --git a/services/ees.go b/services/ees.go
index 35bb24095..35451f57d 100644
--- a/services/ees.go
+++ b/services/ees.go
@@ -98,24 +98,23 @@ func (es *EventExporterService) Start(_ chan struct{}) error {
return utils.ErrServiceAlreadyRunning
}
- cls := es.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.EEs, utils.CommonListenerS, utils.StateServiceUP)
- }
- es.cl = cls.CLS()
- fs := es.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.EEs, utils.FilterS, utils.StateServiceUP)
- }
- anz := es.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.EEs, utils.AnalyzerS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.FilterS,
+ utils.AnalyzerS,
+ },
+ es.srvIndexer, es.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ es.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
es.mu.Lock()
defer es.mu.Unlock()
- var err error
es.eeS, err = ees.NewEventExporterS(es.cfg, fs.FilterS(), es.connMgr)
if err != nil {
return err
diff --git a/services/efs.go b/services/efs.go
index 1a68b2b18..750131a3f 100644
--- a/services/efs.go
+++ b/services/efs.go
@@ -63,17 +63,21 @@ func (efServ *ExportFailoverService) Start(_ chan struct{}) (err error) {
if efServ.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
- cls := efServ.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), efServ.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.EFs, utils.CommonListenerS, utils.StateServiceUP)
+
+ cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, efServ.srvIndexer,
+ efServ.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return
}
- efServ.cl = cls.CLS()
+ efServ.cl = cls.(*CommonListenerService).CLS()
+
efServ.Lock()
+ defer efServ.Unlock()
+
efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr)
efServ.stopChan = make(chan struct{})
efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx)
efServ.cl.RpcRegister(efServ.srv)
- efServ.Unlock()
close(efServ.stateDeps.StateChan(utils.StateServiceUP))
return
}
diff --git a/services/ers.go b/services/ers.go
index 6748edc59..d8e9c6c0c 100644
--- a/services/ers.go
+++ b/services/ers.go
@@ -68,19 +68,19 @@ func (erS *EventReaderService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := erS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ERs, utils.CommonListenerS, utils.StateServiceUP)
- }
- erS.cl = cls.CLS()
- fs := erS.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ERs, utils.FilterS, utils.StateServiceUP)
- }
- anz := erS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ERs, utils.AnalyzerS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.FilterS,
+ utils.AnalyzerS,
+ },
+ erS.srvIndexer, erS.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ erS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
erS.Lock()
defer erS.Unlock()
diff --git a/services/filters.go b/services/filters.go
index 17f1fce36..227763b5d 100644
--- a/services/filters.go
+++ b/services/filters.go
@@ -55,17 +55,28 @@ type FilterService struct {
// Start handles the service start.
func (s *FilterService) Start(shutdown chan struct{}) error {
- cacheS := s.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.FilterS, utils.CacheS, utils.StateServiceUP)
+ if s.IsRunning() {
+ return utils.ErrServiceAlreadyRunning
}
- if err := cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil {
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CacheS,
+ utils.DataDB,
+ },
+ s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
return err
}
- dbs := s.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.FilterS, utils.DataDB, utils.StateServiceUP)
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
+ if err = cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil {
+ return err
}
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
s.fltrS = engine.NewFilterS(s.cfg, s.connMgr, dbs.DataManager())
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil
diff --git a/services/guardian.go b/services/guardian.go
index 668875cb8..bb8f5de78 100644
--- a/services/guardian.go
+++ b/services/guardian.go
@@ -22,6 +22,7 @@ import (
"sync"
"github.com/cgrates/birpc"
+ "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
@@ -42,6 +43,7 @@ func NewGuardianService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceIn
type GuardianService struct {
mu sync.RWMutex
cfg *config.CGRConfig
+ cl *commonlisteners.CommonListenerS
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -49,21 +51,32 @@ type GuardianService struct {
// Start handles the service start.
func (s *GuardianService) Start(_ chan struct{}) error {
- cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP)
+ if s.IsRunning() {
+ return utils.ErrServiceAlreadyRunning
}
- anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.AnalyzerS,
+ },
+ s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
- srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
+ s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ svcs, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
if !s.cfg.DispatcherSCfg().Enabled {
- for _, s := range srv {
- cls.CLS().RpcRegister(s)
+ for _, svc := range svcs {
+ s.cl.RpcRegister(svc)
}
}
- s.intRPCconn = anz.GetInternalCodec(srv, utils.GuardianS)
+ s.intRPCconn = anz.GetInternalCodec(svcs, utils.GuardianS)
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil
}
@@ -75,6 +88,9 @@ func (s *GuardianService) Reload(_ chan struct{}) error {
// Shutdown stops the service.
func (s *GuardianService) Shutdown() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.cl.RpcUnregisterName(utils.GuardianSv1)
return nil
}
@@ -85,7 +101,7 @@ func (s *GuardianService) IsRunning() bool {
// ServiceName returns the service name
func (s *GuardianService) ServiceName() string {
- return utils.FilterS
+ return utils.GuardianS
}
// ShouldRun returns if the service should be running.
diff --git a/services/httpagent.go b/services/httpagent.go
index aaa76c21e..d9922ac02 100644
--- a/services/httpagent.go
+++ b/services/httpagent.go
@@ -19,7 +19,6 @@ along with this program. If not, see
package services
import (
- "fmt"
"sync"
"github.com/cgrates/birpc"
@@ -67,26 +66,28 @@ func (ha *HTTPAgent) Start(_ chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := ha.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.CommonListenerS, utils.StateServiceUP)
- }
- cl := cls.CLS()
- fs := ha.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.FilterS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.FilterS,
+ },
+ ha.srvIndexer, ha.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ fs := srvDeps[utils.FilterS].(*FilterService)
ha.Lock()
+ defer ha.Unlock()
+
ha.started = true
- utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent))
for _, agntCfg := range ha.cfg.HTTPAgentCfg() {
cl.RegisterHttpHandler(agntCfg.URL,
agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, fs.FilterS(),
ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}
- ha.Unlock()
close(ha.stateDeps.StateChan(utils.StateServiceUP))
return
}
diff --git a/services/janus.go b/services/janus.go
index 60679ff6a..5d2134183 100644
--- a/services/janus.go
+++ b/services/janus.go
@@ -63,15 +63,17 @@ type JanusAgent struct {
// Start should jandle the sercive start
func (ja *JanusAgent) Start(_ chan struct{}) (err error) {
- cls := ja.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.CommonListenerS, utils.StateServiceUP)
- }
- cl := cls.CLS()
- fs := ja.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.FilterS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.FilterS,
+ },
+ ja.srvIndexer, ja.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ fs := srvDeps[utils.FilterS].(*FilterService)
ja.Lock()
if ja.started {
@@ -97,7 +99,6 @@ func (ja *JanusAgent) Start(_ chan struct{}) (err error) {
ja.started = true
ja.Unlock()
close(ja.stateDeps.StateChan(utils.StateServiceUP))
- utils.Logger.Info(fmt.Sprintf("<%s> successfully started.", utils.JanusAgent))
return
}
diff --git a/services/loaders.go b/services/loaders.go
index 22207ea4c..add5423e2 100644
--- a/services/loaders.go
+++ b/services/loaders.go
@@ -65,24 +65,21 @@ func (ldrs *LoaderService) Start(_ chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := ldrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.CommonListenerS, utils.StateServiceUP)
- }
- ldrs.cl = cls.CLS()
-
- fs := ldrs.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.DataDB, utils.StateServiceUP)
- }
- anz := ldrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.AnalyzerS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ ldrs.srvIndexer, ldrs.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ ldrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
ldrs.Lock()
defer ldrs.Unlock()
diff --git a/services/radiusagent.go b/services/radiusagent.go
index 714d0f33a..879db8b68 100644
--- a/services/radiusagent.go
+++ b/services/radiusagent.go
@@ -66,9 +66,10 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- fs := rad.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rad.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RadiusAgent, utils.FilterS, utils.StateServiceUP)
+ fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, rad.srvIndexer,
+ rad.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return
}
rad.Lock()
@@ -78,8 +79,7 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}) (err error) {
rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth
rad.lacct = rad.cfg.RadiusAgentCfg().ListenAcct
- if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.FilterS(), rad.connMgr); err != nil {
- utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error()))
+ if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.(*FilterService).FilterS(), rad.connMgr); err != nil {
return
}
rad.stopChan = make(chan struct{})
diff --git a/services/rankings.go b/services/rankings.go
index 80a1b9b3d..8d06c5aea 100644
--- a/services/rankings.go
+++ b/services/rankings.go
@@ -67,33 +67,29 @@ func (ran *RankingService) Start(shutdown chan struct{}) (err error) {
}
ran.srvDep[utils.DataDB].Add(1)
- cls := ran.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CommonListenerS, utils.StateServiceUP)
- }
- ran.cl = cls.CLS()
- cacheS := ran.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CacheS, utils.StateServiceUP)
- }
- if err = cacheS.WaitToPrecache(shutdown,
- utils.CacheRankingProfiles,
- utils.CacheRankings,
- ); err != nil {
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ ran.srvIndexer, ran.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
return err
}
- dbs := ran.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RankingS, utils.DataDB, utils.StateServiceUP)
- }
- fs := ran.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RankingS, utils.FilterS, utils.StateServiceUP)
- }
- anz := ran.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RankingS, utils.AnalyzerS, utils.StateServiceUP)
+ ran.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
+ if err = cacheS.WaitToPrecache(shutdown,
+ utils.CacheRankingProfiles,
+ utils.CacheRankings); err != nil {
+ return err
}
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
ran.Lock()
defer ran.Unlock()
diff --git a/services/rates.go b/services/rates.go
index 1ff179fef..51e07b070 100644
--- a/services/rates.go
+++ b/services/rates.go
@@ -96,33 +96,29 @@ func (rs *RateService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := rs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RateS, utils.CommonListenerS, utils.StateServiceUP)
- }
- rs.cl = cls.CLS()
- cacheS := rs.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RateS, utils.CacheS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ rs.srvIndexer, rs.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ rs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRateProfiles,
utils.CacheRateProfilesFilterIndexes,
utils.CacheRateFilterIndexes); err != nil {
- return
- }
- fs := rs.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RateS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := rs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RateS, utils.DataDB, utils.StateServiceUP)
- }
- anz := rs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RateS, utils.AnalyzerS, utils.StateServiceUP)
+ return err
}
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
rs.Lock()
rs.rateS = rates.NewRateS(rs.cfg, fs.FilterS(), dbs.DataManager())
diff --git a/services/resources.go b/services/resources.go
index 19a5fc1c2..610f62b7f 100644
--- a/services/resources.go
+++ b/services/resources.go
@@ -67,33 +67,30 @@ func (reS *ResourceService) Start(shutdown chan struct{}) (err error) {
}
reS.srvDep[utils.DataDB].Add(1)
- cls := reS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CommonListenerS, utils.StateServiceUP)
- }
- reS.cl = cls.CLS()
- cacheS := reS.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CacheS, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ reS.srvIndexer, reS.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ reS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheResourceProfiles,
utils.CacheResources,
utils.CacheResourceFilterIndexes); err != nil {
return
}
- fs := reS.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := reS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.DataDB, utils.StateServiceUP)
- }
- anz := reS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.AnalyzerS, utils.StateServiceUP)
- }
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
reS.Lock()
defer reS.Unlock()
diff --git a/services/routes.go b/services/routes.go
index 9710042e1..f002181f6 100644
--- a/services/routes.go
+++ b/services/routes.go
@@ -62,32 +62,28 @@ func (routeS *RouteService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := routeS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CommonListenerS, utils.StateServiceUP)
- }
- routeS.cl = cls.CLS()
- cacheS := routeS.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CacheS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ routeS.srvIndexer, routeS.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ routeS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRouteProfiles,
utils.CacheRouteFilterIndexes); err != nil {
return
}
- fs := routeS.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RouteS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := routeS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RouteS, utils.DataDB, utils.StateServiceUP)
- }
- anz := routeS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.RouteS, utils.AnalyzerS, utils.StateServiceUP)
- }
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
routeS.Lock()
defer routeS.Unlock()
diff --git a/services/sessions.go b/services/sessions.go
index 45355b009..c13c9b018 100644
--- a/services/sessions.go
+++ b/services/sessions.go
@@ -67,23 +67,21 @@ func (smg *SessionService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- cls := smg.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.SessionS, utils.CommonListenerS, utils.StateServiceUP)
- }
- smg.cl = cls.CLS()
- fs := smg.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.SessionS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := smg.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.SessionS, utils.DataDB, utils.StateServiceUP)
- }
- anz := smg.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.SessionS, utils.AnalyzerS, utils.StateServiceUP)
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ smg.srvIndexer, smg.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ smg.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
smg.Lock()
defer smg.Unlock()
@@ -111,6 +109,7 @@ func (smg *SessionService) Start(shutdown chan struct{}) (err error) {
// run this in it's own goroutine
go smg.start(shutdown)
}
+ smg.intRPCconn = anz.GetInternalCodec(srv, utils.SessionS)
close(smg.stateDeps.StateChan(utils.StateServiceUP))
return
}
diff --git a/services/sipagent.go b/services/sipagent.go
index 03621692d..6a9354fe8 100644
--- a/services/sipagent.go
+++ b/services/sipagent.go
@@ -63,15 +63,16 @@ func (sip *SIPAgent) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
- fs := sip.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sip.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.SIPAgent, utils.FilterS, utils.StateServiceUP)
+ fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, sip.srvIndexer,
+ sip.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return
}
sip.Lock()
defer sip.Unlock()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
- sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.FilterS())
+ sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.(*FilterService).FilterS())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.SIPAgent, err))
diff --git a/services/stats.go b/services/stats.go
index c303c6fd3..aa26e3410 100644
--- a/services/stats.go
+++ b/services/stats.go
@@ -67,33 +67,30 @@ func (sts *StatService) Start(shutdown chan struct{}) (err error) {
}
sts.srvDep[utils.DataDB].Add(1)
- cls := sts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.StatS, utils.CommonListenerS, utils.StateServiceUP)
- }
- sts.cl = cls.CLS()
- cacheS := sts.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.StatS, utils.CacheS, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ sts.srvIndexer, sts.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ sts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheStatQueueProfiles,
utils.CacheStatQueues,
utils.CacheStatFilterIndexes); err != nil {
return
}
- fs := sts.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.StatS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := sts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.StatS, utils.DataDB, utils.StateServiceUP)
- }
- anz := sts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.StatS, utils.AnalyzerS, utils.StateServiceUP)
- }
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
sts.Lock()
defer sts.Unlock()
diff --git a/services/thresholds.go b/services/thresholds.go
index 204e21ee6..754e3980c 100644
--- a/services/thresholds.go
+++ b/services/thresholds.go
@@ -67,33 +67,30 @@ func (thrs *ThresholdService) Start(shutdown chan struct{}) (err error) {
}
thrs.srvDep[utils.DataDB].Add(1)
- cls := thrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CommonListenerS, utils.StateServiceUP)
- }
- thrs.cl = cls.CLS()
- cacheS := thrs.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CacheS, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ thrs.srvIndexer, thrs.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ thrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheThresholdProfiles,
utils.CacheThresholds,
utils.CacheThresholdFilterIndexes); err != nil {
return
}
- fs := thrs.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.FilterS, utils.StateServiceUP)
- }
- dbs := thrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.DataDB, utils.StateServiceUP)
- }
- anz := thrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.AnalyzerS, utils.StateServiceUP)
- }
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
thrs.Lock()
defer thrs.Unlock()
diff --git a/services/tpes.go b/services/tpes.go
index 628119f27..ca3915c9c 100644
--- a/services/tpes.go
+++ b/services/tpes.go
@@ -60,15 +60,18 @@ type TPeService struct {
// Start should handle the service start
func (ts *TPeService) Start(_ chan struct{}) (err error) {
- cls := ts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.TPeS, utils.CommonListenerS, utils.StateServiceUP)
- }
- ts.cl = cls.CLS()
- dbs := ts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.TPeS, utils.DataDB, utils.StateServiceUP)
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.DataDB,
+ },
+ ts.srvIndexer, ts.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
}
+ ts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
ts.tpes = tpes.NewTPeS(ts.cfg, dbs.DataManager(), ts.connMgr)
ts.stopChan = make(chan struct{})
diff --git a/services/trends.go b/services/trends.go
index e24f2df1e..d76438fcd 100644
--- a/services/trends.go
+++ b/services/trends.go
@@ -66,33 +66,29 @@ func (trs *TrendService) Start(shutdown chan struct{}) (err error) {
}
trs.srvDep[utils.DataDB].Add(1)
- cls := trs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
- if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CommonListenerS, utils.StateServiceUP)
- }
- trs.cl = cls.CLS()
- cacheS := trs.srvIndexer.GetService(utils.CacheS).(*CacheService)
- if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CacheS, utils.StateServiceUP)
- }
- if err = cacheS.WaitToPrecache(shutdown,
- utils.CacheTrendProfiles,
- utils.CacheTrends,
- ); err != nil {
+
+ srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
+ []string{
+ utils.CommonListenerS,
+ utils.CacheS,
+ utils.FilterS,
+ utils.DataDB,
+ utils.AnalyzerS,
+ },
+ trs.srvIndexer, trs.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
return err
}
- dbs := trs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
- if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.TrendS, utils.DataDB, utils.StateServiceUP)
- }
- fs := trs.srvIndexer.GetService(utils.FilterS).(*FilterService)
- if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.TrendS, utils.FilterS, utils.StateServiceUP)
- }
- anz := trs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
- if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
- return utils.NewServiceStateTimeoutError(utils.TrendS, utils.AnalyzerS, utils.StateServiceUP)
+ trs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cacheS := srvDeps[utils.CacheS].(*CacheService)
+ if err = cacheS.WaitToPrecache(shutdown,
+ utils.CacheTrendProfiles,
+ utils.CacheTrends); err != nil {
+ return err
}
+ fs := srvDeps[utils.FilterS].(*FilterService)
+ dbs := srvDeps[utils.DataDB].(*DataDBService)
+ anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
trs.Lock()
defer trs.Unlock()