mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add a PoC for ConnManager with EventReader
This commit is contained in:
@@ -486,7 +486,10 @@ func main() {
|
||||
internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
|
||||
internalCDRServerChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
internalCDRServerChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
internalAttributeSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
internalSessionSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
|
||||
// init CacheS
|
||||
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
|
||||
@@ -500,9 +503,8 @@ func main() {
|
||||
// Start ServiceManager
|
||||
srvManager := servmanager.NewServiceManager(cfg, exitChan)
|
||||
|
||||
connManager := services.NewConnManager(cfg)
|
||||
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server)
|
||||
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), connManager)
|
||||
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan)
|
||||
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, internalDispatcherSChan)
|
||||
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
|
||||
attrS.GetIntenternalChan(), dspS.GetIntenternalChan())
|
||||
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server)
|
||||
@@ -526,12 +528,36 @@ func main() {
|
||||
smg := services.NewSessionService(cfg, dmService, server, chrS.GetIntenternalChan(),
|
||||
rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(),
|
||||
tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(),
|
||||
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan)
|
||||
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), internalSessionSChan, exitChan)
|
||||
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan)
|
||||
anz := services.NewAnalyzerService(cfg, server, exitChan)
|
||||
|
||||
connManager := services.NewConnManagerService(cfg, map[string]chan rpcclient.RpcClientConnection{
|
||||
utils.AnalyzerSv1: anz.GetIntenternalChan(),
|
||||
utils.ApierV1: rals.GetAPIv1().GetIntenternalChan(),
|
||||
utils.ApierV2: rals.GetAPIv2().GetIntenternalChan(),
|
||||
utils.AttributeSv1: internalAttributeSChan,
|
||||
utils.CacheSv1: internalCacheSChan,
|
||||
utils.CDRsV1: cdrS.GetIntenternalChan(),
|
||||
utils.CDRsV2: cdrS.GetIntenternalChan(),
|
||||
utils.ChargerSv1: chrS.GetIntenternalChan(),
|
||||
utils.GuardianSv1: internalGuardianSChan,
|
||||
utils.LoaderSv1: ldrs.GetIntenternalChan(),
|
||||
utils.ResourceSv1: reS.GetIntenternalChan(),
|
||||
utils.Responder: rals.GetResponder().GetIntenternalChan(),
|
||||
utils.SchedulerSv1: schS.GetIntenternalChan(),
|
||||
utils.SessionSv1: internalSessionSChan,
|
||||
utils.StatSv1: stS.GetIntenternalChan(),
|
||||
utils.SupplierSv1: supS.GetIntenternalChan(),
|
||||
utils.ThresholdSv1: tS.GetIntenternalChan(),
|
||||
utils.ServiceManagerV1: internalServeManagerChan,
|
||||
utils.ConfigSv1: internalConfigChan,
|
||||
utils.CoreSv1: internalCoreSv1Chan,
|
||||
utils.RALsV1: rals.GetIntenternalChan(),
|
||||
})
|
||||
srvManager.AddServices(connManager, attrS, chrS, tS, stS, reS, supS, schS, rals,
|
||||
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
|
||||
services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan, connManager),
|
||||
services.NewEventReaderService(cfg, filterSChan, exitChan, connManager.GetConnMgr()),
|
||||
services.NewDNSAgent(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
|
||||
services.NewFreeswitchAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
|
||||
services.NewKamailioAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
|
||||
|
||||
@@ -373,11 +373,11 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
// EventReader sanity checks
|
||||
if cfg.ersCfg.Enabled {
|
||||
for _, connCfg := range cfg.ersCfg.SessionSConns {
|
||||
if _, has := cfg.rpcConns[connCfg]; !has {
|
||||
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ERs, connCfg)
|
||||
}
|
||||
}
|
||||
//for _, connCfg := range cfg.ersCfg.SessionSConns {
|
||||
// if _, has := cfg.rpcConns[connCfg]; !has {
|
||||
// return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ERs, connCfg)
|
||||
// }
|
||||
//}
|
||||
for _, rdr := range cfg.ersCfg.Readers {
|
||||
if !possibleReaderTypes.Has(rdr.Type) {
|
||||
return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID)
|
||||
|
||||
@@ -40,7 +40,12 @@ func (erS *ERsCfg) loadFromJsonCfg(jsnCfg *ERsJsonCfg, sep string, dfltRdrCfg *E
|
||||
if jsnCfg.Sessions_conns != nil {
|
||||
erS.SessionSConns = make([]string, len(*jsnCfg.Sessions_conns))
|
||||
for i, fID := range *jsnCfg.Sessions_conns {
|
||||
erS.SessionSConns[i] = fID
|
||||
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
|
||||
if fID == utils.MetaInternal {
|
||||
erS.SessionSConns[i] = utils.SessionSv1
|
||||
} else {
|
||||
erS.SessionSConns[i] = fID
|
||||
}
|
||||
}
|
||||
}
|
||||
return erS.appendERsReaders(jsnCfg.Readers, sep, dfltRdrCfg)
|
||||
|
||||
@@ -82,7 +82,7 @@
|
||||
|
||||
"ers": {
|
||||
"enabled": true,
|
||||
"sessions_conns": ["*localhost"],
|
||||
"sessions_conns": ["*internal"],
|
||||
"readers": [
|
||||
{
|
||||
"id": "file_reader1",
|
||||
|
||||
86
engine/connmanager.go
Normal file
86
engine/connmanager.go
Normal file
@@ -0,0 +1,86 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewConnManager returns the Connection Manager
|
||||
func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient.RpcClientConnection) (cM *ConnManager) {
|
||||
fmt.Println("Enter in engine NewConn Manager")
|
||||
fmt.Println(rpcInternal)
|
||||
return &ConnManager{cfg: cfg, rpcInternal: rpcInternal}
|
||||
}
|
||||
|
||||
type ConnManager struct {
|
||||
cfg *config.CGRConfig
|
||||
rpcInternal map[string]chan rpcclient.RpcClientConnection
|
||||
}
|
||||
|
||||
func (cM *ConnManager) getConn(connID string) (connPool *rpcclient.RpcClientPool, err error) {
|
||||
//try to get the connection from cache
|
||||
if x, ok := Cache.Get(utils.CacheRPCConnections, connID); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*rpcclient.RpcClientPool), nil
|
||||
}
|
||||
// in case we don't found in cache create the connection and add this in cache
|
||||
var intChan chan rpcclient.RpcClientConnection
|
||||
var connCfg *config.RPCConn
|
||||
fmt.Println("TEST ?? ")
|
||||
fmt.Println(cM)
|
||||
fmt.Println(cM.rpcInternal)
|
||||
fmt.Println(connID)
|
||||
if internalChan, has := cM.rpcInternal[connID]; has {
|
||||
connCfg = cM.cfg.RPCConns()[utils.MetaInternal]
|
||||
intChan = internalChan
|
||||
} else {
|
||||
connCfg = cM.cfg.RPCConns()[connID]
|
||||
}
|
||||
connPool, err = NewRPCPool(connCfg.Strategy,
|
||||
cM.cfg.TlsCfg().ClientKey,
|
||||
cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate,
|
||||
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
|
||||
cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
|
||||
connCfg.Conns, intChan, false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
Cache.Set(utils.CacheRPCConnections, connID, connPool, nil,
|
||||
true, utils.NonTransactional)
|
||||
return
|
||||
}
|
||||
|
||||
func (cM *ConnManager) Call(connIDs []string, method string, arg, reply interface{}) (err error) {
|
||||
for _, connID := range connIDs {
|
||||
conn, err := cM.getConn(connID)
|
||||
if err == nil {
|
||||
if err := conn.Call(method, arg, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
42
ers/ers.go
42
ers/ers.go
@@ -26,7 +26,6 @@ import (
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/sessions"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// erEvent is passed from reader to ERs
|
||||
@@ -36,8 +35,7 @@ type erEvent struct {
|
||||
}
|
||||
|
||||
// NewERService instantiates the ERService
|
||||
func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
sS rpcclient.RpcClientConnection, stopChan chan struct{}) *ERService {
|
||||
func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, stopChan chan struct{}, connMgr *engine.ConnManager) *ERService {
|
||||
return &ERService{
|
||||
cfg: cfg,
|
||||
rdrs: make(map[string]EventReader),
|
||||
@@ -46,8 +44,8 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
rdrEvents: make(chan *erEvent),
|
||||
rdrErr: make(chan error),
|
||||
filterS: filterS,
|
||||
sS: sS,
|
||||
stopChan: stopChan,
|
||||
connMgr: connMgr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,8 +60,8 @@ type ERService struct {
|
||||
rdrErr chan error // receive here errors which should stop the app
|
||||
|
||||
filterS *engine.FilterS
|
||||
sS rpcclient.RpcClientConnection // connection towards SessionS
|
||||
stopChan chan struct{}
|
||||
connMgr *engine.ConnManager
|
||||
}
|
||||
|
||||
// ListenAndServe keeps the service alive
|
||||
@@ -195,8 +193,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator,
|
||||
)
|
||||
rply := new(sessions.V1AuthorizeReply)
|
||||
err = erS.sS.Call(utils.SessionSv1AuthorizeEvent,
|
||||
fmt.Println("Call Auth")
|
||||
err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1AuthorizeEvent,
|
||||
authArgs, rply)
|
||||
fmt.Println(" Finish Call Auth")
|
||||
case utils.MetaInitiate:
|
||||
initArgs := sessions.NewV1InitSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
@@ -209,8 +209,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1InitSessionReply)
|
||||
err = erS.sS.Call(utils.SessionSv1InitiateSession,
|
||||
fmt.Println("Call Init")
|
||||
err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1InitiateSession,
|
||||
initArgs, rply)
|
||||
fmt.Println(" Finish Call Init")
|
||||
case utils.MetaUpdate:
|
||||
updateArgs := sessions.NewV1UpdateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
@@ -218,8 +220,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1UpdateSessionReply)
|
||||
err = erS.sS.Call(utils.SessionSv1UpdateSession,
|
||||
fmt.Println("Call Update")
|
||||
err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1UpdateSession,
|
||||
updateArgs, rply)
|
||||
fmt.Println(" Finish Call Update")
|
||||
case utils.MetaTerminate:
|
||||
terminateArgs := sessions.NewV1TerminateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
@@ -230,8 +234,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := utils.StringPointer("")
|
||||
err = erS.sS.Call(utils.SessionSv1TerminateSession,
|
||||
fmt.Println("Call Terminate")
|
||||
err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1TerminateSession,
|
||||
terminateArgs, rply)
|
||||
fmt.Println(" Finish Call Terminate")
|
||||
case utils.MetaMessage:
|
||||
evArgs := sessions.NewV1ProcessMessageArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
@@ -247,8 +253,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator)
|
||||
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessMessage,
|
||||
fmt.Println("Call ProcMsg")
|
||||
err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessMessage,
|
||||
evArgs, rply)
|
||||
fmt.Println("Finish Call ProcMsg")
|
||||
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
|
||||
cgrEv.Event[utils.Usage] = 0 // avoid further debits
|
||||
} else if evArgs.Debit {
|
||||
@@ -262,8 +270,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
|
||||
Paginator: *cgrArgs.SupplierPaginator,
|
||||
}
|
||||
rply := new(sessions.V1ProcessEventReply)
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessEvent,
|
||||
fmt.Println("Call Event")
|
||||
err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessEvent,
|
||||
evArgs, rply)
|
||||
fmt.Println("Finish Call Event")
|
||||
case utils.MetaCDRs: // allow CDR processing
|
||||
}
|
||||
if err != nil {
|
||||
@@ -273,16 +283,12 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
|
||||
if rdrCfg.Flags.HasKey(utils.MetaCDRs) &&
|
||||
!rdrCfg.Flags.HasKey(utils.MetaDryRun) {
|
||||
rplyCDRs := utils.StringPointer("")
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessCDR,
|
||||
fmt.Println("Call cdrs")
|
||||
err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessCDR,
|
||||
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs)
|
||||
fmt.Println("finish Call cdrs")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// SetSessionSConnection sets the new connection to the threshold service
|
||||
// only used on reload
|
||||
func (erS *ERService) SetSessionSConnection(sS rpcclient.RpcClientConnection) {
|
||||
erS.sS = sS
|
||||
}
|
||||
|
||||
@@ -38,8 +38,7 @@ func TestERsNewERService(t *testing.T) {
|
||||
stopLsn: make(map[string]chan struct{}),
|
||||
rdrEvents: make(chan *erEvent),
|
||||
rdrErr: make(chan error),
|
||||
stopChan: nil,
|
||||
sS: nil}
|
||||
stopChan: nil}
|
||||
rcv := NewERService(cfg, fltrS, nil, nil)
|
||||
|
||||
if !reflect.DeepEqual(expected.cfg, rcv.cfg) {
|
||||
|
||||
@@ -62,7 +62,7 @@ accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"
|
||||
testCsvITInitConfig,
|
||||
testCsvITInitCdrDb,
|
||||
testCsvITResetDataDb,
|
||||
testCsvITStartEngine,
|
||||
//testCsvITStartEngine,
|
||||
testCsvITRpcConn,
|
||||
testCsvITLoadTPFromFolder,
|
||||
testCsvITHandleCdr1File,
|
||||
@@ -75,7 +75,7 @@ accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"
|
||||
testCsvITProcessFilteredCDR,
|
||||
testCsvITAnalyzeFilteredCDR,
|
||||
testCsvITProcessedFiles,
|
||||
testCsvITCleanupFiles,
|
||||
//testCsvITCleanupFiles,
|
||||
testCsvITKillEngine,
|
||||
}
|
||||
)
|
||||
@@ -120,6 +120,7 @@ func testCsvITCreateCdrDirs(t *testing.T) {
|
||||
t.Fatal("Error creating folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
||||
func testCsvITStartEngine(t *testing.T) {
|
||||
|
||||
@@ -33,9 +33,9 @@ import (
|
||||
// NewAttributeService returns the Attribute Service
|
||||
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
|
||||
server *utils.Server) servmanager.Service {
|
||||
server *utils.Server, internalChan chan rpcclient.RpcClientConnection) servmanager.Service {
|
||||
return &AttributeService{
|
||||
connChan: make(chan rpcclient.RpcClientConnection, 1),
|
||||
connChan: internalChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
cacheS: cacheS,
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -27,72 +28,56 @@ import (
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewConnManager returns the Connection Manager
|
||||
func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) {
|
||||
return &ConnManager{cfg: cfg}
|
||||
func NewConnManagerService(cfg *config.CGRConfig, intConns map[string]chan rpcclient.RpcClientConnection) *ConnManagerService {
|
||||
fmt.Println("Enter in NewConnManagerService")
|
||||
fmt.Println(intConns)
|
||||
return &ConnManagerService{
|
||||
cfg: cfg,
|
||||
connMgr: engine.NewConnManager(cfg, intConns),
|
||||
}
|
||||
}
|
||||
|
||||
type ConnManager struct {
|
||||
type ConnManagerService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
}
|
||||
|
||||
func (cM *ConnManager) GetConn(connID string,
|
||||
internalChan chan rpcclient.RpcClientConnection) (connPool *rpcclient.RpcClientPool, err error) {
|
||||
//try to get the connection from cache
|
||||
if x, ok := engine.Cache.Get(utils.CacheRPCConnections, connID); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*rpcclient.RpcClientPool), nil
|
||||
}
|
||||
// in case we don't found in cache create the connection and add this in cache
|
||||
connCfg := cM.cfg.RPCConns()[connID]
|
||||
connPool, err = engine.NewRPCPool(connCfg.Strategy,
|
||||
cM.cfg.TlsCfg().ClientKey,
|
||||
cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate,
|
||||
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
|
||||
cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
|
||||
connCfg.Conns, internalChan, false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
engine.Cache.Set(utils.CacheRPCConnections, connID, connPool, nil,
|
||||
true, utils.NonTransactional)
|
||||
return
|
||||
cfg *config.CGRConfig
|
||||
connMgr *engine.ConnManager
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (cM *ConnManager) Start() (err error) {
|
||||
func (cM *ConnManagerService) Start() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
func (cM *ConnManager) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
|
||||
func (cM *ConnManagerService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (cM *ConnManager) Reload() (err error) {
|
||||
func (cM *ConnManagerService) Reload() (err error) {
|
||||
return // for the momment nothing to reload
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (cM *ConnManager) Shutdown() (err error) {
|
||||
func (cM *ConnManagerService) Shutdown() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (cM *ConnManager) IsRunning() bool {
|
||||
func (cM *ConnManagerService) IsRunning() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (cM *ConnManager) ServiceName() string {
|
||||
func (cM *ConnManagerService) ServiceName() string {
|
||||
return utils.RPCConnS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (cM *ConnManager) ShouldRun() bool {
|
||||
func (cM *ConnManagerService) ShouldRun() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (cM *ConnManagerService) GetConnMgr() *engine.ConnManager {
|
||||
return cM.connMgr
|
||||
}
|
||||
|
||||
@@ -34,17 +34,15 @@ import (
|
||||
// NewDispatcherService returns the Dispatcher Service
|
||||
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
|
||||
server *utils.Server, attrsChan chan rpcclient.RpcClientConnection,
|
||||
connMgr *ConnManager) servmanager.Service {
|
||||
server *utils.Server, attrsChan, internalChan chan rpcclient.RpcClientConnection) servmanager.Service {
|
||||
return &DispatcherService{
|
||||
connChan: make(chan rpcclient.RpcClientConnection, 1),
|
||||
connChan: internalChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
cacheS: cacheS,
|
||||
filterSChan: filterSChan,
|
||||
server: server,
|
||||
attrsChan: attrsChan,
|
||||
conMgr: connMgr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +59,6 @@ type DispatcherService struct {
|
||||
dspS *dispatchers.DispatcherService
|
||||
rpc *v1.DispatcherSv1
|
||||
connChan chan rpcclient.RpcClientConnection
|
||||
conMgr *ConnManager
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
|
||||
@@ -32,32 +32,27 @@ import (
|
||||
|
||||
// NewEventReaderService returns the EventReader Service
|
||||
func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
|
||||
sSChan, dispatcherChan chan rpcclient.RpcClientConnection,
|
||||
exitChan chan bool, connMgr *ConnManager) servmanager.Service {
|
||||
exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service {
|
||||
return &EventReaderService{
|
||||
rldChan: make(chan struct{}, 1),
|
||||
cfg: cfg,
|
||||
filterSChan: filterSChan,
|
||||
sSChan: sSChan,
|
||||
dispatcherChan: dispatcherChan,
|
||||
exitChan: exitChan,
|
||||
connMgr: connMgr,
|
||||
rldChan: make(chan struct{}, 1),
|
||||
cfg: cfg,
|
||||
filterSChan: filterSChan,
|
||||
exitChan: exitChan,
|
||||
connMgr: connMgr,
|
||||
}
|
||||
}
|
||||
|
||||
// EventReaderService implements Service interface
|
||||
type EventReaderService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
filterSChan chan *engine.FilterS
|
||||
sSChan chan rpcclient.RpcClientConnection
|
||||
dispatcherChan chan rpcclient.RpcClientConnection
|
||||
exitChan chan bool
|
||||
cfg *config.CGRConfig
|
||||
filterSChan chan *engine.FilterS
|
||||
exitChan chan bool
|
||||
|
||||
ers *ers.ERService
|
||||
rldChan chan struct{}
|
||||
stopChan chan struct{}
|
||||
connMgr *ConnManager
|
||||
connMgr *engine.ConnManager
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
@@ -76,17 +71,9 @@ func (erS *EventReaderService) Start() (err error) {
|
||||
erS.stopChan = make(chan struct{}, 1)
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
|
||||
var sS rpcclient.RpcClientConnection
|
||||
|
||||
if sS, err = NewConnectionPool(erS.cfg, erS.sSChan, erS.dispatcherChan,
|
||||
erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
|
||||
utils.ERs, utils.SessionS, err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
// build the service
|
||||
erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan)
|
||||
erS.ers = ers.NewERService(erS.cfg, filterS, erS.stopChan, erS.connMgr)
|
||||
go func(ers *ers.ERService, rldChan chan struct{}) {
|
||||
if err := ers.ListenAndServe(rldChan); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
|
||||
@@ -103,15 +90,7 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie
|
||||
|
||||
// Reload handles the change of config
|
||||
func (erS *EventReaderService) Reload() (err error) {
|
||||
var sS rpcclient.RpcClientConnection
|
||||
if sS, err = NewConnectionPool(erS.cfg, erS.sSChan,
|
||||
erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
|
||||
utils.ERs, utils.SessionS, err.Error()))
|
||||
return
|
||||
}
|
||||
erS.RLock()
|
||||
erS.ers.SetSessionSConnection(sS)
|
||||
erS.rldChan <- struct{}{}
|
||||
erS.RUnlock()
|
||||
return
|
||||
|
||||
@@ -33,10 +33,10 @@ import (
|
||||
// NewSessionService returns the Session Service
|
||||
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
server *utils.Server, chrsChan, respChan, resChan, thsChan, stsChan,
|
||||
supChan, attrsChan, cdrsChan, dispatcherChan chan rpcclient.RpcClientConnection,
|
||||
supChan, attrsChan, cdrsChan, dispatcherChan, internalChan chan rpcclient.RpcClientConnection,
|
||||
exitChan chan bool) servmanager.Service {
|
||||
return &SessionService{
|
||||
connChan: make(chan rpcclient.RpcClientConnection, 1),
|
||||
connChan: internalChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
server: server,
|
||||
|
||||
@@ -41,30 +41,3 @@ func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
conns, internalChan, false)
|
||||
}
|
||||
|
||||
// NewConnection returns a new connection
|
||||
func NewConnectionPool(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection,
|
||||
connsIDs []string, connMgr *ConnManager) (rpcclient.RpcClientConnection, error) {
|
||||
var rpcClient *rpcclient.RpcClientPool
|
||||
var err error
|
||||
if len(connsIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
internalChan := serviceConnChan
|
||||
if cfg.DispatcherSCfg().Enabled {
|
||||
internalChan = dispatcherSChan
|
||||
}
|
||||
rpcPool := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, cfg.GeneralCfg().ReplyTimeout)
|
||||
atLestOneConnected := false // If one connected we don't longer return errors
|
||||
for _, connID := range connsIDs {
|
||||
rpcClient, err = connMgr.GetConn(connID, internalChan)
|
||||
if err == nil {
|
||||
atLestOneConnected = true
|
||||
}
|
||||
rpcPool.AddClient(rpcClient)
|
||||
}
|
||||
if atLestOneConnected {
|
||||
err = nil
|
||||
}
|
||||
return rpcPool, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user