mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
Export funcs that wait for service state
This commit is contained in:
committed by
Dan Christian Bogos
parent
089dfc00ae
commit
bf3d9a3281
@@ -60,7 +60,7 @@ type AccountService struct {
|
||||
|
||||
// Start should handle the service start
|
||||
func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -61,7 +61,7 @@ type ActionService struct {
|
||||
|
||||
// Start should handle the service start
|
||||
func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -58,7 +58,7 @@ type AdminSv1Service struct {
|
||||
// Start should handle the sercive start
|
||||
// For this service the start should be called from RAL Service
|
||||
func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.FilterS,
|
||||
|
||||
@@ -58,7 +58,7 @@ type AnalyzerService struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry,
|
||||
cls, err := WaitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry,
|
||||
anz.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -84,7 +84,7 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana
|
||||
}
|
||||
|
||||
func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
anz.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@@ -58,7 +58,7 @@ type AttributeService struct {
|
||||
|
||||
// Start should handle the service start
|
||||
func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -54,7 +54,7 @@ type CacheService struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.DataDB,
|
||||
|
||||
@@ -57,7 +57,7 @@ type CDRService struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.FilterS,
|
||||
|
||||
@@ -55,7 +55,7 @@ type ChargerService struct {
|
||||
|
||||
// Start should handle the service start
|
||||
func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -48,7 +48,7 @@ type ConfigService struct {
|
||||
|
||||
// Start handles the service start.
|
||||
func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.AnalyzerS,
|
||||
|
||||
@@ -64,7 +64,7 @@ type CoreService struct {
|
||||
|
||||
// Start should handle the service start
|
||||
func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.AnalyzerS,
|
||||
|
||||
@@ -60,7 +60,7 @@ type DiameterAgent struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (da *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
da.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -102,7 +102,7 @@ func (da *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanage
|
||||
}
|
||||
close(da.stopChan)
|
||||
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
da.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -58,7 +58,7 @@ type DispatcherService struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -56,7 +56,7 @@ type DNSAgent struct {
|
||||
|
||||
// Start should handle the service start
|
||||
func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
dns.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -77,7 +77,7 @@ func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser
|
||||
|
||||
// Reload handles the change of config
|
||||
func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
dns.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@@ -84,7 +84,7 @@ func (es *EventExporterService) Shutdown(_ *servmanager.ServiceRegistry) error {
|
||||
|
||||
// Start should handle the service start
|
||||
func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.FilterS,
|
||||
|
||||
@@ -57,7 +57,7 @@ func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager
|
||||
|
||||
// Start should handle the service start
|
||||
func (efServ *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry,
|
||||
cls, err := WaitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry,
|
||||
efServ.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@@ -61,7 +61,7 @@ type EventReaderService struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.FilterS,
|
||||
|
||||
@@ -52,7 +52,7 @@ type FilterService struct {
|
||||
|
||||
// Start handles the service start.
|
||||
func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CacheS,
|
||||
utils.DataDB,
|
||||
|
||||
@@ -49,7 +49,7 @@ type GuardianService struct {
|
||||
|
||||
// Start handles the service start.
|
||||
func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.AnalyzerS,
|
||||
|
||||
@@ -59,7 +59,7 @@ type HTTPAgent struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.FilterS,
|
||||
|
||||
@@ -60,7 +60,7 @@ type JanusAgent struct {
|
||||
|
||||
// Start should jandle the sercive start
|
||||
func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.FilterS,
|
||||
|
||||
@@ -58,7 +58,7 @@ type LoaderService struct {
|
||||
|
||||
// Start should handle the service start
|
||||
func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.FilterS,
|
||||
@@ -98,7 +98,7 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv
|
||||
|
||||
// Reload handles the change of config
|
||||
func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.FilterS,
|
||||
utils.DataDB,
|
||||
|
||||
@@ -59,7 +59,7 @@ type RadiusAgent struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
rad.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@@ -61,7 +61,7 @@ type RankingService struct {
|
||||
func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
ran.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -82,7 +82,7 @@ func (rs *RateService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
|
||||
|
||||
// Start should handle the service start
|
||||
func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -61,7 +61,7 @@ type ResourceService struct {
|
||||
func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
reS.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -55,7 +55,7 @@ type RouteService struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -60,7 +60,7 @@ type SessionService struct {
|
||||
|
||||
// Start should handle the service start
|
||||
func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.FilterS,
|
||||
|
||||
@@ -56,7 +56,7 @@ type SIPAgent struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (sip *SIPAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
sip.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@@ -50,13 +50,13 @@ func (sDs *StateDependencies) StateChan(stateID string) (retChan chan struct{})
|
||||
return
|
||||
}
|
||||
|
||||
// waitForServicesToReachState ensures each service reaches the desired state, with the timeout applied individually per service.
|
||||
// WaitForServicesToReachState ensures each service reaches the desired state, with the timeout applied individually per service.
|
||||
// Returns a map of service names to their instances or an error if any service fails to reach its state within its timeout window.
|
||||
func waitForServicesToReachState(state string, serviceIDs []string, indexer *servmanager.ServiceRegistry, timeout time.Duration,
|
||||
func WaitForServicesToReachState(state string, serviceIDs []string, indexer *servmanager.ServiceRegistry, timeout time.Duration,
|
||||
) (map[string]servmanager.Service, error) {
|
||||
services := make(map[string]servmanager.Service, len(serviceIDs))
|
||||
for _, serviceID := range serviceIDs {
|
||||
srv, err := waitForServiceState(state, serviceID, indexer, timeout)
|
||||
srv, err := WaitForServiceState(state, serviceID, indexer, timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -66,9 +66,9 @@ func waitForServicesToReachState(state string, serviceIDs []string, indexer *ser
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// waitForServiceState waits up to timeout duration for a service to reach the specified state.
|
||||
// WaitForServiceState waits up to timeout duration for a service to reach the specified state.
|
||||
// Returns the service instance or an error if the timeout is exceeded.
|
||||
func waitForServiceState(state, serviceID string, indexer *servmanager.ServiceRegistry, timeout time.Duration,
|
||||
func WaitForServiceState(state, serviceID string, indexer *servmanager.ServiceRegistry, timeout time.Duration,
|
||||
) (servmanager.Service, error) {
|
||||
srv := indexer.Lookup(serviceID)
|
||||
if serviceID == utils.AnalyzerS && !srv.ShouldRun() {
|
||||
|
||||
@@ -59,7 +59,7 @@ type StatService struct {
|
||||
func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
sts.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -61,7 +61,7 @@ type ThresholdService struct {
|
||||
func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
thrs.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
@@ -58,7 +58,7 @@ type TPeService struct {
|
||||
// Start should handle the service start
|
||||
func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.DataDB,
|
||||
|
||||
@@ -60,7 +60,7 @@ type TrendService struct {
|
||||
func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
trs.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
utils.CacheS,
|
||||
|
||||
Reference in New Issue
Block a user