diff --git a/config/config_defaults.go b/config/config_defaults.go index 5d098bdc3..d33d8db6d 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -514,7 +514,7 @@ const CGRATES_CFG_JSON = ` }, "efs": { - "enabled": false, // starts the EventReader service: + "enabled": false, // starts the EventReader service: "poster_attempts": 3, // number of attempts before considering post request failed (eg: *httpPost, CDR exports) "failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests "failed_posts_ttl": "5s" // time to wait before writing the failed posts in a single file diff --git a/services/accounts.go b/services/accounts.go index f94e03e10..756728539 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -53,6 +53,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, srvDep: srvDep, rldChan: make(chan struct{}, 1), srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -86,7 +87,6 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e if acts.IsRunning() { return utils.ErrServiceAlreadyRunning } - acts.stateDeps = NewStateDependencies([]string{utils.StateServiceUP}) acts.cl = <-acts.clSChan acts.clSChan <- acts.cl if err = acts.cacheS.WaitToPrecache(ctx, @@ -124,6 +124,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e acts.intRPCconn = anz.GetInternalCodec(srv, utils.AccountS) acts.connChan <- acts.intRPCconn + close(acts.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/actions.go b/services/actions.go index 4aa603ea2..b820cd87c 100644 --- a/services/actions.go +++ b/services/actions.go @@ -53,6 +53,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, srvDep: srvDep, rldChan: make(chan struct{}, 1), srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -124,6 +125,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er acts.intRPCconn = anz.GetInternalCodec(srv, utils.ActionS) acts.connChan <- acts.intRPCconn + close(acts.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/adminsv1.go b/services/adminsv1.go index 66d18b085..29776bc9a 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -50,6 +50,7 @@ func NewAdminSv1Service(cfg *config.CGRConfig, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -124,7 +125,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF //backwards compatible apiService.intRPCconn = anz.GetInternalCodec(srv, utils.AdminSv1) apiService.connChan <- apiService.intRPCconn - + close(apiService.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/analyzers.go b/services/analyzers.go index 40eb86f15..2f3ec184d 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -47,6 +47,7 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.Com anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -98,6 +99,7 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun }(anz.anz) anz.cl.SetAnalyzer(anz.anz) go anz.start(ctx) + close(anz.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index a78ba2ac3..5b179bbe0 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -42,6 +42,7 @@ func NewAsteriskAgent(cfg *config.CGRConfig, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -81,6 +82,7 @@ func (ast *AsteriskAgent) Start(_ *context.Context, shtDwn context.CancelFunc) ( ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr) go listenAndServe(ast.smas[connIdx], ast.stopChan) } + close(ast.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/caches.go b/services/caches.go index e5dc87e1d..a17ce552e 100644 --- a/services/caches.go +++ b/services/caches.go @@ -49,6 +49,7 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C rpc: internalChan, cacheCh: make(chan *engine.CacheS, 1), srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -101,6 +102,7 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e } cS.intRPCconn = anz.GetInternalCodec(srv, utils.CacheS) cS.rpc <- cS.intRPCconn + close(cS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/cdrs.go b/services/cdrs.go index c8d93dbfe..11fb1995d 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -51,6 +51,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -120,6 +121,7 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err cs.intRPCconn = anz.GetInternalCodec(srv, utils.CDRServer) cs.connChan <- cs.intRPCconn // Signal that cdrS is operational + close(cs.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/chargers.go b/services/chargers.go index 3f29e028a..9d01f5b7c 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -49,6 +49,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -113,6 +114,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS) chrS.connChan <- chrS.intRPCconn + close(chrS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 96120df5a..c18405f41 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -41,6 +41,7 @@ func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, clSChan: clSChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -75,6 +76,7 @@ func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) err if cl.cfg.ConfigSCfg().Enabled { cl.cls.RegisterHTTPFunc(cl.cfg.ConfigSCfg().URL, config.HandlerConfigS) } + close(cl.stateDeps.StateChan(utils.StateServiceUP)) return nil } diff --git a/services/cores.go b/services/cores.go index 6404a0dab..d10380970 100644 --- a/services/cores.go +++ b/services/cores.go @@ -50,6 +50,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm srvDep: srvDep, csCh: make(chan *cores.CoreS, 1), srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -106,6 +107,7 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err cS.intRPCconn = anz.GetInternalCodec(srv, utils.CoreS) cS.connChan <- cS.intRPCconn + close(cS.stateDeps.StateChan(utils.StateServiceUP)) return nil } diff --git a/services/datadb.go b/services/datadb.go index df6df9086..704d3a8cf 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -41,6 +41,7 @@ func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVer setVersions: setVersions, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -91,6 +92,7 @@ func (db *DataDBService) Start(*context.Context, context.CancelFunc) (err error) } db.dbchan <- db.dm + close(db.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/diameteragent.go b/services/diameteragent.go index d6b4b9dea..2d48bcf90 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -43,6 +43,7 @@ func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, caps: caps, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -100,6 +101,7 @@ func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFun shtDwn() } }(da.da) + close(da.stateDeps.StateChan(utils.StateServiceUP)) return nil } diff --git a/services/dispatchers.go b/services/dispatchers.go index 5663a1111..fe382282e 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -50,6 +50,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, srvDep: srvDep, srvsReload: make(map[string]chan struct{}), srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -120,7 +121,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) // dspS.server.SetDispatched() dspS.intRPCconn = anz.GetInternalCodec(srv, utils.DispatcherS) dspS.connChan <- dspS.intRPCconn - + close(dspS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/dnsagent.go b/services/dnsagent.go index 00c0531f3..ab199dd4f 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -42,6 +42,7 @@ func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -82,6 +83,7 @@ func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err } dns.stopChan = make(chan struct{}) go dns.listenAndServe(dns.stopChan, shtDwn) + close(dns.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/ees.go b/services/ees.go index e0f7829e9..683b4ccd6 100644 --- a/services/ees.go +++ b/services/ees.go @@ -46,6 +46,7 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -140,6 +141,7 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc es.intRPCconn = anz.GetInternalCodec(srv, utils.EEs) es.intConnChan <- es.intRPCconn + close(es.stateDeps.StateChan(utils.StateServiceUP)) return nil } diff --git a/services/efs.go b/services/efs.go index feb9696d9..7534a9c57 100644 --- a/services/efs.go +++ b/services/efs.go @@ -67,6 +67,7 @@ func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager intConnChan: intConnChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -84,6 +85,7 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx) efServ.cl.RpcRegister(efServ.srv) efServ.Unlock() + close(efServ.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/ers.go b/services/ers.go index 51b24ade4..d819d8fc2 100644 --- a/services/ers.go +++ b/services/ers.go @@ -52,6 +52,7 @@ func NewEventReaderService( anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -114,6 +115,7 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel } erS.intRPCconn = anz.GetInternalCodec(srv, utils.ERs) erS.intConn <- erS.intRPCconn + close(erS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index 1a0420ac9..4b5aebeb9 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -42,6 +42,7 @@ func NewFreeswitchAgent(cfg *config.CGRConfig, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -71,6 +72,7 @@ func (fS *FreeswitchAgent) Start(_ *context.Context, shtDwn context.CancelFunc) fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, fS.connMgr) go fS.connect(shtDwn) + close(fS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/globalvars.go b/services/globalvars.go index 62294deee..b1c0d8c69 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -38,6 +38,7 @@ func NewGlobalVarS(cfg *config.CGRConfig, cfg: cfg, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -58,6 +59,7 @@ func (gv *GlobalVarS) Start(*context.Context, context.CancelFunc) error { utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale utils.DecimalContext.Precision = gv.cfg.GeneralCfg().DecimalPrecision utils.DecimalContext.RoundingMode = gv.cfg.GeneralCfg().DecimalRoundingMode + close(gv.stateDeps.StateChan(utils.StateServiceUP)) return nil } diff --git a/services/httpagent.go b/services/httpagent.go index 9583636b5..facc50e1d 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -44,6 +44,7 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -92,6 +93,7 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro agntCfg.ReplyPayload, agntCfg.RequestProcessors)) } ha.Unlock() + close(ha.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/janus.go b/services/janus.go index 20d1156f1..0ed20d203 100644 --- a/services/janus.go +++ b/services/janus.go @@ -45,6 +45,7 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -102,6 +103,7 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err ja.started = true ja.Unlock() + close(ja.stateDeps.StateChan(utils.StateServiceUP)) utils.Logger.Info(fmt.Sprintf("<%s> successfully started.", utils.JanusAgent)) return } diff --git a/services/kamailioagent.go b/services/kamailioagent.go index 96ac32f9a..0a912cace 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -43,6 +43,7 @@ func NewKamailioAgent(cfg *config.CGRConfig, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -73,6 +74,7 @@ func (kam *KamailioAgent) Start(_ *context.Context, shtDwn context.CancelFunc) ( utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone)) go kam.connect(kam.kam, shtDwn) + close(kam.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/loaders.go b/services/loaders.go index b7e470d10..357cd5685 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -50,6 +50,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -115,6 +116,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er } ldrs.intRPCconn = anz.GetInternalCodec(srv, utils.LoaderS) ldrs.connChan <- ldrs.intRPCconn + close(ldrs.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/radiusagent.go b/services/radiusagent.go index 0e4a6046e..b231ddcbc 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -42,6 +42,7 @@ func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -90,7 +91,7 @@ func (rad *RadiusAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) ( rad.stopChan = make(chan struct{}) go rad.listenAndServe(rad.rad, shtDwn) - + close(rad.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/rankings.go b/services/rankings.go index 96feb5ace..50549a76b 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -50,6 +50,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -121,6 +122,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er } ran.intRPCconn = anz.GetInternalCodec(srv, utils.RankingS) ran.connChan <- ran.intRPCconn + close(ran.stateDeps.StateChan(utils.StateServiceUP)) return nil } diff --git a/services/rates.go b/services/rates.go index ff63d2ebb..b3817aca2 100644 --- a/services/rates.go +++ b/services/rates.go @@ -49,6 +49,7 @@ func NewRateService(cfg *config.CGRConfig, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -154,6 +155,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er rs.intRPCconn = anz.GetInternalCodec(srv, utils.RateS) rs.intConnChan <- rs.intRPCconn + close(rs.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/registrarc.go b/services/registrarc.go index 2d6387450..b3860b049 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -39,6 +39,7 @@ func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -72,7 +73,7 @@ func (dspS *RegistrarCService) Start(*context.Context, context.CancelFunc) (err dspS.rldChan = make(chan struct{}) dspS.dspS = registrarc.NewRegistrarCService(dspS.cfg, dspS.connMgr) go dspS.dspS.ListenAndServe(dspS.stopChan, dspS.rldChan) - + close(dspS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/resources.go b/services/resources.go index 59127a45b..9cc12798d 100644 --- a/services/resources.go +++ b/services/resources.go @@ -49,6 +49,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -116,6 +117,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e reS.intRPCconn = anz.GetInternalCodec(srv, utils.ResourceS) reS.connChan <- reS.intRPCconn + close(reS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/routes.go b/services/routes.go index 4e0049621..84cdd374f 100644 --- a/services/routes.go +++ b/services/routes.go @@ -50,6 +50,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -114,6 +115,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e } routeS.intRPCconn = anz.GetInternalCodec(srv, utils.RouteS) routeS.connChan <- routeS.intRPCconn + close(routeS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/sessions.go b/services/sessions.go index 9588a067d..1e464ec0e 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -50,6 +50,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -123,6 +124,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) // run this in it's own goroutine go smg.start(shtDw) } + close(smg.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/sipagent.go b/services/sipagent.go index 2dc19aea9..83e7ea9bd 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -42,6 +42,7 @@ func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -83,6 +84,7 @@ func (sip *SIPAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err return } go sip.listenAndServe(shtDwn) + close(sip.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/stats.go b/services/stats.go index 659e48d65..6419e6e4b 100644 --- a/services/stats.go +++ b/services/stats.go @@ -49,6 +49,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -117,6 +118,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e } sts.intRPCconn = anz.GetInternalCodec(srv, utils.StatS) sts.connChan <- sts.intRPCconn + close(sts.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/stordb.go b/services/stordb.go index ab01e5581..72f5d1fe7 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -39,6 +39,7 @@ func NewStorDBService(cfg *config.CGRConfig, setVersions bool, setVersions: setVersions, srvDep: srvDep, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -88,6 +89,7 @@ func (db *StorDBService) Start(*context.Context, context.CancelFunc) (err error) } db.sync() + close(db.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/thresholds.go b/services/thresholds.go index 610f00d5b..546c2814c 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -49,6 +49,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, srvDep: srvDep, connMgr: connMgr, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -116,6 +117,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) } thrs.intRPCconn = anz.GetInternalCodec(srv, utils.ThresholdS) thrs.connChan <- thrs.intRPCconn + close(thrs.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/tpes.go b/services/tpes.go index 7dbd019f8..b000e074d 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -43,6 +43,7 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD connMgr: connMgr, clSChan: clSChan, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -81,6 +82,7 @@ func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err err ts.stopChan = make(chan struct{}) ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false) ts.cl.RpcRegister(ts.srv) + close(ts.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/trends.go b/services/trends.go index 82341efac..b67dd2838 100644 --- a/services/trends.go +++ b/services/trends.go @@ -49,6 +49,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, srvDep: srvDep, filterSChan: filterSChan, srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -118,6 +119,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err } trs.intRPCconn = anz.GetInternalCodec(srv, utils.Trends) trs.connChan <- trs.intRPCconn + close(trs.stateDeps.StateChan(utils.StateServiceUP)) return nil }