Updated all connection between subsystems using DispatcherS

This commit is contained in:
Tripon Alexandru-Ionut
2019-04-15 15:46:06 +03:00
committed by Dan Christian Bogos
parent f10833ae4b
commit 5d0585ec0d
6 changed files with 302 additions and 205 deletions

View File

@@ -57,12 +57,16 @@ Parameters specific per config instance:
*/
func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection,
closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (*Cdrc, error) {
var cdrcCfg *config.CdrcCfg
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
}
cdrc := &Cdrc{httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs, dfltCdrcCfg: cdrcCfg, timezone: utils.FirstNonEmpty(cdrcCfg.Timezone, dfltTimezone), cdrs: cdrs,
closeChan: closeChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
cdrcCfg := cdrcCfgs[0]
cdrc := &Cdrc{
httpSkipTlsCheck: httpSkipTlsCheck,
cdrcCfgs: cdrcCfgs,
dfltCdrcCfg: cdrcCfg,
timezone: utils.FirstNonEmpty(cdrcCfg.Timezone, dfltTimezone),
cdrs: cdrs,
closeChan: closeChan,
maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
}
var processFile struct{}
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {

View File

@@ -79,12 +79,17 @@ var (
cfg *config.CGRConfig
)
func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection,
exitChan chan bool, filterSChan chan *engine.FilterS) {
func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
cdrcInitialized := false // Control whether the cdrc was already initialized (so we don't reload in that case)
var cdrcChildrenChan chan struct{} // Will use it to communicate with the children of one fork
var dispatcherSConn rpcclient.RpcClientConnection
if cfg.DispatcherSCfg().Enabled {
dispatcherSConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherSConn
}
for {
select {
case <-exitChan: // Stop forking CDRCs
@@ -106,8 +111,8 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn
}
if len(enabledCfgs) != 0 {
go startCdrc(internalCdrSChan, internalRaterChan, enabledCfgs,
cfg.GeneralCfg().HttpSkipTlsVerify, cdrcChildrenChan,
exitChan, filterSChan)
cfg.GeneralCfg().HttpSkipTlsVerify, dispatcherSConn,
filterSChan, cdrcChildrenChan, exitChan)
} else {
utils.Logger.Info("<CDRC> No enabled CDRC clients")
}
@@ -118,22 +123,25 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn
// Fires up a cdrc instance
func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool,
closeChan chan struct{}, exitChan chan bool, filterSChan chan *engine.FilterS) {
dispatcherSConn rpcclient.RpcClientConnection, filterSChan chan *engine.FilterS, closeChan chan struct{}, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
var cdrcCfg *config.CdrcCfg
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
}
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cdrcCfg.CdrsConns, internalCdrSChan, cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
exitChan <- true
return
var err error
var cdrsConn rpcclient.RpcClientConnection
if cfg.DispatcherSCfg().Enabled {
cdrsConn = dispatcherSConn
} else {
cdrcCfg := cdrcCfgs[0]
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cdrcCfg.CdrsConns, internalCdrSChan, cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
exitChan <- true
return
}
}
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan,
cfg.GeneralCfg().DefaultTimezone, cfg.GeneralCfg().RoundingDecimals,
@@ -151,12 +159,19 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
}
func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan,
internalStatSChan, internalSupplierSChan, internalAttrSChan,
internalCDRSChan, internalChargerSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
internalStatSChan, internalSupplierSChan, internalAttrSChan, internalCDRSChan, internalChargerSChan,
internalDispatcherSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS Session service.")
var err error
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn *rpcclient.RpcClientPool
if len(cfg.SessionSCfg().ChargerSConns) != 0 {
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection
isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled
if isDispatcherSEnabled {
dispatcherSConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherSConn
}
if isDispatcherSEnabled {
chargerSConn = dispatcherSConn
} else if len(cfg.SessionSCfg().ChargerSConns) != 0 {
chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate,
cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts,
@@ -170,7 +185,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if len(cfg.SessionSCfg().RALsConns) != 0 {
if isDispatcherSEnabled {
ralsConns = dispatcherSConn
} else if len(cfg.SessionSCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate,
cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts,
@@ -184,7 +201,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if len(cfg.SessionSCfg().ResSConns) != 0 {
if isDispatcherSEnabled {
resSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().ResSConns) != 0 {
resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -199,7 +218,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if len(cfg.SessionSCfg().ThreshSConns) != 0 {
if isDispatcherSEnabled {
threshSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().ThreshSConns) != 0 {
threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -214,7 +235,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if len(cfg.SessionSCfg().StatSConns) != 0 {
if isDispatcherSEnabled {
statSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().StatSConns) != 0 {
statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -229,7 +252,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if len(cfg.SessionSCfg().SupplSConns) != 0 {
if isDispatcherSEnabled {
suplSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().SupplSConns) != 0 {
suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -244,7 +269,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if len(cfg.SessionSCfg().AttrSConns) != 0 {
if isDispatcherSEnabled {
attrSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().AttrSConns) != 0 {
attrSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -259,7 +286,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if len(cfg.SessionSCfg().CDRsConns) != 0 {
if isDispatcherSEnabled {
cdrsConn = dispatcherSConn
} else if len(cfg.SessionSCfg().CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -299,7 +328,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
server.RpcRegister(smgRpc)
ssv1 := v1.NewSessionSv1(sm) // methods with multiple options
if !config.CgrConfig().DispatcherSCfg().Enabled {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(ssv1)
}
// Register BiRpc handlers
@@ -379,7 +408,7 @@ func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.
}
func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
exitChan chan bool, filterSChan chan *engine.FilterS) {
filterSChan chan *engine.FilterS, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS DiameterAgent service")
filterS := <-filterSChan
@@ -439,8 +468,8 @@ func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.R
exitChan <- true
}
func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool,
filterSChan chan *engine.FilterS) {
func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
utils.Logger.Info("Starting CGRateS RadiusAgent service")
@@ -648,8 +677,7 @@ func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl
}
func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
exitChan chan bool, server *utils.Server,
filterSChan chan *engine.FilterS, dfltTenant string) {
server *utils.Server, filterSChan chan *engine.FilterS, dfltTenant string, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
var sS rpcclient.RpcClientConnection
@@ -681,19 +709,23 @@ func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcC
}
}
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
cdrDb engine.CdrStorage, dm *engine.DataManager,
internalRaterChan, internalAttributeSChan,
internalThresholdSChan, internalStatSChan,
internalChargerSChan chan rpcclient.RpcClientConnection,
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, internalThresholdSChan,
internalStatSChan, internalChargerSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
cdrDb engine.CdrStorage, dm *engine.DataManager, server *utils.Server,
filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
var err error
utils.Logger.Info("Starting CGRateS CDRS service.")
var ralConn, attrSConn,
thresholdSConn, statsConn, chargerSConn *rpcclient.RpcClientPool
if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL
var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection
isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled
if isDispatcherSEnabled {
dispatcherSConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherSConn
}
if isDispatcherSEnabled {
chargerSConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL
chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -708,7 +740,9 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
return
}
}
if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL
if isDispatcherSEnabled {
ralConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -722,7 +756,9 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
return
}
}
if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init
if isDispatcherSEnabled {
attrSConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -737,7 +773,9 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
return
}
}
if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init
if isDispatcherSEnabled {
thresholdSConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init
thresholdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -751,7 +789,9 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
return
}
}
if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init
if isDispatcherSEnabled {
statsConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -794,7 +834,7 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh
// startAttributeService fires up the AttributeS
func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager,
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
server *utils.Server, filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
<-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles)
@@ -822,24 +862,28 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec
return
}()
aSv1 := v1.NewAttributeSv1(aS)
if !config.CgrConfig().DispatcherSCfg().Enabled {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(aSv1)
}
internalAttributeSChan <- aSv1
}
// startChargerService fires up the ChargerS
func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, internalAttributeSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig, dm *engine.DataManager,
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
func startChargerService(internalChargerSChan, internalAttributeSChan,
internalDispatcherSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server,
filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
<-cacheS.GetPrecacheChannel(utils.CacheChargerProfiles)
<-cacheS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)
var attrSConn *rpcclient.RpcClientPool
var attrSConn rpcclient.RpcClientConnection
var err error
if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init
if cfg.DispatcherSCfg().Enabled {
attrSConn = <-internalDispatcherSChan
internalDispatcherSChan <- attrSConn
} else if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -873,20 +917,25 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection
return
}()
cSv1 := v1.NewChargerSv1(cS)
if !config.CgrConfig().DispatcherSCfg().Enabled {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(cSv1)
}
internalChargerSChan <- cSv1
}
func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
func startResourceService(internalRsChan, internalThresholdSChan,
internalDispatcherSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server,
filterSChan chan *engine.FilterS, exitChan chan bool) {
var err error
var thdSConn *rpcclient.RpcClientPool
var thdSConn rpcclient.RpcClientConnection
filterS := <-filterSChan
filterSChan <- filterS
if len(cfg.ResourceSCfg().ThresholdSConns) != 0 { // Stats connection init
if cfg.DispatcherSCfg().Enabled {
thdSConn = <-internalDispatcherSChan
internalDispatcherSChan <- thdSConn
} else if len(cfg.ResourceSCfg().ThresholdSConns) != 0 { // Stats connection init
thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -922,21 +971,26 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac
return
}()
rsV1 := v1.NewResourceSv1(rS)
if !config.CgrConfig().DispatcherSCfg().Enabled {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(rsV1)
}
internalRsChan <- rsV1
}
// startStatService fires up the StatS
func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
func startStatService(internalStatSChan, internalThresholdSChan,
internalDispatcherSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server,
filterSChan chan *engine.FilterS, exitChan chan bool) {
var err error
var thdSConn *rpcclient.RpcClientPool
var thdSConn rpcclient.RpcClientConnection
filterS := <-filterSChan
filterSChan <- filterS
if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init
if cfg.DispatcherSCfg().Enabled {
thdSConn = <-internalDispatcherSChan
internalDispatcherSChan <- thdSConn
} else if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init
thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -971,7 +1025,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach
return
}()
stsV1 := v1.NewStatSv1(sS)
if !config.CgrConfig().DispatcherSCfg().Enabled {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(stsV1)
}
internalStatSChan <- stsV1
@@ -980,7 +1034,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach
// startThresholdService fires up the ThresholdS
func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager,
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
server *utils.Server, filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
<-cacheS.GetPrecacheChannel(utils.CacheThresholdProfiles)
@@ -1004,23 +1058,29 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
return
}()
tSv1 := v1.NewThresholdSv1(tS)
if !config.CgrConfig().DispatcherSCfg().Enabled {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(tSv1)
}
internalThresholdSChan <- tSv1
}
// startSupplierService fires up the SupplierS
func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server,
exitChan chan bool, filterSChan chan *engine.FilterS,
internalAttrSChan chan rpcclient.RpcClientConnection) {
func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan,
internalAttrSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server,
filterSChan chan *engine.FilterS, exitChan chan bool) {
var err error
filterS := <-filterSChan
filterSChan <- filterS
var attrSConn, resourceSConn, statSConn *rpcclient.RpcClientPool
if len(cfg.SupplierSCfg().AttributeSConns) != 0 {
var attrSConn, resourceSConn, statSConn, dispatcherSConn rpcclient.RpcClientConnection
isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled
if isDispatcherSEnabled {
dispatcherSConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherSConn
}
if isDispatcherSEnabled {
attrSConn = dispatcherSConn
} else if len(cfg.SupplierSCfg().AttributeSConns) != 0 {
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -1035,7 +1095,9 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
return
}
}
if len(cfg.SupplierSCfg().StatSConns) != 0 {
if isDispatcherSEnabled {
statSConn = dispatcherSConn
} else if len(cfg.SupplierSCfg().StatSConns) != 0 {
statSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -1050,7 +1112,9 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
return
}
}
if len(cfg.SupplierSCfg().ResourceSConns) != 0 {
if isDispatcherSEnabled {
resourceSConn = dispatcherSConn
} else if len(cfg.SupplierSCfg().ResourceSConns) != 0 {
resourceSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
@@ -1087,7 +1151,7 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
return
}()
splV1 := v1.NewSupplierSv1(splS)
if !config.CgrConfig().DispatcherSCfg().Enabled {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(splV1)
}
internalSupplierSChan <- splV1
@@ -1102,9 +1166,9 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
}
// loaderService will start and register APIs for LoaderService if enabled
func startLoaderS(cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool,
filterSChan chan *engine.FilterS, internalLoaderSChan, cacheSChan chan rpcclient.RpcClientConnection) {
func startLoaderS(internalLoaderSChan, cacheSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server,
filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
@@ -1120,10 +1184,8 @@ func startLoaderS(cfg *config.CGRConfig,
}
// startDispatcherService fires up the DispatcherS
func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConnection,
intAttrSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
func startDispatcherService(internalDispatcherSChan, internalAttributeSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig, cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
dm *engine.DataManager, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS Dispatcher service.")
fltrS := <-filterSChan
@@ -1140,7 +1202,7 @@ func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConn
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().AttributeSConns, intAttrSChan,
cfg.DispatcherSCfg().AttributeSConns, internalAttributeSChan,
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
@@ -1278,7 +1340,7 @@ func startRpc(server *utils.Server, internalRaterChan,
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
exitChan chan bool) {
if !config.CgrConfig().DispatcherSCfg().Enabled {
if !cfg.DispatcherSCfg().Enabled {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
internalRaterChan <- resp
@@ -1391,20 +1453,25 @@ func initLogger(cfg *config.CGRConfig) error {
return nil
}
func schedCDRsConns(internalCDRSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
func schedCDRsConns(internalCDRSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
var cdrsConn *rpcclient.RpcClientPool
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SchedulerCfg().CDRsConns, internalCDRSChan,
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
exitChan <- true
return
var cdrsConn rpcclient.RpcClientConnection
if cfg.DispatcherSCfg().Enabled {
cdrsConn = <-internalDispatcherSChan
internalDispatcherSChan <- cdrsConn
} else {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SchedulerCfg().CDRsConns, internalCDRSChan,
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
exitChan <- true
return
}
}
engine.SetSchedCdrsConns(cdrsConn)
}
@@ -1438,7 +1505,7 @@ func memProfiling(memProfDir string, interval time.Duration, nrFiles int, exitCh
}
}
func cpuProfiling(cpuProfDir string, exitChan chan bool, stopChan, doneChan chan struct{}) {
func cpuProfiling(cpuProfDir string, stopChan, doneChan chan struct{}, exitChan chan bool) {
cpuPath := path.Join(cpuProfDir, "cpu.prof")
f, err := os.Create(cpuPath)
if err != nil {
@@ -1485,7 +1552,7 @@ func main() {
cpuProfChanStop := make(chan struct{})
cpuProfChanDone := make(chan struct{})
if *cpuProfDir != "" {
go cpuProfiling(*cpuProfDir, exitChan, cpuProfChanStop, cpuProfChanDone)
go cpuProfiling(*cpuProfDir, cpuProfChanStop, cpuProfChanDone, exitChan)
}
if *scheduledShutdown != "" {
@@ -1568,7 +1635,6 @@ func main() {
// Done initing DBs
engine.SetRoundingDecimals(cfg.GeneralCfg().RoundingDecimals)
engine.SetRpSubjectPrefixMatching(cfg.RalsCfg().RpSubjectPrefixMatching)
stopHandled := false
// Rpc/http server
server := utils.NewServer()
@@ -1629,7 +1695,7 @@ func main() {
initGuardianSv1(internalGuardianSChan, server)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS)
srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, exitChan)
initServiceManagerV1(internalServeManagerChan, srvManager, server)
// init SchedulerS
@@ -1642,33 +1708,32 @@ func main() {
// Start RALs
if cfg.RalsCfg().RALsEnabled {
go startRater(internalRaterChan, internalApierV1Chan, internalApierV2Chan, cacheS, internalThresholdSChan,
internalStatSChan, srvManager, server, dm, loadDb, cdrDb,
&stopHandled, exitChan, cacheS, filterSChan, internalCacheSChan, internalSchedSChan)
go startRater(internalRaterChan, internalApierV1Chan, internalApierV2Chan, internalThresholdSChan,
internalStatSChan, internalCacheSChan, internalSchedSChan, internalDispatcherSChan, srvManager, server, dm, loadDb, cdrDb,
cacheS, filterSChan, exitChan)
}
// Start CDR Server
if cfg.CdrsCfg().CDRSEnabled {
go startCDRS(internalCdrSChan, cdrDb, dm,
internalRaterChan, internalAttributeSChan,
go startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan,
internalThresholdSChan, internalStatSChan, internalChargerSChan,
server, exitChan, filterSChan)
internalDispatcherSChan, cdrDb, dm, server, filterSChan, exitChan)
}
// Create connection to CDR Server and share it in engine(used for *cdrlog action)
if len(cfg.SchedulerCfg().CDRsConns) != 0 {
go schedCDRsConns(internalCdrSChan, exitChan)
go schedCDRsConns(internalCdrSChan, internalDispatcherSChan, exitChan)
}
// Start CDRC components if necessary
go startCdrcs(internalCdrSChan, internalRaterChan, exitChan, filterSChan)
go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan)
// Start SM-Generic
if cfg.SessionSCfg().Enabled {
go startSessionS(internalSMGChan, internalRaterChan,
internalRsChan, internalThresholdSChan,
internalStatSChan, internalSupplierSChan, internalAttributeSChan,
internalCdrSChan, internalChargerSChan, server, exitChan)
go startSessionS(internalSMGChan, internalRaterChan, internalRsChan,
internalThresholdSChan, internalStatSChan, internalSupplierSChan,
internalAttributeSChan, internalCdrSChan, internalChargerSChan,
internalDispatcherSChan, server, exitChan)
}
// Start FreeSWITCHAgent
if cfg.FsAgentCfg().Enabled {
@@ -1685,11 +1750,11 @@ func main() {
}
if cfg.DiameterAgentCfg().Enabled {
go startDiameterAgent(internalSMGChan, internalDispatcherSChan, exitChan, filterSChan)
go startDiameterAgent(internalSMGChan, internalDispatcherSChan, filterSChan, exitChan)
}
if cfg.RadiusAgentCfg().Enabled {
go startRadiusAgent(internalSMGChan, internalDispatcherSChan, exitChan, filterSChan)
go startRadiusAgent(internalSMGChan, internalDispatcherSChan, filterSChan, exitChan)
}
if cfg.DNSAgentCfg().Enabled {
@@ -1697,8 +1762,8 @@ func main() {
}
if len(cfg.HttpAgentCfg()) != 0 {
go startHTTPAgent(internalSMGChan, internalDispatcherSChan, exitChan, server, filterSChan,
cfg.GeneralCfg().DefaultTenant)
go startHTTPAgent(internalSMGChan, internalDispatcherSChan, server, filterSChan,
cfg.GeneralCfg().DefaultTenant, exitChan)
}
// Start FilterS
@@ -1706,33 +1771,36 @@ func main() {
if cfg.AttributeSCfg().Enabled {
go startAttributeService(internalAttributeSChan, cacheS,
cfg, dm, server, exitChan, filterSChan)
cfg, dm, server, filterSChan, exitChan)
}
if cfg.ChargerSCfg().Enabled {
go startChargerService(internalChargerSChan, cacheS,
internalAttributeSChan, cfg, dm, server, exitChan, filterSChan)
go startChargerService(internalChargerSChan, internalAttributeSChan,
internalDispatcherSChan, cacheS, cfg, dm, server,
filterSChan, exitChan)
}
// Start RL service
if cfg.ResourceSCfg().Enabled {
go startResourceService(internalRsChan, cacheS,
internalThresholdSChan, cfg, dm, server, exitChan, filterSChan)
go startResourceService(internalRsChan, internalThresholdSChan,
internalDispatcherSChan, cacheS, cfg, dm, server,
filterSChan, exitChan)
}
if cfg.StatSCfg().Enabled {
go startStatService(internalStatSChan, cacheS,
internalThresholdSChan, cfg, dm, server, exitChan, filterSChan)
go startStatService(internalStatSChan, internalThresholdSChan,
internalDispatcherSChan, cacheS, cfg, dm, server,
filterSChan, exitChan)
}
if cfg.ThresholdSCfg().Enabled {
go startThresholdService(internalThresholdSChan, cacheS,
cfg, dm, server, exitChan, filterSChan)
cfg, dm, server, filterSChan, exitChan)
}
if cfg.SupplierSCfg().Enabled {
go startSupplierService(internalSupplierSChan, cacheS,
internalRsChan, internalStatSChan,
cfg, dm, server, exitChan, filterSChan, internalAttributeSChan)
go startSupplierService(internalSupplierSChan, internalRsChan,
internalStatSChan, internalAttributeSChan, internalDispatcherSChan,
cacheS, cfg, dm, server, filterSChan, exitChan)
}
if cfg.DispatcherSCfg().Enabled {
go startDispatcherService(internalDispatcherSChan,
@@ -1744,7 +1812,7 @@ func main() {
go startAnalyzerService(internalAnalyzerSChan, server, exitChan)
}
go startLoaderS(cfg, dm, server, exitChan, filterSChan, internalLoaderSChan, internalCacheSChan)
go startLoaderS(internalLoaderSChan, internalCacheSChan, cfg, dm, server, filterSChan, exitChan)
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan,

View File

@@ -31,14 +31,12 @@ import (
)
// Starts rater and reports on chan
func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
internalThdSChan, internalStatSChan chan rpcclient.RpcClientConnection,
func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThdSChan, internalStatSChan,
internalCacheSChan, internalSchedulerSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
serviceManager *servmanager.ServiceManager, server *utils.Server,
dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage,
stopHandled *bool, exitChan chan bool, chS *engine.CacheS, // separate from channel for optimization
filterSChan chan *engine.FilterS,
cacheSChan chan rpcclient.RpcClientConnection,
schedulerSChan chan rpcclient.RpcClientConnection) {
chS *engine.CacheS, // separate from channel for optimization
filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
var waitTasks []chan struct{}
@@ -58,8 +56,17 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie
<-chS.GetPrecacheChannel(utils.CacheTimings)
}()
var thdS *rpcclient.RpcClientPool
if len(cfg.RalsCfg().RALsThresholdSConns) != 0 { // Connections to ThresholdS
var dispatcherConn rpcclient.RpcClientConnection
isDispatcherEnabled := cfg.DispatcherSCfg().Enabled
if isDispatcherEnabled {
dispatcherConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherConn
}
var thdS rpcclient.RpcClientConnection
if isDispatcherEnabled {
thdS = dispatcherConn
} else if len(cfg.RalsCfg().RALsThresholdSConns) != 0 { // Connections to ThresholdS
thdsTaskChan := make(chan struct{})
waitTasks = append(waitTasks, thdsTaskChan)
go func() {
@@ -80,8 +87,10 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie
}()
}
var stats *rpcclient.RpcClientPool
if len(cfg.RalsCfg().RALsStatSConns) != 0 { // Connections to StatS
var stats rpcclient.RpcClientConnection
if isDispatcherEnabled {
stats = dispatcherConn
} else if len(cfg.RalsCfg().RALsStatSConns) != 0 { // Connections to StatS
statsTaskChan := make(chan struct{})
waitTasks = append(waitTasks, statsTaskChan)
go func() {
@@ -103,8 +112,10 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie
}
//create cache connection
var cacheSrpc *rpcclient.RpcClientPool
if len(cfg.ApierCfg().CachesConns) != 0 {
var cacheSrpc rpcclient.RpcClientConnection
if isDispatcherEnabled {
cacheSrpc = dispatcherConn
} else if len(cfg.ApierCfg().CachesConns) != 0 {
cachesTaskChan := make(chan struct{})
waitTasks = append(waitTasks, cachesTaskChan)
go func() {
@@ -115,7 +126,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ApierCfg().CachesConns, cacheSChan,
cfg.ApierCfg().CachesConns, internalCacheSChan,
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<APIer> Could not connect to CacheS, error: %s", err.Error()))
@@ -126,8 +137,10 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie
}
//create scheduler connection
var schedulerSrpc *rpcclient.RpcClientPool
if len(cfg.ApierCfg().SchedulerConns) != 0 {
var schedulerSrpc rpcclient.RpcClientConnection
if isDispatcherEnabled {
schedulerSrpc = dispatcherConn
} else if len(cfg.ApierCfg().SchedulerConns) != 0 {
schedulerSTaskChan := make(chan struct{})
waitTasks = append(waitTasks, schedulerSTaskChan)
go func() {
@@ -138,7 +151,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ApierCfg().SchedulerConns, schedulerSChan,
cfg.ApierCfg().SchedulerConns, internalSchedulerSChan,
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<APIer> Could not connect to SchedulerS, error: %s", err.Error()))

View File

@@ -344,7 +344,7 @@ type CGRConfig struct {
func (self *CGRConfig) checkConfigSanity() error {
// Rater checks
if self.ralsCfg.RALsEnabled {
if self.ralsCfg.RALsEnabled && !self.dispatcherSCfg.Enabled {
if !self.statsCfg.Enabled {
for _, connCfg := range self.ralsCfg.RALsStatSConns {
if connCfg.Address == utils.MetaInternal {
@@ -361,7 +361,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// CDRServer checks
if self.cdrsCfg.CDRSEnabled {
if self.cdrsCfg.CDRSEnabled && !self.dispatcherSCfg.Enabled {
if !self.chargerSCfg.Enabled {
for _, conn := range self.cdrsCfg.CDRSChargerSConns {
if conn.Address == utils.MetaInternal {
@@ -460,7 +460,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// SessionS checks
if self.sessionSCfg.Enabled {
if self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled {
if len(self.sessionSCfg.RALsConns) == 0 {
return errors.New("<SessionS> RALs definition is mandatory")
}
@@ -528,11 +528,14 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// FreeSWITCHAgent checks
if self.fsAgentCfg.Enabled {
if self.fsAgentCfg.Enabled && !self.dispatcherSCfg.Enabled {
if len(self.fsAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> SMG definition is mandatory!", utils.FreeSWITCHAgent)
}
for _, connCfg := range self.fsAgentCfg.SessionSConns {
if connCfg.Address != utils.MetaInternal {
return errors.New("only <*internal> connectivity allowed in in <freeswitch_agent> towards <sessions> for now")
}
// if connCfg.Address != utils.MetaInternal {
// return errors.New("only <*internal> connectivity allowed in in <freeswitch_agent> towards <sessions> for now")
// }
if connCfg.Address == utils.MetaInternal &&
!self.sessionSCfg.Enabled {
return errors.New("<sessions> not enabled but referenced by <freeswitch_agent>")
@@ -540,11 +543,14 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// KamailioAgent checks
if self.kamAgentCfg.Enabled {
if self.kamAgentCfg.Enabled && !self.dispatcherSCfg.Enabled {
if len(self.kamAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> SMG definition is mandatory!", utils.KamailioAgent)
}
for _, connCfg := range self.kamAgentCfg.SessionSConns {
if connCfg.Address != utils.MetaInternal {
return errors.New("only <*internal> connectivity allowed in in <kamailio_agent> towards <sessions> for now")
}
// if connCfg.Address != utils.MetaInternal {
// return errors.New("only <*internal> connectivity allowed in in <kamailio_agent> towards <sessions> for now")
// }
if connCfg.Address == utils.MetaInternal &&
!self.sessionSCfg.Enabled {
return errors.New("<sessions> not enabled but referenced by <kamailio_agent>")
@@ -552,7 +558,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// SMOpenSIPS checks
if self.SmOsipsConfig.Enabled {
if self.SmOsipsConfig.Enabled && !self.dispatcherSCfg.Enabled {
if len(self.SmOsipsConfig.RALsConns) == 0 {
return errors.New("<SMOpenSIPS> Rater definition is mandatory!")
}
@@ -575,8 +581,8 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// AsteriskAgent checks
if self.asteriskAgentCfg.Enabled {
/*if len(self.asteriskAgentCfg.SessionSConns) == 0 {
if self.asteriskAgentCfg.Enabled && !self.dispatcherSCfg.Enabled {
if len(self.asteriskAgentCfg.SessionSConns) == 0 {
return errors.New("<SMAsterisk> SMG definition is mandatory!")
}
for _, smAstSMGConn := range self.asteriskAgentCfg.SessionSConns {
@@ -584,30 +590,33 @@ func (self *CGRConfig) checkConfigSanity() error {
return errors.New("<SMAsterisk> SMG not enabled.")
}
}
*/
if !self.sessionSCfg.Enabled {
return errors.New("<SMAsterisk> SMG not enabled.")
}
// if !self.sessionSCfg.Enabled {
// return errors.New("<SMAsterisk> SMG not enabled.")
// }
}
// DAgent checks
if self.diameterAgentCfg.Enabled && !self.dispatcherSCfg.Enabled {
if !self.sessionSCfg.Enabled {
for _, daSMGConn := range self.diameterAgentCfg.SessionSConns {
if daSMGConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s component",
utils.SessionS, utils.DiameterAgent)
}
if self.diameterAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled {
if len(self.diameterAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> SMG definition is mandatory!", utils.DiameterAgent)
}
for _, daSMGConn := range self.diameterAgentCfg.SessionSConns {
if daSMGConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s component",
utils.SessionS, utils.DiameterAgent)
}
}
}
if self.radiusAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled {
if len(self.radiusAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> SMG definition is mandatory!", utils.RadiusAgent)
}
for _, raSMGConn := range self.radiusAgentCfg.SessionSConns {
if raSMGConn.Address == utils.MetaInternal {
return errors.New("SMGeneric not enabled but referenced by RadiusAgent component")
}
}
}
if self.dnsAgentCfg.Enabled && !self.sessionSCfg.Enabled {
if self.dnsAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled {
for _, sSConn := range self.dnsAgentCfg.SessionSConns {
if sSConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s", utils.SessionS, utils.DNSAgent)
@@ -617,9 +626,11 @@ func (self *CGRConfig) checkConfigSanity() error {
// HTTPAgent checks
for _, httpAgentCfg := range self.httpAgentCfg {
// httpAgent checks
for _, sSConn := range httpAgentCfg.SessionSConns {
if sSConn.Address == utils.MetaInternal && self.sessionSCfg.Enabled {
return errors.New("SessionS not enabled but referenced by HttpAgent component")
if !self.dispatcherSCfg.Enabled {
for _, sSConn := range httpAgentCfg.SessionSConns {
if sSConn.Address == utils.MetaInternal && self.sessionSCfg.Enabled {
return errors.New("SessionS not enabled but referenced by HttpAgent component")
}
}
}
if !utils.IsSliceMember([]string{utils.MetaUrl, utils.MetaXml}, httpAgentCfg.RequestPayload) {
@@ -636,7 +647,7 @@ func (self *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> process_runs needs to be bigger than 0", utils.AttributeS)
}
}
if self.chargerSCfg.Enabled {
if self.chargerSCfg.Enabled && !self.dispatcherSCfg.Enabled {
for _, connCfg := range self.chargerSCfg.AttributeSConns {
if connCfg.Address == utils.MetaInternal &&
(self.attributeSCfg == nil || !self.attributeSCfg.Enabled) {
@@ -645,7 +656,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// ResourceLimiter checks
if self.resourceSCfg.Enabled && !self.thresholdSCfg.Enabled {
if self.resourceSCfg.Enabled && !self.thresholdSCfg.Enabled && !self.dispatcherSCfg.Enabled {
for _, connCfg := range self.resourceSCfg.ThresholdSConns {
if connCfg.Address == utils.MetaInternal {
return errors.New("ThresholdS not enabled but requested by ResourceS component.")
@@ -653,7 +664,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// StatS checks
if self.statsCfg.Enabled && !self.thresholdSCfg.Enabled {
if self.statsCfg.Enabled && !self.thresholdSCfg.Enabled && !self.dispatcherSCfg.Enabled {
for _, connCfg := range self.statsCfg.ThresholdSConns {
if connCfg.Address == utils.MetaInternal {
return errors.New("ThresholdS not enabled but requested by StatS component.")
@@ -661,7 +672,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// SupplierS checks
if self.supplierSCfg.Enabled {
if self.supplierSCfg.Enabled && !self.dispatcherSCfg.Enabled {
for _, connCfg := range self.supplierSCfg.RALsConns {
if connCfg.Address != utils.MetaInternal {
return errors.New("Only <*internal> RALs connectivity allowed in SupplierS for now")
@@ -693,7 +704,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// Scheduler check connection with CDR Server
if !self.cdrsCfg.CDRSEnabled {
if !self.cdrsCfg.CDRSEnabled && !self.dispatcherSCfg.Enabled {
for _, connCfg := range self.schedulerCfg.CDRsConns {
if connCfg.Address == utils.MetaInternal {
return errors.New("CDR Server not enabled but requested by Scheduler")

View File

@@ -62,16 +62,14 @@ const (
func NewFilterS(cfg *config.CGRConfig,
statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) (fS *FilterS) {
fS = &FilterS{
statSChan: statSChan,
resSChan: resSChan,
dm: dm,
cfg: cfg,
dm: dm,
cfg: cfg,
}
if len(cfg.FilterSCfg().StatSConns) != 0 {
fS.connStatS()
fS.connStatS(statSChan)
}
if len(cfg.FilterSCfg().ResourceSConns) != 0 {
fS.connResourceS()
fS.connResourceS(resSChan)
}
return
}
@@ -80,14 +78,13 @@ func NewFilterS(cfg *config.CGRConfig,
// uses lazy connections where necessary to avoid deadlocks on service startup
type FilterS struct {
cfg *config.CGRConfig
statSChan, resSChan chan rpcclient.RpcClientConnection // reference towards internal statS connection, used for lazy connect
statSConns, resSConns rpcclient.RpcClientConnection
sSConnMux, rSConnMux sync.RWMutex // make sure only one goroutine attempts connecting
dm *DataManager
}
// connStatS returns will connect towards StatS
func (fS *FilterS) connStatS() (err error) {
func (fS *FilterS) connStatS(statSChan chan rpcclient.RpcClientConnection) (err error) {
fS.sSConnMux.Lock()
defer fS.sSConnMux.Unlock()
if fS.statSConns != nil { // connection was populated between locks
@@ -98,12 +95,12 @@ func (fS *FilterS) connStatS() (err error) {
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().StatSConns,
fS.statSChan, fS.cfg.GeneralCfg().InternalTtl, true)
statSChan, fS.cfg.GeneralCfg().InternalTtl, true)
return
}
// connResourceS returns will connect towards ResourceS
func (fS *FilterS) connResourceS() (err error) {
func (fS *FilterS) connResourceS(resSChan chan rpcclient.RpcClientConnection) (err error) {
fS.rSConnMux.Lock()
defer fS.rSConnMux.Unlock()
if fS.resSConns != nil { // connection was populated between locks
@@ -114,7 +111,7 @@ func (fS *FilterS) connResourceS() (err error) {
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().ResourceSConns,
fS.resSChan, fS.cfg.GeneralCfg().InternalTtl, true)
resSChan, fS.cfg.GeneralCfg().InternalTtl, true)
return
}

View File

@@ -32,9 +32,13 @@ import (
)
func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager,
engineShutdown chan bool, cacheS *engine.CacheS) *ServiceManager {
return &ServiceManager{cfg: cfg, dm: dm,
engineShutdown: engineShutdown, cacheS: cacheS}
cacheS *engine.CacheS, engineShutdown chan bool) *ServiceManager {
return &ServiceManager{
cfg: cfg,
dm: dm,
engineShutdown: engineShutdown,
cacheS: cacheS,
}
}
// ServiceManager handles service management ran by the engine