diff --git a/config/config.go b/config/config.go index c72316642..dc83d146f 100755 --- a/config/config.go +++ b/config/config.go @@ -1021,6 +1021,13 @@ func (cfg *CGRConfig) ERsCfg() *ERsCfg { return cfg.ersCfg } +// RPCConns reads the RPCConns configuration +func (cfg *CGRConfig) RPCConns() map[string]*RPCConn { + cfg.lks[RPCConnsJsonName].RLock() + defer cfg.lks[RPCConnsJsonName].RUnlock() + return cfg.rpcConns +} + // GetReloadChan returns the reload chanel for the given section func (cfg *CGRConfig) GetReloadChan(sectID string) chan struct{} { return cfg.rldChans[sectID] diff --git a/config/config_defaults.go b/config/config_defaults.go index fb6c5bac1..15263330e 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -188,6 +188,7 @@ const CGRATES_CFG_JSON = ` "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false}, // closed sessions cached for CDRs "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control the load_ids for items + "*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false}, // RPC connections caching }, @@ -328,9 +329,7 @@ const CGRATES_CFG_JSON = ` "ers": { // EventReaderService "enabled": false, // starts the EventReader service: - "sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012> - {"address": "*internal"} - ], + "sessions_conns":["*internal"], // RPC Connections IDs "readers": [ { "id": "*default", // identifier of the EventReader profile diff --git a/config/config_json_test.go b/config/config_json_test.go index 8cc6adfa9..ae0479930 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -162,6 +162,8 @@ func TestCacheJsonCfg(t *testing.T) { utils.CacheLoadIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false), Precache: utils.BoolPointer(false)}, + utils.CacheRPCConnections: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), + Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, } if gCfg, err := dfCgrJsonCfg.CacheJsonCfg(); err != nil { @@ -1677,12 +1679,8 @@ func TestDfEventReaderCfg(t *testing.T) { Value: utils.StringPointer("~*req.13"), Mandatory: utils.BoolPointer(true)}, } eCfg := &ERsJsonCfg{ - Enabled: utils.BoolPointer(false), - Sessions_conns: &[]*RemoteHostJson{ - { - Address: utils.StringPointer(utils.MetaInternal), - }, - }, + Enabled: utils.BoolPointer(false), + Sessions_conns: &[]string{utils.MetaInternal}, Readers: &[]*EventReaderJsonCfg{ &EventReaderJsonCfg{ Id: utils.StringPointer(utils.MetaDefault), diff --git a/config/config_test.go b/config/config_test.go index df7b4b89a..786602a48 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -731,6 +731,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) { TTL: time.Duration(10 * time.Second), StaticTTL: false}, utils.CacheLoadIDs: &CacheParamCfg{Limit: -1, TTL: time.Duration(0), StaticTTL: false, Precache: false}, + utils.CacheRPCConnections: &CacheParamCfg{Limit: -1, + TTL: time.Duration(0), StaticTTL: false}, } if !reflect.DeepEqual(eCacheCfg, cgrCfg.CacheCfg()) { @@ -1812,12 +1814,8 @@ func TestCgrCfgV1GetConfigSection(t *testing.T) { func TestCgrCdfEventReader(t *testing.T) { eCfg := &ERsCfg{ - Enabled: false, - SessionSConns: []*RemoteHost{ - { - Address: utils.MetaInternal, - }, - }, + Enabled: false, + SessionSConns: []string{utils.MetaInternal}, Readers: []*EventReaderCfg{ &EventReaderCfg{ ID: utils.MetaDefault, diff --git a/config/configsanity.go b/config/configsanity.go index 82974db14..42cf02279 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -373,11 +373,9 @@ func (cfg *CGRConfig) checkConfigSanity() error { } // EventReader sanity checks if cfg.ersCfg.Enabled { - if !cfg.sessionSCfg.Enabled { - for _, connCfg := range cfg.ersCfg.SessionSConns { - if connCfg.Address == utils.MetaInternal { - return fmt.Errorf("<%s> not enabled but requested by EventReader component.", utils.SessionS) - } + for _, connCfg := range cfg.ersCfg.SessionSConns { + if _, has := cfg.rpcConns[connCfg]; !has { + return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ERs, connCfg) } } for _, rdr := range cfg.ersCfg.Readers { diff --git a/config/configsanity_test.go b/config/configsanity_test.go index bb59a3115..803fab445 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -617,18 +617,14 @@ func TestConfigSanityScheduler(t *testing.T) { func TestConfigSanityEventReader(t *testing.T) { cfg, _ = NewDefaultCGRConfig() cfg.ersCfg = &ERsCfg{ - Enabled: true, - SessionSConns: []*RemoteHost{ - &RemoteHost{ - Address: utils.MetaInternal, - }, - }, + Enabled: true, + SessionSConns: []string{"unexistedConn"}, } - expected := " not enabled but requested by EventReader component." + expected := " Connection with id: not defined" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.sessionSCfg.Enabled = true + cfg.ersCfg.SessionSConns = []string{utils.MetaInternal} cfg.ersCfg.Readers = []*EventReaderCfg{ &EventReaderCfg{ diff --git a/config/erscfg.go b/config/erscfg.go index 67ac042f1..dd37d8f39 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -26,7 +26,7 @@ import ( type ERsCfg struct { Enabled bool - SessionSConns []*RemoteHost + SessionSConns []string Readers []*EventReaderCfg } @@ -38,10 +38,9 @@ func (erS *ERsCfg) loadFromJsonCfg(jsnCfg *ERsJsonCfg, sep string, dfltRdrCfg *E erS.Enabled = *jsnCfg.Enabled } if jsnCfg.Sessions_conns != nil { - erS.SessionSConns = make([]*RemoteHost, len(*jsnCfg.Sessions_conns)) - for idx, jsnHaCfg := range *jsnCfg.Sessions_conns { - erS.SessionSConns[idx] = NewDfltRemoteHost() - erS.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg) + erS.SessionSConns = make([]string, len(*jsnCfg.Sessions_conns)) + for i, fID := range *jsnCfg.Sessions_conns { + erS.SessionSConns[i] = fID } } return erS.appendERsReaders(jsnCfg.Readers, sep, dfltRdrCfg) @@ -83,10 +82,9 @@ func (ers *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, sep strin func (erS *ERsCfg) Clone() (cln *ERsCfg) { cln = new(ERsCfg) cln.Enabled = erS.Enabled - cln.SessionSConns = make([]*RemoteHost, len(erS.SessionSConns)) + cln.SessionSConns = make([]string, len(erS.SessionSConns)) for idx, sConn := range erS.SessionSConns { - clonedVal := *sConn - cln.SessionSConns[idx] = &clonedVal + cln.SessionSConns[idx] = sConn } cln.Readers = make([]*EventReaderCfg, len(erS.Readers)) for idx, rdr := range erS.Readers { diff --git a/config/erscfg_test.go b/config/erscfg_test.go index 73f75654f..94300b18f 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -97,12 +97,8 @@ func TestEventRedearClone(t *testing.T) { func TestEventReaderLoadFromJSON(t *testing.T) { expectedERsCfg := &ERsCfg{ - Enabled: true, - SessionSConns: []*RemoteHost{ - { - Address: utils.MetaInternal, - }, - }, + Enabled: true, + SessionSConns: []string{"conn1", "conn3"}, Readers: []*EventReaderCfg{ &EventReaderCfg{ ID: utils.MetaDefault, @@ -190,6 +186,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) { cfgJSONStr := `{ "ers": { "enabled": true, + "sessions_conns":["conn1","conn3"], "readers": [ { "id": "file_reader1", diff --git a/config/libconfig_json.go b/config/libconfig_json.go index faeff6c01..839f4201f 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -185,7 +185,7 @@ type CdrcJsonCfg struct { // EventReaderSJsonCfg contains the configuration of EventReaderService type ERsJsonCfg struct { Enabled *bool - Sessions_conns *[]*RemoteHostJson + Sessions_conns *[]string Readers *[]*EventReaderJsonCfg } diff --git a/engine/caches.go b/engine/caches.go index da7d9425d..85e5fbb04 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -56,6 +56,7 @@ var precachedPartitions = utils.StringMap{ utils.CacheChargerProfiles: true, utils.CacheDispatcherProfiles: true, utils.CacheDispatcherHosts: true, + utils.CacheRPCConnections: true, utils.CacheAttributeFilterIndexes: true, utils.CacheResourceFilterIndexes: true, diff --git a/services/connmanager.go b/services/connmanager.go new file mode 100644 index 000000000..40b3804d0 --- /dev/null +++ b/services/connmanager.go @@ -0,0 +1,36 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "github.com/cgrates/cgrates/config" +) + +// NewConnManager returns the Connection Manager +func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) { + + return +} + +type ConnManager struct { +} + +func (cM *ConnManager) GetConn() { + +} diff --git a/services/ers.go b/services/ers.go index cac0b0405..15797ad88 100644 --- a/services/ers.go +++ b/services/ers.go @@ -75,11 +75,11 @@ func (erS *EventReaderService) Start() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) var sS rpcclient.RpcClientConnection - if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", - utils.ERs, utils.SessionS, err.Error())) - return - } + //if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil { + // utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", + // utils.ERs, utils.SessionS, err.Error())) + // return + //} // build the service erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan) go func(ers *ers.ERService, rldChan chan struct{}) { @@ -99,11 +99,11 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie // Reload handles the change of config func (erS *EventReaderService) Reload() (err error) { var sS rpcclient.RpcClientConnection - if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", - utils.ERs, utils.SessionS, err.Error())) - return - } + //if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil { + // utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", + // utils.ERs, utils.SessionS, err.Error())) + // return + //} erS.RLock() erS.ers.SetSessionSConnection(sS) erS.rldChan <- struct{}{} diff --git a/services/utils.go b/services/utils.go index b7566fd0a..0bbf4cb93 100644 --- a/services/utils.go +++ b/services/utils.go @@ -25,7 +25,8 @@ import ( ) // NewConnection returns a new connection -func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection, conns []*config.RemoteHost) (rpcclient.RpcClientConnection, error) { +func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection, + conns []*config.RemoteHost) (rpcclient.RpcClientConnection, error) { if len(conns) == 0 { return nil, nil } diff --git a/utils/consts.go b/utils/consts.go index 7dc8298f2..40073aa3d 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1446,6 +1446,7 @@ const ( MetaReady = "*ready" CacheLoadIDs = "*load_ids" CacheAccounts = "*accounts" + CacheRPCConnections = "*rpc_connections" ) // Prefix for indexing