Implement ConnManager service

Use it to register internal rpc conns instead of ServManager
DispatcherS now waits for AttributeS to start (only when enabled)
This commit is contained in:
ionutboangiu
2024-12-18 22:56:24 +02:00
committed by Dan Christian Bogos
parent bf3d9a3281
commit e7152dacf8
51 changed files with 687 additions and 933 deletions

View File

@@ -107,13 +107,12 @@ func runCGREngine(fs []string) (err error) {
}()
}
connMgr := engine.NewConnManager(cfg)
// init syslog
if utils.Logger, err = engine.NewLogger(context.TODO(),
utils.FirstNonEmpty(*flags.Logger, cfg.LoggerCfg().Type),
cfg.GeneralCfg().DefaultTenant,
cfg.GeneralCfg().NodeID,
connMgr, cfg); err != nil {
nil, cfg); err != nil {
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
efs.SetFailedPostCacheTTL(cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
@@ -124,58 +123,57 @@ func runCGREngine(fs []string) (err error) {
utils.DataDB: new(sync.WaitGroup),
}
iServeManagerCh := make(chan birpc.ClientConnector, 1)
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager), utils.ServiceManagerV1, iServeManagerCh)
// ServiceIndexer will share service references to all services
registry := servmanager.NewServiceRegistry()
gvS := services.NewGlobalVarS(cfg)
dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep)
sdbS := services.NewStorDBService(cfg, *flags.SetVersions)
cls := services.NewCommonListenerService(cfg, caps)
anzS := services.NewAnalyzerService(cfg)
cms := services.NewConnManagerService(cfg)
dmS := services.NewDataDBService(cfg, *flags.SetVersions, srvDep)
sdbS := services.NewStorDBService(cfg, *flags.SetVersions)
configS := services.NewConfigService(cfg)
guardianS := services.NewGuardianService(cfg)
coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg)
cacheS := services.NewCacheService(cfg, connMgr)
fltrS := services.NewFilterService(cfg, connMgr)
dspS := services.NewDispatcherService(cfg, connMgr)
ldrs := services.NewLoaderService(cfg, connMgr)
efs := services.NewExportFailoverService(cfg, connMgr)
adminS := services.NewAdminSv1Service(cfg, connMgr)
sessionS := services.NewSessionService(cfg, connMgr)
cacheS := services.NewCacheService(cfg)
fltrS := services.NewFilterService(cfg)
dspS := services.NewDispatcherService(cfg)
ldrs := services.NewLoaderService(cfg)
efs := services.NewExportFailoverService(cfg)
adminS := services.NewAdminSv1Service(cfg)
sessionS := services.NewSessionService(cfg)
attrS := services.NewAttributeService(cfg, dspS)
chrgS := services.NewChargerService(cfg, connMgr)
routeS := services.NewRouteService(cfg, connMgr)
resourceS := services.NewResourceService(cfg, connMgr, srvDep)
trendS := services.NewTrendService(cfg, connMgr, srvDep)
rankingS := services.NewRankingService(cfg, connMgr, srvDep)
thS := services.NewThresholdService(cfg, connMgr, srvDep)
stS := services.NewStatService(cfg, connMgr, srvDep)
erS := services.NewEventReaderService(cfg, connMgr)
dnsAgent := services.NewDNSAgent(cfg, connMgr)
fsAgent := services.NewFreeswitchAgent(cfg, connMgr)
kamAgent := services.NewKamailioAgent(cfg, connMgr)
janusAgent := services.NewJanusAgent(cfg, connMgr)
astAgent := services.NewAsteriskAgent(cfg, connMgr)
radAgent := services.NewRadiusAgent(cfg, connMgr)
diamAgent := services.NewDiameterAgent(cfg, connMgr, caps)
httpAgent := services.NewHTTPAgent(cfg, connMgr)
sipAgent := services.NewSIPAgent(cfg, connMgr)
eeS := services.NewEventExporterService(cfg, connMgr)
cdrS := services.NewCDRServer(cfg, connMgr)
registrarcS := services.NewRegistrarCService(cfg, connMgr)
chrgS := services.NewChargerService(cfg)
routeS := services.NewRouteService(cfg)
resourceS := services.NewResourceService(cfg, srvDep)
trendS := services.NewTrendService(cfg, srvDep)
rankingS := services.NewRankingService(cfg, srvDep)
thS := services.NewThresholdService(cfg, srvDep)
stS := services.NewStatService(cfg, srvDep)
erS := services.NewEventReaderService(cfg)
dnsAgent := services.NewDNSAgent(cfg)
fsAgent := services.NewFreeswitchAgent(cfg)
kamAgent := services.NewKamailioAgent(cfg)
janusAgent := services.NewJanusAgent(cfg)
astAgent := services.NewAsteriskAgent(cfg)
radAgent := services.NewRadiusAgent(cfg)
diamAgent := services.NewDiameterAgent(cfg, caps)
httpAgent := services.NewHTTPAgent(cfg)
sipAgent := services.NewSIPAgent(cfg)
eeS := services.NewEventExporterService(cfg)
cdrS := services.NewCDRServer(cfg)
registrarcS := services.NewRegistrarCService(cfg)
rateS := services.NewRateService(cfg)
actionS := services.NewActionService(cfg, connMgr)
accS := services.NewAccountService(cfg, connMgr)
tpeS := services.NewTPeService(cfg, connMgr)
actionS := services.NewActionService(cfg)
accS := services.NewAccountService(cfg)
tpeS := services.NewTPeService(cfg)
srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, registry, []servmanager.Service{
srvManager := servmanager.NewServiceManager(shdWg, cfg, registry, []servmanager.Service{
gvS,
dmS,
sdbS,
cls,
anzS,
cms,
dmS,
sdbS,
configS,
guardianS,
coreS,
@@ -243,7 +241,7 @@ func runCGREngine(fs []string) (err error) {
}()
srvManager.StartServices(shutdown)
cgrInitServiceManagerV1(iServeManagerCh, cfg, srvManager, registry)
cgrInitServiceManagerV1(cfg, srvManager, registry)
if *flags.Preload != utils.EmptyString {
if err = cgrRunPreload(cfg, *flags.Preload, registry); err != nil {
@@ -297,22 +295,24 @@ func cgrRunPreload(cfg *config.CGRConfig, loaderIDs string,
return
}
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, cfg *config.CGRConfig,
srvMngr *servmanager.ServiceManager, registry *servmanager.ServiceRegistry) {
cls := registry.Lookup(utils.CommonListenerS).(*services.CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) {
return
}
cl := cls.CLS()
anz := registry.Lookup(utils.AnalyzerS).(*services.AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) {
func cgrInitServiceManagerV1(cfg *config.CGRConfig, srvMngr *servmanager.ServiceManager,
registry *servmanager.ServiceRegistry) {
srvDeps, err := services.WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
},
registry, cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
cl := srvDeps[utils.CommonListenerS].(*services.CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*services.ConnManagerService)
srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false)
if !cfg.DispatcherSCfg().Enabled {
cl.RpcRegister(srv)
}
iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager)
cms.AddInternalConn(utils.ServiceManager, srv)
}
func cgrStartRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, shutdown *utils.SyncedChan) {

View File

@@ -115,7 +115,7 @@ var (
ActionSJSON: utils.ActionS,
CoreSJSON: utils.CoreS,
TPeSJSON: utils.TPeS,
RPCConnsJSON: RPCConnsJSON,
RPCConnsJSON: utils.ConnManager,
}
)

View File

@@ -1,5 +1,4 @@
//go:build integration
// +build integration
//go:build flaky
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments

View File

@@ -1,5 +1,4 @@
//go:build integration
// +build integration
//go:build flaky
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments

View File

@@ -122,7 +122,7 @@ func TestAnzDocIT(t *testing.T) {
anzStringQuery(t, client, 7, "", "*lte:~*hdr.RequestID:2")
anzStringQuery(t, client, -1, "", "*gt:~*hdr.RequestDuration:1ms")
anzStringQuery(t, client, 1, `+RequestMethod:"AttributeSv1.ProcessEvent"`, "*notstring:~*rep.Event.Cost:0")
anzStringQuery(t, client, 1, `+RequestMethod:"CoreSv1.Status"`, "*gt:~*rep.goroutines:55")
anzStringQuery(t, client, 1, `+RequestMethod:"CoreSv1.Status"`, "*gt:~*rep.goroutines:47")
}
// anzStringQuery sends an AnalyzerSv1.StringQuery request. First filter represents

View File

@@ -90,6 +90,9 @@ func TestRPCExpIT(t *testing.T) {
}
]
}
},
"efs": {
"enabled": true
}
}`,
DBCfg: engine.InternalDBCfg,

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/accounts"
"github.com/cgrates/cgrates/commonlisteners"
@@ -32,11 +31,9 @@ import (
)
// NewAccountService returns the Account Service
func NewAccountService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *AccountService {
func NewAccountService(cfg *config.CGRConfig) *AccountService {
return &AccountService{
cfg: cfg,
connMgr: connMgr,
rldChan: make(chan struct{}, 1),
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -45,17 +42,15 @@ func NewAccountService(cfg *config.CGRConfig,
// AccountService implements Service interface
type AccountService struct {
sync.RWMutex
cfg *config.CGRConfig
acts *accounts.AccountS
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -63,29 +58,29 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, acts.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheAccounts,
utils.CacheAccountsFilterIndexes); err != nil {
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
acts.Lock()
defer acts.Unlock()
acts.acts = accounts.NewAccountS(acts.cfg, fs.FilterS(), acts.connMgr, dbs.DataManager())
acts.acts = accounts.NewAccountS(acts.cfg, fs, cms.ConnManager(), dbs)
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)
srv, err := engine.NewServiceWithPing(acts.acts, utils.AccountSv1, utils.V1Prfx)
@@ -97,7 +92,7 @@ func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmana
acts.cl.RpcRegister(srv)
}
acts.intRPCconn = anz.GetInternalCodec(srv, utils.AccountS)
cms.AddInternalConn(utils.AccountS, srv)
return
}
@@ -131,8 +126,3 @@ func (acts *AccountService) ShouldRun() bool {
func (acts *AccountService) StateChan(stateID string) chan struct{} {
return acts.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (acts *AccountService) IntRPCConn() birpc.ClientConnector {
return acts.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/actions"
"github.com/cgrates/cgrates/commonlisteners"
@@ -32,10 +31,8 @@ import (
)
// NewActionService returns the Action Service
func NewActionService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *ActionService {
func NewActionService(cfg *config.CGRConfig) *ActionService {
return &ActionService{
connMgr: connMgr,
cfg: cfg,
rldChan: make(chan struct{}, 1),
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
@@ -45,6 +42,7 @@ func NewActionService(cfg *config.CGRConfig,
// ActionService implements Service interface
type ActionService struct {
sync.RWMutex
cfg *config.CGRConfig
acts *actions.ActionS
cl *commonlisteners.CommonListenerS
@@ -52,11 +50,7 @@ type ActionService struct {
rldChan chan struct{}
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -64,29 +58,29 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, acts.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheActionProfiles,
utils.CacheActionProfilesFilterIndexes); err != nil {
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
acts.Lock()
defer acts.Unlock()
acts.acts = actions.NewActionS(acts.cfg, fs.FilterS(), dbs.DataManager(), acts.connMgr)
acts.acts = actions.NewActionS(acts.cfg, fs, dbs, cms.ConnManager())
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)
srv, err := engine.NewServiceWithPing(acts.acts, utils.ActionSv1, utils.V1Prfx)
@@ -97,8 +91,7 @@ func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanag
if !acts.cfg.DispatcherSCfg().Enabled {
acts.cl.RpcRegister(srv)
}
acts.intRPCconn = anz.GetInternalCodec(srv, utils.ActionS)
cms.AddInternalConn(utils.ActionS, srv)
return
}
@@ -133,8 +126,3 @@ func (acts *ActionService) ShouldRun() bool {
func (acts *ActionService) StateChan(stateID string) chan struct{} {
return acts.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (acts *ActionService) IntRPCConn() birpc.ClientConnector {
return acts.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -31,11 +30,9 @@ import (
)
// NewAPIerSv1Service returns the APIerSv1 Service
func NewAdminSv1Service(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *AdminSv1Service {
func NewAdminSv1Service(cfg *config.CGRConfig) *AdminSv1Service {
return &AdminSv1Service{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -43,16 +40,11 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
// AdminSv1Service implements Service interface
type AdminSv1Service struct {
sync.RWMutex
api *apis.AdminSv1
cl *commonlisteners.CommonListenerS
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // RPC connector with internal APIs
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
api *apis.AdminSv1
cl *commonlisteners.CommonListenerS
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -61,9 +53,9 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
utils.StorDB,
},
registry, apiService.cfg.GeneralCfg().ConnectTimeout)
@@ -71,15 +63,15 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana
return err
}
apiService.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
sdbs := srvDeps[utils.StorDB].(*StorDBService)
apiService.Lock()
defer apiService.Unlock()
apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), apiService.connMgr, fs.FilterS(), sdbs.DB())
apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), cms.ConnManager(), fs.FilterS(), sdbs.DB())
srv, _ := engine.NewService(apiService.api)
// srv, _ := birpc.NewService(apiService.api, "", false)
@@ -93,9 +85,7 @@ func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmana
apiService.cl.RpcRegister(s)
}
}
//backwards compatible
apiService.intRPCconn = anz.GetInternalCodec(srv, utils.AdminSv1)
cms.AddInternalConn(utils.AdminS, srv)
return
}
@@ -128,8 +118,3 @@ func (apiService *AdminSv1Service) ShouldRun() bool {
func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} {
return apiService.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (apiService *AdminSv1Service) IntRPCConn() birpc.ClientConnector {
return apiService.intRPCconn
}

View File

@@ -35,8 +35,12 @@ import (
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService {
anz := &AnalyzerService{
cfg: cfg,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
cfg: cfg,
stateDeps: NewStateDependencies([]string{
utils.StateServiceInit,
utils.StateServiceUP,
utils.StateServiceDOWN,
}),
}
return anz
}
@@ -44,15 +48,11 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService {
// AnalyzerService implements Service interface
type AnalyzerService struct {
sync.RWMutex
anz *analyzers.AnalyzerS
cl *commonlisteners.CommonListenerS
cancelFunc context.CancelFunc
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal
stateDeps *StateDependencies // channel subscriptions for state changes
anz *analyzers.AnalyzerS
cl *commonlisteners.CommonListenerS
cancelFunc context.CancelFunc
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -70,6 +70,7 @@ func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmana
if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil {
return
}
close(anz.stateDeps.StateChan(utils.StateServiceInit))
anzCtx, cancel := context.WithCancel(context.TODO())
anz.cancelFunc = cancel
go func(a *analyzers.AnalyzerS) {
@@ -146,8 +147,3 @@ func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string)
func (anz *AnalyzerService) StateChan(stateID string) chan struct{} {
return anz.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (anz *AnalyzerService) IntRPCConn() birpc.ClientConnector {
return anz.intRPCconn
}

View File

@@ -22,9 +22,6 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
@@ -32,11 +29,9 @@ import (
)
// NewAsteriskAgent returns the Asterisk Agent
func NewAsteriskAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *AsteriskAgent {
func NewAsteriskAgent(cfg *config.CGRConfig) *AsteriskAgent {
return &AsteriskAgent{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -44,18 +39,19 @@ func NewAsteriskAgent(cfg *config.CGRConfig,
// AsteriskAgent implements Agent interface
type AsteriskAgent struct {
sync.RWMutex
cfg *config.CGRConfig
stopChan chan struct{}
smas []*agents.AsteriskAgent
connMgr *engine.ConnManager
intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
stopChan chan struct{}
smas []*agents.AsteriskAgent
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, ast.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
ast.Lock()
defer ast.Unlock()
@@ -68,7 +64,7 @@ func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.Servi
ast.stopChan = make(chan struct{})
ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns))
for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers
ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr)
ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, cms.(*ConnManagerService).ConnManager())
go listenAndServe(ast.smas[connIdx], ast.stopChan)
}
return
@@ -108,8 +104,3 @@ func (ast *AsteriskAgent) ShouldRun() bool {
func (ast *AsteriskAgent) StateChan(stateID string) chan struct{} {
return ast.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ast *AsteriskAgent) IntRPCConn() birpc.ClientConnector {
return ast.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -43,6 +42,7 @@ func NewAttributeService(cfg *config.CGRConfig,
// AttributeService implements Service interface
type AttributeService struct {
sync.RWMutex
cfg *config.CGRConfig
dspS *DispatcherService
@@ -50,10 +50,7 @@ type AttributeService struct {
cl *commonlisteners.CommonListenerS
rpc *apis.AttributeSv1 // useful on restart
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies
stateDeps *StateDependencies
}
// Start should handle the service start
@@ -61,16 +58,17 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, attrS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
attrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheAttributeProfiles,
@@ -79,7 +77,6 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
attrS.Lock()
defer attrS.Unlock()
@@ -104,8 +101,7 @@ func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servm
}
}()
attrS.intRPCconn = anz.GetInternalCodec(srv, utils.AttributeS)
cms.AddInternalConn(utils.AttributeS, srv)
return
}
@@ -139,8 +135,3 @@ func (attrS *AttributeService) ShouldRun() bool {
func (attrS *AttributeService) StateChan(stateID string) chan struct{} {
return attrS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (attrS *AttributeService) IntRPCConn() birpc.ClientConnector {
return attrS.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,10 +29,9 @@ import (
)
// NewCacheService .
func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *CacheService {
func NewCacheService(cfg *config.CGRConfig) *CacheService {
return &CacheService{
cfg: cfg,
connMgr: connMgr,
cacheCh: make(chan *engine.CacheS, 1),
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -41,15 +39,11 @@ func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *CacheS
// CacheService implements Agent interface
type CacheService struct {
mu sync.Mutex
cl *commonlisteners.CommonListenerS
cacheCh chan *engine.CacheS
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
mu sync.Mutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
cacheCh chan *engine.CacheS
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -58,7 +52,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.
[]string{
utils.CommonListenerS,
utils.DataDB,
utils.AnalyzerS,
utils.ConnManager,
utils.CoreS,
},
registry, cS.cfg.GeneralCfg().ConnectTimeout)
@@ -67,13 +61,13 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.
}
cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cs := srvDeps[utils.CoreS].(*CoreService)
cS.mu.Lock()
defer cS.mu.Unlock()
engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CoreS().CapsStats)
engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cms.ConnManager(), cs.CoreS().CapsStats)
go engine.Cache.Precache(shutdown)
cS.cacheCh <- engine.Cache
@@ -85,7 +79,7 @@ func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.
cS.cl.RpcRegister(s)
}
}
cS.intRPCconn = anz.GetInternalCodec(srv, utils.CacheS)
cms.AddInternalConn(utils.CacheS, srv)
return
}
@@ -137,8 +131,3 @@ func (cS *CacheService) WaitToPrecache(shutdown *utils.SyncedChan, cacheIDs ...s
func (cS *CacheService) StateChan(stateID string) chan struct{} {
return cS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (cS *CacheService) IntRPCConn() birpc.ClientConnector {
return cS.intRPCconn
}

View File

@@ -22,7 +22,6 @@ import (
"runtime"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/cdrs"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -32,11 +31,9 @@ import (
)
// NewCDRServer returns the CDR Server
func NewCDRServer(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *CDRService {
func NewCDRServer(cfg *config.CGRConfig) *CDRService {
return &CDRService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -44,15 +41,12 @@ func NewCDRServer(cfg *config.CGRConfig,
// CDRService implements Service interface
type CDRService struct {
sync.RWMutex
cfg *config.CGRConfig
cdrS *cdrs.CDRServer
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -60,9 +54,9 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
utils.StorDB,
},
registry, cs.cfg.GeneralCfg().ConnectTimeout)
@@ -70,15 +64,15 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
return err
}
cs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
sdbs := srvDeps[utils.StorDB].(*StorDBService)
sdbs := srvDeps[utils.StorDB].(*StorDBService).DB()
cs.Lock()
defer cs.Unlock()
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs.FilterS(), cs.connMgr, sdbs.DB())
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs, cms.ConnManager(), sdbs)
runtime.Gosched()
srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx)
if err != nil {
@@ -87,8 +81,7 @@ func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
if !cs.cfg.DispatcherSCfg().Enabled {
cs.cl.RpcRegister(srv)
}
cs.intRPCconn = anz.GetInternalCodec(srv, utils.CDRServer)
cms.AddInternalConn(utils.CDRServer, srv)
return
}
@@ -120,8 +113,3 @@ func (cs *CDRService) ShouldRun() bool {
func (cs *CDRService) StateChan(stateID string) chan struct{} {
return cs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (cs *CDRService) IntRPCConn() birpc.ClientConnector {
return cs.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,11 +29,9 @@ import (
)
// NewChargerService returns the Charger Service
func NewChargerService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *ChargerService {
func NewChargerService(cfg *config.CGRConfig) *ChargerService {
return &ChargerService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -42,15 +39,12 @@ func NewChargerService(cfg *config.CGRConfig,
// ChargerService implements Service interface
type ChargerService struct {
sync.RWMutex
cfg *config.CGRConfig
chrS *engine.ChargerS
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -58,16 +52,17 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, chrS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
chrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheChargerProfiles,
@@ -76,11 +71,10 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
chrS.Lock()
defer chrS.Unlock()
chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, chrS.connMgr)
chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, cms.ConnManager())
srv, _ := engine.NewService(chrS.chrS)
// srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false)
if !chrS.cfg.DispatcherSCfg().Enabled {
@@ -88,8 +82,7 @@ func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmana
chrS.cl.RpcRegister(s)
}
}
chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS)
cms.AddInternalConn(utils.ChargerS, srv)
return nil
}
@@ -121,8 +114,3 @@ func (chrS *ChargerService) ShouldRun() bool {
func (chrS *ChargerService) StateChan(stateID string) chan struct{} {
return chrS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (chrS *ChargerService) IntRPCConn() birpc.ClientConnector {
return chrS.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -41,15 +40,11 @@ func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps) *CommonL
// CommonListenerService implements Service interface.
type CommonListenerService struct {
mu sync.RWMutex
cls *commonlisteners.CommonListenerS
caps *engine.Caps
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
mu sync.RWMutex
cfg *config.CGRConfig
cls *commonlisteners.CommonListenerS
caps *engine.Caps
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
@@ -94,11 +89,6 @@ func (cl *CommonListenerService) StateChan(stateID string) chan struct{} {
return cl.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (cl *CommonListenerService) IntRPCConn() birpc.ClientConnector {
return cl.intRPCconn
}
// CLS returns the CommonListenerS object.
func (cl *CommonListenerService) CLS() *commonlisteners.CommonListenerS {
return cl.cls

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -39,11 +38,10 @@ func NewConfigService(cfg *config.CGRConfig) *ConfigService {
// ConfigService implements Service interface.
type ConfigService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
@@ -51,14 +49,14 @@ func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.Service
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.AnalyzerS,
utils.ConnManager,
},
registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
svcs, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true)
if !s.cfg.DispatcherSCfg().Enabled {
@@ -66,7 +64,7 @@ func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.Service
s.cl.RpcRegister(svc)
}
}
s.intRPCconn = anz.GetInternalCodec(svcs, utils.ConfigSv1)
cms.AddInternalConn(utils.ConfigS, svcs)
return nil
}
@@ -94,8 +92,3 @@ func (s *ConfigService) ShouldRun() bool {
func (s *ConfigService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (s *ConfigService) IntRPCConn() birpc.ClientConnector {
return s.intRPCconn
}

225
services/connmanager.go Normal file
View File

@@ -0,0 +1,225 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewConnManagerService instantiates a new ConnManagerService.
func NewConnManagerService(cfg *config.CGRConfig) *ConnManagerService {
return &ConnManagerService{
cfg: cfg,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
// ConnManagerService implements Service interface.
type ConnManagerService struct {
mu sync.RWMutex
cfg *config.CGRConfig
connMgr *engine.ConnManager
anz *AnalyzerService
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
func (s *ConnManagerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
s.anz = registry.Lookup(utils.AnalyzerS).(*AnalyzerService)
if s.anz.ShouldRun() { // wait for AnalyzerS only if it should run
if _, err := WaitForServiceState(utils.StateServiceInit, utils.AnalyzerS, registry,
s.cfg.GeneralCfg().ConnectTimeout); err != nil {
return err
}
}
s.connMgr = engine.NewConnManager(s.cfg)
return nil
}
// Reload handles the config changes.
func (s *ConnManagerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
s.connMgr.Reload()
return nil
}
// Shutdown stops the service.
func (s *ConnManagerService) Shutdown(_ *servmanager.ServiceRegistry) error {
s.connMgr = nil
engine.SetConnManager(nil)
return nil
}
// ServiceName returns the service name
func (s *ConnManagerService) ServiceName() string {
return utils.ConnManager
}
// ShouldRun returns if the service should be running.
func (s *ConnManagerService) ShouldRun() bool {
return true
}
// StateChan returns signaling channel of specific state
func (s *ConnManagerService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}
// ConnManager returns the ConnManager object.
func (s *ConnManagerService) ConnManager() *engine.ConnManager {
return s.connMgr
}
// AddInternalConn registers direct internal RPC access for a service.
// TODO: Add function to remove internal conns (useful for shutdown).
func (s *ConnManagerService) AddInternalConn(svcName string, receiver birpc.ClientConnector) {
s.mu.Lock()
defer s.mu.Unlock()
route, exists := serviceMethods[svcName]
if !exists {
return
}
rpcIntChan := make(chan birpc.ClientConnector, 1)
s.connMgr.AddInternalConn(route.internalPath, route.receiver, rpcIntChan)
if route.biRPCPath != "" {
s.connMgr.AddInternalConn(route.biRPCPath, route.receiver, rpcIntChan)
}
rpcIntChan <- s.anz.GetInternalCodec(receiver, svcName)
}
// internalRoute defines how a service's methods can be accessed internally within the system.
type internalRoute struct {
receiver string // method receiver name (e.g. "ChargerSv1")
internalPath string // internal API path
biRPCPath string // bidirectional API path, if supported
}
var serviceMethods = map[string]internalRoute{
utils.AnalyzerS: {
receiver: utils.AnalyzerSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS),
},
utils.AdminS: {
receiver: utils.AdminSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS),
},
utils.AttributeS: {
receiver: utils.AttributeSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes),
},
utils.CacheS: {
receiver: utils.CacheSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches),
},
utils.CDRs: {
receiver: utils.CDRsV1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs),
},
utils.ChargerS: {
receiver: utils.ChargerSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers),
},
utils.GuardianS: {
receiver: utils.GuardianSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian),
},
utils.LoaderS: {
receiver: utils.LoaderSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders),
},
utils.ResourceS: {
receiver: utils.ResourceSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources),
},
utils.SessionS: {
receiver: utils.SessionSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS),
biRPCPath: utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS),
},
utils.StatS: {
receiver: utils.StatSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats),
},
utils.RankingS: {
receiver: utils.RankingSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings),
},
utils.TrendS: {
receiver: utils.TrendSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends),
},
utils.RouteS: {
receiver: utils.RouteSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes),
},
utils.ThresholdS: {
receiver: utils.ThresholdSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds),
},
utils.ServiceManagerS: {
receiver: utils.ServiceManagerV1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager),
},
utils.ConfigS: {
receiver: utils.ConfigSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig),
},
utils.CoreS: {
receiver: utils.CoreSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore),
},
utils.EEs: {
receiver: utils.EeSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs),
},
utils.RateS: {
receiver: utils.RateSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates),
},
utils.DispatcherS: {
receiver: utils.DispatcherSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers),
},
utils.AccountS: {
receiver: utils.AccountSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts),
},
utils.ActionS: {
receiver: utils.ActionSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions),
},
utils.TPeS: {
receiver: utils.TPeSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes),
},
utils.EFs: {
receiver: utils.EfSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs),
},
utils.ERs: {
receiver: utils.ErSv1,
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs),
},
}

View File

@@ -22,7 +22,6 @@ import (
"os"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
@@ -46,20 +45,16 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps,
// CoreService implements Service interface
type CoreService struct {
mu sync.RWMutex
cS *cores.CoreS
cl *commonlisteners.CommonListenerS
fileCPU *os.File
caps *engine.Caps
csCh chan *cores.CoreS
stopChan chan struct{}
shdWg *sync.WaitGroup
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
mu sync.RWMutex
cfg *config.CGRConfig
cS *cores.CoreS
cl *commonlisteners.CommonListenerS
fileCPU *os.File
caps *engine.Caps
csCh chan *cores.CoreS
stopChan chan struct{}
shdWg *sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -67,14 +62,14 @@ func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.AnalyzerS,
utils.ConnManager,
},
registry, cS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cS.mu.Lock()
defer cS.mu.Unlock()
@@ -90,8 +85,7 @@ func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
cS.cl.RpcRegister(s)
}
}
cS.intRPCconn = anz.GetInternalCodec(srv, utils.CoreS)
cms.AddInternalConn(utils.CoreS, srv)
return nil
}
@@ -129,11 +123,6 @@ func (cS *CoreService) StateChan(stateID string) chan struct{} {
return cS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (cS *CoreService) IntRPCConn() birpc.ClientConnector {
return cS.intRPCconn
}
// CoreS returns the CoreS object.
func (cS *CoreService) CoreS() *cores.CoreS {
cS.mu.RLock()

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -30,11 +29,10 @@ import (
)
// NewDataDBService returns the DataDB Service
func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVersions bool,
func NewDataDBService(cfg *config.CGRConfig, setVersions bool,
srvDep map[string]*sync.WaitGroup) *DataDBService {
return &DataDBService{
cfg: cfg,
connMgr: connMgr,
setVersions: setVersions,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
@@ -44,21 +42,20 @@ func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVer
// DataDBService implements Service interface
type DataDBService struct {
sync.RWMutex
cfg *config.CGRConfig
oldDBCfg *config.DataDbCfg
connMgr *engine.ConnManager
cfg *config.CGRConfig
oldDBCfg *config.DataDbCfg
dm *engine.DataManager
setVersions bool
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
func (db *DataDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
func (db *DataDBService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, db.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
db.Lock()
defer db.Unlock()
db.oldDBCfg = db.cfg.DataDbCfg().Clone()
@@ -71,7 +68,7 @@ func (db *DataDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegist
utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
return
}
db.dm = engine.NewDataManager(dbConn, db.cfg.CacheCfg(), db.connMgr)
db.dm = engine.NewDataManager(dbConn, db.cfg.CacheCfg(), cms.(*ConnManagerService).ConnManager())
if db.setVersions {
err = engine.OverwriteDBVersions(dbConn)
@@ -176,8 +173,3 @@ func (db *DataDBService) DataManager() *engine.DataManager {
func (db *DataDBService) StateChan(stateID string) chan struct{} {
return db.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (db *DataDBService) IntRPCConn() birpc.ClientConnector {
return db.intRPCconn
}

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -31,11 +30,9 @@ import (
)
// NewDiameterAgent returns the Diameter Agent
func NewDiameterAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager, caps *engine.Caps) *DiameterAgent {
func NewDiameterAgent(cfg *config.CGRConfig, caps *engine.Caps) *DiameterAgent {
return &DiameterAgent{
cfg: cfg,
connMgr: connMgr,
caps: caps,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -47,36 +44,39 @@ type DiameterAgent struct {
cfg *config.CGRConfig
stopChan chan struct{}
da *agents.DiameterAgent
connMgr *engine.ConnManager
caps *engine.Caps
da *agents.DiameterAgent
caps *engine.Caps
lnet string
laddr string
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (da *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
da.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.FilterS,
},
registry, da.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
da.Lock()
defer da.Unlock()
return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown)
return da.start(fs.FilterS(), cms.ConnManager(), da.caps, shutdown)
}
func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown *utils.SyncedChan) error {
func (da *DiameterAgent) start(filterS *engine.FilterS, cm *engine.ConnManager, caps *engine.Caps,
shutdown *utils.SyncedChan) error {
var err error
da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr, caps)
da.da, err = agents.NewDiameterAgent(da.cfg, filterS, cm, caps)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent: %v",
utils.DiameterAgent, err))
return err
}
da.lnet = da.cfg.DiameterAgentCfg().ListenNet
@@ -102,12 +102,18 @@ func (da *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanage
}
close(da.stopChan)
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
da.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.FilterS,
},
registry, da.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
return da.start(fs.FilterS(), cms.ConnManager(), da.caps, shutdown)
}
// Shutdown stops the service
@@ -133,8 +139,3 @@ func (da *DiameterAgent) ShouldRun() bool {
func (da *DiameterAgent) StateChan(stateID string) chan struct{} {
return da.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (da *DiameterAgent) IntRPCConn() birpc.ClientConnector {
return da.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatchers"
@@ -31,11 +30,9 @@ import (
)
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *DispatcherService {
func NewDispatcherService(cfg *config.CGRConfig) *DispatcherService {
return &DispatcherService{
cfg: cfg,
connMgr: connMgr,
srvsReload: make(map[string]chan struct{}),
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -44,16 +41,12 @@ func NewDispatcherService(cfg *config.CGRConfig,
// DispatcherService implements Service interface
type DispatcherService struct {
sync.RWMutex
dspS *dispatchers.DispatcherService
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
dspS *dispatchers.DispatcherService
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
srvsReload map[string]chan struct{}
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -61,16 +54,19 @@ func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servm
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
utils.AttributeS,
},
registry, dspS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
dspS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
dspS.connMgr = cms.ConnManager()
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheDispatcherProfiles,
@@ -80,7 +76,6 @@ func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servm
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
dspS.Lock()
defer dspS.Unlock()
@@ -98,7 +93,7 @@ func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servm
// for the moment we dispable Apier through dispatcher
// until we figured out a better sollution in case of gob server
// dspS.server.SetDispatched()
dspS.intRPCconn = anz.GetInternalCodec(srv, utils.DispatcherS)
cms.AddInternalConn(utils.DispatcherS, srv)
return
}
@@ -162,8 +157,3 @@ func (dspS *DispatcherService) sync() {
func (dspS *DispatcherService) StateChan(stateID string) chan struct{} {
return dspS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (dspS *DispatcherService) IntRPCConn() birpc.ClientConnector {
return dspS.intRPCconn
}

View File

@@ -19,23 +19,18 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewDNSAgent returns the DNS Agent
func NewDNSAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *DNSAgent {
func NewDNSAgent(cfg *config.CGRConfig) *DNSAgent {
return &DNSAgent{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -47,26 +42,29 @@ type DNSAgent struct {
stopChan chan struct{}
dns *agents.DNSAgent
connMgr *engine.ConnManager
dns *agents.DNSAgent
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
dns.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.FilterS,
},
registry, dns.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
return err
}
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dns.Lock()
defer dns.Unlock()
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.(*FilterService).FilterS(), dns.connMgr)
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), cms.ConnManager())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.dns = nil
return
}
@@ -77,11 +75,17 @@ func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.Ser
// Reload handles the change of config
func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
dns.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.FilterS,
},
registry, dns.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
return err
}
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dns.Lock()
defer dns.Unlock()
@@ -90,9 +94,8 @@ func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.Se
close(dns.stopChan)
}
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.(*FilterService).FilterS(), dns.connMgr)
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), cms.ConnManager())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.dns = nil
return
}
@@ -108,7 +111,6 @@ func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shutdown *utils.Sync
dns.dns.RLock()
defer dns.dns.RUnlock()
if err = dns.dns.ListenAndServe(stopChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
shutdown.CloseOnce() // stop the engine here
}
return
@@ -140,8 +142,3 @@ func (dns *DNSAgent) ShouldRun() bool {
func (dns *DNSAgent) StateChan(stateID string) chan struct{} {
return dns.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (dns *DNSAgent) IntRPCConn() birpc.ClientConnector {
return dns.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
@@ -31,27 +30,22 @@ import (
)
// NewEventExporterService constructs EventExporterService
func NewEventExporterService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *EventExporterService {
func NewEventExporterService(cfg *config.CGRConfig) *EventExporterService {
return &EventExporterService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
// EventExporterService is the service structure for EventExporterS
type EventExporterService struct {
mu sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
eeS *ees.EeS
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// ServiceName returns the service name
@@ -87,21 +81,21 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.FilterS,
utils.AnalyzerS,
},
registry, es.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
es.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
fs := srvDeps[utils.FilterS].(*FilterService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
es.mu.Lock()
defer es.mu.Unlock()
es.eeS, err = ees.NewEventExporterS(es.cfg, fs.FilterS(), es.connMgr)
es.eeS, err = ees.NewEventExporterS(es.cfg, fs, cms.ConnManager())
if err != nil {
return err
}
@@ -111,8 +105,7 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager
if !es.cfg.DispatcherSCfg().Enabled {
es.cl.RpcRegister(srv)
}
es.intRPCconn = anz.GetInternalCodec(srv, utils.EEs)
cms.AddInternalConn(utils.EEs, srv)
return nil
}
@@ -120,8 +113,3 @@ func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager
func (es *EventExporterService) StateChan(stateID string) chan struct{} {
return es.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (es *EventExporterService) IntRPCConn() birpc.ClientConnector {
return es.intRPCconn
}

View File

@@ -39,38 +39,41 @@ type ExportFailoverService struct {
srv *birpc.Service
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// NewExportFailoverService is the constructor for the TpeService
func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *ExportFailoverService {
func NewExportFailoverService(cfg *config.CGRConfig) *ExportFailoverService {
return &ExportFailoverService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
// Start should handle the service start
func (efServ *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
cls, err := WaitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry,
efServ.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
},
registry, efServ.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
efServ.cl = cls.(*CommonListenerService).CLS()
efServ.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
efServ.Lock()
defer efServ.Unlock()
efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr)
efServ.efS = efs.NewEfs(efServ.cfg, cms.ConnManager())
efServ.stopChan = make(chan struct{})
efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx)
efServ.cl.RpcRegister(efServ.srv)
cms.AddInternalConn(utils.EFs, efServ.srv)
return
}
@@ -101,8 +104,3 @@ func (efServ *ExportFailoverService) ServiceName() string {
func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} {
return efServ.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (efServ *ExportFailoverService) IntRPCConn() birpc.ClientConnector {
return efServ.intRPCconn
}

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -32,13 +31,10 @@ import (
)
// NewEventReaderService returns the EventReader Service
func NewEventReaderService(
cfg *config.CGRConfig,
connMgr *engine.ConnManager) *EventReaderService {
func NewEventReaderService(cfg *config.CGRConfig) *EventReaderService {
return &EventReaderService{
rldChan: make(chan struct{}, 1),
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -46,17 +42,15 @@ func NewEventReaderService(
// EventReaderService implements Service interface
type EventReaderService struct {
sync.RWMutex
cfg *config.CGRConfig
ers *ers.ERService
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -64,16 +58,16 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.FilterS,
utils.AnalyzerS,
},
registry, erS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
erS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
erS.Lock()
defer erS.Unlock()
@@ -82,7 +76,7 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm
erS.stopChan = make(chan struct{})
// build the service
erS.ers = ers.NewERService(erS.cfg, fs.FilterS(), erS.connMgr)
erS.ers = ers.NewERService(erS.cfg, fs.FilterS(), cms.ConnManager())
go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan, shutdown)
srv, err := engine.NewServiceWithPing(erS.ers, utils.ErSv1, utils.V1Prfx)
@@ -92,7 +86,7 @@ func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servm
if !erS.cfg.DispatcherSCfg().Enabled {
erS.cl.RpcRegister(srv)
}
erS.intRPCconn = anz.GetInternalCodec(srv, utils.ERs)
cms.AddInternalConn(utils.ERs, srv)
return
}
@@ -136,8 +130,3 @@ func (erS *EventReaderService) ShouldRun() bool {
func (erS *EventReaderService) StateChan(stateID string) chan struct{} {
return erS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (erS *EventReaderService) IntRPCConn() birpc.ClientConnector {
return erS.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -29,31 +28,26 @@ import (
)
// NewFilterService instantiates a new FilterService.
func NewFilterService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *FilterService {
func NewFilterService(cfg *config.CGRConfig) *FilterService {
return &FilterService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
// FilterService implements Service interface.
type FilterService struct {
mu sync.RWMutex
fltrS *engine.FilterS
cfg *config.CGRConfig
connMgr *engine.ConnManager
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
mu sync.RWMutex
cfg *config.CGRConfig
fltrS *engine.FilterS
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.CacheS,
utils.DataDB,
},
@@ -61,6 +55,7 @@ func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager.
if err != nil {
return err
}
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil {
return err
@@ -70,7 +65,7 @@ func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager.
s.mu.Lock()
defer s.mu.Unlock()
s.fltrS = engine.NewFilterS(s.cfg, s.connMgr, dbs.DataManager())
s.fltrS = engine.NewFilterS(s.cfg, cms.ConnManager(), dbs.DataManager())
return nil
}
@@ -102,11 +97,6 @@ func (s *FilterService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (s *FilterService) IntRPCConn() birpc.ClientConnector {
return s.intRPCconn
}
// FilterS returns the FilterS object.
func (s *FilterService) FilterS() *engine.FilterS {
return s.fltrS

View File

@@ -22,9 +22,6 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
@@ -32,11 +29,9 @@ import (
)
// NewFreeswitchAgent returns the Freeswitch Agent
func NewFreeswitchAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *FreeswitchAgent {
func NewFreeswitchAgent(cfg *config.CGRConfig) *FreeswitchAgent {
return &FreeswitchAgent{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -44,21 +39,22 @@ func NewFreeswitchAgent(cfg *config.CGRConfig,
// FreeswitchAgent implements Agent interface
type FreeswitchAgent struct {
sync.RWMutex
cfg *config.CGRConfig
fS *agents.FSsessions
connMgr *engine.ConnManager
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
fS *agents.FSsessions
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (fS *FreeswitchAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
func (fS *FreeswitchAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, fS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
fS.Lock()
defer fS.Unlock()
fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, fS.connMgr)
fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, cms.(*ConnManagerService).ConnManager())
go fS.connect(shutdown)
return
@@ -107,8 +103,3 @@ func (fS *FreeswitchAgent) ShouldRun() bool {
func (fS *FreeswitchAgent) StateChan(stateID string) chan struct{} {
return fS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (fS *FreeswitchAgent) IntRPCConn() birpc.ClientConnector {
return fS.intRPCconn
}

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/config"
@@ -37,10 +36,8 @@ func NewGlobalVarS(cfg *config.CGRConfig) *GlobalVarS {
// GlobalVarS implements Agent interface
type GlobalVarS struct {
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -82,8 +79,3 @@ func (gv *GlobalVarS) ShouldRun() bool {
func (gv *GlobalVarS) StateChan(stateID string) chan struct{} {
return gv.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (gv *GlobalVarS) IntRPCConn() birpc.ClientConnector {
return gv.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -40,11 +39,10 @@ func NewGuardianService(cfg *config.CGRConfig) *GuardianService {
// GuardianService implements Service interface.
type GuardianService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
mu sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
@@ -52,14 +50,14 @@ func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.Servi
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.AnalyzerS,
utils.ConnManager,
},
registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
s.mu.Lock()
defer s.mu.Unlock()
@@ -70,7 +68,7 @@ func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.Servi
s.cl.RpcRegister(svc)
}
}
s.intRPCconn = anz.GetInternalCodec(svcs, utils.GuardianS)
cms.AddInternalConn(utils.GuardianS, svcs)
return nil
}
@@ -101,8 +99,3 @@ func (s *GuardianService) ShouldRun() bool {
func (s *GuardianService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (s *GuardianService) IntRPCConn() birpc.ClientConnector {
return s.intRPCconn
}

View File

@@ -21,21 +21,17 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewHTTPAgent returns the HTTP Agent
func NewHTTPAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *HTTPAgent {
func NewHTTPAgent(cfg *config.CGRConfig) *HTTPAgent {
return &HTTPAgent{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -43,6 +39,7 @@ func NewHTTPAgent(cfg *config.CGRConfig,
// HTTPAgent implements Agent interface
type HTTPAgent struct {
sync.RWMutex
cfg *config.CGRConfig
cl *commonlisteners.CommonListenerS
@@ -50,11 +47,7 @@ type HTTPAgent struct {
// if we registerd the handlers
started bool
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -62,6 +55,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.FilterS,
},
registry, ha.cfg.GeneralCfg().ConnectTimeout)
@@ -69,6 +63,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg
return err
}
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService)
ha.Lock()
@@ -77,7 +72,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg
ha.started = true
for _, agntCfg := range ha.cfg.HTTPAgentCfg() {
cl.RegisterHttpHandler(agntCfg.URL,
agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, fs.FilterS(),
agents.NewHTTPAgent(cms, agntCfg.SessionSConns, fs.FilterS(),
ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}
@@ -111,8 +106,3 @@ func (ha *HTTPAgent) ShouldRun() bool {
func (ha *HTTPAgent) StateChan(stateID string) chan struct{} {
return ha.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ha *HTTPAgent) IntRPCConn() birpc.ClientConnector {
return ha.intRPCconn
}

View File

@@ -23,20 +23,16 @@ import (
"net/http"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewJanusAgent returns the Janus Agent
func NewJanusAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *JanusAgent {
func NewJanusAgent(cfg *config.CGRConfig) *JanusAgent {
return &JanusAgent{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -44,6 +40,7 @@ func NewJanusAgent(cfg *config.CGRConfig,
// JanusAgent implements Service interface
type JanusAgent struct {
sync.RWMutex
cfg *config.CGRConfig
jA *agents.JanusAgent
@@ -51,11 +48,7 @@ type JanusAgent struct {
// if we registerd the jandlers
started bool
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should jandle the sercive start
@@ -63,6 +56,7 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.FilterS,
},
registry, ja.cfg.GeneralCfg().ConnectTimeout)
@@ -70,6 +64,7 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
return err
}
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
ja.Lock()
@@ -77,7 +72,7 @@ func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
ja.Unlock()
return utils.ErrServiceAlreadyRunning
}
ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, fs.FilterS())
ja.jA, err = agents.NewJanusAgent(ja.cfg, cms.ConnManager(), fs.FilterS())
if err != nil {
return
}
@@ -126,8 +121,3 @@ func (ja *JanusAgent) ShouldRun() bool {
func (ja *JanusAgent) StateChan(stateID string) chan struct{} {
return ja.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ja *JanusAgent) IntRPCConn() birpc.ClientConnector {
return ja.intRPCconn
}

View File

@@ -23,20 +23,16 @@ import (
"strings"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewKamailioAgent returns the Kamailio Agent
func NewKamailioAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *KamailioAgent {
func NewKamailioAgent(cfg *config.CGRConfig) *KamailioAgent {
return &KamailioAgent{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -44,21 +40,22 @@ func NewKamailioAgent(cfg *config.CGRConfig,
// KamailioAgent implements Agent interface
type KamailioAgent struct {
sync.RWMutex
cfg *config.CGRConfig
kam *agents.KamailioAgent
connMgr *engine.ConnManager
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
kam *agents.KamailioAgent
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (kam *KamailioAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
func (kam *KamailioAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, kam.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
kam.Lock()
defer kam.Unlock()
kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), kam.connMgr,
kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), cms.(*ConnManagerService).ConnManager(),
utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone))
go kam.connect(kam.kam, shutdown)
@@ -112,8 +109,3 @@ func (kam *KamailioAgent) ShouldRun() bool {
func (kam *KamailioAgent) StateChan(stateID string) chan struct{} {
return kam.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (kam *KamailioAgent) IntRPCConn() birpc.ClientConnector {
return kam.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -31,11 +30,9 @@ import (
)
// NewLoaderService returns the Loader Service
func NewLoaderService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *LoaderService {
func NewLoaderService(cfg *config.CGRConfig) *LoaderService {
return &LoaderService{
cfg: cfg,
connMgr: connMgr,
stopChan: make(chan struct{}),
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -49,11 +46,9 @@ type LoaderService struct {
cl *commonlisteners.CommonListenerS
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -61,23 +56,23 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, ldrs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
ldrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
ldrs.Lock()
defer ldrs.Unlock()
ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), fs.FilterS(), ldrs.connMgr)
ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager())
if !ldrs.ldrs.Enabled() {
return
@@ -92,7 +87,7 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv
ldrs.cl.RpcRegister(s)
}
}
ldrs.intRPCconn = anz.GetInternalCodec(srv, utils.LoaderS)
cms.AddInternalConn(utils.LoaderS, srv)
return
}
@@ -100,6 +95,7 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv
func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.FilterS,
utils.DataDB,
},
@@ -107,6 +103,7 @@ func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.Ser
if err != nil {
return err
}
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
close(ldrs.stopChan)
@@ -115,7 +112,7 @@ func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.Ser
ldrs.RLock()
defer ldrs.RUnlock()
ldrs.ldrs.Reload(dbs.DataManager(), fs.FilterS(), ldrs.connMgr)
ldrs.ldrs.Reload(dbs.DataManager(), fs.FilterS(), cms.ConnManager())
return ldrs.ldrs.ListenAndServe(ldrs.stopChan)
}
@@ -148,8 +145,3 @@ func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderS {
func (ldrs *LoaderService) StateChan(stateID string) chan struct{} {
return ldrs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ldrs *LoaderService) IntRPCConn() birpc.ClientConnector {
return ldrs.intRPCconn
}

View File

@@ -22,20 +22,16 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewRadiusAgent returns the Radius Agent
func NewRadiusAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *RadiusAgent {
func NewRadiusAgent(cfg *config.CGRConfig) *RadiusAgent {
return &RadiusAgent{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -46,24 +42,28 @@ type RadiusAgent struct {
cfg *config.CGRConfig
stopChan chan struct{}
rad *agents.RadiusAgent
connMgr *engine.ConnManager
rad *agents.RadiusAgent
lnet string
lauth string
lacct string
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
rad.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.FilterS,
},
registry, rad.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
rad.Lock()
defer rad.Unlock()
@@ -72,7 +72,7 @@ func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.
rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth
rad.lacct = rad.cfg.RadiusAgentCfg().ListenAcct
if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.(*FilterService).FilterS(), rad.connMgr); err != nil {
if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.FilterS(), cms.ConnManager()); err != nil {
return
}
rad.stopChan = make(chan struct{})
@@ -128,8 +128,3 @@ func (rad *RadiusAgent) ShouldRun() bool {
func (rad *RadiusAgent) StateChan(stateID string) chan struct{} {
return rad.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (rad *RadiusAgent) IntRPCConn() birpc.ClientConnector {
return rad.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
@@ -33,11 +32,9 @@ import (
// NewRankingService returns the RankingS Service
func NewRankingService(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) *RankingService {
return &RankingService{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -45,16 +42,13 @@ func NewRankingService(cfg *config.CGRConfig,
type RankingService struct {
sync.RWMutex
cfg *config.CGRConfig
ran *engine.RankingS
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -64,16 +58,17 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, ran.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
ran.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRankingProfiles,
@@ -82,11 +77,10 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
ran.Lock()
defer ran.Unlock()
ran.ran = engine.NewRankingS(dbs.DataManager(), ran.connMgr, fs.FilterS(), ran.cfg)
ran.ran = engine.NewRankingS(dbs.DataManager(), cms.ConnManager(), fs.FilterS(), ran.cfg)
if err := ran.ran.StartRankingS(context.TODO()); err != nil {
return err
}
@@ -99,7 +93,7 @@ func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanag
ran.cl.RpcRegister(s)
}
}
ran.intRPCconn = anz.GetInternalCodec(srv, utils.RankingS)
cms.AddInternalConn(utils.RankingS, srv)
return nil
}
@@ -136,8 +130,3 @@ func (ran *RankingService) ShouldRun() bool {
func (ran *RankingService) StateChan(stateID string) chan struct{} {
return ran.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ran *RankingService) IntRPCConn() birpc.ClientConnector {
return ran.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -42,16 +41,12 @@ func NewRateService(cfg *config.CGRConfig) *RateService {
// RateService is the service structure for RateS
type RateService struct {
sync.RWMutex
rateS *rates.RateS
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
rateS *rates.RateS
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// ServiceName returns the service name
@@ -85,16 +80,17 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, rs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
rs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRateProfiles,
@@ -102,12 +98,11 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
utils.CacheRateFilterIndexes); err != nil {
return err
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
rs.Lock()
rs.rateS = rates.NewRateS(rs.cfg, fs.FilterS(), dbs.DataManager())
rs.rateS = rates.NewRateS(rs.cfg, fs, dbs)
rs.Unlock()
rs.stopChan = make(chan struct{})
@@ -121,8 +116,7 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
if !rs.cfg.DispatcherSCfg().Enabled {
rs.cl.RpcRegister(srv)
}
rs.intRPCconn = anz.GetInternalCodec(srv, utils.RateS)
cms.AddInternalConn(utils.RateS, srv)
return
}
@@ -130,8 +124,3 @@ func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.S
func (rs *RateService) StateChan(stateID string) chan struct{} {
return rs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (rs *RateService) IntRPCConn() birpc.ClientConnector {
return rs.intRPCconn
}

View File

@@ -21,19 +21,16 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/registrarc"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewRegistrarCService returns the Dispatcher Service
func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *RegistrarCService {
func NewRegistrarCService(cfg *config.CGRConfig) *RegistrarCService {
return &RegistrarCService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -41,26 +38,29 @@ func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *R
// RegistrarCService implements Service interface
type RegistrarCService struct {
sync.RWMutex
cfg *config.CGRConfig
dspS *registrarc.RegistrarCService
stopChan chan struct{}
rldChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
dspS.Lock()
defer dspS.Unlock()
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, dspS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
dspS.stopChan = make(chan struct{})
dspS.rldChan = make(chan struct{})
dspS.dspS = registrarc.NewRegistrarCService(dspS.cfg, dspS.connMgr)
dspS.dspS = registrarc.NewRegistrarCService(dspS.cfg, cms.(*ConnManagerService).ConnManager())
go dspS.dspS.ListenAndServe(dspS.stopChan, dspS.rldChan)
return
}
@@ -96,8 +96,3 @@ func (dspS *RegistrarCService) ShouldRun() bool {
func (dspS *RegistrarCService) StateChan(stateID string) chan struct{} {
return dspS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (dspS *RegistrarCService) IntRPCConn() birpc.ClientConnector {
return dspS.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -32,11 +31,9 @@ import (
// NewResourceService returns the Resource Service
func NewResourceService(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) *ResourceService {
return &ResourceService{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -45,16 +42,13 @@ func NewResourceService(cfg *config.CGRConfig,
// ResourceService implements Service interface
type ResourceService struct {
sync.RWMutex
cfg *config.CGRConfig
reS *engine.ResourceS
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -64,16 +58,17 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, reS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
reS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheResourceProfiles,
@@ -83,11 +78,10 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
reS.Lock()
defer reS.Unlock()
reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), reS.connMgr)
reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), cms.ConnManager())
reS.reS.StartLoop(context.TODO())
srv, _ := engine.NewService(reS.reS)
// srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false)
@@ -96,8 +90,7 @@ func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmana
reS.cl.RpcRegister(s)
}
}
reS.intRPCconn = anz.GetInternalCodec(srv, utils.ResourceS)
cms.AddInternalConn(utils.ResourceS, srv)
return
}
@@ -134,8 +127,3 @@ func (reS *ResourceService) ShouldRun() bool {
func (reS *ResourceService) StateChan(stateID string) chan struct{} {
return reS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (reS *ResourceService) IntRPCConn() birpc.ClientConnector {
return reS.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,11 +29,9 @@ import (
)
// NewRouteService returns the Route Service
func NewRouteService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *RouteService {
func NewRouteService(cfg *config.CGRConfig) *RouteService {
return &RouteService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -42,15 +39,12 @@ func NewRouteService(cfg *config.CGRConfig,
// RouteService implements Service interface
type RouteService struct {
sync.RWMutex
cfg *config.CGRConfig
routeS *engine.RouteS
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -58,16 +52,17 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, routeS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
routeS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheRouteProfiles,
@@ -76,11 +71,10 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
routeS.Lock()
defer routeS.Unlock()
routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, routeS.connMgr)
routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, cms.ConnManager())
srv, _ := engine.NewService(routeS.routeS)
// srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false)
if !routeS.cfg.DispatcherSCfg().Enabled {
@@ -88,7 +82,7 @@ func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmana
routeS.cl.RpcRegister(s)
}
}
routeS.intRPCconn = anz.GetInternalCodec(srv, utils.RouteS)
cms.AddInternalConn(utils.RouteS, srv)
return
}
@@ -120,8 +114,3 @@ func (routeS *RouteService) ShouldRun() bool {
func (routeS *RouteService) StateChan(stateID string) chan struct{} {
return routeS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (routeS *RouteService) IntRPCConn() birpc.ClientConnector {
return routeS.intRPCconn
}

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/engine"
@@ -33,11 +32,9 @@ import (
)
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *SessionService {
func NewSessionService(cfg *config.CGRConfig) *SessionService {
return &SessionService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -51,11 +48,9 @@ type SessionService struct {
bircpEnabled bool // to stop birpc server if needed
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -63,23 +58,23 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, smg.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
smg.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
smg.Lock()
defer smg.Unlock()
smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), smg.connMgr)
smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager())
//start sync session in a separate goroutine
smg.stopChan = make(chan struct{})
go smg.sm.ListenAndServe(smg.stopChan)
@@ -102,7 +97,7 @@ func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanag
// run this in it's own goroutine
go smg.start(shutdown)
}
smg.intRPCconn = anz.GetInternalCodec(srv, utils.SessionS)
cms.AddInternalConn(utils.SessionS, srv)
return
}
@@ -155,8 +150,3 @@ func (smg *SessionService) ShouldRun() bool {
func (smg *SessionService) StateChan(stateID string) chan struct{} {
return smg.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (smg *SessionService) IntRPCConn() birpc.ClientConnector {
return smg.intRPCconn
}

View File

@@ -22,20 +22,16 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewSIPAgent returns the sip Agent
func NewSIPAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *SIPAgent {
func NewSIPAgent(cfg *config.CGRConfig) *SIPAgent {
return &SIPAgent{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -45,30 +41,31 @@ type SIPAgent struct {
sync.RWMutex
cfg *config.CGRConfig
sip *agents.SIPAgent
connMgr *engine.ConnManager
sip *agents.SIPAgent
oldListen string
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
func (sip *SIPAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
fs, err := WaitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
sip.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.FilterS,
},
registry, sip.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
sip.Lock()
defer sip.Unlock()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.(*FilterService).FilterS())
sip.sip, err = agents.NewSIPAgent(cm, sip.cfg, fs)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.SIPAgent, err))
return
}
go sip.listenAndServe(shutdown)
@@ -119,8 +116,3 @@ func (sip *SIPAgent) ShouldRun() bool {
func (sip *SIPAgent) StateChan(stateID string) chan struct{} {
return sip.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (sip *SIPAgent) IntRPCConn() birpc.ClientConnector {
return sip.intRPCconn
}

View File

@@ -52,11 +52,11 @@ func (sDs *StateDependencies) StateChan(stateID string) (retChan chan struct{})
// WaitForServicesToReachState ensures each service reaches the desired state, with the timeout applied individually per service.
// Returns a map of service names to their instances or an error if any service fails to reach its state within its timeout window.
func WaitForServicesToReachState(state string, serviceIDs []string, indexer *servmanager.ServiceRegistry, timeout time.Duration,
func WaitForServicesToReachState(state string, serviceIDs []string, registry *servmanager.ServiceRegistry, timeout time.Duration,
) (map[string]servmanager.Service, error) {
services := make(map[string]servmanager.Service, len(serviceIDs))
for _, serviceID := range serviceIDs {
srv, err := WaitForServiceState(state, serviceID, indexer, timeout)
srv, err := WaitForServiceState(state, serviceID, registry, timeout)
if err != nil {
return nil, err
}
@@ -68,13 +68,19 @@ func WaitForServicesToReachState(state string, serviceIDs []string, indexer *ser
// WaitForServiceState waits up to timeout duration for a service to reach the specified state.
// Returns the service instance or an error if the timeout is exceeded.
func WaitForServiceState(state, serviceID string, indexer *servmanager.ServiceRegistry, timeout time.Duration,
func WaitForServiceState(state, serviceID string, registry *servmanager.ServiceRegistry, timeout time.Duration,
) (servmanager.Service, error) {
srv := indexer.Lookup(serviceID)
if serviceID == utils.AnalyzerS && !srv.ShouldRun() {
// Return disabled analyzer service immediately since dependent
// services still need the instance.
return srv, nil
srv := registry.Lookup(serviceID)
if !srv.ShouldRun() {
switch serviceID {
case utils.AnalyzerS:
// Return disabled analyzer service immediately since dependent
// services still need the instance.
return srv, nil
case utils.AttributeS:
// Don't make DispatcherS wait when AttributeS is disabled.
return srv, nil
}
}
select {
case <-srv.StateChan(state):

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -31,10 +30,9 @@ import (
)
// NewStatService returns the Stat Service
func NewStatService(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) *StatService {
func NewStatService(cfg *config.CGRConfig, srvDep map[string]*sync.WaitGroup) *StatService {
return &StatService{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -43,16 +41,13 @@ func NewStatService(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvDep m
// StatService implements Service interface
type StatService struct {
sync.RWMutex
cfg *config.CGRConfig
sts *engine.StatS
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -62,16 +57,17 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, sts.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
sts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheStatQueueProfiles,
@@ -81,11 +77,10 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
sts.Lock()
defer sts.Unlock()
sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), sts.connMgr)
sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), cms.ConnManager())
sts.sts.StartLoop(context.TODO())
srv, _ := engine.NewService(sts.sts)
// srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false)
@@ -94,7 +89,7 @@ func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.
sts.cl.RpcRegister(s)
}
}
sts.intRPCconn = anz.GetInternalCodec(srv, utils.StatS)
cms.AddInternalConn(utils.StatS, srv)
return
}
@@ -131,8 +126,3 @@ func (sts *StatService) ShouldRun() bool {
func (sts *StatService) StateChan(stateID string) chan struct{} {
return sts.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (sts *StatService) IntRPCConn() birpc.ClientConnector {
return sts.intRPCconn
}

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -41,14 +40,11 @@ func NewStorDBService(cfg *config.CGRConfig, setVersions bool) *StorDBService {
// StorDBService implements Service interface
type StorDBService struct {
sync.RWMutex
cfg *config.CGRConfig
oldDBCfg *config.StorDbCfg
cfg *config.CGRConfig
oldDBCfg *config.StorDbCfg
db engine.StorDB
setVersions bool
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -177,8 +173,3 @@ func (db *StorDBService) DB() engine.StorDB {
func (db *StorDBService) StateChan(stateID string) chan struct{} {
return db.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (db *StorDBService) IntRPCConn() birpc.ClientConnector {
return db.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -32,12 +31,10 @@ import (
// NewThresholdService returns the Threshold Service
func NewThresholdService(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) *ThresholdService {
return &ThresholdService{
cfg: cfg,
srvDep: srvDep,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -45,16 +42,13 @@ func NewThresholdService(cfg *config.CGRConfig,
// ThresholdService implements Service interface
type ThresholdService struct {
sync.RWMutex
cfg *config.CGRConfig
thrs *engine.ThresholdS
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -64,16 +58,17 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, thrs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
thrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheThresholdProfiles,
@@ -83,11 +78,10 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
thrs.Lock()
defer thrs.Unlock()
thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), thrs.connMgr)
thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), cms.ConnManager())
thrs.thrs.StartLoop(context.TODO())
srv, _ := engine.NewService(thrs.thrs)
// srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false)
@@ -96,7 +90,7 @@ func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servma
thrs.cl.RpcRegister(s)
}
}
thrs.intRPCconn = anz.GetInternalCodec(srv, utils.ThresholdS)
cms.AddInternalConn(utils.ThresholdS, srv)
return
}
@@ -133,8 +127,3 @@ func (thrs *ThresholdService) ShouldRun() bool {
func (thrs *ThresholdService) StateChan(stateID string) chan struct{} {
return thrs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (thrs *ThresholdService) IntRPCConn() birpc.ClientConnector {
return thrs.intRPCconn
}

View File

@@ -24,17 +24,15 @@ import (
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/tpes"
"github.com/cgrates/cgrates/utils"
)
// NewTPeService is the constructor for the TpeService
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *TPeService {
func NewTPeService(cfg *config.CGRConfig) *TPeService {
return &TPeService{
cfg: cfg,
connMgr: connMgr,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
@@ -42,17 +40,12 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *TPeServi
// TypeService implements Service interface
type TPeService struct {
sync.RWMutex
tpes *tpes.TPeS
cl *commonlisteners.CommonListenerS
srv *birpc.Service
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
tpes *tpes.TPeS
cl *commonlisteners.CommonListenerS
srv *birpc.Service
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -61,6 +54,7 @@ func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.DataDB,
},
registry, ts.cfg.GeneralCfg().ConnectTimeout)
@@ -68,9 +62,10 @@ func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRe
return err
}
ts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
dbs := srvDeps[utils.DataDB].(*DataDBService)
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
ts.tpes = tpes.NewTPeS(ts.cfg, dbs.DataManager(), ts.connMgr)
ts.tpes = tpes.NewTPeS(ts.cfg, dbs, cm)
ts.stopChan = make(chan struct{})
ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false)
ts.cl.RpcRegister(ts.srv)
@@ -103,8 +98,3 @@ func (ts *TPeService) ShouldRun() bool {
func (ts *TPeService) StateChan(stateID string) chan struct{} {
return ts.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ts *TPeService) IntRPCConn() birpc.ClientConnector {
return ts.intRPCconn
}

View File

@@ -21,7 +21,6 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -32,11 +31,9 @@ import (
// NewTrendsService returns the TrendS Service
func NewTrendService(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) *TrendService {
return &TrendService{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
@@ -44,16 +41,13 @@ func NewTrendService(cfg *config.CGRConfig,
type TrendService struct {
sync.RWMutex
cfg *config.CGRConfig
trs *engine.TrendS
cl *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
stateDeps *StateDependencies // channel subscriptions for state changes
srvDep map[string]*sync.WaitGroup
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -63,16 +57,17 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
utils.CacheS,
utils.FilterS,
utils.DataDB,
utils.AnalyzerS,
},
registry, trs.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
trs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
cacheS := srvDeps[utils.CacheS].(*CacheService)
if err = cacheS.WaitToPrecache(shutdown,
utils.CacheTrendProfiles,
@@ -81,11 +76,10 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
}
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
anz := srvDeps[utils.AnalyzerS].(*AnalyzerService)
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), trs.connMgr)
trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), cms.ConnManager())
if err := trs.trs.StartTrendS(context.TODO()); err != nil {
return err
}
@@ -98,7 +92,7 @@ func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager
trs.cl.RpcRegister(s)
}
}
trs.intRPCconn = anz.GetInternalCodec(srv, utils.Trends)
cms.AddInternalConn(utils.TrendS, srv)
return nil
}
@@ -135,8 +129,3 @@ func (trs *TrendService) ShouldRun() bool {
func (trs *TrendService) StateChan(stateID string) chan struct{} {
return trs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (trs *TrendService) IntRPCConn() birpc.ClientConnector {
return trs.intRPCconn
}

View File

@@ -44,19 +44,23 @@ func (r *ServiceRegistry) Lookup(id string) Service {
return r.services[id]
}
// Register adds or updates a Service using its name as the unique identifier.
// Will overwrite existing service if name conflicts.
func (r *ServiceRegistry) Register(s Service) {
// Register adds or updates Services using their name as the unique identifier.
// Will overwrite existing services if name conflicts.
func (r *ServiceRegistry) Register(svcs ...Service) {
r.mu.Lock()
defer r.mu.Unlock()
r.services[s.ServiceName()] = s
for _, svc := range svcs {
r.services[svc.ServiceName()] = svc
}
}
// Unregister removes a Service by ID.
func (r *ServiceRegistry) Unregister(id string) {
// Unregister removes Services by ID.
func (r *ServiceRegistry) Unregister(ids ...string) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.services, id)
for _, id := range ids {
delete(r.services, id)
}
}
// List returns a new slice containing all registered Services.

View File

@@ -22,25 +22,21 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewServiceManager returns a service manager
func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager,
cfg *config.CGRConfig, registry *ServiceRegistry, services []Service) (sM *ServiceManager) {
func NewServiceManager(shdWg *sync.WaitGroup, cfg *config.CGRConfig, registry *ServiceRegistry,
services []Service) (sM *ServiceManager) {
sM = &ServiceManager{
cfg: cfg,
registry: registry,
shdWg: shdWg,
connMgr: connMgr,
rldChan: cfg.GetReloadChan(),
}
sM.AddServices(services...)
sM.registry.Register(services...)
return
}
@@ -51,7 +47,6 @@ type ServiceManager struct {
registry *ServiceRegistry // index here the services for accessing them by their IDs
shdWg *sync.WaitGroup // list of shutdown items
rldChan <-chan string // reload signals come over this channelc
connMgr *engine.ConnManager
}
// StartServices starts all enabled services
@@ -64,8 +59,7 @@ func (m *ServiceManager) StartServices(shutdown *utils.SyncedChan) {
if svc.ShouldRun() && !IsServiceInState(svc, utils.StateServiceUP) {
m.shdWg.Add(1)
go func() {
if err := svc.Start(shutdown, m.registry); err != nil &&
err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine
if err := svc.Start(shutdown, m.registry); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err))
shutdown.CloseOnce()
}
@@ -77,33 +71,6 @@ func (m *ServiceManager) StartServices(shutdown *utils.SyncedChan) {
// startServer()
}
// AddServices adds given services
func (m *ServiceManager) AddServices(services ...Service) {
m.Lock()
for _, svc := range services {
m.registry.Register(svc)
if sAPIData, hasAPIData := serviceAPIData[svc.ServiceName()]; hasAPIData { // Add the internal connections
rpcIntChan := make(chan birpc.ClientConnector, 1)
m.connMgr.AddInternalConn(sAPIData[1], sAPIData[0], rpcIntChan)
if len(sAPIData) > 2 { // Add the bidirectional API
m.connMgr.AddInternalConn(sAPIData[2], sAPIData[0], rpcIntChan)
}
go func() { // ToDo: centralize management into one single goroutine
if utils.StructChanTimeout(
m.registry.Lookup(svc.ServiceName()).StateChan(utils.StateServiceUP),
m.cfg.GeneralCfg().ConnectTimeout) {
utils.Logger.Err(
fmt.Sprintf("<%s> failed to register internal connection to service %s because of timeout waiting for ServiceUP state",
utils.ServiceManager, svc.ServiceName()))
// toDo: shutdown service
}
rpcIntChan <- svc.IntRPCConn()
}()
}
}
m.Unlock()
}
func (m *ServiceManager) handleReload(shutdown *utils.SyncedChan) {
var serviceID string
for {
@@ -113,12 +80,7 @@ func (m *ServiceManager) handleReload(shutdown *utils.SyncedChan) {
return
case serviceID = <-m.rldChan:
}
if serviceID == config.RPCConnsJSON {
go m.connMgr.Reload()
} else {
go m.reloadService(serviceID, shutdown)
}
go m.reloadService(serviceID, shutdown)
// handle RPC server
}
}
@@ -189,8 +151,6 @@ type Service interface {
ServiceName() string
// StateChan returns the channel for specific state subscription
StateChan(stateID string) chan struct{}
// IntRPCConn returns the connector needed for internal RPC connections
IntRPCConn() birpc.ClientConnector
}
// ArgsServiceID are passed to Start/Stop/Status RPC methods
@@ -338,88 +298,6 @@ func toggleService(id string, status bool, srvMngr *ServiceManager) (err error)
return
}
var serviceAPIData = map[string][]string{
utils.AnalyzerS: {
utils.AnalyzerSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS)},
utils.AdminS: {
utils.AdminSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS)},
utils.AttributeS: {
utils.AttributeSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)},
utils.CacheS: {
utils.CacheSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)},
utils.CDRs: {
utils.CDRsV1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)},
utils.ChargerS: {
utils.ChargerSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers)},
utils.GuardianS: {
utils.GuardianSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian)},
utils.LoaderS: {
utils.LoaderSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders)},
utils.ResourceS: {
utils.ResourceSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)},
utils.SessionS: {
utils.SessionSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS),
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS)},
utils.StatS: {
utils.StatSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)},
utils.RankingS: {
utils.RankingSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings)},
utils.TrendS: {
utils.TrendSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends)},
utils.RouteS: {
utils.RouteSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes)},
utils.ThresholdS: {
utils.ThresholdSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)},
utils.ServiceManagerS: {
utils.ServiceManagerV1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager)},
utils.ConfigS: {
utils.ConfigSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig)},
utils.CoreS: {
utils.CoreSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore)},
utils.EEs: {
utils.EeSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)},
utils.RateS: {
utils.RateSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates)},
utils.DispatcherS: {
utils.DispatcherSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers)},
utils.AccountS: {
utils.AccountSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts)},
utils.ActionS: {
utils.ActionSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions)},
utils.TPeS: {
utils.TPeSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes)},
utils.EFs: {
utils.EfSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs)},
utils.ERs: {
utils.ErSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs)},
}
// IsServiceInState performs a non-blocking check to determine if a service is in the specified state.
func IsServiceInState(svc Service, state string) bool {
select {

View File

@@ -993,6 +993,7 @@ const (
GuardianS = "GuardianS"
ServiceManagerS = "ServiceManager"
CommonListenerS = "CommonListenerS"
ConnManager = "ConnManager"
)
// Lower service names