diff --git a/services/accounts.go b/services/accounts.go
index 2ff193a27..438ed2ecb 100644
--- a/services/accounts.go
+++ b/services/accounts.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/accounts"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -41,15 +40,11 @@ func NewAccountService(cfg *config.CGRConfig) *AccountService {
// AccountService implements Service interface
type AccountService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- acts *accounts.AccountS
- cl *commonlisteners.CommonListenerS
-
- rldChan chan struct{}
- stopChan chan struct{}
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ acts *accounts.AccountS
+ rldChan chan struct{}
+ stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -67,7 +62,7 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
- acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -78,8 +73,8 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
- acts.Lock()
- defer acts.Unlock()
+ acts.mu.Lock()
+ defer acts.mu.Unlock()
acts.acts = accounts.NewAccountS(acts.cfg, fs, cms.ConnManager(), dbs)
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)
@@ -87,9 +82,7 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
-
- acts.cl.RpcRegister(srv)
-
+ cl.RpcRegister(srv)
cms.AddInternalConn(utils.AccountS, srv)
return
}
@@ -101,12 +94,13 @@ func (acts *AccountService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
}
// Shutdown stops the service
-func (acts *AccountService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- acts.Lock()
+func (acts *AccountService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ acts.mu.Lock()
+ defer acts.mu.Unlock()
close(acts.stopChan)
acts.acts = nil
- acts.Unlock()
- acts.cl.RpcUnregisterName(utils.AccountSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.AccountSv1)
return
}
diff --git a/services/actions.go b/services/actions.go
index eb9bfeb76..290622b63 100644
--- a/services/actions.go
+++ b/services/actions.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/actions"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -41,15 +40,11 @@ func NewActionService(cfg *config.CGRConfig) *ActionService {
// ActionService implements Service interface
type ActionService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- acts *actions.ActionS
- cl *commonlisteners.CommonListenerS
-
- rldChan chan struct{}
- stopChan chan struct{}
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ acts *actions.ActionS
+ rldChan chan struct{}
+ stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -67,7 +62,7 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag
if err != nil {
return err
}
- acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -78,8 +73,8 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
- acts.Lock()
- defer acts.Unlock()
+ acts.mu.Lock()
+ defer acts.mu.Unlock()
acts.acts = actions.NewActionS(acts.cfg, fs, dbs, cms.ConnManager())
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)
@@ -88,7 +83,7 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag
return
}
// srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false)
- acts.cl.RpcRegister(srv)
+ cl.RpcRegister(srv)
cms.AddInternalConn(utils.ActionS, srv)
return
}
@@ -100,13 +95,14 @@ func (acts *ActionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceReg
}
// Shutdown stops the service
-func (acts *ActionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- acts.Lock()
- defer acts.Unlock()
+func (acts *ActionService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ acts.mu.Lock()
+ defer acts.mu.Unlock()
close(acts.stopChan)
acts.acts.Shutdown()
acts.acts = nil
- acts.cl.RpcUnregisterName(utils.ActionSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.ActionSv1)
return
}
diff --git a/services/adminsv1.go b/services/adminsv1.go
index eac0b44ab..0376c57b2 100644
--- a/services/adminsv1.go
+++ b/services/adminsv1.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/apis"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,17 +38,16 @@ func NewAdminSv1Service(cfg *config.CGRConfig) *AdminSv1Service {
// AdminSv1Service implements Service interface
type AdminSv1Service struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
api *apis.AdminSv1
- cl *commonlisteners.CommonListenerS
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
// For this service the start should be called from RAL Service
-func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
+func (s *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -58,61 +56,62 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana
utils.DataDB,
utils.StorDB,
},
- registry, apiService.cfg.GeneralCfg().ConnectTimeout)
+ registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
- apiService.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
- fs := srvDeps[utils.FilterS].(*FilterService)
- dbs := srvDeps[utils.DataDB].(*DataDBService)
- sdbs := srvDeps[utils.StorDB].(*StorDBService)
+ fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
+ dm := srvDeps[utils.DataDB].(*DataDBService).DataManager()
+ sdb := srvDeps[utils.StorDB].(*StorDBService).DB()
- apiService.Lock()
- defer apiService.Unlock()
+ s.mu.Lock()
+ defer s.mu.Unlock()
- apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), cms.ConnManager(), fs.FilterS(), sdbs.DB())
+ s.api = apis.NewAdminSv1(s.cfg, dm, cms.ConnManager(), fs, sdb)
- srv, _ := engine.NewService(apiService.api)
- // srv, _ := birpc.NewService(apiService.api, "", false)
+ srv, _ := engine.NewService(s.api)
+ // srv, _ := birpc.NewService(s.api, "", false)
for _, s := range srv {
- apiService.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
- rpl, _ := engine.NewService(apis.NewReplicatorSv1(dbs.DataManager(), apiService.api))
- for _, s := range rpl {
- apiService.cl.RpcRegister(s)
+ rpl, _ := engine.NewService(apis.NewReplicatorSv1(dm, s.api))
+ for _, svc := range rpl {
+ cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.AdminS, srv)
return
}
// Reload handles the change of config
-func (apiService *AdminSv1Service) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
+func (s *AdminSv1Service) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
return
}
// Shutdown stops the service
-func (apiService *AdminSv1Service) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- apiService.Lock()
- // close(apiService.stopChan)
- apiService.api = nil
- apiService.cl.RpcUnregisterName(utils.AdminSv1)
- apiService.Unlock()
+func (s *AdminSv1Service) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ // close(s.stopChan)
+ s.api = nil
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.AdminSv1)
return
}
// ServiceName returns the service name
-func (apiService *AdminSv1Service) ServiceName() string {
+func (s *AdminSv1Service) ServiceName() string {
return utils.AdminS
}
// ShouldRun returns if the service should be running
-func (apiService *AdminSv1Service) ShouldRun() bool {
- return apiService.cfg.AdminSCfg().Enabled
+func (s *AdminSv1Service) ShouldRun() bool {
+ return s.cfg.AdminSCfg().Enabled
}
// StateChan returns signaling channel of specific state
-func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} {
- return apiService.stateDeps.StateChan(stateID)
+func (s *AdminSv1Service) StateChan(stateID string) chan struct{} {
+ return s.stateDeps.StateChan(stateID)
}
diff --git a/services/analyzers.go b/services/analyzers.go
index cb301f8e9..0d3ec6cae 100644
--- a/services/analyzers.go
+++ b/services/analyzers.go
@@ -47,10 +47,9 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService {
// AnalyzerService implements Service interface
type AnalyzerService struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
anz *analyzers.AnalyzerS
- cl *commonlisteners.CommonListenerS
cancelFunc context.CancelFunc
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -63,10 +62,10 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return
}
- anz.cl = cls.(*CommonListenerService).CLS()
+ cl := cls.(*CommonListenerService).CLS()
- anz.Lock()
- defer anz.Unlock()
+ anz.mu.Lock()
+ defer anz.mu.Unlock()
if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil {
return
}
@@ -79,26 +78,26 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana
shutdown.CloseOnce()
}
}(anz.anz)
- anz.cl.SetAnalyzer(anz.anz)
- go anz.start(registry)
+ cl.SetAnalyzer(anz.anz)
+ go anz.start(registry, cl)
return
}
-func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry) {
+func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry, cl *commonlisteners.CommonListenerS) {
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
anz.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
- anz.Lock()
+ anz.mu.Lock()
anz.anz.SetFilterS(fs.(*FilterService).FilterS())
srv, _ := engine.NewService(anz.anz)
// srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false)
for _, s := range srv {
- anz.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
- anz.Unlock()
+ anz.mu.Unlock()
}
// Reload handles the change of config
@@ -107,14 +106,15 @@ func (anz *AnalyzerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
}
// Shutdown stops the service
-func (anz *AnalyzerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- anz.Lock()
+func (anz *AnalyzerService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ anz.mu.Lock()
anz.cancelFunc()
- anz.cl.SetAnalyzer(nil)
+ cl.SetAnalyzer(nil)
anz.anz.Shutdown()
anz.anz = nil
- anz.Unlock()
- anz.cl.RpcUnregisterName(utils.AnalyzerSv1)
+ anz.mu.Unlock()
+ cl.RpcUnregisterName(utils.AnalyzerSv1)
return
}
diff --git a/services/asteriskagent.go b/services/asteriskagent.go
index 79cfce440..1e107c2cc 100644
--- a/services/asteriskagent.go
+++ b/services/asteriskagent.go
@@ -38,7 +38,7 @@ func NewAsteriskAgent(cfg *config.CGRConfig) *AsteriskAgent {
// AsteriskAgent implements Agent interface
type AsteriskAgent struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
smas []*agents.AsteriskAgent
@@ -52,8 +52,8 @@ func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, registry *servmanage
return
}
- ast.Lock()
- defer ast.Unlock()
+ ast.mu.Lock()
+ defer ast.mu.Unlock()
listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}) {
if err := sma.ListenAndServe(stopChan); err != nil {
@@ -83,10 +83,10 @@ func (ast *AsteriskAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
}
func (ast *AsteriskAgent) shutdown() {
- ast.Lock()
+ ast.mu.Lock()
+ defer ast.mu.Unlock()
close(ast.stopChan)
ast.smas = nil
- ast.Unlock()
return // no shutdown for the momment
}
diff --git a/services/attributes.go b/services/attributes.go
index cb9773ef8..ff768e530 100644
--- a/services/attributes.go
+++ b/services/attributes.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/apis"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,13 +38,10 @@ func NewAttributeService(cfg *config.CGRConfig) *AttributeService {
// AttributeService implements Service interface
type AttributeService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- attrS *engine.AttributeS
- cl *commonlisteners.CommonListenerS
- rpc *apis.AttributeSv1 // useful on restart
-
+ mu sync.Mutex
+ cfg *config.CGRConfig
+ attrS *engine.AttributeS
+ rpc *apis.AttributeSv1 // useful on restart
stateDeps *StateDependencies
}
@@ -63,7 +59,7 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm
if err != nil {
return
}
- attrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -74,14 +70,14 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dm := srvDeps[utils.DataDB].(*DataDBService).DataManager()
- attrS.Lock()
- defer attrS.Unlock()
+ attrS.mu.Lock()
+ defer attrS.mu.Unlock()
attrS.attrS = engine.NewAttributeService(dm, fs, attrS.cfg)
attrS.rpc = apis.NewAttributeSv1(attrS.attrS)
srv, _ := engine.NewService(attrS.rpc)
// srv, _ := birpc.NewService(attrS.rpc, "", false)
for _, s := range srv {
- attrS.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.AttributeS, srv)
return
@@ -94,9 +90,10 @@ func (attrS *AttributeService) Reload(_ *utils.SyncedChan, _ *servmanager.Servic
// Shutdown stops the service
func (attrS *AttributeService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
- attrS.Lock()
- attrS.cl.RpcUnregisterName(utils.AttributeSv1)
- attrS.Unlock()
+ attrS.mu.Lock()
+ defer attrS.mu.Unlock()
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.AttributeSv1)
return
}
diff --git a/services/caches.go b/services/caches.go
index b338c7951..e38298e7c 100644
--- a/services/caches.go
+++ b/services/caches.go
@@ -21,7 +21,6 @@ package services
import (
"sync"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -41,7 +40,6 @@ func NewCacheService(cfg *config.CGRConfig) *CacheService {
type CacheService struct {
mu sync.Mutex
cfg *config.CGRConfig
- cl *commonlisteners.CommonListenerS
cacheCh chan *engine.CacheS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -59,7 +57,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.
if err != nil {
return err
}
- cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cs := srvDeps[utils.CoreS].(*CoreService)
@@ -75,7 +73,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.
srv, _ := engine.NewService(engine.Cache)
// srv, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false)
for _, s := range srv {
- cS.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.CacheS, srv)
return
@@ -87,8 +85,11 @@ func (cS *CacheService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegist
}
// Shutdown stops the service
-func (cS *CacheService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
- cS.cl.RpcUnregisterName(utils.CacheSv1)
+func (cS *CacheService) Shutdown(registry *servmanager.ServiceRegistry) (_ error) {
+ cS.mu.Lock()
+ cS.mu.Unlock()
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.CacheSv1)
return
}
diff --git a/services/caps.go b/services/caps.go
index feb115c08..2879e1040 100644
--- a/services/caps.go
+++ b/services/caps.go
@@ -19,8 +19,6 @@ along with this program. If not, see
package services
import (
- "sync"
-
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -37,7 +35,6 @@ func NewCapService(cfg *config.CGRConfig) *CapService {
// CapService implements Service interface.
type CapService struct {
- mu sync.RWMutex
cfg *config.CGRConfig
caps *engine.Caps
stateDeps *StateDependencies // channel subscriptions for state changes
diff --git a/services/cdrs.go b/services/cdrs.go
index abe738936..13ccd7952 100644
--- a/services/cdrs.go
+++ b/services/cdrs.go
@@ -23,7 +23,6 @@ import (
"sync"
"github.com/cgrates/cgrates/cdrs"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -40,12 +39,9 @@ func NewCDRServer(cfg *config.CGRConfig) *CDRService {
// CDRService implements Service interface
type CDRService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- cdrS *cdrs.CDRServer
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ cdrS *cdrs.CDRServer
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -63,14 +59,14 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
if err != nil {
return err
}
- cs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
sdbs := srvDeps[utils.StorDB].(*StorDBService).DB()
- cs.Lock()
- defer cs.Unlock()
+ cs.mu.Lock()
+ defer cs.mu.Unlock()
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs, cms.ConnManager(), sdbs)
runtime.Gosched()
@@ -78,7 +74,7 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
if err != nil {
return err
}
- cs.cl.RpcRegister(srv)
+ cl.RpcRegister(srv)
cms.AddInternalConn(utils.CDRServer, srv)
return
}
@@ -89,11 +85,12 @@ func (cs *CDRService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry
}
// Shutdown stops the service
-func (cs *CDRService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- cs.Lock()
+func (cs *CDRService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ cs.mu.Lock()
+ defer cs.mu.Unlock()
cs.cdrS = nil
- cs.Unlock()
- cs.cl.RpcUnregisterName(utils.CDRsV1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.CDRsV1)
return
}
diff --git a/services/chargers.go b/services/chargers.go
index 79594cbf1..379c7c5a5 100644
--- a/services/chargers.go
+++ b/services/chargers.go
@@ -21,7 +21,6 @@ package services
import (
"sync"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -38,12 +37,9 @@ func NewChargerService(cfg *config.CGRConfig) *ChargerService {
// ChargerService implements Service interface
type ChargerService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- chrS *engine.ChargerS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ chrS *engine.ChargerS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -61,7 +57,7 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
- chrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -72,13 +68,13 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
- chrS.Lock()
- defer chrS.Unlock()
+ chrS.mu.Lock()
+ defer chrS.mu.Unlock()
chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, cms.ConnManager())
srv, _ := engine.NewService(chrS.chrS)
// srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false)
for _, s := range srv {
- chrS.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.ChargerS, srv)
return nil
@@ -90,11 +86,12 @@ func (chrS *ChargerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
}
// Shutdown stops the service
-func (chrS *ChargerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- chrS.Lock()
- defer chrS.Unlock()
+func (chrS *ChargerService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ chrS.mu.Lock()
+ defer chrS.mu.Unlock()
chrS.chrS = nil
- chrS.cl.RpcUnregisterName(utils.ChargerSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.ChargerSv1)
return
}
diff --git a/services/commonlisteners.go b/services/commonlisteners.go
index 5019a017b..7c76cfead 100644
--- a/services/commonlisteners.go
+++ b/services/commonlisteners.go
@@ -69,7 +69,45 @@ func (s *CommonListenerService) Reload(_ *utils.SyncedChan, _ *servmanager.Servi
}
// Shutdown stops the service.
-func (s *CommonListenerService) Shutdown(_ *servmanager.ServiceRegistry) error {
+func (s *CommonListenerService) Shutdown(registry *servmanager.ServiceRegistry) error {
+ deps := []string{
+ utils.AccountS,
+ utils.ActionS,
+ utils.AdminS,
+ utils.AnalyzerS,
+ utils.AttributeS,
+ utils.CacheS,
+ utils.CDRServer,
+ utils.ChargerS,
+ utils.ConfigS,
+ utils.CoreS,
+ utils.EEs,
+ utils.EFs,
+ utils.ERs,
+ utils.GuardianS,
+ utils.HTTPAgent,
+ utils.JanusAgent,
+ utils.LoaderS,
+ utils.RankingS,
+ utils.RateS,
+ utils.RegistrarC,
+ utils.ResourceS,
+ utils.RouteS,
+ utils.SessionS,
+ utils.StatS,
+ utils.ThresholdS,
+ utils.TPeS,
+ utils.TrendS,
+ }
+ for _, svcID := range deps {
+ if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) {
+ continue
+ }
+ _, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, s.cfg.GeneralCfg().ConnectTimeout)
+ if err != nil {
+ return err
+ }
+ }
s.mu.Lock()
defer s.mu.Unlock()
s.cls = nil
diff --git a/services/config.go b/services/config.go
index 5b2053ddb..02124c432 100644
--- a/services/config.go
+++ b/services/config.go
@@ -19,9 +19,6 @@ along with this program. If not, see
package services
import (
- "sync"
-
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -38,9 +35,7 @@ func NewConfigService(cfg *config.CGRConfig) *ConfigService {
// ConfigService implements Service interface.
type ConfigService struct {
- mu sync.RWMutex
cfg *config.CGRConfig
- cl *commonlisteners.CommonListenerS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -55,12 +50,12 @@ func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.Service
if err != nil {
return err
}
- s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
svcs, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true)
for _, svc := range svcs {
- s.cl.RpcRegister(svc)
+ cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.ConfigS, svcs)
return nil
@@ -72,8 +67,9 @@ func (s *ConfigService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegist
}
// Shutdown stops the service.
-func (s *ConfigService) Shutdown(_ *servmanager.ServiceRegistry) error {
- s.cl.RpcUnregisterName(utils.ConfigSv1)
+func (s *ConfigService) Shutdown(registry *servmanager.ServiceRegistry) error {
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.ConfigSv1)
return nil
}
diff --git a/services/cores.go b/services/cores.go
index b723b2d4e..d79178639 100644
--- a/services/cores.go
+++ b/services/cores.go
@@ -22,7 +22,6 @@ import (
"os"
"sync"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
@@ -45,7 +44,6 @@ type CoreService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cS *cores.CoreS
- cl *commonlisteners.CommonListenerS
fileCPU *os.File
stopChan chan struct{}
shdWg *sync.WaitGroup
@@ -65,7 +63,7 @@ func (s *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.Se
return err
}
caps := srvDeps[utils.CapS].(*CapService).Caps()
- s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
s.mu.Lock()
@@ -77,7 +75,7 @@ func (s *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.Se
return err
}
for _, svc := range srv {
- s.cl.RpcRegister(svc)
+ cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.CoreS, srv)
return nil
@@ -89,7 +87,7 @@ func (s *CoreService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry
}
// Shutdown stops the service
-func (s *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error {
+func (s *CoreService) Shutdown(registry *servmanager.ServiceRegistry) error {
s.mu.Lock()
defer s.mu.Unlock()
s.cS.Shutdown()
@@ -97,7 +95,8 @@ func (s *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error {
s.cS.StopCPUProfiling()
s.cS.StopMemoryProfiling()
s.cS = nil
- s.cl.RpcUnregisterName(utils.CoreSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.CoreSv1)
return nil
}
diff --git a/services/datadb.go b/services/datadb.go
index 61c313ed3..23a747d16 100644
--- a/services/datadb.go
+++ b/services/datadb.go
@@ -39,7 +39,7 @@ func NewDataDBService(cfg *config.CGRConfig, setVersions bool) *DataDBService {
// DataDBService implements Service interface
type DataDBService struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
oldDBCfg *config.DataDbCfg
dm *engine.DataManager
@@ -53,8 +53,8 @@ func (db *DataDBService) Start(_ *utils.SyncedChan, registry *servmanager.Servic
if err != nil {
return
}
- db.Lock()
- defer db.Unlock()
+ db.mu.Lock()
+ defer db.mu.Unlock()
db.oldDBCfg = db.cfg.DataDbCfg().Clone()
dbConn, err := engine.NewDataDBConn(db.cfg.DataDbCfg().Type,
db.cfg.DataDbCfg().Host, db.cfg.DataDbCfg().Port,
@@ -81,8 +81,8 @@ func (db *DataDBService) Start(_ *utils.SyncedChan, registry *servmanager.Servic
// Reload handles the change of config
func (db *DataDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
- db.Lock()
- defer db.Unlock()
+ db.mu.Lock()
+ defer db.mu.Unlock()
if db.needsConnectionReload() {
var d engine.DataDBDriver
d, err = engine.NewDataDBConn(db.cfg.DataDbCfg().Type,
@@ -126,8 +126,8 @@ func (db *DataDBService) Shutdown(registry *servmanager.ServiceRegistry) error {
return err
}
}
- db.Lock()
- defer db.Unlock()
+ db.mu.Lock()
+ defer db.mu.Unlock()
db.dm.DataDB().Close()
return nil
}
diff --git a/services/diameteragent.go b/services/diameteragent.go
index 9e776cc7d..44aad26a2 100644
--- a/services/diameteragent.go
+++ b/services/diameteragent.go
@@ -39,7 +39,7 @@ func NewDiameterAgent(cfg *config.CGRConfig) *DiameterAgent {
// DiameterAgent implements Agent interface
type DiameterAgent struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
@@ -67,8 +67,8 @@ func (s *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
- s.Lock()
- defer s.Unlock()
+ s.mu.Lock()
+ defer s.mu.Unlock()
return s.start(fs, cm, caps, shutdown)
}
@@ -94,8 +94,8 @@ func (s *DiameterAgent) start(filterS *engine.FilterS, cm *engine.ConnManager, c
// Reload handles the change of config
func (s *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
- s.Lock()
- defer s.Unlock()
+ s.mu.Lock()
+ defer s.mu.Unlock()
if s.lnet == s.cfg.DiameterAgentCfg().ListenNet &&
s.laddr == s.cfg.DiameterAgentCfg().Listen {
return
@@ -120,10 +120,10 @@ func (s *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager
// Shutdown stops the service
func (s *DiameterAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- s.Lock()
+ s.mu.Lock()
close(s.stopChan)
s.da = nil
- s.Unlock()
+ s.mu.Unlock()
return // no shutdown for the momment
}
diff --git a/services/dnsagent.go b/services/dnsagent.go
index 8a37eff98..78d7fc7af 100644
--- a/services/dnsagent.go
+++ b/services/dnsagent.go
@@ -37,7 +37,7 @@ func NewDNSAgent(cfg *config.CGRConfig) *DNSAgent {
// DNSAgent implements Agent interface
type DNSAgent struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
@@ -61,8 +61,8 @@ func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
- dns.Lock()
- defer dns.Unlock()
+ dns.mu.Lock()
+ defer dns.mu.Unlock()
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), cms.ConnManager())
if err != nil {
dns.dns = nil
@@ -87,8 +87,8 @@ func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.Se
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
- dns.Lock()
- defer dns.Unlock()
+ dns.mu.Lock()
+ defer dns.mu.Unlock()
if dns.dns != nil {
close(dns.stopChan)
@@ -122,8 +122,8 @@ func (dns *DNSAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
return
}
close(dns.stopChan)
- dns.Lock()
- defer dns.Unlock()
+ dns.mu.Lock()
+ defer dns.mu.Unlock()
dns.dns = nil
return
}
diff --git a/services/ees.go b/services/ees.go
index e031d2496..c8afaf690 100644
--- a/services/ees.go
+++ b/services/ees.go
@@ -21,7 +21,6 @@ package services
import (
"sync"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/engine"
@@ -39,43 +38,12 @@ func NewEventExporterService(cfg *config.CGRConfig) *EventExporterService {
// EventExporterService is the service structure for EventExporterS
type EventExporterService struct {
- mu sync.RWMutex
- cfg *config.CGRConfig
-
- eeS *ees.EeS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ eeS *ees.EeS
stateDeps *StateDependencies // channel subscriptions for state changes
}
-// ServiceName returns the service name
-func (es *EventExporterService) ServiceName() string {
- return utils.EEs
-}
-
-// ShouldRun returns if the service should be running
-func (es *EventExporterService) ShouldRun() (should bool) {
- return es.cfg.EEsCfg().Enabled
-}
-
-// Reload handles the change of config
-func (es *EventExporterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
- es.mu.Lock()
- defer es.mu.Unlock()
- es.eeS.ClearExporterCache()
- return es.eeS.SetupExporterCache()
-}
-
-// Shutdown stops the service
-func (es *EventExporterService) Shutdown(_ *servmanager.ServiceRegistry) error {
- es.mu.Lock()
- defer es.mu.Unlock()
- es.eeS.ClearExporterCache()
- es.eeS = nil
- es.cl.RpcUnregisterName(utils.EeSv1)
- return nil
-}
-
// Start should handle the service start
func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
@@ -88,7 +56,7 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager
if err != nil {
return err
}
- es.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
@@ -102,11 +70,40 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager
srv, _ := engine.NewServiceWithPing(es.eeS, utils.EeSv1, utils.V1Prfx)
// srv, _ := birpc.NewService(es.rpc, "", false)
- es.cl.RpcRegister(srv)
+ cl.RpcRegister(srv)
cms.AddInternalConn(utils.EEs, srv)
return nil
}
+// Reload handles the change of config
+func (es *EventExporterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
+ es.mu.Lock()
+ defer es.mu.Unlock()
+ es.eeS.ClearExporterCache()
+ return es.eeS.SetupExporterCache()
+}
+
+// Shutdown stops the service
+func (es *EventExporterService) Shutdown(registry *servmanager.ServiceRegistry) error {
+ es.mu.Lock()
+ defer es.mu.Unlock()
+ es.eeS.ClearExporterCache()
+ es.eeS = nil
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.EeSv1)
+ return nil
+}
+
+// ServiceName returns the service name
+func (es *EventExporterService) ServiceName() string {
+ return utils.EEs
+}
+
+// ShouldRun returns if the service should be running
+func (es *EventExporterService) ShouldRun() (should bool) {
+ return es.cfg.EEsCfg().Enabled
+}
+
// StateChan returns signaling channel of specific state
func (es *EventExporterService) StateChan(stateID string) chan struct{} {
return es.stateDeps.StateChan(stateID)
diff --git a/services/efs.go b/services/efs.go
index eea73a6c2..e548b033a 100644
--- a/services/efs.go
+++ b/services/efs.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
@@ -32,15 +31,11 @@ import (
// ExportFailoverService is the service structure for ExportFailover
type ExportFailoverService struct {
- sync.Mutex
-
- efS *efs.EfS
- cl *commonlisteners.CommonListenerS
- srv *birpc.Service
-
- stopChan chan struct{}
- cfg *config.CGRConfig
-
+ mu sync.Mutex
+ efS *efs.EfS
+ srv *birpc.Service
+ stopChan chan struct{}
+ cfg *config.CGRConfig
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -53,54 +48,59 @@ func NewExportFailoverService(cfg *config.CGRConfig) *ExportFailoverService {
}
// Start should handle the service start
-func (efServ *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
+func (s *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
},
- registry, efServ.cfg.GeneralCfg().ConnectTimeout)
+ registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
- efServ.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
- efServ.Lock()
- defer efServ.Unlock()
+ s.mu.Lock()
+ defer s.mu.Unlock()
- efServ.efS = efs.NewEfs(efServ.cfg, cms.ConnManager())
- efServ.stopChan = make(chan struct{})
- efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx)
- efServ.cl.RpcRegister(efServ.srv)
- cms.AddInternalConn(utils.EFs, efServ.srv)
+ s.efS = efs.NewEfs(s.cfg, cms.ConnManager())
+ s.stopChan = make(chan struct{})
+ s.srv, _ = engine.NewServiceWithPing(s.efS, utils.EfSv1, utils.V1Prfx)
+ cl.RpcRegister(s.srv)
+ cms.AddInternalConn(utils.EFs, s.srv)
return
}
// Reload handles the change of config
-func (efServ *ExportFailoverService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
+func (s *ExportFailoverService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
return
}
// Shutdown stops the service
-func (efServ *ExportFailoverService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- efServ.srv = nil
- close(efServ.stopChan)
+func (s *ExportFailoverService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.srv = nil
+ close(s.stopChan)
+
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.EfSv1)
// NEXT SHOULD EXPORT ALL THE SHUTDOWN LOGGERS TO WRITE
return
}
// ShouldRun returns if the service should be running
-func (efServ *ExportFailoverService) ShouldRun() bool {
- return efServ.cfg.EFsCfg().Enabled
+func (s *ExportFailoverService) ShouldRun() bool {
+ return s.cfg.EFsCfg().Enabled
}
// ServiceName returns the service name
-func (efServ *ExportFailoverService) ServiceName() string {
+func (s *ExportFailoverService) ServiceName() string {
return utils.EFs
}
// StateChan returns signaling channel of specific state
-func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} {
- return efServ.stateDeps.StateChan(stateID)
+func (s *ExportFailoverService) StateChan(stateID string) chan struct{} {
+ return s.stateDeps.StateChan(stateID)
}
diff --git a/services/ers.go b/services/ers.go
index 1f7acc1a8..57dad85f5 100644
--- a/services/ers.go
+++ b/services/ers.go
@@ -22,7 +22,6 @@ import (
"fmt"
"sync"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/ers"
@@ -41,15 +40,11 @@ func NewEventReaderService(cfg *config.CGRConfig) *EventReaderService {
// EventReaderService implements Service interface
type EventReaderService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- ers *ers.ERService
- cl *commonlisteners.CommonListenerS
-
- rldChan chan struct{}
- stopChan chan struct{}
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ ers *ers.ERService
+ rldChan chan struct{}
+ stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -65,12 +60,12 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm
if err != nil {
return err
}
- erS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
- erS.Lock()
- defer erS.Unlock()
+ erS.mu.Lock()
+ defer erS.mu.Unlock()
// remake the stop chan
erS.stopChan = make(chan struct{})
@@ -83,7 +78,7 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm
if err != nil {
return err
}
- erS.cl.RpcRegister(srv)
+ cl.RpcRegister(srv)
cms.AddInternalConn(utils.ERs, srv)
return
}
@@ -98,19 +93,20 @@ func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldC
// Reload handles the change of config
func (erS *EventReaderService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
- erS.RLock()
+ erS.mu.RLock()
erS.rldChan <- struct{}{}
- erS.RUnlock()
+ erS.mu.RUnlock()
return
}
// Shutdown stops the service
-func (erS *EventReaderService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- erS.Lock()
- defer erS.Unlock()
+func (erS *EventReaderService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ erS.mu.Lock()
+ defer erS.mu.Unlock()
close(erS.stopChan)
erS.ers = nil
- erS.cl.RpcUnregisterName(utils.ErSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.ErSv1)
return
}
diff --git a/services/guardian.go b/services/guardian.go
index 69bd1f072..4d1608b0c 100644
--- a/services/guardian.go
+++ b/services/guardian.go
@@ -19,9 +19,6 @@ along with this program. If not, see
package services
import (
- "sync"
-
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
@@ -39,9 +36,7 @@ func NewGuardianService(cfg *config.CGRConfig) *GuardianService {
// GuardianService implements Service interface.
type GuardianService struct {
- mu sync.RWMutex
cfg *config.CGRConfig
- cl *commonlisteners.CommonListenerS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -56,15 +51,12 @@ func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.Servi
if err != nil {
return err
}
- s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
- s.mu.Lock()
- defer s.mu.Unlock()
-
svcs, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
for _, svc := range svcs {
- s.cl.RpcRegister(svc)
+ cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.GuardianS, svcs)
return nil
@@ -76,10 +68,9 @@ func (s *GuardianService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegi
}
// Shutdown stops the service.
-func (s *GuardianService) Shutdown(_ *servmanager.ServiceRegistry) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.cl.RpcUnregisterName(utils.GuardianSv1)
+func (s *GuardianService) Shutdown(registry *servmanager.ServiceRegistry) error {
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.GuardianSv1)
return nil
}
diff --git a/services/httpagent.go b/services/httpagent.go
index 575b8c74a..096df69a5 100644
--- a/services/httpagent.go
+++ b/services/httpagent.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/agents"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -38,11 +37,9 @@ func NewHTTPAgent(cfg *config.CGRConfig) *HTTPAgent {
// HTTPAgent implements Agent interface
type HTTPAgent struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
- cl *commonlisteners.CommonListenerS
-
// we can realy stop the HTTPAgent so keep a flag
// if we registerd the handlers
started bool
@@ -63,16 +60,16 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg
return err
}
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
- cms := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
- fs := srvDeps[utils.FilterS].(*FilterService)
+ cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
+ fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
- ha.Lock()
- defer ha.Unlock()
+ ha.mu.Lock()
+ defer ha.mu.Unlock()
ha.started = true
for _, agntCfg := range ha.cfg.HTTPAgentCfg() {
cl.RegisterHttpHandler(agntCfg.URL,
- agents.NewHTTPAgent(cms, agntCfg.SessionSConns, fs.FilterS(),
+ agents.NewHTTPAgent(cm, agntCfg.SessionSConns, fs,
ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}
@@ -86,9 +83,9 @@ func (ha *HTTPAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry)
// Shutdown stops the service
func (ha *HTTPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- ha.Lock()
+ ha.mu.Lock()
ha.started = false
- ha.Unlock()
+ ha.mu.Unlock()
return // no shutdown for the momment
}
diff --git a/services/janus.go b/services/janus.go
index a14cab76f..aaf0c2bc8 100644
--- a/services/janus.go
+++ b/services/janus.go
@@ -39,10 +39,9 @@ func NewJanusAgent(cfg *config.CGRConfig) *JanusAgent {
// JanusAgent implements Service interface
type JanusAgent struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
-
- jA *agents.JanusAgent
+ jA *agents.JanusAgent
// we can realy stop the JanusAgent so keep a flag
// if we registerd the jandlers
@@ -67,9 +66,9 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
- ja.Lock()
+ ja.mu.Lock()
+ defer ja.mu.Unlock()
if ja.started {
- ja.Unlock()
return utils.ErrServiceAlreadyRunning
}
ja.jA, err = agents.NewJanusAgent(ja.cfg, cms.ConnManager(), fs.FilterS())
@@ -89,7 +88,6 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
cl.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.HandlePlugin))
ja.started = true
- ja.Unlock()
return
}
@@ -100,10 +98,10 @@ func (ja *JanusAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry
// Shutdown stops the service
func (ja *JanusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- ja.Lock()
+ ja.mu.Lock()
err = ja.jA.Shutdown()
ja.started = false
- ja.Unlock()
+ ja.mu.Unlock()
return // no shutdown for the momment
}
diff --git a/services/loaders.go b/services/loaders.go
index 103539de8..b6995f12a 100644
--- a/services/loaders.go
+++ b/services/loaders.go
@@ -23,7 +23,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
@@ -43,10 +42,9 @@ func NewLoaderService(cfg *config.CGRConfig, preloadIDs []string) *LoaderService
// LoaderService implements Service interface
type LoaderService struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
ldrs *loaders.LoaderS
- cl *commonlisteners.CommonListenerS
preloadIDs []string
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -65,13 +63,13 @@ func (s *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Service
if err != nil {
return err
}
- s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
- s.Lock()
- defer s.Unlock()
+ s.mu.Lock()
+ defer s.mu.Unlock()
s.ldrs = loaders.NewLoaderS(s.cfg, dbs, fs, cms.ConnManager())
if !s.ldrs.Enabled() {
@@ -97,7 +95,7 @@ func (s *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Service
srv, _ := engine.NewService(s.ldrs)
// srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false)
for _, svc := range srv {
- s.cl.RpcRegister(svc)
+ cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.LoaderS, srv)
return nil
@@ -121,21 +119,21 @@ func (s *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.Servic
close(s.stopChan)
s.stopChan = make(chan struct{})
- s.RLock()
- defer s.RUnlock()
+ s.mu.RLock()
+ defer s.mu.RUnlock()
s.ldrs.Reload(dbs, fs, cms)
return s.ldrs.ListenAndServe(s.stopChan)
}
// Shutdown stops the service
-func (s *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
- s.Lock()
- s.ldrs = nil
+func (s *LoaderService) Shutdown(registry *servmanager.ServiceRegistry) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
close(s.stopChan)
- s.cl.RpcUnregisterName(utils.LoaderSv1)
- s.Unlock()
- return
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.LoaderSv1)
+ return nil
}
// ServiceName returns the service name
@@ -148,11 +146,6 @@ func (s *LoaderService) ShouldRun() bool {
return s.cfg.LoaderCfg().Enabled()
}
-// GetLoaderS returns the initialized LoaderService
-func (s *LoaderService) GetLoaderS() *loaders.LoaderS {
- return s.ldrs
-}
-
// StateChan returns signaling channel of specific state
func (s *LoaderService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
diff --git a/services/logger.go b/services/logger.go
index 9627007d2..7a95c9d72 100644
--- a/services/logger.go
+++ b/services/logger.go
@@ -19,8 +19,6 @@ along with this program. If not, see
package services
import (
- "sync"
-
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/efs"
@@ -40,10 +38,8 @@ func NewLoggerService(cfg *config.CGRConfig, loggerType string) *LoggerService {
// LoggerService implements Service interface.
type LoggerService struct {
- mu sync.RWMutex
- cfg *config.CGRConfig
- stateDeps *StateDependencies // channel subscriptions for state changes
-
+ cfg *config.CGRConfig
+ stateDeps *StateDependencies // channel subscriptions for state changes
loggerType string
}
diff --git a/services/radiusagent.go b/services/radiusagent.go
index 66194b9b2..8701d803e 100644
--- a/services/radiusagent.go
+++ b/services/radiusagent.go
@@ -38,16 +38,13 @@ func NewRadiusAgent(cfg *config.CGRConfig) *RadiusAgent {
// RadiusAgent implements Agent interface
type RadiusAgent struct {
- sync.RWMutex
- cfg *config.CGRConfig
- stopChan chan struct{}
-
- rad *agents.RadiusAgent
-
- lnet string
- lauth string
- lacct string
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ stopChan chan struct{}
+ rad *agents.RadiusAgent
+ lnet string
+ lauth string
+ lacct string
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -65,8 +62,8 @@ func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
- rad.Lock()
- defer rad.Unlock()
+ rad.mu.Lock()
+ defer rad.mu.Unlock()
rad.lnet = rad.cfg.RadiusAgentCfg().ListenNet
rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth
@@ -108,10 +105,10 @@ func (rad *RadiusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
}
func (rad *RadiusAgent) shutdown() {
- rad.Lock()
+ rad.mu.Lock()
+ defer rad.mu.Unlock()
close(rad.stopChan)
rad.rad = nil
- rad.Unlock()
}
// ServiceName returns the service name
diff --git a/services/rankings.go b/services/rankings.go
index 9b462bc92..3c5217c82 100644
--- a/services/rankings.go
+++ b/services/rankings.go
@@ -23,7 +23,6 @@ import (
"github.com/cgrates/birpc/context"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,12 +38,9 @@ func NewRankingService(cfg *config.CGRConfig) *RankingService {
}
type RankingService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- ran *engine.RankingS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ ran *engine.RankingS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -62,7 +58,7 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
if err != nil {
return err
}
- ran.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -73,8 +69,8 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
- ran.Lock()
- defer ran.Unlock()
+ ran.mu.Lock()
+ defer ran.mu.Unlock()
ran.ran = engine.NewRankingS(dbs.DataManager(), cms.ConnManager(), fs.FilterS(), ran.cfg)
if err := ran.ran.StartRankingS(context.TODO()); err != nil {
return err
@@ -84,7 +80,7 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
return err
}
for _, s := range srv {
- ran.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.RankingS, srv)
return nil
@@ -92,19 +88,20 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
// Reload handles the change of config
func (ran *RankingService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
- ran.Lock()
+ ran.mu.Lock()
ran.ran.Reload(context.TODO())
- ran.Unlock()
+ ran.mu.Unlock()
return
}
// Shutdown stops the service
-func (ran *RankingService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- ran.Lock()
- defer ran.Unlock()
+func (ran *RankingService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ ran.mu.Lock()
+ defer ran.mu.Unlock()
ran.ran.StopRankingS()
ran.ran = nil
- ran.cl.RpcUnregisterName(utils.RankingSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.RankingSv1)
return
}
diff --git a/services/rates.go b/services/rates.go
index 2082e2db2..18d54a845 100644
--- a/services/rates.go
+++ b/services/rates.go
@@ -21,7 +21,6 @@ package services
import (
"sync"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/rates"
@@ -40,41 +39,14 @@ func NewRateService(cfg *config.CGRConfig) *RateService {
// RateService is the service structure for RateS
type RateService struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
rateS *rates.RateS
- cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
-// ServiceName returns the service name
-func (rs *RateService) ServiceName() string {
- return utils.RateS
-}
-
-// ShouldRun returns if the service should be running
-func (rs *RateService) ShouldRun() (should bool) {
- return rs.cfg.RateSCfg().Enabled
-}
-
-// Reload handles the change of config
-func (rs *RateService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
- rs.rldChan <- struct{}{}
- return
-}
-
-// Shutdown stops the service
-func (rs *RateService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- rs.Lock()
- defer rs.Unlock()
- close(rs.stopChan)
- rs.rateS = nil
- rs.cl.RpcUnregisterName(utils.RateSv1)
- return
-}
-
// Start should handle the service start
func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
@@ -89,7 +61,7 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
if err != nil {
return err
}
- rs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -101,9 +73,9 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
- rs.Lock()
+ rs.mu.Lock()
rs.rateS = rates.NewRateS(rs.cfg, fs, dbs)
- rs.Unlock()
+ rs.mu.Unlock()
rs.stopChan = make(chan struct{})
go rs.rateS.ListenAndServe(rs.stopChan, rs.rldChan)
@@ -113,12 +85,39 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
return err
}
// srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false)
- rs.cl.RpcRegister(srv)
+ cl.RpcRegister(srv)
cms.AddInternalConn(utils.RateS, srv)
return
}
+// Reload handles the change of config
+func (rs *RateService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
+ rs.rldChan <- struct{}{}
+ return
+}
+
+// Shutdown stops the service
+func (rs *RateService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ rs.mu.Lock()
+ defer rs.mu.Unlock()
+ close(rs.stopChan)
+ rs.rateS = nil
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.RateSv1)
+ return
+}
+
// StateChan returns signaling channel of specific state
func (rs *RateService) StateChan(stateID string) chan struct{} {
return rs.stateDeps.StateChan(stateID)
}
+
+// ServiceName returns the service name
+func (rs *RateService) ServiceName() string {
+ return utils.RateS
+}
+
+// ShouldRun returns if the service should be running
+func (rs *RateService) ShouldRun() (should bool) {
+ return rs.cfg.RateSCfg().Enabled
+}
diff --git a/services/registrarc.go b/services/registrarc.go
index bdd07177c..9d9795264 100644
--- a/services/registrarc.go
+++ b/services/registrarc.go
@@ -37,21 +37,18 @@ func NewRegistrarCService(cfg *config.CGRConfig) *RegistrarCService {
// RegistrarCService implements Service interface
type RegistrarCService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- dspS *registrarc.RegistrarCService
-
- stopChan chan struct{}
- rldChan chan struct{}
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ dspS *registrarc.RegistrarCService
+ stopChan chan struct{}
+ rldChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
- dspS.Lock()
- defer dspS.Unlock()
+ dspS.mu.Lock()
+ defer dspS.mu.Unlock()
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, dspS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
@@ -73,11 +70,11 @@ func (dspS *RegistrarCService) Reload(_ *utils.SyncedChan, _ *servmanager.Servic
// Shutdown stops the service
func (dspS *RegistrarCService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- dspS.Lock()
+ dspS.mu.Lock()
+ defer dspS.mu.Unlock()
close(dspS.stopChan)
dspS.dspS.Shutdown()
dspS.dspS = nil
- dspS.Unlock()
return
}
diff --git a/services/resources.go b/services/resources.go
index 602d95661..f26a7af6a 100644
--- a/services/resources.go
+++ b/services/resources.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,12 +38,9 @@ func NewResourceService(cfg *config.CGRConfig) *ResourceService {
// ResourceService implements Service interface
type ResourceService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- reS *engine.ResourceS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ reS *engine.ResourceS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -62,7 +58,7 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
- reS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -74,14 +70,14 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
- reS.Lock()
- defer reS.Unlock()
+ reS.mu.Lock()
+ defer reS.mu.Unlock()
reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), cms.ConnManager())
reS.reS.StartLoop(context.TODO())
srv, _ := engine.NewService(reS.reS)
// srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false)
for _, s := range srv {
- reS.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.ResourceS, srv)
return
@@ -89,19 +85,20 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
// Reload handles the change of config
func (reS *ResourceService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
- reS.Lock()
+ reS.mu.Lock()
reS.reS.Reload(context.TODO())
- reS.Unlock()
+ reS.mu.Unlock()
return
}
// Shutdown stops the service
-func (reS *ResourceService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- reS.Lock()
- defer reS.Unlock()
+func (reS *ResourceService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ reS.mu.Lock()
+ defer reS.mu.Unlock()
reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error
reS.reS = nil
- reS.cl.RpcUnregisterName(utils.ResourceSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.ResourceSv1)
return
}
diff --git a/services/routes.go b/services/routes.go
index f8e1a0225..557ae23ce 100644
--- a/services/routes.go
+++ b/services/routes.go
@@ -21,7 +21,6 @@ package services
import (
"sync"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -38,12 +37,9 @@ func NewRouteService(cfg *config.CGRConfig) *RouteService {
// RouteService implements Service interface
type RouteService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- routeS *engine.RouteS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ routeS *engine.RouteS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -61,7 +57,7 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
- routeS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -72,13 +68,13 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
- routeS.Lock()
- defer routeS.Unlock()
+ routeS.mu.Lock()
+ defer routeS.mu.Unlock()
routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, cms.ConnManager())
srv, _ := engine.NewService(routeS.routeS)
// srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false)
for _, s := range srv {
- routeS.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.RouteS, srv)
return
@@ -90,11 +86,12 @@ func (routeS *RouteService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
}
// Shutdown stops the service
-func (routeS *RouteService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- routeS.Lock()
- defer routeS.Unlock()
+func (routeS *RouteService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ routeS.mu.Lock()
+ defer routeS.mu.Unlock()
routeS.routeS = nil
- routeS.cl.RpcUnregisterName(utils.RouteSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.RouteSv1)
return
}
diff --git a/services/sessions.go b/services/sessions.go
index 57de1c2d6..defd5b5dd 100644
--- a/services/sessions.go
+++ b/services/sessions.go
@@ -41,16 +41,12 @@ func NewSessionService(cfg *config.CGRConfig) *SessionService {
// SessionService implements Service interface
type SessionService struct {
- sync.RWMutex
-
- sm *sessions.SessionS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ sm *sessions.SessionS
bircpEnabled bool // to stop birpc server if needed
stopChan chan struct{}
cfg *config.CGRConfig
-
- stateDeps *StateDependencies // channel subscriptions for state changes
+ stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -66,15 +62,15 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag
if err != nil {
return err
}
- smg.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
- fs := srvDeps[utils.FilterS].(*FilterService)
- dbs := srvDeps[utils.DataDB].(*DataDBService)
+ fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
+ dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
- smg.Lock()
- defer smg.Unlock()
+ smg.mu.Lock()
+ defer smg.mu.Unlock()
- smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager())
+ smg.sm = sessions.NewSessionS(smg.cfg, dbs, fs, cms.ConnManager())
//start sync session in a separate goroutine
smg.stopChan = make(chan struct{})
go smg.sm.ListenAndServe(smg.stopChan)
@@ -84,28 +80,28 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag
srv, _ := engine.NewServiceWithName(smg.sm, utils.SessionS, true) // methods with multiple options
// srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options
for _, s := range srv {
- smg.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
// Register BiRpc handlers
if smg.cfg.SessionSCfg().ListenBijson != utils.EmptyString {
smg.bircpEnabled = true
for n, s := range srv {
- smg.cl.BiRPCRegisterName(n, s)
+ cl.BiRPCRegisterName(n, s)
}
// run this in it's own goroutine
- go smg.start(shutdown)
+ go smg.start(shutdown, cl)
}
cms.AddInternalConn(utils.SessionS, srv)
return
}
-func (smg *SessionService) start(shutdown *utils.SyncedChan) (err error) {
- if err := smg.cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson,
+func (smg *SessionService) start(shutdown *utils.SyncedChan, cl *commonlisteners.CommonListenerS) (err error) {
+ if err := cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson,
smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err))
- smg.Lock()
+ smg.mu.Lock()
smg.bircpEnabled = false
- smg.Unlock()
+ smg.mu.Unlock()
shutdown.CloseOnce()
}
return
@@ -117,19 +113,20 @@ func (smg *SessionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceReg
}
// Shutdown stops the service
-func (smg *SessionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- smg.Lock()
- defer smg.Unlock()
+func (smg *SessionService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ smg.mu.Lock()
+ defer smg.mu.Unlock()
close(smg.stopChan)
if err = smg.sm.Shutdown(); err != nil {
return
}
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
if smg.bircpEnabled {
- smg.cl.StopBiRPC()
+ cl.StopBiRPC()
smg.bircpEnabled = false
}
smg.sm = nil
- smg.cl.RpcUnregisterName(utils.SessionSv1)
+ cl.RpcUnregisterName(utils.SessionSv1)
// smg.server.BiRPCUnregisterName(utils.SessionSv1)
return
}
diff --git a/services/sipagent.go b/services/sipagent.go
index 6447ef29c..c29063249 100644
--- a/services/sipagent.go
+++ b/services/sipagent.go
@@ -38,12 +38,10 @@ func NewSIPAgent(cfg *config.CGRConfig) *SIPAgent {
// SIPAgent implements Agent interface
type SIPAgent struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
sip *agents.SIPAgent
oldListen string
-
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -61,8 +59,8 @@ func (sip *SIPAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
- sip.Lock()
- defer sip.Unlock()
+ sip.mu.Lock()
+ defer sip.mu.Unlock()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip, err = agents.NewSIPAgent(cm, sip.cfg, fs)
if err != nil {
@@ -84,19 +82,19 @@ func (sip *SIPAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRe
if sip.oldListen == sip.cfg.SIPAgentCfg().Listen {
return
}
- sip.Lock()
+ sip.mu.Lock()
sip.sip.Shutdown()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip.InitStopChan()
- sip.Unlock()
+ sip.mu.Unlock()
go sip.listenAndServe(shutdown)
return
}
// Shutdown stops the service
func (sip *SIPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- sip.Lock()
- defer sip.Unlock()
+ sip.mu.Lock()
+ defer sip.mu.Unlock()
sip.sip.Shutdown()
sip.sip = nil
return
diff --git a/services/stats.go b/services/stats.go
index 557b76d39..b6141b79c 100644
--- a/services/stats.go
+++ b/services/stats.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,12 +38,9 @@ func NewStatService(cfg *config.CGRConfig) *StatService {
// StatService implements Service interface
type StatService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- sts *engine.StatS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ sts *engine.StatS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -62,7 +58,7 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
if err != nil {
return err
}
- sts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -74,14 +70,14 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
- sts.Lock()
- defer sts.Unlock()
+ sts.mu.Lock()
+ defer sts.mu.Unlock()
sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), cms.ConnManager())
sts.sts.StartLoop(context.TODO())
srv, _ := engine.NewService(sts.sts)
// srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false)
for _, s := range srv {
- sts.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.StatS, srv)
return
@@ -89,19 +85,20 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
// Reload handles the change of config
func (sts *StatService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
- sts.Lock()
+ sts.mu.Lock()
sts.sts.Reload(context.TODO())
- sts.Unlock()
+ sts.mu.Unlock()
return
}
// Shutdown stops the service
-func (sts *StatService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- sts.Lock()
- defer sts.Unlock()
+func (sts *StatService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ sts.mu.Lock()
+ defer sts.mu.Unlock()
sts.sts.Shutdown(context.TODO())
sts.sts = nil
- sts.cl.RpcUnregisterName(utils.StatSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.StatSv1)
return
}
diff --git a/services/stordb.go b/services/stordb.go
index 55bd1359e..911baaef5 100644
--- a/services/stordb.go
+++ b/services/stordb.go
@@ -39,7 +39,7 @@ func NewStorDBService(cfg *config.CGRConfig, setVersions bool) *StorDBService {
// StorDBService implements Service interface
type StorDBService struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
oldDBCfg *config.StorDbCfg
db engine.StorDB
@@ -49,8 +49,8 @@ type StorDBService struct {
// Start should handle the service start
func (db *StorDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
- db.Lock()
- defer db.Unlock()
+ db.mu.Lock()
+ defer db.mu.Unlock()
db.oldDBCfg = db.cfg.StorDbCfg().Clone()
dbConn, err := engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host,
db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User,
@@ -76,8 +76,8 @@ func (db *StorDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegist
// Reload handles the change of config
func (db *StorDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
- db.Lock()
- defer db.Unlock()
+ db.mu.Lock()
+ defer db.mu.Unlock()
if db.needsConnectionReload() {
var d engine.StorDB
if d, err = engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host,
@@ -123,10 +123,10 @@ func (db *StorDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegis
// Shutdown stops the service
func (db *StorDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
- db.Lock()
+ db.mu.Lock()
+ defer db.mu.Unlock()
db.db.Close()
db.db = nil
- db.Unlock()
return
}
diff --git a/services/thresholds.go b/services/thresholds.go
index eeb3d0592..d744b0e91 100644
--- a/services/thresholds.go
+++ b/services/thresholds.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,12 +38,9 @@ func NewThresholdService(cfg *config.CGRConfig) *ThresholdService {
// ThresholdService implements Service interface
type ThresholdService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- thrs *engine.ThresholdS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ thrs *engine.ThresholdS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -62,7 +58,7 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma
if err != nil {
return err
}
- thrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -74,35 +70,36 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
- thrs.Lock()
- defer thrs.Unlock()
+ thrs.mu.Lock()
+ defer thrs.mu.Unlock()
thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), cms.ConnManager())
thrs.thrs.StartLoop(context.TODO())
srv, _ := engine.NewService(thrs.thrs)
// srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false)
for _, s := range srv {
- thrs.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.ThresholdS, srv)
return
}
// Reload handles the change of config
-func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
- thrs.Lock()
+func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
+ thrs.mu.Lock()
thrs.thrs.Reload(context.TODO())
- thrs.Unlock()
- return
+ thrs.mu.Unlock()
+ return nil
}
// Shutdown stops the service
-func (thrs *ThresholdService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
- thrs.Lock()
- defer thrs.Unlock()
+func (thrs *ThresholdService) Shutdown(registry *servmanager.ServiceRegistry) error {
+ thrs.mu.Lock()
+ defer thrs.mu.Unlock()
thrs.thrs.Shutdown(context.TODO())
thrs.thrs = nil
- thrs.cl.RpcUnregisterName(utils.ThresholdSv1)
- return
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.ThresholdSv1)
+ return nil
}
// ServiceName returns the service name
diff --git a/services/tpes.go b/services/tpes.go
index 366f0135b..72a322f68 100644
--- a/services/tpes.go
+++ b/services/tpes.go
@@ -22,7 +22,6 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/apis"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/tpes"
@@ -39,10 +38,9 @@ func NewTPeService(cfg *config.CGRConfig) *TPeService {
// TypeService implements Service interface
type TPeService struct {
- sync.RWMutex
+ mu sync.RWMutex
cfg *config.CGRConfig
tpes *tpes.TPeS
- cl *commonlisteners.CommonListenerS
srv *birpc.Service
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -50,7 +48,6 @@ type TPeService struct {
// Start should handle the service start
func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
-
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -61,14 +58,17 @@ func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
if err != nil {
return err
}
- ts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
+
ts.tpes = tpes.NewTPeS(ts.cfg, dbs, cm)
ts.stopChan = make(chan struct{})
ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false)
- ts.cl.RpcRegister(ts.srv)
+ cl.RpcRegister(ts.srv)
return
}
@@ -78,9 +78,13 @@ func (ts *TPeService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry
}
// Shutdown stops the service
-func (ts *TPeService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
+func (ts *TPeService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
ts.srv = nil
close(ts.stopChan)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.TPeSv1)
return
}
diff --git a/services/trends.go b/services/trends.go
index 3c294df16..99eafdf6c 100644
--- a/services/trends.go
+++ b/services/trends.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
- "github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -38,12 +37,9 @@ func NewTrendService(cfg *config.CGRConfig) *TrendService {
}
type TrendService struct {
- sync.RWMutex
- cfg *config.CGRConfig
-
- trs *engine.TrendS
- cl *commonlisteners.CommonListenerS
-
+ mu sync.RWMutex
+ cfg *config.CGRConfig
+ trs *engine.TrendS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -61,7 +57,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
if err != nil {
return err
}
- trs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
+ cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -72,8 +68,8 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
- trs.Lock()
- defer trs.Unlock()
+ trs.mu.Lock()
+ defer trs.mu.Unlock()
trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager())
if err := trs.trs.StartTrendS(context.TODO()); err != nil {
return err
@@ -83,7 +79,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
return err
}
for _, s := range srv {
- trs.cl.RpcRegister(s)
+ cl.RpcRegister(s)
}
cms.AddInternalConn(utils.TrendS, srv)
return nil
@@ -91,19 +87,20 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
// Reload handles the change of config
func (trs *TrendService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
- trs.Lock()
+ trs.mu.Lock()
trs.trs.Reload(context.TODO())
- trs.Unlock()
+ trs.mu.Unlock()
return
}
// Shutdown stops the service
-func (trs *TrendService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
- trs.Lock()
- defer trs.Unlock()
+func (trs *TrendService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
+ trs.mu.Lock()
+ defer trs.mu.Unlock()
trs.trs.StopTrendS()
trs.trs = nil
- trs.cl.RpcUnregisterName(utils.TrendSv1)
+ cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
+ cl.RpcUnregisterName(utils.TrendSv1)
return
}