mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 05:39:54 +05:00
Integrate StatS in SessionS
This commit is contained in:
committed by
Dan Christian Bogos
parent
d558d5508c
commit
c8c2c0921d
@@ -129,13 +129,15 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
|
||||
}
|
||||
}
|
||||
|
||||
func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan, internalSupplierSChan,
|
||||
internalAttrSChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
|
||||
func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan,
|
||||
internalStatSChan, internalSupplierSChan, internalAttrSChan,
|
||||
internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
|
||||
utils.Logger.Info("Starting CGRateS Session service.")
|
||||
var err error
|
||||
var ralsConns, resSConns, threshSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool
|
||||
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool
|
||||
if len(cfg.SessionSCfg().RALsConns) != 0 {
|
||||
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.SessionSCfg().RALsConns, internalRaterChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.SessionS, err.Error()))
|
||||
@@ -144,7 +146,8 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
|
||||
}
|
||||
}
|
||||
if len(cfg.SessionSCfg().ResSConns) != 0 {
|
||||
resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.SessionSCfg().ResSConns, internalResourceSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s", utils.SessionS, err.Error()))
|
||||
@@ -161,6 +164,15 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.SessionSCfg().StatSConns) != 0 {
|
||||
statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.SessionSCfg().StatSConns, internalStatSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.SessionS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.SessionSCfg().SupplSConns) != 0 {
|
||||
suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.SessionSCfg().SupplSConns, internalSupplierSChan, cfg.InternalTtl)
|
||||
@@ -194,8 +206,8 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, threshSConns, suplSConns,
|
||||
attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone)
|
||||
sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, threshSConns, statSConns,
|
||||
suplSConns, attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone)
|
||||
if err = sm.Connect(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err))
|
||||
}
|
||||
@@ -881,7 +893,7 @@ func main() {
|
||||
// Start SM-Generic
|
||||
if cfg.SessionSCfg().Enabled {
|
||||
go startSessionS(internalSMGChan, internalRaterChan, internalRsChan, internalThresholdSChan,
|
||||
internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan)
|
||||
internalStatSChan, internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan)
|
||||
}
|
||||
// Start FreeSWITCHAgent
|
||||
if cfg.FsAgentCfg().Enabled {
|
||||
|
||||
@@ -489,6 +489,11 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
return errors.New("<SessionS> ThresholdS not enabled but requested by SMGeneric component.")
|
||||
}
|
||||
}
|
||||
for _, conn := range self.sessionSCfg.StatSConns {
|
||||
if conn.Address == utils.MetaInternal && !self.statsCfg.Enabled {
|
||||
return errors.New("<SessionS> StatS not enabled but requested by SMGeneric component.")
|
||||
}
|
||||
}
|
||||
for _, conn := range self.sessionSCfg.SupplSConns {
|
||||
if conn.Address == utils.MetaInternal && !self.supplierSCfg.Enabled {
|
||||
return errors.New("<SessionS> SupplierS not enabled but requested by SMGeneric component.")
|
||||
|
||||
@@ -288,6 +288,7 @@ const CGRATES_CFG_JSON = `
|
||||
],
|
||||
"resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013>
|
||||
"thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013>
|
||||
"stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013>
|
||||
"suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
|
||||
"attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013>
|
||||
"session_replication_conns": [], // replicate sessions towards these session services
|
||||
|
||||
@@ -497,6 +497,7 @@ func TestSmgJsonCfg(t *testing.T) {
|
||||
}},
|
||||
Resources_conns: &[]*HaPoolJsonCfg{},
|
||||
Thresholds_conns: &[]*HaPoolJsonCfg{},
|
||||
Stats_conns: &[]*HaPoolJsonCfg{},
|
||||
Suppliers_conns: &[]*HaPoolJsonCfg{},
|
||||
Attributes_conns: &[]*HaPoolJsonCfg{},
|
||||
Session_replication_conns: &[]*HaPoolJsonCfg{},
|
||||
|
||||
@@ -455,6 +455,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) {
|
||||
&HaPoolConfig{Address: "*internal"}},
|
||||
ResSConns: []*HaPoolConfig{},
|
||||
ThreshSConns: []*HaPoolConfig{},
|
||||
StatSConns: []*HaPoolConfig{},
|
||||
SupplSConns: []*HaPoolConfig{},
|
||||
AttrSConns: []*HaPoolConfig{},
|
||||
SessionReplicationConns: []*HaPoolConfig{},
|
||||
|
||||
@@ -200,6 +200,7 @@ type SessionSJsonCfg struct {
|
||||
Rals_conns *[]*HaPoolJsonCfg
|
||||
Resources_conns *[]*HaPoolJsonCfg
|
||||
Thresholds_conns *[]*HaPoolJsonCfg
|
||||
Stats_conns *[]*HaPoolJsonCfg
|
||||
Suppliers_conns *[]*HaPoolJsonCfg
|
||||
Cdrs_conns *[]*HaPoolJsonCfg
|
||||
Session_replication_conns *[]*HaPoolJsonCfg
|
||||
|
||||
@@ -94,6 +94,7 @@ type SessionSCfg struct {
|
||||
RALsConns []*HaPoolConfig
|
||||
ResSConns []*HaPoolConfig
|
||||
ThreshSConns []*HaPoolConfig
|
||||
StatSConns []*HaPoolConfig
|
||||
SupplSConns []*HaPoolConfig
|
||||
AttrSConns []*HaPoolConfig
|
||||
CDRsConns []*HaPoolConfig
|
||||
@@ -141,6 +142,13 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error {
|
||||
self.ThreshSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
}
|
||||
}
|
||||
if jsnCfg.Stats_conns != nil {
|
||||
self.StatSConns = make([]*HaPoolConfig, len(*jsnCfg.Stats_conns))
|
||||
for idx, jsnHaCfg := range *jsnCfg.Stats_conns {
|
||||
self.StatSConns[idx] = NewDfltHaPoolConfig()
|
||||
self.StatSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
}
|
||||
}
|
||||
if jsnCfg.Suppliers_conns != nil {
|
||||
self.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns))
|
||||
for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns {
|
||||
|
||||
@@ -267,6 +267,7 @@
|
||||
// ],
|
||||
// "resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013>
|
||||
// "thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013>
|
||||
// "stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013>
|
||||
// "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
|
||||
// "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013>
|
||||
// "session_replication_conns": [], // replicate sessions towards these session services
|
||||
|
||||
@@ -52,6 +52,15 @@
|
||||
},
|
||||
|
||||
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
},
|
||||
|
||||
|
||||
"suppliers": {
|
||||
"enabled": true,
|
||||
},
|
||||
@@ -72,6 +81,9 @@
|
||||
"thresholds_conns": [
|
||||
{"address": "127.0.0.1:2012", "transport": "*json"}
|
||||
],
|
||||
"stats_conns": [
|
||||
{"address": "127.0.0.1:2012", "transport": "*json"}
|
||||
],
|
||||
"suppliers_conns": [
|
||||
{"address": "127.0.0.1:2012", "transport": "*json"}
|
||||
],
|
||||
|
||||
@@ -64,7 +64,7 @@ type SMGReplicationConn struct {
|
||||
}
|
||||
|
||||
func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS,
|
||||
splS, attrS, cdrsrv rpcclient.RpcClientConnection,
|
||||
statS, splS, attrS, cdrsrv rpcclient.RpcClientConnection,
|
||||
smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric {
|
||||
ssIdxCfg := cgrCfg.SessionSCfg().SessionIndexes
|
||||
ssIdxCfg[utils.OriginID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching
|
||||
@@ -77,6 +77,9 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS,
|
||||
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
|
||||
thdS = nil
|
||||
}
|
||||
if statS != nil && reflect.ValueOf(statS).IsNil() {
|
||||
statS = nil
|
||||
}
|
||||
if splS != nil && reflect.ValueOf(splS).IsNil() {
|
||||
splS = nil
|
||||
}
|
||||
@@ -90,6 +93,7 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS,
|
||||
rals: rals,
|
||||
resS: resS,
|
||||
thdS: thdS,
|
||||
statS: statS,
|
||||
splS: splS,
|
||||
attrS: attrS,
|
||||
cdrsrv: cdrsrv,
|
||||
@@ -111,6 +115,7 @@ type SMGeneric struct {
|
||||
rals rpcclient.RpcClientConnection // RALs connections
|
||||
resS rpcclient.RpcClientConnection // ResourceS connections
|
||||
thdS rpcclient.RpcClientConnection // ThresholdS connections
|
||||
statS rpcclient.RpcClientConnection // StatS connections
|
||||
splS rpcclient.RpcClientConnection // SupplierS connections
|
||||
attrS rpcclient.RpcClientConnection // AttributeS connections
|
||||
cdrsrv rpcclient.RpcClientConnection // CDR server connections
|
||||
|
||||
@@ -34,7 +34,7 @@ func init() {
|
||||
}
|
||||
|
||||
func TestSMGSessionIndexing(t *testing.T) {
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC")
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
|
||||
smGev := SMGenericEvent{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.TOR: "*voice",
|
||||
@@ -389,7 +389,7 @@ func TestSMGSessionIndexing(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSMGActiveSessions(t *testing.T) {
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC")
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
|
||||
smGev1 := SMGenericEvent{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.TOR: "*voice",
|
||||
@@ -463,7 +463,7 @@ func TestSMGActiveSessions(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetPassiveSessions(t *testing.T) {
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC")
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
|
||||
if pSS := smg.getSessions("", true); len(pSS) != 0 {
|
||||
t.Errorf("PassiveSessions: %+v", pSS)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user