diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index ce5ed4433..079214e8f 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -486,7 +486,10 @@ func main() {
internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1)
internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1)
- internalCDRServerChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
+ internalCDRServerChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
+ internalAttributeSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
+ internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
+ internalSessionSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
@@ -500,9 +503,8 @@ func main() {
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, exitChan)
- connManager := services.NewConnManager(cfg)
- attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server)
- dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), connManager)
+ attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan)
+ dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, internalDispatcherSChan)
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), dspS.GetIntenternalChan())
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server)
@@ -526,12 +528,36 @@ func main() {
smg := services.NewSessionService(cfg, dmService, server, chrS.GetIntenternalChan(),
rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(),
tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(),
- attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan)
+ attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), internalSessionSChan, exitChan)
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan)
anz := services.NewAnalyzerService(cfg, server, exitChan)
+
+ connManager := services.NewConnManagerService(cfg, map[string]chan rpcclient.RpcClientConnection{
+ utils.AnalyzerSv1: anz.GetIntenternalChan(),
+ utils.ApierV1: rals.GetAPIv1().GetIntenternalChan(),
+ utils.ApierV2: rals.GetAPIv2().GetIntenternalChan(),
+ utils.AttributeSv1: internalAttributeSChan,
+ utils.CacheSv1: internalCacheSChan,
+ utils.CDRsV1: cdrS.GetIntenternalChan(),
+ utils.CDRsV2: cdrS.GetIntenternalChan(),
+ utils.ChargerSv1: chrS.GetIntenternalChan(),
+ utils.GuardianSv1: internalGuardianSChan,
+ utils.LoaderSv1: ldrs.GetIntenternalChan(),
+ utils.ResourceSv1: reS.GetIntenternalChan(),
+ utils.Responder: rals.GetResponder().GetIntenternalChan(),
+ utils.SchedulerSv1: schS.GetIntenternalChan(),
+ utils.SessionSv1: internalSessionSChan,
+ utils.StatSv1: stS.GetIntenternalChan(),
+ utils.SupplierSv1: supS.GetIntenternalChan(),
+ utils.ThresholdSv1: tS.GetIntenternalChan(),
+ utils.ServiceManagerV1: internalServeManagerChan,
+ utils.ConfigSv1: internalConfigChan,
+ utils.CoreSv1: internalCoreSv1Chan,
+ utils.RALsV1: rals.GetIntenternalChan(),
+ })
srvManager.AddServices(connManager, attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
- services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan, connManager),
+ services.NewEventReaderService(cfg, filterSChan, exitChan, connManager.GetConnMgr()),
services.NewDNSAgent(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
services.NewFreeswitchAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
services.NewKamailioAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
diff --git a/config/configsanity.go b/config/configsanity.go
index 42cf02279..b42515335 100644
--- a/config/configsanity.go
+++ b/config/configsanity.go
@@ -373,11 +373,11 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
// EventReader sanity checks
if cfg.ersCfg.Enabled {
- for _, connCfg := range cfg.ersCfg.SessionSConns {
- if _, has := cfg.rpcConns[connCfg]; !has {
- return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ERs, connCfg)
- }
- }
+ //for _, connCfg := range cfg.ersCfg.SessionSConns {
+ // if _, has := cfg.rpcConns[connCfg]; !has {
+ // return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ERs, connCfg)
+ // }
+ //}
for _, rdr := range cfg.ersCfg.Readers {
if !possibleReaderTypes.Has(rdr.Type) {
return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID)
diff --git a/config/erscfg.go b/config/erscfg.go
index dd37d8f39..72abfdafd 100644
--- a/config/erscfg.go
+++ b/config/erscfg.go
@@ -40,7 +40,12 @@ func (erS *ERsCfg) loadFromJsonCfg(jsnCfg *ERsJsonCfg, sep string, dfltRdrCfg *E
if jsnCfg.Sessions_conns != nil {
erS.SessionSConns = make([]string, len(*jsnCfg.Sessions_conns))
for i, fID := range *jsnCfg.Sessions_conns {
- erS.SessionSConns[i] = fID
+ // if we have the connection internal we change the name so we can have internal rpc for each subsystem
+ if fID == utils.MetaInternal {
+ erS.SessionSConns[i] = utils.SessionSv1
+ } else {
+ erS.SessionSConns[i] = fID
+ }
}
}
return erS.appendERsReaders(jsnCfg.Readers, sep, dfltRdrCfg)
diff --git a/data/conf/samples/ers/cgrates.json b/data/conf/samples/ers/cgrates.json
index 742ee3f81..b264ebe25 100644
--- a/data/conf/samples/ers/cgrates.json
+++ b/data/conf/samples/ers/cgrates.json
@@ -82,7 +82,7 @@
"ers": {
"enabled": true,
- "sessions_conns": ["*localhost"],
+ "sessions_conns": ["*internal"],
"readers": [
{
"id": "file_reader1",
diff --git a/engine/connmanager.go b/engine/connmanager.go
new file mode 100644
index 000000000..fef592f73
--- /dev/null
+++ b/engine/connmanager.go
@@ -0,0 +1,86 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "fmt"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/rpcclient"
+)
+
+// NewConnManager returns the Connection Manager
+func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient.RpcClientConnection) (cM *ConnManager) {
+ fmt.Println("Enter in engine NewConn Manager")
+ fmt.Println(rpcInternal)
+ return &ConnManager{cfg: cfg, rpcInternal: rpcInternal}
+}
+
+type ConnManager struct {
+ cfg *config.CGRConfig
+ rpcInternal map[string]chan rpcclient.RpcClientConnection
+}
+
+func (cM *ConnManager) getConn(connID string) (connPool *rpcclient.RpcClientPool, err error) {
+ //try to get the connection from cache
+ if x, ok := Cache.Get(utils.CacheRPCConnections, connID); ok {
+ if x == nil {
+ return nil, utils.ErrNotFound
+ }
+ return x.(*rpcclient.RpcClientPool), nil
+ }
+ // in case we don't found in cache create the connection and add this in cache
+ var intChan chan rpcclient.RpcClientConnection
+ var connCfg *config.RPCConn
+ fmt.Println("TEST ?? ")
+ fmt.Println(cM)
+ fmt.Println(cM.rpcInternal)
+ fmt.Println(connID)
+ if internalChan, has := cM.rpcInternal[connID]; has {
+ connCfg = cM.cfg.RPCConns()[utils.MetaInternal]
+ intChan = internalChan
+ } else {
+ connCfg = cM.cfg.RPCConns()[connID]
+ }
+ connPool, err = NewRPCPool(connCfg.Strategy,
+ cM.cfg.TlsCfg().ClientKey,
+ cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate,
+ cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
+ cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
+ connCfg.Conns, intChan, false)
+ if err != nil {
+ return
+ }
+ Cache.Set(utils.CacheRPCConnections, connID, connPool, nil,
+ true, utils.NonTransactional)
+ return
+}
+
+func (cM *ConnManager) Call(connIDs []string, method string, arg, reply interface{}) (err error) {
+ for _, connID := range connIDs {
+ conn, err := cM.getConn(connID)
+ if err == nil {
+ if err := conn.Call(method, arg, reply); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
diff --git a/ers/ers.go b/ers/ers.go
index 2b4063683..bbc0da8d9 100644
--- a/ers/ers.go
+++ b/ers/ers.go
@@ -26,7 +26,6 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
- "github.com/cgrates/rpcclient"
)
// erEvent is passed from reader to ERs
@@ -36,8 +35,7 @@ type erEvent struct {
}
// NewERService instantiates the ERService
-func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
- sS rpcclient.RpcClientConnection, stopChan chan struct{}) *ERService {
+func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, stopChan chan struct{}, connMgr *engine.ConnManager) *ERService {
return &ERService{
cfg: cfg,
rdrs: make(map[string]EventReader),
@@ -46,8 +44,8 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
rdrEvents: make(chan *erEvent),
rdrErr: make(chan error),
filterS: filterS,
- sS: sS,
stopChan: stopChan,
+ connMgr: connMgr,
}
}
@@ -62,8 +60,8 @@ type ERService struct {
rdrErr chan error // receive here errors which should stop the app
filterS *engine.FilterS
- sS rpcclient.RpcClientConnection // connection towards SessionS
stopChan chan struct{}
+ connMgr *engine.ConnManager
}
// ListenAndServe keeps the service alive
@@ -195,8 +193,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator,
)
rply := new(sessions.V1AuthorizeReply)
- err = erS.sS.Call(utils.SessionSv1AuthorizeEvent,
+ fmt.Println("Call Auth")
+ err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1AuthorizeEvent,
authArgs, rply)
+ fmt.Println(" Finish Call Auth")
case utils.MetaInitiate:
initArgs := sessions.NewV1InitSessionArgs(
rdrCfg.Flags.HasKey(utils.MetaAttributes),
@@ -209,8 +209,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
rdrCfg.Flags.HasKey(utils.MetaAccounts),
cgrEv, cgrArgs.ArgDispatcher)
rply := new(sessions.V1InitSessionReply)
- err = erS.sS.Call(utils.SessionSv1InitiateSession,
+ fmt.Println("Call Init")
+ err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1InitiateSession,
initArgs, rply)
+ fmt.Println(" Finish Call Init")
case utils.MetaUpdate:
updateArgs := sessions.NewV1UpdateSessionArgs(
rdrCfg.Flags.HasKey(utils.MetaAttributes),
@@ -218,8 +220,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
rdrCfg.Flags.HasKey(utils.MetaAccounts),
cgrEv, cgrArgs.ArgDispatcher)
rply := new(sessions.V1UpdateSessionReply)
- err = erS.sS.Call(utils.SessionSv1UpdateSession,
+ fmt.Println("Call Update")
+ err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1UpdateSession,
updateArgs, rply)
+ fmt.Println(" Finish Call Update")
case utils.MetaTerminate:
terminateArgs := sessions.NewV1TerminateSessionArgs(
rdrCfg.Flags.HasKey(utils.MetaAccounts),
@@ -230,8 +234,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
cgrEv, cgrArgs.ArgDispatcher)
rply := utils.StringPointer("")
- err = erS.sS.Call(utils.SessionSv1TerminateSession,
+ fmt.Println("Call Terminate")
+ err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1TerminateSession,
terminateArgs, rply)
+ fmt.Println(" Finish Call Terminate")
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(
rdrCfg.Flags.HasKey(utils.MetaAttributes),
@@ -247,8 +253,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator)
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
- err = erS.sS.Call(utils.SessionSv1ProcessMessage,
+ fmt.Println("Call ProcMsg")
+ err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessMessage,
evArgs, rply)
+ fmt.Println("Finish Call ProcMsg")
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
cgrEv.Event[utils.Usage] = 0 // avoid further debits
} else if evArgs.Debit {
@@ -262,8 +270,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
Paginator: *cgrArgs.SupplierPaginator,
}
rply := new(sessions.V1ProcessEventReply)
- err = erS.sS.Call(utils.SessionSv1ProcessEvent,
+ fmt.Println("Call Event")
+ err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessEvent,
evArgs, rply)
+ fmt.Println("Finish Call Event")
case utils.MetaCDRs: // allow CDR processing
}
if err != nil {
@@ -273,16 +283,12 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe
if rdrCfg.Flags.HasKey(utils.MetaCDRs) &&
!rdrCfg.Flags.HasKey(utils.MetaDryRun) {
rplyCDRs := utils.StringPointer("")
- err = erS.sS.Call(utils.SessionSv1ProcessCDR,
+ fmt.Println("Call cdrs")
+ err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessCDR,
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv,
ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs)
+ fmt.Println("finish Call cdrs")
}
return
}
-
-// SetSessionSConnection sets the new connection to the threshold service
-// only used on reload
-func (erS *ERService) SetSessionSConnection(sS rpcclient.RpcClientConnection) {
- erS.sS = sS
-}
diff --git a/ers/ers_test.go b/ers/ers_test.go
index c48e299f3..a683ad07a 100644
--- a/ers/ers_test.go
+++ b/ers/ers_test.go
@@ -38,8 +38,7 @@ func TestERsNewERService(t *testing.T) {
stopLsn: make(map[string]chan struct{}),
rdrEvents: make(chan *erEvent),
rdrErr: make(chan error),
- stopChan: nil,
- sS: nil}
+ stopChan: nil}
rcv := NewERService(cfg, fltrS, nil, nil)
if !reflect.DeepEqual(expected.cfg, rcv.cfg) {
diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go
index c57787533..4b411d05f 100644
--- a/ers/filecsv_it_test.go
+++ b/ers/filecsv_it_test.go
@@ -62,7 +62,7 @@ accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"
testCsvITInitConfig,
testCsvITInitCdrDb,
testCsvITResetDataDb,
- testCsvITStartEngine,
+ //testCsvITStartEngine,
testCsvITRpcConn,
testCsvITLoadTPFromFolder,
testCsvITHandleCdr1File,
@@ -75,7 +75,7 @@ accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"
testCsvITProcessFilteredCDR,
testCsvITAnalyzeFilteredCDR,
testCsvITProcessedFiles,
- testCsvITCleanupFiles,
+ //testCsvITCleanupFiles,
testCsvITKillEngine,
}
)
@@ -120,6 +120,7 @@ func testCsvITCreateCdrDirs(t *testing.T) {
t.Fatal("Error creating folder: ", dir, err)
}
}
+ time.Sleep(10 * time.Second)
}
func testCsvITStartEngine(t *testing.T) {
diff --git a/services/attributes.go b/services/attributes.go
index 0d123bfd4..420320cf3 100644
--- a/services/attributes.go
+++ b/services/attributes.go
@@ -33,9 +33,9 @@ import (
// NewAttributeService returns the Attribute Service
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
- server *utils.Server) servmanager.Service {
+ server *utils.Server, internalChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &AttributeService{
- connChan: make(chan rpcclient.RpcClientConnection, 1),
+ connChan: internalChan,
cfg: cfg,
dm: dm,
cacheS: cacheS,
diff --git a/services/connmanager.go b/services/connmanager.go
index f4bf3cdb4..db8311779 100644
--- a/services/connmanager.go
+++ b/services/connmanager.go
@@ -19,6 +19,7 @@ along with this program. If not, see
package services
import (
+ "fmt"
"sync"
"github.com/cgrates/cgrates/config"
@@ -27,72 +28,56 @@ import (
"github.com/cgrates/rpcclient"
)
-// NewConnManager returns the Connection Manager
-func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) {
- return &ConnManager{cfg: cfg}
+func NewConnManagerService(cfg *config.CGRConfig, intConns map[string]chan rpcclient.RpcClientConnection) *ConnManagerService {
+ fmt.Println("Enter in NewConnManagerService")
+ fmt.Println(intConns)
+ return &ConnManagerService{
+ cfg: cfg,
+ connMgr: engine.NewConnManager(cfg, intConns),
+ }
}
-type ConnManager struct {
+type ConnManagerService struct {
sync.RWMutex
- cfg *config.CGRConfig
-}
-
-func (cM *ConnManager) GetConn(connID string,
- internalChan chan rpcclient.RpcClientConnection) (connPool *rpcclient.RpcClientPool, err error) {
- //try to get the connection from cache
- if x, ok := engine.Cache.Get(utils.CacheRPCConnections, connID); ok {
- if x == nil {
- return nil, utils.ErrNotFound
- }
- return x.(*rpcclient.RpcClientPool), nil
- }
- // in case we don't found in cache create the connection and add this in cache
- connCfg := cM.cfg.RPCConns()[connID]
- connPool, err = engine.NewRPCPool(connCfg.Strategy,
- cM.cfg.TlsCfg().ClientKey,
- cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate,
- cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
- cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
- connCfg.Conns, internalChan, false)
- if err != nil {
- return
- }
- engine.Cache.Set(utils.CacheRPCConnections, connID, connPool, nil,
- true, utils.NonTransactional)
- return
+ cfg *config.CGRConfig
+ connMgr *engine.ConnManager
}
// Start should handle the sercive start
-func (cM *ConnManager) Start() (err error) {
+func (cM *ConnManagerService) Start() (err error) {
return
}
// GetIntenternalChan returns the internal connection chanel
-func (cM *ConnManager) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
+func (cM *ConnManagerService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
return nil
}
// Reload handles the change of config
-func (cM *ConnManager) Reload() (err error) {
+func (cM *ConnManagerService) Reload() (err error) {
return // for the momment nothing to reload
}
// Shutdown stops the service
-func (cM *ConnManager) Shutdown() (err error) {
+func (cM *ConnManagerService) Shutdown() (err error) {
return
}
// IsRunning returns if the service is running
-func (cM *ConnManager) IsRunning() bool {
+func (cM *ConnManagerService) IsRunning() bool {
return true
}
// ServiceName returns the service name
-func (cM *ConnManager) ServiceName() string {
+func (cM *ConnManagerService) ServiceName() string {
return utils.RPCConnS
}
// ShouldRun returns if the service should be running
-func (cM *ConnManager) ShouldRun() bool {
+func (cM *ConnManagerService) ShouldRun() bool {
return true
}
+
+func (cM *ConnManagerService) GetConnMgr() *engine.ConnManager {
+ return cM.connMgr
+}
diff --git a/services/dispatchers.go b/services/dispatchers.go
index 131ac4a66..a0eda22d1 100644
--- a/services/dispatchers.go
+++ b/services/dispatchers.go
@@ -34,17 +34,15 @@ import (
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
- server *utils.Server, attrsChan chan rpcclient.RpcClientConnection,
- connMgr *ConnManager) servmanager.Service {
+ server *utils.Server, attrsChan, internalChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &DispatcherService{
- connChan: make(chan rpcclient.RpcClientConnection, 1),
+ connChan: internalChan,
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
attrsChan: attrsChan,
- conMgr: connMgr,
}
}
@@ -61,7 +59,6 @@ type DispatcherService struct {
dspS *dispatchers.DispatcherService
rpc *v1.DispatcherSv1
connChan chan rpcclient.RpcClientConnection
- conMgr *ConnManager
}
// Start should handle the sercive start
diff --git a/services/ers.go b/services/ers.go
index 965eac7e0..58ac12d98 100644
--- a/services/ers.go
+++ b/services/ers.go
@@ -32,32 +32,27 @@ import (
// NewEventReaderService returns the EventReader Service
func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
- sSChan, dispatcherChan chan rpcclient.RpcClientConnection,
- exitChan chan bool, connMgr *ConnManager) servmanager.Service {
+ exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service {
return &EventReaderService{
- rldChan: make(chan struct{}, 1),
- cfg: cfg,
- filterSChan: filterSChan,
- sSChan: sSChan,
- dispatcherChan: dispatcherChan,
- exitChan: exitChan,
- connMgr: connMgr,
+ rldChan: make(chan struct{}, 1),
+ cfg: cfg,
+ filterSChan: filterSChan,
+ exitChan: exitChan,
+ connMgr: connMgr,
}
}
// EventReaderService implements Service interface
type EventReaderService struct {
sync.RWMutex
- cfg *config.CGRConfig
- filterSChan chan *engine.FilterS
- sSChan chan rpcclient.RpcClientConnection
- dispatcherChan chan rpcclient.RpcClientConnection
- exitChan chan bool
+ cfg *config.CGRConfig
+ filterSChan chan *engine.FilterS
+ exitChan chan bool
ers *ers.ERService
rldChan chan struct{}
stopChan chan struct{}
- connMgr *ConnManager
+ connMgr *engine.ConnManager
}
// Start should handle the sercive start
@@ -76,17 +71,9 @@ func (erS *EventReaderService) Start() (err error) {
erS.stopChan = make(chan struct{}, 1)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
- var sS rpcclient.RpcClientConnection
-
- if sS, err = NewConnectionPool(erS.cfg, erS.sSChan, erS.dispatcherChan,
- erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
- utils.ERs, utils.SessionS, err.Error()))
- return
- }
// build the service
- erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan)
+ erS.ers = ers.NewERService(erS.cfg, filterS, erS.stopChan, erS.connMgr)
go func(ers *ers.ERService, rldChan chan struct{}) {
if err := ers.ListenAndServe(rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
@@ -103,15 +90,7 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie
// Reload handles the change of config
func (erS *EventReaderService) Reload() (err error) {
- var sS rpcclient.RpcClientConnection
- if sS, err = NewConnectionPool(erS.cfg, erS.sSChan,
- erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
- utils.ERs, utils.SessionS, err.Error()))
- return
- }
erS.RLock()
- erS.ers.SetSessionSConnection(sS)
erS.rldChan <- struct{}{}
erS.RUnlock()
return
diff --git a/services/sessions.go b/services/sessions.go
index 244036ea5..a69f2336c 100644
--- a/services/sessions.go
+++ b/services/sessions.go
@@ -33,10 +33,10 @@ import (
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
server *utils.Server, chrsChan, respChan, resChan, thsChan, stsChan,
- supChan, attrsChan, cdrsChan, dispatcherChan chan rpcclient.RpcClientConnection,
+ supChan, attrsChan, cdrsChan, dispatcherChan, internalChan chan rpcclient.RpcClientConnection,
exitChan chan bool) servmanager.Service {
return &SessionService{
- connChan: make(chan rpcclient.RpcClientConnection, 1),
+ connChan: internalChan,
cfg: cfg,
dm: dm,
server: server,
diff --git a/services/utils.go b/services/utils.go
index 5a825a7cd..0bbf4cb93 100644
--- a/services/utils.go
+++ b/services/utils.go
@@ -41,30 +41,3 @@ func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
conns, internalChan, false)
}
-
-// NewConnection returns a new connection
-func NewConnectionPool(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection,
- connsIDs []string, connMgr *ConnManager) (rpcclient.RpcClientConnection, error) {
- var rpcClient *rpcclient.RpcClientPool
- var err error
- if len(connsIDs) == 0 {
- return nil, nil
- }
- internalChan := serviceConnChan
- if cfg.DispatcherSCfg().Enabled {
- internalChan = dispatcherSChan
- }
- rpcPool := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, cfg.GeneralCfg().ReplyTimeout)
- atLestOneConnected := false // If one connected we don't longer return errors
- for _, connID := range connsIDs {
- rpcClient, err = connMgr.GetConn(connID, internalChan)
- if err == nil {
- atLestOneConnected = true
- }
- rpcPool.AddClient(rpcClient)
- }
- if atLestOneConnected {
- err = nil
- }
- return rpcPool, err
-}