From 6be26def4f8af6f5e860fb6e602726ed9fe3fb7d Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 15 Jan 2025 12:21:53 +0200 Subject: [PATCH] Implement CapService --- cmd/cgr-engine/cgr-engine.go | 10 ++--- services/caps.go | 80 ++++++++++++++++++++++++++++++++++++ services/commonlisteners.go | 48 +++++++++++----------- services/cores.go | 69 +++++++++++++++---------------- services/diameteragent.go | 78 ++++++++++++++++++----------------- utils/consts.go | 1 + 6 files changed, 185 insertions(+), 101 deletions(-) create mode 100644 services/caps.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index b357dd330..a0548183a 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -36,7 +36,6 @@ import ( "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" @@ -125,7 +124,6 @@ func runCGREngine(fs []string) (err error) { utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) - caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy) srvDep := map[string]*sync.WaitGroup{ utils.DataDB: new(sync.WaitGroup), } @@ -133,7 +131,8 @@ func runCGREngine(fs []string) (err error) { // ServiceIndexer will share service references to all services registry := servmanager.NewServiceRegistry() gvS := services.NewGlobalVarS(cfg) - cls := services.NewCommonListenerService(cfg, caps) + caps := services.NewCapService(cfg) + cls := services.NewCommonListenerService(cfg) anzS := services.NewAnalyzerService(cfg) cms := services.NewConnManagerService(cfg) lgs := services.NewLoggerService(cfg, *flags.Logger) @@ -141,7 +140,7 @@ func runCGREngine(fs []string) (err error) { sdbS := services.NewStorDBService(cfg, *flags.SetVersions) configS := services.NewConfigService(cfg) guardianS := services.NewGuardianService(cfg) - coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg) + coreS := services.NewCoreService(cfg, cpuPrfF, shdWg) cacheS := services.NewCacheService(cfg) fltrS := services.NewFilterService(cfg) dspS := services.NewDispatcherService(cfg) @@ -164,7 +163,7 @@ func runCGREngine(fs []string) (err error) { janusAgent := services.NewJanusAgent(cfg) astAgent := services.NewAsteriskAgent(cfg) radAgent := services.NewRadiusAgent(cfg) - diamAgent := services.NewDiameterAgent(cfg, caps) + diamAgent := services.NewDiameterAgent(cfg) httpAgent := services.NewHTTPAgent(cfg) sipAgent := services.NewSIPAgent(cfg) eeS := services.NewEventExporterService(cfg) @@ -177,6 +176,7 @@ func runCGREngine(fs []string) (err error) { srvManager := servmanager.NewServiceManager(shdWg, cfg, registry, []servmanager.Service{ gvS, + caps, cls, anzS, cms, diff --git a/services/caps.go b/services/caps.go new file mode 100644 index 000000000..feb115c08 --- /dev/null +++ b/services/caps.go @@ -0,0 +1,80 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +// NewCapService instantiates a new CapService. +func NewCapService(cfg *config.CGRConfig) *CapService { + return &CapService{ + cfg: cfg, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), + } +} + +// CapService implements Service interface. +type CapService struct { + mu sync.RWMutex + cfg *config.CGRConfig + caps *engine.Caps + stateDeps *StateDependencies // channel subscriptions for state changes +} + +// Start handles the service start. +func (s *CapService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { + s.caps = engine.NewCaps(s.cfg.CoreSCfg().Caps, s.cfg.CoreSCfg().CapsStrategy) + return nil +} + +// Reload handles the config changes. +func (s *CapService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { + return nil +} + +// Shutdown stops the service. +func (s *CapService) Shutdown(_ *servmanager.ServiceRegistry) error { + return nil +} + +// ServiceName returns the service name +func (s *CapService) ServiceName() string { + return utils.CapS +} + +// ShouldRun returns if the service should be running. +func (s *CapService) ShouldRun() bool { + return true +} + +// StateChan returns signaling channel of specific state +func (s *CapService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) +} + +// Caps returns the Caps object. +func (s *CapService) Caps() *engine.Caps { + return s.caps +} diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 1b0a9836d..5019a017b 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -23,17 +23,15 @@ import ( "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/registrarc" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewCommonListenerService instantiates a new CommonListenerService. -func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps) *CommonListenerService { +func NewCommonListenerService(cfg *config.CGRConfig) *CommonListenerService { return &CommonListenerService{ cfg: cfg, - caps: caps, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -43,53 +41,57 @@ type CommonListenerService struct { mu sync.RWMutex cfg *config.CGRConfig cls *commonlisteners.CommonListenerS - caps *engine.Caps stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. -func (cl *CommonListenerService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { - cl.mu.Lock() - defer cl.mu.Unlock() - cl.cls = commonlisteners.NewCommonListenerS(cl.caps) - if len(cl.cfg.HTTPCfg().RegistrarSURL) != 0 { - cl.cls.RegisterHTTPFunc(cl.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) +func (s *CommonListenerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { + cs, err := WaitForServiceState(utils.StateServiceUP, utils.CapS, registry, + s.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } - if cl.cfg.ConfigSCfg().Enabled { - cl.cls.RegisterHTTPFunc(cl.cfg.ConfigSCfg().URL, config.HandlerConfigS) + s.mu.Lock() + defer s.mu.Unlock() + s.cls = commonlisteners.NewCommonListenerS(cs.(*CapService).Caps()) + if len(s.cfg.HTTPCfg().RegistrarSURL) != 0 { + s.cls.RegisterHTTPFunc(s.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) + } + if s.cfg.ConfigSCfg().Enabled { + s.cls.RegisterHTTPFunc(s.cfg.ConfigSCfg().URL, config.HandlerConfigS) } return nil } // Reload handles the config changes. -func (cl *CommonListenerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { +func (s *CommonListenerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { return nil } // Shutdown stops the service. -func (cl *CommonListenerService) Shutdown(_ *servmanager.ServiceRegistry) error { - cl.mu.Lock() - defer cl.mu.Unlock() - cl.cls = nil +func (s *CommonListenerService) Shutdown(_ *servmanager.ServiceRegistry) error { + s.mu.Lock() + defer s.mu.Unlock() + s.cls = nil return nil } // ServiceName returns the service name -func (cl *CommonListenerService) ServiceName() string { +func (s *CommonListenerService) ServiceName() string { return utils.CommonListenerS } // ShouldRun returns if the service should be running. -func (cl *CommonListenerService) ShouldRun() bool { +func (s *CommonListenerService) ShouldRun() bool { return true } // StateChan returns signaling channel of specific state -func (cl *CommonListenerService) StateChan(stateID string) chan struct{} { - return cl.stateDeps.StateChan(stateID) +func (s *CommonListenerService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) } // CLS returns the CommonListenerS object. -func (cl *CommonListenerService) CLS() *commonlisteners.CommonListenerS { - return cl.cls +func (s *CommonListenerService) CLS() *commonlisteners.CommonListenerS { + return s.cls } diff --git a/services/cores.go b/services/cores.go index 69b3f1822..4997080f2 100644 --- a/services/cores.go +++ b/services/cores.go @@ -31,12 +31,10 @@ import ( ) // NewCoreService returns the Core Service -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, - fileCPU *os.File, shdWg *sync.WaitGroup) *CoreService { +func NewCoreService(cfg *config.CGRConfig, fileCPU *os.File, shdWg *sync.WaitGroup) *CoreService { return &CoreService{ shdWg: shdWg, cfg: cfg, - caps: caps, fileCPU: fileCPU, csCh: make(chan *cores.CoreS, 1), stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), @@ -50,7 +48,6 @@ type CoreService struct { cS *cores.CoreS cl *commonlisteners.CommonListenerS fileCPU *os.File - caps *engine.Caps csCh chan *cores.CoreS stopChan chan struct{} shdWg *sync.WaitGroup @@ -58,31 +55,33 @@ type CoreService struct { } // Start should handle the service start -func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { +func (s *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ + utils.CapS, utils.CommonListenerS, utils.ConnManager, }, - registry, cS.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } - cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + caps := srvDeps[utils.CapS].(*CapService).Caps() + s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) - cS.mu.Lock() - defer cS.mu.Unlock() - cS.stopChan = make(chan struct{}) - cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCPU, cS.stopChan, cS.shdWg, shutdown) - cS.csCh <- cS.cS - srv, err := engine.NewService(cS.cS) + s.mu.Lock() + defer s.mu.Unlock() + s.stopChan = make(chan struct{}) + s.cS = cores.NewCoreService(s.cfg, caps, s.fileCPU, s.stopChan, s.shdWg, shutdown) + s.csCh <- s.cS + srv, err := engine.NewService(s.cS) if err != nil { return err } - if !cS.cfg.DispatcherSCfg().Enabled { - for _, s := range srv { - cS.cl.RpcRegister(s) + if !s.cfg.DispatcherSCfg().Enabled { + for _, svc := range srv { + s.cl.RpcRegister(svc) } } cms.AddInternalConn(utils.CoreS, srv) @@ -90,42 +89,42 @@ func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.S } // Reload handles the change of config -func (cS *CoreService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { +func (s *CoreService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { return nil } // Shutdown stops the service -func (cS *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error { - cS.mu.Lock() - defer cS.mu.Unlock() - cS.cS.Shutdown() - close(cS.stopChan) - cS.cS.StopCPUProfiling() - cS.cS.StopMemoryProfiling() - cS.cS = nil - <-cS.csCh - cS.cl.RpcUnregisterName(utils.CoreSv1) +func (s *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error { + s.mu.Lock() + defer s.mu.Unlock() + s.cS.Shutdown() + close(s.stopChan) + s.cS.StopCPUProfiling() + s.cS.StopMemoryProfiling() + s.cS = nil + <-s.csCh + s.cl.RpcUnregisterName(utils.CoreSv1) return nil } // ServiceName returns the service name -func (cS *CoreService) ServiceName() string { +func (s *CoreService) ServiceName() string { return utils.CoreS } // ShouldRun returns if the service should be running -func (cS *CoreService) ShouldRun() bool { +func (s *CoreService) ShouldRun() bool { return true } // StateChan returns signaling channel of specific state -func (cS *CoreService) StateChan(stateID string) chan struct{} { - return cS.stateDeps.StateChan(stateID) +func (s *CoreService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) } // CoreS returns the CoreS object. -func (cS *CoreService) CoreS() *cores.CoreS { - cS.mu.RLock() - defer cS.mu.RUnlock() - return cS.cS +func (s *CoreService) CoreS() *cores.CoreS { + s.mu.RLock() + defer s.mu.RUnlock() + return s.cS } diff --git a/services/diameteragent.go b/services/diameteragent.go index c8f578e15..9e776cc7d 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -30,10 +30,9 @@ import ( ) // NewDiameterAgent returns the Diameter Agent -func NewDiameterAgent(cfg *config.CGRConfig, caps *engine.Caps) *DiameterAgent { +func NewDiameterAgent(cfg *config.CGRConfig) *DiameterAgent { return &DiameterAgent{ cfg: cfg, - caps: caps, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -44,8 +43,7 @@ type DiameterAgent struct { cfg *config.CGRConfig stopChan chan struct{} - da *agents.DiameterAgent - caps *engine.Caps + da *agents.DiameterAgent lnet string laddr string @@ -54,88 +52,92 @@ type DiameterAgent struct { } // Start should handle the sercive start -func (da *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { +func (s *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ + utils.CapS, utils.ConnManager, utils.FilterS, }, - registry, da.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } - cms := srvDeps[utils.ConnManager].(*ConnManagerService) - fs := srvDeps[utils.FilterS].(*FilterService) + caps := srvDeps[utils.CapS].(*CapService).Caps() + cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() - da.Lock() - defer da.Unlock() - return da.start(fs.FilterS(), cms.ConnManager(), da.caps, shutdown) + s.Lock() + defer s.Unlock() + return s.start(fs, cm, caps, shutdown) } -func (da *DiameterAgent) start(filterS *engine.FilterS, cm *engine.ConnManager, caps *engine.Caps, +func (s *DiameterAgent) start(filterS *engine.FilterS, cm *engine.ConnManager, caps *engine.Caps, shutdown *utils.SyncedChan) error { var err error - da.da, err = agents.NewDiameterAgent(da.cfg, filterS, cm, caps) + s.da, err = agents.NewDiameterAgent(s.cfg, filterS, cm, caps) if err != nil { return err } - da.lnet = da.cfg.DiameterAgentCfg().ListenNet - da.laddr = da.cfg.DiameterAgentCfg().Listen - da.stopChan = make(chan struct{}) + s.lnet = s.cfg.DiameterAgentCfg().ListenNet + s.laddr = s.cfg.DiameterAgentCfg().Listen + s.stopChan = make(chan struct{}) go func(d *agents.DiameterAgent) { - if err := d.ListenAndServe(da.stopChan); err != nil { + if err := d.ListenAndServe(s.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.DiameterAgent, err)) shutdown.CloseOnce() } - }(da.da) + }(s.da) return nil } // Reload handles the change of config -func (da *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - da.Lock() - defer da.Unlock() - if da.lnet == da.cfg.DiameterAgentCfg().ListenNet && - da.laddr == da.cfg.DiameterAgentCfg().Listen { +func (s *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { + s.Lock() + defer s.Unlock() + if s.lnet == s.cfg.DiameterAgentCfg().ListenNet && + s.laddr == s.cfg.DiameterAgentCfg().Listen { return } - close(da.stopChan) + close(s.stopChan) srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ + utils.CapS, utils.ConnManager, utils.FilterS, }, - registry, da.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } - cms := srvDeps[utils.ConnManager].(*ConnManagerService) - fs := srvDeps[utils.FilterS].(*FilterService) - return da.start(fs.FilterS(), cms.ConnManager(), da.caps, shutdown) + caps := srvDeps[utils.CapS].(*CapService).Caps() + cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() + return s.start(fs, cm, caps, shutdown) } // Shutdown stops the service -func (da *DiameterAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - da.Lock() - close(da.stopChan) - da.da = nil - da.Unlock() +func (s *DiameterAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { + s.Lock() + close(s.stopChan) + s.da = nil + s.Unlock() return // no shutdown for the momment } // ServiceName returns the service name -func (da *DiameterAgent) ServiceName() string { +func (s *DiameterAgent) ServiceName() string { return utils.DiameterAgent } // ShouldRun returns if the service should be running -func (da *DiameterAgent) ShouldRun() bool { - return da.cfg.DiameterAgentCfg().Enabled +func (s *DiameterAgent) ShouldRun() bool { + return s.cfg.DiameterAgentCfg().Enabled } // StateChan returns signaling channel of specific state -func (da *DiameterAgent) StateChan(stateID string) chan struct{} { - return da.stateDeps.StateChan(stateID) +func (s *DiameterAgent) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) } diff --git a/utils/consts.go b/utils/consts.go index 749ed4b52..83e81ea17 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -995,6 +995,7 @@ const ( CommonListenerS = "CommonListenerS" ConnManager = "ConnManager" LoggerS = "LoggerS" + CapS = "CapS" ) // Lower service names