Add thresholds_conns option for sessions config

This commit is contained in:
TeoV
2018-02-28 07:14:05 -05:00
committed by Dan Christian Bogos
parent 1984934ae3
commit 3f461a5524
11 changed files with 43 additions and 8 deletions

View File

@@ -130,11 +130,11 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
}
}
func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalSupplierSChan,
func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan, internalSupplierSChan,
internalAttrSChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS Session service.")
var err error
var ralsConns, resSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool
var ralsConns, resSConns, threshSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SessionSCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SessionSCfg().RALsConns, internalRaterChan, cfg.InternalTtl)
@@ -153,6 +153,15 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
return
}
}
if len(cfg.SessionSCfg().ThreshSConns) != 0 {
threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SessionSCfg().ThreshSConns, internalThresholdSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.SessionS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.SessionSCfg().SupplSConns) != 0 {
suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SessionSCfg().SupplSConns, internalSupplierSChan, cfg.InternalTtl)
@@ -186,7 +195,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
exitChan <- true
return
}
sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, suplSConns,
sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, threshSConns, suplSConns,
attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone)
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err))
@@ -872,7 +881,7 @@ func main() {
// Start SM-Generic
if cfg.SessionSCfg().Enabled {
go startSessionS(internalSMGChan, internalRaterChan, internalRsChan,
go startSessionS(internalSMGChan, internalRaterChan, internalRsChan, internalThresholdSChan,
internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan)
}
// Start FreeSWITCHAgent

View File

@@ -484,6 +484,11 @@ func (self *CGRConfig) checkConfigSanity() error {
return errors.New("<SessionS> ResourceS not enabled but requested by SMGeneric component.")
}
}
for _, conn := range self.sessionSCfg.ThreshSConns {
if conn.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled {
return errors.New("<SessionS> ThresholdS not enabled but requested by SMGeneric component.")
}
}
for _, conn := range self.sessionSCfg.SupplSConns {
if conn.Address == utils.MetaInternal && !self.supplierSCfg.Enabled {
return errors.New("<SessionS> SupplierS not enabled but requested by SMGeneric component.")

View File

@@ -287,6 +287,7 @@ const CGRATES_CFG_JSON = `
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
],
"resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013>
"thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013>
"suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
"attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013>
"session_replication_conns": [], // replicate sessions towards these session services

View File

@@ -496,6 +496,7 @@ func TestSmgJsonCfg(t *testing.T) {
Address: utils.StringPointer(utils.MetaInternal),
}},
Resources_conns: &[]*HaPoolJsonCfg{},
Thresholds_conns: &[]*HaPoolJsonCfg{},
Suppliers_conns: &[]*HaPoolJsonCfg{},
Attributes_conns: &[]*HaPoolJsonCfg{},
Session_replication_conns: &[]*HaPoolJsonCfg{},

View File

@@ -454,6 +454,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) {
CDRsConns: []*HaPoolConfig{
&HaPoolConfig{Address: "*internal"}},
ResSConns: []*HaPoolConfig{},
ThreshSConns: []*HaPoolConfig{},
SupplSConns: []*HaPoolConfig{},
AttrSConns: []*HaPoolConfig{},
SessionReplicationConns: []*HaPoolConfig{},

View File

@@ -199,6 +199,7 @@ type SessionSJsonCfg struct {
Listen_bijson *string
Rals_conns *[]*HaPoolJsonCfg
Resources_conns *[]*HaPoolJsonCfg
Thresholds_conns *[]*HaPoolJsonCfg
Suppliers_conns *[]*HaPoolJsonCfg
Cdrs_conns *[]*HaPoolJsonCfg
Session_replication_conns *[]*HaPoolJsonCfg

View File

@@ -93,6 +93,7 @@ type SessionSCfg struct {
ListenBijson string
RALsConns []*HaPoolConfig
ResSConns []*HaPoolConfig
ThreshSConns []*HaPoolConfig
SupplSConns []*HaPoolConfig
AttrSConns []*HaPoolConfig
CDRsConns []*HaPoolConfig
@@ -133,6 +134,13 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error {
self.ResSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Thresholds_conns != nil {
self.ThreshSConns = make([]*HaPoolConfig, len(*jsnCfg.Thresholds_conns))
for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns {
self.ThreshSConns[idx] = NewDfltHaPoolConfig()
self.ThreshSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Suppliers_conns != nil {
self.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns))
for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns {

View File

@@ -266,6 +266,7 @@
// {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
// ],
// "resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013>
// "thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013>
// "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
// "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013>
// "session_replication_conns": [], // replicate sessions towards these session services

View File

@@ -63,6 +63,9 @@
"resources_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"suppliers_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],

View File

@@ -64,7 +64,7 @@ type SMGReplicationConn struct {
Synchronous bool
}
func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS,
func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS,
splS, attrS, cdrsrv rpcclient.RpcClientConnection,
smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric {
ssIdxCfg := cgrCfg.SessionSCfg().SessionIndexes
@@ -75,6 +75,9 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS,
if resS != nil && reflect.ValueOf(resS).IsNil() {
resS = nil
}
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
thdS = nil
}
if splS != nil && reflect.ValueOf(splS).IsNil() {
splS = nil
}
@@ -87,6 +90,7 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS,
return &SMGeneric{cgrCfg: cgrCfg,
rals: rals,
resS: resS,
thdS: thdS,
splS: splS,
attrS: attrS,
cdrsrv: cdrsrv,
@@ -107,6 +111,7 @@ type SMGeneric struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rals rpcclient.RpcClientConnection // RALs connections
resS rpcclient.RpcClientConnection // ResourceS connections
thdS rpcclient.RpcClientConnection // ThresholdS connections
splS rpcclient.RpcClientConnection // SupplierS connections
attrS rpcclient.RpcClientConnection // AttributeS connections
cdrsrv rpcclient.RpcClientConnection // CDR server connections

View File

@@ -34,7 +34,7 @@ func init() {
}
func TestSMGSessionIndexing(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC")
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC")
smGev := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
@@ -389,7 +389,7 @@ func TestSMGSessionIndexing(t *testing.T) {
}
func TestSMGActiveSessions(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC")
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC")
smGev1 := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
@@ -463,7 +463,7 @@ func TestSMGActiveSessions(t *testing.T) {
}
func TestGetPassiveSessions(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC")
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC")
if pSS := smg.getSessions("", true); len(pSS) != 0 {
t.Errorf("PassiveSessions: %+v", pSS)
}