From 76c6af8538ab624431af6f2e1eabc166b803ef7b Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 2 Dec 2019 09:55:54 -0500 Subject: [PATCH] Make a PoC for RPCConns with ERs --- cmd/cgr-engine/cgr-engine.go | 7 +- config/config.go | 12 +++- data/conf/samples/ers/cgrates.json | 4 +- data/conf/samples/ers_example/cgrates.json | 4 +- .../ers_reload/first_reload/cgrates.json | 4 +- .../samples/ers_reload/internal/cgrates.json | 4 +- .../ers_reload/second_reload/cgrates.json | 4 +- services/connmanager.go | 70 +++++++++++++++++-- services/dispatchers.go | 5 +- services/ers.go | 28 +++++--- services/utils.go | 27 +++++++ servmanager/servmanager.go | 1 + utils/consts.go | 1 + 13 files changed, 136 insertions(+), 35 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index b9e763001..ce5ed4433 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -500,8 +500,9 @@ func main() { // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, exitChan) + connManager := services.NewConnManager(cfg) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server) - dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan()) + dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), connManager) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), dspS.GetIntenternalChan()) tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server) @@ -528,9 +529,9 @@ func main() { attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan) ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan) anz := services.NewAnalyzerService(cfg, server, exitChan) - srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals, + srvManager.AddServices(connManager, attrS, chrS, tS, stS, reS, supS, schS, rals, rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg, - services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan), + services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan, connManager), services.NewDNSAgent(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan), services.NewFreeswitchAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan), services.NewKamailioAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan), diff --git a/config/config.go b/config/config.go index dc83d146f..54f8616c4 100755 --- a/config/config.go +++ b/config/config.go @@ -1116,6 +1116,8 @@ func (cfg *CGRConfig) V1GetConfigSection(args *StringWithArgDispatcher, reply *m jsonString = utils.ToJSON(cfg.CdreProfiles) case ERsJson: jsonString = utils.ToJSON(cfg.ERsCfg()) + case RPCConnsJsonName: + jsonString = utils.ToJSON(cfg.RPCConns()) default: return errors.New("Invalid section") } @@ -1710,6 +1712,14 @@ func (cfg *CGRConfig) loadConfig(path, section string) (err error) { cfg.lks[Apier].Lock() defer cfg.lks[Apier].Unlock() loadFuncs = append(loadFuncs, cfg.loadApierCfg) + if !fall { + break + } + fallthrough + case RPCConnsJsonName: + cfg.lks[RPCConnsJsonName].Lock() + defer cfg.lks[RPCConnsJsonName].Unlock() + loadFuncs = append(loadFuncs, cfg.loadRPCConns) } return cfg.loadConfigFromPath(path, loadFuncs) } @@ -1820,7 +1830,7 @@ func (cfg *CGRConfig) initChanels() { for _, section := range []string{GENERAL_JSN, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN, SCHEDULER_JSN, CACHE_JSN, FilterSjsn, RALS_JSN, CDRS_JSN, CDRE_JSN, CDRC_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON, - SupplierSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, AnalyzerCfgJson, Apier} { + SupplierSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, AnalyzerCfgJson, Apier, RPCConnsJsonName} { cfg.lks[section] = new(sync.RWMutex) cfg.rldChans[section] = make(chan struct{}, 1) } diff --git a/data/conf/samples/ers/cgrates.json b/data/conf/samples/ers/cgrates.json index c44bc80d5..742ee3f81 100644 --- a/data/conf/samples/ers/cgrates.json +++ b/data/conf/samples/ers/cgrates.json @@ -82,9 +82,7 @@ "ers": { "enabled": true, - "sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012> - {"address": "127.0.0.1:2012", "transport": "*json"} - ], + "sessions_conns": ["*localhost"], "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_example/cgrates.json b/data/conf/samples/ers_example/cgrates.json index dfc31e548..3d3c3e626 100644 --- a/data/conf/samples/ers_example/cgrates.json +++ b/data/conf/samples/ers_example/cgrates.json @@ -82,9 +82,7 @@ "ers": { "enabled": true, - "sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012> - {"address": "127.0.0.1:2012", "transport": "*json"} - ], + "sessions_conns": ["*localhost"], "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_reload/first_reload/cgrates.json b/data/conf/samples/ers_reload/first_reload/cgrates.json index 40d8e5849..c3832c5a7 100644 --- a/data/conf/samples/ers_reload/first_reload/cgrates.json +++ b/data/conf/samples/ers_reload/first_reload/cgrates.json @@ -82,9 +82,7 @@ "ers": { "enabled": true, - "sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012> - {"address": "127.0.0.1:2012", "transport": "*json"} - ], + "sessions_conns": ["*localhost"], "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_reload/internal/cgrates.json b/data/conf/samples/ers_reload/internal/cgrates.json index bb86db2f4..f2cfeb632 100644 --- a/data/conf/samples/ers_reload/internal/cgrates.json +++ b/data/conf/samples/ers_reload/internal/cgrates.json @@ -82,9 +82,7 @@ "ers": { "enabled": true, - "sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012> - {"address": "*internal"} - ], + "sessions_conns": ["*internal"], "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_reload/second_reload/cgrates.json b/data/conf/samples/ers_reload/second_reload/cgrates.json index 35ac07e3f..090e5da20 100644 --- a/data/conf/samples/ers_reload/second_reload/cgrates.json +++ b/data/conf/samples/ers_reload/second_reload/cgrates.json @@ -82,9 +82,7 @@ "ers": { "enabled": true, - "sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012> - {"address": "127.0.0.1:2012", "transport": "*json"} - ], + "sessions_conns": ["*localhost"], "readers": [ { "id": "file_reader1", diff --git a/services/connmanager.go b/services/connmanager.go index 40b3804d0..f4bf3cdb4 100644 --- a/services/connmanager.go +++ b/services/connmanager.go @@ -19,18 +19,80 @@ along with this program. If not, see package services import ( + "sync" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // NewConnManager returns the Connection Manager func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) { - - return + return &ConnManager{cfg: cfg} } type ConnManager struct { + sync.RWMutex + cfg *config.CGRConfig } -func (cM *ConnManager) GetConn() { - +func (cM *ConnManager) GetConn(connID string, + internalChan chan rpcclient.RpcClientConnection) (connPool *rpcclient.RpcClientPool, err error) { + //try to get the connection from cache + if x, ok := engine.Cache.Get(utils.CacheRPCConnections, connID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*rpcclient.RpcClientPool), nil + } + // in case we don't found in cache create the connection and add this in cache + connCfg := cM.cfg.RPCConns()[connID] + connPool, err = engine.NewRPCPool(connCfg.Strategy, + cM.cfg.TlsCfg().ClientKey, + cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, + cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + connCfg.Conns, internalChan, false) + if err != nil { + return + } + engine.Cache.Set(utils.CacheRPCConnections, connID, connPool, nil, + true, utils.NonTransactional) + return +} + +// Start should handle the sercive start +func (cM *ConnManager) Start() (err error) { + return +} + +// GetIntenternalChan returns the internal connection chanel +func (cM *ConnManager) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return nil +} + +// Reload handles the change of config +func (cM *ConnManager) Reload() (err error) { + return // for the momment nothing to reload +} + +// Shutdown stops the service +func (cM *ConnManager) Shutdown() (err error) { + return +} + +// IsRunning returns if the service is running +func (cM *ConnManager) IsRunning() bool { + return true +} + +// ServiceName returns the service name +func (cM *ConnManager) ServiceName() string { + return utils.RPCConnS +} + +// ShouldRun returns if the service should be running +func (cM *ConnManager) ShouldRun() bool { + return true } diff --git a/services/dispatchers.go b/services/dispatchers.go index f0daf1cbf..131ac4a66 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -34,7 +34,8 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, attrsChan chan rpcclient.RpcClientConnection) servmanager.Service { + server *utils.Server, attrsChan chan rpcclient.RpcClientConnection, + connMgr *ConnManager) servmanager.Service { return &DispatcherService{ connChan: make(chan rpcclient.RpcClientConnection, 1), cfg: cfg, @@ -43,6 +44,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, server: server, attrsChan: attrsChan, + conMgr: connMgr, } } @@ -59,6 +61,7 @@ type DispatcherService struct { dspS *dispatchers.DispatcherService rpc *v1.DispatcherSv1 connChan chan rpcclient.RpcClientConnection + conMgr *ConnManager } // Start should handle the sercive start diff --git a/services/ers.go b/services/ers.go index 15797ad88..965eac7e0 100644 --- a/services/ers.go +++ b/services/ers.go @@ -33,7 +33,7 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, sSChan, dispatcherChan chan rpcclient.RpcClientConnection, - exitChan chan bool) servmanager.Service { + exitChan chan bool, connMgr *ConnManager) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, @@ -41,6 +41,7 @@ func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.Filte sSChan: sSChan, dispatcherChan: dispatcherChan, exitChan: exitChan, + connMgr: connMgr, } } @@ -56,6 +57,7 @@ type EventReaderService struct { ers *ers.ERService rldChan chan struct{} stopChan chan struct{} + connMgr *ConnManager } // Start should handle the sercive start @@ -75,11 +77,14 @@ func (erS *EventReaderService) Start() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) var sS rpcclient.RpcClientConnection - //if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil { - // utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", - // utils.ERs, utils.SessionS, err.Error())) - // return - //} + + if sS, err = NewConnectionPool(erS.cfg, erS.sSChan, erS.dispatcherChan, + erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", + utils.ERs, utils.SessionS, err.Error())) + return + } + // build the service erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan) go func(ers *ers.ERService, rldChan chan struct{}) { @@ -99,11 +104,12 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie // Reload handles the change of config func (erS *EventReaderService) Reload() (err error) { var sS rpcclient.RpcClientConnection - //if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil { - // utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", - // utils.ERs, utils.SessionS, err.Error())) - // return - //} + if sS, err = NewConnectionPool(erS.cfg, erS.sSChan, + erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", + utils.ERs, utils.SessionS, err.Error())) + return + } erS.RLock() erS.ers.SetSessionSConnection(sS) erS.rldChan <- struct{}{} diff --git a/services/utils.go b/services/utils.go index 0bbf4cb93..5a825a7cd 100644 --- a/services/utils.go +++ b/services/utils.go @@ -41,3 +41,30 @@ func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, conns, internalChan, false) } + +// NewConnection returns a new connection +func NewConnectionPool(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection, + connsIDs []string, connMgr *ConnManager) (rpcclient.RpcClientConnection, error) { + var rpcClient *rpcclient.RpcClientPool + var err error + if len(connsIDs) == 0 { + return nil, nil + } + internalChan := serviceConnChan + if cfg.DispatcherSCfg().Enabled { + internalChan = dispatcherSChan + } + rpcPool := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, cfg.GeneralCfg().ReplyTimeout) + atLestOneConnected := false // If one connected we don't longer return errors + for _, connID := range connsIDs { + rpcClient, err = connMgr.GetConn(connID, internalChan) + if err == nil { + atLestOneConnected = true + } + rpcPool.AddClient(rpcClient) + } + if atLestOneConnected { + err = nil + } + return rpcPool, err +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 0c933bd78..6b028e3d4 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -147,6 +147,7 @@ func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig { func (srvMngr *ServiceManager) StartServices() (err error) { go srvMngr.handleReload() for serviceName, shouldRun := range map[string]bool{ + utils.RPCConnS: true, utils.AttributeS: srvMngr.GetConfig().AttributeSCfg().Enabled, utils.ChargerS: srvMngr.GetConfig().ChargerSCfg().Enabled, utils.ThresholdS: srvMngr.GetConfig().ThresholdSCfg().Enabled, diff --git a/utils/consts.go b/utils/consts.go index 40073aa3d..2793b4642 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -659,6 +659,7 @@ const ( CDRServer = "CDRServer" ResponderS = "ResponderS" GuardianS = "GuardianS" + RPCConnS = "RPCConnS" ) // Lower service names