From 3f461a5524309f17c08b78e729abf00e820f66a0 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 28 Feb 2018 07:14:05 -0500 Subject: [PATCH] Add thresholds_conns option for sessions config --- cmd/cgr-engine/cgr-engine.go | 17 +++++++++++++---- config/config.go | 5 +++++ config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/config_test.go | 1 + config/libconfig_json.go | 1 + config/smconfig.go | 8 ++++++++ data/conf/cgrates/cgrates.json | 1 + data/conf/samples/sessions/cgrates.json | 3 +++ sessions/sessions.go | 7 ++++++- sessions/smgeneric_test.go | 6 +++--- 11 files changed, 43 insertions(+), 8 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 25f62f467..234b923f0 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -130,11 +130,11 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } } -func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalSupplierSChan, +func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan, internalSupplierSChan, internalAttrSChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS Session service.") var err error - var ralsConns, resSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool + var ralsConns, resSConns, threshSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool if len(cfg.SessionSCfg().RALsConns) != 0 { ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SessionSCfg().RALsConns, internalRaterChan, cfg.InternalTtl) @@ -153,6 +153,15 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } } + if len(cfg.SessionSCfg().ThreshSConns) != 0 { + threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.SessionSCfg().ThreshSConns, internalThresholdSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.SessionS, err.Error())) + exitChan <- true + return + } + } if len(cfg.SessionSCfg().SupplSConns) != 0 { suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.SessionSCfg().SupplSConns, internalSupplierSChan, cfg.InternalTtl) @@ -186,7 +195,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in exitChan <- true return } - sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, suplSConns, + sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, threshSConns, suplSConns, attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone) if err = sm.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err)) @@ -872,7 +881,7 @@ func main() { // Start SM-Generic if cfg.SessionSCfg().Enabled { - go startSessionS(internalSMGChan, internalRaterChan, internalRsChan, + go startSessionS(internalSMGChan, internalRaterChan, internalRsChan, internalThresholdSChan, internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan) } // Start FreeSWITCHAgent diff --git a/config/config.go b/config/config.go index 861566535..c5fd97fcf 100755 --- a/config/config.go +++ b/config/config.go @@ -484,6 +484,11 @@ func (self *CGRConfig) checkConfigSanity() error { return errors.New(" ResourceS not enabled but requested by SMGeneric component.") } } + for _, conn := range self.sessionSCfg.ThreshSConns { + if conn.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { + return errors.New(" ThresholdS not enabled but requested by SMGeneric component.") + } + } for _, conn := range self.sessionSCfg.SupplSConns { if conn.Address == utils.MetaInternal && !self.supplierSCfg.Enabled { return errors.New(" SupplierS not enabled but requested by SMGeneric component.") diff --git a/config/config_defaults.go b/config/config_defaults.go index ac70c6b22..c18e10eea 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -287,6 +287,7 @@ const CGRATES_CFG_JSON = ` {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234> ], "resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013> + "thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013> "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013> "session_replication_conns": [], // replicate sessions towards these session services diff --git a/config/config_json_test.go b/config/config_json_test.go index d56244848..4895d20d3 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -496,6 +496,7 @@ func TestSmgJsonCfg(t *testing.T) { Address: utils.StringPointer(utils.MetaInternal), }}, Resources_conns: &[]*HaPoolJsonCfg{}, + Thresholds_conns: &[]*HaPoolJsonCfg{}, Suppliers_conns: &[]*HaPoolJsonCfg{}, Attributes_conns: &[]*HaPoolJsonCfg{}, Session_replication_conns: &[]*HaPoolJsonCfg{}, diff --git a/config/config_test.go b/config/config_test.go index c894480e1..41cd31579 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -454,6 +454,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) { CDRsConns: []*HaPoolConfig{ &HaPoolConfig{Address: "*internal"}}, ResSConns: []*HaPoolConfig{}, + ThreshSConns: []*HaPoolConfig{}, SupplSConns: []*HaPoolConfig{}, AttrSConns: []*HaPoolConfig{}, SessionReplicationConns: []*HaPoolConfig{}, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 97cda944d..51d7de735 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -199,6 +199,7 @@ type SessionSJsonCfg struct { Listen_bijson *string Rals_conns *[]*HaPoolJsonCfg Resources_conns *[]*HaPoolJsonCfg + Thresholds_conns *[]*HaPoolJsonCfg Suppliers_conns *[]*HaPoolJsonCfg Cdrs_conns *[]*HaPoolJsonCfg Session_replication_conns *[]*HaPoolJsonCfg diff --git a/config/smconfig.go b/config/smconfig.go index 1557fcdd2..657bd9cdc 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -93,6 +93,7 @@ type SessionSCfg struct { ListenBijson string RALsConns []*HaPoolConfig ResSConns []*HaPoolConfig + ThreshSConns []*HaPoolConfig SupplSConns []*HaPoolConfig AttrSConns []*HaPoolConfig CDRsConns []*HaPoolConfig @@ -133,6 +134,13 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error { self.ResSConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Thresholds_conns != nil { + self.ThreshSConns = make([]*HaPoolConfig, len(*jsnCfg.Thresholds_conns)) + for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns { + self.ThreshSConns[idx] = NewDfltHaPoolConfig() + self.ThreshSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } if jsnCfg.Suppliers_conns != nil { self.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns)) for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns { diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 41ee0b50f..9cf171c1a 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -266,6 +266,7 @@ // {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234> // ], // "resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013> +// "thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013> // "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> // "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013> // "session_replication_conns": [], // replicate sessions towards these session services diff --git a/data/conf/samples/sessions/cgrates.json b/data/conf/samples/sessions/cgrates.json index 02393bbe7..212525355 100644 --- a/data/conf/samples/sessions/cgrates.json +++ b/data/conf/samples/sessions/cgrates.json @@ -63,6 +63,9 @@ "resources_conns": [ {"address": "127.0.0.1:2012", "transport": "*json"} ], + "thresholds_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], "suppliers_conns": [ {"address": "127.0.0.1:2012", "transport": "*json"} ], diff --git a/sessions/sessions.go b/sessions/sessions.go index 851e5fc58..2482371bb 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -64,7 +64,7 @@ type SMGReplicationConn struct { Synchronous bool } -func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, +func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, splS, attrS, cdrsrv rpcclient.RpcClientConnection, smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric { ssIdxCfg := cgrCfg.SessionSCfg().SessionIndexes @@ -75,6 +75,9 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, if resS != nil && reflect.ValueOf(resS).IsNil() { resS = nil } + if thdS != nil && reflect.ValueOf(thdS).IsNil() { + thdS = nil + } if splS != nil && reflect.ValueOf(splS).IsNil() { splS = nil } @@ -87,6 +90,7 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, return &SMGeneric{cgrCfg: cgrCfg, rals: rals, resS: resS, + thdS: thdS, splS: splS, attrS: attrS, cdrsrv: cdrsrv, @@ -107,6 +111,7 @@ type SMGeneric struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple rals rpcclient.RpcClientConnection // RALs connections resS rpcclient.RpcClientConnection // ResourceS connections + thdS rpcclient.RpcClientConnection // ThresholdS connections splS rpcclient.RpcClientConnection // SupplierS connections attrS rpcclient.RpcClientConnection // AttributeS connections cdrsrv rpcclient.RpcClientConnection // CDR server connections diff --git a/sessions/smgeneric_test.go b/sessions/smgeneric_test.go index 27c87af8a..7ba1c1895 100644 --- a/sessions/smgeneric_test.go +++ b/sessions/smgeneric_test.go @@ -34,7 +34,7 @@ func init() { } func TestSMGSessionIndexing(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC") smGev := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -389,7 +389,7 @@ func TestSMGSessionIndexing(t *testing.T) { } func TestSMGActiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC") smGev1 := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -463,7 +463,7 @@ func TestSMGActiveSessions(t *testing.T) { } func TestGetPassiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, "UTC") if pSS := smg.getSessions("", true); len(pSS) != 0 { t.Errorf("PassiveSessions: %+v", pSS) }