diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0dcf69fe5..95ad91592 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -39,7 +39,6 @@ import ( "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" @@ -129,10 +128,6 @@ func runCGREngine(fs []string) (err error) { iServeManagerCh := make(chan birpc.ClientConnector, 1) connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager), utils.ServiceManagerV1, iServeManagerCh) - iConfigCh := make(chan birpc.ClientConnector, 1) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig), utils.ConfigSv1, iConfigCh) - iGuardianSCh := make(chan birpc.ClientConnector, 1) - connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh) // ServiceIndexer will share service references to all services srvIdxr := servmanager.NewServiceIndexer() @@ -141,6 +136,8 @@ func runCGREngine(fs []string) (err error) { sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvIdxr) cls := services.NewCommonListenerService(cfg, caps, srvIdxr) anzS := services.NewAnalyzerService(cfg, srvIdxr) + configS := services.NewConfigService(cfg, srvIdxr) + guardianS := services.NewGuardianService(cfg, srvIdxr) coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, srvIdxr) cacheS := services.NewCacheService(cfg, connMgr, srvIdxr) fltrS := services.NewFilterService(cfg, connMgr, srvIdxr) @@ -175,47 +172,48 @@ func runCGREngine(fs []string) (err error) { accS := services.NewAccountService(cfg, connMgr, srvIdxr) tpeS := services.NewTPeService(cfg, connMgr, srvIdxr) - srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, - []servmanager.Service{ - gvS, - dmS, - sdbS, - cls, - anzS, - coreS, - cacheS, - fltrS, - dspS, - ldrs, - efs, - adminS, - sessionS, - attrS, - chrgS, - routeS, - resourceS, - trendS, - rankingS, - thS, - stS, - erS, - dnsAgent, - fsAgent, - kamAgent, - janusAgent, - astAgent, - radAgent, - diamAgent, - httpAgent, - sipAgent, - eeS, - cdrS, - registrarcS, - rateS, - actionS, - accS, - tpeS, - }) + srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{ + gvS, + dmS, + sdbS, + cls, + anzS, + configS, + guardianS, + coreS, + cacheS, + fltrS, + dspS, + ldrs, + efs, + adminS, + sessionS, + attrS, + chrgS, + routeS, + resourceS, + trendS, + rankingS, + thS, + stS, + erS, + dnsAgent, + fsAgent, + kamAgent, + janusAgent, + astAgent, + radAgent, + diamAgent, + httpAgent, + sipAgent, + eeS, + cdrS, + registrarcS, + rateS, + actionS, + accS, + tpeS, + }) defer func() { ctx, cancel := context.WithTimeout(context.Background(), cfg.CoreSCfg().ShutdownTimeout*10) @@ -309,10 +307,7 @@ func runCGREngine(fs []string) (err error) { return } srvManager.StartServices(ctx, cancel) - cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS) - cgrInitGuardianSv1(iGuardianSCh, cfg, cls.CLS(), anzS) - cgrInitConfigSv1(iConfigCh, cfg, cls.CLS(), anzS) if *flags.Preload != utils.EmptyString { if err = cgrRunPreload(ctx, cfg, *flags.Preload, srvIdxr); err != nil { @@ -368,17 +363,6 @@ func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string return } -func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, - cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { - srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) - if !cfg.DispatcherSCfg().Enabled { - for _, s := range srv { - cl.RpcRegister(s) - } - } - iGuardianSCh <- anz.GetInternalCodec(srv, utils.GuardianS) -} - func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig, cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { @@ -389,18 +373,6 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) } -func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, - cfg *config.CGRConfig, cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { - srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true) - // srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) - if !cfg.DispatcherSCfg().Enabled { - for _, s := range srv { - cl.RpcRegister(s) - } - } - iConfigCh <- anz.GetInternalCodec(srv, utils.ConfigSv1) -} - func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer) { if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this diff --git a/services/config.go b/services/config.go new file mode 100644 index 000000000..665e3f6c3 --- /dev/null +++ b/services/config.go @@ -0,0 +1,105 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +// NewConfigService instantiates a new ConfigService. +func NewConfigService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceIndexer) *ConfigService { + return &ConfigService{ + cfg: cfg, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + } +} + +// ConfigService implements Service interface. +type ConfigService struct { + mu sync.RWMutex + cfg *config.CGRConfig + intRPCconn birpc.ClientConnector // expose API methods over internal connection + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes +} + +// Start handles the service start. +func (s *ConfigService) Start(_ *context.Context, _ context.CancelFunc) error { + cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP) + } + anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP) + } + + srv, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true) + if !s.cfg.DispatcherSCfg().Enabled { + for _, s := range srv { + cls.CLS().RpcRegister(s) + } + } + s.intRPCconn = anz.GetInternalCodec(srv, utils.ConfigSv1) + close(s.stateDeps.StateChan(utils.StateServiceUP)) + return nil +} + +// Reload handles the config changes. +func (s *ConfigService) Reload(*context.Context, context.CancelFunc) error { + return nil +} + +// Shutdown stops the service. +func (s *ConfigService) Shutdown() error { + return nil +} + +// IsRunning returns whether the service is running or not. +func (s *ConfigService) IsRunning() bool { + return true +} + +// ServiceName returns the service name +func (s *ConfigService) ServiceName() string { + return utils.ConfigS +} + +// ShouldRun returns if the service should be running. +func (s *ConfigService) ShouldRun() bool { + return true +} + +// StateChan returns signaling channel of specific state +func (s *ConfigService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) +} + +// IntRPCConn returns the internal connection used by RPCClient +func (s *ConfigService) IntRPCConn() birpc.ClientConnector { + return s.intRPCconn +} diff --git a/services/guardian.go b/services/guardian.go new file mode 100644 index 000000000..df5b15d34 --- /dev/null +++ b/services/guardian.go @@ -0,0 +1,105 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/guardian" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +// NewGuardianService instantiates a new GuardianService. +func NewGuardianService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceIndexer) *GuardianService { + return &GuardianService{ + cfg: cfg, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + } +} + +// GuardianService implements Service interface. +type GuardianService struct { + mu sync.RWMutex + cfg *config.CGRConfig + intRPCconn birpc.ClientConnector // expose API methods over internal connection + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes +} + +// Start handles the service start. +func (s *GuardianService) Start(_ *context.Context, _ context.CancelFunc) error { + cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP) + } + anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP) + } + srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) + if !s.cfg.DispatcherSCfg().Enabled { + for _, s := range srv { + cls.CLS().RpcRegister(s) + } + } + s.intRPCconn = anz.GetInternalCodec(srv, utils.GuardianS) + close(s.stateDeps.StateChan(utils.StateServiceUP)) + return nil +} + +// Reload handles the config changes. +func (s *GuardianService) Reload(*context.Context, context.CancelFunc) error { + return nil +} + +// Shutdown stops the service. +func (s *GuardianService) Shutdown() error { + return nil +} + +// IsRunning returns whether the service is running or not. +func (s *GuardianService) IsRunning() bool { + return true +} + +// ServiceName returns the service name +func (s *GuardianService) ServiceName() string { + return utils.FilterS +} + +// ShouldRun returns if the service should be running. +func (s *GuardianService) ShouldRun() bool { + return true +} + +// StateChan returns signaling channel of specific state +func (s *GuardianService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) +} + +// IntRPCConn returns the internal connection used by RPCClient +func (s *GuardianService) IntRPCConn() birpc.ClientConnector { + return s.intRPCconn +}