Use the waitForServicesToReachState helper

This commit is contained in:
ionutboangiu
2024-12-18 13:58:52 +02:00
committed by Dan Christian Bogos
parent b68a804967
commit 20ee079e12
32 changed files with 466 additions and 466 deletions

View File

@@ -66,32 +66,29 @@ func (acts *AccountService) Start(shutdown chan struct{}) (err error) {
if acts.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP)
}
acts.cl = cls.CLS()
cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AccountS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
acts.srvIndexer, acts.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheAccounts,
utils.CacheAccountsFilterIndexes); err != nil {
return
}
fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AccountS, utils.FilterS, utils.StateServiceUP)
}
dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AccountS, utils.DataDB, utils.StateServiceUP)
}
anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AccountS, utils.AnalyzerS, utils.StateServiceUP)
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
acts.Lock()
defer acts.Unlock()

View File

@@ -68,32 +68,28 @@ func (acts *ActionService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP)
}
acts.cl = cls.CLS()
cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
acts.srvIndexer, acts.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheActionProfiles,
utils.CacheActionProfilesFilterIndexes); err != nil {
return
}
fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.FilterS, utils.StateServiceUP)
}
dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.DataDB, utils.StateServiceUP)
}
anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.AnalyzerS, utils.StateServiceUP)
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
acts.Lock()
defer acts.Unlock()

View File

@@ -65,27 +65,23 @@ func (apiService *AdminSv1Service) Start(_ chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := apiService.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.CommonListenerS, utils.StateServiceUP)
}
apiService.cl = cls.CLS()
fs := apiService.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.FilterS, utils.StateServiceUP)
}
dbs := apiService.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.DataDB, utils.StateServiceUP)
}
anz := apiService.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.AnalyzerS, utils.StateServiceUP)
}
sdbs := apiService.srvIndexer.GetService(utils.StorDB).(*StorDBService)
if utils.StructChanTimeout(sdbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.StorDB, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
utils.StorDB,
},
apiService.srvIndexer, apiService.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
apiService.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
sdbs := srvDeps[utils.StorDB].(*StorDBService)
apiService.Lock()
defer apiService.Unlock()

View File

@@ -71,11 +71,12 @@ func (anz *AnalyzerService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := anz.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), anz.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AnalyzerS, utils.CommonListenerS, utils.StateServiceUP)
cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, anz.srvIndexer,
anz.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
anz.cl = cls.CLS()
anz.cl = cls.(*CommonListenerService).CLS()
anz.Lock()
defer anz.Unlock()

View File

@@ -64,37 +64,29 @@ func (attrS *AttributeService) Start(shutdown chan struct{}) (err error) {
if attrS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
if utils.StructChanTimeout(
attrS.srvIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP),
attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
}
cls := attrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
}
attrS.cl = cls.CLS()
cacheS := attrS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
attrS.srvIndexer, attrS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
attrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheAttributeProfiles,
utils.CacheAttributeFilterIndexes); err != nil {
return
}
fs := attrS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.FilterS, utils.StateServiceUP)
}
dbs := attrS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.DataDB, utils.StateServiceUP)
}
anz := attrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.AnalyzerS, utils.StateServiceUP)
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
attrS.Lock()
defer attrS.Unlock()

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -41,6 +43,7 @@ func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
// CacheService implements Agent interface
type CacheService struct {
mu sync.Mutex
cl *commonlisteners.CommonListenerS
cacheCh chan *engine.CacheS
@@ -54,23 +57,29 @@ type CacheService struct {
// Start should handle the sercive start
func (cS *CacheService) Start(shutdown chan struct{}) (err error) {
cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CommonListenerS, utils.StateServiceUP)
if cS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
cS.cl = cls.CLS()
dbs := cS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.DataDB, utils.StateServiceUP)
}
anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP)
}
cs := cS.srvIndexer.GetService(utils.CoreS).(*CoreService)
if utils.StructChanTimeout(cs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CoreS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.DataDB,
utils.AnalyzerS,
utils.CoreS,
},
cS.srvIndexer, cS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
cs := srvDeps[utils.CoreS].(*CoreService)
cS.mu.Lock()
defer cS.mu.Unlock()
engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CoreS().CapsStats)
go engine.Cache.Precache(shutdown)

View File

@@ -64,34 +64,29 @@ func (cs *CDRService) Start(_ chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := cs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.CommonListenerS, utils.StateServiceUP)
}
cs.cl = cls.CLS()
fs := cs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.FilterS, utils.StateServiceUP)
}
dbs := cs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.DataDB, utils.StateServiceUP)
}
anz := cs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.AnalyzerS, utils.StateServiceUP)
}
sdbs := cs.srvIndexer.GetService(utils.StorDB).(*StorDBService)
if utils.StructChanTimeout(sdbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.StorDB, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
utils.StorDB,
},
cs.srvIndexer, cs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
sdbs := srvDeps[utils.StorDB].(*StorDBService)
cs.Lock()
defer cs.Unlock()
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs.FilterS(), cs.connMgr, sdbs.DB())
runtime.Gosched()
utils.Logger.Info("Registering CDRS RPC service.")
srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx)
if err != nil {
return err

View File

@@ -57,37 +57,33 @@ type ChargerService struct {
}
// Start should handle the service start
func (chrS *ChargerService) Start(shutdown chan struct{}) (err error) {
func (chrS *ChargerService) Start(shutdown chan struct{}) error {
if chrS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
cls := chrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CommonListenerS, utils.StateServiceUP)
}
chrS.cl = cls.CLS()
cacheS := chrS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
chrS.srvIndexer, chrS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
chrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheChargerProfiles,
utils.CacheChargerFilterIndexes); err != nil {
return
}
fs := chrS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.FilterS, utils.StateServiceUP)
}
dbs := chrS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.DataDB, utils.StateServiceUP)
}
anz := chrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.AnalyzerS, utils.StateServiceUP)
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
chrS.Lock()
defer chrS.Unlock()
@@ -102,7 +98,7 @@ func (chrS *ChargerService) Start(shutdown chan struct{}) (err error) {
chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS)
close(chrS.stateDeps.StateChan(utils.StateServiceUP))
return
return nil
}
// Reload handles the change of config

View File

@@ -22,6 +22,7 @@ import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -41,6 +42,7 @@ func NewConfigService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceInde
type ConfigService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -48,22 +50,29 @@ type ConfigService struct {
// Start handles the service start.
func (s *ConfigService) Start(_ chan struct{}) error {
cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP)
}
anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP)
if s.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
srv, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.AnalyzerS,
},
s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
svcs, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true)
if !s.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
cls.CLS().RpcRegister(s)
for _, svc := range svcs {
s.cl.RpcRegister(svc)
}
}
s.intRPCconn = anz.GetInternalCodec(srv, utils.ConfigSv1)
s.intRPCconn = anz.GetInternalCodec(svcs, utils.ConfigSv1)
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil
}
@@ -75,6 +84,7 @@ func (s *ConfigService) Reload(_ chan struct{}) error {
// Shutdown stops the service.
func (s *ConfigService) Shutdown() error {
s.cl.RpcUnregisterName(utils.ConfigSv1)
return nil
}

View File

@@ -71,15 +71,17 @@ func (cS *CoreService) Start(shutdown chan struct{}) error {
return utils.ErrServiceAlreadyRunning
}
cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CoreS, utils.CommonListenerS, utils.StateServiceUP)
}
cS.cl = cls.CLS()
anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CoreS, utils.AnalyzerS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.AnalyzerS,
},
cS.srvIndexer, cS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
cS.mu.Lock()
defer cS.mu.Unlock()

View File

@@ -67,13 +67,15 @@ func (da *DiameterAgent) Start(shutdown chan struct{}) error {
return utils.ErrServiceAlreadyRunning
}
fs := da.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP)
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, da.srvIndexer,
da.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
da.Lock()
defer da.Unlock()
return da.start(fs.FilterS(), da.caps, shutdown)
return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown)
}
func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown chan struct{}) error {

View File

@@ -64,35 +64,30 @@ func (dspS *DispatcherService) Start(shutdown chan struct{}) (err error) {
if dspS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
utils.Logger.Info("Starting CGRateS DispatcherS service.")
cls := dspS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CommonListenerS, utils.StateServiceUP)
}
dspS.cl = cls.CLS()
cacheS := dspS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
dspS.srvIndexer, dspS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
dspS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheDispatcherProfiles,
utils.CacheDispatcherHosts,
utils.CacheDispatcherFilterIndexes); err != nil {
return
}
fs := dspS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.FilterS, utils.StateServiceUP)
}
dbs := dspS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.DataDB, utils.StateServiceUP)
}
anz := dspS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.AnalyzerS, utils.StateServiceUP)
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
dspS.Lock()
defer dspS.Unlock()

View File

@@ -62,14 +62,16 @@ func (dns *DNSAgent) Start(shutdown chan struct{}) (err error) {
if dns.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
fs := dns.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP)
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, dns.srvIndexer,
dns.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
dns.Lock()
defer dns.Unlock()
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), dns.connMgr)
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.(*FilterService).FilterS(), dns.connMgr)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.dns = nil

View File

@@ -98,24 +98,23 @@ func (es *EventExporterService) Start(_ chan struct{}) error {
return utils.ErrServiceAlreadyRunning
}
cls := es.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.EEs, utils.CommonListenerS, utils.StateServiceUP)
}
es.cl = cls.CLS()
fs := es.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.EEs, utils.FilterS, utils.StateServiceUP)
}
anz := es.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.EEs, utils.AnalyzerS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.FilterS,
utils.AnalyzerS,
},
es.srvIndexer, es.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
es.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
es.mu.Lock()
defer es.mu.Unlock()
var err error
es.eeS, err = ees.NewEventExporterS(es.cfg, fs.FilterS(), es.connMgr)
if err != nil {
return err

View File

@@ -63,17 +63,21 @@ func (efServ *ExportFailoverService) Start(_ chan struct{}) (err error) {
if efServ.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
cls := efServ.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), efServ.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.EFs, utils.CommonListenerS, utils.StateServiceUP)
cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, efServ.srvIndexer,
efServ.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
efServ.cl = cls.CLS()
efServ.cl = cls.(*CommonListenerService).CLS()
efServ.Lock()
defer efServ.Unlock()
efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr)
efServ.stopChan = make(chan struct{})
efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx)
efServ.cl.RpcRegister(efServ.srv)
efServ.Unlock()
close(efServ.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -68,19 +68,19 @@ func (erS *EventReaderService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := erS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ERs, utils.CommonListenerS, utils.StateServiceUP)
}
erS.cl = cls.CLS()
fs := erS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ERs, utils.FilterS, utils.StateServiceUP)
}
anz := erS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ERs, utils.AnalyzerS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.FilterS,
utils.AnalyzerS,
},
erS.srvIndexer, erS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
erS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
erS.Lock()
defer erS.Unlock()

View File

@@ -55,17 +55,28 @@ type FilterService struct {
// Start handles the service start.
func (s *FilterService) Start(shutdown chan struct{}) error {
cacheS := s.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.FilterS, utils.CacheS, utils.StateServiceUP)
if s.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
if err := cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil {
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CacheS,
utils.DataDB,
},
s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
dbs := s.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.FilterS, utils.DataDB, utils.StateServiceUP)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil {
return err
}
dbs := srvDeps[utils.DataDB].(*DataDBService)
s.mu.Lock()
defer s.mu.Unlock()
s.fltrS = engine.NewFilterS(s.cfg, s.connMgr, dbs.DataManager())
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil

View File

@@ -22,6 +22,7 @@ import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
@@ -42,6 +43,7 @@ func NewGuardianService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceIn
type GuardianService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -49,21 +51,32 @@ type GuardianService struct {
// Start handles the service start.
func (s *GuardianService) Start(_ chan struct{}) error {
cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP)
if s.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.AnalyzerS,
},
s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
s.mu.Lock()
defer s.mu.Unlock()
svcs, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
if !s.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
cls.CLS().RpcRegister(s)
for _, svc := range svcs {
s.cl.RpcRegister(svc)
}
}
s.intRPCconn = anz.GetInternalCodec(srv, utils.GuardianS)
s.intRPCconn = anz.GetInternalCodec(svcs, utils.GuardianS)
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil
}
@@ -75,6 +88,9 @@ func (s *GuardianService) Reload(_ chan struct{}) error {
// Shutdown stops the service.
func (s *GuardianService) Shutdown() error {
s.mu.Lock()
defer s.mu.Unlock()
s.cl.RpcUnregisterName(utils.GuardianSv1)
return nil
}
@@ -85,7 +101,7 @@ func (s *GuardianService) IsRunning() bool {
// ServiceName returns the service name
func (s *GuardianService) ServiceName() string {
return utils.FilterS
return utils.GuardianS
}
// ShouldRun returns if the service should be running.

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"fmt"
"sync"
"github.com/cgrates/birpc"
@@ -67,26 +66,28 @@ func (ha *HTTPAgent) Start(_ chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := ha.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.CommonListenerS, utils.StateServiceUP)
}
cl := cls.CLS()
fs := ha.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.FilterS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.FilterS,
},
ha.srvIndexer, ha.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
ha.Lock()
defer ha.Unlock()
ha.started = true
utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent))
for _, agntCfg := range ha.cfg.HTTPAgentCfg() {
cl.RegisterHttpHandler(agntCfg.URL,
agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, fs.FilterS(),
ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}
ha.Unlock()
close(ha.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -63,15 +63,17 @@ type JanusAgent struct {
// Start should jandle the sercive start
func (ja *JanusAgent) Start(_ chan struct{}) (err error) {
cls := ja.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.CommonListenerS, utils.StateServiceUP)
}
cl := cls.CLS()
fs := ja.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.FilterS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.FilterS,
},
ja.srvIndexer, ja.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
ja.Lock()
if ja.started {
@@ -97,7 +99,6 @@ func (ja *JanusAgent) Start(_ chan struct{}) (err error) {
ja.started = true
ja.Unlock()
close(ja.stateDeps.StateChan(utils.StateServiceUP))
utils.Logger.Info(fmt.Sprintf("<%s> successfully started.", utils.JanusAgent))
return
}

View File

@@ -65,24 +65,21 @@ func (ldrs *LoaderService) Start(_ chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := ldrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.CommonListenerS, utils.StateServiceUP)
}
ldrs.cl = cls.CLS()
fs := ldrs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP)
}
dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.DataDB, utils.StateServiceUP)
}
anz := ldrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.AnalyzerS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
ldrs.srvIndexer, ldrs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
ldrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
ldrs.Lock()
defer ldrs.Unlock()

View File

@@ -66,9 +66,10 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
fs := rad.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rad.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RadiusAgent, utils.FilterS, utils.StateServiceUP)
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, rad.srvIndexer,
rad.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
rad.Lock()
@@ -78,8 +79,7 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}) (err error) {
rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth
rad.lacct = rad.cfg.RadiusAgentCfg().ListenAcct
if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.FilterS(), rad.connMgr); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error()))
if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.(*FilterService).FilterS(), rad.connMgr); err != nil {
return
}
rad.stopChan = make(chan struct{})

View File

@@ -67,33 +67,29 @@ func (ran *RankingService) Start(shutdown chan struct{}) (err error) {
}
ran.srvDep[utils.DataDB].Add(1)
cls := ran.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CommonListenerS, utils.StateServiceUP)
}
ran.cl = cls.CLS()
cacheS := ran.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CacheS, utils.StateServiceUP)
}
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRankingProfiles,
utils.CacheRankings,
); err != nil {
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
ran.srvIndexer, ran.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
dbs := ran.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.DataDB, utils.StateServiceUP)
}
fs := ran.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.FilterS, utils.StateServiceUP)
}
anz := ran.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.AnalyzerS, utils.StateServiceUP)
ran.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRankingProfiles,
utils.CacheRankings); err != nil {
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
ran.Lock()
defer ran.Unlock()

View File

@@ -96,33 +96,29 @@ func (rs *RateService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := rs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.CommonListenerS, utils.StateServiceUP)
}
rs.cl = cls.CLS()
cacheS := rs.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
rs.srvIndexer, rs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
rs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRateProfiles,
utils.CacheRateProfilesFilterIndexes,
utils.CacheRateFilterIndexes); err != nil {
return
}
fs := rs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.FilterS, utils.StateServiceUP)
}
dbs := rs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.DataDB, utils.StateServiceUP)
}
anz := rs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.AnalyzerS, utils.StateServiceUP)
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
rs.Lock()
rs.rateS = rates.NewRateS(rs.cfg, fs.FilterS(), dbs.DataManager())

View File

@@ -67,33 +67,30 @@ func (reS *ResourceService) Start(shutdown chan struct{}) (err error) {
}
reS.srvDep[utils.DataDB].Add(1)
cls := reS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CommonListenerS, utils.StateServiceUP)
}
reS.cl = cls.CLS()
cacheS := reS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
reS.srvIndexer, reS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
reS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheResourceProfiles,
utils.CacheResources,
utils.CacheResourceFilterIndexes); err != nil {
return
}
fs := reS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.FilterS, utils.StateServiceUP)
}
dbs := reS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.DataDB, utils.StateServiceUP)
}
anz := reS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.AnalyzerS, utils.StateServiceUP)
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
reS.Lock()
defer reS.Unlock()

View File

@@ -62,32 +62,28 @@ func (routeS *RouteService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := routeS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CommonListenerS, utils.StateServiceUP)
}
routeS.cl = cls.CLS()
cacheS := routeS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
routeS.srvIndexer, routeS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
routeS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRouteProfiles,
utils.CacheRouteFilterIndexes); err != nil {
return
}
fs := routeS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.FilterS, utils.StateServiceUP)
}
dbs := routeS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.DataDB, utils.StateServiceUP)
}
anz := routeS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.AnalyzerS, utils.StateServiceUP)
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
routeS.Lock()
defer routeS.Unlock()

View File

@@ -67,23 +67,21 @@ func (smg *SessionService) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
cls := smg.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.CommonListenerS, utils.StateServiceUP)
}
smg.cl = cls.CLS()
fs := smg.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.FilterS, utils.StateServiceUP)
}
dbs := smg.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.DataDB, utils.StateServiceUP)
}
anz := smg.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.AnalyzerS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
smg.srvIndexer, smg.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
smg.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
smg.Lock()
defer smg.Unlock()
@@ -111,6 +109,7 @@ func (smg *SessionService) Start(shutdown chan struct{}) (err error) {
// run this in it's own goroutine
go smg.start(shutdown)
}
smg.intRPCconn = anz.GetInternalCodec(srv, utils.SessionS)
close(smg.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -63,15 +63,16 @@ func (sip *SIPAgent) Start(shutdown chan struct{}) (err error) {
return utils.ErrServiceAlreadyRunning
}
fs := sip.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sip.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SIPAgent, utils.FilterS, utils.StateServiceUP)
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, sip.srvIndexer,
sip.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
sip.Lock()
defer sip.Unlock()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.FilterS())
sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.(*FilterService).FilterS())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.SIPAgent, err))

View File

@@ -67,33 +67,30 @@ func (sts *StatService) Start(shutdown chan struct{}) (err error) {
}
sts.srvDep[utils.DataDB].Add(1)
cls := sts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.CommonListenerS, utils.StateServiceUP)
}
sts.cl = cls.CLS()
cacheS := sts.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
sts.srvIndexer, sts.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
sts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheStatQueueProfiles,
utils.CacheStatQueues,
utils.CacheStatFilterIndexes); err != nil {
return
}
fs := sts.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.FilterS, utils.StateServiceUP)
}
dbs := sts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.DataDB, utils.StateServiceUP)
}
anz := sts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.AnalyzerS, utils.StateServiceUP)
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
sts.Lock()
defer sts.Unlock()

View File

@@ -67,33 +67,30 @@ func (thrs *ThresholdService) Start(shutdown chan struct{}) (err error) {
}
thrs.srvDep[utils.DataDB].Add(1)
cls := thrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CommonListenerS, utils.StateServiceUP)
}
thrs.cl = cls.CLS()
cacheS := thrs.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CacheS, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
thrs.srvIndexer, thrs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
thrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheThresholdProfiles,
utils.CacheThresholds,
utils.CacheThresholdFilterIndexes); err != nil {
return
}
fs := thrs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.FilterS, utils.StateServiceUP)
}
dbs := thrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.DataDB, utils.StateServiceUP)
}
anz := thrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.AnalyzerS, utils.StateServiceUP)
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
thrs.Lock()
defer thrs.Unlock()

View File

@@ -60,15 +60,18 @@ type TPeService struct {
// Start should handle the service start
func (ts *TPeService) Start(_ chan struct{}) (err error) {
cls := ts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TPeS, utils.CommonListenerS, utils.StateServiceUP)
}
ts.cl = cls.CLS()
dbs := ts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TPeS, utils.DataDB, utils.StateServiceUP)
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.DataDB,
},
ts.srvIndexer, ts.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
ts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
ts.tpes = tpes.NewTPeS(ts.cfg, dbs.DataManager(), ts.connMgr)
ts.stopChan = make(chan struct{})

View File

@@ -66,33 +66,29 @@ func (trs *TrendService) Start(shutdown chan struct{}) (err error) {
}
trs.srvDep[utils.DataDB].Add(1)
cls := trs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CommonListenerS, utils.StateServiceUP)
}
trs.cl = cls.CLS()
cacheS := trs.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CacheS, utils.StateServiceUP)
}
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheTrendProfiles,
utils.CacheTrends,
); err != nil {
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
trs.srvIndexer, trs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
dbs := trs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.DataDB, utils.StateServiceUP)
}
fs := trs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.FilterS, utils.StateServiceUP)
}
anz := trs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.AnalyzerS, utils.StateServiceUP)
trs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheTrendProfiles,
utils.CacheTrends); err != nil {
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
trs.Lock()
defer trs.Unlock()