Remove cls fields from service structs

now retrieved through registry.Lookup
This commit is contained in:
ionutboangiu
2025-01-23 19:09:22 +02:00
committed by Dan Christian Bogos
parent 06ccafb5fd
commit 652d1e68cf
37 changed files with 489 additions and 536 deletions

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/accounts"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -41,15 +40,11 @@ func NewAccountService(cfg *config.CGRConfig) *AccountService {
// AccountService implements Service interface
type AccountService struct {
sync.RWMutex
cfg *config.CGRConfig
acts *accounts.AccountS
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
mu sync.RWMutex
cfg *config.CGRConfig
acts *accounts.AccountS
rldChan chan struct{}
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -67,7 +62,7 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -78,8 +73,8 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
acts.Lock()
defer acts.Unlock()
acts.mu.Lock()
defer acts.mu.Unlock()
acts.acts = accounts.NewAccountS(acts.cfg, fs, cms.ConnManager(), dbs)
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)
@@ -87,9 +82,7 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
acts.cl.RpcRegister(srv)
cl.RpcRegister(srv)
cms.AddInternalConn(utils.AccountS, srv)
return
}
@@ -101,12 +94,13 @@ func (acts *AccountService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
}
// Shutdown stops the service
func (acts *AccountService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
acts.Lock()
func (acts *AccountService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
acts.mu.Lock()
defer acts.mu.Unlock()
close(acts.stopChan)
acts.acts = nil
acts.Unlock()
acts.cl.RpcUnregisterName(utils.AccountSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.AccountSv1)
return
}

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/actions"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -41,15 +40,11 @@ func NewActionService(cfg *config.CGRConfig) *ActionService {
// ActionService implements Service interface
type ActionService struct {
sync.RWMutex
cfg *config.CGRConfig
acts *actions.ActionS
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
mu sync.RWMutex
cfg *config.CGRConfig
acts *actions.ActionS
rldChan chan struct{}
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -67,7 +62,7 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag
if err != nil {
return err
}
acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -78,8 +73,8 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
acts.Lock()
defer acts.Unlock()
acts.mu.Lock()
defer acts.mu.Unlock()
acts.acts = actions.NewActionS(acts.cfg, fs, dbs, cms.ConnManager())
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)
@@ -88,7 +83,7 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag
return
}
// srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false)
acts.cl.RpcRegister(srv)
cl.RpcRegister(srv)
cms.AddInternalConn(utils.ActionS, srv)
return
}
@@ -100,13 +95,14 @@ func (acts *ActionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceReg
}
// Shutdown stops the service
func (acts *ActionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
acts.Lock()
defer acts.Unlock()
func (acts *ActionService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
acts.mu.Lock()
defer acts.mu.Unlock()
close(acts.stopChan)
acts.acts.Shutdown()
acts.acts = nil
acts.cl.RpcUnregisterName(utils.ActionSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.ActionSv1)
return
}

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,17 +38,16 @@ func NewAdminSv1Service(cfg *config.CGRConfig) *AdminSv1Service {
// AdminSv1Service implements Service interface
type AdminSv1Service struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
api *apis.AdminSv1
cl *commonlisteners.CommonListenerS
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
// For this service the start should be called from RAL Service
func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
func (s *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -58,61 +56,62 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana
utils.DataDB,
utils.StorDB,
},
registry, apiService.cfg.GeneralCfg().ConnectTimeout)
registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
apiService.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
sdbs := srvDeps[utils.StorDB].(*StorDBService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dm := srvDeps[utils.DataDB].(*DataDBService).DataManager()
sdb := srvDeps[utils.StorDB].(*StorDBService).DB()
apiService.Lock()
defer apiService.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), cms.ConnManager(), fs.FilterS(), sdbs.DB())
s.api = apis.NewAdminSv1(s.cfg, dm, cms.ConnManager(), fs, sdb)
srv, _ := engine.NewService(apiService.api)
// srv, _ := birpc.NewService(apiService.api, "", false)
srv, _ := engine.NewService(s.api)
// srv, _ := birpc.NewService(s.api, "", false)
for _, s := range srv {
apiService.cl.RpcRegister(s)
cl.RpcRegister(s)
}
rpl, _ := engine.NewService(apis.NewReplicatorSv1(dbs.DataManager(), apiService.api))
for _, s := range rpl {
apiService.cl.RpcRegister(s)
rpl, _ := engine.NewService(apis.NewReplicatorSv1(dm, s.api))
for _, svc := range rpl {
cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.AdminS, srv)
return
}
// Reload handles the change of config
func (apiService *AdminSv1Service) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
func (s *AdminSv1Service) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
return
}
// Shutdown stops the service
func (apiService *AdminSv1Service) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
apiService.Lock()
// close(apiService.stopChan)
apiService.api = nil
apiService.cl.RpcUnregisterName(utils.AdminSv1)
apiService.Unlock()
func (s *AdminSv1Service) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
// close(s.stopChan)
s.api = nil
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.AdminSv1)
return
}
// ServiceName returns the service name
func (apiService *AdminSv1Service) ServiceName() string {
func (s *AdminSv1Service) ServiceName() string {
return utils.AdminS
}
// ShouldRun returns if the service should be running
func (apiService *AdminSv1Service) ShouldRun() bool {
return apiService.cfg.AdminSCfg().Enabled
func (s *AdminSv1Service) ShouldRun() bool {
return s.cfg.AdminSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} {
return apiService.stateDeps.StateChan(stateID)
func (s *AdminSv1Service) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}

View File

@@ -47,10 +47,9 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService {
// AnalyzerService implements Service interface
type AnalyzerService struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
anz *analyzers.AnalyzerS
cl *commonlisteners.CommonListenerS
cancelFunc context.CancelFunc
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -63,10 +62,10 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return
}
anz.cl = cls.(*CommonListenerService).CLS()
cl := cls.(*CommonListenerService).CLS()
anz.Lock()
defer anz.Unlock()
anz.mu.Lock()
defer anz.mu.Unlock()
if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil {
return
}
@@ -79,26 +78,26 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana
shutdown.CloseOnce()
}
}(anz.anz)
anz.cl.SetAnalyzer(anz.anz)
go anz.start(registry)
cl.SetAnalyzer(anz.anz)
go anz.start(registry, cl)
return
}
func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry) {
func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry, cl *commonlisteners.CommonListenerS) {
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
anz.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
anz.Lock()
anz.mu.Lock()
anz.anz.SetFilterS(fs.(*FilterService).FilterS())
srv, _ := engine.NewService(anz.anz)
// srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false)
for _, s := range srv {
anz.cl.RpcRegister(s)
cl.RpcRegister(s)
}
anz.Unlock()
anz.mu.Unlock()
}
// Reload handles the change of config
@@ -107,14 +106,15 @@ func (anz *AnalyzerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
}
// Shutdown stops the service
func (anz *AnalyzerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
anz.Lock()
func (anz *AnalyzerService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
anz.mu.Lock()
anz.cancelFunc()
anz.cl.SetAnalyzer(nil)
cl.SetAnalyzer(nil)
anz.anz.Shutdown()
anz.anz = nil
anz.Unlock()
anz.cl.RpcUnregisterName(utils.AnalyzerSv1)
anz.mu.Unlock()
cl.RpcUnregisterName(utils.AnalyzerSv1)
return
}

View File

@@ -38,7 +38,7 @@ func NewAsteriskAgent(cfg *config.CGRConfig) *AsteriskAgent {
// AsteriskAgent implements Agent interface
type AsteriskAgent struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
smas []*agents.AsteriskAgent
@@ -52,8 +52,8 @@ func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, registry *servmanage
return
}
ast.Lock()
defer ast.Unlock()
ast.mu.Lock()
defer ast.mu.Unlock()
listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}) {
if err := sma.ListenAndServe(stopChan); err != nil {
@@ -83,10 +83,10 @@ func (ast *AsteriskAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
}
func (ast *AsteriskAgent) shutdown() {
ast.Lock()
ast.mu.Lock()
defer ast.mu.Unlock()
close(ast.stopChan)
ast.smas = nil
ast.Unlock()
return // no shutdown for the momment
}

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,13 +38,10 @@ func NewAttributeService(cfg *config.CGRConfig) *AttributeService {
// AttributeService implements Service interface
type AttributeService struct {
sync.RWMutex
cfg *config.CGRConfig
attrS *engine.AttributeS
cl *commonlisteners.CommonListenerS
rpc *apis.AttributeSv1 // useful on restart
mu sync.Mutex
cfg *config.CGRConfig
attrS *engine.AttributeS
rpc *apis.AttributeSv1 // useful on restart
stateDeps *StateDependencies
}
@@ -63,7 +59,7 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm
if err != nil {
return
}
attrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -74,14 +70,14 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dm := srvDeps[utils.DataDB].(*DataDBService).DataManager()
attrS.Lock()
defer attrS.Unlock()
attrS.mu.Lock()
defer attrS.mu.Unlock()
attrS.attrS = engine.NewAttributeService(dm, fs, attrS.cfg)
attrS.rpc = apis.NewAttributeSv1(attrS.attrS)
srv, _ := engine.NewService(attrS.rpc)
// srv, _ := birpc.NewService(attrS.rpc, "", false)
for _, s := range srv {
attrS.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.AttributeS, srv)
return
@@ -94,9 +90,10 @@ func (attrS *AttributeService) Reload(_ *utils.SyncedChan, _ *servmanager.Servic
// Shutdown stops the service
func (attrS *AttributeService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
attrS.Lock()
attrS.cl.RpcUnregisterName(utils.AttributeSv1)
attrS.Unlock()
attrS.mu.Lock()
defer attrS.mu.Unlock()
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.AttributeSv1)
return
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -41,7 +40,6 @@ func NewCacheService(cfg *config.CGRConfig) *CacheService {
type CacheService struct {
mu sync.Mutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
cacheCh chan *engine.CacheS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -59,7 +57,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.
if err != nil {
return err
}
cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cs := srvDeps[utils.CoreS].(*CoreService)
@@ -75,7 +73,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.
srv, _ := engine.NewService(engine.Cache)
// srv, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false)
for _, s := range srv {
cS.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.CacheS, srv)
return
@@ -87,8 +85,11 @@ func (cS *CacheService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegist
}
// Shutdown stops the service
func (cS *CacheService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
cS.cl.RpcUnregisterName(utils.CacheSv1)
func (cS *CacheService) Shutdown(registry *servmanager.ServiceRegistry) (_ error) {
cS.mu.Lock()
cS.mu.Unlock()
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.CacheSv1)
return
}

View File

@@ -19,8 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -37,7 +35,6 @@ func NewCapService(cfg *config.CGRConfig) *CapService {
// CapService implements Service interface.
type CapService struct {
mu sync.RWMutex
cfg *config.CGRConfig
caps *engine.Caps
stateDeps *StateDependencies // channel subscriptions for state changes

View File

@@ -23,7 +23,6 @@ import (
"sync"
"github.com/cgrates/cgrates/cdrs"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -40,12 +39,9 @@ func NewCDRServer(cfg *config.CGRConfig) *CDRService {
// CDRService implements Service interface
type CDRService struct {
sync.RWMutex
cfg *config.CGRConfig
cdrS *cdrs.CDRServer
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
cdrS *cdrs.CDRServer
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -63,14 +59,14 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
if err != nil {
return err
}
cs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
sdbs := srvDeps[utils.StorDB].(*StorDBService).DB()
cs.Lock()
defer cs.Unlock()
cs.mu.Lock()
defer cs.mu.Unlock()
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs, cms.ConnManager(), sdbs)
runtime.Gosched()
@@ -78,7 +74,7 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
if err != nil {
return err
}
cs.cl.RpcRegister(srv)
cl.RpcRegister(srv)
cms.AddInternalConn(utils.CDRServer, srv)
return
}
@@ -89,11 +85,12 @@ func (cs *CDRService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry
}
// Shutdown stops the service
func (cs *CDRService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
cs.Lock()
func (cs *CDRService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.cdrS = nil
cs.Unlock()
cs.cl.RpcUnregisterName(utils.CDRsV1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.CDRsV1)
return
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -38,12 +37,9 @@ func NewChargerService(cfg *config.CGRConfig) *ChargerService {
// ChargerService implements Service interface
type ChargerService struct {
sync.RWMutex
cfg *config.CGRConfig
chrS *engine.ChargerS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
chrS *engine.ChargerS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -61,7 +57,7 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
chrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -72,13 +68,13 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
chrS.Lock()
defer chrS.Unlock()
chrS.mu.Lock()
defer chrS.mu.Unlock()
chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, cms.ConnManager())
srv, _ := engine.NewService(chrS.chrS)
// srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false)
for _, s := range srv {
chrS.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.ChargerS, srv)
return nil
@@ -90,11 +86,12 @@ func (chrS *ChargerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
}
// Shutdown stops the service
func (chrS *ChargerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
chrS.Lock()
defer chrS.Unlock()
func (chrS *ChargerService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
chrS.mu.Lock()
defer chrS.mu.Unlock()
chrS.chrS = nil
chrS.cl.RpcUnregisterName(utils.ChargerSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.ChargerSv1)
return
}

View File

@@ -69,7 +69,45 @@ func (s *CommonListenerService) Reload(_ *utils.SyncedChan, _ *servmanager.Servi
}
// Shutdown stops the service.
func (s *CommonListenerService) Shutdown(_ *servmanager.ServiceRegistry) error {
func (s *CommonListenerService) Shutdown(registry *servmanager.ServiceRegistry) error {
deps := []string{
utils.AccountS,
utils.ActionS,
utils.AdminS,
utils.AnalyzerS,
utils.AttributeS,
utils.CacheS,
utils.CDRServer,
utils.ChargerS,
utils.ConfigS,
utils.CoreS,
utils.EEs,
utils.EFs,
utils.ERs,
utils.GuardianS,
utils.HTTPAgent,
utils.JanusAgent,
utils.LoaderS,
utils.RankingS,
utils.RateS,
utils.RegistrarC,
utils.ResourceS,
utils.RouteS,
utils.SessionS,
utils.StatS,
utils.ThresholdS,
utils.TPeS,
utils.TrendS,
}
for _, svcID := range deps {
if !servmanager.IsServiceInState(registry.Lookup(svcID), utils.StateServiceUP) {
continue
}
_, err := WaitForServiceState(utils.StateServiceDOWN, svcID, registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
}
s.mu.Lock()
defer s.mu.Unlock()
s.cls = nil

View File

@@ -19,9 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -38,9 +35,7 @@ func NewConfigService(cfg *config.CGRConfig) *ConfigService {
// ConfigService implements Service interface.
type ConfigService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -55,12 +50,12 @@ func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.Service
if err != nil {
return err
}
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
svcs, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true)
for _, svc := range svcs {
s.cl.RpcRegister(svc)
cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.ConfigS, svcs)
return nil
@@ -72,8 +67,9 @@ func (s *ConfigService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegist
}
// Shutdown stops the service.
func (s *ConfigService) Shutdown(_ *servmanager.ServiceRegistry) error {
s.cl.RpcUnregisterName(utils.ConfigSv1)
func (s *ConfigService) Shutdown(registry *servmanager.ServiceRegistry) error {
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.ConfigSv1)
return nil
}

View File

@@ -22,7 +22,6 @@ import (
"os"
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
@@ -45,7 +44,6 @@ type CoreService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cS *cores.CoreS
cl *commonlisteners.CommonListenerS
fileCPU *os.File
stopChan chan struct{}
shdWg *sync.WaitGroup
@@ -65,7 +63,7 @@ func (s *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.Se
return err
}
caps := srvDeps[utils.CapS].(*CapService).Caps()
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
s.mu.Lock()
@@ -77,7 +75,7 @@ func (s *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.Se
return err
}
for _, svc := range srv {
s.cl.RpcRegister(svc)
cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.CoreS, srv)
return nil
@@ -89,7 +87,7 @@ func (s *CoreService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry
}
// Shutdown stops the service
func (s *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error {
func (s *CoreService) Shutdown(registry *servmanager.ServiceRegistry) error {
s.mu.Lock()
defer s.mu.Unlock()
s.cS.Shutdown()
@@ -97,7 +95,8 @@ func (s *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error {
s.cS.StopCPUProfiling()
s.cS.StopMemoryProfiling()
s.cS = nil
s.cl.RpcUnregisterName(utils.CoreSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.CoreSv1)
return nil
}

View File

@@ -39,7 +39,7 @@ func NewDataDBService(cfg *config.CGRConfig, setVersions bool) *DataDBService {
// DataDBService implements Service interface
type DataDBService struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
oldDBCfg *config.DataDbCfg
dm *engine.DataManager
@@ -53,8 +53,8 @@ func (db *DataDBService) Start(_ *utils.SyncedChan, registry *servmanager.Servic
if err != nil {
return
}
db.Lock()
defer db.Unlock()
db.mu.Lock()
defer db.mu.Unlock()
db.oldDBCfg = db.cfg.DataDbCfg().Clone()
dbConn, err := engine.NewDataDBConn(db.cfg.DataDbCfg().Type,
db.cfg.DataDbCfg().Host, db.cfg.DataDbCfg().Port,
@@ -81,8 +81,8 @@ func (db *DataDBService) Start(_ *utils.SyncedChan, registry *servmanager.Servic
// Reload handles the change of config
func (db *DataDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
db.Lock()
defer db.Unlock()
db.mu.Lock()
defer db.mu.Unlock()
if db.needsConnectionReload() {
var d engine.DataDBDriver
d, err = engine.NewDataDBConn(db.cfg.DataDbCfg().Type,
@@ -126,8 +126,8 @@ func (db *DataDBService) Shutdown(registry *servmanager.ServiceRegistry) error {
return err
}
}
db.Lock()
defer db.Unlock()
db.mu.Lock()
defer db.mu.Unlock()
db.dm.DataDB().Close()
return nil
}

View File

@@ -39,7 +39,7 @@ func NewDiameterAgent(cfg *config.CGRConfig) *DiameterAgent {
// DiameterAgent implements Agent interface
type DiameterAgent struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
@@ -67,8 +67,8 @@ func (s *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
return s.start(fs, cm, caps, shutdown)
}
@@ -94,8 +94,8 @@ func (s *DiameterAgent) start(filterS *engine.FilterS, cm *engine.ConnManager, c
// Reload handles the change of config
func (s *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
if s.lnet == s.cfg.DiameterAgentCfg().ListenNet &&
s.laddr == s.cfg.DiameterAgentCfg().Listen {
return
@@ -120,10 +120,10 @@ func (s *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager
// Shutdown stops the service
func (s *DiameterAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
s.Lock()
s.mu.Lock()
close(s.stopChan)
s.da = nil
s.Unlock()
s.mu.Unlock()
return // no shutdown for the momment
}

View File

@@ -37,7 +37,7 @@ func NewDNSAgent(cfg *config.CGRConfig) *DNSAgent {
// DNSAgent implements Agent interface
type DNSAgent struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
@@ -61,8 +61,8 @@ func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dns.Lock()
defer dns.Unlock()
dns.mu.Lock()
defer dns.mu.Unlock()
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), cms.ConnManager())
if err != nil {
dns.dns = nil
@@ -87,8 +87,8 @@ func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.Se
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dns.Lock()
defer dns.Unlock()
dns.mu.Lock()
defer dns.mu.Unlock()
if dns.dns != nil {
close(dns.stopChan)
@@ -122,8 +122,8 @@ func (dns *DNSAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
return
}
close(dns.stopChan)
dns.Lock()
defer dns.Unlock()
dns.mu.Lock()
defer dns.mu.Unlock()
dns.dns = nil
return
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/engine"
@@ -39,43 +38,12 @@ func NewEventExporterService(cfg *config.CGRConfig) *EventExporterService {
// EventExporterService is the service structure for EventExporterS
type EventExporterService struct {
mu sync.RWMutex
cfg *config.CGRConfig
eeS *ees.EeS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
eeS *ees.EeS
stateDeps *StateDependencies // channel subscriptions for state changes
}
// ServiceName returns the service name
func (es *EventExporterService) ServiceName() string {
return utils.EEs
}
// ShouldRun returns if the service should be running
func (es *EventExporterService) ShouldRun() (should bool) {
return es.cfg.EEsCfg().Enabled
}
// Reload handles the change of config
func (es *EventExporterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
es.mu.Lock()
defer es.mu.Unlock()
es.eeS.ClearExporterCache()
return es.eeS.SetupExporterCache()
}
// Shutdown stops the service
func (es *EventExporterService) Shutdown(_ *servmanager.ServiceRegistry) error {
es.mu.Lock()
defer es.mu.Unlock()
es.eeS.ClearExporterCache()
es.eeS = nil
es.cl.RpcUnregisterName(utils.EeSv1)
return nil
}
// Start should handle the service start
func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
@@ -88,7 +56,7 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager
if err != nil {
return err
}
es.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
@@ -102,11 +70,40 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager
srv, _ := engine.NewServiceWithPing(es.eeS, utils.EeSv1, utils.V1Prfx)
// srv, _ := birpc.NewService(es.rpc, "", false)
es.cl.RpcRegister(srv)
cl.RpcRegister(srv)
cms.AddInternalConn(utils.EEs, srv)
return nil
}
// Reload handles the change of config
func (es *EventExporterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
es.mu.Lock()
defer es.mu.Unlock()
es.eeS.ClearExporterCache()
return es.eeS.SetupExporterCache()
}
// Shutdown stops the service
func (es *EventExporterService) Shutdown(registry *servmanager.ServiceRegistry) error {
es.mu.Lock()
defer es.mu.Unlock()
es.eeS.ClearExporterCache()
es.eeS = nil
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.EeSv1)
return nil
}
// ServiceName returns the service name
func (es *EventExporterService) ServiceName() string {
return utils.EEs
}
// ShouldRun returns if the service should be running
func (es *EventExporterService) ShouldRun() (should bool) {
return es.cfg.EEsCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (es *EventExporterService) StateChan(stateID string) chan struct{} {
return es.stateDeps.StateChan(stateID)

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
@@ -32,15 +31,11 @@ import (
// ExportFailoverService is the service structure for ExportFailover
type ExportFailoverService struct {
sync.Mutex
efS *efs.EfS
cl *commonlisteners.CommonListenerS
srv *birpc.Service
stopChan chan struct{}
cfg *config.CGRConfig
mu sync.Mutex
efS *efs.EfS
srv *birpc.Service
stopChan chan struct{}
cfg *config.CGRConfig
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -53,54 +48,59 @@ func NewExportFailoverService(cfg *config.CGRConfig) *ExportFailoverService {
}
// Start should handle the service start
func (efServ *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
func (s *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
},
registry, efServ.cfg.GeneralCfg().ConnectTimeout)
registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
efServ.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
efServ.Lock()
defer efServ.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
efServ.efS = efs.NewEfs(efServ.cfg, cms.ConnManager())
efServ.stopChan = make(chan struct{})
efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx)
efServ.cl.RpcRegister(efServ.srv)
cms.AddInternalConn(utils.EFs, efServ.srv)
s.efS = efs.NewEfs(s.cfg, cms.ConnManager())
s.stopChan = make(chan struct{})
s.srv, _ = engine.NewServiceWithPing(s.efS, utils.EfSv1, utils.V1Prfx)
cl.RpcRegister(s.srv)
cms.AddInternalConn(utils.EFs, s.srv)
return
}
// Reload handles the change of config
func (efServ *ExportFailoverService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
func (s *ExportFailoverService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
return
}
// Shutdown stops the service
func (efServ *ExportFailoverService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
efServ.srv = nil
close(efServ.stopChan)
func (s *ExportFailoverService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
s.srv = nil
close(s.stopChan)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.EfSv1)
// NEXT SHOULD EXPORT ALL THE SHUTDOWN LOGGERS TO WRITE
return
}
// ShouldRun returns if the service should be running
func (efServ *ExportFailoverService) ShouldRun() bool {
return efServ.cfg.EFsCfg().Enabled
func (s *ExportFailoverService) ShouldRun() bool {
return s.cfg.EFsCfg().Enabled
}
// ServiceName returns the service name
func (efServ *ExportFailoverService) ServiceName() string {
func (s *ExportFailoverService) ServiceName() string {
return utils.EFs
}
// StateChan returns signaling channel of specific state
func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} {
return efServ.stateDeps.StateChan(stateID)
func (s *ExportFailoverService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/ers"
@@ -41,15 +40,11 @@ func NewEventReaderService(cfg *config.CGRConfig) *EventReaderService {
// EventReaderService implements Service interface
type EventReaderService struct {
sync.RWMutex
cfg *config.CGRConfig
ers *ers.ERService
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
mu sync.RWMutex
cfg *config.CGRConfig
ers *ers.ERService
rldChan chan struct{}
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -65,12 +60,12 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm
if err != nil {
return err
}
erS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
erS.Lock()
defer erS.Unlock()
erS.mu.Lock()
defer erS.mu.Unlock()
// remake the stop chan
erS.stopChan = make(chan struct{})
@@ -83,7 +78,7 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm
if err != nil {
return err
}
erS.cl.RpcRegister(srv)
cl.RpcRegister(srv)
cms.AddInternalConn(utils.ERs, srv)
return
}
@@ -98,19 +93,20 @@ func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldC
// Reload handles the change of config
func (erS *EventReaderService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
erS.RLock()
erS.mu.RLock()
erS.rldChan <- struct{}{}
erS.RUnlock()
erS.mu.RUnlock()
return
}
// Shutdown stops the service
func (erS *EventReaderService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
erS.Lock()
defer erS.Unlock()
func (erS *EventReaderService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
erS.mu.Lock()
defer erS.mu.Unlock()
close(erS.stopChan)
erS.ers = nil
erS.cl.RpcUnregisterName(utils.ErSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.ErSv1)
return
}

View File

@@ -19,9 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
@@ -39,9 +36,7 @@ func NewGuardianService(cfg *config.CGRConfig) *GuardianService {
// GuardianService implements Service interface.
type GuardianService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -56,15 +51,12 @@ func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.Servi
if err != nil {
return err
}
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
s.mu.Lock()
defer s.mu.Unlock()
svcs, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
for _, svc := range svcs {
s.cl.RpcRegister(svc)
cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.GuardianS, svcs)
return nil
@@ -76,10 +68,9 @@ func (s *GuardianService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegi
}
// Shutdown stops the service.
func (s *GuardianService) Shutdown(_ *servmanager.ServiceRegistry) error {
s.mu.Lock()
defer s.mu.Unlock()
s.cl.RpcUnregisterName(utils.GuardianSv1)
func (s *GuardianService) Shutdown(registry *servmanager.ServiceRegistry) error {
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.GuardianSv1)
return nil
}

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -38,11 +37,9 @@ func NewHTTPAgent(cfg *config.CGRConfig) *HTTPAgent {
// HTTPAgent implements Agent interface
type HTTPAgent struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
// we can realy stop the HTTPAgent so keep a flag
// if we registerd the handlers
started bool
@@ -63,16 +60,16 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg
return err
}
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService)
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
ha.Lock()
defer ha.Unlock()
ha.mu.Lock()
defer ha.mu.Unlock()
ha.started = true
for _, agntCfg := range ha.cfg.HTTPAgentCfg() {
cl.RegisterHttpHandler(agntCfg.URL,
agents.NewHTTPAgent(cms, agntCfg.SessionSConns, fs.FilterS(),
agents.NewHTTPAgent(cm, agntCfg.SessionSConns, fs,
ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}
@@ -86,9 +83,9 @@ func (ha *HTTPAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry)
// Shutdown stops the service
func (ha *HTTPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
ha.Lock()
ha.mu.Lock()
ha.started = false
ha.Unlock()
ha.mu.Unlock()
return // no shutdown for the momment
}

View File

@@ -39,10 +39,9 @@ func NewJanusAgent(cfg *config.CGRConfig) *JanusAgent {
// JanusAgent implements Service interface
type JanusAgent struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
jA *agents.JanusAgent
jA *agents.JanusAgent
// we can realy stop the JanusAgent so keep a flag
// if we registerd the jandlers
@@ -67,9 +66,9 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
ja.Lock()
ja.mu.Lock()
defer ja.mu.Unlock()
if ja.started {
ja.Unlock()
return utils.ErrServiceAlreadyRunning
}
ja.jA, err = agents.NewJanusAgent(ja.cfg, cms.ConnManager(), fs.FilterS())
@@ -89,7 +88,6 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
cl.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.HandlePlugin))
ja.started = true
ja.Unlock()
return
}
@@ -100,10 +98,10 @@ func (ja *JanusAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry
// Shutdown stops the service
func (ja *JanusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
ja.Lock()
ja.mu.Lock()
err = ja.jA.Shutdown()
ja.started = false
ja.Unlock()
ja.mu.Unlock()
return // no shutdown for the momment
}

View File

@@ -23,7 +23,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
@@ -43,10 +42,9 @@ func NewLoaderService(cfg *config.CGRConfig, preloadIDs []string) *LoaderService
// LoaderService implements Service interface
type LoaderService struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
ldrs *loaders.LoaderS
cl *commonlisteners.CommonListenerS
preloadIDs []string
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -65,13 +63,13 @@ func (s *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Service
if err != nil {
return err
}
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
s.ldrs = loaders.NewLoaderS(s.cfg, dbs, fs, cms.ConnManager())
if !s.ldrs.Enabled() {
@@ -97,7 +95,7 @@ func (s *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Service
srv, _ := engine.NewService(s.ldrs)
// srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false)
for _, svc := range srv {
s.cl.RpcRegister(svc)
cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.LoaderS, srv)
return nil
@@ -121,21 +119,21 @@ func (s *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.Servic
close(s.stopChan)
s.stopChan = make(chan struct{})
s.RLock()
defer s.RUnlock()
s.mu.RLock()
defer s.mu.RUnlock()
s.ldrs.Reload(dbs, fs, cms)
return s.ldrs.ListenAndServe(s.stopChan)
}
// Shutdown stops the service
func (s *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
s.Lock()
s.ldrs = nil
func (s *LoaderService) Shutdown(registry *servmanager.ServiceRegistry) error {
s.mu.Lock()
defer s.mu.Unlock()
close(s.stopChan)
s.cl.RpcUnregisterName(utils.LoaderSv1)
s.Unlock()
return
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.LoaderSv1)
return nil
}
// ServiceName returns the service name
@@ -148,11 +146,6 @@ func (s *LoaderService) ShouldRun() bool {
return s.cfg.LoaderCfg().Enabled()
}
// GetLoaderS returns the initialized LoaderService
func (s *LoaderService) GetLoaderS() *loaders.LoaderS {
return s.ldrs
}
// StateChan returns signaling channel of specific state
func (s *LoaderService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)

View File

@@ -19,8 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/efs"
@@ -40,10 +38,8 @@ func NewLoggerService(cfg *config.CGRConfig, loggerType string) *LoggerService {
// LoggerService implements Service interface.
type LoggerService struct {
mu sync.RWMutex
cfg *config.CGRConfig
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
stateDeps *StateDependencies // channel subscriptions for state changes
loggerType string
}

View File

@@ -38,16 +38,13 @@ func NewRadiusAgent(cfg *config.CGRConfig) *RadiusAgent {
// RadiusAgent implements Agent interface
type RadiusAgent struct {
sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
rad *agents.RadiusAgent
lnet string
lauth string
lacct string
mu sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
rad *agents.RadiusAgent
lnet string
lauth string
lacct string
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -65,8 +62,8 @@ func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
rad.Lock()
defer rad.Unlock()
rad.mu.Lock()
defer rad.mu.Unlock()
rad.lnet = rad.cfg.RadiusAgentCfg().ListenNet
rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth
@@ -108,10 +105,10 @@ func (rad *RadiusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
}
func (rad *RadiusAgent) shutdown() {
rad.Lock()
rad.mu.Lock()
defer rad.mu.Unlock()
close(rad.stopChan)
rad.rad = nil
rad.Unlock()
}
// ServiceName returns the service name

View File

@@ -23,7 +23,6 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,12 +38,9 @@ func NewRankingService(cfg *config.CGRConfig) *RankingService {
}
type RankingService struct {
sync.RWMutex
cfg *config.CGRConfig
ran *engine.RankingS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
ran *engine.RankingS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -62,7 +58,7 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
if err != nil {
return err
}
ran.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -73,8 +69,8 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
ran.Lock()
defer ran.Unlock()
ran.mu.Lock()
defer ran.mu.Unlock()
ran.ran = engine.NewRankingS(dbs.DataManager(), cms.ConnManager(), fs.FilterS(), ran.cfg)
if err := ran.ran.StartRankingS(context.TODO()); err != nil {
return err
@@ -84,7 +80,7 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
return err
}
for _, s := range srv {
ran.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.RankingS, srv)
return nil
@@ -92,19 +88,20 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
// Reload handles the change of config
func (ran *RankingService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
ran.Lock()
ran.mu.Lock()
ran.ran.Reload(context.TODO())
ran.Unlock()
ran.mu.Unlock()
return
}
// Shutdown stops the service
func (ran *RankingService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
ran.Lock()
defer ran.Unlock()
func (ran *RankingService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
ran.mu.Lock()
defer ran.mu.Unlock()
ran.ran.StopRankingS()
ran.ran = nil
ran.cl.RpcUnregisterName(utils.RankingSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.RankingSv1)
return
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/rates"
@@ -40,41 +39,14 @@ func NewRateService(cfg *config.CGRConfig) *RateService {
// RateService is the service structure for RateS
type RateService struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
rateS *rates.RateS
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// ServiceName returns the service name
func (rs *RateService) ServiceName() string {
return utils.RateS
}
// ShouldRun returns if the service should be running
func (rs *RateService) ShouldRun() (should bool) {
return rs.cfg.RateSCfg().Enabled
}
// Reload handles the change of config
func (rs *RateService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
rs.rldChan <- struct{}{}
return
}
// Shutdown stops the service
func (rs *RateService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
rs.Lock()
defer rs.Unlock()
close(rs.stopChan)
rs.rateS = nil
rs.cl.RpcUnregisterName(utils.RateSv1)
return
}
// Start should handle the service start
func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
@@ -89,7 +61,7 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
if err != nil {
return err
}
rs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -101,9 +73,9 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
rs.Lock()
rs.mu.Lock()
rs.rateS = rates.NewRateS(rs.cfg, fs, dbs)
rs.Unlock()
rs.mu.Unlock()
rs.stopChan = make(chan struct{})
go rs.rateS.ListenAndServe(rs.stopChan, rs.rldChan)
@@ -113,12 +85,39 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
return err
}
// srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false)
rs.cl.RpcRegister(srv)
cl.RpcRegister(srv)
cms.AddInternalConn(utils.RateS, srv)
return
}
// Reload handles the change of config
func (rs *RateService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
rs.rldChan <- struct{}{}
return
}
// Shutdown stops the service
func (rs *RateService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
rs.mu.Lock()
defer rs.mu.Unlock()
close(rs.stopChan)
rs.rateS = nil
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.RateSv1)
return
}
// StateChan returns signaling channel of specific state
func (rs *RateService) StateChan(stateID string) chan struct{} {
return rs.stateDeps.StateChan(stateID)
}
// ServiceName returns the service name
func (rs *RateService) ServiceName() string {
return utils.RateS
}
// ShouldRun returns if the service should be running
func (rs *RateService) ShouldRun() (should bool) {
return rs.cfg.RateSCfg().Enabled
}

View File

@@ -37,21 +37,18 @@ func NewRegistrarCService(cfg *config.CGRConfig) *RegistrarCService {
// RegistrarCService implements Service interface
type RegistrarCService struct {
sync.RWMutex
cfg *config.CGRConfig
dspS *registrarc.RegistrarCService
stopChan chan struct{}
rldChan chan struct{}
mu sync.RWMutex
cfg *config.CGRConfig
dspS *registrarc.RegistrarCService
stopChan chan struct{}
rldChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
dspS.Lock()
defer dspS.Unlock()
dspS.mu.Lock()
defer dspS.mu.Unlock()
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, dspS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
@@ -73,11 +70,11 @@ func (dspS *RegistrarCService) Reload(_ *utils.SyncedChan, _ *servmanager.Servic
// Shutdown stops the service
func (dspS *RegistrarCService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
dspS.Lock()
dspS.mu.Lock()
defer dspS.mu.Unlock()
close(dspS.stopChan)
dspS.dspS.Shutdown()
dspS.dspS = nil
dspS.Unlock()
return
}

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,12 +38,9 @@ func NewResourceService(cfg *config.CGRConfig) *ResourceService {
// ResourceService implements Service interface
type ResourceService struct {
sync.RWMutex
cfg *config.CGRConfig
reS *engine.ResourceS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
reS *engine.ResourceS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -62,7 +58,7 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
reS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -74,14 +70,14 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
reS.Lock()
defer reS.Unlock()
reS.mu.Lock()
defer reS.mu.Unlock()
reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), cms.ConnManager())
reS.reS.StartLoop(context.TODO())
srv, _ := engine.NewService(reS.reS)
// srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false)
for _, s := range srv {
reS.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.ResourceS, srv)
return
@@ -89,19 +85,20 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
// Reload handles the change of config
func (reS *ResourceService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
reS.Lock()
reS.mu.Lock()
reS.reS.Reload(context.TODO())
reS.Unlock()
reS.mu.Unlock()
return
}
// Shutdown stops the service
func (reS *ResourceService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
reS.Lock()
defer reS.Unlock()
func (reS *ResourceService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
reS.mu.Lock()
defer reS.mu.Unlock()
reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error
reS.reS = nil
reS.cl.RpcUnregisterName(utils.ResourceSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.ResourceSv1)
return
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -38,12 +37,9 @@ func NewRouteService(cfg *config.CGRConfig) *RouteService {
// RouteService implements Service interface
type RouteService struct {
sync.RWMutex
cfg *config.CGRConfig
routeS *engine.RouteS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
routeS *engine.RouteS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -61,7 +57,7 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana
if err != nil {
return err
}
routeS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -72,13 +68,13 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
routeS.Lock()
defer routeS.Unlock()
routeS.mu.Lock()
defer routeS.mu.Unlock()
routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, cms.ConnManager())
srv, _ := engine.NewService(routeS.routeS)
// srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false)
for _, s := range srv {
routeS.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.RouteS, srv)
return
@@ -90,11 +86,12 @@ func (routeS *RouteService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRe
}
// Shutdown stops the service
func (routeS *RouteService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
routeS.Lock()
defer routeS.Unlock()
func (routeS *RouteService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
routeS.mu.Lock()
defer routeS.mu.Unlock()
routeS.routeS = nil
routeS.cl.RpcUnregisterName(utils.RouteSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.RouteSv1)
return
}

View File

@@ -41,16 +41,12 @@ func NewSessionService(cfg *config.CGRConfig) *SessionService {
// SessionService implements Service interface
type SessionService struct {
sync.RWMutex
sm *sessions.SessionS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
sm *sessions.SessionS
bircpEnabled bool // to stop birpc server if needed
stopChan chan struct{}
cfg *config.CGRConfig
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -66,15 +62,15 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag
if err != nil {
return err
}
smg.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
smg.Lock()
defer smg.Unlock()
smg.mu.Lock()
defer smg.mu.Unlock()
smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager())
smg.sm = sessions.NewSessionS(smg.cfg, dbs, fs, cms.ConnManager())
//start sync session in a separate goroutine
smg.stopChan = make(chan struct{})
go smg.sm.ListenAndServe(smg.stopChan)
@@ -84,28 +80,28 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag
srv, _ := engine.NewServiceWithName(smg.sm, utils.SessionS, true) // methods with multiple options
// srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options
for _, s := range srv {
smg.cl.RpcRegister(s)
cl.RpcRegister(s)
}
// Register BiRpc handlers
if smg.cfg.SessionSCfg().ListenBijson != utils.EmptyString {
smg.bircpEnabled = true
for n, s := range srv {
smg.cl.BiRPCRegisterName(n, s)
cl.BiRPCRegisterName(n, s)
}
// run this in it's own goroutine
go smg.start(shutdown)
go smg.start(shutdown, cl)
}
cms.AddInternalConn(utils.SessionS, srv)
return
}
func (smg *SessionService) start(shutdown *utils.SyncedChan) (err error) {
if err := smg.cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson,
func (smg *SessionService) start(shutdown *utils.SyncedChan, cl *commonlisteners.CommonListenerS) (err error) {
if err := cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson,
smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err))
smg.Lock()
smg.mu.Lock()
smg.bircpEnabled = false
smg.Unlock()
smg.mu.Unlock()
shutdown.CloseOnce()
}
return
@@ -117,19 +113,20 @@ func (smg *SessionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceReg
}
// Shutdown stops the service
func (smg *SessionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
smg.Lock()
defer smg.Unlock()
func (smg *SessionService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
smg.mu.Lock()
defer smg.mu.Unlock()
close(smg.stopChan)
if err = smg.sm.Shutdown(); err != nil {
return
}
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
if smg.bircpEnabled {
smg.cl.StopBiRPC()
cl.StopBiRPC()
smg.bircpEnabled = false
}
smg.sm = nil
smg.cl.RpcUnregisterName(utils.SessionSv1)
cl.RpcUnregisterName(utils.SessionSv1)
// smg.server.BiRPCUnregisterName(utils.SessionSv1)
return
}

View File

@@ -38,12 +38,10 @@ func NewSIPAgent(cfg *config.CGRConfig) *SIPAgent {
// SIPAgent implements Agent interface
type SIPAgent struct {
sync.RWMutex
cfg *config.CGRConfig
mu sync.RWMutex
cfg *config.CGRConfig
sip *agents.SIPAgent
oldListen string
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -61,8 +59,8 @@ func (sip *SIPAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
sip.Lock()
defer sip.Unlock()
sip.mu.Lock()
defer sip.mu.Unlock()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip, err = agents.NewSIPAgent(cm, sip.cfg, fs)
if err != nil {
@@ -84,19 +82,19 @@ func (sip *SIPAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRe
if sip.oldListen == sip.cfg.SIPAgentCfg().Listen {
return
}
sip.Lock()
sip.mu.Lock()
sip.sip.Shutdown()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip.InitStopChan()
sip.Unlock()
sip.mu.Unlock()
go sip.listenAndServe(shutdown)
return
}
// Shutdown stops the service
func (sip *SIPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
sip.Lock()
defer sip.Unlock()
sip.mu.Lock()
defer sip.mu.Unlock()
sip.sip.Shutdown()
sip.sip = nil
return

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,12 +38,9 @@ func NewStatService(cfg *config.CGRConfig) *StatService {
// StatService implements Service interface
type StatService struct {
sync.RWMutex
cfg *config.CGRConfig
sts *engine.StatS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
sts *engine.StatS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -62,7 +58,7 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
if err != nil {
return err
}
sts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -74,14 +70,14 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
sts.Lock()
defer sts.Unlock()
sts.mu.Lock()
defer sts.mu.Unlock()
sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), cms.ConnManager())
sts.sts.StartLoop(context.TODO())
srv, _ := engine.NewService(sts.sts)
// srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false)
for _, s := range srv {
sts.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.StatS, srv)
return
@@ -89,19 +85,20 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
// Reload handles the change of config
func (sts *StatService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
sts.Lock()
sts.mu.Lock()
sts.sts.Reload(context.TODO())
sts.Unlock()
sts.mu.Unlock()
return
}
// Shutdown stops the service
func (sts *StatService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
sts.Lock()
defer sts.Unlock()
func (sts *StatService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
sts.mu.Lock()
defer sts.mu.Unlock()
sts.sts.Shutdown(context.TODO())
sts.sts = nil
sts.cl.RpcUnregisterName(utils.StatSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.StatSv1)
return
}

View File

@@ -39,7 +39,7 @@ func NewStorDBService(cfg *config.CGRConfig, setVersions bool) *StorDBService {
// StorDBService implements Service interface
type StorDBService struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
oldDBCfg *config.StorDbCfg
db engine.StorDB
@@ -49,8 +49,8 @@ type StorDBService struct {
// Start should handle the service start
func (db *StorDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
db.Lock()
defer db.Unlock()
db.mu.Lock()
defer db.mu.Unlock()
db.oldDBCfg = db.cfg.StorDbCfg().Clone()
dbConn, err := engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host,
db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User,
@@ -76,8 +76,8 @@ func (db *StorDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegist
// Reload handles the change of config
func (db *StorDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
db.Lock()
defer db.Unlock()
db.mu.Lock()
defer db.mu.Unlock()
if db.needsConnectionReload() {
var d engine.StorDB
if d, err = engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host,
@@ -123,10 +123,10 @@ func (db *StorDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegis
// Shutdown stops the service
func (db *StorDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
db.Lock()
db.mu.Lock()
defer db.mu.Unlock()
db.db.Close()
db.db = nil
db.Unlock()
return
}

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -39,12 +38,9 @@ func NewThresholdService(cfg *config.CGRConfig) *ThresholdService {
// ThresholdService implements Service interface
type ThresholdService struct {
sync.RWMutex
cfg *config.CGRConfig
thrs *engine.ThresholdS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
thrs *engine.ThresholdS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -62,7 +58,7 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma
if err != nil {
return err
}
thrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -74,35 +70,36 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
thrs.Lock()
defer thrs.Unlock()
thrs.mu.Lock()
defer thrs.mu.Unlock()
thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), cms.ConnManager())
thrs.thrs.StartLoop(context.TODO())
srv, _ := engine.NewService(thrs.thrs)
// srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false)
for _, s := range srv {
thrs.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.ThresholdS, srv)
return
}
// Reload handles the change of config
func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
thrs.Lock()
func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
thrs.mu.Lock()
thrs.thrs.Reload(context.TODO())
thrs.Unlock()
return
thrs.mu.Unlock()
return nil
}
// Shutdown stops the service
func (thrs *ThresholdService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
thrs.Lock()
defer thrs.Unlock()
func (thrs *ThresholdService) Shutdown(registry *servmanager.ServiceRegistry) error {
thrs.mu.Lock()
defer thrs.mu.Unlock()
thrs.thrs.Shutdown(context.TODO())
thrs.thrs = nil
thrs.cl.RpcUnregisterName(utils.ThresholdSv1)
return
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.ThresholdSv1)
return nil
}
// ServiceName returns the service name

View File

@@ -22,7 +22,6 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/tpes"
@@ -39,10 +38,9 @@ func NewTPeService(cfg *config.CGRConfig) *TPeService {
// TypeService implements Service interface
type TPeService struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
tpes *tpes.TPeS
cl *commonlisteners.CommonListenerS
srv *birpc.Service
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -50,7 +48,6 @@ type TPeService struct {
// Start should handle the service start
func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -61,14 +58,17 @@ func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
if err != nil {
return err
}
ts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
ts.mu.Lock()
defer ts.mu.Unlock()
ts.tpes = tpes.NewTPeS(ts.cfg, dbs, cm)
ts.stopChan = make(chan struct{})
ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false)
ts.cl.RpcRegister(ts.srv)
cl.RpcRegister(ts.srv)
return
}
@@ -78,9 +78,13 @@ func (ts *TPeService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry
}
// Shutdown stops the service
func (ts *TPeService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
func (ts *TPeService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.srv = nil
close(ts.stopChan)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.TPeSv1)
return
}

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -38,12 +37,9 @@ func NewTrendService(cfg *config.CGRConfig) *TrendService {
}
type TrendService struct {
sync.RWMutex
cfg *config.CGRConfig
trs *engine.TrendS
cl *commonlisteners.CommonListenerS
mu sync.RWMutex
cfg *config.CGRConfig
trs *engine.TrendS
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -61,7 +57,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
if err != nil {
return err
}
trs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
@@ -72,8 +68,8 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
trs.Lock()
defer trs.Unlock()
trs.mu.Lock()
defer trs.mu.Unlock()
trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager())
if err := trs.trs.StartTrendS(context.TODO()); err != nil {
return err
@@ -83,7 +79,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
return err
}
for _, s := range srv {
trs.cl.RpcRegister(s)
cl.RpcRegister(s)
}
cms.AddInternalConn(utils.TrendS, srv)
return nil
@@ -91,19 +87,20 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
// Reload handles the change of config
func (trs *TrendService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
trs.Lock()
trs.mu.Lock()
trs.trs.Reload(context.TODO())
trs.Unlock()
trs.mu.Unlock()
return
}
// Shutdown stops the service
func (trs *TrendService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
trs.Lock()
defer trs.Unlock()
func (trs *TrendService) Shutdown(registry *servmanager.ServiceRegistry) (err error) {
trs.mu.Lock()
defer trs.mu.Unlock()
trs.trs.StopTrendS()
trs.trs = nil
trs.cl.RpcUnregisterName(utils.TrendSv1)
cl := registry.Lookup(utils.CommonListenerS).(*CommonListenerService).CLS()
cl.RpcUnregisterName(utils.TrendSv1)
return
}