Services with IntRPCConn method

This commit is contained in:
DanB
2024-11-30 19:13:19 +01:00
parent 6b241ee35b
commit 476f5ba877
42 changed files with 395 additions and 26 deletions

View File

@@ -38,11 +38,11 @@ func (dS *DispatcherService) AnalyzerSv1Ping(ctx *context.Context, args *utils.C
if args != nil {
opts = args.APIOpts
}
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaAnalyzer, utils.AnalyzerSv1Ping, args, reply)
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaAnalyzerS, utils.AnalyzerSv1Ping, args, reply)
}
func (dS *DispatcherService) AnalyzerSv1StringQuery(ctx *context.Context, args *analyzers.QueryArgs, reply *[]map[string]any) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
ev := make(map[string]any)
opts := make(map[string]any)
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaAnalyzer, utils.AnalyzerSv1StringQuery, args, reply)
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaAnalyzerS, utils.AnalyzerSv1StringQuery, args, reply)
}

View File

@@ -273,7 +273,7 @@ func (cM *ConnManager) EnableDispatcher(dsp IntService) {
case strings.HasPrefix(m, utils.CoreS):
key = utils.MetaCore
case strings.HasPrefix(m, utils.AnalyzerS):
key = utils.MetaAnalyzer
key = utils.MetaAnalyzerS
case strings.HasPrefix(m, utils.AdminS):
key = utils.MetaAdminS
case strings.HasPrefix(m, utils.LoaderS):

View File

@@ -76,6 +76,7 @@ type AccountService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -121,7 +122,8 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
acts.cl.RpcRegister(srv)
}
acts.connChan <- anz.GetInternalCodec(srv, utils.AccountS)
acts.intRPCconn = anz.GetInternalCodec(srv, utils.AccountS)
acts.connChan <- acts.intRPCconn
return
}
@@ -164,3 +166,8 @@ func (acts *AccountService) ShouldRun() bool {
func (acts *AccountService) StateChan(stateID string) chan struct{} {
return acts.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (acts *AccountService) IntRPCConn() birpc.ClientConnector {
return acts.intRPCconn
}

View File

@@ -77,6 +77,7 @@ type ActionService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -120,7 +121,9 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
if !acts.cfg.DispatcherSCfg().Enabled {
acts.cl.RpcRegister(srv)
}
acts.connChan <- anz.GetInternalCodec(srv, utils.ActionS)
acts.intRPCconn = anz.GetInternalCodec(srv, utils.ActionS)
acts.connChan <- acts.intRPCconn
return
}
@@ -163,3 +166,8 @@ func (acts *ActionService) ShouldRun() bool {
func (acts *ActionService) StateChan(stateID string) chan struct{} {
return acts.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (acts *ActionService) IntRPCConn() birpc.ClientConnector {
return acts.intRPCconn
}

View File

@@ -72,6 +72,7 @@ type AdminSv1Service struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // RPC connector with internal APIs
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -121,7 +122,8 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
}
//backwards compatible
apiService.connChan <- anz.GetInternalCodec(srv, utils.AdminSv1)
apiService.intRPCconn = anz.GetInternalCodec(srv, utils.AdminSv1)
apiService.connChan <- apiService.intRPCconn
return
}
@@ -163,3 +165,8 @@ func (apiService *AdminSv1Service) ShouldRun() bool {
func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} {
return apiService.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (apiService *AdminSv1Service) IntRPCConn() birpc.ClientConnector {
return apiService.intRPCconn
}

View File

@@ -66,6 +66,7 @@ type AnalyzerService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
@@ -177,3 +178,8 @@ func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string)
func (anz *AnalyzerService) StateChan(stateID string) chan struct{} {
return anz.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (anz *AnalyzerService) IntRPCConn() birpc.ClientConnector {
return anz.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
@@ -54,6 +55,7 @@ type AsteriskAgent struct {
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -123,3 +125,8 @@ func (ast *AsteriskAgent) ShouldRun() bool {
func (ast *AsteriskAgent) StateChan(stateID string) chan struct{} {
return ast.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ast *AsteriskAgent) IntRPCConn() birpc.ClientConnector {
return ast.intRPCconn
}

View File

@@ -72,6 +72,7 @@ type AttributeService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
serviceIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies
}
@@ -128,7 +129,9 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
}
}()
attrS.connChan <- anz.GetInternalCodec(srv, utils.AttributeS)
attrS.intRPCconn = anz.GetInternalCodec(srv, utils.AttributeS)
attrS.connChan <- attrS.intRPCconn
close(attrS.stateDeps.StateChan(utils.StateServiceUP)) // inform listeners about the service reaching UP state
return
}
@@ -172,3 +175,8 @@ func (attrS *AttributeService) ShouldRun() bool {
func (attrS *AttributeService) StateChan(stateID string) chan struct{} {
return attrS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (attrS *AttributeService) IntRPCConn() birpc.ClientConnector {
return attrS.intRPCconn
}

View File

@@ -67,6 +67,7 @@ type CacheService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -98,7 +99,8 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
cS.cl.RpcRegister(s)
}
}
cS.rpc <- anz.GetInternalCodec(srv, utils.CacheS)
cS.intRPCconn = anz.GetInternalCodec(srv, utils.CacheS)
cS.rpc <- cS.intRPCconn
return
}
@@ -155,3 +157,8 @@ func (cS *CacheService) WaitToPrecache(ctx *context.Context, cacheIDs ...string)
func (cS *CacheService) StateChan(stateID string) chan struct{} {
return cS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (cS *CacheService) IntRPCConn() birpc.ClientConnector {
return cS.intRPCconn
}

View File

@@ -73,6 +73,7 @@ type CDRService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -116,7 +117,9 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
if !cs.cfg.DispatcherSCfg().Enabled {
cs.cl.RpcRegister(srv)
}
cs.connChan <- anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational
cs.intRPCconn = anz.GetInternalCodec(srv, utils.CDRServer)
cs.connChan <- cs.intRPCconn // Signal that cdrS is operational
return
}
@@ -157,3 +160,8 @@ func (cs *CDRService) ShouldRun() bool {
func (cs *CDRService) StateChan(stateID string) chan struct{} {
return cs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (cs *CDRService) IntRPCConn() birpc.ClientConnector {
return cs.intRPCconn
}

View File

@@ -166,7 +166,7 @@ func (cgr *CGREngine) InitServices(setVersions bool) {
// initialize the connManager before creating the DMService
// because we need to pass the connection to it
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzer), utils.AnalyzerSv1, iAnalyzerSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS), utils.AnalyzerSv1, iAnalyzerSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS), utils.AdminSv1, iAdminSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), utils.AttributeSv1, iAttributeSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches), utils.CacheSv1, iCacheSCh)

View File

@@ -70,6 +70,7 @@ type ChargerService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -109,7 +110,9 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
chrS.cl.RpcRegister(s)
}
}
chrS.connChan <- anz.GetInternalCodec(srv, utils.ChargerS)
chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS)
chrS.connChan <- chrS.intRPCconn
return
}
@@ -150,3 +153,8 @@ func (chrS *ChargerService) ShouldRun() bool {
func (chrS *ChargerService) StateChan(stateID string) chan struct{} {
return chrS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (chrS *ChargerService) IntRPCConn() birpc.ClientConnector {
return chrS.intRPCconn
}

View File

@@ -21,6 +21,7 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
@@ -54,6 +55,7 @@ type CommonListenerService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -111,3 +113,8 @@ func (cl *CommonListenerService) ShouldRun() bool {
func (cl *CommonListenerService) StateChan(stateID string) chan struct{} {
return cl.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (cl *CommonListenerService) IntRPCConn() birpc.ClientConnector {
return cl.intRPCconn
}

View File

@@ -72,6 +72,7 @@ type CoreService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -102,7 +103,9 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err
cS.cl.RpcRegister(s)
}
}
cS.connChan <- anz.GetInternalCodec(srv, utils.CoreS)
cS.intRPCconn = anz.GetInternalCodec(srv, utils.CoreS)
cS.connChan <- cS.intRPCconn
return nil
}
@@ -161,3 +164,8 @@ func (cS *CoreService) WaitForCoreS(ctx *context.Context) (cs *cores.CoreS, err
func (cS *CoreService) StateChan(stateID string) chan struct{} {
return cS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (cS *CoreService) IntRPCConn() birpc.ClientConnector {
return cS.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -56,6 +57,7 @@ type DataDBService struct {
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -200,3 +202,8 @@ func (db *DataDBService) WaitForDM(ctx *context.Context) (datadb *engine.DataMan
func (db *DataDBService) StateChan(stateID string) chan struct{} {
return db.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (db *DataDBService) IntRPCConn() birpc.ClientConnector {
return db.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
@@ -61,6 +62,7 @@ type DiameterAgent struct {
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -147,3 +149,8 @@ func (da *DiameterAgent) ShouldRun() bool {
func (da *DiameterAgent) StateChan(stateID string) chan struct{} {
return da.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (da *DiameterAgent) IntRPCConn() birpc.ClientConnector {
return da.intRPCconn
}

View File

@@ -72,6 +72,7 @@ type DispatcherService struct {
srvsReload map[string]chan struct{}
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -117,7 +118,8 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
// for the moment we dispable Apier through dispatcher
// until we figured out a better sollution in case of gob server
// dspS.server.SetDispatched()
dspS.connChan <- anz.GetInternalCodec(srv, utils.DispatcherS)
dspS.intRPCconn = anz.GetInternalCodec(srv, utils.DispatcherS)
dspS.connChan <- dspS.intRPCconn
return
}
@@ -191,3 +193,8 @@ func (dspS *DispatcherService) sync() {
func (dspS *DispatcherService) StateChan(stateID string) chan struct{} {
return dspS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (dspS *DispatcherService) IntRPCConn() birpc.ClientConnector {
return dspS.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
@@ -56,6 +57,7 @@ type DNSAgent struct {
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -152,3 +154,8 @@ func (dns *DNSAgent) ShouldRun() bool {
func (dns *DNSAgent) StateChan(stateID string) chan struct{} {
return dns.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (dns *DNSAgent) IntRPCConn() birpc.ClientConnector {
return dns.intRPCconn
}

View File

@@ -65,6 +65,7 @@ type EventExporterService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -136,7 +137,9 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
if !es.cfg.DispatcherSCfg().Enabled {
es.cl.RpcRegister(srv)
}
es.intConnChan <- anz.GetInternalCodec(srv, utils.EEs)
es.intRPCconn = anz.GetInternalCodec(srv, utils.EEs)
es.intConnChan <- es.intRPCconn
return nil
}
@@ -144,3 +147,8 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
func (es *EventExporterService) StateChan(stateID string) chan struct{} {
return es.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (es *EventExporterService) IntRPCConn() birpc.ClientConnector {
return es.intRPCconn
}

View File

@@ -49,6 +49,7 @@ type ExportFailoverService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -122,3 +123,8 @@ func (efServ *ExportFailoverService) ServiceName() string {
func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} {
return efServ.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (efServ *ExportFailoverService) IntRPCConn() birpc.ClientConnector {
return efServ.intRPCconn
}

View File

@@ -73,6 +73,7 @@ type EventReaderService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -111,7 +112,8 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
if !erS.cfg.DispatcherSCfg().Enabled {
erS.cl.RpcRegister(srv)
}
erS.intConn <- anz.GetInternalCodec(srv, utils.ERs)
erS.intRPCconn = anz.GetInternalCodec(srv, utils.ERs)
erS.intConn <- erS.intRPCconn
return
}
@@ -162,3 +164,8 @@ func (erS *EventReaderService) ShouldRun() bool {
func (erS *EventReaderService) StateChan(stateID string) chan struct{} {
return erS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (erS *EventReaderService) IntRPCConn() birpc.ClientConnector {
return erS.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
@@ -53,6 +54,7 @@ type FreeswitchAgent struct {
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -122,3 +124,8 @@ func (fS *FreeswitchAgent) ShouldRun() bool {
func (fS *FreeswitchAgent) StateChan(stateID string) chan struct{} {
return fS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (fS *FreeswitchAgent) IntRPCConn() birpc.ClientConnector {
return fS.intRPCconn
}

View File

@@ -21,6 +21,7 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
@@ -45,6 +46,7 @@ type GlobalVarS struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -93,3 +95,8 @@ func (gv *GlobalVarS) ShouldRun() bool {
func (gv *GlobalVarS) StateChan(stateID string) chan struct{} {
return gv.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (gv *GlobalVarS) IntRPCConn() birpc.ClientConnector {
return gv.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/commonlisteners"
@@ -63,6 +64,7 @@ type HTTPAgent struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -127,3 +129,8 @@ func (ha *HTTPAgent) ShouldRun() bool {
func (ha *HTTPAgent) StateChan(stateID string) chan struct{} {
return ha.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ha *HTTPAgent) IntRPCConn() birpc.ClientConnector {
return ha.intRPCconn
}

View File

@@ -23,6 +23,7 @@ import (
"net/http"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/commonlisteners"
@@ -64,6 +65,7 @@ type JanusAgent struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -139,3 +141,8 @@ func (ja *JanusAgent) ShouldRun() bool {
func (ja *JanusAgent) StateChan(stateID string) chan struct{} {
return ja.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ja *JanusAgent) IntRPCConn() birpc.ClientConnector {
return ja.intRPCconn
}

View File

@@ -23,6 +23,7 @@ import (
"strings"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
@@ -54,6 +55,7 @@ type KamailioAgent struct {
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -128,3 +130,8 @@ func (kam *KamailioAgent) ShouldRun() bool {
func (kam *KamailioAgent) StateChan(stateID string) chan struct{} {
return kam.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (kam *KamailioAgent) IntRPCConn() birpc.ClientConnector {
return kam.intRPCconn
}

View File

@@ -71,6 +71,7 @@ type LoaderService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -112,7 +113,8 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
ldrs.cl.RpcRegister(s)
}
}
ldrs.connChan <- anz.GetInternalCodec(srv, utils.LoaderS)
ldrs.intRPCconn = anz.GetInternalCodec(srv, utils.LoaderS)
ldrs.connChan <- ldrs.intRPCconn
return
}
@@ -178,3 +180,8 @@ func (ldrs *LoaderService) GetRPCChan() chan birpc.ClientConnector {
func (ldrs *LoaderService) StateChan(stateID string) chan struct{} {
return ldrs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ldrs *LoaderService) IntRPCConn() birpc.ClientConnector {
return ldrs.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
@@ -59,6 +60,7 @@ type RadiusAgent struct {
lauth string
lacct string
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -146,3 +148,8 @@ func (rad *RadiusAgent) ShouldRun() bool {
func (rad *RadiusAgent) StateChan(stateID string) chan struct{} {
return rad.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (rad *RadiusAgent) IntRPCConn() birpc.ClientConnector {
return rad.intRPCconn
}

View File

@@ -70,6 +70,7 @@ type RankingService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -118,7 +119,8 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
ran.cl.RpcRegister(s)
}
}
ran.connChan <- anz.GetInternalCodec(srv, utils.RankingS)
ran.intRPCconn = anz.GetInternalCodec(srv, utils.RankingS)
ran.connChan <- ran.intRPCconn
return nil
}
@@ -161,3 +163,8 @@ func (ran *RankingService) ShouldRun() bool {
func (ran *RankingService) StateChan(stateID string) chan struct{} {
return ran.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ran *RankingService) IntRPCConn() birpc.ClientConnector {
return ran.intRPCconn
}

View File

@@ -71,6 +71,7 @@ type RateService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -150,7 +151,9 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
if !rs.cfg.DispatcherSCfg().Enabled {
rs.cl.RpcRegister(srv)
}
rs.intConnChan <- anz.GetInternalCodec(srv, utils.RateS)
rs.intRPCconn = anz.GetInternalCodec(srv, utils.RateS)
rs.intConnChan <- rs.intRPCconn
return
}
@@ -158,3 +161,8 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
func (rs *RateService) StateChan(stateID string) chan struct{} {
return rs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (rs *RateService) IntRPCConn() birpc.ClientConnector {
return rs.intRPCconn
}

View File

@@ -21,6 +21,7 @@ package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -53,6 +54,7 @@ type RegistrarCService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -112,3 +114,8 @@ func (dspS *RegistrarCService) ShouldRun() bool {
func (dspS *RegistrarCService) StateChan(stateID string) chan struct{} {
return dspS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (dspS *RegistrarCService) IntRPCConn() birpc.ClientConnector {
return dspS.intRPCconn
}

View File

@@ -70,6 +70,7 @@ type ResourceService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -112,7 +113,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
reS.cl.RpcRegister(s)
}
}
reS.connChan <- anz.GetInternalCodec(srv, utils.ResourceS)
reS.intRPCconn = anz.GetInternalCodec(srv, utils.ResourceS)
reS.connChan <- reS.intRPCconn
return
}
@@ -157,3 +160,8 @@ func (reS *ResourceService) ShouldRun() bool {
func (reS *ResourceService) StateChan(stateID string) chan struct{} {
return reS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (reS *ResourceService) IntRPCConn() birpc.ClientConnector {
return reS.intRPCconn
}

View File

@@ -71,6 +71,7 @@ type RouteService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -111,7 +112,8 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
routeS.cl.RpcRegister(s)
}
}
routeS.connChan <- anz.GetInternalCodec(srv, utils.RouteS)
routeS.intRPCconn = anz.GetInternalCodec(srv, utils.RouteS)
routeS.connChan <- routeS.intRPCconn
return
}
@@ -152,3 +154,8 @@ func (routeS *RouteService) ShouldRun() bool {
func (routeS *RouteService) StateChan(stateID string) chan struct{} {
return routeS.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (routeS *RouteService) IntRPCConn() birpc.ClientConnector {
return routeS.intRPCconn
}

View File

@@ -72,6 +72,7 @@ type SessionService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -182,3 +183,8 @@ func (smg *SessionService) ShouldRun() bool {
func (smg *SessionService) StateChan(stateID string) chan struct{} {
return smg.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (smg *SessionService) IntRPCConn() birpc.ClientConnector {
return smg.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
@@ -56,6 +57,7 @@ type SIPAgent struct {
oldListen string
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -135,3 +137,8 @@ func (sip *SIPAgent) ShouldRun() bool {
func (sip *SIPAgent) StateChan(stateID string) chan struct{} {
return sip.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (sip *SIPAgent) IntRPCConn() birpc.ClientConnector {
return sip.intRPCconn
}

View File

@@ -70,6 +70,7 @@ type StatService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -114,7 +115,8 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
sts.cl.RpcRegister(s)
}
}
sts.connChan <- anz.GetInternalCodec(srv, utils.StatS)
sts.intRPCconn = anz.GetInternalCodec(srv, utils.StatS)
sts.connChan <- sts.intRPCconn
return
}
@@ -159,3 +161,8 @@ func (sts *StatService) ShouldRun() bool {
func (sts *StatService) StateChan(stateID string) chan struct{} {
return sts.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (sts *StatService) IntRPCConn() birpc.ClientConnector {
return sts.intRPCconn
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -53,6 +54,7 @@ type StorDBService struct {
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -214,3 +216,8 @@ func (db *StorDBService) needsConnectionReload() bool {
func (db *StorDBService) StateChan(stateID string) chan struct{} {
return db.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (db *StorDBService) IntRPCConn() birpc.ClientConnector {
return db.intRPCconn
}

View File

@@ -70,6 +70,7 @@ type ThresholdService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -113,7 +114,8 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
thrs.cl.RpcRegister(s)
}
}
thrs.connChan <- anz.GetInternalCodec(srv, utils.ThresholdS)
thrs.intRPCconn = anz.GetInternalCodec(srv, utils.ThresholdS)
thrs.connChan <- thrs.intRPCconn
return
}
@@ -158,3 +160,8 @@ func (thrs *ThresholdService) ShouldRun() bool {
func (thrs *ThresholdService) StateChan(stateID string) chan struct{} {
return thrs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (thrs *ThresholdService) IntRPCConn() birpc.ClientConnector {
return thrs.intRPCconn
}

View File

@@ -62,6 +62,7 @@ type TPeService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -117,3 +118,8 @@ func (ts *TPeService) ShouldRun() bool {
func (ts *TPeService) StateChan(stateID string) chan struct{} {
return ts.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (ts *TPeService) IntRPCConn() birpc.ClientConnector {
return ts.intRPCconn
}

View File

@@ -69,6 +69,7 @@ type TrendService struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
@@ -115,7 +116,8 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
trs.cl.RpcRegister(s)
}
}
trs.connChan <- anz.GetInternalCodec(srv, utils.Trends)
trs.intRPCconn = anz.GetInternalCodec(srv, utils.Trends)
trs.connChan <- trs.intRPCconn
return nil
}
@@ -158,3 +160,8 @@ func (trs *TrendService) ShouldRun() bool {
func (trs *TrendService) StateChan(stateID string) chan struct{} {
return trs.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (trs *TrendService) IntRPCConn() birpc.ClientConnector {
return trs.intRPCconn
}

View File

@@ -22,10 +22,12 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewServiceManager returns a service manager
@@ -50,9 +52,11 @@ type ServiceManager struct {
serviceIndexer *ServiceIndexer // index here the services for accessing them by their IDs
shdWg *sync.WaitGroup // list of shutdown items
connMgr *engine.ConnManager
shdWg *sync.WaitGroup // list of shutdown items
rldChan <-chan string // reload signals come over this channelc
connMgr *engine.ConnManager
}
// StartServices starts all enabled services
@@ -92,6 +96,24 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) {
if _, has := srvMngr.subsystems[srv.ServiceName()]; !has { // do not rewrite the service
srvMngr.subsystems[srv.ServiceName()] = srv
}
if sAPIData, hasAPIData := serviceAPIData[srv.ServiceName()]; hasAPIData { // Add the internal connections
rpcIntChan := make(chan birpc.ClientConnector, 1)
srvMngr.connMgr.AddInternalConn(sAPIData[1], sAPIData[0], rpcIntChan)
if len(sAPIData) > 2 { // Add the bidirectional API
srvMngr.connMgr.AddInternalConn(sAPIData[2], sAPIData[0], rpcIntChan)
}
go func() { // ToDo: centralize management into one single goroutine
if utils.StructChanTimeout(
srvMngr.serviceIndexer.GetService(srv.ServiceName()).StateChan(utils.StateServiceUP),
srvMngr.cfg.GeneralCfg().ConnectTimeout) {
utils.Logger.Err(
fmt.Sprintf("<%s> failed to register internal connection to service %s because of timeout waiting for ServiceUP state",
utils.ServiceManager, srv.ServiceName()))
// toDo: shutdown service
}
rpcIntChan <- srv.IntRPCConn()
}()
}
}
srvMngr.Unlock()
}
@@ -181,6 +203,8 @@ type Service interface {
ServiceName() string
// StateChan returns the channel for specific state subscription
StateChan(stateID string) chan struct{}
// IntRPCConn returns the connector needed for internal RPC connections
IntRPCConn() birpc.ClientConnector
}
// ArgsServiceID are passed to Start/Stop/Status RPC methods
@@ -328,3 +352,85 @@ func toggleService(serviceID string, status bool, srvMngr *ServiceManager) (err
}
return
}
var serviceAPIData = map[string][]string{
utils.AnalyzerS: {
utils.AnalyzerSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS)},
utils.AdminS: {
utils.AdminSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS)},
utils.AttributeS: {
utils.AttributeSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)},
utils.CacheS: {
utils.CacheSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)},
utils.CDRs: {
utils.CDRsV1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)},
utils.ChargerS: {
utils.ChargerSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers)},
utils.GuardianS: {
utils.GuardianSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian)},
utils.LoaderS: {
utils.LoaderSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders)},
utils.ResourceS: {
utils.ResourceSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)},
utils.SessionS: {
utils.SessionSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS),
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS)},
utils.StatS: {
utils.StatSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)},
utils.RankingS: {
utils.RankingSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings)},
utils.TrendS: {
utils.TrendSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends)},
utils.RouteS: {
utils.RouteSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes)},
utils.ThresholdS: {
utils.ThresholdSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)},
utils.ServiceManagerS: {
utils.ServiceManagerV1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager)},
utils.ConfigS: {
utils.ConfigSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig)},
utils.CoreS: {
utils.CoreSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore)},
utils.EEs: {
utils.EeSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)},
utils.RateS: {
utils.RateSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates)},
utils.DispatcherS: {
utils.DispatcherSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers)},
utils.AccountS: {
utils.AccountSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts)},
utils.ActionS: {
utils.ActionSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions)},
utils.TPeS: {
utils.TPeSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes)},
utils.EFs: {
utils.EfSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs)},
utils.ERs: {
utils.ErSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs)},
}

View File

@@ -712,7 +712,7 @@ const (
Preference = "Preference"
Flags = "Flags"
Service = "Service"
MetaAnalyzer = "*analyzer"
MetaAnalyzerS = "*analyzers"
CGREventString = "CGREvent"
MetaTextPlain = "*text_plain"
MetaRelease = "*release"