diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 6dcaff4d5..91ca02101 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -44,7 +44,6 @@ import ( "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) func main() { @@ -161,63 +160,12 @@ func runCGREngine(fs []string) (err error) { utils.TPeS: new(sync.WaitGroup), } - // init the channel here because we need to pass them to connManager iServeManagerCh := make(chan birpc.ClientConnector, 1) - iConfigCh := make(chan birpc.ClientConnector, 1) - iCoreSv1Ch := make(chan birpc.ClientConnector, 1) - iCacheSCh := make(chan birpc.ClientConnector, 1) - iGuardianSCh := make(chan birpc.ClientConnector, 1) - iAnalyzerSCh := make(chan birpc.ClientConnector, 1) - iCDRServerCh := make(chan birpc.ClientConnector, 1) - iAttributeSCh := make(chan birpc.ClientConnector, 1) - iDispatcherSCh := make(chan birpc.ClientConnector, 1) - iSessionSCh := make(chan birpc.ClientConnector, 1) - iChargerSCh := make(chan birpc.ClientConnector, 1) - iThresholdSCh := make(chan birpc.ClientConnector, 1) - iStatSCh := make(chan birpc.ClientConnector, 1) - iTrendSCh := make(chan birpc.ClientConnector, 1) - iRankingSCh := make(chan birpc.ClientConnector, 1) - iResourceSCh := make(chan birpc.ClientConnector, 1) - iRouteSCh := make(chan birpc.ClientConnector, 1) - iAdminSCh := make(chan birpc.ClientConnector, 1) - iLoaderSCh := make(chan birpc.ClientConnector, 1) - iEEsCh := make(chan birpc.ClientConnector, 1) - iRateSCh := make(chan birpc.ClientConnector, 1) - iActionSCh := make(chan birpc.ClientConnector, 1) - iAccountSCh := make(chan birpc.ClientConnector, 1) - iTpeSCh := make(chan birpc.ClientConnector, 1) - iEFsCh := make(chan birpc.ClientConnector, 1) - iERsCh := make(chan birpc.ClientConnector, 1) - - // initialize the connManager before creating the DMService - // because we need to pass the connection to it - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS), utils.AnalyzerSv1, iAnalyzerSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS), utils.AdminSv1, iAdminSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), utils.AttributeSv1, iAttributeSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches), utils.CacheSv1, iCacheSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), utils.CDRsV1, iCDRServerCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers), utils.ChargerSv1, iChargerSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders), utils.LoaderSv1, iLoaderSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources), utils.ResourceSv1, iResourceSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS), utils.SessionSv1, iSessionSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), utils.SessionSv1, iSessionSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats), utils.StatSv1, iStatSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings), utils.RankingSv1, iRankingSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends), utils.TrendSv1, iTrendSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes), utils.RouteSv1, iRouteSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, iThresholdSCh) connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager), utils.ServiceManagerV1, iServeManagerCh) + iConfigCh := make(chan birpc.ClientConnector, 1) connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig), utils.ConfigSv1, iConfigCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore), utils.CoreSv1, iCoreSv1Ch) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs), utils.EeSv1, iEEsCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates), utils.RateSv1, iRateSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers), utils.DispatcherSv1, iDispatcherSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts), utils.AccountSv1, iAccountSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions), utils.ActionSv1, iActionSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), utils.TPeSv1, iTpeSCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, iEFsCh) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs), utils.ErSv1, iERsCh) + iGuardianSCh := make(chan birpc.ClientConnector, 1) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh) clsCh := make(chan *commonlisteners.CommonListenerS, 1) anzCh := make(chan *services.AnalyzerService, 1) @@ -229,23 +177,23 @@ func runCGREngine(fs []string) (err error) { dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, srvIdxr) sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvDep, srvIdxr) cls := services.NewCommonListenerService(cfg, caps, clsCh, srvDep, srvIdxr) - anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, iAnalyzerSCh, anzCh, srvDep, srvIdxr) - coreS := services.NewCoreService(cfg, caps, clsCh, iCoreSv1Ch, anzCh, cpuPrfF, shdWg, srvDep, srvIdxr) - cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, iCacheSCh, anzCh, coreS, srvDep, srvIdxr) - dspS := services.NewDispatcherService(cfg, dmS, cacheS, iFilterSCh, clsCh, iDispatcherSCh, connMgr, anzCh, srvDep, srvIdxr) - ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, iLoaderSCh, connMgr, anzCh, srvDep, srvIdxr) - efs := services.NewExportFailoverService(cfg, connMgr, iEFsCh, clsCh, srvDep, srvIdxr) - adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, iAdminSCh, connMgr, anzCh, srvDep, srvIdxr) - sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, iSessionSCh, connMgr, anzCh, srvDep, srvIdxr) - attrS := services.NewAttributeService(cfg, dmS, cacheS, iFilterSCh, clsCh, iAttributeSCh, anzCh, dspS, srvDep, srvIdxr) - chrgS := services.NewChargerService(cfg, dmS, cacheS, iFilterSCh, clsCh, iChargerSCh, connMgr, anzCh, srvDep, srvIdxr) - routeS := services.NewRouteService(cfg, dmS, cacheS, iFilterSCh, clsCh, iRouteSCh, connMgr, anzCh, srvDep, srvIdxr) - resourceS := services.NewResourceService(cfg, dmS, cacheS, iFilterSCh, clsCh, iResourceSCh, connMgr, anzCh, srvDep, srvIdxr) - trendS := services.NewTrendService(cfg, dmS, cacheS, iFilterSCh, clsCh, iTrendSCh, connMgr, anzCh, srvDep, srvIdxr) - rankingS := services.NewRankingService(cfg, dmS, cacheS, iFilterSCh, clsCh, iRankingSCh, connMgr, anzCh, srvDep, srvIdxr) - thS := services.NewThresholdService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, iThresholdSCh, anzCh, srvDep, srvIdxr) - stS := services.NewStatService(cfg, dmS, cacheS, iFilterSCh, clsCh, iStatSCh, connMgr, anzCh, srvDep, srvIdxr) - erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, iERsCh, anzCh, srvDep, srvIdxr) + anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, anzCh, srvDep, srvIdxr) + coreS := services.NewCoreService(cfg, caps, clsCh, anzCh, cpuPrfF, shdWg, srvDep, srvIdxr) + cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, anzCh, coreS, srvDep, srvIdxr) + dspS := services.NewDispatcherService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + efs := services.NewExportFailoverService(cfg, connMgr, clsCh, srvDep, srvIdxr) + adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + attrS := services.NewAttributeService(cfg, dmS, cacheS, iFilterSCh, clsCh, anzCh, dspS, srvDep, srvIdxr) + chrgS := services.NewChargerService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + routeS := services.NewRouteService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + resourceS := services.NewResourceService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + trendS := services.NewTrendService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + rankingS := services.NewRankingService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + thS := services.NewThresholdService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvDep, srvIdxr) + stS := services.NewStatService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) + erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, anzCh, srvDep, srvIdxr) dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvDep, srvIdxr) kamAgent := services.NewKamailioAgent(cfg, connMgr, srvDep, srvIdxr) @@ -255,12 +203,12 @@ func runCGREngine(fs []string) (err error) { diamAgent := services.NewDiameterAgent(cfg, iFilterSCh, connMgr, caps, srvDep, srvIdxr) httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) - eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, iEEsCh, anzCh, srvDep, srvIdxr) - cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, iCDRServerCh, connMgr, anzCh, srvDep, srvIdxr) + eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, anzCh, srvDep, srvIdxr) + cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) registrarcS := services.NewRegistrarCService(cfg, connMgr, srvDep, srvIdxr) - rateS := services.NewRateService(cfg, cacheS, iFilterSCh, dmS, clsCh, iRateSCh, anzCh, srvDep, srvIdxr) - actionS := services.NewActionService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, iActionSCh, anzCh, srvDep, srvIdxr) - accS := services.NewAccountService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, iAccountSCh, anzCh, srvDep, srvIdxr) + rateS := services.NewRateService(cfg, cacheS, iFilterSCh, dmS, clsCh, anzCh, srvDep, srvIdxr) + actionS := services.NewActionService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvDep, srvIdxr) + accS := services.NewAccountService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvDep, srvIdxr) tpeS := services.NewTPeService(cfg, connMgr, dmS, clsCh, srvDep, srvIdxr) srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{ @@ -403,13 +351,13 @@ func runCGREngine(fs []string) (err error) { cgrInitConfigSv1(iConfigCh, cfg, clsCh, anzS) if *flags.Preload != utils.EmptyString { - if err = cgrRunPreload(ctx, cfg, *flags.Preload, ldrs); err != nil { + if err = cgrRunPreload(ctx, cfg, *flags.Preload, srvIdxr); err != nil { return } } // Serve rpc connections - cgrStartRPC(ctx, cancel, cfg, clsCh, iDispatcherSCh) + cgrStartRPC(ctx, cancel, cfg, clsCh, srvIdxr) // TODO: find a better location for this if block if *flags.MemPrfDir != "" { @@ -429,15 +377,14 @@ func runCGREngine(fs []string) (err error) { } func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string, - loader *services.LoaderService) (err error) { + sIdxr *servmanager.ServiceIndexer) (err error) { if !cfg.LoaderCfg().Enabled() { err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS) return } - ch := loader.GetRPCChan() + loader := sIdxr.GetService(utils.LoaderS).(*services.LoaderService) select { - case ldrs := <-ch: - ch <- ldrs + case <-loader.StateChan(utils.StateServiceUP): case <-ctx.Done(): return } @@ -520,13 +467,12 @@ func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, } func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, - cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, internalDispatcherSChan chan birpc.ClientConnector) { + cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, sIdxr *servmanager.ServiceIndexer) { cl := <-clSChan clSChan <- cl if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this select { - case dispatcherS := <-internalDispatcherSChan: - internalDispatcherSChan <- dispatcherS + case <-sIdxr.GetService(utils.DispatcherS).StateChan(utils.StateServiceUP): case <-ctx.Done(): return } diff --git a/services/accounts.go b/services/accounts.go index 756728539..0bc124810 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -38,11 +38,9 @@ import ( func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, - internalChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AccountService{ - connChan: internalChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -72,7 +70,6 @@ type AccountService struct { rldChan chan struct{} stopChan chan struct{} - connChan chan birpc.ClientConnector // publish the internal Subsystem when available connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup @@ -123,7 +120,6 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e } acts.intRPCconn = anz.GetInternalCodec(srv, utils.AccountS) - acts.connChan <- acts.intRPCconn close(acts.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -140,7 +136,6 @@ func (acts *AccountService) Shutdown() (err error) { close(acts.stopChan) acts.acts.Shutdown() acts.acts = nil - <-acts.connChan acts.Unlock() acts.cl.RpcUnregisterName(utils.AccountSv1) return diff --git a/services/actions.go b/services/actions.go index b820cd87c..123a567f7 100644 --- a/services/actions.go +++ b/services/actions.go @@ -38,11 +38,10 @@ import ( func NewActionService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ActionService{ - connChan: internalChan, connMgr: connMgr, cfg: cfg, dm: dm, @@ -73,10 +72,9 @@ type ActionService struct { rldChan chan struct{} stopChan chan struct{} - connChan chan birpc.ClientConnector // publish the internal Subsystem when available - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -124,7 +122,6 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er } acts.intRPCconn = anz.GetInternalCodec(srv, utils.ActionS) - acts.connChan <- acts.intRPCconn close(acts.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -142,7 +139,6 @@ func (acts *ActionService) Shutdown() (err error) { close(acts.stopChan) acts.acts.Shutdown() acts.acts = nil - <-acts.connChan acts.cl.RpcUnregisterName(utils.ActionSv1) return } diff --git a/services/adminsv1.go b/services/adminsv1.go index 29776bc9a..d5b49b7e2 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -35,12 +35,10 @@ import ( func NewAdminSv1Service(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - internalAPIerSv1Chan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AdminSv1Service{ - connChan: internalAPIerSv1Chan, cfg: cfg, dm: dm, storDB: storDB, @@ -68,7 +66,6 @@ type AdminSv1Service struct { cl *commonlisteners.CommonListenerS stopChan chan struct{} - connChan chan birpc.ClientConnector connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup @@ -124,7 +121,6 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF //backwards compatible apiService.intRPCconn = anz.GetInternalCodec(srv, utils.AdminSv1) - apiService.connChan <- apiService.intRPCconn close(apiService.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -139,7 +135,6 @@ func (apiService *AdminSv1Service) Shutdown() (err error) { apiService.Lock() // close(apiService.stopChan) apiService.api = nil - <-apiService.connChan apiService.cl.RpcUnregisterName(utils.AdminSv1) apiService.Unlock() return diff --git a/services/analyzers.go b/services/analyzers.go index 2f3ec184d..a17f17e2b 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -35,12 +35,10 @@ import ( // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, filterSChan chan *engine.FilterS, - internalAnalyzerSChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) *AnalyzerService { return &AnalyzerService{ - connChan: internalAnalyzerSChan, cfg: cfg, clSChan: clSChan, filterSChan: filterSChan, @@ -63,7 +61,6 @@ type AnalyzerService struct { cl *commonlisteners.CommonListenerS cancelFunc context.CancelFunc - connChan chan birpc.ClientConnector cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup @@ -106,7 +103,6 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun func (anz *AnalyzerService) start(ctx *context.Context) { fS, err := waitForFilterS(ctx, anz.filterSChan) if err != nil { - anz.connChan <- nil return } @@ -124,7 +120,6 @@ func (anz *AnalyzerService) start(ctx *context.Context) { } } anz.Unlock() - anz.connChan <- anz.GetInternalCodec(srv, utils.AnalyzerS) } // Reload handles the change of config @@ -145,7 +140,6 @@ func (anz *AnalyzerService) Shutdown() (err error) { anz.anzChan = nil anz.anz.Shutdown() anz.anz = nil - <-anz.connChan anz.Unlock() anz.cl.RpcUnregisterName(utils.AnalyzerSv1) return diff --git a/services/attributes.go b/services/attributes.go index a881316c2..0803a197c 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -35,11 +35,10 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, anzChan chan *AnalyzerService, dspS *DispatcherService, srvDep map[string]*sync.WaitGroup, sIndxr *servmanager.ServiceIndexer) servmanager.Service { return &AttributeService{ - connChan: internalChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -68,9 +67,8 @@ type AttributeService struct { cl *commonlisteners.CommonListenerS rpc *apis.AttributeSv1 // useful on restart - connChan chan birpc.ClientConnector // publish the internal Subsystem when available - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection serviceIndexer *servmanager.ServiceIndexer // access directly services from here @@ -131,7 +129,6 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) }() attrS.intRPCconn = anz.GetInternalCodec(srv, utils.AttributeS) - attrS.connChan <- attrS.intRPCconn close(attrS.stateDeps.StateChan(utils.StateServiceUP)) // inform listeners about the service reaching UP state return } @@ -147,7 +144,6 @@ func (attrS *AttributeService) Shutdown() (err error) { attrS.attrS.Shutdown() attrS.attrS = nil attrS.rpc = nil - <-attrS.connChan attrS.cl.RpcUnregisterName(utils.AttributeSv1) attrS.dspS.UnregisterShutdownChan(attrS.ServiceName()) attrS.Unlock() diff --git a/services/caches.go b/services/caches.go index a17ce552e..04247bb51 100644 --- a/services/caches.go +++ b/services/caches.go @@ -33,7 +33,7 @@ import ( // NewCacheService . func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager, - clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, anzChan chan *AnalyzerService, // dspS *DispatcherService, cores *CoreService, srvDep map[string]*sync.WaitGroup, @@ -46,7 +46,6 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C clSChan: clSChan, dm: dm, connMgr: connMgr, - rpc: internalChan, cacheCh: make(chan *engine.CacheS, 1), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -63,7 +62,6 @@ type CacheService struct { cl *commonlisteners.CommonListenerS cacheCh chan *engine.CacheS - rpc chan birpc.ClientConnector connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup @@ -101,7 +99,6 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e } } cS.intRPCconn = anz.GetInternalCodec(srv, utils.CacheS) - cS.rpc <- cS.intRPCconn close(cS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/cdrs.go b/services/cdrs.go index 11fb1995d..635446689 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -36,12 +36,11 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalCDRServerChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &CDRService{ - connChan: internalCDRServerChan, cfg: cfg, dm: dm, storDB: storDB, @@ -68,7 +67,6 @@ type CDRService struct { cdrS *cdrs.CDRServer cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector stopChan chan struct{} connMgr *engine.ConnManager cfg *config.CGRConfig @@ -120,7 +118,6 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err } cs.intRPCconn = anz.GetInternalCodec(srv, utils.CDRServer) - cs.connChan <- cs.intRPCconn // Signal that cdrS is operational close(cs.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -135,7 +132,6 @@ func (cs *CDRService) Shutdown() (err error) { cs.Lock() close(cs.stopChan) cs.cdrS = nil - <-cs.connChan cs.Unlock() cs.cl.RpcUnregisterName(utils.CDRsV1) return diff --git a/services/chargers.go b/services/chargers.go index 9d01f5b7c..4c01429a2 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -35,11 +35,10 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - internalChargerSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, + connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ChargerService{ - connChan: internalChargerSChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -66,10 +65,9 @@ type ChargerService struct { chrS *engine.ChargerS cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -113,7 +111,6 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e } chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS) - chrS.connChan <- chrS.intRPCconn close(chrS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -129,7 +126,6 @@ func (chrS *ChargerService) Shutdown() (err error) { defer chrS.Unlock() chrS.chrS.Shutdown() chrS.chrS = nil - <-chrS.connChan chrS.cl.RpcUnregisterName(utils.ChargerSv1) return } diff --git a/services/cores.go b/services/cores.go index d10380970..1a42c47f8 100644 --- a/services/cores.go +++ b/services/cores.go @@ -35,13 +35,12 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, - internalCoreSChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, + anzChan chan *AnalyzerService, fileCPU *os.File, shdWg *sync.WaitGroup, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) *CoreService { return &CoreService{ shdWg: shdWg, - connChan: internalCoreSChan, cfg: cfg, caps: caps, fileCPU: fileCPU, @@ -69,7 +68,6 @@ type CoreService struct { csCh chan *cores.CoreS stopChan chan struct{} shdWg *sync.WaitGroup - connChan chan birpc.ClientConnector cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup @@ -106,7 +104,6 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err } cS.intRPCconn = anz.GetInternalCodec(srv, utils.CoreS) - cS.connChan <- cS.intRPCconn close(cS.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -125,7 +122,6 @@ func (cS *CoreService) Shutdown() error { cS.cS.StopCPUProfiling() cS.cS.StopMemoryProfiling() cS.cS = nil - <-cS.connChan <-cS.csCh cS.cl.RpcUnregisterName(utils.CoreSv1) return nil diff --git a/services/dispatchers.go b/services/dispatchers.go index fe382282e..6ec37d9d1 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -34,12 +34,11 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) *DispatcherService { return &DispatcherService{ - connChan: internalChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -67,7 +66,6 @@ type DispatcherService struct { dspS *dispatchers.DispatcherService cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector connMgr *engine.ConnManager cfg *config.CGRConfig srvsReload map[string]chan struct{} @@ -120,7 +118,6 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) // until we figured out a better sollution in case of gob server // dspS.server.SetDispatched() dspS.intRPCconn = anz.GetInternalCodec(srv, utils.DispatcherS) - dspS.connChan <- dspS.intRPCconn close(dspS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -136,7 +133,6 @@ func (dspS *DispatcherService) Shutdown() (err error) { defer dspS.Unlock() dspS.dspS.Shutdown() dspS.dspS = nil - <-dspS.connChan dspS.cl.RpcUnregisterName(utils.DispatcherSv1) dspS.cl.RpcUnregisterName(utils.AttributeSv1) diff --git a/services/ees.go b/services/ees.go index 683b4ccd6..f7ba8e7f7 100644 --- a/services/ees.go +++ b/services/ees.go @@ -34,7 +34,7 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector, + connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventExporterService{ @@ -42,7 +42,6 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil filterSChan: filterSChan, connMgr: connMgr, clSChan: clSChan, - intConnChan: intConnChan, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, @@ -61,10 +60,9 @@ type EventExporterService struct { eeS *ees.EeS cl *commonlisteners.CommonListenerS - intConnChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -103,7 +101,6 @@ func (es *EventExporterService) Shutdown() error { utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs)) es.eeS.ClearExporterCache() es.eeS = nil - <-es.intConnChan es.cl.RpcUnregisterName(utils.EeSv1) return nil } @@ -140,7 +137,6 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc } es.intRPCconn = anz.GetInternalCodec(srv, utils.EEs) - es.intConnChan <- es.intRPCconn close(es.stateDeps.StateChan(utils.StateServiceUP)) return nil } diff --git a/services/efs.go b/services/efs.go index 7534a9c57..c8c86b39a 100644 --- a/services/efs.go +++ b/services/efs.go @@ -43,11 +43,10 @@ type ExportFailoverService struct { cl *commonlisteners.CommonListenerS srv *birpc.Service - stopChan chan struct{} - intConnChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + stopChan chan struct{} + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -56,18 +55,16 @@ type ExportFailoverService struct { // NewExportFailoverService is the constructor for the TpeService func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - intConnChan chan birpc.ClientConnector, clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) *ExportFailoverService { return &ExportFailoverService{ - cfg: cfg, - clSChan: clSChan, - connMgr: connMgr, - intConnChan: intConnChan, - srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + clSChan: clSChan, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } diff --git a/services/ers.go b/services/ers.go index d819d8fc2..2427fce0e 100644 --- a/services/ers.go +++ b/services/ers.go @@ -38,7 +38,6 @@ func NewEventReaderService( filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, - intConn chan birpc.ClientConnector, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { @@ -48,7 +47,6 @@ func NewEventReaderService( filterSChan: filterSChan, connMgr: connMgr, clSChan: clSChan, - intConn: intConn, anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, @@ -69,7 +67,6 @@ type EventReaderService struct { rldChan chan struct{} stopChan chan struct{} - intConn chan birpc.ClientConnector connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup @@ -114,7 +111,6 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel erS.cl.RpcRegister(srv) } erS.intRPCconn = anz.GetInternalCodec(srv, utils.ERs) - erS.intConn <- erS.intRPCconn close(erS.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/loaders.go b/services/loaders.go index 357cd5685..30fe0073b 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -35,12 +35,10 @@ import ( // NewLoaderService returns the Loader Service func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - internalLoaderSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) *LoaderService { return &LoaderService{ - connChan: internalLoaderSChan, cfg: cfg, dm: dm, filterSChan: filterSChan, @@ -67,7 +65,6 @@ type LoaderService struct { cl *commonlisteners.CommonListenerS stopChan chan struct{} - connChan chan birpc.ClientConnector connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup @@ -115,7 +112,6 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er } } ldrs.intRPCconn = anz.GetInternalCodec(srv, utils.LoaderS) - ldrs.connChan <- ldrs.intRPCconn close(ldrs.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -145,7 +141,6 @@ func (ldrs *LoaderService) Shutdown() (_ error) { ldrs.Lock() ldrs.ldrs = nil close(ldrs.stopChan) - <-ldrs.connChan ldrs.cl.RpcUnregisterName(utils.LoaderSv1) ldrs.Unlock() return @@ -173,11 +168,6 @@ func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderS { return ldrs.ldrs } -// GetRPCChan returns the conn chan -func (ldrs *LoaderService) GetRPCChan() chan birpc.ClientConnector { - return ldrs.connChan -} - // StateChan returns signaling channel of specific state func (ldrs *LoaderService) StateChan(stateID string) chan struct{} { return ldrs.stateDeps.StateChan(stateID) diff --git a/services/rankings.go b/services/rankings.go index 50549a76b..4e335fe64 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -35,12 +35,11 @@ import ( // NewRankingService returns the RankingS Service func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalRankingSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RankingService{ - connChan: internalRankingSChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -66,10 +65,9 @@ type RankingService struct { ran *engine.RankingS cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -121,7 +119,6 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er } } ran.intRPCconn = anz.GetInternalCodec(srv, utils.RankingS) - ran.connChan <- ran.intRPCconn close(ran.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -141,7 +138,6 @@ func (ran *RankingService) Shutdown() (err error) { defer ran.Unlock() ran.ran.StopRankingS() ran.ran = nil - <-ran.connChan ran.cl.RpcUnregisterName(utils.RankingSv1) return } diff --git a/services/rates.go b/services/rates.go index b3817aca2..25d7562b8 100644 --- a/services/rates.go +++ b/services/rates.go @@ -35,7 +35,7 @@ import ( func NewRateService(cfg *config.CGRConfig, cacheS *CacheService, filterSChan chan *engine.FilterS, dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS, - intConnChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, + anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RateService{ @@ -44,7 +44,6 @@ func NewRateService(cfg *config.CGRConfig, filterSChan: filterSChan, dmS: dmS, clSChan: clSChan, - intConnChan: intConnChan, rldChan: make(chan struct{}), anzChan: anzChan, srvDep: srvDep, @@ -66,11 +65,10 @@ type RateService struct { rateS *rates.RateS cl *commonlisteners.CommonListenerS - rldChan chan struct{} - stopChan chan struct{} - intConnChan chan birpc.ClientConnector - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + rldChan chan struct{} + stopChan chan struct{} + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -107,7 +105,6 @@ func (rs *RateService) Shutdown() (err error) { close(rs.stopChan) rs.rateS.Shutdown() //we don't verify the error because shutdown never returns an err rs.rateS = nil - <-rs.intConnChan rs.cl.RpcUnregisterName(utils.RateSv1) return } @@ -154,7 +151,6 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er } rs.intRPCconn = anz.GetInternalCodec(srv, utils.RateS) - rs.intConnChan <- rs.intRPCconn close(rs.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/resources.go b/services/resources.go index 9cc12798d..d512ac7ad 100644 --- a/services/resources.go +++ b/services/resources.go @@ -34,12 +34,11 @@ import ( // NewResourceService returns the Resource Service func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalResourceSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ResourceService{ - connChan: internalResourceSChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -66,10 +65,9 @@ type ResourceService struct { reS *engine.ResourceS cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -116,7 +114,6 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e } reS.intRPCconn = anz.GetInternalCodec(srv, utils.ResourceS) - reS.connChan <- reS.intRPCconn close(reS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -136,7 +133,6 @@ func (reS *ResourceService) Shutdown() (err error) { defer reS.Unlock() reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error reS.reS = nil - <-reS.connChan reS.cl.RpcUnregisterName(utils.ResourceSv1) return } diff --git a/services/routes.go b/services/routes.go index 84cdd374f..6a8295160 100644 --- a/services/routes.go +++ b/services/routes.go @@ -35,12 +35,11 @@ import ( // NewRouteService returns the Route Service func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalRouteSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RouteService{ - connChan: internalRouteSChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -67,10 +66,9 @@ type RouteService struct { routeS *engine.RouteS cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -114,7 +112,6 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e } } routeS.intRPCconn = anz.GetInternalCodec(srv, utils.RouteS) - routeS.connChan <- routeS.intRPCconn close(routeS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -130,7 +127,6 @@ func (routeS *RouteService) Shutdown() (err error) { defer routeS.Unlock() routeS.routeS.Shutdown() //we don't verify the error because shutdown never returns an error routeS.routeS = nil - <-routeS.connChan routeS.cl.RpcUnregisterName(utils.RouteSv1) return } diff --git a/services/sessions.go b/services/sessions.go index 1e464ec0e..86163fa0f 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -36,12 +36,11 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &SessionService{ - connChan: internalChan, cfg: cfg, dm: dm, filterSChan: filterSChan, @@ -68,7 +67,6 @@ type SessionService struct { bircpEnabled bool // to stop birpc server if needed stopChan chan struct{} - connChan chan birpc.ClientConnector connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup @@ -114,7 +112,6 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) smg.cl.RpcRegister(s) } } - smg.connChan <- anz.GetInternalCodec(srv, utils.SessionS) // Register BiRpc handlers if smg.cfg.SessionSCfg().ListenBijson != utils.EmptyString { smg.bircpEnabled = true @@ -158,7 +155,6 @@ func (smg *SessionService) Shutdown() (err error) { smg.bircpEnabled = false } smg.sm = nil - <-smg.connChan smg.cl.RpcUnregisterName(utils.SessionSv1) // smg.server.BiRPCUnregisterName(utils.SessionSv1) return diff --git a/services/stats.go b/services/stats.go index 6419e6e4b..06b613347 100644 --- a/services/stats.go +++ b/services/stats.go @@ -34,12 +34,11 @@ import ( // NewStatService returns the Stat Service func NewStatService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalStatSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &StatService{ - connChan: internalStatSChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -66,10 +65,9 @@ type StatService struct { sts *engine.StatS cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -117,7 +115,6 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e } } sts.intRPCconn = anz.GetInternalCodec(srv, utils.StatS) - sts.connChan <- sts.intRPCconn close(sts.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -137,7 +134,6 @@ func (sts *StatService) Shutdown() (err error) { defer sts.Unlock() sts.sts.Shutdown(context.TODO()) sts.sts = nil - <-sts.connChan sts.cl.RpcUnregisterName(utils.StatSv1) return } diff --git a/services/thresholds.go b/services/thresholds.go index 546c2814c..279c01eb7 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -35,11 +35,10 @@ import ( func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - clSChan chan *commonlisteners.CommonListenerS, internalThresholdSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ThresholdService{ - connChan: internalThresholdSChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -66,10 +65,9 @@ type ThresholdService struct { thrs *engine.ThresholdS cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -116,7 +114,6 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) } } thrs.intRPCconn = anz.GetInternalCodec(srv, utils.ThresholdS) - thrs.connChan <- thrs.intRPCconn close(thrs.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -136,7 +133,6 @@ func (thrs *ThresholdService) Shutdown() (_ error) { defer thrs.Unlock() thrs.thrs.Shutdown(context.TODO()) thrs.thrs = nil - <-thrs.connChan thrs.cl.RpcUnregisterName(utils.ThresholdSv1) return } diff --git a/services/trends.go b/services/trends.go index b67dd2838..41baf00f3 100644 --- a/services/trends.go +++ b/services/trends.go @@ -34,12 +34,11 @@ import ( // NewTrendsService returns the TrendS Service func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, internalTrendSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TrendService{ - connChan: internalTrendSChan, cfg: cfg, dm: dm, cacheS: cacheS, @@ -65,10 +64,9 @@ type TrendService struct { trs *engine.TrendS cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -118,7 +116,6 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err } } trs.intRPCconn = anz.GetInternalCodec(srv, utils.Trends) - trs.connChan <- trs.intRPCconn close(trs.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -138,7 +135,6 @@ func (trs *TrendService) Shutdown() (err error) { defer trs.Unlock() trs.trs.StopTrendS() trs.trs = nil - <-trs.connChan trs.cl.RpcUnregisterName(utils.TrendSv1) return }