diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index b3696e082..baa938545 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -135,10 +135,20 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan, internalStatSChan, internalSupplierSChan, internalAttrSChan, - internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { + internalCDRSChan, internalChargerSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS Session service.") var err error - var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn *rpcclient.RpcClientPool + var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn *rpcclient.RpcClientPool + if len(cfg.SessionSCfg().ChargerSConns) != 0 { + chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.SessionSCfg().ChargerSConns, internalChargerSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ChargerS, err.Error())) + exitChan <- true + return + } + } if len(cfg.SessionSCfg().RALsConns) != 0 { ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, @@ -216,7 +226,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in return } sm := sessions.NewSMGeneric(cfg, ralsConns, resSConns, threshSConns, statSConns, - suplSConns, attrSConns, cdrsConn, smgReplConns, cfg.DefaultTimezone) + suplSConns, attrSConns, cdrsConn, chargerSConn, smgReplConns, cfg.DefaultTimezone) if err = sm.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err)) } @@ -1232,8 +1242,10 @@ func main() { // Start SM-Generic if cfg.SessionSCfg().Enabled { - go startSessionS(internalSMGChan, internalRaterChan, internalRsChan, internalThresholdSChan, - internalStatSChan, internalSupplierSChan, internalAttributeSChan, internalCdrSChan, server, exitChan) + go startSessionS(internalSMGChan, internalRaterChan, + internalRsChan, internalThresholdSChan, + internalStatSChan, internalSupplierSChan, internalAttributeSChan, + internalCdrSChan, internalChargerSChan, server, exitChan) } // Start FreeSWITCHAgent if cfg.FsAgentCfg().Enabled { diff --git a/config/config.go b/config/config.go index a1e0ce252..01df70e40 100755 --- a/config/config.go +++ b/config/config.go @@ -526,6 +526,11 @@ func (self *CGRConfig) checkConfigSanity() error { if len(self.sessionSCfg.RALsConns) == 0 { return errors.New(" RALs definition is mandatory!") } + for _, conn := range self.sessionSCfg.ChargerSConns { + if conn.Address == utils.MetaInternal && !self.chargerSCfg.Enabled { + return errors.New(" ChargerS not enabled but requested") + } + } for _, smgRALsConn := range self.sessionSCfg.RALsConns { if smgRALsConn.Address == utils.MetaInternal && !self.RALsEnabled { return errors.New(" RALs not enabled but requested by SMGeneric component.") diff --git a/config/config_defaults.go b/config/config_defaults.go index 56ddf3cc3..e55a96f82 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -296,6 +296,7 @@ const CGRATES_CFG_JSON = ` "sessions": { "enabled": false, // starts session manager service: "listen_bijson": "127.0.0.1:2014", // address where to listen for bidirectional JSON-RPC requests + "chargers_conns": [], // address where to reach the charger service, empty to disable charger functionality: <""|*internal|x.y.z.y:1234> "rals_conns": [ {"address": "*internal"} // address where to reach the RALs <""|*internal|127.0.0.1:2013> ], diff --git a/config/config_json_test.go b/config/config_json_test.go index 7307e774f..3d0d8eebb 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -493,8 +493,9 @@ func TestDfCdrcJsonCfg(t *testing.T) { func TestSmgJsonCfg(t *testing.T) { eCfg := &SessionSJsonCfg{ - Enabled: utils.BoolPointer(false), - Listen_bijson: utils.StringPointer("127.0.0.1:2014"), + Enabled: utils.BoolPointer(false), + Listen_bijson: utils.StringPointer("127.0.0.1:2014"), + Chargers_conns: &[]*HaPoolJsonCfg{}, Rals_conns: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ Address: utils.StringPointer(utils.MetaInternal), diff --git a/config/config_test.go b/config/config_test.go index a185d53c6..d608fc691 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -600,8 +600,9 @@ func TestCgrCfgJSONDefaultsCdreProfiles(t *testing.T) { func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) { eSessionSCfg := &SessionSCfg{ - Enabled: false, - ListenBijson: "127.0.0.1:2014", + Enabled: false, + ListenBijson: "127.0.0.1:2014", + ChargerSConns: []*HaPoolConfig{}, RALsConns: []*HaPoolConfig{ &HaPoolConfig{Address: "*internal"}}, CDRsConns: []*HaPoolConfig{ diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 4ac8452a5..57ec4028c 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -214,6 +214,7 @@ type CdrcJsonCfg struct { type SessionSJsonCfg struct { Enabled *bool Listen_bijson *string + Chargers_conns *[]*HaPoolJsonCfg Rals_conns *[]*HaPoolJsonCfg Resources_conns *[]*HaPoolJsonCfg Thresholds_conns *[]*HaPoolJsonCfg diff --git a/config/smconfig.go b/config/smconfig.go index cc96b9300..a0696c095 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -97,6 +97,7 @@ func (self *FsConnConfig) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error { type SessionSCfg struct { Enabled bool ListenBijson string + ChargerSConns []*HaPoolConfig RALsConns []*HaPoolConfig ResSConns []*HaPoolConfig ThreshSConns []*HaPoolConfig @@ -128,6 +129,13 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) error { if jsnCfg.Listen_bijson != nil { self.ListenBijson = *jsnCfg.Listen_bijson } + if jsnCfg.Chargers_conns != nil { + self.ChargerSConns = make([]*HaPoolConfig, len(*jsnCfg.Chargers_conns)) + for idx, jsnHaCfg := range *jsnCfg.Chargers_conns { + self.ChargerSConns[idx] = NewDfltHaPoolConfig() + self.ChargerSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } if jsnCfg.Rals_conns != nil { self.RALsConns = make([]*HaPoolConfig, len(*jsnCfg.Rals_conns)) for idx, jsnHaCfg := range *jsnCfg.Rals_conns { diff --git a/sessions/sessions.go b/sessions/sessions.go index 41e1a4f77..3da5feae9 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -65,7 +65,7 @@ type SMGReplicationConn struct { } func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, - statS, splS, attrS, cdrsrv rpcclient.RpcClientConnection, + statS, splS, attrS, cdrsrv, chargerS rpcclient.RpcClientConnection, smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric { ssIdxCfg := cgrCfg.SessionSCfg().SessionIndexes ssIdxCfg[utils.OriginID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching @@ -90,7 +90,11 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, if cdrsrv != nil && reflect.ValueOf(cdrsrv).IsNil() { cdrsrv = nil } + if chargerS != nil && reflect.ValueOf(chargerS).IsNil() { + chargerS = nil + } return &SMGeneric{cgrCfg: cgrCfg, + chargerS: chargerS, rals: rals, resS: resS, thdS: thdS, @@ -113,7 +117,8 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, } type SMGeneric struct { - cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple + cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple + chargerS rpcclient.RpcClientConnection rals rpcclient.RpcClientConnection // RALs connections resS rpcclient.RpcClientConnection // ResourceS connections thdS rpcclient.RpcClientConnection // ThresholdS connections diff --git a/sessions/smgeneric_test.go b/sessions/smgeneric_test.go index 8e2905d31..76aab6640 100644 --- a/sessions/smgeneric_test.go +++ b/sessions/smgeneric_test.go @@ -34,7 +34,7 @@ func init() { } func TestSMGSessionIndexing(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") smGev := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice", @@ -389,7 +389,7 @@ func TestSMGSessionIndexing(t *testing.T) { } func TestSMGActiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") smGev1 := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice", @@ -463,7 +463,7 @@ func TestSMGActiveSessions(t *testing.T) { } func TestGetPassiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") if pSS := smg.getSessions("", true); len(pSS) != 0 { t.Errorf("PassiveSessions: %+v", pSS) }