Updated dispatcher connections

This commit is contained in:
Trial97
2019-08-19 18:30:25 +03:00
committed by Dan Christian Bogos
parent 2d0db38384
commit 0cd5fad596
6 changed files with 286 additions and 386 deletions

View File

@@ -47,19 +47,6 @@ import (
"github.com/cgrates/rpcclient"
)
const (
JSON = "json"
GOB = "gob"
POSTGRES = "postgres"
MYSQL = "mysql"
MONGO = "mongo"
REDIS = "redis"
SAME = "same"
FS = "freeswitch"
KAMAILIO = "kamailio"
OSIPS = "opensips"
)
var (
cgrEngineFlags = flag.NewFlagSet("cgr-engine", flag.ContinueOnError)
cfgPath = cgrEngineFlags.String("config_path", utils.CONFIG_PATH, "Configuration directory path.")
@@ -85,10 +72,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan cha
filterSChan <- filterS
cdrcInitialized := false // Control whether the cdrc was already initialized (so we don't reload in that case)
var cdrcChildrenChan chan struct{} // Will use it to communicate with the children of one fork
var dispatcherSConn rpcclient.RpcClientConnection
intCdrSChan := internalCdrSChan
if cfg.DispatcherSCfg().Enabled {
dispatcherSConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherSConn
intCdrSChan = internalDispatcherSChan
}
for {
select {
@@ -110,9 +96,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan cha
}
}
if len(enabledCfgs) != 0 {
go startCdrc(internalCdrSChan, internalRaterChan, enabledCfgs,
cfg.GeneralCfg().HttpSkipTlsVerify, dispatcherSConn,
filterSChan, cdrcChildrenChan, exitChan)
go startCdrc(intCdrSChan, internalRaterChan, enabledCfgs,
cfg.GeneralCfg().HttpSkipTlsVerify, filterSChan,
cdrcChildrenChan, exitChan)
} else {
utils.Logger.Info("<CDRC> No enabled CDRC clients")
}
@@ -123,26 +109,23 @@ func startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan cha
// Fires up a cdrc instance
func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool,
dispatcherSConn rpcclient.RpcClientConnection, filterSChan chan *engine.FilterS, closeChan chan struct{}, exitChan chan bool) {
filterSChan chan *engine.FilterS, closeChan chan struct{}, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
var err error
var cdrsConn rpcclient.RpcClientConnection
if cfg.DispatcherSCfg().Enabled {
cdrsConn = dispatcherSConn
} else {
cdrcCfg := cdrcCfgs[0]
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cdrcCfg.CdrsConns, internalCdrSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
exitChan <- true
return
}
cdrcCfg := cdrcCfgs[0]
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cdrcCfg.CdrsConns, internalCdrSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
exitChan <- true
return
}
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan,
cfg.GeneralCfg().DefaultTimezone, filterS)
if err != nil {
@@ -163,21 +146,34 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
dm *engine.DataManager, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS Session service.")
var err error
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection
isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled
if isDispatcherSEnabled {
dispatcherSConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherSConn
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn rpcclient.RpcClientConnection
intChargerSChan := internalChargerSChan
intRaterChan := internalRaterChan
intResourceSChan := internalResourceSChan
intThresholdSChan := internalThresholdSChan
intStatSChan := internalStatSChan
intSupplierSChan := internalSupplierSChan
intAttrSChan := internalAttrSChan
intCDRSChan := internalCDRSChan
if cfg.DispatcherSCfg().Enabled {
intChargerSChan = internalDispatcherSChan
intRaterChan = internalDispatcherSChan
intResourceSChan = internalDispatcherSChan
intThresholdSChan = internalDispatcherSChan
intStatSChan = internalDispatcherSChan
intSupplierSChan = internalDispatcherSChan
intAttrSChan = internalDispatcherSChan
intCDRSChan = internalDispatcherSChan
}
if isDispatcherSEnabled {
chargerSConn = dispatcherSConn
} else if len(cfg.SessionSCfg().ChargerSConns) != 0 {
if len(cfg.SessionSCfg().ChargerSConns) != 0 {
chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate,
cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts,
cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout,
cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().ChargerSConns,
internalChargerSChan, false)
intChargerSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ChargerS, err.Error()))
@@ -185,15 +181,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if isDispatcherSEnabled {
ralsConns = dispatcherSConn
} else if len(cfg.SessionSCfg().RALsConns) != 0 {
if len(cfg.SessionSCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate,
cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts,
cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout,
cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().RALsConns,
internalRaterChan, false)
intRaterChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s",
utils.SessionS, err.Error()))
@@ -201,15 +195,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if isDispatcherSEnabled {
resSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().ResSConns) != 0 {
if len(cfg.SessionSCfg().ResSConns) != 0 {
resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().ResSConns, internalResourceSChan, false)
cfg.SessionSCfg().ResSConns, intResourceSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s",
utils.SessionS, err.Error()))
@@ -217,15 +209,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if isDispatcherSEnabled {
threshSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().ThreshSConns) != 0 {
if len(cfg.SessionSCfg().ThreshSConns) != 0 {
threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().ThreshSConns, internalThresholdSChan, false)
cfg.SessionSCfg().ThreshSConns, intThresholdSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s",
utils.SessionS, err.Error()))
@@ -233,15 +223,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if isDispatcherSEnabled {
statSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().StatSConns) != 0 {
if len(cfg.SessionSCfg().StatSConns) != 0 {
statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().StatSConns, internalStatSChan, false)
cfg.SessionSCfg().StatSConns, intStatSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SessionS, err.Error()))
@@ -249,15 +237,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if isDispatcherSEnabled {
suplSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().SupplSConns) != 0 {
if len(cfg.SessionSCfg().SupplSConns) != 0 {
suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().SupplSConns, internalSupplierSChan, false)
cfg.SessionSCfg().SupplSConns, intSupplierSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s",
utils.SessionS, err.Error()))
@@ -265,15 +251,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if isDispatcherSEnabled {
attrSConns = dispatcherSConn
} else if len(cfg.SessionSCfg().AttrSConns) != 0 {
if len(cfg.SessionSCfg().AttrSConns) != 0 {
attrSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().AttrSConns, internalAttrSChan, false)
cfg.SessionSCfg().AttrSConns, intAttrSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s",
utils.SessionS, err.Error()))
@@ -281,15 +265,13 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if isDispatcherSEnabled {
cdrsConn = dispatcherSConn
} else if len(cfg.SessionSCfg().CDRsConns) != 0 {
if len(cfg.SessionSCfg().CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().CDRsConns, internalCDRSChan, false)
cfg.SessionSCfg().CDRsConns, intCDRSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s",
utils.SessionS, err.Error()))
@@ -343,16 +325,11 @@ func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.
var sS rpcclient.RpcClientConnection
var sSInternal bool
utils.Logger.Info("Starting Asterisk agent")
intSMGChan := internalSMGChan
if cfg.DispatcherSCfg().Enabled {
sS = <-internalDispatcherSChan
internalDispatcherSChan <- sS
} else if len(cfg.AsteriskAgentCfg().SessionSConns) == 0 {
utils.Logger.Crit(
fmt.Sprintf("<%s> no SessionS connections defined",
utils.AsteriskAgent))
exitChan <- true
return
} else if cfg.AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
intSMGChan = internalDispatcherSChan
}
if !cfg.DispatcherSCfg().Enabled && cfg.AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
sSInternal = true
sSIntConn := <-internalSMGChan
internalSMGChan <- sSIntConn
@@ -363,7 +340,7 @@ func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.AsteriskAgentCfg().SessionSConns, internalSMGChan, false)
cfg.AsteriskAgentCfg().SessionSConns, intSMGChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.AsteriskAgent, utils.SessionS, err.Error()))
@@ -408,16 +385,11 @@ func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.R
filterSChan <- filterS
var sS rpcclient.RpcClientConnection
var sSInternal bool
intSsChan := internalSsChan
if cfg.DispatcherSCfg().Enabled {
sS = <-internalDispatcherSChan
internalDispatcherSChan <- sS
} else if len(cfg.DiameterAgentCfg().SessionSConns) == 0 {
utils.Logger.Crit(
fmt.Sprintf("<%s> no SessionS connections defined",
utils.DiameterAgent))
exitChan <- true
return
} else if cfg.DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
intSsChan = internalDispatcherSChan
}
if !cfg.DispatcherSCfg().Enabled && cfg.DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
sSInternal = true
sSIntConn := <-internalSsChan
internalSsChan <- sSIntConn
@@ -428,7 +400,7 @@ func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.R
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DiameterAgentCfg().SessionSConns, internalSsChan, false)
cfg.DiameterAgentCfg().SessionSConns, intSsChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DiameterAgent, utils.SessionS, err.Error()))
@@ -467,28 +439,23 @@ func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.Rp
utils.Logger.Info("Starting CGRateS RadiusAgent service")
var err error
var smgConn rpcclient.RpcClientConnection
intSMGChan := internalSMGChan
if cfg.DispatcherSCfg().Enabled {
smgConn = <-internalDispatcherSChan
internalDispatcherSChan <- smgConn
} else if len(cfg.RadiusAgentCfg().SessionSConns) != 0 {
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.RadiusAgentCfg().SessionSConns, internalSMGChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMG: %s", utils.RadiusAgent, err.Error()))
exitChan <- true
return
}
} else {
utils.Logger.Crit(
fmt.Sprintf("<%s> no SessionS connections defined",
utils.RadiusAgent))
intSMGChan = internalDispatcherSChan
}
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.RadiusAgentCfg().SessionSConns, intSMGChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMG: %s", utils.RadiusAgent, err.Error()))
exitChan <- true
return
}
ra, err := agents.NewRadiusAgent(cfg, filterS, smgConn)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error()))
@@ -509,16 +476,11 @@ func startDNSAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl
filterS := <-filterSChan
filterSChan <- filterS
utils.Logger.Info(fmt.Sprintf("starting %s service", utils.DNSAgent))
intSMGChan := internalSMGChan
if cfg.DispatcherSCfg().Enabled {
sS = <-internalDispatcherSChan
internalDispatcherSChan <- sS
} else if len(cfg.DNSAgentCfg().SessionSConns) == 0 {
utils.Logger.Crit(
fmt.Sprintf("<%s> no SessionS connections defined",
utils.DNSAgent))
exitChan <- true
return
} else if cfg.DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
intSMGChan = internalDispatcherSChan
}
if !cfg.DispatcherSCfg().Enabled && cfg.DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
// sSInternal = true
sSIntConn := <-internalSMGChan
internalSMGChan <- sSIntConn
@@ -529,7 +491,7 @@ func startDNSAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DNSAgentCfg().SessionSConns, internalSMGChan, false)
cfg.DNSAgentCfg().SessionSConns, intSMGChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DNSAgent, utils.SessionS, err.Error()))
@@ -565,16 +527,11 @@ func startFsAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCli
var sS rpcclient.RpcClientConnection
var sSInternal bool
utils.Logger.Info("Starting FreeSWITCH agent")
intSMGChan := internalSMGChan
if cfg.DispatcherSCfg().Enabled {
sS = <-internalDispatcherSChan
internalDispatcherSChan <- sS
} else if len(cfg.FsAgentCfg().SessionSConns) == 0 {
utils.Logger.Crit(
fmt.Sprintf("<%s> no SessionS connections defined",
utils.FreeSWITCHAgent))
exitChan <- true
return
} else if cfg.FsAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
intSMGChan = internalDispatcherSChan
}
if !cfg.DispatcherSCfg().Enabled && cfg.FsAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
sSInternal = true
sSIntConn := <-internalSMGChan
internalSMGChan <- sSIntConn
@@ -585,7 +542,7 @@ func startFsAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCli
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.FsAgentCfg().SessionSConns, internalSMGChan, false)
cfg.FsAgentCfg().SessionSConns, intSMGChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.FreeSWITCHAgent, utils.SessionS, err.Error()))
@@ -617,16 +574,11 @@ func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl
var sS rpcclient.RpcClientConnection
var sSInternal bool
utils.Logger.Info("Starting Kamailio agent")
intSMGChan := internalSMGChan
if cfg.DispatcherSCfg().Enabled {
sS = <-internalDispatcherSChan
internalDispatcherSChan <- sS
} else if len(cfg.KamAgentCfg().SessionSConns) == 0 {
utils.Logger.Crit(
fmt.Sprintf("<%s> no SessionS connections defined",
utils.KamailioAgent))
exitChan <- true
return
} else if cfg.KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
intSMGChan = internalDispatcherSChan
}
if !cfg.DispatcherSCfg().Enabled && cfg.KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
sSInternal = true
sSIntConn := <-internalSMGChan
internalSMGChan <- sSIntConn
@@ -637,7 +589,7 @@ func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcCl
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.KamAgentCfg().SessionSConns, internalSMGChan, false)
cfg.KamAgentCfg().SessionSConns, intSMGChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.KamailioAgent, utils.SessionS, err.Error()))
@@ -669,20 +621,20 @@ func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcC
filterS := <-filterSChan
filterSChan <- filterS
var sS rpcclient.RpcClientConnection
intSMGChan := internalSMGChan
if cfg.DispatcherSCfg().Enabled {
sS = <-internalDispatcherSChan
internalDispatcherSChan <- sS
intSMGChan = internalDispatcherSChan
}
utils.Logger.Info("Starting HTTP agent")
var err error
for _, agntCfg := range cfg.HttpAgentCfg() {
if !cfg.DispatcherSCfg().Enabled && len(agntCfg.SessionSConns) != 0 {
if len(agntCfg.SessionSConns) != 0 {
sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
agntCfg.SessionSConns, internalSMGChan, false)
agntCfg.SessionSConns, intSMGChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> could not connect to %s, error: %s",
utils.HTTPAgent, utils.SessionS, err.Error()))
@@ -704,21 +656,27 @@ func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, inte
filterSChan <- filterS
var err error
utils.Logger.Info("Starting CGRateS CDRS service.")
var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection
isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled
if isDispatcherSEnabled {
dispatcherSConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherSConn
var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection
intChargerSChan := internalChargerSChan
intRaterChan := internalRaterChan
intAttributeSChan := internalAttributeSChan
intThresholdSChan := internalThresholdSChan
intStatSChan := internalStatSChan
if cfg.DispatcherSCfg().Enabled {
intChargerSChan = internalDispatcherSChan
intRaterChan = internalDispatcherSChan
intAttributeSChan = internalDispatcherSChan
intThresholdSChan = internalDispatcherSChan
intStatSChan = internalDispatcherSChan
}
if isDispatcherSEnabled {
chargerSConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL
if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL
chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSChargerSConns, internalChargerSChan, false)
cfg.CdrsCfg().CDRSChargerSConns, intChargerSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.ChargerS, err.Error()))
@@ -726,30 +684,26 @@ func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, inte
return
}
}
if isDispatcherSEnabled {
ralConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL
if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSRaterConns, internalRaterChan, false)
cfg.CdrsCfg().CDRSRaterConns, intRaterChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
}
if isDispatcherSEnabled {
attrSConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init
if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSAttributeSConns, internalAttributeSChan, false)
cfg.CdrsCfg().CDRSAttributeSConns, intAttributeSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.AttributeS, err.Error()))
@@ -757,30 +711,26 @@ func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, inte
return
}
}
if isDispatcherSEnabled {
thresholdSConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init
if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init
thresholdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSThresholdSConns, internalThresholdSChan, false)
cfg.CdrsCfg().CDRSThresholdSConns, intThresholdSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to ThresholdS: %s", err.Error()))
exitChan <- true
return
}
}
if isDispatcherSEnabled {
statsConn = dispatcherSConn
} else if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init
if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSStatSConns, internalStatSChan, false)
cfg.CdrsCfg().CDRSStatSConns, intStatSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))
exitChan <- true
@@ -862,16 +812,17 @@ func startChargerService(internalChargerSChan, internalAttributeSChan,
<-cacheS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)
var attrSConn rpcclient.RpcClientConnection
var err error
intAttributeSChan := internalAttributeSChan
if cfg.DispatcherSCfg().Enabled {
attrSConn = <-internalDispatcherSChan
internalDispatcherSChan <- attrSConn
} else if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init
intAttributeSChan = internalDispatcherSChan
}
if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ChargerSCfg().AttributeSConns, internalAttributeSChan, false)
cfg.ChargerSCfg().AttributeSConns, intAttributeSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.ChargerS, utils.AttributeS, err.Error()))
@@ -913,16 +864,17 @@ func startResourceService(internalRsChan, internalThresholdSChan,
var thdSConn rpcclient.RpcClientConnection
filterS := <-filterSChan
filterSChan <- filterS
intThresholdSChan := internalThresholdSChan
if cfg.DispatcherSCfg().Enabled {
thdSConn = <-internalDispatcherSChan
internalDispatcherSChan <- thdSConn
} else if len(cfg.ResourceSCfg().ThresholdSConns) != 0 { // Stats connection init
intThresholdSChan = internalDispatcherSChan
}
if len(cfg.ResourceSCfg().ThresholdSConns) != 0 { // Stats connection init
thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ResourceSCfg().ThresholdSConns, internalThresholdSChan, false)
cfg.ResourceSCfg().ThresholdSConns, intThresholdSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not connect to ThresholdS: %s", err.Error()))
exitChan <- true
@@ -967,16 +919,17 @@ func startStatService(internalStatSChan, internalThresholdSChan,
var thdSConn rpcclient.RpcClientConnection
filterS := <-filterSChan
filterSChan <- filterS
intThresholdSChan := internalThresholdSChan
if cfg.DispatcherSCfg().Enabled {
thdSConn = <-internalDispatcherSChan
internalDispatcherSChan <- thdSConn
} else if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init
intThresholdSChan = internalDispatcherSChan
}
if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init
thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.StatSCfg().ThresholdSConns, internalThresholdSChan, false)
cfg.StatSCfg().ThresholdSConns, intThresholdSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not connect to ThresholdS: %s", err.Error()))
exitChan <- true
@@ -1051,21 +1004,23 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh
var err error
filterS := <-filterSChan
filterSChan <- filterS
var attrSConn, resourceSConn, statSConn, dispatcherSConn rpcclient.RpcClientConnection
isDispatcherSEnabled := cfg.DispatcherSCfg().Enabled
if isDispatcherSEnabled {
dispatcherSConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherSConn
var attrSConn, resourceSConn, statSConn rpcclient.RpcClientConnection
intAttrSChan := internalAttrSChan
intStatSChan := internalStatSChan
intRsChan := internalRsChan
if cfg.DispatcherSCfg().Enabled { // use dispatcher as internal chanel if active
intAttrSChan = internalDispatcherSChan
intStatSChan = internalDispatcherSChan
intRsChan = internalDispatcherSChan
}
if isDispatcherSEnabled {
attrSConn = dispatcherSConn
} else if len(cfg.SupplierSCfg().AttributeSConns) != 0 {
if len(cfg.SupplierSCfg().AttributeSConns) != 0 {
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SupplierSCfg().AttributeSConns, internalAttrSChan, false)
cfg.SupplierSCfg().AttributeSConns, intAttrSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SupplierS, utils.AttributeS, err.Error()))
@@ -1073,15 +1028,13 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh
return
}
}
if isDispatcherSEnabled {
statSConn = dispatcherSConn
} else if len(cfg.SupplierSCfg().StatSConns) != 0 {
if len(cfg.SupplierSCfg().StatSConns) != 0 {
statSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SupplierSCfg().StatSConns, internalStatSChan, false)
cfg.SupplierSCfg().StatSConns, intStatSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SupplierS, err.Error()))
@@ -1089,15 +1042,13 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh
return
}
}
if isDispatcherSEnabled {
resourceSConn = dispatcherSConn
} else if len(cfg.SupplierSCfg().ResourceSConns) != 0 {
if len(cfg.SupplierSCfg().ResourceSConns) != 0 {
resourceSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SupplierSCfg().ResourceSConns, internalRsChan, false)
cfg.SupplierSCfg().ResourceSConns, intRsChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SupplierS, err.Error()))
@@ -1457,23 +1408,20 @@ func initLogger(cfg *config.CGRConfig) error {
}
func schedCDRsConns(internalCDRSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
var cdrsConn rpcclient.RpcClientConnection
intChan := internalCDRSChan
if cfg.DispatcherSCfg().Enabled {
cdrsConn = <-internalDispatcherSChan
internalDispatcherSChan <- cdrsConn
} else {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SchedulerCfg().CDRsConns, internalCDRSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
exitChan <- true
return
}
intChan = internalDispatcherSChan
}
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SchedulerCfg().CDRsConns, intChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
exitChan <- true
return
}
engine.SetSchedCdrsConns(cdrsConn)
}

View File

@@ -57,17 +57,21 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
<-chS.GetPrecacheChannel(utils.CacheTimings)
}()
var dispatcherConn rpcclient.RpcClientConnection
isDispatcherEnabled := cfg.DispatcherSCfg().Enabled
if isDispatcherEnabled {
dispatcherConn = <-internalDispatcherSChan
internalDispatcherSChan <- dispatcherConn
intThdSChan := internalThdSChan
intStatSChan := internalStatSChan
intCacheSChan := internalCacheSChan
intSchedulerSChanL := internalSchedulerSChan
intAttributeSChan := internalAttributeSChan
if cfg.DispatcherSCfg().Enabled {
intThdSChan = internalDispatcherSChan
intStatSChan = internalDispatcherSChan
intCacheSChan = internalDispatcherSChan
intSchedulerSChanL = internalDispatcherSChan
intAttributeSChan = internalDispatcherSChan
}
var thdS rpcclient.RpcClientConnection
if isDispatcherEnabled {
thdS = dispatcherConn
} else if len(cfg.RalsCfg().RALsThresholdSConns) != 0 { // Connections to ThresholdS
if len(cfg.RalsCfg().RALsThresholdSConns) != 0 { // Connections to ThresholdS
thdsTaskChan := make(chan struct{})
waitTasks = append(waitTasks, thdsTaskChan)
go func() {
@@ -78,7 +82,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.RalsCfg().RALsThresholdSConns, internalThdSChan, false)
cfg.RalsCfg().RALsThresholdSConns, intThdSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to ThresholdS, error: %s", err.Error()))
exitChan <- true
@@ -88,9 +92,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
}
var stats rpcclient.RpcClientConnection
if isDispatcherEnabled {
stats = dispatcherConn
} else if len(cfg.RalsCfg().RALsStatSConns) != 0 { // Connections to StatS
if len(cfg.RalsCfg().RALsStatSConns) != 0 { // Connections to StatS
statsTaskChan := make(chan struct{})
waitTasks = append(waitTasks, statsTaskChan)
go func() {
@@ -101,7 +103,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.RalsCfg().RALsStatSConns, internalStatSChan, false)
cfg.RalsCfg().RALsStatSConns, intStatSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to StatS, error: %s", err.Error()))
exitChan <- true
@@ -112,9 +114,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
//create cache connection
var cacheSrpc rpcclient.RpcClientConnection
if isDispatcherEnabled {
cacheSrpc = dispatcherConn
} else if len(cfg.ApierCfg().CachesConns) != 0 {
if len(cfg.ApierCfg().CachesConns) != 0 {
cachesTaskChan := make(chan struct{})
waitTasks = append(waitTasks, cachesTaskChan)
go func() {
@@ -125,7 +125,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ApierCfg().CachesConns, internalCacheSChan, false)
cfg.ApierCfg().CachesConns, intCacheSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<APIer> Could not connect to CacheS, error: %s", err.Error()))
exitChan <- true
@@ -136,9 +136,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
//create scheduler connection
var schedulerSrpc rpcclient.RpcClientConnection
if isDispatcherEnabled {
schedulerSrpc = dispatcherConn
} else if len(cfg.ApierCfg().SchedulerConns) != 0 {
if len(cfg.ApierCfg().SchedulerConns) != 0 {
schedulerSTaskChan := make(chan struct{})
waitTasks = append(waitTasks, schedulerSTaskChan)
go func() {
@@ -149,7 +147,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ApierCfg().SchedulerConns, internalSchedulerSChan, false)
cfg.ApierCfg().SchedulerConns, intSchedulerSChanL, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<APIer> Could not connect to SchedulerS, error: %s", err.Error()))
exitChan <- true
@@ -160,9 +158,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
//create scheduler connection
var attributeSrpc rpcclient.RpcClientConnection
if isDispatcherEnabled {
attributeSrpc = dispatcherConn
} else if len(cfg.ApierCfg().SchedulerConns) != 0 {
if len(cfg.ApierCfg().SchedulerConns) != 0 {
attributeSTaskChan := make(chan struct{})
waitTasks = append(waitTasks, attributeSTaskChan)
go func() {
@@ -173,7 +169,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ApierCfg().AttributeSConns, internalAttributeSChan, false)
cfg.ApierCfg().AttributeSConns, intAttributeSChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<APIer> Could not connect to AttributeS, error: %s", err.Error()))
exitChan <- true

View File

@@ -167,7 +167,6 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
cfg.migratorCgrCfg = new(MigratorCgrCfg)
cfg.mailerCfg = new(MailerCfg)
cfg.loaderCfg = make([]*LoaderSCfg, 0)
cfg.SmOsipsConfig = new(SmOsipsConfig)
cfg.apier = new(ApierCfg)
cfg.ersCfg = new(ERsCfg)
@@ -358,7 +357,6 @@ type CGRConfig struct {
migratorCgrCfg *MigratorCgrCfg // MigratorCgr config
mailerCfg *MailerCfg // Mailer config
analyzerSCfg *AnalyzerSCfg // AnalyzerS config
SmOsipsConfig *SmOsipsConfig // SMOpenSIPS Configuration
apier *ApierCfg
ersCfg *ERsCfg
}
@@ -374,14 +372,16 @@ func (self *CGRConfig) checkConfigSanity() error {
if !self.statsCfg.Enabled {
for _, connCfg := range self.ralsCfg.RALsStatSConns {
if connCfg.Address == utils.MetaInternal {
return errors.New("StatS not enabled but requested by RALs component.")
return fmt.Errorf("%s not enabled but requested by %s component.",
utils.StatS, utils.RALService)
}
}
}
if !self.thresholdSCfg.Enabled {
for _, connCfg := range self.ralsCfg.RALsThresholdSConns {
if connCfg.Address == utils.MetaInternal {
return errors.New("ThresholdS not enabled but requested by RALs component.")
return fmt.Errorf("%s not enabled but requested by %s component.",
utils.ThresholdS, utils.RALService)
}
}
}
@@ -438,7 +438,7 @@ func (self *CGRConfig) checkConfigSanity() error {
if len(cdrcInst.CdrsConns) == 0 {
return fmt.Errorf("<CDRC> Instance: %s, CdrC enabled but no CDRS defined!", cdrcInst.ID)
}
if !self.cdrsCfg.CDRSEnabled {
if !self.cdrsCfg.CDRSEnabled && !self.dispatcherSCfg.Enabled {
for _, conn := range cdrcInst.CdrsConns {
if conn.Address == utils.MetaInternal {
return errors.New("CDRS not enabled but referenced from CDRC")
@@ -488,162 +488,160 @@ func (self *CGRConfig) checkConfigSanity() error {
if !self.chargerSCfg.Enabled {
for _, conn := range self.sessionSCfg.ChargerSConns {
if conn.Address == utils.MetaInternal {
return errors.New("<SessionS> ChargerS not enabled")
return fmt.Errorf("<%s> %s not enabled", utils.SessionS, utils.ChargerS)
}
}
}
if !self.ralsCfg.RALsEnabled {
for _, smgRALsConn := range self.sessionSCfg.RALsConns {
if smgRALsConn.Address == utils.MetaInternal {
return errors.New("<SessionS> RALs not enabled but requested by SMGeneric component.")
return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.RALService)
}
}
}
if !self.resourceSCfg.Enabled {
for _, conn := range self.sessionSCfg.ResSConns {
if conn.Address == utils.MetaInternal {
return errors.New("<SessionS> ResourceS not enabled but requested by SMGeneric component.")
return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.ResourceS)
}
}
}
if !self.thresholdSCfg.Enabled {
for _, conn := range self.sessionSCfg.ThreshSConns {
if conn.Address == utils.MetaInternal {
return errors.New("<SessionS> ThresholdS not enabled but requested by SMGeneric component.")
return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.ThresholdS)
}
}
}
if !self.statsCfg.Enabled {
for _, conn := range self.sessionSCfg.StatSConns {
if conn.Address == utils.MetaInternal {
return errors.New("<SessionS> StatS not enabled but requested by SMGeneric component.")
return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.StatS)
}
}
}
if !self.supplierSCfg.Enabled {
for _, conn := range self.sessionSCfg.SupplSConns {
if conn.Address == utils.MetaInternal {
return errors.New("<SessionS> SupplierS not enabled but requested by SMGeneric component.")
return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.SupplierS)
}
}
}
if !self.attributeSCfg.Enabled {
for _, conn := range self.sessionSCfg.AttrSConns {
if conn.Address == utils.MetaInternal {
return errors.New("<SessionS> AttributeS not enabled but requested by SMGeneric component.")
return fmt.Errorf("<%s> %s not enabled but requested by SMGeneric component.", utils.SessionS, utils.AttributeS)
}
}
}
if !self.cdrsCfg.CDRSEnabled {
for _, smgCDRSConn := range self.sessionSCfg.CDRsConns {
if smgCDRSConn.Address == utils.MetaInternal {
return errors.New("<SessionS> CDRS not enabled but referenced by SMGeneric component")
return fmt.Errorf("<%s> CDRS not enabled but referenced by SMGeneric component", utils.SessionS)
}
}
}
}
// FreeSWITCHAgent checks
if self.fsAgentCfg.Enabled && !self.dispatcherSCfg.Enabled {
if self.fsAgentCfg.Enabled {
if len(self.fsAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> SMG definition is mandatory!", utils.FreeSWITCHAgent)
return fmt.Errorf("<%s> no %s connections defined",
utils.FreeSWITCHAgent, utils.SessionS)
}
for _, connCfg := range self.fsAgentCfg.SessionSConns {
// if connCfg.Address != utils.MetaInternal {
// return errors.New("only <*internal> connectivity allowed in in <freeswitch_agent> towards <sessions> for now")
// }
if connCfg.Address == utils.MetaInternal &&
!self.sessionSCfg.Enabled {
return errors.New("<sessions> not enabled but referenced by <freeswitch_agent>")
if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it
!self.sessionSCfg.Enabled {
for _, connCfg := range self.fsAgentCfg.SessionSConns {
if connCfg.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s",
utils.SessionS, utils.FreeSWITCHAgent)
}
}
}
}
// KamailioAgent checks
if self.kamAgentCfg.Enabled && !self.dispatcherSCfg.Enabled {
if self.kamAgentCfg.Enabled {
if len(self.kamAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> SMG definition is mandatory!", utils.KamailioAgent)
return fmt.Errorf("<%s> no %s connections defined",
utils.KamailioAgent, utils.SessionS)
}
for _, connCfg := range self.kamAgentCfg.SessionSConns {
// if connCfg.Address != utils.MetaInternal {
// return errors.New("only <*internal> connectivity allowed in in <kamailio_agent> towards <sessions> for now")
// }
if connCfg.Address == utils.MetaInternal &&
!self.sessionSCfg.Enabled {
return errors.New("<sessions> not enabled but referenced by <kamailio_agent>")
}
}
}
// SMOpenSIPS checks
if self.SmOsipsConfig.Enabled && !self.dispatcherSCfg.Enabled {
if len(self.SmOsipsConfig.RALsConns) == 0 {
return errors.New("<SMOpenSIPS> Rater definition is mandatory!")
}
if !self.ralsCfg.RALsEnabled {
for _, smOsipsRaterConn := range self.SmOsipsConfig.RALsConns {
if smOsipsRaterConn.Address == utils.MetaInternal {
return errors.New("<SMOpenSIPS> RALs not enabled.")
}
}
}
if len(self.SmOsipsConfig.CDRsConns) == 0 {
return errors.New("<SMOpenSIPS> CDRs definition is mandatory!")
}
if !self.cdrsCfg.CDRSEnabled {
for _, smOsipsCDRSConn := range self.SmOsipsConfig.CDRsConns {
if smOsipsCDRSConn.Address == utils.MetaInternal {
return errors.New("<SMOpenSIPS> CDRS not enabled.")
if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it
!self.sessionSCfg.Enabled {
for _, connCfg := range self.kamAgentCfg.SessionSConns {
if connCfg.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s",
utils.SessionS, utils.KamailioAgent)
}
}
}
}
// AsteriskAgent checks
if self.asteriskAgentCfg.Enabled && !self.dispatcherSCfg.Enabled {
if self.asteriskAgentCfg.Enabled {
if len(self.asteriskAgentCfg.SessionSConns) == 0 {
return errors.New("<SMAsterisk> SMG definition is mandatory!")
return fmt.Errorf("<%s> no %s connections defined",
utils.AsteriskAgent, utils.SessionS)
}
for _, smAstSMGConn := range self.asteriskAgentCfg.SessionSConns {
if smAstSMGConn.Address == utils.MetaInternal && !self.sessionSCfg.Enabled {
return errors.New("<SMAsterisk> SMG not enabled.")
if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it
!self.sessionSCfg.Enabled {
for _, smAstSMGConn := range self.asteriskAgentCfg.SessionSConns {
if smAstSMGConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s",
utils.SessionS, utils.AsteriskAgent)
}
}
}
// if !self.sessionSCfg.Enabled {
// return errors.New("<SMAsterisk> SMG not enabled.")
// }
}
// DAgent checks
if self.diameterAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled {
if self.diameterAgentCfg.Enabled {
if len(self.diameterAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> SMG definition is mandatory!", utils.DiameterAgent)
return fmt.Errorf("<%s> no %s connections defined",
utils.DiameterAgent, utils.SessionS)
}
for _, daSMGConn := range self.diameterAgentCfg.SessionSConns {
if daSMGConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s component",
utils.SessionS, utils.DiameterAgent)
if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it
!self.sessionSCfg.Enabled {
for _, daSMGConn := range self.diameterAgentCfg.SessionSConns {
if daSMGConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s",
utils.SessionS, utils.DiameterAgent)
}
}
}
}
if self.radiusAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled {
if self.radiusAgentCfg.Enabled {
if len(self.radiusAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> SMG definition is mandatory!", utils.RadiusAgent)
return fmt.Errorf("<%s> no %s connections defined",
utils.RadiusAgent, utils.SessionS)
}
for _, raSMGConn := range self.radiusAgentCfg.SessionSConns {
if raSMGConn.Address == utils.MetaInternal {
return errors.New("SMGeneric not enabled but referenced by RadiusAgent component")
if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it
!self.sessionSCfg.Enabled {
for _, raSMGConn := range self.radiusAgentCfg.SessionSConns {
if raSMGConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s",
utils.SessionS, utils.RadiusAgent)
}
}
}
}
if self.dnsAgentCfg.Enabled && !self.sessionSCfg.Enabled && !self.dispatcherSCfg.Enabled {
for _, sSConn := range self.dnsAgentCfg.SessionSConns {
if sSConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s", utils.SessionS, utils.DNSAgent)
if self.dnsAgentCfg.Enabled {
if len(self.dnsAgentCfg.SessionSConns) == 0 {
return fmt.Errorf("<%s> no %s connections defined",
utils.DNSAgent, utils.SessionS)
}
if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it
!self.sessionSCfg.Enabled {
for _, sSConn := range self.dnsAgentCfg.SessionSConns {
if sSConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s", utils.SessionS, utils.DNSAgent)
}
}
}
}
// HTTPAgent checks
for _, httpAgentCfg := range self.httpAgentCfg {
// httpAgent checks
if !self.dispatcherSCfg.Enabled {
if !self.dispatcherSCfg.Enabled && // if dispatcher is enabled all internal connections are managed by it
self.sessionSCfg.Enabled {
for _, sSConn := range httpAgentCfg.SessionSConns {
if sSConn.Address == utils.MetaInternal && self.sessionSCfg.Enabled {
if sSConn.Address == utils.MetaInternal {
return errors.New("SessionS not enabled but referenced by HttpAgent component")
}
}
@@ -662,10 +660,10 @@ func (self *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> process_runs needs to be bigger than 0", utils.AttributeS)
}
}
if self.chargerSCfg.Enabled && !self.dispatcherSCfg.Enabled {
if self.chargerSCfg.Enabled && !self.dispatcherSCfg.Enabled &&
(self.attributeSCfg == nil || !self.attributeSCfg.Enabled) {
for _, connCfg := range self.chargerSCfg.AttributeSConns {
if connCfg.Address == utils.MetaInternal &&
(self.attributeSCfg == nil || !self.attributeSCfg.Enabled) {
if connCfg.Address == utils.MetaInternal {
return errors.New("AttributeS not enabled but requested by ChargerS component.")
}
}
@@ -692,7 +690,7 @@ func (self *CGRConfig) checkConfigSanity() error {
if connCfg.Address != utils.MetaInternal {
return errors.New("Only <*internal> RALs connectivity allowed in SupplierS for now")
}
if connCfg.Address == utils.MetaInternal && !self.ralsCfg.RALsEnabled {
if !self.ralsCfg.RALsEnabled {
return errors.New("RALs not enabled but requested by SupplierS component.")
}
}

View File

@@ -338,72 +338,6 @@ func (self *OsipsConnConfig) loadFromJsonCfg(jsnCfg *OsipsConnJsonCfg) error {
return nil
}
// SM-OpenSIPS config section
type SmOsipsConfig struct {
Enabled bool
ListenUdp string
RALsConns []*RemoteHost
CDRsConns []*RemoteHost
CreateCdr bool
DebitInterval time.Duration
MinCallDuration time.Duration
MaxCallDuration time.Duration
EventsSubscribeInterval time.Duration
MiAddr string
}
func (self *SmOsipsConfig) loadFromJsonCfg(jsnCfg *SmOsipsJsonCfg) error {
var err error
if jsnCfg.Enabled != nil {
self.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Listen_udp != nil {
self.ListenUdp = *jsnCfg.Listen_udp
}
if jsnCfg.Rals_conns != nil {
self.RALsConns = make([]*RemoteHost, len(*jsnCfg.Rals_conns))
for idx, jsnHaCfg := range *jsnCfg.Rals_conns {
self.RALsConns[idx] = NewDfltRemoteHost()
self.RALsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Cdrs_conns != nil {
self.CDRsConns = make([]*RemoteHost, len(*jsnCfg.Cdrs_conns))
for idx, jsnHaCfg := range *jsnCfg.Cdrs_conns {
self.CDRsConns[idx] = NewDfltRemoteHost()
self.CDRsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Create_cdr != nil {
self.CreateCdr = *jsnCfg.Create_cdr
}
if jsnCfg.Debit_interval != nil {
if self.DebitInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Debit_interval); err != nil {
return err
}
}
if jsnCfg.Min_call_duration != nil {
if self.MinCallDuration, err = utils.ParseDurationWithNanosecs(*jsnCfg.Min_call_duration); err != nil {
return err
}
}
if jsnCfg.Max_call_duration != nil {
if self.MaxCallDuration, err = utils.ParseDurationWithNanosecs(*jsnCfg.Max_call_duration); err != nil {
return err
}
}
if jsnCfg.Events_subscribe_interval != nil {
if self.EventsSubscribeInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Events_subscribe_interval); err != nil {
return err
}
}
if jsnCfg.Mi_addr != nil {
self.MiAddr = *jsnCfg.Mi_addr
}
return nil
}
// Uses stored defaults so we can pre-populate by loading from JSON config
func NewDefaultAsteriskConnCfg() *AsteriskConnCfg {
if dfltAstConnCfg == nil {

View File

@@ -42,6 +42,18 @@
"sessions": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
"resources_conns": [
{"address": "*internal"},
],
"rals_conns": [
{"address": "*internal"}
],
"chargers_conns": [
{"address": "*internal"}
],
"listen_bijson": ":3014",
},

View File

@@ -50,6 +50,18 @@
"sessions": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
"resources_conns": [
{"address": "*internal"},
],
"rals_conns": [
{"address": "*internal"}
],
"chargers_conns": [
{"address": "*internal"}
],
"listen_bijson": ":3014",
},