diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 28fc815f3..edaad3958 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -57,12 +57,16 @@ Parameters specific per config instance: */ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (*Cdrc, error) { - var cdrcCfg *config.CdrcCfg - for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one - break - } - cdrc := &Cdrc{httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs, dfltCdrcCfg: cdrcCfg, timezone: utils.FirstNonEmpty(cdrcCfg.Timezone, dfltTimezone), cdrs: cdrs, - closeChan: closeChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), + cdrcCfg := cdrcCfgs[0] + + cdrc := &Cdrc{ + httpSkipTlsCheck: httpSkipTlsCheck, + cdrcCfgs: cdrcCfgs, + dfltCdrcCfg: cdrcCfg, + timezone: utils.FirstNonEmpty(cdrcCfg.Timezone, dfltTimezone), + cdrs: cdrs, + closeChan: closeChan, + maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), } var processFile struct{} for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 113f34b93..0c3a29a99 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -79,12 +79,17 @@ var ( cfg *config.CGRConfig ) -func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, - exitChan chan bool, filterSChan chan *engine.FilterS) { +func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, + filterSChan chan *engine.FilterS, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS cdrcInitialized := false // Control whether the cdrc was already initialized (so we don't reload in that case) var cdrcChildrenChan chan struct{} // Will use it to communicate with the children of one fork + var dispatcherSConn rpcclient.RpcClientConnection + if cfg.DispatcherSCfg().Enabled { + dispatcherSConn = <-internalDispatcherSChan + internalDispatcherSChan <- dispatcherSConn + } for { select { case <-exitChan: // Stop forking CDRCs @@ -106,8 +111,8 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn } if len(enabledCfgs) != 0 { go startCdrc(internalCdrSChan, internalRaterChan, enabledCfgs, - cfg.GeneralCfg().HttpSkipTlsVerify, cdrcChildrenChan, - exitChan, filterSChan) + cfg.GeneralCfg().HttpSkipTlsVerify, dispatcherSConn, + filterSChan, cdrcChildrenChan, exitChan) } else { utils.Logger.Info(" No enabled CDRC clients") } @@ -118,22 +123,25 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn // Fires up a cdrc instance func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, - closeChan chan struct{}, exitChan chan bool, filterSChan chan *engine.FilterS) { + dispatcherSConn rpcclient.RpcClientConnection, filterSChan chan *engine.FilterS, closeChan chan struct{}, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS - var cdrcCfg *config.CdrcCfg - for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one - break - } - cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cdrcCfg.CdrsConns, internalCdrSChan, cfg.GeneralCfg().InternalTtl, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %s", err.Error())) - exitChan <- true - return + var err error + var cdrsConn rpcclient.RpcClientConnection + if cfg.DispatcherSCfg().Enabled { + cdrsConn = dispatcherSConn + } else { + cdrcCfg := cdrcCfgs[0] + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cdrcCfg.CdrsConns, internalCdrSChan, cfg.GeneralCfg().InternalTtl, false) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %s", err.Error())) + exitChan <- true + return + } } cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.GeneralCfg().DefaultTimezone, cfg.GeneralCfg().RoundingDecimals, @@ -151,12 +159,19 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan, - internalStatSChan, internalSupplierSChan, internalAttrSChan, - internalCDRSChan, internalChargerSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { + internalStatSChan, internalSupplierSChan, internalAttrSChan, internalCDRSChan, internalChargerSChan, + internalDispatcherSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS Session service.") var err error - var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn *rpcclient.RpcClientPool - if len(cfg.SessionSCfg().ChargerSConns) != 0 { + var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection + isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled + if isDispatcherSEnabled { + dispatcherSConn = <-internalDispatcherSChan + internalDispatcherSChan <- dispatcherSConn + } + if isDispatcherSEnabled { + chargerSConn = dispatcherSConn + } else if len(cfg.SessionSCfg().ChargerSConns) != 0 { chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, @@ -170,7 +185,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if len(cfg.SessionSCfg().RALsConns) != 0 { + if isDispatcherSEnabled { + ralsConns = dispatcherSConn + } else if len(cfg.SessionSCfg().RALsConns) != 0 { ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, @@ -184,7 +201,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if len(cfg.SessionSCfg().ResSConns) != 0 { + if isDispatcherSEnabled { + resSConns = dispatcherSConn + } else if len(cfg.SessionSCfg().ResSConns) != 0 { resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -199,7 +218,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if len(cfg.SessionSCfg().ThreshSConns) != 0 { + if isDispatcherSEnabled { + threshSConns = dispatcherSConn + } else if len(cfg.SessionSCfg().ThreshSConns) != 0 { threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -214,7 +235,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if len(cfg.SessionSCfg().StatSConns) != 0 { + if isDispatcherSEnabled { + statSConns = dispatcherSConn + } else if len(cfg.SessionSCfg().StatSConns) != 0 { statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -229,7 +252,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if len(cfg.SessionSCfg().SupplSConns) != 0 { + if isDispatcherSEnabled { + suplSConns = dispatcherSConn + } else if len(cfg.SessionSCfg().SupplSConns) != 0 { suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -244,7 +269,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if len(cfg.SessionSCfg().AttrSConns) != 0 { + if isDispatcherSEnabled { + attrSConns = dispatcherSConn + } else if len(cfg.SessionSCfg().AttrSConns) != 0 { attrSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -259,7 +286,9 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if len(cfg.SessionSCfg().CDRsConns) != 0 { + if isDispatcherSEnabled { + cdrsConn = dispatcherSConn + } else if len(cfg.SessionSCfg().CDRsConns) != 0 { cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -299,7 +328,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in server.RpcRegister(smgRpc) ssv1 := v1.NewSessionSv1(sm) // methods with multiple options - if !config.CgrConfig().DispatcherSCfg().Enabled { + if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(ssv1) } // Register BiRpc handlers @@ -379,7 +408,7 @@ func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient. } func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, - exitChan chan bool, filterSChan chan *engine.FilterS) { + filterSChan chan *engine.FilterS, exitChan chan bool) { var err error utils.Logger.Info("Starting CGRateS DiameterAgent service") filterS := <-filterSChan @@ -439,8 +468,8 @@ func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.R exitChan <- true } -func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool, - filterSChan chan *engine.FilterS) { +func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, + filterSChan chan *engine.FilterS, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS utils.Logger.Info("Starting CGRateS RadiusAgent service") @@ -648,8 +677,7 @@ func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl } func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, - exitChan chan bool, server *utils.Server, - filterSChan chan *engine.FilterS, dfltTenant string) { + server *utils.Server, filterSChan chan *engine.FilterS, dfltTenant string, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS var sS rpcclient.RpcClientConnection @@ -681,19 +709,23 @@ func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcC } } -func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, - cdrDb engine.CdrStorage, dm *engine.DataManager, - internalRaterChan, internalAttributeSChan, - internalThresholdSChan, internalStatSChan, - internalChargerSChan chan rpcclient.RpcClientConnection, - server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { +func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, internalThresholdSChan, + internalStatSChan, internalChargerSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, + cdrDb engine.CdrStorage, dm *engine.DataManager, server *utils.Server, + filterSChan chan *engine.FilterS, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS var err error utils.Logger.Info("Starting CGRateS CDRS service.") - var ralConn, attrSConn, - thresholdSConn, statsConn, chargerSConn *rpcclient.RpcClientPool - if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL + var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection + isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled + if isDispatcherSEnabled { + dispatcherSConn = <-internalDispatcherSChan + internalDispatcherSChan <- dispatcherSConn + } + if isDispatcherSEnabled { + chargerSConn = dispatcherSConn + } else if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -708,7 +740,9 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, return } } - if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL + if isDispatcherSEnabled { + ralConn = dispatcherSConn + } else if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -722,7 +756,9 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, return } } - if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init + if isDispatcherSEnabled { + attrSConn = dispatcherSConn + } else if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -737,7 +773,9 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, return } } - if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init + if isDispatcherSEnabled { + thresholdSConn = dispatcherSConn + } else if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init thresholdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -751,7 +789,9 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, return } } - if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init + if isDispatcherSEnabled { + statsConn = dispatcherSConn + } else if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -794,7 +834,7 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh // startAttributeService fires up the AttributeS func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, - server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { + server *utils.Server, filterSChan chan *engine.FilterS, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS <-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles) @@ -822,24 +862,28 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec return }() aSv1 := v1.NewAttributeSv1(aS) - if !config.CgrConfig().DispatcherSCfg().Enabled { + if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(aSv1) } internalAttributeSChan <- aSv1 } // startChargerService fires up the ChargerS -func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection, - cacheS *engine.CacheS, internalAttributeSChan chan rpcclient.RpcClientConnection, - cfg *config.CGRConfig, dm *engine.DataManager, - server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { +func startChargerService(internalChargerSChan, internalAttributeSChan, + internalDispatcherSChan chan rpcclient.RpcClientConnection, + cacheS *engine.CacheS, cfg *config.CGRConfig, + dm *engine.DataManager, server *utils.Server, + filterSChan chan *engine.FilterS, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS <-cacheS.GetPrecacheChannel(utils.CacheChargerProfiles) <-cacheS.GetPrecacheChannel(utils.CacheChargerFilterIndexes) - var attrSConn *rpcclient.RpcClientPool + var attrSConn rpcclient.RpcClientConnection var err error - if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init + if cfg.DispatcherSCfg().Enabled { + attrSConn = <-internalDispatcherSChan + internalDispatcherSChan <- attrSConn + } else if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -873,20 +917,25 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection return }() cSv1 := v1.NewChargerSv1(cS) - if !config.CgrConfig().DispatcherSCfg().Enabled { + if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(cSv1) } internalChargerSChan <- cSv1 } -func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, - internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, - dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { +func startResourceService(internalRsChan, internalThresholdSChan, + internalDispatcherSChan chan rpcclient.RpcClientConnection, + cacheS *engine.CacheS, cfg *config.CGRConfig, + dm *engine.DataManager, server *utils.Server, + filterSChan chan *engine.FilterS, exitChan chan bool) { var err error - var thdSConn *rpcclient.RpcClientPool + var thdSConn rpcclient.RpcClientConnection filterS := <-filterSChan filterSChan <- filterS - if len(cfg.ResourceSCfg().ThresholdSConns) != 0 { // Stats connection init + if cfg.DispatcherSCfg().Enabled { + thdSConn = <-internalDispatcherSChan + internalDispatcherSChan <- thdSConn + } else if len(cfg.ResourceSCfg().ThresholdSConns) != 0 { // Stats connection init thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -922,21 +971,26 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac return }() rsV1 := v1.NewResourceSv1(rS) - if !config.CgrConfig().DispatcherSCfg().Enabled { + if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(rsV1) } internalRsChan <- rsV1 } // startStatService fires up the StatS -func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, - internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, - dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { +func startStatService(internalStatSChan, internalThresholdSChan, + internalDispatcherSChan chan rpcclient.RpcClientConnection, + cacheS *engine.CacheS, cfg *config.CGRConfig, + dm *engine.DataManager, server *utils.Server, + filterSChan chan *engine.FilterS, exitChan chan bool) { var err error - var thdSConn *rpcclient.RpcClientPool + var thdSConn rpcclient.RpcClientConnection filterS := <-filterSChan filterSChan <- filterS - if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init + if cfg.DispatcherSCfg().Enabled { + thdSConn = <-internalDispatcherSChan + internalDispatcherSChan <- thdSConn + } else if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -971,7 +1025,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach return }() stsV1 := v1.NewStatSv1(sS) - if !config.CgrConfig().DispatcherSCfg().Enabled { + if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(stsV1) } internalStatSChan <- stsV1 @@ -980,7 +1034,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach // startThresholdService fires up the ThresholdS func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, - server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { + server *utils.Server, filterSChan chan *engine.FilterS, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS <-cacheS.GetPrecacheChannel(utils.CacheThresholdProfiles) @@ -1004,23 +1058,29 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec return }() tSv1 := v1.NewThresholdSv1(tS) - if !config.CgrConfig().DispatcherSCfg().Enabled { + if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(tSv1) } internalThresholdSChan <- tSv1 } // startSupplierService fires up the SupplierS -func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, - internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, - cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, - exitChan chan bool, filterSChan chan *engine.FilterS, - internalAttrSChan chan rpcclient.RpcClientConnection) { +func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan, + internalAttrSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, + cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, + filterSChan chan *engine.FilterS, exitChan chan bool) { var err error filterS := <-filterSChan filterSChan <- filterS - var attrSConn, resourceSConn, statSConn *rpcclient.RpcClientPool - if len(cfg.SupplierSCfg().AttributeSConns) != 0 { + var attrSConn, resourceSConn, statSConn, dispatcherSConn rpcclient.RpcClientConnection + isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled + if isDispatcherSEnabled { + dispatcherSConn = <-internalDispatcherSChan + internalDispatcherSChan <- dispatcherSConn + } + if isDispatcherSEnabled { + attrSConn = dispatcherSConn + } else if len(cfg.SupplierSCfg().AttributeSConns) != 0 { attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -1035,7 +1095,9 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti return } } - if len(cfg.SupplierSCfg().StatSConns) != 0 { + if isDispatcherSEnabled { + statSConn = dispatcherSConn + } else if len(cfg.SupplierSCfg().StatSConns) != 0 { statSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -1050,7 +1112,9 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti return } } - if len(cfg.SupplierSCfg().ResourceSConns) != 0 { + if isDispatcherSEnabled { + resourceSConn = dispatcherSConn + } else if len(cfg.SupplierSCfg().ResourceSConns) != 0 { resourceSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, @@ -1087,7 +1151,7 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti return }() splV1 := v1.NewSupplierSv1(splS) - if !config.CgrConfig().DispatcherSCfg().Enabled { + if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(splV1) } internalSupplierSChan <- splV1 @@ -1102,9 +1166,9 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, } // loaderService will start and register APIs for LoaderService if enabled -func startLoaderS(cfg *config.CGRConfig, - dm *engine.DataManager, server *utils.Server, exitChan chan bool, - filterSChan chan *engine.FilterS, internalLoaderSChan, cacheSChan chan rpcclient.RpcClientConnection) { +func startLoaderS(internalLoaderSChan, cacheSChan chan rpcclient.RpcClientConnection, + cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, + filterSChan chan *engine.FilterS, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS @@ -1120,10 +1184,8 @@ func startLoaderS(cfg *config.CGRConfig, } // startDispatcherService fires up the DispatcherS -func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConnection, - intAttrSChan chan rpcclient.RpcClientConnection, - cfg *config.CGRConfig, - cacheS *engine.CacheS, filterSChan chan *engine.FilterS, +func startDispatcherService(internalDispatcherSChan, internalAttributeSChan chan rpcclient.RpcClientConnection, + cfg *config.CGRConfig, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, dm *engine.DataManager, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS Dispatcher service.") fltrS := <-filterSChan @@ -1140,7 +1202,7 @@ func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConn cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().AttributeSConns, intAttrSChan, + cfg.DispatcherSCfg().AttributeSConns, internalAttributeSChan, cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", @@ -1278,7 +1340,7 @@ func startRpc(server *utils.Server, internalRaterChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { - if !config.CgrConfig().DispatcherSCfg().Enabled { + if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests case resp := <-internalRaterChan: internalRaterChan <- resp @@ -1391,20 +1453,25 @@ func initLogger(cfg *config.CGRConfig) error { return nil } -func schedCDRsConns(internalCDRSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { +func schedCDRsConns(internalCDRSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { var err error - var cdrsConn *rpcclient.RpcClientPool - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SchedulerCfg().CDRsConns, internalCDRSChan, - cfg.GeneralCfg().InternalTtl, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) - exitChan <- true - return + var cdrsConn rpcclient.RpcClientConnection + if cfg.DispatcherSCfg().Enabled { + cdrsConn = <-internalDispatcherSChan + internalDispatcherSChan <- cdrsConn + } else { + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.SchedulerCfg().CDRsConns, internalCDRSChan, + cfg.GeneralCfg().InternalTtl, false) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) + exitChan <- true + return + } } engine.SetSchedCdrsConns(cdrsConn) } @@ -1438,7 +1505,7 @@ func memProfiling(memProfDir string, interval time.Duration, nrFiles int, exitCh } } -func cpuProfiling(cpuProfDir string, exitChan chan bool, stopChan, doneChan chan struct{}) { +func cpuProfiling(cpuProfDir string, stopChan, doneChan chan struct{}, exitChan chan bool) { cpuPath := path.Join(cpuProfDir, "cpu.prof") f, err := os.Create(cpuPath) if err != nil { @@ -1485,7 +1552,7 @@ func main() { cpuProfChanStop := make(chan struct{}) cpuProfChanDone := make(chan struct{}) if *cpuProfDir != "" { - go cpuProfiling(*cpuProfDir, exitChan, cpuProfChanStop, cpuProfChanDone) + go cpuProfiling(*cpuProfDir, cpuProfChanStop, cpuProfChanDone, exitChan) } if *scheduledShutdown != "" { @@ -1568,7 +1635,6 @@ func main() { // Done initing DBs engine.SetRoundingDecimals(cfg.GeneralCfg().RoundingDecimals) engine.SetRpSubjectPrefixMatching(cfg.RalsCfg().RpSubjectPrefixMatching) - stopHandled := false // Rpc/http server server := utils.NewServer() @@ -1629,7 +1695,7 @@ func main() { initGuardianSv1(internalGuardianSChan, server) // Start ServiceManager - srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS) + srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, exitChan) initServiceManagerV1(internalServeManagerChan, srvManager, server) // init SchedulerS @@ -1642,33 +1708,32 @@ func main() { // Start RALs if cfg.RalsCfg().RALsEnabled { - go startRater(internalRaterChan, internalApierV1Chan, internalApierV2Chan, cacheS, internalThresholdSChan, - internalStatSChan, srvManager, server, dm, loadDb, cdrDb, - &stopHandled, exitChan, cacheS, filterSChan, internalCacheSChan, internalSchedSChan) + go startRater(internalRaterChan, internalApierV1Chan, internalApierV2Chan, internalThresholdSChan, + internalStatSChan, internalCacheSChan, internalSchedSChan, internalDispatcherSChan, srvManager, server, dm, loadDb, cdrDb, + cacheS, filterSChan, exitChan) } // Start CDR Server if cfg.CdrsCfg().CDRSEnabled { - go startCDRS(internalCdrSChan, cdrDb, dm, - internalRaterChan, internalAttributeSChan, + go startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, internalThresholdSChan, internalStatSChan, internalChargerSChan, - server, exitChan, filterSChan) + internalDispatcherSChan, cdrDb, dm, server, filterSChan, exitChan) } // Create connection to CDR Server and share it in engine(used for *cdrlog action) if len(cfg.SchedulerCfg().CDRsConns) != 0 { - go schedCDRsConns(internalCdrSChan, exitChan) + go schedCDRsConns(internalCdrSChan, internalDispatcherSChan, exitChan) } // Start CDRC components if necessary - go startCdrcs(internalCdrSChan, internalRaterChan, exitChan, filterSChan) + go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) // Start SM-Generic if cfg.SessionSCfg().Enabled { - go startSessionS(internalSMGChan, internalRaterChan, - internalRsChan, internalThresholdSChan, - internalStatSChan, internalSupplierSChan, internalAttributeSChan, - internalCdrSChan, internalChargerSChan, server, exitChan) + go startSessionS(internalSMGChan, internalRaterChan, internalRsChan, + internalThresholdSChan, internalStatSChan, internalSupplierSChan, + internalAttributeSChan, internalCdrSChan, internalChargerSChan, + internalDispatcherSChan, server, exitChan) } // Start FreeSWITCHAgent if cfg.FsAgentCfg().Enabled { @@ -1685,11 +1750,11 @@ func main() { } if cfg.DiameterAgentCfg().Enabled { - go startDiameterAgent(internalSMGChan, internalDispatcherSChan, exitChan, filterSChan) + go startDiameterAgent(internalSMGChan, internalDispatcherSChan, filterSChan, exitChan) } if cfg.RadiusAgentCfg().Enabled { - go startRadiusAgent(internalSMGChan, internalDispatcherSChan, exitChan, filterSChan) + go startRadiusAgent(internalSMGChan, internalDispatcherSChan, filterSChan, exitChan) } if cfg.DNSAgentCfg().Enabled { @@ -1697,8 +1762,8 @@ func main() { } if len(cfg.HttpAgentCfg()) != 0 { - go startHTTPAgent(internalSMGChan, internalDispatcherSChan, exitChan, server, filterSChan, - cfg.GeneralCfg().DefaultTenant) + go startHTTPAgent(internalSMGChan, internalDispatcherSChan, server, filterSChan, + cfg.GeneralCfg().DefaultTenant, exitChan) } // Start FilterS @@ -1706,33 +1771,36 @@ func main() { if cfg.AttributeSCfg().Enabled { go startAttributeService(internalAttributeSChan, cacheS, - cfg, dm, server, exitChan, filterSChan) + cfg, dm, server, filterSChan, exitChan) } if cfg.ChargerSCfg().Enabled { - go startChargerService(internalChargerSChan, cacheS, - internalAttributeSChan, cfg, dm, server, exitChan, filterSChan) + go startChargerService(internalChargerSChan, internalAttributeSChan, + internalDispatcherSChan, cacheS, cfg, dm, server, + filterSChan, exitChan) } // Start RL service if cfg.ResourceSCfg().Enabled { - go startResourceService(internalRsChan, cacheS, - internalThresholdSChan, cfg, dm, server, exitChan, filterSChan) + go startResourceService(internalRsChan, internalThresholdSChan, + internalDispatcherSChan, cacheS, cfg, dm, server, + filterSChan, exitChan) } if cfg.StatSCfg().Enabled { - go startStatService(internalStatSChan, cacheS, - internalThresholdSChan, cfg, dm, server, exitChan, filterSChan) + go startStatService(internalStatSChan, internalThresholdSChan, + internalDispatcherSChan, cacheS, cfg, dm, server, + filterSChan, exitChan) } if cfg.ThresholdSCfg().Enabled { go startThresholdService(internalThresholdSChan, cacheS, - cfg, dm, server, exitChan, filterSChan) + cfg, dm, server, filterSChan, exitChan) } if cfg.SupplierSCfg().Enabled { - go startSupplierService(internalSupplierSChan, cacheS, - internalRsChan, internalStatSChan, - cfg, dm, server, exitChan, filterSChan, internalAttributeSChan) + go startSupplierService(internalSupplierSChan, internalRsChan, + internalStatSChan, internalAttributeSChan, internalDispatcherSChan, + cacheS, cfg, dm, server, filterSChan, exitChan) } if cfg.DispatcherSCfg().Enabled { go startDispatcherService(internalDispatcherSChan, @@ -1744,7 +1812,7 @@ func main() { go startAnalyzerService(internalAnalyzerSChan, server, exitChan) } - go startLoaderS(cfg, dm, server, exitChan, filterSChan, internalLoaderSChan, internalCacheSChan) + go startLoaderS(internalLoaderSChan, internalCacheSChan, cfg, dm, server, filterSChan, exitChan) // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index bc2c0a75f..057f013e3 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -31,14 +31,12 @@ import ( ) // Starts rater and reports on chan -func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, - internalThdSChan, internalStatSChan chan rpcclient.RpcClientConnection, +func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThdSChan, internalStatSChan, + internalCacheSChan, internalSchedulerSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, serviceManager *servmanager.ServiceManager, server *utils.Server, dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, - stopHandled *bool, exitChan chan bool, chS *engine.CacheS, // separate from channel for optimization - filterSChan chan *engine.FilterS, - cacheSChan chan rpcclient.RpcClientConnection, - schedulerSChan chan rpcclient.RpcClientConnection) { + chS *engine.CacheS, // separate from channel for optimization + filterSChan chan *engine.FilterS, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS var waitTasks []chan struct{} @@ -58,8 +56,17 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie <-chS.GetPrecacheChannel(utils.CacheTimings) }() - var thdS *rpcclient.RpcClientPool - if len(cfg.RalsCfg().RALsThresholdSConns) != 0 { // Connections to ThresholdS + var dispatcherConn rpcclient.RpcClientConnection + isDispatcherEnabled := cfg.DispatcherSCfg().Enabled + if isDispatcherEnabled { + dispatcherConn = <-internalDispatcherSChan + internalDispatcherSChan <- dispatcherConn + } + + var thdS rpcclient.RpcClientConnection + if isDispatcherEnabled { + thdS = dispatcherConn + } else if len(cfg.RalsCfg().RALsThresholdSConns) != 0 { // Connections to ThresholdS thdsTaskChan := make(chan struct{}) waitTasks = append(waitTasks, thdsTaskChan) go func() { @@ -80,8 +87,10 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie }() } - var stats *rpcclient.RpcClientPool - if len(cfg.RalsCfg().RALsStatSConns) != 0 { // Connections to StatS + var stats rpcclient.RpcClientConnection + if isDispatcherEnabled { + stats = dispatcherConn + } else if len(cfg.RalsCfg().RALsStatSConns) != 0 { // Connections to StatS statsTaskChan := make(chan struct{}) waitTasks = append(waitTasks, statsTaskChan) go func() { @@ -103,8 +112,10 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie } //create cache connection - var cacheSrpc *rpcclient.RpcClientPool - if len(cfg.ApierCfg().CachesConns) != 0 { + var cacheSrpc rpcclient.RpcClientConnection + if isDispatcherEnabled { + cacheSrpc = dispatcherConn + } else if len(cfg.ApierCfg().CachesConns) != 0 { cachesTaskChan := make(chan struct{}) waitTasks = append(waitTasks, cachesTaskChan) go func() { @@ -115,7 +126,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ApierCfg().CachesConns, cacheSChan, + cfg.ApierCfg().CachesConns, internalCacheSChan, cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CacheS, error: %s", err.Error())) @@ -126,8 +137,10 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie } //create scheduler connection - var schedulerSrpc *rpcclient.RpcClientPool - if len(cfg.ApierCfg().SchedulerConns) != 0 { + var schedulerSrpc rpcclient.RpcClientConnection + if isDispatcherEnabled { + schedulerSrpc = dispatcherConn + } else if len(cfg.ApierCfg().SchedulerConns) != 0 { schedulerSTaskChan := make(chan struct{}) waitTasks = append(waitTasks, schedulerSTaskChan) go func() { @@ -138,7 +151,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ApierCfg().SchedulerConns, schedulerSChan, + cfg.ApierCfg().SchedulerConns, internalSchedulerSChan, cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SchedulerS, error: %s", err.Error())) diff --git a/config/config.go b/config/config.go index 2e2692a43..94434cdc7 100755 --- a/config/config.go +++ b/config/config.go @@ -344,7 +344,7 @@ type CGRConfig struct { func (self *CGRConfig) checkConfigSanity() error { // Rater checks - if self.ralsCfg.RALsEnabled { + if self.ralsCfg.RALsEnabled && !self.dispatcherSCfg.Enabled { if !self.statsCfg.Enabled { for _, connCfg := range self.ralsCfg.RALsStatSConns { if connCfg.Address == utils.MetaInternal { @@ -361,7 +361,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } // CDRServer checks - if self.cdrsCfg.CDRSEnabled { + if self.cdrsCfg.CDRSEnabled && !self.dispatcherSCfg.Enabled { if !self.chargerSCfg.Enabled { for _, conn := range self.cdrsCfg.CDRSChargerSConns { if conn.Address == utils.MetaInternal { @@ -460,7 +460,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } // SessionS checks - if self.sessionSCfg.Enabled { + if self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled { if len(self.sessionSCfg.RALsConns) == 0 { return errors.New(" RALs definition is mandatory") } @@ -528,11 +528,14 @@ func (self *CGRConfig) checkConfigSanity() error { } } // FreeSWITCHAgent checks - if self.fsAgentCfg.Enabled { + if self.fsAgentCfg.Enabled && !self.dispatcherSCfg.Enabled { + if len(self.fsAgentCfg.SessionSConns) == 0 { + return fmt.Errorf("<%s> SMG definition is mandatory!", utils.FreeSWITCHAgent) + } for _, connCfg := range self.fsAgentCfg.SessionSConns { - if connCfg.Address != utils.MetaInternal { - return errors.New("only <*internal> connectivity allowed in in towards for now") - } + // if connCfg.Address != utils.MetaInternal { + // return errors.New("only <*internal> connectivity allowed in in towards for now") + // } if connCfg.Address == utils.MetaInternal && !self.sessionSCfg.Enabled { return errors.New(" not enabled but referenced by ") @@ -540,11 +543,14 @@ func (self *CGRConfig) checkConfigSanity() error { } } // KamailioAgent checks - if self.kamAgentCfg.Enabled { + if self.kamAgentCfg.Enabled && !self.dispatcherSCfg.Enabled { + if len(self.kamAgentCfg.SessionSConns) == 0 { + return fmt.Errorf("<%s> SMG definition is mandatory!", utils.KamailioAgent) + } for _, connCfg := range self.kamAgentCfg.SessionSConns { - if connCfg.Address != utils.MetaInternal { - return errors.New("only <*internal> connectivity allowed in in towards for now") - } + // if connCfg.Address != utils.MetaInternal { + // return errors.New("only <*internal> connectivity allowed in in towards for now") + // } if connCfg.Address == utils.MetaInternal && !self.sessionSCfg.Enabled { return errors.New(" not enabled but referenced by ") @@ -552,7 +558,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } // SMOpenSIPS checks - if self.SmOsipsConfig.Enabled { + if self.SmOsipsConfig.Enabled && !self.dispatcherSCfg.Enabled { if len(self.SmOsipsConfig.RALsConns) == 0 { return errors.New(" Rater definition is mandatory!") } @@ -575,8 +581,8 @@ func (self *CGRConfig) checkConfigSanity() error { } } // AsteriskAgent checks - if self.asteriskAgentCfg.Enabled { - /*if len(self.asteriskAgentCfg.SessionSConns) == 0 { + if self.asteriskAgentCfg.Enabled && !self.dispatcherSCfg.Enabled { + if len(self.asteriskAgentCfg.SessionSConns) == 0 { return errors.New(" SMG definition is mandatory!") } for _, smAstSMGConn := range self.asteriskAgentCfg.SessionSConns { @@ -584,30 +590,33 @@ func (self *CGRConfig) checkConfigSanity() error { return errors.New(" SMG not enabled.") } } - */ - if !self.sessionSCfg.Enabled { - return errors.New(" SMG not enabled.") - } + // if !self.sessionSCfg.Enabled { + // return errors.New(" SMG not enabled.") + // } } // DAgent checks - if self.diameterAgentCfg.Enabled && !self.dispatcherSCfg.Enabled { - if !self.sessionSCfg.Enabled { - for _, daSMGConn := range self.diameterAgentCfg.SessionSConns { - if daSMGConn.Address == utils.MetaInternal { - return fmt.Errorf("%s not enabled but referenced by %s component", - utils.SessionS, utils.DiameterAgent) - } + if self.diameterAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled { + if len(self.diameterAgentCfg.SessionSConns) == 0 { + return fmt.Errorf("<%s> SMG definition is mandatory!", utils.DiameterAgent) + } + for _, daSMGConn := range self.diameterAgentCfg.SessionSConns { + if daSMGConn.Address == utils.MetaInternal { + return fmt.Errorf("%s not enabled but referenced by %s component", + utils.SessionS, utils.DiameterAgent) } } } if self.radiusAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled { + if len(self.radiusAgentCfg.SessionSConns) == 0 { + return fmt.Errorf("<%s> SMG definition is mandatory!", utils.RadiusAgent) + } for _, raSMGConn := range self.radiusAgentCfg.SessionSConns { if raSMGConn.Address == utils.MetaInternal { return errors.New("SMGeneric not enabled but referenced by RadiusAgent component") } } } - if self.dnsAgentCfg.Enabled && !self.sessionSCfg.Enabled { + if self.dnsAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled { for _, sSConn := range self.dnsAgentCfg.SessionSConns { if sSConn.Address == utils.MetaInternal { return fmt.Errorf("%s not enabled but referenced by %s", utils.SessionS, utils.DNSAgent) @@ -617,9 +626,11 @@ func (self *CGRConfig) checkConfigSanity() error { // HTTPAgent checks for _, httpAgentCfg := range self.httpAgentCfg { // httpAgent checks - for _, sSConn := range httpAgentCfg.SessionSConns { - if sSConn.Address == utils.MetaInternal && self.sessionSCfg.Enabled { - return errors.New("SessionS not enabled but referenced by HttpAgent component") + if !self.dispatcherSCfg.Enabled { + for _, sSConn := range httpAgentCfg.SessionSConns { + if sSConn.Address == utils.MetaInternal && self.sessionSCfg.Enabled { + return errors.New("SessionS not enabled but referenced by HttpAgent component") + } } } if !utils.IsSliceMember([]string{utils.MetaUrl, utils.MetaXml}, httpAgentCfg.RequestPayload) { @@ -636,7 +647,7 @@ func (self *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> process_runs needs to be bigger than 0", utils.AttributeS) } } - if self.chargerSCfg.Enabled { + if self.chargerSCfg.Enabled && !self.dispatcherSCfg.Enabled { for _, connCfg := range self.chargerSCfg.AttributeSConns { if connCfg.Address == utils.MetaInternal && (self.attributeSCfg == nil || !self.attributeSCfg.Enabled) { @@ -645,7 +656,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } // ResourceLimiter checks - if self.resourceSCfg.Enabled && !self.thresholdSCfg.Enabled { + if self.resourceSCfg.Enabled && !self.thresholdSCfg.Enabled && !self.dispatcherSCfg.Enabled { for _, connCfg := range self.resourceSCfg.ThresholdSConns { if connCfg.Address == utils.MetaInternal { return errors.New("ThresholdS not enabled but requested by ResourceS component.") @@ -653,7 +664,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } // StatS checks - if self.statsCfg.Enabled && !self.thresholdSCfg.Enabled { + if self.statsCfg.Enabled && !self.thresholdSCfg.Enabled && !self.dispatcherSCfg.Enabled { for _, connCfg := range self.statsCfg.ThresholdSConns { if connCfg.Address == utils.MetaInternal { return errors.New("ThresholdS not enabled but requested by StatS component.") @@ -661,7 +672,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } // SupplierS checks - if self.supplierSCfg.Enabled { + if self.supplierSCfg.Enabled && !self.dispatcherSCfg.Enabled { for _, connCfg := range self.supplierSCfg.RALsConns { if connCfg.Address != utils.MetaInternal { return errors.New("Only <*internal> RALs connectivity allowed in SupplierS for now") @@ -693,7 +704,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } // Scheduler check connection with CDR Server - if !self.cdrsCfg.CDRSEnabled { + if !self.cdrsCfg.CDRSEnabled && !self.dispatcherSCfg.Enabled { for _, connCfg := range self.schedulerCfg.CDRsConns { if connCfg.Address == utils.MetaInternal { return errors.New("CDR Server not enabled but requested by Scheduler") diff --git a/engine/filters.go b/engine/filters.go index f8841450a..2eb2bdbc4 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -62,16 +62,14 @@ const ( func NewFilterS(cfg *config.CGRConfig, statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) (fS *FilterS) { fS = &FilterS{ - statSChan: statSChan, - resSChan: resSChan, - dm: dm, - cfg: cfg, + dm: dm, + cfg: cfg, } if len(cfg.FilterSCfg().StatSConns) != 0 { - fS.connStatS() + fS.connStatS(statSChan) } if len(cfg.FilterSCfg().ResourceSConns) != 0 { - fS.connResourceS() + fS.connResourceS(resSChan) } return } @@ -80,14 +78,13 @@ func NewFilterS(cfg *config.CGRConfig, // uses lazy connections where necessary to avoid deadlocks on service startup type FilterS struct { cfg *config.CGRConfig - statSChan, resSChan chan rpcclient.RpcClientConnection // reference towards internal statS connection, used for lazy connect statSConns, resSConns rpcclient.RpcClientConnection sSConnMux, rSConnMux sync.RWMutex // make sure only one goroutine attempts connecting dm *DataManager } // connStatS returns will connect towards StatS -func (fS *FilterS) connStatS() (err error) { +func (fS *FilterS) connStatS(statSChan chan rpcclient.RpcClientConnection) (err error) { fS.sSConnMux.Lock() defer fS.sSConnMux.Unlock() if fS.statSConns != nil { // connection was populated between locks @@ -98,12 +95,12 @@ func (fS *FilterS) connStatS() (err error) { fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts, fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout, fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().StatSConns, - fS.statSChan, fS.cfg.GeneralCfg().InternalTtl, true) + statSChan, fS.cfg.GeneralCfg().InternalTtl, true) return } // connResourceS returns will connect towards ResourceS -func (fS *FilterS) connResourceS() (err error) { +func (fS *FilterS) connResourceS(resSChan chan rpcclient.RpcClientConnection) (err error) { fS.rSConnMux.Lock() defer fS.rSConnMux.Unlock() if fS.resSConns != nil { // connection was populated between locks @@ -114,7 +111,7 @@ func (fS *FilterS) connResourceS() (err error) { fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts, fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout, fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().ResourceSConns, - fS.resSChan, fS.cfg.GeneralCfg().InternalTtl, true) + resSChan, fS.cfg.GeneralCfg().InternalTtl, true) return } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 27c146cb3..04b2198ec 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -32,9 +32,13 @@ import ( ) func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager, - engineShutdown chan bool, cacheS *engine.CacheS) *ServiceManager { - return &ServiceManager{cfg: cfg, dm: dm, - engineShutdown: engineShutdown, cacheS: cacheS} + cacheS *engine.CacheS, engineShutdown chan bool) *ServiceManager { + return &ServiceManager{ + cfg: cfg, + dm: dm, + engineShutdown: engineShutdown, + cacheS: cacheS, + } } // ServiceManager handles service management ran by the engine