Add conncetions from Sessions and CDRs through ConnManager

This commit is contained in:
TeoV
2019-12-10 04:32:26 -05:00
parent 76ca5b9d68
commit 95477bcabc
112 changed files with 937 additions and 2155 deletions

View File

@@ -60,16 +60,11 @@ var (
cfg *config.CGRConfig
)
func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan chan rpcclient.ClientConnector,
filterSChan chan *engine.FilterS, exitChan chan bool) {
func startCdrcs(filterSChan chan *engine.FilterS, exitChan chan bool, connMgr *engine.ConnManager) {
filterS := <-filterSChan
filterSChan <- filterS
cdrcInitialized := false // Control whether the cdrc was already initialized (so we don't reload in that case)
var cdrcChildrenChan chan struct{} // Will use it to communicate with the children of one fork
intCdrSChan := internalCdrSChan
if cfg.DispatcherSCfg().Enabled {
intCdrSChan = internalDispatcherSChan
}
for {
select {
case <-exitChan: // Stop forking CDRCs
@@ -90,9 +85,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan cha
}
}
if len(enabledCfgs) != 0 {
go startCdrc(intCdrSChan, internalRaterChan, enabledCfgs,
go startCdrc(enabledCfgs,
cfg.GeneralCfg().HttpSkipTlsVerify, filterSChan,
cdrcChildrenChan, exitChan)
cdrcChildrenChan, exitChan, connMgr)
} else {
utils.Logger.Info("<CDRC> No enabled CDRC clients")
}
@@ -102,26 +97,15 @@ func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan cha
}
// Fires up a cdrc instance
func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.ClientConnector, cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool,
filterSChan chan *engine.FilterS, closeChan chan struct{}, exitChan chan bool) {
func startCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool,
filterSChan chan *engine.FilterS, closeChan chan struct{}, exitChan chan bool, connMgr *engine.ConnManager) {
filterS := <-filterSChan
filterSChan <- filterS
var err error
var cdrsConn rpcclient.ClientConnector
cdrcCfg := cdrcCfgs[0]
cdrsConn, err = engine.NewRPCPool(rpcclient.PoolFirst, cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cdrcCfg.CdrsConns, internalCdrSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
exitChan <- true
return
}
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan,
cfg.GeneralCfg().DefaultTimezone, filterS)
cfg.GeneralCfg().DefaultTimezone, filterS, connMgr)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
exitChan <- true
@@ -520,10 +504,9 @@ func main() {
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaApier): internalAPIerV1Chan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): internalAttributeSChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): internalCacheSChan,
//utils.CDRsV1: cdrS.GetIntenternalChan(),
//utils.CDRsV2: cdrS.GetIntenternalChan(),
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): internalChargerSChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian): internalGuardianSChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs): internalCDRServerChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): internalChargerSChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian): internalGuardianSChan,
//utils.LoaderSv1: ldrs.GetIntenternalChan(),
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): internalResourceSChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResponder): internalResponderChan,
@@ -536,6 +519,7 @@ func main() {
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): internalConfigChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan,
})
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan)
@@ -558,14 +542,10 @@ func main() {
schS, exitChan, connManager.GetConnMgr())
cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan,
chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(),
attrS.GetIntenternalChan(), tS.GetIntenternalChan(),
stS.GetIntenternalChan(), dspS.GetIntenternalChan())
connManager.GetConnMgr())
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, exitChan, connManager.GetConnMgr())
smg := services.NewSessionService(cfg, dmService, server, chrS.GetIntenternalChan(),
rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(),
tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(),
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), internalSessionSChan, exitChan)
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan)
anz := services.NewAnalyzerService(cfg, server, exitChan)
@@ -618,7 +598,7 @@ func main() {
initConfigSv1(internalConfigChan, server)
// Start CDRC components if necessary
go startCdrcs(cdrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), dspS.GetIntenternalChan(), filterSChan, exitChan)
go startCdrcs(filterSChan, exitChan, connManager.GetConnMgr())
// Serve rpc connections
go startRpc(server, rals.GetResponder().GetIntenternalChan(), cdrS.GetIntenternalChan(),