From 652d1e68cf561955ef790ce6b01ba77ad2fe8120 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 23 Jan 2025 19:09:22 +0200 Subject: [PATCH] Remove cls fields from service structs now retrieved through registry.Lookup --- services/accounts.go | 34 ++++++++---------- services/actions.go | 32 ++++++++--------- services/adminsv1.go | 59 +++++++++++++++--------------- services/analyzers.go | 32 ++++++++--------- services/asteriskagent.go | 10 +++--- services/attributes.go | 27 +++++++------- services/caches.go | 13 +++---- services/caps.go | 3 -- services/cdrs.go | 27 +++++++------- services/chargers.go | 27 +++++++------- services/commonlisteners.go | 40 ++++++++++++++++++++- services/config.go | 14 +++----- services/cores.go | 11 +++--- services/datadb.go | 14 ++++---- services/diameteragent.go | 14 ++++---- services/dnsagent.go | 14 ++++---- services/ees.go | 71 ++++++++++++++++++------------------- services/efs.go | 58 +++++++++++++++--------------- services/ers.go | 36 +++++++++---------- services/guardian.go | 19 +++------- services/httpagent.go | 19 +++++----- services/janus.go | 14 ++++---- services/loaders.go | 33 +++++++---------- services/logger.go | 8 ++--- services/radiusagent.go | 25 ++++++------- services/rankings.go | 31 ++++++++-------- services/rates.go | 65 +++++++++++++++++---------------- services/registrarc.go | 21 +++++------ services/resources.go | 31 ++++++++-------- services/routes.go | 27 +++++++------- services/sessions.go | 47 ++++++++++++------------ services/sipagent.go | 18 +++++----- services/stats.go | 31 ++++++++-------- services/stordb.go | 14 ++++---- services/thresholds.go | 37 +++++++++---------- services/tpes.go | 18 ++++++---- services/trends.go | 31 ++++++++-------- 37 files changed, 489 insertions(+), 536 deletions(-) diff --git a/services/accounts.go b/services/accounts.go index 2ff193a27..438ed2ecb 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/cgrates/accounts" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -41,15 +40,11 @@ func NewAccountService(cfg *config.CGRConfig) *AccountService { // AccountService implements Service interface type AccountService struct { - sync.RWMutex - cfg *config.CGRConfig - - acts *accounts.AccountS - cl *commonlisteners.CommonListenerS - - rldChan chan struct{} - stopChan chan struct{} - + mu sync.RWMutex + cfg *config.CGRConfig + acts *accounts.AccountS + rldChan chan struct{} + stopChan chan struct{} stateDeps *StateDependencies // channel subscriptions for state changes } @@ -67,7 +62,7 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana if err != nil { return err } - acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -78,8 +73,8 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana fs := srvDeps[utils.FilterS].(*FilterService).FilterS() dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() - acts.Lock() - defer acts.Unlock() + acts.mu.Lock() + defer acts.mu.Unlock() acts.acts = accounts.NewAccountS(acts.cfg, fs, cms.ConnManager(), dbs) acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) @@ -87,9 +82,7 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana if err != nil { return err } - - acts.cl.RpcRegister(srv) - + cl.RpcRegister(srv) cms.AddInternalConn(utils.AccountS, srv) return } @@ -101,12 +94,13 @@ func (acts *AccountService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe } // Shutdown stops the service -func (acts *AccountService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - acts.Lock() +func (acts *AccountService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + acts.mu.Lock() + defer acts.mu.Unlock() close(acts.stopChan) acts.acts = nil - acts.Unlock() - acts.cl.RpcUnregisterName(utils.AccountSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.AccountSv1) return } diff --git a/services/actions.go b/services/actions.go index eb9bfeb76..290622b63 100644 --- a/services/actions.go +++ b/services/actions.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/cgrates/actions" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -41,15 +40,11 @@ func NewActionService(cfg *config.CGRConfig) *ActionService { // ActionService implements Service interface type ActionService struct { - sync.RWMutex - cfg *config.CGRConfig - - acts *actions.ActionS - cl *commonlisteners.CommonListenerS - - rldChan chan struct{} - stopChan chan struct{} - + mu sync.RWMutex + cfg *config.CGRConfig + acts *actions.ActionS + rldChan chan struct{} + stopChan chan struct{} stateDeps *StateDependencies // channel subscriptions for state changes } @@ -67,7 +62,7 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag if err != nil { return err } - acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -78,8 +73,8 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag fs := srvDeps[utils.FilterS].(*FilterService).FilterS() dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() - acts.Lock() - defer acts.Unlock() + acts.mu.Lock() + defer acts.mu.Unlock() acts.acts = actions.NewActionS(acts.cfg, fs, dbs, cms.ConnManager()) acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) @@ -88,7 +83,7 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag return } // srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false) - acts.cl.RpcRegister(srv) + cl.RpcRegister(srv) cms.AddInternalConn(utils.ActionS, srv) return } @@ -100,13 +95,14 @@ func (acts *ActionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceReg } // Shutdown stops the service -func (acts *ActionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - acts.Lock() - defer acts.Unlock() +func (acts *ActionService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + acts.mu.Lock() + defer acts.mu.Unlock() close(acts.stopChan) acts.acts.Shutdown() acts.acts = nil - acts.cl.RpcUnregisterName(utils.ActionSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.ActionSv1) return } diff --git a/services/adminsv1.go b/services/adminsv1.go index eac0b44ab..0376c57b2 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/cgrates/apis" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -39,17 +38,16 @@ func NewAdminSv1Service(cfg *config.CGRConfig) *AdminSv1Service { // AdminSv1Service implements Service interface type AdminSv1Service struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig api *apis.AdminSv1 - cl *commonlisteners.CommonListenerS stopChan chan struct{} stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start // For this service the start should be called from RAL Service -func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { +func (s *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -58,61 +56,62 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana utils.DataDB, utils.StorDB, }, - registry, apiService.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } - apiService.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) - fs := srvDeps[utils.FilterS].(*FilterService) - dbs := srvDeps[utils.DataDB].(*DataDBService) - sdbs := srvDeps[utils.StorDB].(*StorDBService) + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() + dm := srvDeps[utils.DataDB].(*DataDBService).DataManager() + sdb := srvDeps[utils.StorDB].(*StorDBService).DB() - apiService.Lock() - defer apiService.Unlock() + s.mu.Lock() + defer s.mu.Unlock() - apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), cms.ConnManager(), fs.FilterS(), sdbs.DB()) + s.api = apis.NewAdminSv1(s.cfg, dm, cms.ConnManager(), fs, sdb) - srv, _ := engine.NewService(apiService.api) - // srv, _ := birpc.NewService(apiService.api, "", false) + srv, _ := engine.NewService(s.api) + // srv, _ := birpc.NewService(s.api, "", false) for _, s := range srv { - apiService.cl.RpcRegister(s) + cl.RpcRegister(s) } - rpl, _ := engine.NewService(apis.NewReplicatorSv1(dbs.DataManager(), apiService.api)) - for _, s := range rpl { - apiService.cl.RpcRegister(s) + rpl, _ := engine.NewService(apis.NewReplicatorSv1(dm, s.api)) + for _, svc := range rpl { + cl.RpcRegister(svc) } cms.AddInternalConn(utils.AdminS, srv) return } // Reload handles the change of config -func (apiService *AdminSv1Service) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { +func (s *AdminSv1Service) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (apiService *AdminSv1Service) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - apiService.Lock() - // close(apiService.stopChan) - apiService.api = nil - apiService.cl.RpcUnregisterName(utils.AdminSv1) - apiService.Unlock() +func (s *AdminSv1Service) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + s.mu.Lock() + defer s.mu.Unlock() + // close(s.stopChan) + s.api = nil + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.AdminSv1) return } // ServiceName returns the service name -func (apiService *AdminSv1Service) ServiceName() string { +func (s *AdminSv1Service) ServiceName() string { return utils.AdminS } // ShouldRun returns if the service should be running -func (apiService *AdminSv1Service) ShouldRun() bool { - return apiService.cfg.AdminSCfg().Enabled +func (s *AdminSv1Service) ShouldRun() bool { + return s.cfg.AdminSCfg().Enabled } // StateChan returns signaling channel of specific state -func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} { - return apiService.stateDeps.StateChan(stateID) +func (s *AdminSv1Service) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) } diff --git a/services/analyzers.go b/services/analyzers.go index cb301f8e9..0d3ec6cae 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -47,10 +47,9 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService { // AnalyzerService implements Service interface type AnalyzerService struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig anz *analyzers.AnalyzerS - cl *commonlisteners.CommonListenerS cancelFunc context.CancelFunc stateDeps *StateDependencies // channel subscriptions for state changes @@ -63,10 +62,10 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana if err != nil { return } - anz.cl = cls.(*CommonListenerService).CLS() + cl := cls.(*CommonListenerService).CLS() - anz.Lock() - defer anz.Unlock() + anz.mu.Lock() + defer anz.mu.Unlock() if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil { return } @@ -79,26 +78,26 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana shutdown.CloseOnce() } }(anz.anz) - anz.cl.SetAnalyzer(anz.anz) - go anz.start(registry) + cl.SetAnalyzer(anz.anz) + go anz.start(registry, cl) return } -func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry) { +func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry, cl *commonlisteners.CommonListenerS) { fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry, anz.cfg.GeneralCfg().ConnectTimeout) if err != nil { return } - anz.Lock() + anz.mu.Lock() anz.anz.SetFilterS(fs.(*FilterService).FilterS()) srv, _ := engine.NewService(anz.anz) // srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false) for _, s := range srv { - anz.cl.RpcRegister(s) + cl.RpcRegister(s) } - anz.Unlock() + anz.mu.Unlock() } // Reload handles the change of config @@ -107,14 +106,15 @@ func (anz *AnalyzerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe } // Shutdown stops the service -func (anz *AnalyzerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - anz.Lock() +func (anz *AnalyzerService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + anz.mu.Lock() anz.cancelFunc() - anz.cl.SetAnalyzer(nil) + cl.SetAnalyzer(nil) anz.anz.Shutdown() anz.anz = nil - anz.Unlock() - anz.cl.RpcUnregisterName(utils.AnalyzerSv1) + anz.mu.Unlock() + cl.RpcUnregisterName(utils.AnalyzerSv1) return } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 79cfce440..1e107c2cc 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -38,7 +38,7 @@ func NewAsteriskAgent(cfg *config.CGRConfig) *AsteriskAgent { // AsteriskAgent implements Agent interface type AsteriskAgent struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig stopChan chan struct{} smas []*agents.AsteriskAgent @@ -52,8 +52,8 @@ func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, registry *servmanage return } - ast.Lock() - defer ast.Unlock() + ast.mu.Lock() + defer ast.mu.Unlock() listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}) { if err := sma.ListenAndServe(stopChan); err != nil { @@ -83,10 +83,10 @@ func (ast *AsteriskAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { } func (ast *AsteriskAgent) shutdown() { - ast.Lock() + ast.mu.Lock() + defer ast.mu.Unlock() close(ast.stopChan) ast.smas = nil - ast.Unlock() return // no shutdown for the momment } diff --git a/services/attributes.go b/services/attributes.go index cb9773ef8..ff768e530 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/cgrates/apis" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -39,13 +38,10 @@ func NewAttributeService(cfg *config.CGRConfig) *AttributeService { // AttributeService implements Service interface type AttributeService struct { - sync.RWMutex - cfg *config.CGRConfig - - attrS *engine.AttributeS - cl *commonlisteners.CommonListenerS - rpc *apis.AttributeSv1 // useful on restart - + mu sync.Mutex + cfg *config.CGRConfig + attrS *engine.AttributeS + rpc *apis.AttributeSv1 // useful on restart stateDeps *StateDependencies } @@ -63,7 +59,7 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm if err != nil { return } - attrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -74,14 +70,14 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm fs := srvDeps[utils.FilterS].(*FilterService).FilterS() dm := srvDeps[utils.DataDB].(*DataDBService).DataManager() - attrS.Lock() - defer attrS.Unlock() + attrS.mu.Lock() + defer attrS.mu.Unlock() attrS.attrS = engine.NewAttributeService(dm, fs, attrS.cfg) attrS.rpc = apis.NewAttributeSv1(attrS.attrS) srv, _ := engine.NewService(attrS.rpc) // srv, _ := birpc.NewService(attrS.rpc, "", false) for _, s := range srv { - attrS.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.AttributeS, srv) return @@ -94,9 +90,10 @@ func (attrS *AttributeService) Reload(_ *utils.SyncedChan, _ *servmanager.Servic // Shutdown stops the service func (attrS *AttributeService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { - attrS.Lock() - attrS.cl.RpcUnregisterName(utils.AttributeSv1) - attrS.Unlock() + attrS.mu.Lock() + defer attrS.mu.Unlock() + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.AttributeSv1) return } diff --git a/services/caches.go b/services/caches.go index b338c7951..e38298e7c 100644 --- a/services/caches.go +++ b/services/caches.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -41,7 +40,6 @@ func NewCacheService(cfg *config.CGRConfig) *CacheService { type CacheService struct { mu sync.Mutex cfg *config.CGRConfig - cl *commonlisteners.CommonListenerS cacheCh chan *engine.CacheS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -59,7 +57,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager. if err != nil { return err } - cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() dbs := srvDeps[utils.DataDB].(*DataDBService) cms := srvDeps[utils.ConnManager].(*ConnManagerService) cs := srvDeps[utils.CoreS].(*CoreService) @@ -75,7 +73,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager. srv, _ := engine.NewService(engine.Cache) // srv, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false) for _, s := range srv { - cS.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.CacheS, srv) return @@ -87,8 +85,11 @@ func (cS *CacheService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegist } // Shutdown stops the service -func (cS *CacheService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { - cS.cl.RpcUnregisterName(utils.CacheSv1) +func (cS *CacheService) Shutdown(registry *servmanager.ServiceRegistry) (_ error) { + cS.mu.Lock() + cS.mu.Unlock() + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.CacheSv1) return } diff --git a/services/caps.go b/services/caps.go index feb115c08..2879e1040 100644 --- a/services/caps.go +++ b/services/caps.go @@ -19,8 +19,6 @@ along with this program. If not, see package services import ( - "sync" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -37,7 +35,6 @@ func NewCapService(cfg *config.CGRConfig) *CapService { // CapService implements Service interface. type CapService struct { - mu sync.RWMutex cfg *config.CGRConfig caps *engine.Caps stateDeps *StateDependencies // channel subscriptions for state changes diff --git a/services/cdrs.go b/services/cdrs.go index abe738936..13ccd7952 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/cgrates/cdrs" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -40,12 +39,9 @@ func NewCDRServer(cfg *config.CGRConfig) *CDRService { // CDRService implements Service interface type CDRService struct { - sync.RWMutex - cfg *config.CGRConfig - - cdrS *cdrs.CDRServer - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + cdrS *cdrs.CDRServer stateDeps *StateDependencies // channel subscriptions for state changes } @@ -63,14 +59,14 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe if err != nil { return err } - cs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService).FilterS() dbs := srvDeps[utils.DataDB].(*DataDBService) sdbs := srvDeps[utils.StorDB].(*StorDBService).DB() - cs.Lock() - defer cs.Unlock() + cs.mu.Lock() + defer cs.mu.Unlock() cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs, cms.ConnManager(), sdbs) runtime.Gosched() @@ -78,7 +74,7 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe if err != nil { return err } - cs.cl.RpcRegister(srv) + cl.RpcRegister(srv) cms.AddInternalConn(utils.CDRServer, srv) return } @@ -89,11 +85,12 @@ func (cs *CDRService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry } // Shutdown stops the service -func (cs *CDRService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - cs.Lock() +func (cs *CDRService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + cs.mu.Lock() + defer cs.mu.Unlock() cs.cdrS = nil - cs.Unlock() - cs.cl.RpcUnregisterName(utils.CDRsV1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.CDRsV1) return } diff --git a/services/chargers.go b/services/chargers.go index 79594cbf1..379c7c5a5 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -38,12 +37,9 @@ func NewChargerService(cfg *config.CGRConfig) *ChargerService { // ChargerService implements Service interface type ChargerService struct { - sync.RWMutex - cfg *config.CGRConfig - - chrS *engine.ChargerS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + chrS *engine.ChargerS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -61,7 +57,7 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana if err != nil { return err } - chrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -72,13 +68,13 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - chrS.Lock() - defer chrS.Unlock() + chrS.mu.Lock() + defer chrS.mu.Unlock() chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, cms.ConnManager()) srv, _ := engine.NewService(chrS.chrS) // srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false) for _, s := range srv { - chrS.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.ChargerS, srv) return nil @@ -90,11 +86,12 @@ func (chrS *ChargerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe } // Shutdown stops the service -func (chrS *ChargerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - chrS.Lock() - defer chrS.Unlock() +func (chrS *ChargerService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + chrS.mu.Lock() + defer chrS.mu.Unlock() chrS.chrS = nil - chrS.cl.RpcUnregisterName(utils.ChargerSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.ChargerSv1) return } diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 5019a017b..7c76cfead 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -69,7 +69,45 @@ func (s *CommonListenerService) Reload(_ *utils.SyncedChan, _ *servmanager.Servi } // Shutdown stops the service. -func (s *CommonListenerService) Shutdown(_ *servmanager.ServiceRegistry) error { +func (s *CommonListenerService) Shutdown(registry *servmanager.ServiceRegistry) error { + deps := []string{ + utils.AccountS, + utils.ActionS, + utils.AdminS, + utils.AnalyzerS, + utils.AttributeS, + utils.CacheS, + utils.CDRServer, + utils.ChargerS, + utils.ConfigS, + utils.CoreS, + utils.EEs, + utils.EFs, + utils.ERs, + utils.GuardianS, + utils.HTTPAgent, + utils.JanusAgent, + utils.LoaderS, + utils.RankingS, + utils.RateS, + utils.RegistrarC, + utils.ResourceS, + utils.RouteS, + utils.SessionS, + utils.StatS, + utils.ThresholdS, + utils.TPeS, + utils.TrendS, + } + for _, svcID := range deps { + if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) { + continue + } + _, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, s.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err + } + } s.mu.Lock() defer s.mu.Unlock() s.cls = nil diff --git a/services/config.go b/services/config.go index 5b2053ddb..02124c432 100644 --- a/services/config.go +++ b/services/config.go @@ -19,9 +19,6 @@ along with this program. If not, see package services import ( - "sync" - - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -38,9 +35,7 @@ func NewConfigService(cfg *config.CGRConfig) *ConfigService { // ConfigService implements Service interface. type ConfigService struct { - mu sync.RWMutex cfg *config.CGRConfig - cl *commonlisteners.CommonListenerS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -55,12 +50,12 @@ func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.Service if err != nil { return err } - s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) svcs, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true) for _, svc := range svcs { - s.cl.RpcRegister(svc) + cl.RpcRegister(svc) } cms.AddInternalConn(utils.ConfigS, svcs) return nil @@ -72,8 +67,9 @@ func (s *ConfigService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegist } // Shutdown stops the service. -func (s *ConfigService) Shutdown(_ *servmanager.ServiceRegistry) error { - s.cl.RpcUnregisterName(utils.ConfigSv1) +func (s *ConfigService) Shutdown(registry *servmanager.ServiceRegistry) error { + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.ConfigSv1) return nil } diff --git a/services/cores.go b/services/cores.go index b723b2d4e..d79178639 100644 --- a/services/cores.go +++ b/services/cores.go @@ -22,7 +22,6 @@ import ( "os" "sync" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -45,7 +44,6 @@ type CoreService struct { mu sync.RWMutex cfg *config.CGRConfig cS *cores.CoreS - cl *commonlisteners.CommonListenerS fileCPU *os.File stopChan chan struct{} shdWg *sync.WaitGroup @@ -65,7 +63,7 @@ func (s *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.Se return err } caps := srvDeps[utils.CapS].(*CapService).Caps() - s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) s.mu.Lock() @@ -77,7 +75,7 @@ func (s *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.Se return err } for _, svc := range srv { - s.cl.RpcRegister(svc) + cl.RpcRegister(svc) } cms.AddInternalConn(utils.CoreS, srv) return nil @@ -89,7 +87,7 @@ func (s *CoreService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry } // Shutdown stops the service -func (s *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error { +func (s *CoreService) Shutdown(registry *servmanager.ServiceRegistry) error { s.mu.Lock() defer s.mu.Unlock() s.cS.Shutdown() @@ -97,7 +95,8 @@ func (s *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error { s.cS.StopCPUProfiling() s.cS.StopMemoryProfiling() s.cS = nil - s.cl.RpcUnregisterName(utils.CoreSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.CoreSv1) return nil } diff --git a/services/datadb.go b/services/datadb.go index 61c313ed3..23a747d16 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -39,7 +39,7 @@ func NewDataDBService(cfg *config.CGRConfig, setVersions bool) *DataDBService { // DataDBService implements Service interface type DataDBService struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig oldDBCfg *config.DataDbCfg dm *engine.DataManager @@ -53,8 +53,8 @@ func (db *DataDBService) Start(_ *utils.SyncedChan, registry *servmanager.Servic if err != nil { return } - db.Lock() - defer db.Unlock() + db.mu.Lock() + defer db.mu.Unlock() db.oldDBCfg = db.cfg.DataDbCfg().Clone() dbConn, err := engine.NewDataDBConn(db.cfg.DataDbCfg().Type, db.cfg.DataDbCfg().Host, db.cfg.DataDbCfg().Port, @@ -81,8 +81,8 @@ func (db *DataDBService) Start(_ *utils.SyncedChan, registry *servmanager.Servic // Reload handles the change of config func (db *DataDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { - db.Lock() - defer db.Unlock() + db.mu.Lock() + defer db.mu.Unlock() if db.needsConnectionReload() { var d engine.DataDBDriver d, err = engine.NewDataDBConn(db.cfg.DataDbCfg().Type, @@ -126,8 +126,8 @@ func (db *DataDBService) Shutdown(registry *servmanager.ServiceRegistry) error { return err } } - db.Lock() - defer db.Unlock() + db.mu.Lock() + defer db.mu.Unlock() db.dm.DataDB().Close() return nil } diff --git a/services/diameteragent.go b/services/diameteragent.go index 9e776cc7d..44aad26a2 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -39,7 +39,7 @@ func NewDiameterAgent(cfg *config.CGRConfig) *DiameterAgent { // DiameterAgent implements Agent interface type DiameterAgent struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig stopChan chan struct{} @@ -67,8 +67,8 @@ func (s *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager. cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() fs := srvDeps[utils.FilterS].(*FilterService).FilterS() - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() return s.start(fs, cm, caps, shutdown) } @@ -94,8 +94,8 @@ func (s *DiameterAgent) start(filterS *engine.FilterS, cm *engine.ConnManager, c // Reload handles the change of config func (s *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() if s.lnet == s.cfg.DiameterAgentCfg().ListenNet && s.laddr == s.cfg.DiameterAgentCfg().Listen { return @@ -120,10 +120,10 @@ func (s *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager // Shutdown stops the service func (s *DiameterAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - s.Lock() + s.mu.Lock() close(s.stopChan) s.da = nil - s.Unlock() + s.mu.Unlock() return // no shutdown for the momment } diff --git a/services/dnsagent.go b/services/dnsagent.go index 8a37eff98..78d7fc7af 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -37,7 +37,7 @@ func NewDNSAgent(cfg *config.CGRConfig) *DNSAgent { // DNSAgent implements Agent interface type DNSAgent struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig stopChan chan struct{} @@ -61,8 +61,8 @@ func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) - dns.Lock() - defer dns.Unlock() + dns.mu.Lock() + defer dns.mu.Unlock() dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), cms.ConnManager()) if err != nil { dns.dns = nil @@ -87,8 +87,8 @@ func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.Se cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) - dns.Lock() - defer dns.Unlock() + dns.mu.Lock() + defer dns.mu.Unlock() if dns.dns != nil { close(dns.stopChan) @@ -122,8 +122,8 @@ func (dns *DNSAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { return } close(dns.stopChan) - dns.Lock() - defer dns.Unlock() + dns.mu.Lock() + defer dns.mu.Unlock() dns.dns = nil return } diff --git a/services/ees.go b/services/ees.go index e031d2496..c8afaf690 100644 --- a/services/ees.go +++ b/services/ees.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/engine" @@ -39,43 +38,12 @@ func NewEventExporterService(cfg *config.CGRConfig) *EventExporterService { // EventExporterService is the service structure for EventExporterS type EventExporterService struct { - mu sync.RWMutex - cfg *config.CGRConfig - - eeS *ees.EeS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + eeS *ees.EeS stateDeps *StateDependencies // channel subscriptions for state changes } -// ServiceName returns the service name -func (es *EventExporterService) ServiceName() string { - return utils.EEs -} - -// ShouldRun returns if the service should be running -func (es *EventExporterService) ShouldRun() (should bool) { - return es.cfg.EEsCfg().Enabled -} - -// Reload handles the change of config -func (es *EventExporterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { - es.mu.Lock() - defer es.mu.Unlock() - es.eeS.ClearExporterCache() - return es.eeS.SetupExporterCache() -} - -// Shutdown stops the service -func (es *EventExporterService) Shutdown(_ *servmanager.ServiceRegistry) error { - es.mu.Lock() - defer es.mu.Unlock() - es.eeS.ClearExporterCache() - es.eeS = nil - es.cl.RpcUnregisterName(utils.EeSv1) - return nil -} - // Start should handle the service start func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, @@ -88,7 +56,7 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager if err != nil { return err } - es.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService).FilterS() @@ -102,11 +70,40 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager srv, _ := engine.NewServiceWithPing(es.eeS, utils.EeSv1, utils.V1Prfx) // srv, _ := birpc.NewService(es.rpc, "", false) - es.cl.RpcRegister(srv) + cl.RpcRegister(srv) cms.AddInternalConn(utils.EEs, srv) return nil } +// Reload handles the change of config +func (es *EventExporterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { + es.mu.Lock() + defer es.mu.Unlock() + es.eeS.ClearExporterCache() + return es.eeS.SetupExporterCache() +} + +// Shutdown stops the service +func (es *EventExporterService) Shutdown(registry *servmanager.ServiceRegistry) error { + es.mu.Lock() + defer es.mu.Unlock() + es.eeS.ClearExporterCache() + es.eeS = nil + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.EeSv1) + return nil +} + +// ServiceName returns the service name +func (es *EventExporterService) ServiceName() string { + return utils.EEs +} + +// ShouldRun returns if the service should be running +func (es *EventExporterService) ShouldRun() (should bool) { + return es.cfg.EEsCfg().Enabled +} + // StateChan returns signaling channel of specific state func (es *EventExporterService) StateChan(stateID string) chan struct{} { return es.stateDeps.StateChan(stateID) diff --git a/services/efs.go b/services/efs.go index eea73a6c2..e548b033a 100644 --- a/services/efs.go +++ b/services/efs.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" @@ -32,15 +31,11 @@ import ( // ExportFailoverService is the service structure for ExportFailover type ExportFailoverService struct { - sync.Mutex - - efS *efs.EfS - cl *commonlisteners.CommonListenerS - srv *birpc.Service - - stopChan chan struct{} - cfg *config.CGRConfig - + mu sync.Mutex + efS *efs.EfS + srv *birpc.Service + stopChan chan struct{} + cfg *config.CGRConfig stateDeps *StateDependencies // channel subscriptions for state changes } @@ -53,54 +48,59 @@ func NewExportFailoverService(cfg *config.CGRConfig) *ExportFailoverService { } // Start should handle the service start -func (efServ *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { +func (s *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.ConnManager, }, - registry, efServ.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return } - efServ.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) - efServ.Lock() - defer efServ.Unlock() + s.mu.Lock() + defer s.mu.Unlock() - efServ.efS = efs.NewEfs(efServ.cfg, cms.ConnManager()) - efServ.stopChan = make(chan struct{}) - efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx) - efServ.cl.RpcRegister(efServ.srv) - cms.AddInternalConn(utils.EFs, efServ.srv) + s.efS = efs.NewEfs(s.cfg, cms.ConnManager()) + s.stopChan = make(chan struct{}) + s.srv, _ = engine.NewServiceWithPing(s.efS, utils.EfSv1, utils.V1Prfx) + cl.RpcRegister(s.srv) + cms.AddInternalConn(utils.EFs, s.srv) return } // Reload handles the change of config -func (efServ *ExportFailoverService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { +func (s *ExportFailoverService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (efServ *ExportFailoverService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - efServ.srv = nil - close(efServ.stopChan) +func (s *ExportFailoverService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + s.mu.Lock() + defer s.mu.Unlock() + s.srv = nil + close(s.stopChan) + + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.EfSv1) // NEXT SHOULD EXPORT ALL THE SHUTDOWN LOGGERS TO WRITE return } // ShouldRun returns if the service should be running -func (efServ *ExportFailoverService) ShouldRun() bool { - return efServ.cfg.EFsCfg().Enabled +func (s *ExportFailoverService) ShouldRun() bool { + return s.cfg.EFsCfg().Enabled } // ServiceName returns the service name -func (efServ *ExportFailoverService) ServiceName() string { +func (s *ExportFailoverService) ServiceName() string { return utils.EFs } // StateChan returns signaling channel of specific state -func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} { - return efServ.stateDeps.StateChan(stateID) +func (s *ExportFailoverService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) } diff --git a/services/ers.go b/services/ers.go index 1f7acc1a8..57dad85f5 100644 --- a/services/ers.go +++ b/services/ers.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/ers" @@ -41,15 +40,11 @@ func NewEventReaderService(cfg *config.CGRConfig) *EventReaderService { // EventReaderService implements Service interface type EventReaderService struct { - sync.RWMutex - cfg *config.CGRConfig - - ers *ers.ERService - cl *commonlisteners.CommonListenerS - - rldChan chan struct{} - stopChan chan struct{} - + mu sync.RWMutex + cfg *config.CGRConfig + ers *ers.ERService + rldChan chan struct{} + stopChan chan struct{} stateDeps *StateDependencies // channel subscriptions for state changes } @@ -65,12 +60,12 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm if err != nil { return err } - erS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) - erS.Lock() - defer erS.Unlock() + erS.mu.Lock() + defer erS.mu.Unlock() // remake the stop chan erS.stopChan = make(chan struct{}) @@ -83,7 +78,7 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm if err != nil { return err } - erS.cl.RpcRegister(srv) + cl.RpcRegister(srv) cms.AddInternalConn(utils.ERs, srv) return } @@ -98,19 +93,20 @@ func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldC // Reload handles the change of config func (erS *EventReaderService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { - erS.RLock() + erS.mu.RLock() erS.rldChan <- struct{}{} - erS.RUnlock() + erS.mu.RUnlock() return } // Shutdown stops the service -func (erS *EventReaderService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - erS.Lock() - defer erS.Unlock() +func (erS *EventReaderService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + erS.mu.Lock() + defer erS.mu.Unlock() close(erS.stopChan) erS.ers = nil - erS.cl.RpcUnregisterName(utils.ErSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.ErSv1) return } diff --git a/services/guardian.go b/services/guardian.go index 69bd1f072..4d1608b0c 100644 --- a/services/guardian.go +++ b/services/guardian.go @@ -19,9 +19,6 @@ along with this program. If not, see package services import ( - "sync" - - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/guardian" @@ -39,9 +36,7 @@ func NewGuardianService(cfg *config.CGRConfig) *GuardianService { // GuardianService implements Service interface. type GuardianService struct { - mu sync.RWMutex cfg *config.CGRConfig - cl *commonlisteners.CommonListenerS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -56,15 +51,12 @@ func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.Servi if err != nil { return err } - s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) - s.mu.Lock() - defer s.mu.Unlock() - svcs, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) for _, svc := range svcs { - s.cl.RpcRegister(svc) + cl.RpcRegister(svc) } cms.AddInternalConn(utils.GuardianS, svcs) return nil @@ -76,10 +68,9 @@ func (s *GuardianService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegi } // Shutdown stops the service. -func (s *GuardianService) Shutdown(_ *servmanager.ServiceRegistry) error { - s.mu.Lock() - defer s.mu.Unlock() - s.cl.RpcUnregisterName(utils.GuardianSv1) +func (s *GuardianService) Shutdown(registry *servmanager.ServiceRegistry) error { + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.GuardianSv1) return nil } diff --git a/services/httpagent.go b/services/httpagent.go index 575b8c74a..096df69a5 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/cgrates/agents" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -38,11 +37,9 @@ func NewHTTPAgent(cfg *config.CGRConfig) *HTTPAgent { // HTTPAgent implements Agent interface type HTTPAgent struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig - cl *commonlisteners.CommonListenerS - // we can realy stop the HTTPAgent so keep a flag // if we registerd the handlers started bool @@ -63,16 +60,16 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg return err } cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() - cms := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() - fs := srvDeps[utils.FilterS].(*FilterService) + cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() - ha.Lock() - defer ha.Unlock() + ha.mu.Lock() + defer ha.mu.Unlock() ha.started = true for _, agntCfg := range ha.cfg.HTTPAgentCfg() { cl.RegisterHttpHandler(agntCfg.URL, - agents.NewHTTPAgent(cms, agntCfg.SessionSConns, fs.FilterS(), + agents.NewHTTPAgent(cm, agntCfg.SessionSConns, fs, ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, agntCfg.ReplyPayload, agntCfg.RequestProcessors)) } @@ -86,9 +83,9 @@ func (ha *HTTPAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) // Shutdown stops the service func (ha *HTTPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - ha.Lock() + ha.mu.Lock() ha.started = false - ha.Unlock() + ha.mu.Unlock() return // no shutdown for the momment } diff --git a/services/janus.go b/services/janus.go index a14cab76f..aaf0c2bc8 100644 --- a/services/janus.go +++ b/services/janus.go @@ -39,10 +39,9 @@ func NewJanusAgent(cfg *config.CGRConfig) *JanusAgent { // JanusAgent implements Service interface type JanusAgent struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig - - jA *agents.JanusAgent + jA *agents.JanusAgent // we can realy stop the JanusAgent so keep a flag // if we registerd the jandlers @@ -67,9 +66,9 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) - ja.Lock() + ja.mu.Lock() + defer ja.mu.Unlock() if ja.started { - ja.Unlock() return utils.ErrServiceAlreadyRunning } ja.jA, err = agents.NewJanusAgent(ja.cfg, cms.ConnManager(), fs.FilterS()) @@ -89,7 +88,6 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe cl.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.HandlePlugin)) ja.started = true - ja.Unlock() return } @@ -100,10 +98,10 @@ func (ja *JanusAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry // Shutdown stops the service func (ja *JanusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - ja.Lock() + ja.mu.Lock() err = ja.jA.Shutdown() ja.started = false - ja.Unlock() + ja.mu.Unlock() return // no shutdown for the momment } diff --git a/services/loaders.go b/services/loaders.go index 103539de8..b6995f12a 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/loaders" @@ -43,10 +42,9 @@ func NewLoaderService(cfg *config.CGRConfig, preloadIDs []string) *LoaderService // LoaderService implements Service interface type LoaderService struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig ldrs *loaders.LoaderS - cl *commonlisteners.CommonListenerS preloadIDs []string stopChan chan struct{} stateDeps *StateDependencies // channel subscriptions for state changes @@ -65,13 +63,13 @@ func (s *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Service if err != nil { return err } - s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService).FilterS() dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() - s.Lock() - defer s.Unlock() + s.mu.Lock() + defer s.mu.Unlock() s.ldrs = loaders.NewLoaderS(s.cfg, dbs, fs, cms.ConnManager()) if !s.ldrs.Enabled() { @@ -97,7 +95,7 @@ func (s *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Service srv, _ := engine.NewService(s.ldrs) // srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false) for _, svc := range srv { - s.cl.RpcRegister(svc) + cl.RpcRegister(svc) } cms.AddInternalConn(utils.LoaderS, srv) return nil @@ -121,21 +119,21 @@ func (s *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.Servic close(s.stopChan) s.stopChan = make(chan struct{}) - s.RLock() - defer s.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() s.ldrs.Reload(dbs, fs, cms) return s.ldrs.ListenAndServe(s.stopChan) } // Shutdown stops the service -func (s *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { - s.Lock() - s.ldrs = nil +func (s *LoaderService) Shutdown(registry *servmanager.ServiceRegistry) error { + s.mu.Lock() + defer s.mu.Unlock() close(s.stopChan) - s.cl.RpcUnregisterName(utils.LoaderSv1) - s.Unlock() - return + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.LoaderSv1) + return nil } // ServiceName returns the service name @@ -148,11 +146,6 @@ func (s *LoaderService) ShouldRun() bool { return s.cfg.LoaderCfg().Enabled() } -// GetLoaderS returns the initialized LoaderService -func (s *LoaderService) GetLoaderS() *loaders.LoaderS { - return s.ldrs -} - // StateChan returns signaling channel of specific state func (s *LoaderService) StateChan(stateID string) chan struct{} { return s.stateDeps.StateChan(stateID) diff --git a/services/logger.go b/services/logger.go index 9627007d2..7a95c9d72 100644 --- a/services/logger.go +++ b/services/logger.go @@ -19,8 +19,6 @@ along with this program. If not, see package services import ( - "sync" - "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/efs" @@ -40,10 +38,8 @@ func NewLoggerService(cfg *config.CGRConfig, loggerType string) *LoggerService { // LoggerService implements Service interface. type LoggerService struct { - mu sync.RWMutex - cfg *config.CGRConfig - stateDeps *StateDependencies // channel subscriptions for state changes - + cfg *config.CGRConfig + stateDeps *StateDependencies // channel subscriptions for state changes loggerType string } diff --git a/services/radiusagent.go b/services/radiusagent.go index 66194b9b2..8701d803e 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -38,16 +38,13 @@ func NewRadiusAgent(cfg *config.CGRConfig) *RadiusAgent { // RadiusAgent implements Agent interface type RadiusAgent struct { - sync.RWMutex - cfg *config.CGRConfig - stopChan chan struct{} - - rad *agents.RadiusAgent - - lnet string - lauth string - lacct string - + mu sync.RWMutex + cfg *config.CGRConfig + stopChan chan struct{} + rad *agents.RadiusAgent + lnet string + lauth string + lacct string stateDeps *StateDependencies // channel subscriptions for state changes } @@ -65,8 +62,8 @@ func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager. cms := srvDeps[utils.ConnManager].(*ConnManagerService) fs := srvDeps[utils.FilterS].(*FilterService) - rad.Lock() - defer rad.Unlock() + rad.mu.Lock() + defer rad.mu.Unlock() rad.lnet = rad.cfg.RadiusAgentCfg().ListenNet rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth @@ -108,10 +105,10 @@ func (rad *RadiusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { } func (rad *RadiusAgent) shutdown() { - rad.Lock() + rad.mu.Lock() + defer rad.mu.Unlock() close(rad.stopChan) rad.rad = nil - rad.Unlock() } // ServiceName returns the service name diff --git a/services/rankings.go b/services/rankings.go index 9b462bc92..3c5217c82 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -23,7 +23,6 @@ import ( "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -39,12 +38,9 @@ func NewRankingService(cfg *config.CGRConfig) *RankingService { } type RankingService struct { - sync.RWMutex - cfg *config.CGRConfig - - ran *engine.RankingS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + ran *engine.RankingS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -62,7 +58,7 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag if err != nil { return err } - ran.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -73,8 +69,8 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - ran.Lock() - defer ran.Unlock() + ran.mu.Lock() + defer ran.mu.Unlock() ran.ran = engine.NewRankingS(dbs.DataManager(), cms.ConnManager(), fs.FilterS(), ran.cfg) if err := ran.ran.StartRankingS(context.TODO()); err != nil { return err @@ -84,7 +80,7 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag return err } for _, s := range srv { - ran.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.RankingS, srv) return nil @@ -92,19 +88,20 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag // Reload handles the change of config func (ran *RankingService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { - ran.Lock() + ran.mu.Lock() ran.ran.Reload(context.TODO()) - ran.Unlock() + ran.mu.Unlock() return } // Shutdown stops the service -func (ran *RankingService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - ran.Lock() - defer ran.Unlock() +func (ran *RankingService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + ran.mu.Lock() + defer ran.mu.Unlock() ran.ran.StopRankingS() ran.ran = nil - ran.cl.RpcUnregisterName(utils.RankingSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.RankingSv1) return } diff --git a/services/rates.go b/services/rates.go index 2082e2db2..18d54a845 100644 --- a/services/rates.go +++ b/services/rates.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/rates" @@ -40,41 +39,14 @@ func NewRateService(cfg *config.CGRConfig) *RateService { // RateService is the service structure for RateS type RateService struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig rateS *rates.RateS - cl *commonlisteners.CommonListenerS rldChan chan struct{} stopChan chan struct{} stateDeps *StateDependencies // channel subscriptions for state changes } -// ServiceName returns the service name -func (rs *RateService) ServiceName() string { - return utils.RateS -} - -// ShouldRun returns if the service should be running -func (rs *RateService) ShouldRun() (should bool) { - return rs.cfg.RateSCfg().Enabled -} - -// Reload handles the change of config -func (rs *RateService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) { - rs.rldChan <- struct{}{} - return -} - -// Shutdown stops the service -func (rs *RateService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - rs.Lock() - defer rs.Unlock() - close(rs.stopChan) - rs.rateS = nil - rs.cl.RpcUnregisterName(utils.RateSv1) - return -} - // Start should handle the service start func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, @@ -89,7 +61,7 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S if err != nil { return err } - rs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -101,9 +73,9 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S fs := srvDeps[utils.FilterS].(*FilterService).FilterS() dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() - rs.Lock() + rs.mu.Lock() rs.rateS = rates.NewRateS(rs.cfg, fs, dbs) - rs.Unlock() + rs.mu.Unlock() rs.stopChan = make(chan struct{}) go rs.rateS.ListenAndServe(rs.stopChan, rs.rldChan) @@ -113,12 +85,39 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S return err } // srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false) - rs.cl.RpcRegister(srv) + cl.RpcRegister(srv) cms.AddInternalConn(utils.RateS, srv) return } +// Reload handles the change of config +func (rs *RateService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) { + rs.rldChan <- struct{}{} + return +} + +// Shutdown stops the service +func (rs *RateService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + rs.mu.Lock() + defer rs.mu.Unlock() + close(rs.stopChan) + rs.rateS = nil + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.RateSv1) + return +} + // StateChan returns signaling channel of specific state func (rs *RateService) StateChan(stateID string) chan struct{} { return rs.stateDeps.StateChan(stateID) } + +// ServiceName returns the service name +func (rs *RateService) ServiceName() string { + return utils.RateS +} + +// ShouldRun returns if the service should be running +func (rs *RateService) ShouldRun() (should bool) { + return rs.cfg.RateSCfg().Enabled +} diff --git a/services/registrarc.go b/services/registrarc.go index bdd07177c..9d9795264 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -37,21 +37,18 @@ func NewRegistrarCService(cfg *config.CGRConfig) *RegistrarCService { // RegistrarCService implements Service interface type RegistrarCService struct { - sync.RWMutex - cfg *config.CGRConfig - - dspS *registrarc.RegistrarCService - - stopChan chan struct{} - rldChan chan struct{} - + mu sync.RWMutex + cfg *config.CGRConfig + dspS *registrarc.RegistrarCService + stopChan chan struct{} + rldChan chan struct{} stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - dspS.Lock() - defer dspS.Unlock() + dspS.mu.Lock() + defer dspS.mu.Unlock() cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, dspS.cfg.GeneralCfg().ConnectTimeout) if err != nil { @@ -73,11 +70,11 @@ func (dspS *RegistrarCService) Reload(_ *utils.SyncedChan, _ *servmanager.Servic // Shutdown stops the service func (dspS *RegistrarCService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - dspS.Lock() + dspS.mu.Lock() + defer dspS.mu.Unlock() close(dspS.stopChan) dspS.dspS.Shutdown() dspS.dspS = nil - dspS.Unlock() return } diff --git a/services/resources.go b/services/resources.go index 602d95661..f26a7af6a 100644 --- a/services/resources.go +++ b/services/resources.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -39,12 +38,9 @@ func NewResourceService(cfg *config.CGRConfig) *ResourceService { // ResourceService implements Service interface type ResourceService struct { - sync.RWMutex - cfg *config.CGRConfig - - reS *engine.ResourceS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + reS *engine.ResourceS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -62,7 +58,7 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana if err != nil { return err } - reS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -74,14 +70,14 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - reS.Lock() - defer reS.Unlock() + reS.mu.Lock() + defer reS.mu.Unlock() reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), cms.ConnManager()) reS.reS.StartLoop(context.TODO()) srv, _ := engine.NewService(reS.reS) // srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false) for _, s := range srv { - reS.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.ResourceS, srv) return @@ -89,19 +85,20 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana // Reload handles the change of config func (reS *ResourceService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { - reS.Lock() + reS.mu.Lock() reS.reS.Reload(context.TODO()) - reS.Unlock() + reS.mu.Unlock() return } // Shutdown stops the service -func (reS *ResourceService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - reS.Lock() - defer reS.Unlock() +func (reS *ResourceService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + reS.mu.Lock() + defer reS.mu.Unlock() reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error reS.reS = nil - reS.cl.RpcUnregisterName(utils.ResourceSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.ResourceSv1) return } diff --git a/services/routes.go b/services/routes.go index f8e1a0225..557ae23ce 100644 --- a/services/routes.go +++ b/services/routes.go @@ -21,7 +21,6 @@ package services import ( "sync" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -38,12 +37,9 @@ func NewRouteService(cfg *config.CGRConfig) *RouteService { // RouteService implements Service interface type RouteService struct { - sync.RWMutex - cfg *config.CGRConfig - - routeS *engine.RouteS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + routeS *engine.RouteS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -61,7 +57,7 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana if err != nil { return err } - routeS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -72,13 +68,13 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - routeS.Lock() - defer routeS.Unlock() + routeS.mu.Lock() + defer routeS.mu.Unlock() routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, cms.ConnManager()) srv, _ := engine.NewService(routeS.routeS) // srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false) for _, s := range srv { - routeS.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.RouteS, srv) return @@ -90,11 +86,12 @@ func (routeS *RouteService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe } // Shutdown stops the service -func (routeS *RouteService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - routeS.Lock() - defer routeS.Unlock() +func (routeS *RouteService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + routeS.mu.Lock() + defer routeS.mu.Unlock() routeS.routeS = nil - routeS.cl.RpcUnregisterName(utils.RouteSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.RouteSv1) return } diff --git a/services/sessions.go b/services/sessions.go index 57de1c2d6..defd5b5dd 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -41,16 +41,12 @@ func NewSessionService(cfg *config.CGRConfig) *SessionService { // SessionService implements Service interface type SessionService struct { - sync.RWMutex - - sm *sessions.SessionS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + sm *sessions.SessionS bircpEnabled bool // to stop birpc server if needed stopChan chan struct{} cfg *config.CGRConfig - - stateDeps *StateDependencies // channel subscriptions for state changes + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -66,15 +62,15 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag if err != nil { return err } - smg.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) - fs := srvDeps[utils.FilterS].(*FilterService) - dbs := srvDeps[utils.DataDB].(*DataDBService) + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() + dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() - smg.Lock() - defer smg.Unlock() + smg.mu.Lock() + defer smg.mu.Unlock() - smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager()) + smg.sm = sessions.NewSessionS(smg.cfg, dbs, fs, cms.ConnManager()) //start sync session in a separate goroutine smg.stopChan = make(chan struct{}) go smg.sm.ListenAndServe(smg.stopChan) @@ -84,28 +80,28 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag srv, _ := engine.NewServiceWithName(smg.sm, utils.SessionS, true) // methods with multiple options // srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options for _, s := range srv { - smg.cl.RpcRegister(s) + cl.RpcRegister(s) } // Register BiRpc handlers if smg.cfg.SessionSCfg().ListenBijson != utils.EmptyString { smg.bircpEnabled = true for n, s := range srv { - smg.cl.BiRPCRegisterName(n, s) + cl.BiRPCRegisterName(n, s) } // run this in it's own goroutine - go smg.start(shutdown) + go smg.start(shutdown, cl) } cms.AddInternalConn(utils.SessionS, srv) return } -func (smg *SessionService) start(shutdown *utils.SyncedChan) (err error) { - if err := smg.cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson, +func (smg *SessionService) start(shutdown *utils.SyncedChan, cl *commonlisteners.CommonListenerS) (err error) { + if err := cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson, smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err)) - smg.Lock() + smg.mu.Lock() smg.bircpEnabled = false - smg.Unlock() + smg.mu.Unlock() shutdown.CloseOnce() } return @@ -117,19 +113,20 @@ func (smg *SessionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceReg } // Shutdown stops the service -func (smg *SessionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - smg.Lock() - defer smg.Unlock() +func (smg *SessionService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + smg.mu.Lock() + defer smg.mu.Unlock() close(smg.stopChan) if err = smg.sm.Shutdown(); err != nil { return } + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() if smg.bircpEnabled { - smg.cl.StopBiRPC() + cl.StopBiRPC() smg.bircpEnabled = false } smg.sm = nil - smg.cl.RpcUnregisterName(utils.SessionSv1) + cl.RpcUnregisterName(utils.SessionSv1) // smg.server.BiRPCUnregisterName(utils.SessionSv1) return } diff --git a/services/sipagent.go b/services/sipagent.go index 6447ef29c..c29063249 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -38,12 +38,10 @@ func NewSIPAgent(cfg *config.CGRConfig) *SIPAgent { // SIPAgent implements Agent interface type SIPAgent struct { - sync.RWMutex - cfg *config.CGRConfig - + mu sync.RWMutex + cfg *config.CGRConfig sip *agents.SIPAgent oldListen string - stateDeps *StateDependencies // channel subscriptions for state changes } @@ -61,8 +59,8 @@ func (sip *SIPAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() fs := srvDeps[utils.FilterS].(*FilterService).FilterS() - sip.Lock() - defer sip.Unlock() + sip.mu.Lock() + defer sip.mu.Unlock() sip.oldListen = sip.cfg.SIPAgentCfg().Listen sip.sip, err = agents.NewSIPAgent(cm, sip.cfg, fs) if err != nil { @@ -84,19 +82,19 @@ func (sip *SIPAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRe if sip.oldListen == sip.cfg.SIPAgentCfg().Listen { return } - sip.Lock() + sip.mu.Lock() sip.sip.Shutdown() sip.oldListen = sip.cfg.SIPAgentCfg().Listen sip.sip.InitStopChan() - sip.Unlock() + sip.mu.Unlock() go sip.listenAndServe(shutdown) return } // Shutdown stops the service func (sip *SIPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - sip.Lock() - defer sip.Unlock() + sip.mu.Lock() + defer sip.mu.Unlock() sip.sip.Shutdown() sip.sip = nil return diff --git a/services/stats.go b/services/stats.go index 557b76d39..b6141b79c 100644 --- a/services/stats.go +++ b/services/stats.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -39,12 +38,9 @@ func NewStatService(cfg *config.CGRConfig) *StatService { // StatService implements Service interface type StatService struct { - sync.RWMutex - cfg *config.CGRConfig - - sts *engine.StatS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + sts *engine.StatS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -62,7 +58,7 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager. if err != nil { return err } - sts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -74,14 +70,14 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager. fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - sts.Lock() - defer sts.Unlock() + sts.mu.Lock() + defer sts.mu.Unlock() sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), cms.ConnManager()) sts.sts.StartLoop(context.TODO()) srv, _ := engine.NewService(sts.sts) // srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false) for _, s := range srv { - sts.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.StatS, srv) return @@ -89,19 +85,20 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager. // Reload handles the change of config func (sts *StatService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { - sts.Lock() + sts.mu.Lock() sts.sts.Reload(context.TODO()) - sts.Unlock() + sts.mu.Unlock() return } // Shutdown stops the service -func (sts *StatService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - sts.Lock() - defer sts.Unlock() +func (sts *StatService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + sts.mu.Lock() + defer sts.mu.Unlock() sts.sts.Shutdown(context.TODO()) sts.sts = nil - sts.cl.RpcUnregisterName(utils.StatSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.StatSv1) return } diff --git a/services/stordb.go b/services/stordb.go index 55bd1359e..911baaef5 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -39,7 +39,7 @@ func NewStorDBService(cfg *config.CGRConfig, setVersions bool) *StorDBService { // StorDBService implements Service interface type StorDBService struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig oldDBCfg *config.StorDbCfg db engine.StorDB @@ -49,8 +49,8 @@ type StorDBService struct { // Start should handle the service start func (db *StorDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { - db.Lock() - defer db.Unlock() + db.mu.Lock() + defer db.mu.Unlock() db.oldDBCfg = db.cfg.StorDbCfg().Clone() dbConn, err := engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host, db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User, @@ -76,8 +76,8 @@ func (db *StorDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegist // Reload handles the change of config func (db *StorDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { - db.Lock() - defer db.Unlock() + db.mu.Lock() + defer db.mu.Unlock() if db.needsConnectionReload() { var d engine.StorDB if d, err = engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host, @@ -123,10 +123,10 @@ func (db *StorDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegis // Shutdown stops the service func (db *StorDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { - db.Lock() + db.mu.Lock() + defer db.mu.Unlock() db.db.Close() db.db = nil - db.Unlock() return } diff --git a/services/thresholds.go b/services/thresholds.go index eeb3d0592..d744b0e91 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -39,12 +38,9 @@ func NewThresholdService(cfg *config.CGRConfig) *ThresholdService { // ThresholdService implements Service interface type ThresholdService struct { - sync.RWMutex - cfg *config.CGRConfig - - thrs *engine.ThresholdS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + thrs *engine.ThresholdS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -62,7 +58,7 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma if err != nil { return err } - thrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -74,35 +70,36 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - thrs.Lock() - defer thrs.Unlock() + thrs.mu.Lock() + defer thrs.mu.Unlock() thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), cms.ConnManager()) thrs.thrs.StartLoop(context.TODO()) srv, _ := engine.NewService(thrs.thrs) // srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false) for _, s := range srv { - thrs.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.ThresholdS, srv) return } // Reload handles the change of config -func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) { - thrs.Lock() +func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { + thrs.mu.Lock() thrs.thrs.Reload(context.TODO()) - thrs.Unlock() - return + thrs.mu.Unlock() + return nil } // Shutdown stops the service -func (thrs *ThresholdService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { - thrs.Lock() - defer thrs.Unlock() +func (thrs *ThresholdService) Shutdown(registry *servmanager.ServiceRegistry) error { + thrs.mu.Lock() + defer thrs.mu.Unlock() thrs.thrs.Shutdown(context.TODO()) thrs.thrs = nil - thrs.cl.RpcUnregisterName(utils.ThresholdSv1) - return + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.ThresholdSv1) + return nil } // ServiceName returns the service name diff --git a/services/tpes.go b/services/tpes.go index 366f0135b..72a322f68 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -22,7 +22,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/cgrates/apis" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/tpes" @@ -39,10 +38,9 @@ func NewTPeService(cfg *config.CGRConfig) *TPeService { // TypeService implements Service interface type TPeService struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig tpes *tpes.TPeS - cl *commonlisteners.CommonListenerS srv *birpc.Service stopChan chan struct{} stateDeps *StateDependencies // channel subscriptions for state changes @@ -50,7 +48,6 @@ type TPeService struct { // Start should handle the service start func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { - srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -61,14 +58,17 @@ func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe if err != nil { return err } - ts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() + ts.mu.Lock() + defer ts.mu.Unlock() + ts.tpes = tpes.NewTPeS(ts.cfg, dbs, cm) ts.stopChan = make(chan struct{}) ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false) - ts.cl.RpcRegister(ts.srv) + cl.RpcRegister(ts.srv) return } @@ -78,9 +78,13 @@ func (ts *TPeService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry } // Shutdown stops the service -func (ts *TPeService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { +func (ts *TPeService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + ts.mu.Lock() + defer ts.mu.Unlock() ts.srv = nil close(ts.stopChan) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.TPeSv1) return } diff --git a/services/trends.go b/services/trends.go index 3c294df16..99eafdf6c 100644 --- a/services/trends.go +++ b/services/trends.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -38,12 +37,9 @@ func NewTrendService(cfg *config.CGRConfig) *TrendService { } type TrendService struct { - sync.RWMutex - cfg *config.CGRConfig - - trs *engine.TrendS - cl *commonlisteners.CommonListenerS - + mu sync.RWMutex + cfg *config.CGRConfig + trs *engine.TrendS stateDeps *StateDependencies // channel subscriptions for state changes } @@ -61,7 +57,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager if err != nil { return err } - trs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, @@ -72,8 +68,8 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager fs := srvDeps[utils.FilterS].(*FilterService) dbs := srvDeps[utils.DataDB].(*DataDBService) - trs.Lock() - defer trs.Unlock() + trs.mu.Lock() + defer trs.mu.Unlock() trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager()) if err := trs.trs.StartTrendS(context.TODO()); err != nil { return err @@ -83,7 +79,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager return err } for _, s := range srv { - trs.cl.RpcRegister(s) + cl.RpcRegister(s) } cms.AddInternalConn(utils.TrendS, srv) return nil @@ -91,19 +87,20 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager // Reload handles the change of config func (trs *TrendService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) { - trs.Lock() + trs.mu.Lock() trs.trs.Reload(context.TODO()) - trs.Unlock() + trs.mu.Unlock() return } // Shutdown stops the service -func (trs *TrendService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { - trs.Lock() - defer trs.Unlock() +func (trs *TrendService) Shutdown(registry *servmanager.ServiceRegistry) (err error) { + trs.mu.Lock() + defer trs.mu.Unlock() trs.trs.StopTrendS() trs.trs = nil - trs.cl.RpcUnregisterName(utils.TrendSv1) + cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS() + cl.RpcUnregisterName(utils.TrendSv1) return }