From ac8f9f4399e9023857efb1cc2f20b1dd58db7e6e Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 3 Dec 2019 06:44:20 -0500 Subject: [PATCH] Add a PoC for ConnManager with EventReader --- cmd/cgr-engine/cgr-engine.go | 38 ++++++++++--- config/configsanity.go | 10 ++-- config/erscfg.go | 7 ++- data/conf/samples/ers/cgrates.json | 2 +- engine/connmanager.go | 86 ++++++++++++++++++++++++++++++ ers/ers.go | 42 ++++++++------- ers/ers_test.go | 3 +- ers/filecsv_it_test.go | 5 +- services/attributes.go | 4 +- services/connmanager.go | 59 ++++++++------------ services/dispatchers.go | 7 +-- services/ers.go | 43 ++++----------- services/sessions.go | 4 +- services/utils.go | 27 ---------- 14 files changed, 197 insertions(+), 140 deletions(-) create mode 100644 engine/connmanager.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ce5ed4433..079214e8f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -486,7 +486,10 @@ func main() { internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1) internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1) - internalCDRServerChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency + internalCDRServerChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency + internalAttributeSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency + internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency + internalSessionSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency // init CacheS cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan) @@ -500,9 +503,8 @@ func main() { // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, exitChan) - connManager := services.NewConnManager(cfg) - attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server) - dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), connManager) + attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan) + dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, internalDispatcherSChan) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), dspS.GetIntenternalChan()) tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server) @@ -526,12 +528,36 @@ func main() { smg := services.NewSessionService(cfg, dmService, server, chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(), tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(), - attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan) + attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), internalSessionSChan, exitChan) ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan) anz := services.NewAnalyzerService(cfg, server, exitChan) + + connManager := services.NewConnManagerService(cfg, map[string]chan rpcclient.RpcClientConnection{ + utils.AnalyzerSv1: anz.GetIntenternalChan(), + utils.ApierV1: rals.GetAPIv1().GetIntenternalChan(), + utils.ApierV2: rals.GetAPIv2().GetIntenternalChan(), + utils.AttributeSv1: internalAttributeSChan, + utils.CacheSv1: internalCacheSChan, + utils.CDRsV1: cdrS.GetIntenternalChan(), + utils.CDRsV2: cdrS.GetIntenternalChan(), + utils.ChargerSv1: chrS.GetIntenternalChan(), + utils.GuardianSv1: internalGuardianSChan, + utils.LoaderSv1: ldrs.GetIntenternalChan(), + utils.ResourceSv1: reS.GetIntenternalChan(), + utils.Responder: rals.GetResponder().GetIntenternalChan(), + utils.SchedulerSv1: schS.GetIntenternalChan(), + utils.SessionSv1: internalSessionSChan, + utils.StatSv1: stS.GetIntenternalChan(), + utils.SupplierSv1: supS.GetIntenternalChan(), + utils.ThresholdSv1: tS.GetIntenternalChan(), + utils.ServiceManagerV1: internalServeManagerChan, + utils.ConfigSv1: internalConfigChan, + utils.CoreSv1: internalCoreSv1Chan, + utils.RALsV1: rals.GetIntenternalChan(), + }) srvManager.AddServices(connManager, attrS, chrS, tS, stS, reS, supS, schS, rals, rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg, - services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan, connManager), + services.NewEventReaderService(cfg, filterSChan, exitChan, connManager.GetConnMgr()), services.NewDNSAgent(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan), services.NewFreeswitchAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan), services.NewKamailioAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan), diff --git a/config/configsanity.go b/config/configsanity.go index 42cf02279..b42515335 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -373,11 +373,11 @@ func (cfg *CGRConfig) checkConfigSanity() error { } // EventReader sanity checks if cfg.ersCfg.Enabled { - for _, connCfg := range cfg.ersCfg.SessionSConns { - if _, has := cfg.rpcConns[connCfg]; !has { - return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ERs, connCfg) - } - } + //for _, connCfg := range cfg.ersCfg.SessionSConns { + // if _, has := cfg.rpcConns[connCfg]; !has { + // return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ERs, connCfg) + // } + //} for _, rdr := range cfg.ersCfg.Readers { if !possibleReaderTypes.Has(rdr.Type) { return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID) diff --git a/config/erscfg.go b/config/erscfg.go index dd37d8f39..72abfdafd 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -40,7 +40,12 @@ func (erS *ERsCfg) loadFromJsonCfg(jsnCfg *ERsJsonCfg, sep string, dfltRdrCfg *E if jsnCfg.Sessions_conns != nil { erS.SessionSConns = make([]string, len(*jsnCfg.Sessions_conns)) for i, fID := range *jsnCfg.Sessions_conns { - erS.SessionSConns[i] = fID + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if fID == utils.MetaInternal { + erS.SessionSConns[i] = utils.SessionSv1 + } else { + erS.SessionSConns[i] = fID + } } } return erS.appendERsReaders(jsnCfg.Readers, sep, dfltRdrCfg) diff --git a/data/conf/samples/ers/cgrates.json b/data/conf/samples/ers/cgrates.json index 742ee3f81..b264ebe25 100644 --- a/data/conf/samples/ers/cgrates.json +++ b/data/conf/samples/ers/cgrates.json @@ -82,7 +82,7 @@ "ers": { "enabled": true, - "sessions_conns": ["*localhost"], + "sessions_conns": ["*internal"], "readers": [ { "id": "file_reader1", diff --git a/engine/connmanager.go b/engine/connmanager.go new file mode 100644 index 000000000..fef592f73 --- /dev/null +++ b/engine/connmanager.go @@ -0,0 +1,86 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewConnManager returns the Connection Manager +func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient.RpcClientConnection) (cM *ConnManager) { + fmt.Println("Enter in engine NewConn Manager") + fmt.Println(rpcInternal) + return &ConnManager{cfg: cfg, rpcInternal: rpcInternal} +} + +type ConnManager struct { + cfg *config.CGRConfig + rpcInternal map[string]chan rpcclient.RpcClientConnection +} + +func (cM *ConnManager) getConn(connID string) (connPool *rpcclient.RpcClientPool, err error) { + //try to get the connection from cache + if x, ok := Cache.Get(utils.CacheRPCConnections, connID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*rpcclient.RpcClientPool), nil + } + // in case we don't found in cache create the connection and add this in cache + var intChan chan rpcclient.RpcClientConnection + var connCfg *config.RPCConn + fmt.Println("TEST ?? ") + fmt.Println(cM) + fmt.Println(cM.rpcInternal) + fmt.Println(connID) + if internalChan, has := cM.rpcInternal[connID]; has { + connCfg = cM.cfg.RPCConns()[utils.MetaInternal] + intChan = internalChan + } else { + connCfg = cM.cfg.RPCConns()[connID] + } + connPool, err = NewRPCPool(connCfg.Strategy, + cM.cfg.TlsCfg().ClientKey, + cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, + cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + connCfg.Conns, intChan, false) + if err != nil { + return + } + Cache.Set(utils.CacheRPCConnections, connID, connPool, nil, + true, utils.NonTransactional) + return +} + +func (cM *ConnManager) Call(connIDs []string, method string, arg, reply interface{}) (err error) { + for _, connID := range connIDs { + conn, err := cM.getConn(connID) + if err == nil { + if err := conn.Call(method, arg, reply); err != nil { + return err + } + } + } + return nil +} diff --git a/ers/ers.go b/ers/ers.go index 2b4063683..bbc0da8d9 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -26,7 +26,6 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // erEvent is passed from reader to ERs @@ -36,8 +35,7 @@ type erEvent struct { } // NewERService instantiates the ERService -func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, - sS rpcclient.RpcClientConnection, stopChan chan struct{}) *ERService { +func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, stopChan chan struct{}, connMgr *engine.ConnManager) *ERService { return &ERService{ cfg: cfg, rdrs: make(map[string]EventReader), @@ -46,8 +44,8 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, rdrEvents: make(chan *erEvent), rdrErr: make(chan error), filterS: filterS, - sS: sS, stopChan: stopChan, + connMgr: connMgr, } } @@ -62,8 +60,8 @@ type ERService struct { rdrErr chan error // receive here errors which should stop the app filterS *engine.FilterS - sS rpcclient.RpcClientConnection // connection towards SessionS stopChan chan struct{} + connMgr *engine.ConnManager } // ListenAndServe keeps the service alive @@ -195,8 +193,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator, ) rply := new(sessions.V1AuthorizeReply) - err = erS.sS.Call(utils.SessionSv1AuthorizeEvent, + fmt.Println("Call Auth") + err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1AuthorizeEvent, authArgs, rply) + fmt.Println(" Finish Call Auth") case utils.MetaInitiate: initArgs := sessions.NewV1InitSessionArgs( rdrCfg.Flags.HasKey(utils.MetaAttributes), @@ -209,8 +209,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe rdrCfg.Flags.HasKey(utils.MetaAccounts), cgrEv, cgrArgs.ArgDispatcher) rply := new(sessions.V1InitSessionReply) - err = erS.sS.Call(utils.SessionSv1InitiateSession, + fmt.Println("Call Init") + err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1InitiateSession, initArgs, rply) + fmt.Println(" Finish Call Init") case utils.MetaUpdate: updateArgs := sessions.NewV1UpdateSessionArgs( rdrCfg.Flags.HasKey(utils.MetaAttributes), @@ -218,8 +220,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe rdrCfg.Flags.HasKey(utils.MetaAccounts), cgrEv, cgrArgs.ArgDispatcher) rply := new(sessions.V1UpdateSessionReply) - err = erS.sS.Call(utils.SessionSv1UpdateSession, + fmt.Println("Call Update") + err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1UpdateSession, updateArgs, rply) + fmt.Println(" Finish Call Update") case utils.MetaTerminate: terminateArgs := sessions.NewV1TerminateSessionArgs( rdrCfg.Flags.HasKey(utils.MetaAccounts), @@ -230,8 +234,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe rdrCfg.Flags.ParamsSlice(utils.MetaStats), cgrEv, cgrArgs.ArgDispatcher) rply := utils.StringPointer("") - err = erS.sS.Call(utils.SessionSv1TerminateSession, + fmt.Println("Call Terminate") + err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1TerminateSession, terminateArgs, rply) + fmt.Println(" Finish Call Terminate") case utils.MetaMessage: evArgs := sessions.NewV1ProcessMessageArgs( rdrCfg.Flags.HasKey(utils.MetaAttributes), @@ -247,8 +253,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator) rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone - err = erS.sS.Call(utils.SessionSv1ProcessMessage, + fmt.Println("Call ProcMsg") + err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessMessage, evArgs, rply) + fmt.Println("Finish Call ProcMsg") if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { cgrEv.Event[utils.Usage] = 0 // avoid further debits } else if evArgs.Debit { @@ -262,8 +270,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe Paginator: *cgrArgs.SupplierPaginator, } rply := new(sessions.V1ProcessEventReply) - err = erS.sS.Call(utils.SessionSv1ProcessEvent, + fmt.Println("Call Event") + err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessEvent, evArgs, rply) + fmt.Println("Finish Call Event") case utils.MetaCDRs: // allow CDR processing } if err != nil { @@ -273,16 +283,12 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventRe if rdrCfg.Flags.HasKey(utils.MetaCDRs) && !rdrCfg.Flags.HasKey(utils.MetaDryRun) { rplyCDRs := utils.StringPointer("") - err = erS.sS.Call(utils.SessionSv1ProcessCDR, + fmt.Println("Call cdrs") + err = erS.connMgr.Call(erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessCDR, &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs) + fmt.Println("finish Call cdrs") } return } - -// SetSessionSConnection sets the new connection to the threshold service -// only used on reload -func (erS *ERService) SetSessionSConnection(sS rpcclient.RpcClientConnection) { - erS.sS = sS -} diff --git a/ers/ers_test.go b/ers/ers_test.go index c48e299f3..a683ad07a 100644 --- a/ers/ers_test.go +++ b/ers/ers_test.go @@ -38,8 +38,7 @@ func TestERsNewERService(t *testing.T) { stopLsn: make(map[string]chan struct{}), rdrEvents: make(chan *erEvent), rdrErr: make(chan error), - stopChan: nil, - sS: nil} + stopChan: nil} rcv := NewERService(cfg, fltrS, nil, nil) if !reflect.DeepEqual(expected.cfg, rcv.cfg) { diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index c57787533..4b411d05f 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -62,7 +62,7 @@ accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;" testCsvITInitConfig, testCsvITInitCdrDb, testCsvITResetDataDb, - testCsvITStartEngine, + //testCsvITStartEngine, testCsvITRpcConn, testCsvITLoadTPFromFolder, testCsvITHandleCdr1File, @@ -75,7 +75,7 @@ accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;" testCsvITProcessFilteredCDR, testCsvITAnalyzeFilteredCDR, testCsvITProcessedFiles, - testCsvITCleanupFiles, + //testCsvITCleanupFiles, testCsvITKillEngine, } ) @@ -120,6 +120,7 @@ func testCsvITCreateCdrDirs(t *testing.T) { t.Fatal("Error creating folder: ", dir, err) } } + time.Sleep(10 * time.Second) } func testCsvITStartEngine(t *testing.T) { diff --git a/services/attributes.go b/services/attributes.go index 0d123bfd4..420320cf3 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -33,9 +33,9 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server) servmanager.Service { + server *utils.Server, internalChan chan rpcclient.RpcClientConnection) servmanager.Service { return &AttributeService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: internalChan, cfg: cfg, dm: dm, cacheS: cacheS, diff --git a/services/connmanager.go b/services/connmanager.go index f4bf3cdb4..db8311779 100644 --- a/services/connmanager.go +++ b/services/connmanager.go @@ -19,6 +19,7 @@ along with this program. If not, see package services import ( + "fmt" "sync" "github.com/cgrates/cgrates/config" @@ -27,72 +28,56 @@ import ( "github.com/cgrates/rpcclient" ) -// NewConnManager returns the Connection Manager -func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) { - return &ConnManager{cfg: cfg} +func NewConnManagerService(cfg *config.CGRConfig, intConns map[string]chan rpcclient.RpcClientConnection) *ConnManagerService { + fmt.Println("Enter in NewConnManagerService") + fmt.Println(intConns) + return &ConnManagerService{ + cfg: cfg, + connMgr: engine.NewConnManager(cfg, intConns), + } } -type ConnManager struct { +type ConnManagerService struct { sync.RWMutex - cfg *config.CGRConfig -} - -func (cM *ConnManager) GetConn(connID string, - internalChan chan rpcclient.RpcClientConnection) (connPool *rpcclient.RpcClientPool, err error) { - //try to get the connection from cache - if x, ok := engine.Cache.Get(utils.CacheRPCConnections, connID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*rpcclient.RpcClientPool), nil - } - // in case we don't found in cache create the connection and add this in cache - connCfg := cM.cfg.RPCConns()[connID] - connPool, err = engine.NewRPCPool(connCfg.Strategy, - cM.cfg.TlsCfg().ClientKey, - cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate, - cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, - cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, - connCfg.Conns, internalChan, false) - if err != nil { - return - } - engine.Cache.Set(utils.CacheRPCConnections, connID, connPool, nil, - true, utils.NonTransactional) - return + cfg *config.CGRConfig + connMgr *engine.ConnManager } // Start should handle the sercive start -func (cM *ConnManager) Start() (err error) { +func (cM *ConnManagerService) Start() (err error) { return } // GetIntenternalChan returns the internal connection chanel -func (cM *ConnManager) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { +func (cM *ConnManagerService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { return nil } // Reload handles the change of config -func (cM *ConnManager) Reload() (err error) { +func (cM *ConnManagerService) Reload() (err error) { return // for the momment nothing to reload } // Shutdown stops the service -func (cM *ConnManager) Shutdown() (err error) { +func (cM *ConnManagerService) Shutdown() (err error) { return } // IsRunning returns if the service is running -func (cM *ConnManager) IsRunning() bool { +func (cM *ConnManagerService) IsRunning() bool { return true } // ServiceName returns the service name -func (cM *ConnManager) ServiceName() string { +func (cM *ConnManagerService) ServiceName() string { return utils.RPCConnS } // ShouldRun returns if the service should be running -func (cM *ConnManager) ShouldRun() bool { +func (cM *ConnManagerService) ShouldRun() bool { return true } + +func (cM *ConnManagerService) GetConnMgr() *engine.ConnManager { + return cM.connMgr +} diff --git a/services/dispatchers.go b/services/dispatchers.go index 131ac4a66..a0eda22d1 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -34,17 +34,15 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, attrsChan chan rpcclient.RpcClientConnection, - connMgr *ConnManager) servmanager.Service { + server *utils.Server, attrsChan, internalChan chan rpcclient.RpcClientConnection) servmanager.Service { return &DispatcherService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: internalChan, cfg: cfg, dm: dm, cacheS: cacheS, filterSChan: filterSChan, server: server, attrsChan: attrsChan, - conMgr: connMgr, } } @@ -61,7 +59,6 @@ type DispatcherService struct { dspS *dispatchers.DispatcherService rpc *v1.DispatcherSv1 connChan chan rpcclient.RpcClientConnection - conMgr *ConnManager } // Start should handle the sercive start diff --git a/services/ers.go b/services/ers.go index 965eac7e0..58ac12d98 100644 --- a/services/ers.go +++ b/services/ers.go @@ -32,32 +32,27 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - sSChan, dispatcherChan chan rpcclient.RpcClientConnection, - exitChan chan bool, connMgr *ConnManager) servmanager.Service { + exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { return &EventReaderService{ - rldChan: make(chan struct{}, 1), - cfg: cfg, - filterSChan: filterSChan, - sSChan: sSChan, - dispatcherChan: dispatcherChan, - exitChan: exitChan, - connMgr: connMgr, + rldChan: make(chan struct{}, 1), + cfg: cfg, + filterSChan: filterSChan, + exitChan: exitChan, + connMgr: connMgr, } } // EventReaderService implements Service interface type EventReaderService struct { sync.RWMutex - cfg *config.CGRConfig - filterSChan chan *engine.FilterS - sSChan chan rpcclient.RpcClientConnection - dispatcherChan chan rpcclient.RpcClientConnection - exitChan chan bool + cfg *config.CGRConfig + filterSChan chan *engine.FilterS + exitChan chan bool ers *ers.ERService rldChan chan struct{} stopChan chan struct{} - connMgr *ConnManager + connMgr *engine.ConnManager } // Start should handle the sercive start @@ -76,17 +71,9 @@ func (erS *EventReaderService) Start() (err error) { erS.stopChan = make(chan struct{}, 1) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) - var sS rpcclient.RpcClientConnection - - if sS, err = NewConnectionPool(erS.cfg, erS.sSChan, erS.dispatcherChan, - erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", - utils.ERs, utils.SessionS, err.Error())) - return - } // build the service - erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan) + erS.ers = ers.NewERService(erS.cfg, filterS, erS.stopChan, erS.connMgr) go func(ers *ers.ERService, rldChan chan struct{}) { if err := ers.ListenAndServe(rldChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) @@ -103,15 +90,7 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie // Reload handles the change of config func (erS *EventReaderService) Reload() (err error) { - var sS rpcclient.RpcClientConnection - if sS, err = NewConnectionPool(erS.cfg, erS.sSChan, - erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", - utils.ERs, utils.SessionS, err.Error())) - return - } erS.RLock() - erS.ers.SetSessionSConnection(sS) erS.rldChan <- struct{}{} erS.RUnlock() return diff --git a/services/sessions.go b/services/sessions.go index 244036ea5..a69f2336c 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -33,10 +33,10 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *utils.Server, chrsChan, respChan, resChan, thsChan, stsChan, - supChan, attrsChan, cdrsChan, dispatcherChan chan rpcclient.RpcClientConnection, + supChan, attrsChan, cdrsChan, dispatcherChan, internalChan chan rpcclient.RpcClientConnection, exitChan chan bool) servmanager.Service { return &SessionService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: internalChan, cfg: cfg, dm: dm, server: server, diff --git a/services/utils.go b/services/utils.go index 5a825a7cd..0bbf4cb93 100644 --- a/services/utils.go +++ b/services/utils.go @@ -41,30 +41,3 @@ func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, conns, internalChan, false) } - -// NewConnection returns a new connection -func NewConnectionPool(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection, - connsIDs []string, connMgr *ConnManager) (rpcclient.RpcClientConnection, error) { - var rpcClient *rpcclient.RpcClientPool - var err error - if len(connsIDs) == 0 { - return nil, nil - } - internalChan := serviceConnChan - if cfg.DispatcherSCfg().Enabled { - internalChan = dispatcherSChan - } - rpcPool := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, cfg.GeneralCfg().ReplyTimeout) - atLestOneConnected := false // If one connected we don't longer return errors - for _, connID := range connsIDs { - rpcClient, err = connMgr.GetConn(connID, internalChan) - if err == nil { - atLestOneConnected = true - } - rpcPool.AddClient(rpcClient) - } - if atLestOneConnected { - err = nil - } - return rpcPool, err -}