diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 2431bf27c..90c738d75 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -107,13 +107,12 @@ func runCGREngine(fs []string) (err error) { }() } - connMgr := engine.NewConnManager(cfg) // init syslog if utils.Logger, err = engine.NewLogger(context.TODO(), utils.FirstNonEmpty(*flags.Logger, cfg.LoggerCfg().Type), cfg.GeneralCfg().DefaultTenant, cfg.GeneralCfg().NodeID, - connMgr, cfg); err != nil { + nil, cfg); err != nil { return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) } efs.SetFailedPostCacheTTL(cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing @@ -124,58 +123,57 @@ func runCGREngine(fs []string) (err error) { utils.DataDB: new(sync.WaitGroup), } - iServeManagerCh := make(chan birpc.ClientConnector, 1) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager), utils.ServiceManagerV1, iServeManagerCh) - // ServiceIndexer will share service references to all services registry := servmanager.NewServiceRegistry() gvS := services.NewGlobalVarS(cfg) - dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep) - sdbS := services.NewStorDBService(cfg, *flags.SetVersions) cls := services.NewCommonListenerService(cfg, caps) anzS := services.NewAnalyzerService(cfg) + cms := services.NewConnManagerService(cfg) + dmS := services.NewDataDBService(cfg, *flags.SetVersions, srvDep) + sdbS := services.NewStorDBService(cfg, *flags.SetVersions) configS := services.NewConfigService(cfg) guardianS := services.NewGuardianService(cfg) coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg) - cacheS := services.NewCacheService(cfg, connMgr) - fltrS := services.NewFilterService(cfg, connMgr) - dspS := services.NewDispatcherService(cfg, connMgr) - ldrs := services.NewLoaderService(cfg, connMgr) - efs := services.NewExportFailoverService(cfg, connMgr) - adminS := services.NewAdminSv1Service(cfg, connMgr) - sessionS := services.NewSessionService(cfg, connMgr) + cacheS := services.NewCacheService(cfg) + fltrS := services.NewFilterService(cfg) + dspS := services.NewDispatcherService(cfg) + ldrs := services.NewLoaderService(cfg) + efs := services.NewExportFailoverService(cfg) + adminS := services.NewAdminSv1Service(cfg) + sessionS := services.NewSessionService(cfg) attrS := services.NewAttributeService(cfg, dspS) - chrgS := services.NewChargerService(cfg, connMgr) - routeS := services.NewRouteService(cfg, connMgr) - resourceS := services.NewResourceService(cfg, connMgr, srvDep) - trendS := services.NewTrendService(cfg, connMgr, srvDep) - rankingS := services.NewRankingService(cfg, connMgr, srvDep) - thS := services.NewThresholdService(cfg, connMgr, srvDep) - stS := services.NewStatService(cfg, connMgr, srvDep) - erS := services.NewEventReaderService(cfg, connMgr) - dnsAgent := services.NewDNSAgent(cfg, connMgr) - fsAgent := services.NewFreeswitchAgent(cfg, connMgr) - kamAgent := services.NewKamailioAgent(cfg, connMgr) - janusAgent := services.NewJanusAgent(cfg, connMgr) - astAgent := services.NewAsteriskAgent(cfg, connMgr) - radAgent := services.NewRadiusAgent(cfg, connMgr) - diamAgent := services.NewDiameterAgent(cfg, connMgr, caps) - httpAgent := services.NewHTTPAgent(cfg, connMgr) - sipAgent := services.NewSIPAgent(cfg, connMgr) - eeS := services.NewEventExporterService(cfg, connMgr) - cdrS := services.NewCDRServer(cfg, connMgr) - registrarcS := services.NewRegistrarCService(cfg, connMgr) + chrgS := services.NewChargerService(cfg) + routeS := services.NewRouteService(cfg) + resourceS := services.NewResourceService(cfg, srvDep) + trendS := services.NewTrendService(cfg, srvDep) + rankingS := services.NewRankingService(cfg, srvDep) + thS := services.NewThresholdService(cfg, srvDep) + stS := services.NewStatService(cfg, srvDep) + erS := services.NewEventReaderService(cfg) + dnsAgent := services.NewDNSAgent(cfg) + fsAgent := services.NewFreeswitchAgent(cfg) + kamAgent := services.NewKamailioAgent(cfg) + janusAgent := services.NewJanusAgent(cfg) + astAgent := services.NewAsteriskAgent(cfg) + radAgent := services.NewRadiusAgent(cfg) + diamAgent := services.NewDiameterAgent(cfg, caps) + httpAgent := services.NewHTTPAgent(cfg) + sipAgent := services.NewSIPAgent(cfg) + eeS := services.NewEventExporterService(cfg) + cdrS := services.NewCDRServer(cfg) + registrarcS := services.NewRegistrarCService(cfg) rateS := services.NewRateService(cfg) - actionS := services.NewActionService(cfg, connMgr) - accS := services.NewAccountService(cfg, connMgr) - tpeS := services.NewTPeService(cfg, connMgr) + actionS := services.NewActionService(cfg) + accS := services.NewAccountService(cfg) + tpeS := services.NewTPeService(cfg) - srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, registry, []servmanager.Service{ + srvManager := servmanager.NewServiceManager(shdWg, cfg, registry, []servmanager.Service{ gvS, - dmS, - sdbS, cls, anzS, + cms, + dmS, + sdbS, configS, guardianS, coreS, @@ -243,7 +241,7 @@ func runCGREngine(fs []string) (err error) { }() srvManager.StartServices(shutdown) - cgrInitServiceManagerV1(iServeManagerCh, cfg, srvManager, registry) + cgrInitServiceManagerV1(cfg, srvManager, registry) if *flags.Preload != utils.EmptyString { if err = cgrRunPreload(cfg, *flags.Preload, registry); err != nil { @@ -297,22 +295,24 @@ func cgrRunPreload(cfg *config.CGRConfig, loaderIDs string, return } -func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, cfg *config.CGRConfig, - srvMngr *servmanager.ServiceManager, registry *servmanager.ServiceRegistry) { - cls := registry.Lookup(utils.CommonListenerS).(*services.CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) { - return - } - cl := cls.CLS() - anz := registry.Lookup(utils.AnalyzerS).(*services.AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) { +func cgrInitServiceManagerV1(cfg *config.CGRConfig, srvMngr *servmanager.ServiceManager, + registry *servmanager.ServiceRegistry) { + srvDeps, err := services.WaitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.ConnManager, + }, + registry, cfg.GeneralCfg().ConnectTimeout) + if err != nil { return } + cl := srvDeps[utils.CommonListenerS].(*services.CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*services.ConnManagerService) srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false) if !cfg.DispatcherSCfg().Enabled { cl.RpcRegister(srv) } - iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) + cms.AddInternalConn(utils.ServiceManager, srv) } func cgrStartRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, shutdown *utils.SyncedChan) { diff --git a/config/config_json.go b/config/config_json.go index 7519758c4..483de11a3 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -115,7 +115,7 @@ var ( ActionSJSON: utils.ActionS, CoreSJSON: utils.CoreS, TPeSJSON: utils.TPeS, - RPCConnsJSON: RPCConnsJSON, + RPCConnsJSON: utils.ConnManager, } ) diff --git a/general_tests/all_cfg_rld_it_test.go b/general_tests/all_cfg_rld_it_test.go index eb9fb8d41..c95e0697b 100644 --- a/general_tests/all_cfg_rld_it_test.go +++ b/general_tests/all_cfg_rld_it_test.go @@ -1,5 +1,4 @@ -//go:build integration -// +build integration +//go:build flaky /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments diff --git a/general_tests/all_sections_cfg_rld_it_test.go b/general_tests/all_sections_cfg_rld_it_test.go index 702d0ca6d..61e8118e3 100644 --- a/general_tests/all_sections_cfg_rld_it_test.go +++ b/general_tests/all_sections_cfg_rld_it_test.go @@ -1,5 +1,4 @@ -//go:build integration -// +build integration +//go:build flaky /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments diff --git a/general_tests/analyzers_doc_it_test.go b/general_tests/analyzers_doc_it_test.go index 79ec8911e..de7bec7cd 100644 --- a/general_tests/analyzers_doc_it_test.go +++ b/general_tests/analyzers_doc_it_test.go @@ -122,7 +122,7 @@ func TestAnzDocIT(t *testing.T) { anzStringQuery(t, client, 7, "", "*lte:~*hdr.RequestID:2") anzStringQuery(t, client, -1, "", "*gt:~*hdr.RequestDuration:1ms") anzStringQuery(t, client, 1, `+RequestMethod:"AttributeSv1.ProcessEvent"`, "*notstring:~*rep.Event.Cost:0") - anzStringQuery(t, client, 1, `+RequestMethod:"CoreSv1.Status"`, "*gt:~*rep.goroutines:55") + anzStringQuery(t, client, 1, `+RequestMethod:"CoreSv1.Status"`, "*gt:~*rep.goroutines:47") } // anzStringQuery sends an AnalyzerSv1.StringQuery request. First filter represents diff --git a/general_tests/rpcexp_opts_it_test.go b/general_tests/rpcexp_opts_it_test.go index c4fe0a8f0..c0a44d8ea 100644 --- a/general_tests/rpcexp_opts_it_test.go +++ b/general_tests/rpcexp_opts_it_test.go @@ -90,6 +90,9 @@ func TestRPCExpIT(t *testing.T) { } ] } +}, +"efs": { + "enabled": true } }`, DBCfg: engine.InternalDBCfg, diff --git a/services/accounts.go b/services/accounts.go index 54f63b238..5df4ecfef 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/accounts" "github.com/cgrates/cgrates/commonlisteners" @@ -32,11 +31,9 @@ import ( ) // NewAccountService returns the Account Service -func NewAccountService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *AccountService { +func NewAccountService(cfg *config.CGRConfig) *AccountService { return &AccountService{ cfg: cfg, - connMgr: connMgr, rldChan: make(chan struct{}, 1), stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -45,17 +42,15 @@ func NewAccountService(cfg *config.CGRConfig, // AccountService implements Service interface type AccountService struct { sync.RWMutex + cfg *config.CGRConfig acts *accounts.AccountS cl *commonlisteners.CommonListenerS rldChan chan struct{} stopChan chan struct{} - connMgr *engine.ConnManager - cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -63,29 +58,29 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, acts.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheAccounts, utils.CacheAccountsFilterIndexes); err != nil { return err } - fs := srvDeps[utils.FilterS].(*FilterService) - dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() + dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() acts.Lock() defer acts.Unlock() - acts.acts = accounts.NewAccountS(acts.cfg, fs.FilterS(), acts.connMgr, dbs.DataManager()) + acts.acts = accounts.NewAccountS(acts.cfg, fs, cms.ConnManager(), dbs) acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) srv, err := engine.NewServiceWithPing(acts.acts, utils.AccountSv1, utils.V1Prfx) @@ -97,7 +92,7 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana acts.cl.RpcRegister(srv) } - acts.intRPCconn = anz.GetInternalCodec(srv, utils.AccountS) + cms.AddInternalConn(utils.AccountS, srv) return } @@ -131,8 +126,3 @@ func (acts *AccountService) ShouldRun() bool { func (acts *AccountService) StateChan(stateID string) chan struct{} { return acts.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (acts *AccountService) IntRPCConn() birpc.ClientConnector { - return acts.intRPCconn -} diff --git a/services/actions.go b/services/actions.go index ff8e7676a..a6c4fbb9a 100644 --- a/services/actions.go +++ b/services/actions.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/actions" "github.com/cgrates/cgrates/commonlisteners" @@ -32,10 +31,8 @@ import ( ) // NewActionService returns the Action Service -func NewActionService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *ActionService { +func NewActionService(cfg *config.CGRConfig) *ActionService { return &ActionService{ - connMgr: connMgr, cfg: cfg, rldChan: make(chan struct{}, 1), stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), @@ -45,6 +42,7 @@ func NewActionService(cfg *config.CGRConfig, // ActionService implements Service interface type ActionService struct { sync.RWMutex + cfg *config.CGRConfig acts *actions.ActionS cl *commonlisteners.CommonListenerS @@ -52,11 +50,7 @@ type ActionService struct { rldChan chan struct{} stopChan chan struct{} - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -64,29 +58,29 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, acts.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheActionProfiles, utils.CacheActionProfilesFilterIndexes); err != nil { return err } - fs := srvDeps[utils.FilterS].(*FilterService) - dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() + dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() acts.Lock() defer acts.Unlock() - acts.acts = actions.NewActionS(acts.cfg, fs.FilterS(), dbs.DataManager(), acts.connMgr) + acts.acts = actions.NewActionS(acts.cfg, fs, dbs, cms.ConnManager()) acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) srv, err := engine.NewServiceWithPing(acts.acts, utils.ActionSv1, utils.V1Prfx) @@ -97,8 +91,7 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag if !acts.cfg.DispatcherSCfg().Enabled { acts.cl.RpcRegister(srv) } - - acts.intRPCconn = anz.GetInternalCodec(srv, utils.ActionS) + cms.AddInternalConn(utils.ActionS, srv) return } @@ -133,8 +126,3 @@ func (acts *ActionService) ShouldRun() bool { func (acts *ActionService) StateChan(stateID string) chan struct{} { return acts.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (acts *ActionService) IntRPCConn() birpc.ClientConnector { - return acts.intRPCconn -} diff --git a/services/adminsv1.go b/services/adminsv1.go index 8a88622d9..ca08c6875 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -31,11 +30,9 @@ import ( ) // NewAPIerSv1Service returns the APIerSv1 Service -func NewAdminSv1Service(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *AdminSv1Service { +func NewAdminSv1Service(cfg *config.CGRConfig) *AdminSv1Service { return &AdminSv1Service{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -43,16 +40,11 @@ func NewAdminSv1Service(cfg *config.CGRConfig, // AdminSv1Service implements Service interface type AdminSv1Service struct { sync.RWMutex - - api *apis.AdminSv1 - cl *commonlisteners.CommonListenerS - - stopChan chan struct{} - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // RPC connector with internal APIs - stateDeps *StateDependencies // channel subscriptions for state changes + cfg *config.CGRConfig + api *apis.AdminSv1 + cl *commonlisteners.CommonListenerS + stopChan chan struct{} + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -61,9 +53,9 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.FilterS, utils.DataDB, - utils.AnalyzerS, utils.StorDB, }, registry, apiService.cfg.GeneralCfg().ConnectTimeout) @@ -71,15 +63,15 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana return err } apiService.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) sdbs := srvDeps[utils.StorDB].(*StorDBService) apiService.Lock() defer apiService.Unlock() - apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), apiService.connMgr, fs.FilterS(), sdbs.DB()) + apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), cms.ConnManager(), fs.FilterS(), sdbs.DB()) srv, _ := engine.NewService(apiService.api) // srv, _ := birpc.NewService(apiService.api, "", false) @@ -93,9 +85,7 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana apiService.cl.RpcRegister(s) } } - - //backwards compatible - apiService.intRPCconn = anz.GetInternalCodec(srv, utils.AdminSv1) + cms.AddInternalConn(utils.AdminS, srv) return } @@ -128,8 +118,3 @@ func (apiService *AdminSv1Service) ShouldRun() bool { func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} { return apiService.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (apiService *AdminSv1Service) IntRPCConn() birpc.ClientConnector { - return apiService.intRPCconn -} diff --git a/services/analyzers.go b/services/analyzers.go index 294aa46a4..f96daf4ca 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -35,8 +35,12 @@ import ( // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService { anz := &AnalyzerService{ - cfg: cfg, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), + cfg: cfg, + stateDeps: NewStateDependencies([]string{ + utils.StateServiceInit, + utils.StateServiceUP, + utils.StateServiceDOWN, + }), } return anz } @@ -44,15 +48,11 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService { // AnalyzerService implements Service interface type AnalyzerService struct { sync.RWMutex - - anz *analyzers.AnalyzerS - cl *commonlisteners.CommonListenerS - - cancelFunc context.CancelFunc cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal - stateDeps *StateDependencies // channel subscriptions for state changes + anz *analyzers.AnalyzerS + cl *commonlisteners.CommonListenerS + cancelFunc context.CancelFunc + stateDeps *StateDependencies // channel subscriptions for state changes } @@ -70,6 +70,7 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil { return } + close(anz.stateDeps.StateChan(utils.StateServiceInit)) anzCtx, cancel := context.WithCancel(context.TODO()) anz.cancelFunc = cancel go func(a *analyzers.AnalyzerS) { @@ -146,8 +147,3 @@ func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string) func (anz *AnalyzerService) StateChan(stateID string) chan struct{} { return anz.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (anz *AnalyzerService) IntRPCConn() birpc.ClientConnector { - return anz.intRPCconn -} diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 54cd90579..79cfce440 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -22,9 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/servmanager" @@ -32,11 +29,9 @@ import ( ) // NewAsteriskAgent returns the Asterisk Agent -func NewAsteriskAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *AsteriskAgent { +func NewAsteriskAgent(cfg *config.CGRConfig) *AsteriskAgent { return &AsteriskAgent{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -44,18 +39,19 @@ func NewAsteriskAgent(cfg *config.CGRConfig, // AsteriskAgent implements Agent interface type AsteriskAgent struct { sync.RWMutex - cfg *config.CGRConfig - stopChan chan struct{} - - smas []*agents.AsteriskAgent - connMgr *engine.ConnManager - - intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal - stateDeps *StateDependencies // channel subscriptions for state changes + cfg *config.CGRConfig + stopChan chan struct{} + smas []*agents.AsteriskAgent + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { +func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { + cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, ast.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return + } + ast.Lock() defer ast.Unlock() @@ -68,7 +64,7 @@ func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.Servi ast.stopChan = make(chan struct{}) ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns)) for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers - ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr) + ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, cms.(*ConnManagerService).ConnManager()) go listenAndServe(ast.smas[connIdx], ast.stopChan) } return @@ -108,8 +104,3 @@ func (ast *AsteriskAgent) ShouldRun() bool { func (ast *AsteriskAgent) StateChan(stateID string) chan struct{} { return ast.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (ast *AsteriskAgent) IntRPCConn() birpc.ClientConnector { - return ast.intRPCconn -} diff --git a/services/attributes.go b/services/attributes.go index 568f1fa56..783e414b3 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -43,6 +42,7 @@ func NewAttributeService(cfg *config.CGRConfig, // AttributeService implements Service interface type AttributeService struct { sync.RWMutex + cfg *config.CGRConfig dspS *DispatcherService @@ -50,10 +50,7 @@ type AttributeService struct { cl *commonlisteners.CommonListenerS rpc *apis.AttributeSv1 // useful on restart - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies + stateDeps *StateDependencies } // Start should handle the service start @@ -61,16 +58,17 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, attrS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return } attrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheAttributeProfiles, @@ -79,7 +77,6 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) attrS.Lock() defer attrS.Unlock() @@ -104,8 +101,7 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm } }() - - attrS.intRPCconn = anz.GetInternalCodec(srv, utils.AttributeS) + cms.AddInternalConn(utils.AttributeS, srv) return } @@ -139,8 +135,3 @@ func (attrS *AttributeService) ShouldRun() bool { func (attrS *AttributeService) StateChan(stateID string) chan struct{} { return attrS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (attrS *AttributeService) IntRPCConn() birpc.ClientConnector { - return attrS.intRPCconn -} diff --git a/services/caches.go b/services/caches.go index 9ee12cfe6..c4826a360 100644 --- a/services/caches.go +++ b/services/caches.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -30,10 +29,9 @@ import ( ) // NewCacheService . -func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *CacheService { +func NewCacheService(cfg *config.CGRConfig) *CacheService { return &CacheService{ cfg: cfg, - connMgr: connMgr, cacheCh: make(chan *engine.CacheS, 1), stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -41,15 +39,11 @@ func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *CacheS // CacheService implements Agent interface type CacheService struct { - mu sync.Mutex - cl *commonlisteners.CommonListenerS - - cacheCh chan *engine.CacheS - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + mu sync.Mutex + cfg *config.CGRConfig + cl *commonlisteners.CommonListenerS + cacheCh chan *engine.CacheS + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -58,7 +52,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager. []string{ utils.CommonListenerS, utils.DataDB, - utils.AnalyzerS, + utils.ConnManager, utils.CoreS, }, registry, cS.cfg.GeneralCfg().ConnectTimeout) @@ -67,13 +61,13 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager. } cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cs := srvDeps[utils.CoreS].(*CoreService) cS.mu.Lock() defer cS.mu.Unlock() - engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CoreS().CapsStats) + engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cms.ConnManager(), cs.CoreS().CapsStats) go engine.Cache.Precache(shutdown) cS.cacheCh <- engine.Cache @@ -85,7 +79,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager. cS.cl.RpcRegister(s) } } - cS.intRPCconn = anz.GetInternalCodec(srv, utils.CacheS) + cms.AddInternalConn(utils.CacheS, srv) return } @@ -137,8 +131,3 @@ func (cS *CacheService) WaitToPrecache(shutdown *utils.SyncedChan, cacheIDs ...s func (cS *CacheService) StateChan(stateID string) chan struct{} { return cS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (cS *CacheService) IntRPCConn() birpc.ClientConnector { - return cS.intRPCconn -} diff --git a/services/cdrs.go b/services/cdrs.go index 48611b168..c3acaf397 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -22,7 +22,6 @@ import ( "runtime" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/cdrs" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -32,11 +31,9 @@ import ( ) // NewCDRServer returns the CDR Server -func NewCDRServer(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *CDRService { +func NewCDRServer(cfg *config.CGRConfig) *CDRService { return &CDRService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -44,15 +41,12 @@ func NewCDRServer(cfg *config.CGRConfig, // CDRService implements Service interface type CDRService struct { sync.RWMutex + cfg *config.CGRConfig cdrS *cdrs.CDRServer cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -60,9 +54,9 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.FilterS, utils.DataDB, - utils.AnalyzerS, utils.StorDB, }, registry, cs.cfg.GeneralCfg().ConnectTimeout) @@ -70,15 +64,15 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe return err } cs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() - fs := srvDeps[utils.FilterS].(*FilterService) + cms := srvDeps[utils.ConnManager].(*ConnManagerService) + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) - sdbs := srvDeps[utils.StorDB].(*StorDBService) + sdbs := srvDeps[utils.StorDB].(*StorDBService).DB() cs.Lock() defer cs.Unlock() - cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs.FilterS(), cs.connMgr, sdbs.DB()) + cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs, cms.ConnManager(), sdbs) runtime.Gosched() srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx) if err != nil { @@ -87,8 +81,7 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe if !cs.cfg.DispatcherSCfg().Enabled { cs.cl.RpcRegister(srv) } - - cs.intRPCconn = anz.GetInternalCodec(srv, utils.CDRServer) + cms.AddInternalConn(utils.CDRServer, srv) return } @@ -120,8 +113,3 @@ func (cs *CDRService) ShouldRun() bool { func (cs *CDRService) StateChan(stateID string) chan struct{} { return cs.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (cs *CDRService) IntRPCConn() birpc.ClientConnector { - return cs.intRPCconn -} diff --git a/services/chargers.go b/services/chargers.go index 217f212a9..a7462faf0 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -30,11 +29,9 @@ import ( ) // NewChargerService returns the Charger Service -func NewChargerService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *ChargerService { +func NewChargerService(cfg *config.CGRConfig) *ChargerService { return &ChargerService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -42,15 +39,12 @@ func NewChargerService(cfg *config.CGRConfig, // ChargerService implements Service interface type ChargerService struct { sync.RWMutex + cfg *config.CGRConfig chrS *engine.ChargerS cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -58,16 +52,17 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, chrS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } chrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheChargerProfiles, @@ -76,11 +71,10 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) chrS.Lock() defer chrS.Unlock() - chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, chrS.connMgr) + chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, cms.ConnManager()) srv, _ := engine.NewService(chrS.chrS) // srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false) if !chrS.cfg.DispatcherSCfg().Enabled { @@ -88,8 +82,7 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana chrS.cl.RpcRegister(s) } } - - chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS) + cms.AddInternalConn(utils.ChargerS, srv) return nil } @@ -121,8 +114,3 @@ func (chrS *ChargerService) ShouldRun() bool { func (chrS *ChargerService) StateChan(stateID string) chan struct{} { return chrS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (chrS *ChargerService) IntRPCConn() birpc.ClientConnector { - return chrS.intRPCconn -} diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 00cd920fa..1b0a9836d 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -41,15 +40,11 @@ func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps) *CommonL // CommonListenerService implements Service interface. type CommonListenerService struct { - mu sync.RWMutex - - cls *commonlisteners.CommonListenerS - - caps *engine.Caps - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + mu sync.RWMutex + cfg *config.CGRConfig + cls *commonlisteners.CommonListenerS + caps *engine.Caps + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. @@ -94,11 +89,6 @@ func (cl *CommonListenerService) StateChan(stateID string) chan struct{} { return cl.stateDeps.StateChan(stateID) } -// IntRPCConn returns the internal connection used by RPCClient -func (cl *CommonListenerService) IntRPCConn() birpc.ClientConnector { - return cl.intRPCconn -} - // CLS returns the CommonListenerS object. func (cl *CommonListenerService) CLS() *commonlisteners.CommonListenerS { return cl.cls diff --git a/services/config.go b/services/config.go index e263ce4a7..c8024bf5d 100644 --- a/services/config.go +++ b/services/config.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -39,11 +38,10 @@ func NewConfigService(cfg *config.CGRConfig) *ConfigService { // ConfigService implements Service interface. type ConfigService struct { - mu sync.RWMutex - cfg *config.CGRConfig - cl *commonlisteners.CommonListenerS - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + mu sync.RWMutex + cfg *config.CGRConfig + cl *commonlisteners.CommonListenerS + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. @@ -51,14 +49,14 @@ func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.Service srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, - utils.AnalyzerS, + utils.ConnManager, }, registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + cms := srvDeps[utils.ConnManager].(*ConnManagerService) svcs, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true) if !s.cfg.DispatcherSCfg().Enabled { @@ -66,7 +64,7 @@ func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.Service s.cl.RpcRegister(svc) } } - s.intRPCconn = anz.GetInternalCodec(svcs, utils.ConfigSv1) + cms.AddInternalConn(utils.ConfigS, svcs) return nil } @@ -94,8 +92,3 @@ func (s *ConfigService) ShouldRun() bool { func (s *ConfigService) StateChan(stateID string) chan struct{} { return s.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (s *ConfigService) IntRPCConn() birpc.ClientConnector { - return s.intRPCconn -} diff --git a/services/connmanager.go b/services/connmanager.go new file mode 100644 index 000000000..c6e795165 --- /dev/null +++ b/services/connmanager.go @@ -0,0 +1,225 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/birpc" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewConnManagerService instantiates a new ConnManagerService. +func NewConnManagerService(cfg *config.CGRConfig) *ConnManagerService { + return &ConnManagerService{ + cfg: cfg, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), + } +} + +// ConnManagerService implements Service interface. +type ConnManagerService struct { + mu sync.RWMutex + cfg *config.CGRConfig + connMgr *engine.ConnManager + anz *AnalyzerService + stateDeps *StateDependencies // channel subscriptions for state changes +} + +// Start handles the service start. +func (s *ConnManagerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { + s.anz = registry.Lookup(utils.AnalyzerS).(*AnalyzerService) + if s.anz.ShouldRun() { // wait for AnalyzerS only if it should run + if _, err := WaitForServiceState(utils.StateServiceInit, utils.AnalyzerS, registry, + s.cfg.GeneralCfg().ConnectTimeout); err != nil { + return err + } + } + s.connMgr = engine.NewConnManager(s.cfg) + return nil +} + +// Reload handles the config changes. +func (s *ConnManagerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { + s.connMgr.Reload() + return nil +} + +// Shutdown stops the service. +func (s *ConnManagerService) Shutdown(_ *servmanager.ServiceRegistry) error { + s.connMgr = nil + engine.SetConnManager(nil) + return nil +} + +// ServiceName returns the service name +func (s *ConnManagerService) ServiceName() string { + return utils.ConnManager +} + +// ShouldRun returns if the service should be running. +func (s *ConnManagerService) ShouldRun() bool { + return true +} + +// StateChan returns signaling channel of specific state +func (s *ConnManagerService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) +} + +// ConnManager returns the ConnManager object. +func (s *ConnManagerService) ConnManager() *engine.ConnManager { + return s.connMgr +} + +// AddInternalConn registers direct internal RPC access for a service. +// TODO: Add function to remove internal conns (useful for shutdown). +func (s *ConnManagerService) AddInternalConn(svcName string, receiver birpc.ClientConnector) { + s.mu.Lock() + defer s.mu.Unlock() + route, exists := serviceMethods[svcName] + if !exists { + return + } + rpcIntChan := make(chan birpc.ClientConnector, 1) + s.connMgr.AddInternalConn(route.internalPath, route.receiver, rpcIntChan) + if route.biRPCPath != "" { + s.connMgr.AddInternalConn(route.biRPCPath, route.receiver, rpcIntChan) + } + rpcIntChan <- s.anz.GetInternalCodec(receiver, svcName) +} + +// internalRoute defines how a service's methods can be accessed internally within the system. +type internalRoute struct { + receiver string // method receiver name (e.g. "ChargerSv1") + internalPath string // internal API path + biRPCPath string // bidirectional API path, if supported +} + +var serviceMethods = map[string]internalRoute{ + utils.AnalyzerS: { + receiver: utils.AnalyzerSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS), + }, + utils.AdminS: { + receiver: utils.AdminSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS), + }, + utils.AttributeS: { + receiver: utils.AttributeSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), + }, + utils.CacheS: { + receiver: utils.CacheSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches), + }, + utils.CDRs: { + receiver: utils.CDRsV1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), + }, + utils.ChargerS: { + receiver: utils.ChargerSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers), + }, + utils.GuardianS: { + receiver: utils.GuardianSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), + }, + utils.LoaderS: { + receiver: utils.LoaderSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders), + }, + utils.ResourceS: { + receiver: utils.ResourceSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources), + }, + utils.SessionS: { + receiver: utils.SessionSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), + biRPCPath: utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS), + }, + utils.StatS: { + receiver: utils.StatSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats), + }, + utils.RankingS: { + receiver: utils.RankingSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings), + }, + utils.TrendS: { + receiver: utils.TrendSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends), + }, + utils.RouteS: { + receiver: utils.RouteSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes), + }, + utils.ThresholdS: { + receiver: utils.ThresholdSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), + }, + utils.ServiceManagerS: { + receiver: utils.ServiceManagerV1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager), + }, + utils.ConfigS: { + receiver: utils.ConfigSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig), + }, + utils.CoreS: { + receiver: utils.CoreSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore), + }, + utils.EEs: { + receiver: utils.EeSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs), + }, + utils.RateS: { + receiver: utils.RateSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates), + }, + utils.DispatcherS: { + receiver: utils.DispatcherSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers), + }, + utils.AccountS: { + receiver: utils.AccountSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts), + }, + utils.ActionS: { + receiver: utils.ActionSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions), + }, + utils.TPeS: { + receiver: utils.TPeSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), + }, + utils.EFs: { + receiver: utils.EfSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), + }, + utils.ERs: { + receiver: utils.ErSv1, + internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs), + }, +} diff --git a/services/cores.go b/services/cores.go index f908de331..69b3f1822 100644 --- a/services/cores.go +++ b/services/cores.go @@ -22,7 +22,6 @@ import ( "os" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" @@ -46,20 +45,16 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, // CoreService implements Service interface type CoreService struct { - mu sync.RWMutex - - cS *cores.CoreS - cl *commonlisteners.CommonListenerS - - fileCPU *os.File - caps *engine.Caps - csCh chan *cores.CoreS - stopChan chan struct{} - shdWg *sync.WaitGroup - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + mu sync.RWMutex + cfg *config.CGRConfig + cS *cores.CoreS + cl *commonlisteners.CommonListenerS + fileCPU *os.File + caps *engine.Caps + csCh chan *cores.CoreS + stopChan chan struct{} + shdWg *sync.WaitGroup + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -67,14 +62,14 @@ func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.S srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, - utils.AnalyzerS, + utils.ConnManager, }, registry, cS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cS.mu.Lock() defer cS.mu.Unlock() @@ -90,8 +85,7 @@ func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.S cS.cl.RpcRegister(s) } } - - cS.intRPCconn = anz.GetInternalCodec(srv, utils.CoreS) + cms.AddInternalConn(utils.CoreS, srv) return nil } @@ -129,11 +123,6 @@ func (cS *CoreService) StateChan(stateID string) chan struct{} { return cS.stateDeps.StateChan(stateID) } -// IntRPCConn returns the internal connection used by RPCClient -func (cS *CoreService) IntRPCConn() birpc.ClientConnector { - return cS.intRPCconn -} - // CoreS returns the CoreS object. func (cS *CoreService) CoreS() *cores.CoreS { cS.mu.RLock() diff --git a/services/datadb.go b/services/datadb.go index c5440c0c9..697d8c33b 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -30,11 +29,10 @@ import ( ) // NewDataDBService returns the DataDB Service -func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVersions bool, +func NewDataDBService(cfg *config.CGRConfig, setVersions bool, srvDep map[string]*sync.WaitGroup) *DataDBService { return &DataDBService{ cfg: cfg, - connMgr: connMgr, setVersions: setVersions, srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), @@ -44,21 +42,20 @@ func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVer // DataDBService implements Service interface type DataDBService struct { sync.RWMutex - cfg *config.CGRConfig - oldDBCfg *config.DataDbCfg - connMgr *engine.ConnManager - + cfg *config.CGRConfig + oldDBCfg *config.DataDbCfg dm *engine.DataManager setVersions bool - - srvDep map[string]*sync.WaitGroup - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + srvDep map[string]*sync.WaitGroup + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. -func (db *DataDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { +func (db *DataDBService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { + cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, db.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return + } db.Lock() defer db.Unlock() db.oldDBCfg = db.cfg.DataDbCfg().Clone() @@ -71,7 +68,7 @@ func (db *DataDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegist utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return } - db.dm = engine.NewDataManager(dbConn, db.cfg.CacheCfg(), db.connMgr) + db.dm = engine.NewDataManager(dbConn, db.cfg.CacheCfg(), cms.(*ConnManagerService).ConnManager()) if db.setVersions { err = engine.OverwriteDBVersions(dbConn) @@ -176,8 +173,3 @@ func (db *DataDBService) DataManager() *engine.DataManager { func (db *DataDBService) StateChan(stateID string) chan struct{} { return db.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (db *DataDBService) IntRPCConn() birpc.ClientConnector { - return db.intRPCconn -} diff --git a/services/diameteragent.go b/services/diameteragent.go index 24a8a72ca..c8f578e15 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -31,11 +30,9 @@ import ( ) // NewDiameterAgent returns the Diameter Agent -func NewDiameterAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, caps *engine.Caps) *DiameterAgent { +func NewDiameterAgent(cfg *config.CGRConfig, caps *engine.Caps) *DiameterAgent { return &DiameterAgent{ cfg: cfg, - connMgr: connMgr, caps: caps, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -47,36 +44,39 @@ type DiameterAgent struct { cfg *config.CGRConfig stopChan chan struct{} - da *agents.DiameterAgent - connMgr *engine.ConnManager - caps *engine.Caps + da *agents.DiameterAgent + caps *engine.Caps lnet string laddr string - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start func (da *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { - fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry, - da.cfg.GeneralCfg().ConnectTimeout) + srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.ConnManager, + utils.FilterS, + }, + registry, da.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } + cms := srvDeps[utils.ConnManager].(*ConnManagerService) + fs := srvDeps[utils.FilterS].(*FilterService) da.Lock() defer da.Unlock() - return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown) + return da.start(fs.FilterS(), cms.ConnManager(), da.caps, shutdown) } -func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown *utils.SyncedChan) error { +func (da *DiameterAgent) start(filterS *engine.FilterS, cm *engine.ConnManager, caps *engine.Caps, + shutdown *utils.SyncedChan) error { var err error - da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr, caps) + da.da, err = agents.NewDiameterAgent(da.cfg, filterS, cm, caps) if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent: %v", - utils.DiameterAgent, err)) return err } da.lnet = da.cfg.DiameterAgentCfg().ListenNet @@ -102,12 +102,18 @@ func (da *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanage } close(da.stopChan) - fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry, - da.cfg.GeneralCfg().ConnectTimeout) + srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.ConnManager, + utils.FilterS, + }, + registry, da.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } - return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown) + cms := srvDeps[utils.ConnManager].(*ConnManagerService) + fs := srvDeps[utils.FilterS].(*FilterService) + return da.start(fs.FilterS(), cms.ConnManager(), da.caps, shutdown) } // Shutdown stops the service @@ -133,8 +139,3 @@ func (da *DiameterAgent) ShouldRun() bool { func (da *DiameterAgent) StateChan(stateID string) chan struct{} { return da.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (da *DiameterAgent) IntRPCConn() birpc.ClientConnector { - return da.intRPCconn -} diff --git a/services/dispatchers.go b/services/dispatchers.go index b4f78a20f..fb17a85bb 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/dispatchers" @@ -31,11 +30,9 @@ import ( ) // NewDispatcherService returns the Dispatcher Service -func NewDispatcherService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *DispatcherService { +func NewDispatcherService(cfg *config.CGRConfig) *DispatcherService { return &DispatcherService{ cfg: cfg, - connMgr: connMgr, srvsReload: make(map[string]chan struct{}), stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -44,16 +41,12 @@ func NewDispatcherService(cfg *config.CGRConfig, // DispatcherService implements Service interface type DispatcherService struct { sync.RWMutex - - dspS *dispatchers.DispatcherService - cl *commonlisteners.CommonListenerS - - connMgr *engine.ConnManager cfg *config.CGRConfig + dspS *dispatchers.DispatcherService + cl *commonlisteners.CommonListenerS + connMgr *engine.ConnManager srvsReload map[string]chan struct{} - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -61,16 +54,19 @@ func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servm srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, + utils.AttributeS, }, registry, dspS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } dspS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) + dspS.connMgr = cms.ConnManager() cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheDispatcherProfiles, @@ -80,7 +76,6 @@ func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servm } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) dspS.Lock() defer dspS.Unlock() @@ -98,7 +93,7 @@ func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servm // for the moment we dispable Apier through dispatcher // until we figured out a better sollution in case of gob server // dspS.server.SetDispatched() - dspS.intRPCconn = anz.GetInternalCodec(srv, utils.DispatcherS) + cms.AddInternalConn(utils.DispatcherS, srv) return } @@ -162,8 +157,3 @@ func (dspS *DispatcherService) sync() { func (dspS *DispatcherService) StateChan(stateID string) chan struct{} { return dspS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (dspS *DispatcherService) IntRPCConn() birpc.ClientConnector { - return dspS.intRPCconn -} diff --git a/services/dnsagent.go b/services/dnsagent.go index c6974d7ec..8a37eff98 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -19,23 +19,18 @@ along with this program. If not, see package services import ( - "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewDNSAgent returns the DNS Agent -func NewDNSAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *DNSAgent { +func NewDNSAgent(cfg *config.CGRConfig) *DNSAgent { return &DNSAgent{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -47,26 +42,29 @@ type DNSAgent struct { stopChan chan struct{} - dns *agents.DNSAgent - connMgr *engine.ConnManager + dns *agents.DNSAgent - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry, - dns.cfg.GeneralCfg().ConnectTimeout) + srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.ConnManager, + utils.FilterS, + }, + registry, dns.cfg.GeneralCfg().ConnectTimeout) if err != nil { - return + return err } + cms := srvDeps[utils.ConnManager].(*ConnManagerService) + fs := srvDeps[utils.FilterS].(*FilterService) dns.Lock() defer dns.Unlock() - dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.(*FilterService).FilterS(), dns.connMgr) + dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), cms.ConnManager()) if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) dns.dns = nil return } @@ -77,11 +75,17 @@ func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser // Reload handles the change of config func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry, - dns.cfg.GeneralCfg().ConnectTimeout) + srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.ConnManager, + utils.FilterS, + }, + registry, dns.cfg.GeneralCfg().ConnectTimeout) if err != nil { - return + return err } + cms := srvDeps[utils.ConnManager].(*ConnManagerService) + fs := srvDeps[utils.FilterS].(*FilterService) dns.Lock() defer dns.Unlock() @@ -90,9 +94,8 @@ func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.Se close(dns.stopChan) } - dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.(*FilterService).FilterS(), dns.connMgr) + dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), cms.ConnManager()) if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) dns.dns = nil return } @@ -108,7 +111,6 @@ func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shutdown *utils.Sync dns.dns.RLock() defer dns.dns.RUnlock() if err = dns.dns.ListenAndServe(stopChan); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) shutdown.CloseOnce() // stop the engine here } return @@ -140,8 +142,3 @@ func (dns *DNSAgent) ShouldRun() bool { func (dns *DNSAgent) StateChan(stateID string) chan struct{} { return dns.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (dns *DNSAgent) IntRPCConn() birpc.ClientConnector { - return dns.intRPCconn -} diff --git a/services/ees.go b/services/ees.go index a6734974f..5ed306a9e 100644 --- a/services/ees.go +++ b/services/ees.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/ees" @@ -31,27 +30,22 @@ import ( ) // NewEventExporterService constructs EventExporterService -func NewEventExporterService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *EventExporterService { +func NewEventExporterService(cfg *config.CGRConfig) *EventExporterService { return &EventExporterService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } // EventExporterService is the service structure for EventExporterS type EventExporterService struct { - mu sync.RWMutex + mu sync.RWMutex + cfg *config.CGRConfig eeS *ees.EeS cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // ServiceName returns the service name @@ -87,21 +81,21 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.FilterS, - utils.AnalyzerS, }, registry, es.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } es.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() - fs := srvDeps[utils.FilterS].(*FilterService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + cms := srvDeps[utils.ConnManager].(*ConnManagerService) + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() es.mu.Lock() defer es.mu.Unlock() - es.eeS, err = ees.NewEventExporterS(es.cfg, fs.FilterS(), es.connMgr) + es.eeS, err = ees.NewEventExporterS(es.cfg, fs, cms.ConnManager()) if err != nil { return err } @@ -111,8 +105,7 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager if !es.cfg.DispatcherSCfg().Enabled { es.cl.RpcRegister(srv) } - - es.intRPCconn = anz.GetInternalCodec(srv, utils.EEs) + cms.AddInternalConn(utils.EEs, srv) return nil } @@ -120,8 +113,3 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager func (es *EventExporterService) StateChan(stateID string) chan struct{} { return es.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (es *EventExporterService) IntRPCConn() birpc.ClientConnector { - return es.intRPCconn -} diff --git a/services/efs.go b/services/efs.go index eebefdd34..eea73a6c2 100644 --- a/services/efs.go +++ b/services/efs.go @@ -39,38 +39,41 @@ type ExportFailoverService struct { srv *birpc.Service stopChan chan struct{} - connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // NewExportFailoverService is the constructor for the TpeService -func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *ExportFailoverService { +func NewExportFailoverService(cfg *config.CGRConfig) *ExportFailoverService { return &ExportFailoverService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } // Start should handle the service start func (efServ *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - cls, err := WaitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry, - efServ.cfg.GeneralCfg().ConnectTimeout) + srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.ConnManager, + }, + registry, efServ.cfg.GeneralCfg().ConnectTimeout) if err != nil { return } - efServ.cl = cls.(*CommonListenerService).CLS() + efServ.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) efServ.Lock() defer efServ.Unlock() - efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr) + efServ.efS = efs.NewEfs(efServ.cfg, cms.ConnManager()) efServ.stopChan = make(chan struct{}) efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx) efServ.cl.RpcRegister(efServ.srv) + cms.AddInternalConn(utils.EFs, efServ.srv) return } @@ -101,8 +104,3 @@ func (efServ *ExportFailoverService) ServiceName() string { func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} { return efServ.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (efServ *ExportFailoverService) IntRPCConn() birpc.ClientConnector { - return efServ.intRPCconn -} diff --git a/services/ers.go b/services/ers.go index acee95b0b..8242bbfaa 100644 --- a/services/ers.go +++ b/services/ers.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -32,13 +31,10 @@ import ( ) // NewEventReaderService returns the EventReader Service -func NewEventReaderService( - cfg *config.CGRConfig, - connMgr *engine.ConnManager) *EventReaderService { +func NewEventReaderService(cfg *config.CGRConfig) *EventReaderService { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -46,17 +42,15 @@ func NewEventReaderService( // EventReaderService implements Service interface type EventReaderService struct { sync.RWMutex + cfg *config.CGRConfig ers *ers.ERService cl *commonlisteners.CommonListenerS rldChan chan struct{} stopChan chan struct{} - connMgr *engine.ConnManager - cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -64,16 +58,16 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.FilterS, - utils.AnalyzerS, }, registry, erS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } erS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) erS.Lock() defer erS.Unlock() @@ -82,7 +76,7 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm erS.stopChan = make(chan struct{}) // build the service - erS.ers = ers.NewERService(erS.cfg, fs.FilterS(), erS.connMgr) + erS.ers = ers.NewERService(erS.cfg, fs.FilterS(), cms.ConnManager()) go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan, shutdown) srv, err := engine.NewServiceWithPing(erS.ers, utils.ErSv1, utils.V1Prfx) @@ -92,7 +86,7 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm if !erS.cfg.DispatcherSCfg().Enabled { erS.cl.RpcRegister(srv) } - erS.intRPCconn = anz.GetInternalCodec(srv, utils.ERs) + cms.AddInternalConn(utils.ERs, srv) return } @@ -136,8 +130,3 @@ func (erS *EventReaderService) ShouldRun() bool { func (erS *EventReaderService) StateChan(stateID string) chan struct{} { return erS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (erS *EventReaderService) IntRPCConn() birpc.ClientConnector { - return erS.intRPCconn -} diff --git a/services/filters.go b/services/filters.go index 46be8e55a..710151728 100644 --- a/services/filters.go +++ b/services/filters.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -29,31 +28,26 @@ import ( ) // NewFilterService instantiates a new FilterService. -func NewFilterService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *FilterService { +func NewFilterService(cfg *config.CGRConfig) *FilterService { return &FilterService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } // FilterService implements Service interface. type FilterService struct { - mu sync.RWMutex - - fltrS *engine.FilterS - - cfg *config.CGRConfig - connMgr *engine.ConnManager - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + mu sync.RWMutex + cfg *config.CGRConfig + fltrS *engine.FilterS + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ + utils.ConnManager, utils.CacheS, utils.DataDB, }, @@ -61,6 +55,7 @@ func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager. if err != nil { return err } + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil { return err @@ -70,7 +65,7 @@ func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager. s.mu.Lock() defer s.mu.Unlock() - s.fltrS = engine.NewFilterS(s.cfg, s.connMgr, dbs.DataManager()) + s.fltrS = engine.NewFilterS(s.cfg, cms.ConnManager(), dbs.DataManager()) return nil } @@ -102,11 +97,6 @@ func (s *FilterService) StateChan(stateID string) chan struct{} { return s.stateDeps.StateChan(stateID) } -// IntRPCConn returns the internal connection used by RPCClient -func (s *FilterService) IntRPCConn() birpc.ClientConnector { - return s.intRPCconn -} - // FilterS returns the FilterS object. func (s *FilterService) FilterS() *engine.FilterS { return s.fltrS diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index 07936ad7f..a071ab460 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -22,9 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/servmanager" @@ -32,11 +29,9 @@ import ( ) // NewFreeswitchAgent returns the Freeswitch Agent -func NewFreeswitchAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *FreeswitchAgent { +func NewFreeswitchAgent(cfg *config.CGRConfig) *FreeswitchAgent { return &FreeswitchAgent{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -44,21 +39,22 @@ func NewFreeswitchAgent(cfg *config.CGRConfig, // FreeswitchAgent implements Agent interface type FreeswitchAgent struct { sync.RWMutex - cfg *config.CGRConfig - - fS *agents.FSsessions - connMgr *engine.ConnManager - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + cfg *config.CGRConfig + fS *agents.FSsessions + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (fS *FreeswitchAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { +func (fS *FreeswitchAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { + cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, fS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return + } + fS.Lock() defer fS.Unlock() - fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, fS.connMgr) + fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, cms.(*ConnManagerService).ConnManager()) go fS.connect(shutdown) return @@ -107,8 +103,3 @@ func (fS *FreeswitchAgent) ShouldRun() bool { func (fS *FreeswitchAgent) StateChan(stateID string) chan struct{} { return fS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (fS *FreeswitchAgent) IntRPCConn() birpc.ClientConnector { - return fS.intRPCconn -} diff --git a/services/globalvars.go b/services/globalvars.go index d555cbae7..50060801d 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -19,7 +19,6 @@ along with this program. If not, see package services import ( - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/config" @@ -37,10 +36,8 @@ func NewGlobalVarS(cfg *config.CGRConfig) *GlobalVarS { // GlobalVarS implements Agent interface type GlobalVarS struct { - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + cfg *config.CGRConfig + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -82,8 +79,3 @@ func (gv *GlobalVarS) ShouldRun() bool { func (gv *GlobalVarS) StateChan(stateID string) chan struct{} { return gv.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (gv *GlobalVarS) IntRPCConn() birpc.ClientConnector { - return gv.intRPCconn -} diff --git a/services/guardian.go b/services/guardian.go index cd7e528a4..5253dcbe9 100644 --- a/services/guardian.go +++ b/services/guardian.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -40,11 +39,10 @@ func NewGuardianService(cfg *config.CGRConfig) *GuardianService { // GuardianService implements Service interface. type GuardianService struct { - mu sync.RWMutex - cfg *config.CGRConfig - cl *commonlisteners.CommonListenerS - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + mu sync.RWMutex + cfg *config.CGRConfig + cl *commonlisteners.CommonListenerS + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. @@ -52,14 +50,14 @@ func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.Servi srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, - utils.AnalyzerS, + utils.ConnManager, }, registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + cms := srvDeps[utils.ConnManager].(*ConnManagerService) s.mu.Lock() defer s.mu.Unlock() @@ -70,7 +68,7 @@ func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.Servi s.cl.RpcRegister(svc) } } - s.intRPCconn = anz.GetInternalCodec(svcs, utils.GuardianS) + cms.AddInternalConn(utils.GuardianS, svcs) return nil } @@ -101,8 +99,3 @@ func (s *GuardianService) ShouldRun() bool { func (s *GuardianService) StateChan(stateID string) chan struct{} { return s.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (s *GuardianService) IntRPCConn() birpc.ClientConnector { - return s.intRPCconn -} diff --git a/services/httpagent.go b/services/httpagent.go index 2e81e41d9..575b8c74a 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -21,21 +21,17 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewHTTPAgent returns the HTTP Agent -func NewHTTPAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *HTTPAgent { +func NewHTTPAgent(cfg *config.CGRConfig) *HTTPAgent { return &HTTPAgent{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -43,6 +39,7 @@ func NewHTTPAgent(cfg *config.CGRConfig, // HTTPAgent implements Agent interface type HTTPAgent struct { sync.RWMutex + cfg *config.CGRConfig cl *commonlisteners.CommonListenerS @@ -50,11 +47,7 @@ type HTTPAgent struct { // if we registerd the handlers started bool - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -62,6 +55,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.FilterS, }, registry, ha.cfg.GeneralCfg().ConnectTimeout) @@ -69,6 +63,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg return err } cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() fs := srvDeps[utils.FilterS].(*FilterService) ha.Lock() @@ -77,7 +72,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg ha.started = true for _, agntCfg := range ha.cfg.HTTPAgentCfg() { cl.RegisterHttpHandler(agntCfg.URL, - agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, fs.FilterS(), + agents.NewHTTPAgent(cms, agntCfg.SessionSConns, fs.FilterS(), ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, agntCfg.ReplyPayload, agntCfg.RequestProcessors)) } @@ -111,8 +106,3 @@ func (ha *HTTPAgent) ShouldRun() bool { func (ha *HTTPAgent) StateChan(stateID string) chan struct{} { return ha.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (ha *HTTPAgent) IntRPCConn() birpc.ClientConnector { - return ha.intRPCconn -} diff --git a/services/janus.go b/services/janus.go index 4ea7fdd06..a14cab76f 100644 --- a/services/janus.go +++ b/services/janus.go @@ -23,20 +23,16 @@ import ( "net/http" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewJanusAgent returns the Janus Agent -func NewJanusAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *JanusAgent { +func NewJanusAgent(cfg *config.CGRConfig) *JanusAgent { return &JanusAgent{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -44,6 +40,7 @@ func NewJanusAgent(cfg *config.CGRConfig, // JanusAgent implements Service interface type JanusAgent struct { sync.RWMutex + cfg *config.CGRConfig jA *agents.JanusAgent @@ -51,11 +48,7 @@ type JanusAgent struct { // if we registerd the jandlers started bool - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should jandle the sercive start @@ -63,6 +56,7 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.FilterS, }, registry, ja.cfg.GeneralCfg().ConnectTimeout) @@ -70,6 +64,7 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe return err } cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) ja.Lock() @@ -77,7 +72,7 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe ja.Unlock() return utils.ErrServiceAlreadyRunning } - ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, fs.FilterS()) + ja.jA, err = agents.NewJanusAgent(ja.cfg, cms.ConnManager(), fs.FilterS()) if err != nil { return } @@ -126,8 +121,3 @@ func (ja *JanusAgent) ShouldRun() bool { func (ja *JanusAgent) StateChan(stateID string) chan struct{} { return ja.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (ja *JanusAgent) IntRPCConn() birpc.ClientConnector { - return ja.intRPCconn -} diff --git a/services/kamailioagent.go b/services/kamailioagent.go index 5476cfec3..6db8feb5a 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -23,20 +23,16 @@ import ( "strings" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewKamailioAgent returns the Kamailio Agent -func NewKamailioAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *KamailioAgent { +func NewKamailioAgent(cfg *config.CGRConfig) *KamailioAgent { return &KamailioAgent{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -44,21 +40,22 @@ func NewKamailioAgent(cfg *config.CGRConfig, // KamailioAgent implements Agent interface type KamailioAgent struct { sync.RWMutex - cfg *config.CGRConfig - - kam *agents.KamailioAgent - connMgr *engine.ConnManager - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + cfg *config.CGRConfig + kam *agents.KamailioAgent + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (kam *KamailioAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { +func (kam *KamailioAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { + cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, kam.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return + } + kam.Lock() defer kam.Unlock() - kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), kam.connMgr, + kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), cms.(*ConnManagerService).ConnManager(), utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone)) go kam.connect(kam.kam, shutdown) @@ -112,8 +109,3 @@ func (kam *KamailioAgent) ShouldRun() bool { func (kam *KamailioAgent) StateChan(stateID string) chan struct{} { return kam.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (kam *KamailioAgent) IntRPCConn() birpc.ClientConnector { - return kam.intRPCconn -} diff --git a/services/loaders.go b/services/loaders.go index 570c61a83..966fd97e7 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -31,11 +30,9 @@ import ( ) // NewLoaderService returns the Loader Service -func NewLoaderService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *LoaderService { +func NewLoaderService(cfg *config.CGRConfig) *LoaderService { return &LoaderService{ cfg: cfg, - connMgr: connMgr, stopChan: make(chan struct{}), stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -49,11 +46,9 @@ type LoaderService struct { cl *commonlisteners.CommonListenerS stopChan chan struct{} - connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -61,23 +56,23 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, ldrs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } ldrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) ldrs.Lock() defer ldrs.Unlock() - ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), fs.FilterS(), ldrs.connMgr) + ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager()) if !ldrs.ldrs.Enabled() { return @@ -92,7 +87,7 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv ldrs.cl.RpcRegister(s) } } - ldrs.intRPCconn = anz.GetInternalCodec(srv, utils.LoaderS) + cms.AddInternalConn(utils.LoaderS, srv) return } @@ -100,6 +95,7 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ + utils.ConnManager, utils.FilterS, utils.DataDB, }, @@ -107,6 +103,7 @@ func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.Ser if err != nil { return err } + cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) close(ldrs.stopChan) @@ -115,7 +112,7 @@ func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.Ser ldrs.RLock() defer ldrs.RUnlock() - ldrs.ldrs.Reload(dbs.DataManager(), fs.FilterS(), ldrs.connMgr) + ldrs.ldrs.Reload(dbs.DataManager(), fs.FilterS(), cms.ConnManager()) return ldrs.ldrs.ListenAndServe(ldrs.stopChan) } @@ -148,8 +145,3 @@ func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderS { func (ldrs *LoaderService) StateChan(stateID string) chan struct{} { return ldrs.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (ldrs *LoaderService) IntRPCConn() birpc.ClientConnector { - return ldrs.intRPCconn -} diff --git a/services/radiusagent.go b/services/radiusagent.go index b975eeee5..66194b9b2 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -22,20 +22,16 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewRadiusAgent returns the Radius Agent -func NewRadiusAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *RadiusAgent { +func NewRadiusAgent(cfg *config.CGRConfig) *RadiusAgent { return &RadiusAgent{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -46,24 +42,28 @@ type RadiusAgent struct { cfg *config.CGRConfig stopChan chan struct{} - rad *agents.RadiusAgent - connMgr *engine.ConnManager + rad *agents.RadiusAgent lnet string lauth string lacct string - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry, - rad.cfg.GeneralCfg().ConnectTimeout) + srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.ConnManager, + utils.FilterS, + }, + registry, rad.cfg.GeneralCfg().ConnectTimeout) if err != nil { return } + cms := srvDeps[utils.ConnManager].(*ConnManagerService) + fs := srvDeps[utils.FilterS].(*FilterService) rad.Lock() defer rad.Unlock() @@ -72,7 +72,7 @@ func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager. rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth rad.lacct = rad.cfg.RadiusAgentCfg().ListenAcct - if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.(*FilterService).FilterS(), rad.connMgr); err != nil { + if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.FilterS(), cms.ConnManager()); err != nil { return } rad.stopChan = make(chan struct{}) @@ -128,8 +128,3 @@ func (rad *RadiusAgent) ShouldRun() bool { func (rad *RadiusAgent) StateChan(stateID string) chan struct{} { return rad.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (rad *RadiusAgent) IntRPCConn() birpc.ClientConnector { - return rad.intRPCconn -} diff --git a/services/rankings.go b/services/rankings.go index dcc74e625..bc5143ef0 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" @@ -33,11 +32,9 @@ import ( // NewRankingService returns the RankingS Service func NewRankingService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) *RankingService { return &RankingService{ cfg: cfg, - connMgr: connMgr, srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -45,16 +42,13 @@ func NewRankingService(cfg *config.CGRConfig, type RankingService struct { sync.RWMutex + cfg *config.CGRConfig ran *engine.RankingS cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + srvDep map[string]*sync.WaitGroup + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -64,16 +58,17 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, ran.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } ran.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheRankingProfiles, @@ -82,11 +77,10 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) ran.Lock() defer ran.Unlock() - ran.ran = engine.NewRankingS(dbs.DataManager(), ran.connMgr, fs.FilterS(), ran.cfg) + ran.ran = engine.NewRankingS(dbs.DataManager(), cms.ConnManager(), fs.FilterS(), ran.cfg) if err := ran.ran.StartRankingS(context.TODO()); err != nil { return err } @@ -99,7 +93,7 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag ran.cl.RpcRegister(s) } } - ran.intRPCconn = anz.GetInternalCodec(srv, utils.RankingS) + cms.AddInternalConn(utils.RankingS, srv) return nil } @@ -136,8 +130,3 @@ func (ran *RankingService) ShouldRun() bool { func (ran *RankingService) StateChan(stateID string) chan struct{} { return ran.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (ran *RankingService) IntRPCConn() birpc.ClientConnector { - return ran.intRPCconn -} diff --git a/services/rates.go b/services/rates.go index af091ba52..25f460deb 100644 --- a/services/rates.go +++ b/services/rates.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -42,16 +41,12 @@ func NewRateService(cfg *config.CGRConfig) *RateService { // RateService is the service structure for RateS type RateService struct { sync.RWMutex - - rateS *rates.RateS - cl *commonlisteners.CommonListenerS - - rldChan chan struct{} - stopChan chan struct{} - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + cfg *config.CGRConfig + rateS *rates.RateS + cl *commonlisteners.CommonListenerS + rldChan chan struct{} + stopChan chan struct{} + stateDeps *StateDependencies // channel subscriptions for state changes } // ServiceName returns the service name @@ -85,16 +80,17 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, rs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } rs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheRateProfiles, @@ -102,12 +98,11 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S utils.CacheRateFilterIndexes); err != nil { return err } - fs := srvDeps[utils.FilterS].(*FilterService) - dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() + dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() rs.Lock() - rs.rateS = rates.NewRateS(rs.cfg, fs.FilterS(), dbs.DataManager()) + rs.rateS = rates.NewRateS(rs.cfg, fs, dbs) rs.Unlock() rs.stopChan = make(chan struct{}) @@ -121,8 +116,7 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S if !rs.cfg.DispatcherSCfg().Enabled { rs.cl.RpcRegister(srv) } - - rs.intRPCconn = anz.GetInternalCodec(srv, utils.RateS) + cms.AddInternalConn(utils.RateS, srv) return } @@ -130,8 +124,3 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S func (rs *RateService) StateChan(stateID string) chan struct{} { return rs.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (rs *RateService) IntRPCConn() birpc.ClientConnector { - return rs.intRPCconn -} diff --git a/services/registrarc.go b/services/registrarc.go index 000c2deed..326bcb980 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -21,19 +21,16 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/registrarc" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewRegistrarCService returns the Dispatcher Service -func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *RegistrarCService { +func NewRegistrarCService(cfg *config.CGRConfig) *RegistrarCService { return &RegistrarCService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -41,26 +38,29 @@ func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *R // RegistrarCService implements Service interface type RegistrarCService struct { sync.RWMutex + cfg *config.CGRConfig dspS *registrarc.RegistrarCService stopChan chan struct{} rldChan chan struct{} - connMgr *engine.ConnManager - cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { +func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { dspS.Lock() defer dspS.Unlock() + cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, dspS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return + } + dspS.stopChan = make(chan struct{}) dspS.rldChan = make(chan struct{}) - dspS.dspS = registrarc.NewRegistrarCService(dspS.cfg, dspS.connMgr) + dspS.dspS = registrarc.NewRegistrarCService(dspS.cfg, cms.(*ConnManagerService).ConnManager()) go dspS.dspS.ListenAndServe(dspS.stopChan, dspS.rldChan) return } @@ -96,8 +96,3 @@ func (dspS *RegistrarCService) ShouldRun() bool { func (dspS *RegistrarCService) StateChan(stateID string) chan struct{} { return dspS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (dspS *RegistrarCService) IntRPCConn() birpc.ClientConnector { - return dspS.intRPCconn -} diff --git a/services/resources.go b/services/resources.go index 12f19bb0a..a972a97f4 100644 --- a/services/resources.go +++ b/services/resources.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -32,11 +31,9 @@ import ( // NewResourceService returns the Resource Service func NewResourceService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) *ResourceService { return &ResourceService{ cfg: cfg, - connMgr: connMgr, srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -45,16 +42,13 @@ func NewResourceService(cfg *config.CGRConfig, // ResourceService implements Service interface type ResourceService struct { sync.RWMutex + cfg *config.CGRConfig reS *engine.ResourceS cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + srvDep map[string]*sync.WaitGroup + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -64,16 +58,17 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, reS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } reS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheResourceProfiles, @@ -83,11 +78,10 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) reS.Lock() defer reS.Unlock() - reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), reS.connMgr) + reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), cms.ConnManager()) reS.reS.StartLoop(context.TODO()) srv, _ := engine.NewService(reS.reS) // srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false) @@ -96,8 +90,7 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana reS.cl.RpcRegister(s) } } - - reS.intRPCconn = anz.GetInternalCodec(srv, utils.ResourceS) + cms.AddInternalConn(utils.ResourceS, srv) return } @@ -134,8 +127,3 @@ func (reS *ResourceService) ShouldRun() bool { func (reS *ResourceService) StateChan(stateID string) chan struct{} { return reS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (reS *ResourceService) IntRPCConn() birpc.ClientConnector { - return reS.intRPCconn -} diff --git a/services/routes.go b/services/routes.go index 870f6ce8f..5500ddb6c 100644 --- a/services/routes.go +++ b/services/routes.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -30,11 +29,9 @@ import ( ) // NewRouteService returns the Route Service -func NewRouteService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *RouteService { +func NewRouteService(cfg *config.CGRConfig) *RouteService { return &RouteService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -42,15 +39,12 @@ func NewRouteService(cfg *config.CGRConfig, // RouteService implements Service interface type RouteService struct { sync.RWMutex + cfg *config.CGRConfig routeS *engine.RouteS cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -58,16 +52,17 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, routeS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } routeS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheRouteProfiles, @@ -76,11 +71,10 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) routeS.Lock() defer routeS.Unlock() - routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, routeS.connMgr) + routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, cms.ConnManager()) srv, _ := engine.NewService(routeS.routeS) // srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false) if !routeS.cfg.DispatcherSCfg().Enabled { @@ -88,7 +82,7 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana routeS.cl.RpcRegister(s) } } - routeS.intRPCconn = anz.GetInternalCodec(srv, utils.RouteS) + cms.AddInternalConn(utils.RouteS, srv) return } @@ -120,8 +114,3 @@ func (routeS *RouteService) ShouldRun() bool { func (routeS *RouteService) StateChan(stateID string) chan struct{} { return routeS.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (routeS *RouteService) IntRPCConn() birpc.ClientConnector { - return routeS.intRPCconn -} diff --git a/services/sessions.go b/services/sessions.go index ab8179924..5c5ae3e12 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/engine" @@ -33,11 +32,9 @@ import ( ) // NewSessionService returns the Session Service -func NewSessionService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *SessionService { +func NewSessionService(cfg *config.CGRConfig) *SessionService { return &SessionService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -51,11 +48,9 @@ type SessionService struct { bircpEnabled bool // to stop birpc server if needed stopChan chan struct{} - connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -63,23 +58,23 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, smg.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } smg.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) smg.Lock() defer smg.Unlock() - smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), smg.connMgr) + smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager()) //start sync session in a separate goroutine smg.stopChan = make(chan struct{}) go smg.sm.ListenAndServe(smg.stopChan) @@ -102,7 +97,7 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag // run this in it's own goroutine go smg.start(shutdown) } - smg.intRPCconn = anz.GetInternalCodec(srv, utils.SessionS) + cms.AddInternalConn(utils.SessionS, srv) return } @@ -155,8 +150,3 @@ func (smg *SessionService) ShouldRun() bool { func (smg *SessionService) StateChan(stateID string) chan struct{} { return smg.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (smg *SessionService) IntRPCConn() birpc.ClientConnector { - return smg.intRPCconn -} diff --git a/services/sipagent.go b/services/sipagent.go index a18414acb..6447ef29c 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -22,20 +22,16 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewSIPAgent returns the sip Agent -func NewSIPAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager) *SIPAgent { +func NewSIPAgent(cfg *config.CGRConfig) *SIPAgent { return &SIPAgent{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -45,30 +41,31 @@ type SIPAgent struct { sync.RWMutex cfg *config.CGRConfig - sip *agents.SIPAgent - connMgr *engine.ConnManager - + sip *agents.SIPAgent oldListen string - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start func (sip *SIPAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry, - sip.cfg.GeneralCfg().ConnectTimeout) + srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.ConnManager, + utils.FilterS, + }, + registry, sip.cfg.GeneralCfg().ConnectTimeout) if err != nil { return } + cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() sip.Lock() defer sip.Unlock() sip.oldListen = sip.cfg.SIPAgentCfg().Listen - sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.(*FilterService).FilterS()) + sip.sip, err = agents.NewSIPAgent(cm, sip.cfg, fs) if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", - utils.SIPAgent, err)) return } go sip.listenAndServe(shutdown) @@ -119,8 +116,3 @@ func (sip *SIPAgent) ShouldRun() bool { func (sip *SIPAgent) StateChan(stateID string) chan struct{} { return sip.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (sip *SIPAgent) IntRPCConn() birpc.ClientConnector { - return sip.intRPCconn -} diff --git a/services/statedeps.go b/services/statedeps.go index f124ab32a..31bf50670 100644 --- a/services/statedeps.go +++ b/services/statedeps.go @@ -52,11 +52,11 @@ func (sDs *StateDependencies) StateChan(stateID string) (retChan chan struct{}) // WaitForServicesToReachState ensures each service reaches the desired state, with the timeout applied individually per service. // Returns a map of service names to their instances or an error if any service fails to reach its state within its timeout window. -func WaitForServicesToReachState(state string, serviceIDs []string, indexer *servmanager.ServiceRegistry, timeout time.Duration, +func WaitForServicesToReachState(state string, serviceIDs []string, registry *servmanager.ServiceRegistry, timeout time.Duration, ) (map[string]servmanager.Service, error) { services := make(map[string]servmanager.Service, len(serviceIDs)) for _, serviceID := range serviceIDs { - srv, err := WaitForServiceState(state, serviceID, indexer, timeout) + srv, err := WaitForServiceState(state, serviceID, registry, timeout) if err != nil { return nil, err } @@ -68,13 +68,19 @@ func WaitForServicesToReachState(state string, serviceIDs []string, indexer *ser // WaitForServiceState waits up to timeout duration for a service to reach the specified state. // Returns the service instance or an error if the timeout is exceeded. -func WaitForServiceState(state, serviceID string, indexer *servmanager.ServiceRegistry, timeout time.Duration, +func WaitForServiceState(state, serviceID string, registry *servmanager.ServiceRegistry, timeout time.Duration, ) (servmanager.Service, error) { - srv := indexer.Lookup(serviceID) - if serviceID == utils.AnalyzerS && !srv.ShouldRun() { - // Return disabled analyzer service immediately since dependent - // services still need the instance. - return srv, nil + srv := registry.Lookup(serviceID) + if !srv.ShouldRun() { + switch serviceID { + case utils.AnalyzerS: + // Return disabled analyzer service immediately since dependent + // services still need the instance. + return srv, nil + case utils.AttributeS: + // Don't make DispatcherS wait when AttributeS is disabled. + return srv, nil + } } select { case <-srv.StateChan(state): diff --git a/services/stats.go b/services/stats.go index f631845ef..e7f54ce9e 100644 --- a/services/stats.go +++ b/services/stats.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -31,10 +30,9 @@ import ( ) // NewStatService returns the Stat Service -func NewStatService(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) *StatService { +func NewStatService(cfg *config.CGRConfig, srvDep map[string]*sync.WaitGroup) *StatService { return &StatService{ cfg: cfg, - connMgr: connMgr, srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -43,16 +41,13 @@ func NewStatService(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvDep m // StatService implements Service interface type StatService struct { sync.RWMutex + cfg *config.CGRConfig sts *engine.StatS cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + srvDep map[string]*sync.WaitGroup + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -62,16 +57,17 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager. srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, sts.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } sts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheStatQueueProfiles, @@ -81,11 +77,10 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager. } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) sts.Lock() defer sts.Unlock() - sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), sts.connMgr) + sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), cms.ConnManager()) sts.sts.StartLoop(context.TODO()) srv, _ := engine.NewService(sts.sts) // srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false) @@ -94,7 +89,7 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager. sts.cl.RpcRegister(s) } } - sts.intRPCconn = anz.GetInternalCodec(srv, utils.StatS) + cms.AddInternalConn(utils.StatS, srv) return } @@ -131,8 +126,3 @@ func (sts *StatService) ShouldRun() bool { func (sts *StatService) StateChan(stateID string) chan struct{} { return sts.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (sts *StatService) IntRPCConn() birpc.ClientConnector { - return sts.intRPCconn -} diff --git a/services/stordb.go b/services/stordb.go index 060244354..55bd1359e 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -41,14 +40,11 @@ func NewStorDBService(cfg *config.CGRConfig, setVersions bool) *StorDBService { // StorDBService implements Service interface type StorDBService struct { sync.RWMutex - cfg *config.CGRConfig - oldDBCfg *config.StorDbCfg - + cfg *config.CGRConfig + oldDBCfg *config.StorDbCfg db engine.StorDB setVersions bool - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -177,8 +173,3 @@ func (db *StorDBService) DB() engine.StorDB { func (db *StorDBService) StateChan(stateID string) chan struct{} { return db.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (db *StorDBService) IntRPCConn() birpc.ClientConnector { - return db.intRPCconn -} diff --git a/services/thresholds.go b/services/thresholds.go index 5388df0e5..dfc137dbb 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -32,12 +31,10 @@ import ( // NewThresholdService returns the Threshold Service func NewThresholdService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) *ThresholdService { return &ThresholdService{ cfg: cfg, srvDep: srvDep, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -45,16 +42,13 @@ func NewThresholdService(cfg *config.CGRConfig, // ThresholdService implements Service interface type ThresholdService struct { sync.RWMutex + cfg *config.CGRConfig thrs *engine.ThresholdS cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + srvDep map[string]*sync.WaitGroup + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -64,16 +58,17 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, thrs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } thrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheThresholdProfiles, @@ -83,11 +78,10 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) thrs.Lock() defer thrs.Unlock() - thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), thrs.connMgr) + thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), cms.ConnManager()) thrs.thrs.StartLoop(context.TODO()) srv, _ := engine.NewService(thrs.thrs) // srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false) @@ -96,7 +90,7 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma thrs.cl.RpcRegister(s) } } - thrs.intRPCconn = anz.GetInternalCodec(srv, utils.ThresholdS) + cms.AddInternalConn(utils.ThresholdS, srv) return } @@ -133,8 +127,3 @@ func (thrs *ThresholdService) ShouldRun() bool { func (thrs *ThresholdService) StateChan(stateID string) chan struct{} { return thrs.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (thrs *ThresholdService) IntRPCConn() birpc.ClientConnector { - return thrs.intRPCconn -} diff --git a/services/tpes.go b/services/tpes.go index cc0657219..366f0135b 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -24,17 +24,15 @@ import ( "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/tpes" "github.com/cgrates/cgrates/utils" ) // NewTPeService is the constructor for the TpeService -func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *TPeService { +func NewTPeService(cfg *config.CGRConfig) *TPeService { return &TPeService{ cfg: cfg, - connMgr: connMgr, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -42,17 +40,12 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *TPeServi // TypeService implements Service interface type TPeService struct { sync.RWMutex - - tpes *tpes.TPeS - cl *commonlisteners.CommonListenerS - srv *birpc.Service - - stopChan chan struct{} - connMgr *engine.ConnManager - cfg *config.CGRConfig - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + cfg *config.CGRConfig + tpes *tpes.TPeS + cl *commonlisteners.CommonListenerS + srv *birpc.Service + stopChan chan struct{} + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -61,6 +54,7 @@ func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.DataDB, }, registry, ts.cfg.GeneralCfg().ConnectTimeout) @@ -68,9 +62,10 @@ func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe return err } ts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() - dbs := srvDeps[utils.DataDB].(*DataDBService) + cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() + dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() - ts.tpes = tpes.NewTPeS(ts.cfg, dbs.DataManager(), ts.connMgr) + ts.tpes = tpes.NewTPeS(ts.cfg, dbs, cm) ts.stopChan = make(chan struct{}) ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false) ts.cl.RpcRegister(ts.srv) @@ -103,8 +98,3 @@ func (ts *TPeService) ShouldRun() bool { func (ts *TPeService) StateChan(stateID string) chan struct{} { return ts.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (ts *TPeService) IntRPCConn() birpc.ClientConnector { - return ts.intRPCconn -} diff --git a/services/trends.go b/services/trends.go index b6cc696a3..7df080f19 100644 --- a/services/trends.go +++ b/services/trends.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -32,11 +31,9 @@ import ( // NewTrendsService returns the TrendS Service func NewTrendService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) *TrendService { return &TrendService{ cfg: cfg, - connMgr: connMgr, srvDep: srvDep, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } @@ -44,16 +41,13 @@ func NewTrendService(cfg *config.CGRConfig, type TrendService struct { sync.RWMutex + cfg *config.CGRConfig trs *engine.TrendS cl *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup - - intRPCconn birpc.ClientConnector // expose API methods over internal connection - stateDeps *StateDependencies // channel subscriptions for state changes + srvDep map[string]*sync.WaitGroup + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -63,16 +57,17 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, + utils.ConnManager, utils.CacheS, utils.FilterS, utils.DataDB, - utils.AnalyzerS, }, registry, trs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } trs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheTrendProfiles, @@ -81,11 +76,10 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager } fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) trs.Lock() defer trs.Unlock() - trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), trs.connMgr) + trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager()) if err := trs.trs.StartTrendS(context.TODO()); err != nil { return err } @@ -98,7 +92,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager trs.cl.RpcRegister(s) } } - trs.intRPCconn = anz.GetInternalCodec(srv, utils.Trends) + cms.AddInternalConn(utils.TrendS, srv) return nil } @@ -135,8 +129,3 @@ func (trs *TrendService) ShouldRun() bool { func (trs *TrendService) StateChan(stateID string) chan struct{} { return trs.stateDeps.StateChan(stateID) } - -// IntRPCConn returns the internal connection used by RPCClient -func (trs *TrendService) IntRPCConn() birpc.ClientConnector { - return trs.intRPCconn -} diff --git a/servmanager/service_registry.go b/servmanager/service_registry.go index 2afd34652..c84937ffa 100644 --- a/servmanager/service_registry.go +++ b/servmanager/service_registry.go @@ -44,19 +44,23 @@ func (r *ServiceRegistry) Lookup(id string) Service { return r.services[id] } -// Register adds or updates a Service using its name as the unique identifier. -// Will overwrite existing service if name conflicts. -func (r *ServiceRegistry) Register(s Service) { +// Register adds or updates Services using their name as the unique identifier. +// Will overwrite existing services if name conflicts. +func (r *ServiceRegistry) Register(svcs ...Service) { r.mu.Lock() defer r.mu.Unlock() - r.services[s.ServiceName()] = s + for _, svc := range svcs { + r.services[svc.ServiceName()] = svc + } } -// Unregister removes a Service by ID. -func (r *ServiceRegistry) Unregister(id string) { +// Unregister removes Services by ID. +func (r *ServiceRegistry) Unregister(ids ...string) { r.mu.Lock() defer r.mu.Unlock() - delete(r.services, id) + for _, id := range ids { + delete(r.services, id) + } } // List returns a new slice containing all registered Services. diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index fa9e6106f..9cd111629 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -22,25 +22,21 @@ import ( "fmt" "sync" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // NewServiceManager returns a service manager -func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, - cfg *config.CGRConfig, registry *ServiceRegistry, services []Service) (sM *ServiceManager) { +func NewServiceManager(shdWg *sync.WaitGroup, cfg *config.CGRConfig, registry *ServiceRegistry, + services []Service) (sM *ServiceManager) { sM = &ServiceManager{ cfg: cfg, registry: registry, shdWg: shdWg, - connMgr: connMgr, rldChan: cfg.GetReloadChan(), } - sM.AddServices(services...) + sM.registry.Register(services...) return } @@ -51,7 +47,6 @@ type ServiceManager struct { registry *ServiceRegistry // index here the services for accessing them by their IDs shdWg *sync.WaitGroup // list of shutdown items rldChan <-chan string // reload signals come over this channelc - connMgr *engine.ConnManager } // StartServices starts all enabled services @@ -64,8 +59,7 @@ func (m *ServiceManager) StartServices(shutdown *utils.SyncedChan) { if svc.ShouldRun() && !IsServiceInState(svc, utils.StateServiceUP) { m.shdWg.Add(1) go func() { - if err := svc.Start(shutdown, m.registry); err != nil && - err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine + if err := svc.Start(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) shutdown.CloseOnce() } @@ -77,33 +71,6 @@ func (m *ServiceManager) StartServices(shutdown *utils.SyncedChan) { // startServer() } -// AddServices adds given services -func (m *ServiceManager) AddServices(services ...Service) { - m.Lock() - for _, svc := range services { - m.registry.Register(svc) - if sAPIData, hasAPIData := serviceAPIData[svc.ServiceName()]; hasAPIData { // Add the internal connections - rpcIntChan := make(chan birpc.ClientConnector, 1) - m.connMgr.AddInternalConn(sAPIData[1], sAPIData[0], rpcIntChan) - if len(sAPIData) > 2 { // Add the bidirectional API - m.connMgr.AddInternalConn(sAPIData[2], sAPIData[0], rpcIntChan) - } - go func() { // ToDo: centralize management into one single goroutine - if utils.StructChanTimeout( - m.registry.Lookup(svc.ServiceName()).StateChan(utils.StateServiceUP), - m.cfg.GeneralCfg().ConnectTimeout) { - utils.Logger.Err( - fmt.Sprintf("<%s> failed to register internal connection to service %s because of timeout waiting for ServiceUP state", - utils.ServiceManager, svc.ServiceName())) - // toDo: shutdown service - } - rpcIntChan <- svc.IntRPCConn() - }() - } - } - m.Unlock() -} - func (m *ServiceManager) handleReload(shutdown *utils.SyncedChan) { var serviceID string for { @@ -113,12 +80,7 @@ func (m *ServiceManager) handleReload(shutdown *utils.SyncedChan) { return case serviceID = <-m.rldChan: } - if serviceID == config.RPCConnsJSON { - go m.connMgr.Reload() - } else { - go m.reloadService(serviceID, shutdown) - - } + go m.reloadService(serviceID, shutdown) // handle RPC server } } @@ -189,8 +151,6 @@ type Service interface { ServiceName() string // StateChan returns the channel for specific state subscription StateChan(stateID string) chan struct{} - // IntRPCConn returns the connector needed for internal RPC connections - IntRPCConn() birpc.ClientConnector } // ArgsServiceID are passed to Start/Stop/Status RPC methods @@ -338,88 +298,6 @@ func toggleService(id string, status bool, srvMngr *ServiceManager) (err error) return } -var serviceAPIData = map[string][]string{ - utils.AnalyzerS: { - utils.AnalyzerSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS)}, - utils.AdminS: { - utils.AdminSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS)}, - utils.AttributeS: { - utils.AttributeSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)}, - utils.CacheS: { - utils.CacheSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, - utils.CDRs: { - utils.CDRsV1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)}, - utils.ChargerS: { - utils.ChargerSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers)}, - utils.GuardianS: { - utils.GuardianSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian)}, - utils.LoaderS: { - utils.LoaderSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders)}, - utils.ResourceS: { - utils.ResourceSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)}, - utils.SessionS: { - utils.SessionSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), - utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS)}, - utils.StatS: { - utils.StatSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)}, - utils.RankingS: { - utils.RankingSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings)}, - utils.TrendS: { - utils.TrendSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends)}, - utils.RouteS: { - utils.RouteSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes)}, - utils.ThresholdS: { - utils.ThresholdSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)}, - utils.ServiceManagerS: { - utils.ServiceManagerV1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager)}, - utils.ConfigS: { - utils.ConfigSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig)}, - utils.CoreS: { - utils.CoreSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore)}, - utils.EEs: { - utils.EeSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)}, - utils.RateS: { - utils.RateSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates)}, - utils.DispatcherS: { - utils.DispatcherSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers)}, - utils.AccountS: { - utils.AccountSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts)}, - utils.ActionS: { - utils.ActionSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions)}, - utils.TPeS: { - utils.TPeSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes)}, - utils.EFs: { - utils.EfSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs)}, - utils.ERs: { - utils.ErSv1, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs)}, -} - // IsServiceInState performs a non-blocking check to determine if a service is in the specified state. func IsServiceInState(svc Service, state string) bool { select { diff --git a/utils/consts.go b/utils/consts.go index 503dbde7b..39cecf694 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -993,6 +993,7 @@ const ( GuardianS = "GuardianS" ServiceManagerS = "ServiceManager" CommonListenerS = "CommonListenerS" + ConnManager = "ConnManager" ) // Lower service names