Make a PoC for RPCConns with ERs

This commit is contained in:
TeoV
2019-12-02 09:55:54 -05:00
parent eda2958f99
commit 76c6af8538
13 changed files with 136 additions and 35 deletions

View File

@@ -500,8 +500,9 @@ func main() {
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, exitChan)
connManager := services.NewConnManager(cfg)
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server)
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan())
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), connManager)
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), dspS.GetIntenternalChan())
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server)
@@ -528,9 +529,9 @@ func main() {
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan)
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan)
anz := services.NewAnalyzerService(cfg, server, exitChan)
srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals,
srvManager.AddServices(connManager, attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan, connManager),
services.NewDNSAgent(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
services.NewFreeswitchAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),
services.NewKamailioAgent(cfg, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan),

View File

@@ -1116,6 +1116,8 @@ func (cfg *CGRConfig) V1GetConfigSection(args *StringWithArgDispatcher, reply *m
jsonString = utils.ToJSON(cfg.CdreProfiles)
case ERsJson:
jsonString = utils.ToJSON(cfg.ERsCfg())
case RPCConnsJsonName:
jsonString = utils.ToJSON(cfg.RPCConns())
default:
return errors.New("Invalid section")
}
@@ -1710,6 +1712,14 @@ func (cfg *CGRConfig) loadConfig(path, section string) (err error) {
cfg.lks[Apier].Lock()
defer cfg.lks[Apier].Unlock()
loadFuncs = append(loadFuncs, cfg.loadApierCfg)
if !fall {
break
}
fallthrough
case RPCConnsJsonName:
cfg.lks[RPCConnsJsonName].Lock()
defer cfg.lks[RPCConnsJsonName].Unlock()
loadFuncs = append(loadFuncs, cfg.loadRPCConns)
}
return cfg.loadConfigFromPath(path, loadFuncs)
}
@@ -1820,7 +1830,7 @@ func (cfg *CGRConfig) initChanels() {
for _, section := range []string{GENERAL_JSN, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN, SCHEDULER_JSN, CACHE_JSN, FilterSjsn, RALS_JSN,
CDRS_JSN, CDRE_JSN, CDRC_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN,
DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON,
SupplierSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, AnalyzerCfgJson, Apier} {
SupplierSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, AnalyzerCfgJson, Apier, RPCConnsJsonName} {
cfg.lks[section] = new(sync.RWMutex)
cfg.rldChans[section] = make(chan struct{}, 1)
}

View File

@@ -82,9 +82,7 @@
"ers": {
"enabled": true,
"sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012>
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"sessions_conns": ["*localhost"],
"readers": [
{
"id": "file_reader1",

View File

@@ -82,9 +82,7 @@
"ers": {
"enabled": true,
"sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012>
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"sessions_conns": ["*localhost"],
"readers": [
{
"id": "file_reader1",

View File

@@ -82,9 +82,7 @@
"ers": {
"enabled": true,
"sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012>
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"sessions_conns": ["*localhost"],
"readers": [
{
"id": "file_reader1",

View File

@@ -82,9 +82,7 @@
"ers": {
"enabled": true,
"sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012>
{"address": "*internal"}
],
"sessions_conns": ["*internal"],
"readers": [
{
"id": "file_reader1",

View File

@@ -82,9 +82,7 @@
"ers": {
"enabled": true,
"sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012>
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"sessions_conns": ["*localhost"],
"readers": [
{
"id": "file_reader1",

View File

@@ -19,18 +19,80 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewConnManager returns the Connection Manager
func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) {
return
return &ConnManager{cfg: cfg}
}
type ConnManager struct {
sync.RWMutex
cfg *config.CGRConfig
}
func (cM *ConnManager) GetConn() {
func (cM *ConnManager) GetConn(connID string,
internalChan chan rpcclient.RpcClientConnection) (connPool *rpcclient.RpcClientPool, err error) {
//try to get the connection from cache
if x, ok := engine.Cache.Get(utils.CacheRPCConnections, connID); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*rpcclient.RpcClientPool), nil
}
// in case we don't found in cache create the connection and add this in cache
connCfg := cM.cfg.RPCConns()[connID]
connPool, err = engine.NewRPCPool(connCfg.Strategy,
cM.cfg.TlsCfg().ClientKey,
cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate,
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
connCfg.Conns, internalChan, false)
if err != nil {
return
}
engine.Cache.Set(utils.CacheRPCConnections, connID, connPool, nil,
true, utils.NonTransactional)
return
}
// Start should handle the sercive start
func (cM *ConnManager) Start() (err error) {
return
}
// GetIntenternalChan returns the internal connection chanel
func (cM *ConnManager) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
return nil
}
// Reload handles the change of config
func (cM *ConnManager) Reload() (err error) {
return // for the momment nothing to reload
}
// Shutdown stops the service
func (cM *ConnManager) Shutdown() (err error) {
return
}
// IsRunning returns if the service is running
func (cM *ConnManager) IsRunning() bool {
return true
}
// ServiceName returns the service name
func (cM *ConnManager) ServiceName() string {
return utils.RPCConnS
}
// ShouldRun returns if the service should be running
func (cM *ConnManager) ShouldRun() bool {
return true
}

View File

@@ -34,7 +34,8 @@ import (
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, attrsChan chan rpcclient.RpcClientConnection) servmanager.Service {
server *utils.Server, attrsChan chan rpcclient.RpcClientConnection,
connMgr *ConnManager) servmanager.Service {
return &DispatcherService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
@@ -43,6 +44,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan: filterSChan,
server: server,
attrsChan: attrsChan,
conMgr: connMgr,
}
}
@@ -59,6 +61,7 @@ type DispatcherService struct {
dspS *dispatchers.DispatcherService
rpc *v1.DispatcherSv1
connChan chan rpcclient.RpcClientConnection
conMgr *ConnManager
}
// Start should handle the sercive start

View File

@@ -33,7 +33,7 @@ import (
// NewEventReaderService returns the EventReader Service
func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
sSChan, dispatcherChan chan rpcclient.RpcClientConnection,
exitChan chan bool) servmanager.Service {
exitChan chan bool, connMgr *ConnManager) servmanager.Service {
return &EventReaderService{
rldChan: make(chan struct{}, 1),
cfg: cfg,
@@ -41,6 +41,7 @@ func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.Filte
sSChan: sSChan,
dispatcherChan: dispatcherChan,
exitChan: exitChan,
connMgr: connMgr,
}
}
@@ -56,6 +57,7 @@ type EventReaderService struct {
ers *ers.ERService
rldChan chan struct{}
stopChan chan struct{}
connMgr *ConnManager
}
// Start should handle the sercive start
@@ -75,11 +77,14 @@ func (erS *EventReaderService) Start() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
var sS rpcclient.RpcClientConnection
//if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil {
// utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
// utils.ERs, utils.SessionS, err.Error()))
// return
//}
if sS, err = NewConnectionPool(erS.cfg, erS.sSChan, erS.dispatcherChan,
erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
return
}
// build the service
erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan)
go func(ers *ers.ERService, rldChan chan struct{}) {
@@ -99,11 +104,12 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie
// Reload handles the change of config
func (erS *EventReaderService) Reload() (err error) {
var sS rpcclient.RpcClientConnection
//if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil {
// utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
// utils.ERs, utils.SessionS, err.Error()))
// return
//}
if sS, err = NewConnectionPool(erS.cfg, erS.sSChan,
erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns, erS.connMgr); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
return
}
erS.RLock()
erS.ers.SetSessionSConnection(sS)
erS.rldChan <- struct{}{}

View File

@@ -41,3 +41,30 @@ func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
conns, internalChan, false)
}
// NewConnection returns a new connection
func NewConnectionPool(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection,
connsIDs []string, connMgr *ConnManager) (rpcclient.RpcClientConnection, error) {
var rpcClient *rpcclient.RpcClientPool
var err error
if len(connsIDs) == 0 {
return nil, nil
}
internalChan := serviceConnChan
if cfg.DispatcherSCfg().Enabled {
internalChan = dispatcherSChan
}
rpcPool := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, cfg.GeneralCfg().ReplyTimeout)
atLestOneConnected := false // If one connected we don't longer return errors
for _, connID := range connsIDs {
rpcClient, err = connMgr.GetConn(connID, internalChan)
if err == nil {
atLestOneConnected = true
}
rpcPool.AddClient(rpcClient)
}
if atLestOneConnected {
err = nil
}
return rpcPool, err
}

View File

@@ -147,6 +147,7 @@ func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig {
func (srvMngr *ServiceManager) StartServices() (err error) {
go srvMngr.handleReload()
for serviceName, shouldRun := range map[string]bool{
utils.RPCConnS: true,
utils.AttributeS: srvMngr.GetConfig().AttributeSCfg().Enabled,
utils.ChargerS: srvMngr.GetConfig().ChargerSCfg().Enabled,
utils.ThresholdS: srvMngr.GetConfig().ThresholdSCfg().Enabled,

View File

@@ -659,6 +659,7 @@ const (
CDRServer = "CDRServer"
ResponderS = "ResponderS"
GuardianS = "GuardianS"
RPCConnS = "RPCConnS"
)
// Lower service names