diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e59c69570..a3b254496 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -47,19 +47,6 @@ import ( "github.com/cgrates/rpcclient" ) -const ( - JSON = "json" - GOB = "gob" - POSTGRES = "postgres" - MYSQL = "mysql" - MONGO = "mongo" - REDIS = "redis" - SAME = "same" - FS = "freeswitch" - KAMAILIO = "kamailio" - OSIPS = "opensips" -) - var ( cgrEngineFlags = flag.NewFlagSet("cgr-engine", flag.ContinueOnError) cfgPath = cgrEngineFlags.String("config_path", utils.CONFIG_PATH, "Configuration directory path.") @@ -85,10 +72,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan cha filterSChan <- filterS cdrcInitialized := false // Control whether the cdrc was already initialized (so we don't reload in that case) var cdrcChildrenChan chan struct{} // Will use it to communicate with the children of one fork - var dispatcherSConn rpcclient.RpcClientConnection + intCdrSChan := internalCdrSChan if cfg.DispatcherSCfg().Enabled { - dispatcherSConn = <-internalDispatcherSChan - internalDispatcherSChan <- dispatcherSConn + intCdrSChan = internalDispatcherSChan } for { select { @@ -110,9 +96,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan cha } } if len(enabledCfgs) != 0 { - go startCdrc(internalCdrSChan, internalRaterChan, enabledCfgs, - cfg.GeneralCfg().HttpSkipTlsVerify, dispatcherSConn, - filterSChan, cdrcChildrenChan, exitChan) + go startCdrc(intCdrSChan, internalRaterChan, enabledCfgs, + cfg.GeneralCfg().HttpSkipTlsVerify, filterSChan, + cdrcChildrenChan, exitChan) } else { utils.Logger.Info(" No enabled CDRC clients") } @@ -123,26 +109,23 @@ func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan cha // Fires up a cdrc instance func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, - dispatcherSConn rpcclient.RpcClientConnection, filterSChan chan *engine.FilterS, closeChan chan struct{}, exitChan chan bool) { + filterSChan chan *engine.FilterS, closeChan chan struct{}, exitChan chan bool) { filterS := <-filterSChan filterSChan <- filterS var err error var cdrsConn rpcclient.RpcClientConnection - if cfg.DispatcherSCfg().Enabled { - cdrsConn = dispatcherSConn - } else { - cdrcCfg := cdrcCfgs[0] - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cdrcCfg.CdrsConns, internalCdrSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %s", err.Error())) - exitChan <- true - return - } + cdrcCfg := cdrcCfgs[0] + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cdrcCfg.CdrsConns, internalCdrSChan, false) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %s", err.Error())) + exitChan <- true + return } + cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.GeneralCfg().DefaultTimezone, filterS) if err != nil { @@ -163,21 +146,34 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in dm *engine.DataManager, exitChan chan bool) { utils.Logger.Info("Starting CGRateS Session service.") var err error - var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection - isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled - if isDispatcherSEnabled { - dispatcherSConn = <-internalDispatcherSChan - internalDispatcherSChan <- dispatcherSConn + var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn rpcclient.RpcClientConnection + + intChargerSChan := internalChargerSChan + intRaterChan := internalRaterChan + intResourceSChan := internalResourceSChan + intThresholdSChan := internalThresholdSChan + intStatSChan := internalStatSChan + intSupplierSChan := internalSupplierSChan + intAttrSChan := internalAttrSChan + intCDRSChan := internalCDRSChan + if cfg.DispatcherSCfg().Enabled { + intChargerSChan = internalDispatcherSChan + intRaterChan = internalDispatcherSChan + intResourceSChan = internalDispatcherSChan + intThresholdSChan = internalDispatcherSChan + intStatSChan = internalDispatcherSChan + intSupplierSChan = internalDispatcherSChan + intAttrSChan = internalDispatcherSChan + intCDRSChan = internalDispatcherSChan } - if isDispatcherSEnabled { - chargerSConn = dispatcherSConn - } else if len(cfg.SessionSCfg().ChargerSConns) != 0 { + + if len(cfg.SessionSCfg().ChargerSConns) != 0 { chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().ChargerSConns, - internalChargerSChan, false) + intChargerSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ChargerS, err.Error())) @@ -185,15 +181,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if isDispatcherSEnabled { - ralsConns = dispatcherSConn - } else if len(cfg.SessionSCfg().RALsConns) != 0 { + if len(cfg.SessionSCfg().RALsConns) != 0 { ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().RALsConns, - internalRaterChan, false) + intRaterChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.SessionS, err.Error())) @@ -201,15 +195,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if isDispatcherSEnabled { - resSConns = dispatcherSConn - } else if len(cfg.SessionSCfg().ResSConns) != 0 { + if len(cfg.SessionSCfg().ResSConns) != 0 { resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SessionSCfg().ResSConns, internalResourceSChan, false) + cfg.SessionSCfg().ResSConns, intResourceSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s", utils.SessionS, err.Error())) @@ -217,15 +209,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if isDispatcherSEnabled { - threshSConns = dispatcherSConn - } else if len(cfg.SessionSCfg().ThreshSConns) != 0 { + if len(cfg.SessionSCfg().ThreshSConns) != 0 { threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SessionSCfg().ThreshSConns, internalThresholdSChan, false) + cfg.SessionSCfg().ThreshSConns, intThresholdSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.SessionS, err.Error())) @@ -233,15 +223,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if isDispatcherSEnabled { - statSConns = dispatcherSConn - } else if len(cfg.SessionSCfg().StatSConns) != 0 { + if len(cfg.SessionSCfg().StatSConns) != 0 { statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SessionSCfg().StatSConns, internalStatSChan, false) + cfg.SessionSCfg().StatSConns, intStatSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.SessionS, err.Error())) @@ -249,15 +237,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if isDispatcherSEnabled { - suplSConns = dispatcherSConn - } else if len(cfg.SessionSCfg().SupplSConns) != 0 { + if len(cfg.SessionSCfg().SupplSConns) != 0 { suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SessionSCfg().SupplSConns, internalSupplierSChan, false) + cfg.SessionSCfg().SupplSConns, intSupplierSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.SessionS, err.Error())) @@ -265,15 +251,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if isDispatcherSEnabled { - attrSConns = dispatcherSConn - } else if len(cfg.SessionSCfg().AttrSConns) != 0 { + if len(cfg.SessionSCfg().AttrSConns) != 0 { attrSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SessionSCfg().AttrSConns, internalAttrSChan, false) + cfg.SessionSCfg().AttrSConns, intAttrSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.SessionS, err.Error())) @@ -281,15 +265,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } - if isDispatcherSEnabled { - cdrsConn = dispatcherSConn - } else if len(cfg.SessionSCfg().CDRsConns) != 0 { + if len(cfg.SessionSCfg().CDRsConns) != 0 { cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SessionSCfg().CDRsConns, internalCDRSChan, false) + cfg.SessionSCfg().CDRsConns, intCDRSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.SessionS, err.Error())) @@ -343,16 +325,11 @@ func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient. var sS rpcclient.RpcClientConnection var sSInternal bool utils.Logger.Info("Starting Asterisk agent") + intSMGChan := internalSMGChan if cfg.DispatcherSCfg().Enabled { - sS = <-internalDispatcherSChan - internalDispatcherSChan <- sS - } else if len(cfg.AsteriskAgentCfg().SessionSConns) == 0 { - utils.Logger.Crit( - fmt.Sprintf("<%s> no SessionS connections defined", - utils.AsteriskAgent)) - exitChan <- true - return - } else if cfg.AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + intSMGChan = internalDispatcherSChan + } + if !cfg.DispatcherSCfg().Enabled && cfg.AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal { sSInternal = true sSIntConn := <-internalSMGChan internalSMGChan <- sSIntConn @@ -363,7 +340,7 @@ func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient. cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.AsteriskAgentCfg().SessionSConns, internalSMGChan, false) + cfg.AsteriskAgentCfg().SessionSConns, intSMGChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.AsteriskAgent, utils.SessionS, err.Error())) @@ -408,16 +385,11 @@ func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.R filterSChan <- filterS var sS rpcclient.RpcClientConnection var sSInternal bool + intSsChan := internalSsChan if cfg.DispatcherSCfg().Enabled { - sS = <-internalDispatcherSChan - internalDispatcherSChan <- sS - } else if len(cfg.DiameterAgentCfg().SessionSConns) == 0 { - utils.Logger.Crit( - fmt.Sprintf("<%s> no SessionS connections defined", - utils.DiameterAgent)) - exitChan <- true - return - } else if cfg.DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + intSsChan = internalDispatcherSChan + } + if !cfg.DispatcherSCfg().Enabled && cfg.DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal { sSInternal = true sSIntConn := <-internalSsChan internalSsChan <- sSIntConn @@ -428,7 +400,7 @@ func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.R cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DiameterAgentCfg().SessionSConns, internalSsChan, false) + cfg.DiameterAgentCfg().SessionSConns, intSsChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.DiameterAgent, utils.SessionS, err.Error())) @@ -467,28 +439,23 @@ func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.Rp utils.Logger.Info("Starting CGRateS RadiusAgent service") var err error var smgConn rpcclient.RpcClientConnection + intSMGChan := internalSMGChan if cfg.DispatcherSCfg().Enabled { - smgConn = <-internalDispatcherSChan - internalDispatcherSChan <- smgConn - } else if len(cfg.RadiusAgentCfg().SessionSConns) != 0 { - smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.RadiusAgentCfg().SessionSConns, internalSMGChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMG: %s", utils.RadiusAgent, err.Error())) - exitChan <- true - return - } - } else { - utils.Logger.Crit( - fmt.Sprintf("<%s> no SessionS connections defined", - utils.RadiusAgent)) + intSMGChan = internalDispatcherSChan + } + + smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.RadiusAgentCfg().SessionSConns, intSMGChan, false) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMG: %s", utils.RadiusAgent, err.Error())) exitChan <- true return } + ra, err := agents.NewRadiusAgent(cfg, filterS, smgConn) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) @@ -509,16 +476,11 @@ func startDNSAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl filterS := <-filterSChan filterSChan <- filterS utils.Logger.Info(fmt.Sprintf("starting %s service", utils.DNSAgent)) + intSMGChan := internalSMGChan if cfg.DispatcherSCfg().Enabled { - sS = <-internalDispatcherSChan - internalDispatcherSChan <- sS - } else if len(cfg.DNSAgentCfg().SessionSConns) == 0 { - utils.Logger.Crit( - fmt.Sprintf("<%s> no SessionS connections defined", - utils.DNSAgent)) - exitChan <- true - return - } else if cfg.DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + intSMGChan = internalDispatcherSChan + } + if !cfg.DispatcherSCfg().Enabled && cfg.DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal { // sSInternal = true sSIntConn := <-internalSMGChan internalSMGChan <- sSIntConn @@ -529,7 +491,7 @@ func startDNSAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DNSAgentCfg().SessionSConns, internalSMGChan, false) + cfg.DNSAgentCfg().SessionSConns, intSMGChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.DNSAgent, utils.SessionS, err.Error())) @@ -565,16 +527,11 @@ func startFsAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCli var sS rpcclient.RpcClientConnection var sSInternal bool utils.Logger.Info("Starting FreeSWITCH agent") + intSMGChan := internalSMGChan if cfg.DispatcherSCfg().Enabled { - sS = <-internalDispatcherSChan - internalDispatcherSChan <- sS - } else if len(cfg.FsAgentCfg().SessionSConns) == 0 { - utils.Logger.Crit( - fmt.Sprintf("<%s> no SessionS connections defined", - utils.FreeSWITCHAgent)) - exitChan <- true - return - } else if cfg.FsAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + intSMGChan = internalDispatcherSChan + } + if !cfg.DispatcherSCfg().Enabled && cfg.FsAgentCfg().SessionSConns[0].Address == utils.MetaInternal { sSInternal = true sSIntConn := <-internalSMGChan internalSMGChan <- sSIntConn @@ -585,7 +542,7 @@ func startFsAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCli cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.FsAgentCfg().SessionSConns, internalSMGChan, false) + cfg.FsAgentCfg().SessionSConns, intSMGChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.FreeSWITCHAgent, utils.SessionS, err.Error())) @@ -617,16 +574,11 @@ func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl var sS rpcclient.RpcClientConnection var sSInternal bool utils.Logger.Info("Starting Kamailio agent") + intSMGChan := internalSMGChan if cfg.DispatcherSCfg().Enabled { - sS = <-internalDispatcherSChan - internalDispatcherSChan <- sS - } else if len(cfg.KamAgentCfg().SessionSConns) == 0 { - utils.Logger.Crit( - fmt.Sprintf("<%s> no SessionS connections defined", - utils.KamailioAgent)) - exitChan <- true - return - } else if cfg.KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + intSMGChan = internalDispatcherSChan + } + if !cfg.DispatcherSCfg().Enabled && cfg.KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal { sSInternal = true sSIntConn := <-internalSMGChan internalSMGChan <- sSIntConn @@ -637,7 +589,7 @@ func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.KamAgentCfg().SessionSConns, internalSMGChan, false) + cfg.KamAgentCfg().SessionSConns, intSMGChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.KamailioAgent, utils.SessionS, err.Error())) @@ -669,20 +621,20 @@ func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcC filterS := <-filterSChan filterSChan <- filterS var sS rpcclient.RpcClientConnection + intSMGChan := internalSMGChan if cfg.DispatcherSCfg().Enabled { - sS = <-internalDispatcherSChan - internalDispatcherSChan <- sS + intSMGChan = internalDispatcherSChan } utils.Logger.Info("Starting HTTP agent") var err error for _, agntCfg := range cfg.HttpAgentCfg() { - if !cfg.DispatcherSCfg().Enabled && len(agntCfg.SessionSConns) != 0 { + if len(agntCfg.SessionSConns) != 0 { sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - agntCfg.SessionSConns, internalSMGChan, false) + agntCfg.SessionSConns, intSMGChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> could not connect to %s, error: %s", utils.HTTPAgent, utils.SessionS, err.Error())) @@ -704,21 +656,27 @@ func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, inte filterSChan <- filterS var err error utils.Logger.Info("Starting CGRateS CDRS service.") - var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection - isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled - if isDispatcherSEnabled { - dispatcherSConn = <-internalDispatcherSChan - internalDispatcherSChan <- dispatcherSConn + var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection + + intChargerSChan := internalChargerSChan + intRaterChan := internalRaterChan + intAttributeSChan := internalAttributeSChan + intThresholdSChan := internalThresholdSChan + intStatSChan := internalStatSChan + if cfg.DispatcherSCfg().Enabled { + intChargerSChan = internalDispatcherSChan + intRaterChan = internalDispatcherSChan + intAttributeSChan = internalDispatcherSChan + intThresholdSChan = internalDispatcherSChan + intStatSChan = internalDispatcherSChan } - if isDispatcherSEnabled { - chargerSConn = dispatcherSConn - } else if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL + if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSChargerSConns, internalChargerSChan, false) + cfg.CdrsCfg().CDRSChargerSConns, intChargerSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.ChargerS, err.Error())) @@ -726,30 +684,26 @@ func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, inte return } } - if isDispatcherSEnabled { - ralConn = dispatcherSConn - } else if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL + if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSRaterConns, internalRaterChan, false) + cfg.CdrsCfg().CDRSRaterConns, intRaterChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) exitChan <- true return } } - if isDispatcherSEnabled { - attrSConn = dispatcherSConn - } else if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init + if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSAttributeSConns, internalAttributeSChan, false) + cfg.CdrsCfg().CDRSAttributeSConns, intAttributeSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.AttributeS, err.Error())) @@ -757,30 +711,26 @@ func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, inte return } } - if isDispatcherSEnabled { - thresholdSConn = dispatcherSConn - } else if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init + if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init thresholdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSThresholdSConns, internalThresholdSChan, false) + cfg.CdrsCfg().CDRSThresholdSConns, intThresholdSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) exitChan <- true return } } - if isDispatcherSEnabled { - statsConn = dispatcherSConn - } else if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init + if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSStatSConns, internalStatSChan, false) + cfg.CdrsCfg().CDRSStatSConns, intStatSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) exitChan <- true @@ -862,16 +812,17 @@ func startChargerService(internalChargerSChan, internalAttributeSChan, <-cacheS.GetPrecacheChannel(utils.CacheChargerFilterIndexes) var attrSConn rpcclient.RpcClientConnection var err error + intAttributeSChan := internalAttributeSChan if cfg.DispatcherSCfg().Enabled { - attrSConn = <-internalDispatcherSChan - internalDispatcherSChan <- attrSConn - } else if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init + intAttributeSChan = internalDispatcherSChan + } + if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ChargerSCfg().AttributeSConns, internalAttributeSChan, false) + cfg.ChargerSCfg().AttributeSConns, intAttributeSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.ChargerS, utils.AttributeS, err.Error())) @@ -913,16 +864,17 @@ func startResourceService(internalRsChan, internalThresholdSChan, var thdSConn rpcclient.RpcClientConnection filterS := <-filterSChan filterSChan <- filterS + intThresholdSChan := internalThresholdSChan if cfg.DispatcherSCfg().Enabled { - thdSConn = <-internalDispatcherSChan - internalDispatcherSChan <- thdSConn - } else if len(cfg.ResourceSCfg().ThresholdSConns) != 0 { // Stats connection init + intThresholdSChan = internalDispatcherSChan + } + if len(cfg.ResourceSCfg().ThresholdSConns) != 0 { // Stats connection init thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ResourceSCfg().ThresholdSConns, internalThresholdSChan, false) + cfg.ResourceSCfg().ThresholdSConns, intThresholdSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) exitChan <- true @@ -967,16 +919,17 @@ func startStatService(internalStatSChan, internalThresholdSChan, var thdSConn rpcclient.RpcClientConnection filterS := <-filterSChan filterSChan <- filterS + intThresholdSChan := internalThresholdSChan if cfg.DispatcherSCfg().Enabled { - thdSConn = <-internalDispatcherSChan - internalDispatcherSChan <- thdSConn - } else if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init + intThresholdSChan = internalDispatcherSChan + } + if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.StatSCfg().ThresholdSConns, internalThresholdSChan, false) + cfg.StatSCfg().ThresholdSConns, intThresholdSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) exitChan <- true @@ -1051,21 +1004,23 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh var err error filterS := <-filterSChan filterSChan <- filterS - var attrSConn, resourceSConn, statSConn, dispatcherSConn rpcclient.RpcClientConnection - isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled - if isDispatcherSEnabled { - dispatcherSConn = <-internalDispatcherSChan - internalDispatcherSChan <- dispatcherSConn + var attrSConn, resourceSConn, statSConn rpcclient.RpcClientConnection + + intAttrSChan := internalAttrSChan + intStatSChan := internalStatSChan + intRsChan := internalRsChan + if cfg.DispatcherSCfg().Enabled { // use dispatcher as internal chanel if active + intAttrSChan = internalDispatcherSChan + intStatSChan = internalDispatcherSChan + intRsChan = internalDispatcherSChan } - if isDispatcherSEnabled { - attrSConn = dispatcherSConn - } else if len(cfg.SupplierSCfg().AttributeSConns) != 0 { + if len(cfg.SupplierSCfg().AttributeSConns) != 0 { attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SupplierSCfg().AttributeSConns, internalAttrSChan, false) + cfg.SupplierSCfg().AttributeSConns, intAttrSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SupplierS, utils.AttributeS, err.Error())) @@ -1073,15 +1028,13 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh return } } - if isDispatcherSEnabled { - statSConn = dispatcherSConn - } else if len(cfg.SupplierSCfg().StatSConns) != 0 { + if len(cfg.SupplierSCfg().StatSConns) != 0 { statSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SupplierSCfg().StatSConns, internalStatSChan, false) + cfg.SupplierSCfg().StatSConns, intStatSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.SupplierS, err.Error())) @@ -1089,15 +1042,13 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh return } } - if isDispatcherSEnabled { - resourceSConn = dispatcherSConn - } else if len(cfg.SupplierSCfg().ResourceSConns) != 0 { + if len(cfg.SupplierSCfg().ResourceSConns) != 0 { resourceSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SupplierSCfg().ResourceSConns, internalRsChan, false) + cfg.SupplierSCfg().ResourceSConns, intRsChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.SupplierS, err.Error())) @@ -1457,23 +1408,20 @@ func initLogger(cfg *config.CGRConfig) error { } func schedCDRsConns(internalCDRSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { - var err error - var cdrsConn rpcclient.RpcClientConnection + intChan := internalCDRSChan if cfg.DispatcherSCfg().Enabled { - cdrsConn = <-internalDispatcherSChan - internalDispatcherSChan <- cdrsConn - } else { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SchedulerCfg().CDRsConns, internalCDRSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) - exitChan <- true - return - } + intChan = internalDispatcherSChan + } + cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.SchedulerCfg().CDRsConns, intChan, false) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) + exitChan <- true + return } engine.SetSchedCdrsConns(cdrsConn) } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 6d8db3013..e69faf63e 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -57,17 +57,21 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd <-chS.GetPrecacheChannel(utils.CacheTimings) }() - var dispatcherConn rpcclient.RpcClientConnection - isDispatcherEnabled := cfg.DispatcherSCfg().Enabled - if isDispatcherEnabled { - dispatcherConn = <-internalDispatcherSChan - internalDispatcherSChan <- dispatcherConn + intThdSChan := internalThdSChan + intStatSChan := internalStatSChan + intCacheSChan := internalCacheSChan + intSchedulerSChanL := internalSchedulerSChan + intAttributeSChan := internalAttributeSChan + if cfg.DispatcherSCfg().Enabled { + intThdSChan = internalDispatcherSChan + intStatSChan = internalDispatcherSChan + intCacheSChan = internalDispatcherSChan + intSchedulerSChanL = internalDispatcherSChan + intAttributeSChan = internalDispatcherSChan } var thdS rpcclient.RpcClientConnection - if isDispatcherEnabled { - thdS = dispatcherConn - } else if len(cfg.RalsCfg().RALsThresholdSConns) != 0 { // Connections to ThresholdS + if len(cfg.RalsCfg().RALsThresholdSConns) != 0 { // Connections to ThresholdS thdsTaskChan := make(chan struct{}) waitTasks = append(waitTasks, thdsTaskChan) go func() { @@ -78,7 +82,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.RalsCfg().RALsThresholdSConns, internalThdSChan, false) + cfg.RalsCfg().RALsThresholdSConns, intThdSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS, error: %s", err.Error())) exitChan <- true @@ -88,9 +92,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd } var stats rpcclient.RpcClientConnection - if isDispatcherEnabled { - stats = dispatcherConn - } else if len(cfg.RalsCfg().RALsStatSConns) != 0 { // Connections to StatS + if len(cfg.RalsCfg().RALsStatSConns) != 0 { // Connections to StatS statsTaskChan := make(chan struct{}) waitTasks = append(waitTasks, statsTaskChan) go func() { @@ -101,7 +103,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.RalsCfg().RALsStatSConns, internalStatSChan, false) + cfg.RalsCfg().RALsStatSConns, intStatSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS, error: %s", err.Error())) exitChan <- true @@ -112,9 +114,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd //create cache connection var cacheSrpc rpcclient.RpcClientConnection - if isDispatcherEnabled { - cacheSrpc = dispatcherConn - } else if len(cfg.ApierCfg().CachesConns) != 0 { + if len(cfg.ApierCfg().CachesConns) != 0 { cachesTaskChan := make(chan struct{}) waitTasks = append(waitTasks, cachesTaskChan) go func() { @@ -125,7 +125,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ApierCfg().CachesConns, internalCacheSChan, false) + cfg.ApierCfg().CachesConns, intCacheSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CacheS, error: %s", err.Error())) exitChan <- true @@ -136,9 +136,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd //create scheduler connection var schedulerSrpc rpcclient.RpcClientConnection - if isDispatcherEnabled { - schedulerSrpc = dispatcherConn - } else if len(cfg.ApierCfg().SchedulerConns) != 0 { + if len(cfg.ApierCfg().SchedulerConns) != 0 { schedulerSTaskChan := make(chan struct{}) waitTasks = append(waitTasks, schedulerSTaskChan) go func() { @@ -149,7 +147,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ApierCfg().SchedulerConns, internalSchedulerSChan, false) + cfg.ApierCfg().SchedulerConns, intSchedulerSChanL, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SchedulerS, error: %s", err.Error())) exitChan <- true @@ -160,9 +158,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd //create scheduler connection var attributeSrpc rpcclient.RpcClientConnection - if isDispatcherEnabled { - attributeSrpc = dispatcherConn - } else if len(cfg.ApierCfg().SchedulerConns) != 0 { + if len(cfg.ApierCfg().SchedulerConns) != 0 { attributeSTaskChan := make(chan struct{}) waitTasks = append(waitTasks, attributeSTaskChan) go func() { @@ -173,7 +169,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ApierCfg().AttributeSConns, internalAttributeSChan, false) + cfg.ApierCfg().AttributeSConns, intAttributeSChan, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to AttributeS, error: %s", err.Error())) exitChan <- true diff --git a/config/config.go b/config/config.go index 5a20c3a17..83e8e9697 100755 --- a/config/config.go +++ b/config/config.go @@ -167,7 +167,6 @@ func NewDefaultCGRConfig() (*CGRConfig, error) { cfg.migratorCgrCfg = new(MigratorCgrCfg) cfg.mailerCfg = new(MailerCfg) cfg.loaderCfg = make([]*LoaderSCfg, 0) - cfg.SmOsipsConfig = new(SmOsipsConfig) cfg.apier = new(ApierCfg) cfg.ersCfg = new(ERsCfg) @@ -358,7 +357,6 @@ type CGRConfig struct { migratorCgrCfg *MigratorCgrCfg // MigratorCgr config mailerCfg *MailerCfg // Mailer config analyzerSCfg *AnalyzerSCfg // AnalyzerS config - SmOsipsConfig *SmOsipsConfig // SMOpenSIPS Configuration apier *ApierCfg ersCfg *ERsCfg } @@ -374,14 +372,16 @@ func (self *CGRConfig) checkConfigSanity() error { if !self.statsCfg.Enabled { for _, connCfg := range self.ralsCfg.RALsStatSConns { if connCfg.Address == utils.MetaInternal { - return errors.New("StatS not enabled but requested by RALs component.") + return fmt.Errorf("%s not enabled but requested by %s component.", + utils.StatS, utils.RALService) } } } if !self.thresholdSCfg.Enabled { for _, connCfg := range self.ralsCfg.RALsThresholdSConns { if connCfg.Address == utils.MetaInternal { - return errors.New("ThresholdS not enabled but requested by RALs component.") + return fmt.Errorf("%s not enabled but requested by %s component.", + utils.ThresholdS, utils.RALService) } } } @@ -438,7 +438,7 @@ func (self *CGRConfig) checkConfigSanity() error { if len(cdrcInst.CdrsConns) == 0 { return fmt.Errorf(" Instance: %s, CdrC enabled but no CDRS defined!", cdrcInst.ID) } - if !self.cdrsCfg.CDRSEnabled { + if !self.cdrsCfg.CDRSEnabled && !self.dispatcherSCfg.Enabled { for _, conn := range cdrcInst.CdrsConns { if conn.Address == utils.MetaInternal { return errors.New("CDRS not enabled but referenced from CDRC") @@ -488,162 +488,160 @@ func (self *CGRConfig) checkConfigSanity() error { if !self.chargerSCfg.Enabled { for _, conn := range self.sessionSCfg.ChargerSConns { if conn.Address == utils.MetaInternal { - return errors.New(" ChargerS not enabled") + return fmt.Errorf("<%s> %s not enabled", utils.SessionS, utils.ChargerS) } } } if !self.ralsCfg.RALsEnabled { for _, smgRALsConn := range self.sessionSCfg.RALsConns { if smgRALsConn.Address == utils.MetaInternal { - return errors.New(" RALs not enabled but requested by SMGeneric component.") + return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.RALService) } } } if !self.resourceSCfg.Enabled { for _, conn := range self.sessionSCfg.ResSConns { if conn.Address == utils.MetaInternal { - return errors.New(" ResourceS not enabled but requested by SMGeneric component.") + return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.ResourceS) } } } if !self.thresholdSCfg.Enabled { for _, conn := range self.sessionSCfg.ThreshSConns { if conn.Address == utils.MetaInternal { - return errors.New(" ThresholdS not enabled but requested by SMGeneric component.") + return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.ThresholdS) } } } if !self.statsCfg.Enabled { for _, conn := range self.sessionSCfg.StatSConns { if conn.Address == utils.MetaInternal { - return errors.New(" StatS not enabled but requested by SMGeneric component.") + return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.StatS) } } } if !self.supplierSCfg.Enabled { for _, conn := range self.sessionSCfg.SupplSConns { if conn.Address == utils.MetaInternal { - return errors.New(" SupplierS not enabled but requested by SMGeneric component.") + return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.SupplierS) } } } if !self.attributeSCfg.Enabled { for _, conn := range self.sessionSCfg.AttrSConns { if conn.Address == utils.MetaInternal { - return errors.New(" AttributeS not enabled but requested by SMGeneric component.") + return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.AttributeS) } } } if !self.cdrsCfg.CDRSEnabled { for _, smgCDRSConn := range self.sessionSCfg.CDRsConns { if smgCDRSConn.Address == utils.MetaInternal { - return errors.New(" CDRS not enabled but referenced by SMGeneric component") + return fmt.Errorf("<%s> CDRS not enabled but referenced by SMGeneric component", utils.SessionS) } } } } // FreeSWITCHAgent checks - if self.fsAgentCfg.Enabled && !self.dispatcherSCfg.Enabled { + if self.fsAgentCfg.Enabled { if len(self.fsAgentCfg.SessionSConns) == 0 { - return fmt.Errorf("<%s> SMG definition is mandatory!", utils.FreeSWITCHAgent) + return fmt.Errorf("<%s> no %s connections defined", + utils.FreeSWITCHAgent, utils.SessionS) } - for _, connCfg := range self.fsAgentCfg.SessionSConns { - // if connCfg.Address != utils.MetaInternal { - // return errors.New("only <*internal> connectivity allowed in in towards for now") - // } - if connCfg.Address == utils.MetaInternal && - !self.sessionSCfg.Enabled { - return errors.New(" not enabled but referenced by ") + if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it + !self.sessionSCfg.Enabled { + for _, connCfg := range self.fsAgentCfg.SessionSConns { + if connCfg.Address == utils.MetaInternal { + return fmt.Errorf("%s not enabled but referenced by %s", + utils.SessionS, utils.FreeSWITCHAgent) + } } } } // KamailioAgent checks - if self.kamAgentCfg.Enabled && !self.dispatcherSCfg.Enabled { + if self.kamAgentCfg.Enabled { if len(self.kamAgentCfg.SessionSConns) == 0 { - return fmt.Errorf("<%s> SMG definition is mandatory!", utils.KamailioAgent) + return fmt.Errorf("<%s> no %s connections defined", + utils.KamailioAgent, utils.SessionS) } - for _, connCfg := range self.kamAgentCfg.SessionSConns { - // if connCfg.Address != utils.MetaInternal { - // return errors.New("only <*internal> connectivity allowed in in towards for now") - // } - if connCfg.Address == utils.MetaInternal && - !self.sessionSCfg.Enabled { - return errors.New(" not enabled but referenced by ") - } - } - } - // SMOpenSIPS checks - if self.SmOsipsConfig.Enabled && !self.dispatcherSCfg.Enabled { - if len(self.SmOsipsConfig.RALsConns) == 0 { - return errors.New(" Rater definition is mandatory!") - } - if !self.ralsCfg.RALsEnabled { - for _, smOsipsRaterConn := range self.SmOsipsConfig.RALsConns { - if smOsipsRaterConn.Address == utils.MetaInternal { - return errors.New(" RALs not enabled.") - } - } - } - if len(self.SmOsipsConfig.CDRsConns) == 0 { - return errors.New(" CDRs definition is mandatory!") - } - if !self.cdrsCfg.CDRSEnabled { - for _, smOsipsCDRSConn := range self.SmOsipsConfig.CDRsConns { - if smOsipsCDRSConn.Address == utils.MetaInternal { - return errors.New(" CDRS not enabled.") + if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it + !self.sessionSCfg.Enabled { + for _, connCfg := range self.kamAgentCfg.SessionSConns { + if connCfg.Address == utils.MetaInternal { + return fmt.Errorf("%s not enabled but referenced by %s", + utils.SessionS, utils.KamailioAgent) } } } } // AsteriskAgent checks - if self.asteriskAgentCfg.Enabled && !self.dispatcherSCfg.Enabled { + if self.asteriskAgentCfg.Enabled { if len(self.asteriskAgentCfg.SessionSConns) == 0 { - return errors.New(" SMG definition is mandatory!") + return fmt.Errorf("<%s> no %s connections defined", + utils.AsteriskAgent, utils.SessionS) } - for _, smAstSMGConn := range self.asteriskAgentCfg.SessionSConns { - if smAstSMGConn.Address == utils.MetaInternal && !self.sessionSCfg.Enabled { - return errors.New(" SMG not enabled.") + if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it + !self.sessionSCfg.Enabled { + for _, smAstSMGConn := range self.asteriskAgentCfg.SessionSConns { + if smAstSMGConn.Address == utils.MetaInternal { + return fmt.Errorf("%s not enabled but referenced by %s", + utils.SessionS, utils.AsteriskAgent) + } } } - // if !self.sessionSCfg.Enabled { - // return errors.New(" SMG not enabled.") - // } } // DAgent checks - if self.diameterAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled { + if self.diameterAgentCfg.Enabled { if len(self.diameterAgentCfg.SessionSConns) == 0 { - return fmt.Errorf("<%s> SMG definition is mandatory!", utils.DiameterAgent) + return fmt.Errorf("<%s> no %s connections defined", + utils.DiameterAgent, utils.SessionS) } - for _, daSMGConn := range self.diameterAgentCfg.SessionSConns { - if daSMGConn.Address == utils.MetaInternal { - return fmt.Errorf("%s not enabled but referenced by %s component", - utils.SessionS, utils.DiameterAgent) + if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it + !self.sessionSCfg.Enabled { + for _, daSMGConn := range self.diameterAgentCfg.SessionSConns { + if daSMGConn.Address == utils.MetaInternal { + return fmt.Errorf("%s not enabled but referenced by %s", + utils.SessionS, utils.DiameterAgent) + } } } } - if self.radiusAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled { + if self.radiusAgentCfg.Enabled { if len(self.radiusAgentCfg.SessionSConns) == 0 { - return fmt.Errorf("<%s> SMG definition is mandatory!", utils.RadiusAgent) + return fmt.Errorf("<%s> no %s connections defined", + utils.RadiusAgent, utils.SessionS) } - for _, raSMGConn := range self.radiusAgentCfg.SessionSConns { - if raSMGConn.Address == utils.MetaInternal { - return errors.New("SMGeneric not enabled but referenced by RadiusAgent component") + if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it + !self.sessionSCfg.Enabled { + for _, raSMGConn := range self.radiusAgentCfg.SessionSConns { + if raSMGConn.Address == utils.MetaInternal { + return fmt.Errorf("%s not enabled but referenced by %s", + utils.SessionS, utils.RadiusAgent) + } } } } - if self.dnsAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled { - for _, sSConn := range self.dnsAgentCfg.SessionSConns { - if sSConn.Address == utils.MetaInternal { - return fmt.Errorf("%s not enabled but referenced by %s", utils.SessionS, utils.DNSAgent) + if self.dnsAgentCfg.Enabled { + if len(self.dnsAgentCfg.SessionSConns) == 0 { + return fmt.Errorf("<%s> no %s connections defined", + utils.DNSAgent, utils.SessionS) + } + if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it + !self.sessionSCfg.Enabled { + for _, sSConn := range self.dnsAgentCfg.SessionSConns { + if sSConn.Address == utils.MetaInternal { + return fmt.Errorf("%s not enabled but referenced by %s", utils.SessionS, utils.DNSAgent) + } } } } // HTTPAgent checks for _, httpAgentCfg := range self.httpAgentCfg { // httpAgent checks - if !self.dispatcherSCfg.Enabled { + if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it + self.sessionSCfg.Enabled { for _, sSConn := range httpAgentCfg.SessionSConns { - if sSConn.Address == utils.MetaInternal && self.sessionSCfg.Enabled { + if sSConn.Address == utils.MetaInternal { return errors.New("SessionS not enabled but referenced by HttpAgent component") } } @@ -662,10 +660,10 @@ func (self *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> process_runs needs to be bigger than 0", utils.AttributeS) } } - if self.chargerSCfg.Enabled && !self.dispatcherSCfg.Enabled { + if self.chargerSCfg.Enabled && !self.dispatcherSCfg.Enabled && + (self.attributeSCfg == nil || !self.attributeSCfg.Enabled) { for _, connCfg := range self.chargerSCfg.AttributeSConns { - if connCfg.Address == utils.MetaInternal && - (self.attributeSCfg == nil || !self.attributeSCfg.Enabled) { + if connCfg.Address == utils.MetaInternal { return errors.New("AttributeS not enabled but requested by ChargerS component.") } } @@ -692,7 +690,7 @@ func (self *CGRConfig) checkConfigSanity() error { if connCfg.Address != utils.MetaInternal { return errors.New("Only <*internal> RALs connectivity allowed in SupplierS for now") } - if connCfg.Address == utils.MetaInternal && !self.ralsCfg.RALsEnabled { + if !self.ralsCfg.RALsEnabled { return errors.New("RALs not enabled but requested by SupplierS component.") } } diff --git a/config/smconfig.go b/config/smconfig.go index 414466059..81d2ef489 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -338,72 +338,6 @@ func (self *OsipsConnConfig) loadFromJsonCfg(jsnCfg *OsipsConnJsonCfg) error { return nil } -// SM-OpenSIPS config section -type SmOsipsConfig struct { - Enabled bool - ListenUdp string - RALsConns []*RemoteHost - CDRsConns []*RemoteHost - CreateCdr bool - DebitInterval time.Duration - MinCallDuration time.Duration - MaxCallDuration time.Duration - EventsSubscribeInterval time.Duration - MiAddr string -} - -func (self *SmOsipsConfig) loadFromJsonCfg(jsnCfg *SmOsipsJsonCfg) error { - var err error - if jsnCfg.Enabled != nil { - self.Enabled = *jsnCfg.Enabled - } - if jsnCfg.Listen_udp != nil { - self.ListenUdp = *jsnCfg.Listen_udp - } - if jsnCfg.Rals_conns != nil { - self.RALsConns = make([]*RemoteHost, len(*jsnCfg.Rals_conns)) - for idx, jsnHaCfg := range *jsnCfg.Rals_conns { - self.RALsConns[idx] = NewDfltRemoteHost() - self.RALsConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Cdrs_conns != nil { - self.CDRsConns = make([]*RemoteHost, len(*jsnCfg.Cdrs_conns)) - for idx, jsnHaCfg := range *jsnCfg.Cdrs_conns { - self.CDRsConns[idx] = NewDfltRemoteHost() - self.CDRsConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Create_cdr != nil { - self.CreateCdr = *jsnCfg.Create_cdr - } - if jsnCfg.Debit_interval != nil { - if self.DebitInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Debit_interval); err != nil { - return err - } - } - if jsnCfg.Min_call_duration != nil { - if self.MinCallDuration, err = utils.ParseDurationWithNanosecs(*jsnCfg.Min_call_duration); err != nil { - return err - } - } - if jsnCfg.Max_call_duration != nil { - if self.MaxCallDuration, err = utils.ParseDurationWithNanosecs(*jsnCfg.Max_call_duration); err != nil { - return err - } - } - if jsnCfg.Events_subscribe_interval != nil { - if self.EventsSubscribeInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Events_subscribe_interval); err != nil { - return err - } - } - if jsnCfg.Mi_addr != nil { - self.MiAddr = *jsnCfg.Mi_addr - } - - return nil -} - // Uses stored defaults so we can pre-populate by loading from JSON config func NewDefaultAsteriskConnCfg() *AsteriskConnCfg { if dfltAstConnCfg == nil { diff --git a/data/conf/samples/dispatchers/dispatchers/cgrates.json b/data/conf/samples/dispatchers/dispatchers/cgrates.json index 666fe17ea..45186d5b9 100755 --- a/data/conf/samples/dispatchers/dispatchers/cgrates.json +++ b/data/conf/samples/dispatchers/dispatchers/cgrates.json @@ -42,6 +42,18 @@ "sessions": { "enabled": true, + "attributes_conns": [ + {"address": "*internal"}, + ], + "resources_conns": [ + {"address": "*internal"}, + ], + "rals_conns": [ + {"address": "*internal"} + ], + "chargers_conns": [ + {"address": "*internal"} + ], "listen_bijson": ":3014", }, diff --git a/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json b/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json index 7ac91ee68..ea4874c4a 100644 --- a/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json +++ b/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json @@ -50,6 +50,18 @@ "sessions": { "enabled": true, + "attributes_conns": [ + {"address": "*internal"}, + ], + "resources_conns": [ + {"address": "*internal"}, + ], + "rals_conns": [ + {"address": "*internal"} + ], + "chargers_conns": [ + {"address": "*internal"} + ], "listen_bijson": ":3014", },