From c8c2c0921dde32fc0fd8279b1867630e2a1eba64 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 7 Mar 2018 11:24:00 -0500 Subject: [PATCH] Integrate StatS in SessionS --- cmd/cgr-engine/cgr-engine.go | 28 ++++++++++++++++++------- config/config.go | 5 +++++ config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/config_test.go | 1 + config/libconfig_json.go | 1 + config/smconfig.go | 8 +++++++ data/conf/cgrates/cgrates.json | 1 + data/conf/samples/sessions/cgrates.json | 12 +++++++++++ sessions/sessions.go | 7 ++++++- sessions/smgeneric_test.go | 6 +++--- 11 files changed, 59 insertions(+), 12 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d5ac3a147..0116b1751 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -129,13 +129,15 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } } -func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan, internalSupplierSChan, - internalAttrSChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { +func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan, + internalStatSChan, internalSupplierSChan, internalAttrSChan, + internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS Session service.") var err error - var ralsConns, resSConns, threshSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool + var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool if len(cfg.SessionSCfg().RALsConns) != 0 { - ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SessionSCfg().RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.SessionS, err.Error())) @@ -144,7 +146,8 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in } } if len(cfg.SessionSCfg().ResSConns) != 0 { - resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SessionSCfg().ResSConns, internalResourceSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s", utils.SessionS, err.Error())) @@ -161,6 +164,15 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } + if len(cfg.SessionSCfg().StatSConns) != 0 { + statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.SessionSCfg().StatSConns, internalStatSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.SessionS, err.Error())) + exitChan <- true + return + } + } if len(cfg.SessionSCfg().SupplSConns) != 0 { suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SessionSCfg().SupplSConns, internalSupplierSChan, cfg.InternalTtl) @@ -194,8 +206,8 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in exitChan <- true return } - sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, threshSConns, suplSConns, - attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone) + sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, threshSConns, statSConns, + suplSConns, attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone) if err = sm.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err)) } @@ -881,7 +893,7 @@ func main() { // Start SM-Generic if cfg.SessionSCfg().Enabled { go startSessionS(internalSMGChan, internalRaterChan, internalRsChan, internalThresholdSChan, - internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan) + internalStatSChan, internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan) } // Start FreeSWITCHAgent if cfg.FsAgentCfg().Enabled { diff --git a/config/config.go b/config/config.go index c5fd97fcf..1e2c8af8d 100755 --- a/config/config.go +++ b/config/config.go @@ -489,6 +489,11 @@ func (self *CGRConfig) checkConfigSanity() error { return errors.New(" ThresholdS not enabled but requested by SMGeneric component.") } } + for _, conn := range self.sessionSCfg.StatSConns { + if conn.Address == utils.MetaInternal && !self.statsCfg.Enabled { + return errors.New(" StatS not enabled but requested by SMGeneric component.") + } + } for _, conn := range self.sessionSCfg.SupplSConns { if conn.Address == utils.MetaInternal && !self.supplierSCfg.Enabled { return errors.New(" SupplierS not enabled but requested by SMGeneric component.") diff --git a/config/config_defaults.go b/config/config_defaults.go index c18e10eea..d8ed278c2 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -288,6 +288,7 @@ const CGRATES_CFG_JSON = ` ], "resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013> "thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013> + "stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013> "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013> "session_replication_conns": [], // replicate sessions towards these session services diff --git a/config/config_json_test.go b/config/config_json_test.go index 4895d20d3..75e2f9b3c 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -497,6 +497,7 @@ func TestSmgJsonCfg(t *testing.T) { }}, Resources_conns: &[]*HaPoolJsonCfg{}, Thresholds_conns: &[]*HaPoolJsonCfg{}, + Stats_conns: &[]*HaPoolJsonCfg{}, Suppliers_conns: &[]*HaPoolJsonCfg{}, Attributes_conns: &[]*HaPoolJsonCfg{}, Session_replication_conns: &[]*HaPoolJsonCfg{}, diff --git a/config/config_test.go b/config/config_test.go index 41cd31579..315d953b6 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -455,6 +455,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) { &HaPoolConfig{Address: "*internal"}}, ResSConns: []*HaPoolConfig{}, ThreshSConns: []*HaPoolConfig{}, + StatSConns: []*HaPoolConfig{}, SupplSConns: []*HaPoolConfig{}, AttrSConns: []*HaPoolConfig{}, SessionReplicationConns: []*HaPoolConfig{}, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 51d7de735..12ab1a941 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -200,6 +200,7 @@ type SessionSJsonCfg struct { Rals_conns *[]*HaPoolJsonCfg Resources_conns *[]*HaPoolJsonCfg Thresholds_conns *[]*HaPoolJsonCfg + Stats_conns *[]*HaPoolJsonCfg Suppliers_conns *[]*HaPoolJsonCfg Cdrs_conns *[]*HaPoolJsonCfg Session_replication_conns *[]*HaPoolJsonCfg diff --git a/config/smconfig.go b/config/smconfig.go index 657bd9cdc..c24079761 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -94,6 +94,7 @@ type SessionSCfg struct { RALsConns []*HaPoolConfig ResSConns []*HaPoolConfig ThreshSConns []*HaPoolConfig + StatSConns []*HaPoolConfig SupplSConns []*HaPoolConfig AttrSConns []*HaPoolConfig CDRsConns []*HaPoolConfig @@ -141,6 +142,13 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error { self.ThreshSConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Stats_conns != nil { + self.StatSConns = make([]*HaPoolConfig, len(*jsnCfg.Stats_conns)) + for idx, jsnHaCfg := range *jsnCfg.Stats_conns { + self.StatSConns[idx] = NewDfltHaPoolConfig() + self.StatSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } if jsnCfg.Suppliers_conns != nil { self.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns)) for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns { diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 9cf171c1a..1b2592668 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -267,6 +267,7 @@ // ], // "resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013> // "thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013> +// "stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013> // "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> // "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013> // "session_replication_conns": [], // replicate sessions towards these session services diff --git a/data/conf/samples/sessions/cgrates.json b/data/conf/samples/sessions/cgrates.json index 927cca91f..c85dd0717 100644 --- a/data/conf/samples/sessions/cgrates.json +++ b/data/conf/samples/sessions/cgrates.json @@ -52,6 +52,15 @@ }, +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": [ + {"address": "*internal"} + ], +}, + + "suppliers": { "enabled": true, }, @@ -72,6 +81,9 @@ "thresholds_conns": [ {"address": "127.0.0.1:2012", "transport": "*json"} ], + "stats_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], "suppliers_conns": [ {"address": "127.0.0.1:2012", "transport": "*json"} ], diff --git a/sessions/sessions.go b/sessions/sessions.go index c0040a1ac..6ed97018d 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -64,7 +64,7 @@ type SMGReplicationConn struct { } func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, - splS, attrS, cdrsrv rpcclient.RpcClientConnection, + statS, splS, attrS, cdrsrv rpcclient.RpcClientConnection, smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric { ssIdxCfg := cgrCfg.SessionSCfg().SessionIndexes ssIdxCfg[utils.OriginID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching @@ -77,6 +77,9 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, if thdS != nil && reflect.ValueOf(thdS).IsNil() { thdS = nil } + if statS != nil && reflect.ValueOf(statS).IsNil() { + statS = nil + } if splS != nil && reflect.ValueOf(splS).IsNil() { splS = nil } @@ -90,6 +93,7 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, rals: rals, resS: resS, thdS: thdS, + statS: statS, splS: splS, attrS: attrS, cdrsrv: cdrsrv, @@ -111,6 +115,7 @@ type SMGeneric struct { rals rpcclient.RpcClientConnection // RALs connections resS rpcclient.RpcClientConnection // ResourceS connections thdS rpcclient.RpcClientConnection // ThresholdS connections + statS rpcclient.RpcClientConnection // StatS connections splS rpcclient.RpcClientConnection // SupplierS connections attrS rpcclient.RpcClientConnection // AttributeS connections cdrsrv rpcclient.RpcClientConnection // CDR server connections diff --git a/sessions/smgeneric_test.go b/sessions/smgeneric_test.go index 7ba1c1895..78aca735f 100644 --- a/sessions/smgeneric_test.go +++ b/sessions/smgeneric_test.go @@ -34,7 +34,7 @@ func init() { } func TestSMGSessionIndexing(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") smGev := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -389,7 +389,7 @@ func TestSMGSessionIndexing(t *testing.T) { } func TestSMGActiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") smGev1 := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -463,7 +463,7 @@ func TestSMGActiveSessions(t *testing.T) { } func TestGetPassiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") if pSS := smg.getSessions("", true); len(pSS) != 0 { t.Errorf("PassiveSessions: %+v", pSS) }