Add skeleton for ConnManager service

This commit is contained in:
TeoV
2019-11-29 15:40:38 +02:00
parent 981894d779
commit eda2958f99
14 changed files with 84 additions and 54 deletions

View File

@@ -1021,6 +1021,13 @@ func (cfg *CGRConfig) ERsCfg() *ERsCfg {
return cfg.ersCfg
}
// RPCConns reads the RPCConns configuration
func (cfg *CGRConfig) RPCConns() map[string]*RPCConn {
cfg.lks[RPCConnsJsonName].RLock()
defer cfg.lks[RPCConnsJsonName].RUnlock()
return cfg.rpcConns
}
// GetReloadChan returns the reload chanel for the given section
func (cfg *CGRConfig) GetReloadChan(sectID string) chan struct{} {
return cfg.rldChans[sectID]

View File

@@ -188,6 +188,7 @@ const CGRATES_CFG_JSON = `
"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching
"*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false}, // closed sessions cached for CDRs
"*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control the load_ids for items
"*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false}, // RPC connections caching
},
@@ -328,9 +329,7 @@ const CGRATES_CFG_JSON = `
"ers": { // EventReaderService
"enabled": false, // starts the EventReader service: <true|false>
"sessions_conns": [ // connections to SessionS: <*internal|127.0.0.1:2012>
{"address": "*internal"}
],
"sessions_conns":["*internal"], // RPC Connections IDs
"readers": [
{
"id": "*default", // identifier of the EventReader profile

View File

@@ -162,6 +162,8 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheLoadIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheRPCConnections: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
}
if gCfg, err := dfCgrJsonCfg.CacheJsonCfg(); err != nil {
@@ -1677,12 +1679,8 @@ func TestDfEventReaderCfg(t *testing.T) {
Value: utils.StringPointer("~*req.13"), Mandatory: utils.BoolPointer(true)},
}
eCfg := &ERsJsonCfg{
Enabled: utils.BoolPointer(false),
Sessions_conns: &[]*RemoteHostJson{
{
Address: utils.StringPointer(utils.MetaInternal),
},
},
Enabled: utils.BoolPointer(false),
Sessions_conns: &[]string{utils.MetaInternal},
Readers: &[]*EventReaderJsonCfg{
&EventReaderJsonCfg{
Id: utils.StringPointer(utils.MetaDefault),

View File

@@ -731,6 +731,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(10 * time.Second), StaticTTL: false},
utils.CacheLoadIDs: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheRPCConnections: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false},
}
if !reflect.DeepEqual(eCacheCfg, cgrCfg.CacheCfg()) {
@@ -1812,12 +1814,8 @@ func TestCgrCfgV1GetConfigSection(t *testing.T) {
func TestCgrCdfEventReader(t *testing.T) {
eCfg := &ERsCfg{
Enabled: false,
SessionSConns: []*RemoteHost{
{
Address: utils.MetaInternal,
},
},
Enabled: false,
SessionSConns: []string{utils.MetaInternal},
Readers: []*EventReaderCfg{
&EventReaderCfg{
ID: utils.MetaDefault,

View File

@@ -373,11 +373,9 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
// EventReader sanity checks
if cfg.ersCfg.Enabled {
if !cfg.sessionSCfg.Enabled {
for _, connCfg := range cfg.ersCfg.SessionSConns {
if connCfg.Address == utils.MetaInternal {
return fmt.Errorf("<%s> not enabled but requested by EventReader component.", utils.SessionS)
}
for _, connCfg := range cfg.ersCfg.SessionSConns {
if _, has := cfg.rpcConns[connCfg]; !has {
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ERs, connCfg)
}
}
for _, rdr := range cfg.ersCfg.Readers {

View File

@@ -617,18 +617,14 @@ func TestConfigSanityScheduler(t *testing.T) {
func TestConfigSanityEventReader(t *testing.T) {
cfg, _ = NewDefaultCGRConfig()
cfg.ersCfg = &ERsCfg{
Enabled: true,
SessionSConns: []*RemoteHost{
&RemoteHost{
Address: utils.MetaInternal,
},
},
Enabled: true,
SessionSConns: []string{"unexistedConn"},
}
expected := "<SessionS> not enabled but requested by EventReader component."
expected := "<ERs> Connection with id: <unexistedConn> not defined"
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.sessionSCfg.Enabled = true
cfg.ersCfg.SessionSConns = []string{utils.MetaInternal}
cfg.ersCfg.Readers = []*EventReaderCfg{
&EventReaderCfg{

View File

@@ -26,7 +26,7 @@ import (
type ERsCfg struct {
Enabled bool
SessionSConns []*RemoteHost
SessionSConns []string
Readers []*EventReaderCfg
}
@@ -38,10 +38,9 @@ func (erS *ERsCfg) loadFromJsonCfg(jsnCfg *ERsJsonCfg, sep string, dfltRdrCfg *E
erS.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Sessions_conns != nil {
erS.SessionSConns = make([]*RemoteHost, len(*jsnCfg.Sessions_conns))
for idx, jsnHaCfg := range *jsnCfg.Sessions_conns {
erS.SessionSConns[idx] = NewDfltRemoteHost()
erS.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg)
erS.SessionSConns = make([]string, len(*jsnCfg.Sessions_conns))
for i, fID := range *jsnCfg.Sessions_conns {
erS.SessionSConns[i] = fID
}
}
return erS.appendERsReaders(jsnCfg.Readers, sep, dfltRdrCfg)
@@ -83,10 +82,9 @@ func (ers *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, sep strin
func (erS *ERsCfg) Clone() (cln *ERsCfg) {
cln = new(ERsCfg)
cln.Enabled = erS.Enabled
cln.SessionSConns = make([]*RemoteHost, len(erS.SessionSConns))
cln.SessionSConns = make([]string, len(erS.SessionSConns))
for idx, sConn := range erS.SessionSConns {
clonedVal := *sConn
cln.SessionSConns[idx] = &clonedVal
cln.SessionSConns[idx] = sConn
}
cln.Readers = make([]*EventReaderCfg, len(erS.Readers))
for idx, rdr := range erS.Readers {

View File

@@ -97,12 +97,8 @@ func TestEventRedearClone(t *testing.T) {
func TestEventReaderLoadFromJSON(t *testing.T) {
expectedERsCfg := &ERsCfg{
Enabled: true,
SessionSConns: []*RemoteHost{
{
Address: utils.MetaInternal,
},
},
Enabled: true,
SessionSConns: []string{"conn1", "conn3"},
Readers: []*EventReaderCfg{
&EventReaderCfg{
ID: utils.MetaDefault,
@@ -190,6 +186,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
cfgJSONStr := `{
"ers": {
"enabled": true,
"sessions_conns":["conn1","conn3"],
"readers": [
{
"id": "file_reader1",

View File

@@ -185,7 +185,7 @@ type CdrcJsonCfg struct {
// EventReaderSJsonCfg contains the configuration of EventReaderService
type ERsJsonCfg struct {
Enabled *bool
Sessions_conns *[]*RemoteHostJson
Sessions_conns *[]string
Readers *[]*EventReaderJsonCfg
}

View File

@@ -56,6 +56,7 @@ var precachedPartitions = utils.StringMap{
utils.CacheChargerProfiles: true,
utils.CacheDispatcherProfiles: true,
utils.CacheDispatcherHosts: true,
utils.CacheRPCConnections: true,
utils.CacheAttributeFilterIndexes: true,
utils.CacheResourceFilterIndexes: true,

36
services/connmanager.go Normal file
View File

@@ -0,0 +1,36 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"github.com/cgrates/cgrates/config"
)
// NewConnManager returns the Connection Manager
func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) {
return
}
type ConnManager struct {
}
func (cM *ConnManager) GetConn() {
}

View File

@@ -75,11 +75,11 @@ func (erS *EventReaderService) Start() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
var sS rpcclient.RpcClientConnection
if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
return
}
//if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil {
// utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
// utils.ERs, utils.SessionS, err.Error()))
// return
//}
// build the service
erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan)
go func(ers *ers.ERService, rldChan chan struct{}) {
@@ -99,11 +99,11 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie
// Reload handles the change of config
func (erS *EventReaderService) Reload() (err error) {
var sS rpcclient.RpcClientConnection
if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
return
}
//if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil {
// utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
// utils.ERs, utils.SessionS, err.Error()))
// return
//}
erS.RLock()
erS.ers.SetSessionSConnection(sS)
erS.rldChan <- struct{}{}

View File

@@ -25,7 +25,8 @@ import (
)
// NewConnection returns a new connection
func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection, conns []*config.RemoteHost) (rpcclient.RpcClientConnection, error) {
func NewConnection(cfg *config.CGRConfig, serviceConnChan, dispatcherSChan chan rpcclient.RpcClientConnection,
conns []*config.RemoteHost) (rpcclient.RpcClientConnection, error) {
if len(conns) == 0 {
return nil, nil
}

View File

@@ -1446,6 +1446,7 @@ const (
MetaReady = "*ready"
CacheLoadIDs = "*load_ids"
CacheAccounts = "*accounts"
CacheRPCConnections = "*rpc_connections"
)
// Prefix for indexing