From 476f5ba877c84a26f5a524270ec3b041b994a97d Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 30 Nov 2024 19:13:19 +0100 Subject: [PATCH] Services with IntRPCConn method --- dispatchers/analyzers.go | 4 +- engine/connmanager.go | 2 +- services/accounts.go | 9 ++- services/actions.go | 10 +++- services/adminsv1.go | 9 ++- services/analyzers.go | 6 ++ services/asteriskagent.go | 7 +++ services/attributes.go | 10 +++- services/caches.go | 9 ++- services/cdrs.go | 10 +++- services/cgr-engine.go | 2 +- services/chargers.go | 10 +++- services/commonlisteners.go | 7 +++ services/cores.go | 10 +++- services/datadb.go | 7 +++ services/diameteragent.go | 7 +++ services/dispatchers.go | 9 ++- services/dnsagent.go | 7 +++ services/ees.go | 10 +++- services/efs.go | 6 ++ services/ers.go | 9 ++- services/freeswitchagent.go | 7 +++ services/globalvars.go | 7 +++ services/httpagent.go | 7 +++ services/janus.go | 7 +++ services/kamailioagent.go | 7 +++ services/loaders.go | 9 ++- services/radiusagent.go | 7 +++ services/rankings.go | 9 ++- services/rates.go | 10 +++- services/registrarc.go | 7 +++ services/resources.go | 10 +++- services/routes.go | 9 ++- services/sessions.go | 6 ++ services/sipagent.go | 7 +++ services/stats.go | 9 ++- services/stordb.go | 7 +++ services/thresholds.go | 9 ++- services/tpes.go | 6 ++ services/trends.go | 9 ++- servmanager/servmanager.go | 110 +++++++++++++++++++++++++++++++++++- utils/consts.go | 2 +- 42 files changed, 395 insertions(+), 26 deletions(-) diff --git a/dispatchers/analyzers.go b/dispatchers/analyzers.go index ef201cd34..56065de39 100644 --- a/dispatchers/analyzers.go +++ b/dispatchers/analyzers.go @@ -38,11 +38,11 @@ func (dS *DispatcherService) AnalyzerSv1Ping(ctx *context.Context, args *utils.C if args != nil { opts = args.APIOpts } - return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaAnalyzer, utils.AnalyzerSv1Ping, args, reply) + return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaAnalyzerS, utils.AnalyzerSv1Ping, args, reply) } func (dS *DispatcherService) AnalyzerSv1StringQuery(ctx *context.Context, args *analyzers.QueryArgs, reply *[]map[string]any) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant ev := make(map[string]any) opts := make(map[string]any) - return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaAnalyzer, utils.AnalyzerSv1StringQuery, args, reply) + return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaAnalyzerS, utils.AnalyzerSv1StringQuery, args, reply) } diff --git a/engine/connmanager.go b/engine/connmanager.go index f8c81f400..da0eaccea 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -273,7 +273,7 @@ func (cM *ConnManager) EnableDispatcher(dsp IntService) { case strings.HasPrefix(m, utils.CoreS): key = utils.MetaCore case strings.HasPrefix(m, utils.AnalyzerS): - key = utils.MetaAnalyzer + key = utils.MetaAnalyzerS case strings.HasPrefix(m, utils.AdminS): key = utils.MetaAdminS case strings.HasPrefix(m, utils.LoaderS): diff --git a/services/accounts.go b/services/accounts.go index 793a3c0bd..f94e03e10 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -76,6 +76,7 @@ type AccountService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -121,7 +122,8 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e acts.cl.RpcRegister(srv) } - acts.connChan <- anz.GetInternalCodec(srv, utils.AccountS) + acts.intRPCconn = anz.GetInternalCodec(srv, utils.AccountS) + acts.connChan <- acts.intRPCconn return } @@ -164,3 +166,8 @@ func (acts *AccountService) ShouldRun() bool { func (acts *AccountService) StateChan(stateID string) chan struct{} { return acts.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (acts *AccountService) IntRPCConn() birpc.ClientConnector { + return acts.intRPCconn +} diff --git a/services/actions.go b/services/actions.go index 12f1c0a89..4aa603ea2 100644 --- a/services/actions.go +++ b/services/actions.go @@ -77,6 +77,7 @@ type ActionService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -120,7 +121,9 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er if !acts.cfg.DispatcherSCfg().Enabled { acts.cl.RpcRegister(srv) } - acts.connChan <- anz.GetInternalCodec(srv, utils.ActionS) + + acts.intRPCconn = anz.GetInternalCodec(srv, utils.ActionS) + acts.connChan <- acts.intRPCconn return } @@ -163,3 +166,8 @@ func (acts *ActionService) ShouldRun() bool { func (acts *ActionService) StateChan(stateID string) chan struct{} { return acts.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (acts *ActionService) IntRPCConn() birpc.ClientConnector { + return acts.intRPCconn +} diff --git a/services/adminsv1.go b/services/adminsv1.go index 038148ced..66d18b085 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -72,6 +72,7 @@ type AdminSv1Service struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // RPC connector with internal APIs srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -121,7 +122,8 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF } //backwards compatible - apiService.connChan <- anz.GetInternalCodec(srv, utils.AdminSv1) + apiService.intRPCconn = anz.GetInternalCodec(srv, utils.AdminSv1) + apiService.connChan <- apiService.intRPCconn return } @@ -163,3 +165,8 @@ func (apiService *AdminSv1Service) ShouldRun() bool { func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} { return apiService.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (apiService *AdminSv1Service) IntRPCConn() birpc.ClientConnector { + return apiService.intRPCconn +} diff --git a/services/analyzers.go b/services/analyzers.go index 406fa73c7..40eb86f15 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -66,6 +66,7 @@ type AnalyzerService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes @@ -177,3 +178,8 @@ func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string) func (anz *AnalyzerService) StateChan(stateID string) chan struct{} { return anz.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (anz *AnalyzerService) IntRPCConn() birpc.ClientConnector { + return anz.intRPCconn +} diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 21fab6735..a78ba2ac3 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" @@ -54,6 +55,7 @@ type AsteriskAgent struct { connMgr *engine.ConnManager srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -123,3 +125,8 @@ func (ast *AsteriskAgent) ShouldRun() bool { func (ast *AsteriskAgent) StateChan(stateID string) chan struct{} { return ast.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (ast *AsteriskAgent) IntRPCConn() birpc.ClientConnector { + return ast.intRPCconn +} diff --git a/services/attributes.go b/services/attributes.go index 7946b0f42..a881316c2 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -72,6 +72,7 @@ type AttributeService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection serviceIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies } @@ -128,7 +129,9 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) } }() - attrS.connChan <- anz.GetInternalCodec(srv, utils.AttributeS) + + attrS.intRPCconn = anz.GetInternalCodec(srv, utils.AttributeS) + attrS.connChan <- attrS.intRPCconn close(attrS.stateDeps.StateChan(utils.StateServiceUP)) // inform listeners about the service reaching UP state return } @@ -172,3 +175,8 @@ func (attrS *AttributeService) ShouldRun() bool { func (attrS *AttributeService) StateChan(stateID string) chan struct{} { return attrS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (attrS *AttributeService) IntRPCConn() birpc.ClientConnector { + return attrS.intRPCconn +} diff --git a/services/caches.go b/services/caches.go index b4282541e..e5dc87e1d 100644 --- a/services/caches.go +++ b/services/caches.go @@ -67,6 +67,7 @@ type CacheService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -98,7 +99,8 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e cS.cl.RpcRegister(s) } } - cS.rpc <- anz.GetInternalCodec(srv, utils.CacheS) + cS.intRPCconn = anz.GetInternalCodec(srv, utils.CacheS) + cS.rpc <- cS.intRPCconn return } @@ -155,3 +157,8 @@ func (cS *CacheService) WaitToPrecache(ctx *context.Context, cacheIDs ...string) func (cS *CacheService) StateChan(stateID string) chan struct{} { return cS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (cS *CacheService) IntRPCConn() birpc.ClientConnector { + return cS.intRPCconn +} diff --git a/services/cdrs.go b/services/cdrs.go index cb81910ad..c8d93dbfe 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -73,6 +73,7 @@ type CDRService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -116,7 +117,9 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err if !cs.cfg.DispatcherSCfg().Enabled { cs.cl.RpcRegister(srv) } - cs.connChan <- anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational + + cs.intRPCconn = anz.GetInternalCodec(srv, utils.CDRServer) + cs.connChan <- cs.intRPCconn // Signal that cdrS is operational return } @@ -157,3 +160,8 @@ func (cs *CDRService) ShouldRun() bool { func (cs *CDRService) StateChan(stateID string) chan struct{} { return cs.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (cs *CDRService) IntRPCConn() birpc.ClientConnector { + return cs.intRPCconn +} diff --git a/services/cgr-engine.go b/services/cgr-engine.go index c46cf6af1..80577d76f 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -166,7 +166,7 @@ func (cgr *CGREngine) InitServices(setVersions bool) { // initialize the connManager before creating the DMService // because we need to pass the connection to it - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzer), utils.AnalyzerSv1, iAnalyzerSCh) + cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS), utils.AnalyzerSv1, iAnalyzerSCh) cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS), utils.AdminSv1, iAdminSCh) cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), utils.AttributeSv1, iAttributeSCh) cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches), utils.CacheSv1, iCacheSCh) diff --git a/services/chargers.go b/services/chargers.go index f8777691e..3f29e028a 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -70,6 +70,7 @@ type ChargerService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -109,7 +110,9 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e chrS.cl.RpcRegister(s) } } - chrS.connChan <- anz.GetInternalCodec(srv, utils.ChargerS) + + chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS) + chrS.connChan <- chrS.intRPCconn return } @@ -150,3 +153,8 @@ func (chrS *ChargerService) ShouldRun() bool { func (chrS *ChargerService) StateChan(stateID string) chan struct{} { return chrS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (chrS *ChargerService) IntRPCConn() birpc.ClientConnector { + return chrS.intRPCconn +} diff --git a/services/commonlisteners.go b/services/commonlisteners.go index cb876f2fc..96120df5a 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -21,6 +21,7 @@ package services import ( "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -54,6 +55,7 @@ type CommonListenerService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -111,3 +113,8 @@ func (cl *CommonListenerService) ShouldRun() bool { func (cl *CommonListenerService) StateChan(stateID string) chan struct{} { return cl.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (cl *CommonListenerService) IntRPCConn() birpc.ClientConnector { + return cl.intRPCconn +} diff --git a/services/cores.go b/services/cores.go index 323b27b93..f6d3e19c3 100644 --- a/services/cores.go +++ b/services/cores.go @@ -72,6 +72,7 @@ type CoreService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -102,7 +103,9 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err cS.cl.RpcRegister(s) } } - cS.connChan <- anz.GetInternalCodec(srv, utils.CoreS) + + cS.intRPCconn = anz.GetInternalCodec(srv, utils.CoreS) + cS.connChan <- cS.intRPCconn return nil } @@ -161,3 +164,8 @@ func (cS *CoreService) WaitForCoreS(ctx *context.Context) (cs *cores.CoreS, err func (cS *CoreService) StateChan(stateID string) chan struct{} { return cS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (cS *CoreService) IntRPCConn() birpc.ClientConnector { + return cS.intRPCconn +} diff --git a/services/datadb.go b/services/datadb.go index c78785947..df6df9086 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -56,6 +57,7 @@ type DataDBService struct { srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -200,3 +202,8 @@ func (db *DataDBService) WaitForDM(ctx *context.Context) (datadb *engine.DataMan func (db *DataDBService) StateChan(stateID string) chan struct{} { return db.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (db *DataDBService) IntRPCConn() birpc.ClientConnector { + return db.intRPCconn +} diff --git a/services/diameteragent.go b/services/diameteragent.go index 40ce5bd65..d6b4b9dea 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" @@ -61,6 +62,7 @@ type DiameterAgent struct { srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -147,3 +149,8 @@ func (da *DiameterAgent) ShouldRun() bool { func (da *DiameterAgent) StateChan(stateID string) chan struct{} { return da.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (da *DiameterAgent) IntRPCConn() birpc.ClientConnector { + return da.intRPCconn +} diff --git a/services/dispatchers.go b/services/dispatchers.go index babc08703..5663a1111 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -72,6 +72,7 @@ type DispatcherService struct { srvsReload map[string]chan struct{} srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -117,7 +118,8 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) // for the moment we dispable Apier through dispatcher // until we figured out a better sollution in case of gob server // dspS.server.SetDispatched() - dspS.connChan <- anz.GetInternalCodec(srv, utils.DispatcherS) + dspS.intRPCconn = anz.GetInternalCodec(srv, utils.DispatcherS) + dspS.connChan <- dspS.intRPCconn return } @@ -191,3 +193,8 @@ func (dspS *DispatcherService) sync() { func (dspS *DispatcherService) StateChan(stateID string) chan struct{} { return dspS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (dspS *DispatcherService) IntRPCConn() birpc.ClientConnector { + return dspS.intRPCconn +} diff --git a/services/dnsagent.go b/services/dnsagent.go index f764370ed..00c0531f3 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" @@ -56,6 +57,7 @@ type DNSAgent struct { connMgr *engine.ConnManager srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -152,3 +154,8 @@ func (dns *DNSAgent) ShouldRun() bool { func (dns *DNSAgent) StateChan(stateID string) chan struct{} { return dns.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (dns *DNSAgent) IntRPCConn() birpc.ClientConnector { + return dns.intRPCconn +} diff --git a/services/ees.go b/services/ees.go index c894e5937..e0f7829e9 100644 --- a/services/ees.go +++ b/services/ees.go @@ -65,6 +65,7 @@ type EventExporterService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -136,7 +137,9 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc if !es.cfg.DispatcherSCfg().Enabled { es.cl.RpcRegister(srv) } - es.intConnChan <- anz.GetInternalCodec(srv, utils.EEs) + + es.intRPCconn = anz.GetInternalCodec(srv, utils.EEs) + es.intConnChan <- es.intRPCconn return nil } @@ -144,3 +147,8 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc func (es *EventExporterService) StateChan(stateID string) chan struct{} { return es.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (es *EventExporterService) IntRPCConn() birpc.ClientConnector { + return es.intRPCconn +} diff --git a/services/efs.go b/services/efs.go index 870ff6703..feb9696d9 100644 --- a/services/efs.go +++ b/services/efs.go @@ -49,6 +49,7 @@ type ExportFailoverService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -122,3 +123,8 @@ func (efServ *ExportFailoverService) ServiceName() string { func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} { return efServ.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (efServ *ExportFailoverService) IntRPCConn() birpc.ClientConnector { + return efServ.intRPCconn +} diff --git a/services/ers.go b/services/ers.go index 3d3cedb65..51b24ade4 100644 --- a/services/ers.go +++ b/services/ers.go @@ -73,6 +73,7 @@ type EventReaderService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -111,7 +112,8 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel if !erS.cfg.DispatcherSCfg().Enabled { erS.cl.RpcRegister(srv) } - erS.intConn <- anz.GetInternalCodec(srv, utils.ERs) + erS.intRPCconn = anz.GetInternalCodec(srv, utils.ERs) + erS.intConn <- erS.intRPCconn return } @@ -162,3 +164,8 @@ func (erS *EventReaderService) ShouldRun() bool { func (erS *EventReaderService) StateChan(stateID string) chan struct{} { return erS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (erS *EventReaderService) IntRPCConn() birpc.ClientConnector { + return erS.intRPCconn +} diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index f9fdce110..1a0420ac9 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" @@ -53,6 +54,7 @@ type FreeswitchAgent struct { connMgr *engine.ConnManager srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -122,3 +124,8 @@ func (fS *FreeswitchAgent) ShouldRun() bool { func (fS *FreeswitchAgent) StateChan(stateID string) chan struct{} { return fS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (fS *FreeswitchAgent) IntRPCConn() birpc.ClientConnector { + return fS.intRPCconn +} diff --git a/services/globalvars.go b/services/globalvars.go index 875e38f56..62294deee 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -21,6 +21,7 @@ package services import ( "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" @@ -45,6 +46,7 @@ type GlobalVarS struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -93,3 +95,8 @@ func (gv *GlobalVarS) ShouldRun() bool { func (gv *GlobalVarS) StateChan(stateID string) chan struct{} { return gv.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (gv *GlobalVarS) IntRPCConn() birpc.ClientConnector { + return gv.intRPCconn +} diff --git a/services/httpagent.go b/services/httpagent.go index d9839e88f..9583636b5 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/commonlisteners" @@ -63,6 +64,7 @@ type HTTPAgent struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -127,3 +129,8 @@ func (ha *HTTPAgent) ShouldRun() bool { func (ha *HTTPAgent) StateChan(stateID string) chan struct{} { return ha.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (ha *HTTPAgent) IntRPCConn() birpc.ClientConnector { + return ha.intRPCconn +} diff --git a/services/janus.go b/services/janus.go index a14bfc97c..20d1156f1 100644 --- a/services/janus.go +++ b/services/janus.go @@ -23,6 +23,7 @@ import ( "net/http" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/commonlisteners" @@ -64,6 +65,7 @@ type JanusAgent struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -139,3 +141,8 @@ func (ja *JanusAgent) ShouldRun() bool { func (ja *JanusAgent) StateChan(stateID string) chan struct{} { return ja.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (ja *JanusAgent) IntRPCConn() birpc.ClientConnector { + return ja.intRPCconn +} diff --git a/services/kamailioagent.go b/services/kamailioagent.go index 9f61e0681..96ac32f9a 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -23,6 +23,7 @@ import ( "strings" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" @@ -54,6 +55,7 @@ type KamailioAgent struct { connMgr *engine.ConnManager srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -128,3 +130,8 @@ func (kam *KamailioAgent) ShouldRun() bool { func (kam *KamailioAgent) StateChan(stateID string) chan struct{} { return kam.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (kam *KamailioAgent) IntRPCConn() birpc.ClientConnector { + return kam.intRPCconn +} diff --git a/services/loaders.go b/services/loaders.go index ef3748099..b7e470d10 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -71,6 +71,7 @@ type LoaderService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -112,7 +113,8 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er ldrs.cl.RpcRegister(s) } } - ldrs.connChan <- anz.GetInternalCodec(srv, utils.LoaderS) + ldrs.intRPCconn = anz.GetInternalCodec(srv, utils.LoaderS) + ldrs.connChan <- ldrs.intRPCconn return } @@ -178,3 +180,8 @@ func (ldrs *LoaderService) GetRPCChan() chan birpc.ClientConnector { func (ldrs *LoaderService) StateChan(stateID string) chan struct{} { return ldrs.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (ldrs *LoaderService) IntRPCConn() birpc.ClientConnector { + return ldrs.intRPCconn +} diff --git a/services/radiusagent.go b/services/radiusagent.go index 92bc331e4..0e4a6046e 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" @@ -59,6 +60,7 @@ type RadiusAgent struct { lauth string lacct string + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -146,3 +148,8 @@ func (rad *RadiusAgent) ShouldRun() bool { func (rad *RadiusAgent) StateChan(stateID string) chan struct{} { return rad.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (rad *RadiusAgent) IntRPCConn() birpc.ClientConnector { + return rad.intRPCconn +} diff --git a/services/rankings.go b/services/rankings.go index 0ed070c4b..96feb5ace 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -70,6 +70,7 @@ type RankingService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -118,7 +119,8 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er ran.cl.RpcRegister(s) } } - ran.connChan <- anz.GetInternalCodec(srv, utils.RankingS) + ran.intRPCconn = anz.GetInternalCodec(srv, utils.RankingS) + ran.connChan <- ran.intRPCconn return nil } @@ -161,3 +163,8 @@ func (ran *RankingService) ShouldRun() bool { func (ran *RankingService) StateChan(stateID string) chan struct{} { return ran.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (ran *RankingService) IntRPCConn() birpc.ClientConnector { + return ran.intRPCconn +} diff --git a/services/rates.go b/services/rates.go index 227e3a7a8..ff63d2ebb 100644 --- a/services/rates.go +++ b/services/rates.go @@ -71,6 +71,7 @@ type RateService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -150,7 +151,9 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er if !rs.cfg.DispatcherSCfg().Enabled { rs.cl.RpcRegister(srv) } - rs.intConnChan <- anz.GetInternalCodec(srv, utils.RateS) + + rs.intRPCconn = anz.GetInternalCodec(srv, utils.RateS) + rs.intConnChan <- rs.intRPCconn return } @@ -158,3 +161,8 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er func (rs *RateService) StateChan(stateID string) chan struct{} { return rs.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (rs *RateService) IntRPCConn() birpc.ClientConnector { + return rs.intRPCconn +} diff --git a/services/registrarc.go b/services/registrarc.go index 389fe1791..2d6387450 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -21,6 +21,7 @@ package services import ( "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -53,6 +54,7 @@ type RegistrarCService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -112,3 +114,8 @@ func (dspS *RegistrarCService) ShouldRun() bool { func (dspS *RegistrarCService) StateChan(stateID string) chan struct{} { return dspS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (dspS *RegistrarCService) IntRPCConn() birpc.ClientConnector { + return dspS.intRPCconn +} diff --git a/services/resources.go b/services/resources.go index 1f5327a88..59127a45b 100644 --- a/services/resources.go +++ b/services/resources.go @@ -70,6 +70,7 @@ type ResourceService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -112,7 +113,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e reS.cl.RpcRegister(s) } } - reS.connChan <- anz.GetInternalCodec(srv, utils.ResourceS) + + reS.intRPCconn = anz.GetInternalCodec(srv, utils.ResourceS) + reS.connChan <- reS.intRPCconn return } @@ -157,3 +160,8 @@ func (reS *ResourceService) ShouldRun() bool { func (reS *ResourceService) StateChan(stateID string) chan struct{} { return reS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (reS *ResourceService) IntRPCConn() birpc.ClientConnector { + return reS.intRPCconn +} diff --git a/services/routes.go b/services/routes.go index 08b7e2c02..4e0049621 100644 --- a/services/routes.go +++ b/services/routes.go @@ -71,6 +71,7 @@ type RouteService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -111,7 +112,8 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e routeS.cl.RpcRegister(s) } } - routeS.connChan <- anz.GetInternalCodec(srv, utils.RouteS) + routeS.intRPCconn = anz.GetInternalCodec(srv, utils.RouteS) + routeS.connChan <- routeS.intRPCconn return } @@ -152,3 +154,8 @@ func (routeS *RouteService) ShouldRun() bool { func (routeS *RouteService) StateChan(stateID string) chan struct{} { return routeS.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (routeS *RouteService) IntRPCConn() birpc.ClientConnector { + return routeS.intRPCconn +} diff --git a/services/sessions.go b/services/sessions.go index 6c3686cbb..9588a067d 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -72,6 +72,7 @@ type SessionService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -182,3 +183,8 @@ func (smg *SessionService) ShouldRun() bool { func (smg *SessionService) StateChan(stateID string) chan struct{} { return smg.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (smg *SessionService) IntRPCConn() birpc.ClientConnector { + return smg.intRPCconn +} diff --git a/services/sipagent.go b/services/sipagent.go index bafbb1378..2dc19aea9 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" @@ -56,6 +57,7 @@ type SIPAgent struct { oldListen string + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -135,3 +137,8 @@ func (sip *SIPAgent) ShouldRun() bool { func (sip *SIPAgent) StateChan(stateID string) chan struct{} { return sip.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (sip *SIPAgent) IntRPCConn() birpc.ClientConnector { + return sip.intRPCconn +} diff --git a/services/stats.go b/services/stats.go index 58d58ac4a..659e48d65 100644 --- a/services/stats.go +++ b/services/stats.go @@ -70,6 +70,7 @@ type StatService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -114,7 +115,8 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e sts.cl.RpcRegister(s) } } - sts.connChan <- anz.GetInternalCodec(srv, utils.StatS) + sts.intRPCconn = anz.GetInternalCodec(srv, utils.StatS) + sts.connChan <- sts.intRPCconn return } @@ -159,3 +161,8 @@ func (sts *StatService) ShouldRun() bool { func (sts *StatService) StateChan(stateID string) chan struct{} { return sts.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (sts *StatService) IntRPCConn() birpc.ClientConnector { + return sts.intRPCconn +} diff --git a/services/stordb.go b/services/stordb.go index 648004f18..ab01e5581 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -53,6 +54,7 @@ type StorDBService struct { srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -214,3 +216,8 @@ func (db *StorDBService) needsConnectionReload() bool { func (db *StorDBService) StateChan(stateID string) chan struct{} { return db.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (db *StorDBService) IntRPCConn() birpc.ClientConnector { + return db.intRPCconn +} diff --git a/services/thresholds.go b/services/thresholds.go index ab5814700..610f00d5b 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -70,6 +70,7 @@ type ThresholdService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -113,7 +114,8 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) thrs.cl.RpcRegister(s) } } - thrs.connChan <- anz.GetInternalCodec(srv, utils.ThresholdS) + thrs.intRPCconn = anz.GetInternalCodec(srv, utils.ThresholdS) + thrs.connChan <- thrs.intRPCconn return } @@ -158,3 +160,8 @@ func (thrs *ThresholdService) ShouldRun() bool { func (thrs *ThresholdService) StateChan(stateID string) chan struct{} { return thrs.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (thrs *ThresholdService) IntRPCConn() birpc.ClientConnector { + return thrs.intRPCconn +} diff --git a/services/tpes.go b/services/tpes.go index e2a345931..7dbd019f8 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -62,6 +62,7 @@ type TPeService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -117,3 +118,8 @@ func (ts *TPeService) ShouldRun() bool { func (ts *TPeService) StateChan(stateID string) chan struct{} { return ts.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (ts *TPeService) IntRPCConn() birpc.ClientConnector { + return ts.intRPCconn +} diff --git a/services/trends.go b/services/trends.go index 293693cf4..82341efac 100644 --- a/services/trends.go +++ b/services/trends.go @@ -69,6 +69,7 @@ type TrendService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes } @@ -115,7 +116,8 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err trs.cl.RpcRegister(s) } } - trs.connChan <- anz.GetInternalCodec(srv, utils.Trends) + trs.intRPCconn = anz.GetInternalCodec(srv, utils.Trends) + trs.connChan <- trs.intRPCconn return nil } @@ -158,3 +160,8 @@ func (trs *TrendService) ShouldRun() bool { func (trs *TrendService) StateChan(stateID string) chan struct{} { return trs.stateDeps.StateChan(stateID) } + +// IntRPCConn returns the internal connection used by RPCClient +func (trs *TrendService) IntRPCConn() birpc.ClientConnector { + return trs.intRPCconn +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index d1c777a7a..bdf36e019 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -22,10 +22,12 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // NewServiceManager returns a service manager @@ -50,9 +52,11 @@ type ServiceManager struct { serviceIndexer *ServiceIndexer // index here the services for accessing them by their IDs - shdWg *sync.WaitGroup // list of shutdown items - connMgr *engine.ConnManager + shdWg *sync.WaitGroup // list of shutdown items + rldChan <-chan string // reload signals come over this channelc + + connMgr *engine.ConnManager } // StartServices starts all enabled services @@ -92,6 +96,24 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) { if _, has := srvMngr.subsystems[srv.ServiceName()]; !has { // do not rewrite the service srvMngr.subsystems[srv.ServiceName()] = srv } + if sAPIData, hasAPIData := serviceAPIData[srv.ServiceName()]; hasAPIData { // Add the internal connections + rpcIntChan := make(chan birpc.ClientConnector, 1) + srvMngr.connMgr.AddInternalConn(sAPIData[1], sAPIData[0], rpcIntChan) + if len(sAPIData) > 2 { // Add the bidirectional API + srvMngr.connMgr.AddInternalConn(sAPIData[2], sAPIData[0], rpcIntChan) + } + go func() { // ToDo: centralize management into one single goroutine + if utils.StructChanTimeout( + srvMngr.serviceIndexer.GetService(srv.ServiceName()).StateChan(utils.StateServiceUP), + srvMngr.cfg.GeneralCfg().ConnectTimeout) { + utils.Logger.Err( + fmt.Sprintf("<%s> failed to register internal connection to service %s because of timeout waiting for ServiceUP state", + utils.ServiceManager, srv.ServiceName())) + // toDo: shutdown service + } + rpcIntChan <- srv.IntRPCConn() + }() + } } srvMngr.Unlock() } @@ -181,6 +203,8 @@ type Service interface { ServiceName() string // StateChan returns the channel for specific state subscription StateChan(stateID string) chan struct{} + // IntRPCConn returns the connector needed for internal RPC connections + IntRPCConn() birpc.ClientConnector } // ArgsServiceID are passed to Start/Stop/Status RPC methods @@ -328,3 +352,85 @@ func toggleService(serviceID string, status bool, srvMngr *ServiceManager) (err } return } + +var serviceAPIData = map[string][]string{ + utils.AnalyzerS: { + utils.AnalyzerSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS)}, + utils.AdminS: { + utils.AdminSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS)}, + utils.AttributeS: { + utils.AttributeSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)}, + utils.CacheS: { + utils.CacheSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, + utils.CDRs: { + utils.CDRsV1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)}, + utils.ChargerS: { + utils.ChargerSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers)}, + utils.GuardianS: { + utils.GuardianSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian)}, + utils.LoaderS: { + utils.LoaderSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders)}, + utils.ResourceS: { + utils.ResourceSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)}, + utils.SessionS: { + utils.SessionSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS)}, + utils.StatS: { + utils.StatSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)}, + utils.RankingS: { + utils.RankingSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings)}, + utils.TrendS: { + utils.TrendSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends)}, + utils.RouteS: { + utils.RouteSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes)}, + utils.ThresholdS: { + utils.ThresholdSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)}, + utils.ServiceManagerS: { + utils.ServiceManagerV1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager)}, + utils.ConfigS: { + utils.ConfigSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig)}, + utils.CoreS: { + utils.CoreSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore)}, + utils.EEs: { + utils.EeSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)}, + utils.RateS: { + utils.RateSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates)}, + utils.DispatcherS: { + utils.DispatcherSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers)}, + utils.AccountS: { + utils.AccountSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts)}, + utils.ActionS: { + utils.ActionSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions)}, + utils.TPeS: { + utils.TPeSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes)}, + utils.EFs: { + utils.EfSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs)}, + utils.ERs: { + utils.ErSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs)}, +} diff --git a/utils/consts.go b/utils/consts.go index 8bfa1159c..cf610e151 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -712,7 +712,7 @@ const ( Preference = "Preference" Flags = "Flags" Service = "Service" - MetaAnalyzer = "*analyzer" + MetaAnalyzerS = "*analyzers" CGREventString = "CGREvent" MetaTextPlain = "*text_plain" MetaRelease = "*release"