Changed dispatcher in registrarc to dispatchers and removed enabled from registrarc

This commit is contained in:
andronache
2021-06-10 16:04:54 +03:00
committed by Dan Christian Bogos
parent fcddc96f30
commit 779d46204d
17 changed files with 76 additions and 128 deletions

View File

@@ -151,9 +151,9 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) {
cfg.dispatcherSCfg = new(DispatcherSCfg)
cfg.registrarCCfg = new(RegistrarCCfgs)
cfg.registrarCCfg.RPC = new(RegistrarCCfg)
cfg.registrarCCfg.Dispatcher = new(RegistrarCCfg)
cfg.registrarCCfg.Dispatchers = new(RegistrarCCfg)
cfg.registrarCCfg.RPC.Hosts = make(map[string][]*RemoteHost)
cfg.registrarCCfg.Dispatcher.Hosts = make(map[string][]*RemoteHost)
cfg.registrarCCfg.Dispatchers.Hosts = make(map[string][]*RemoteHost)
cfg.loaderCgrCfg = new(LoaderCgrCfg)
cfg.migratorCgrCfg = new(MigratorCgrCfg)
cfg.migratorCgrCfg.OutDataDBOpts = make(map[string]interface{})

View File

@@ -1001,7 +1001,7 @@ const CGRATES_CFG_JSON = `
"hosts": {},
"refresh_interval": "5m",
},
"dispatcher":{
"dispatchers":{
"enabled": false,
"registrars_conns": [],
"hosts": {},

File diff suppressed because one or more lines are too long

View File

@@ -946,24 +946,21 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
}
if cfg.registrarCCfg.Dispatcher.Enabled {
if len(cfg.registrarCCfg.Dispatcher.Hosts) == 0 {
if len(cfg.registrarCCfg.Dispatchers.RegistrarSConns) != 0 {
if len(cfg.registrarCCfg.Dispatchers.Hosts) == 0 {
return fmt.Errorf("<%s> missing dispatcher host IDs", utils.RegistrarC)
}
if cfg.registrarCCfg.Dispatcher.RefreshInterval <= 0 {
if cfg.registrarCCfg.Dispatchers.RefreshInterval <= 0 {
return fmt.Errorf("<%s> the register imterval needs to be bigger than 0", utils.RegistrarC)
}
for tnt, hosts := range cfg.registrarCCfg.Dispatcher.Hosts {
for tnt, hosts := range cfg.registrarCCfg.Dispatchers.Hosts {
for _, host := range hosts {
if !utils.SliceHasMember([]string{utils.MetaGOB, rpcclient.HTTPjson, utils.MetaJSON, rpcclient.BiRPCJSON, rpcclient.BiRPCGOB}, host.Transport) {
return fmt.Errorf("<%s> unsupported transport <%s> for host <%s>", utils.RegistrarC, host.Transport, utils.ConcatenatedKey(tnt, host.ID))
}
}
}
if len(cfg.registrarCCfg.Dispatcher.RegistrarSConns) == 0 {
return fmt.Errorf("<%s> missing dispatcher connection IDs", utils.RegistrarC)
}
for _, connID := range cfg.registrarCCfg.Dispatcher.RegistrarSConns {
for _, connID := range cfg.registrarCCfg.Dispatchers.RegistrarSConns {
if connID == utils.MetaInternal {
return fmt.Errorf("<%s> internal connection IDs are not supported", utils.RegistrarC)
}
@@ -980,7 +977,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
}
if cfg.registrarCCfg.RPC.Enabled {
if len(cfg.registrarCCfg.RPC.RegistrarSConns) != 0 {
if len(cfg.registrarCCfg.RPC.Hosts) == 0 {
return fmt.Errorf("<%s> missing RPC host IDs", utils.RegistrarC)
}
@@ -994,9 +991,6 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
}
}
if len(cfg.registrarCCfg.RPC.RegistrarSConns) == 0 {
return fmt.Errorf("<%s> missing RPC connection IDs", utils.RegistrarC)
}
for _, connID := range cfg.registrarCCfg.RPC.RegistrarSConns {
if connID == utils.MetaInternal {
return fmt.Errorf("<%s> internal connection IDs are not supported", utils.RegistrarC)

View File

@@ -1399,12 +1399,12 @@ func TestConfigSanityRegistrarCRPC(t *testing.T) {
cfg.registrarCCfg = &RegistrarCCfgs{
RPC: &RegistrarCCfg{
Enabled: true,
RegistrarSConns: []string{utils.MetaLocalHost},
Hosts: map[string][]*RemoteHost{
"hosts": {},
},
},
Dispatcher: &RegistrarCCfg{},
Dispatchers: &RegistrarCCfg{},
}
expected := "<RegistrarC> the register imterval needs to be bigger than 0"
@@ -1432,10 +1432,6 @@ func TestConfigSanityRegistrarCRPC(t *testing.T) {
}
cfg.registrarCCfg.RPC.Hosts["hosts"][0].Transport = utils.MetaJSON
expected = "<RegistrarC> missing RPC connection IDs"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.RPC.RegistrarSConns = []string{utils.MetaInternal}
expected = "<RegistrarC> internal connection IDs are not supported"
@@ -1468,8 +1464,8 @@ func TestConfigSanityRegistrarCDispatcher(t *testing.T) {
cfg := NewDefaultCGRConfig()
cfg.registrarCCfg = &RegistrarCCfgs{
Dispatcher: &RegistrarCCfg{
Enabled: true,
Dispatchers: &RegistrarCCfg{
RegistrarSConns: []string{utils.MetaLocalHost},
Hosts: map[string][]*RemoteHost{
"hosts": {},
},
@@ -1482,14 +1478,9 @@ func TestConfigSanityRegistrarCDispatcher(t *testing.T) {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.Hosts = nil
expected = "<RegistrarC> missing dispatcher host IDs"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.RefreshInterval = 2
cfg.registrarCCfg.Dispatcher.Hosts = map[string][]*RemoteHost{
cfg.registrarCCfg.Dispatchers.Hosts = nil
cfg.registrarCCfg.Dispatchers.RefreshInterval = 2
cfg.registrarCCfg.Dispatchers.Hosts = map[string][]*RemoteHost{
"hosts": {
{
ID: "randomID",
@@ -1501,25 +1492,21 @@ func TestConfigSanityRegistrarCDispatcher(t *testing.T) {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.Hosts["hosts"][0].Transport = utils.MetaJSON
expected = "<RegistrarC> missing dispatcher connection IDs"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatchers.Hosts["hosts"][0].Transport = utils.MetaJSON
cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{utils.MetaInternal}
cfg.registrarCCfg.Dispatchers.RegistrarSConns = []string{utils.MetaInternal}
expected = "<RegistrarC> internal connection IDs are not supported"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{utils.MetaLocalHost}
cfg.registrarCCfg.Dispatchers.RegistrarSConns = []string{utils.MetaLocalHost}
expected = "<RegistrarC> connection with id: <*localhost> unsupported transport <*json>"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{"*conn1"}
cfg.registrarCCfg.Dispatchers.RegistrarSConns = []string{"*conn1"}
expected = "<RegistrarC> connection with id: <*conn1> not defined"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)

View File

@@ -26,8 +26,8 @@ import (
// RegistrarCCfgs is the configuration of registrarc rpc and dispatcher
type RegistrarCCfgs struct {
RPC *RegistrarCCfg
Dispatcher *RegistrarCCfg
RPC *RegistrarCCfg
Dispatchers *RegistrarCCfg
}
func (dps *RegistrarCCfgs) loadFromJSONCfg(jsnCfg *RegistrarCJsonCfgs) (err error) {
@@ -37,28 +37,27 @@ func (dps *RegistrarCCfgs) loadFromJSONCfg(jsnCfg *RegistrarCJsonCfgs) (err erro
if err = dps.RPC.loadFromJSONCfg(jsnCfg.RPC); err != nil {
return
}
return dps.Dispatcher.loadFromJSONCfg(jsnCfg.Dispatcher)
return dps.Dispatchers.loadFromJSONCfg(jsnCfg.Dispatchers)
}
// AsMapInterface returns the config as a map[string]interface{}
func (dps *RegistrarCCfgs) AsMapInterface() (initialMP map[string]interface{}) {
return map[string]interface{}{
utils.RPCCfg: dps.RPC.AsMapInterface(),
utils.DispatcherCfg: dps.Dispatcher.AsMapInterface(),
utils.DispatcherCfg: dps.Dispatchers.AsMapInterface(),
}
}
// Clone returns a deep copy of DispatcherHCfg
func (dps RegistrarCCfgs) Clone() (cln *RegistrarCCfgs) {
return &RegistrarCCfgs{
RPC: dps.RPC.Clone(),
Dispatcher: dps.Dispatcher.Clone(),
RPC: dps.RPC.Clone(),
Dispatchers: dps.Dispatchers.Clone(),
}
}
// RegistrarCCfg is the configuration of registrarc
type RegistrarCCfg struct {
Enabled bool
RegistrarSConns []string
Hosts map[string][]*RemoteHost
RefreshInterval time.Duration
@@ -68,9 +67,6 @@ func (dps *RegistrarCCfg) loadFromJSONCfg(jsnCfg *RegistrarCJsonCfg) (err error)
if jsnCfg == nil {
return nil
}
if jsnCfg.Enabled != nil {
dps.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Registrars_conns != nil {
dps.RegistrarSConns = utils.CloneStringSlice(*jsnCfg.Registrars_conns)
}
@@ -94,7 +90,6 @@ func (dps *RegistrarCCfg) loadFromJSONCfg(jsnCfg *RegistrarCJsonCfg) (err error)
// AsMapInterface returns the config as a map[string]interface{}
func (dps *RegistrarCCfg) AsMapInterface() (initialMP map[string]interface{}) {
initialMP = map[string]interface{}{
utils.EnabledCfg: dps.Enabled,
utils.RegistrarsConnsCfg: utils.CloneStringSlice(dps.RegistrarSConns),
utils.RefreshIntervalCfg: dps.RefreshInterval.String(),
}
@@ -118,7 +113,6 @@ func (dps *RegistrarCCfg) AsMapInterface() (initialMP map[string]interface{}) {
// Clone returns a deep copy of DispatcherHCfg
func (dps RegistrarCCfg) Clone() (cln *RegistrarCCfg) {
cln = &RegistrarCCfg{
Enabled: dps.Enabled,
RefreshInterval: dps.RefreshInterval,
Hosts: make(map[string][]*RemoteHost),
}
@@ -136,7 +130,6 @@ func (dps RegistrarCCfg) Clone() (cln *RegistrarCCfg) {
}
type RegistrarCJsonCfg struct {
Enabled *bool
Registrars_conns *[]string
Hosts map[string][]*RemoteHostJson
Refresh_interval *string
@@ -146,9 +139,6 @@ func diffRegistrarCJsonCfg(d *RegistrarCJsonCfg, v1, v2 *RegistrarCCfg) *Registr
if d == nil {
d = new(RegistrarCJsonCfg)
}
if v1.Enabled != v2.Enabled {
d.Enabled = utils.BoolPointer(v2.Enabled)
}
if !utils.SliceStringEqual(v1.RegistrarSConns, v2.RegistrarSConns) {
d.Registrars_conns = utils.SliceStringPointer(utils.CloneStringSlice(v2.RegistrarSConns))
}
@@ -170,8 +160,8 @@ func diffRegistrarCJsonCfg(d *RegistrarCJsonCfg, v1, v2 *RegistrarCCfg) *Registr
}
type RegistrarCJsonCfgs struct {
RPC *RegistrarCJsonCfg
Dispatcher *RegistrarCJsonCfg
RPC *RegistrarCJsonCfg
Dispatchers *RegistrarCJsonCfg
}
func diffRegistrarCJsonCfgs(d *RegistrarCJsonCfgs, v1, v2 *RegistrarCCfgs) *RegistrarCJsonCfgs {
@@ -179,6 +169,6 @@ func diffRegistrarCJsonCfgs(d *RegistrarCJsonCfgs, v1, v2 *RegistrarCCfgs) *Regi
d = new(RegistrarCJsonCfgs)
}
d.RPC = diffRegistrarCJsonCfg(d.RPC, v1.RPC, v2.RPC)
d.Dispatcher = diffRegistrarCJsonCfg(d.Dispatcher, v1.Dispatcher, v2.Dispatcher)
d.Dispatchers = diffRegistrarCJsonCfg(d.Dispatchers, v1.Dispatchers, v2.Dispatchers)
return d
}

View File

@@ -29,7 +29,6 @@ import (
func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) {
jsonCfg := &RegistrarCJsonCfgs{
RPC: &RegistrarCJsonCfg{
Enabled: utils.BoolPointer(true),
Registrars_conns: &[]string{"*conn1", "*conn2"},
Hosts: map[string][]*RemoteHostJson{
utils.MetaDefault: {
@@ -57,8 +56,7 @@ func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) {
},
Refresh_interval: utils.StringPointer("5"),
},
Dispatcher: &RegistrarCJsonCfg{
Enabled: utils.BoolPointer(true),
Dispatchers: &RegistrarCJsonCfg{
Registrars_conns: &[]string{"*conn1", "*conn2"},
Hosts: map[string][]*RemoteHostJson{
utils.MetaDefault: {
@@ -89,7 +87,6 @@ func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) {
}
expected := &RegistrarCCfgs{
RPC: &RegistrarCCfg{
Enabled: true,
RegistrarSConns: []string{"*conn1", "*conn2"},
Hosts: map[string][]*RemoteHost{
utils.MetaDefault: {
@@ -117,8 +114,7 @@ func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) {
},
RefreshInterval: 5,
},
Dispatcher: &RegistrarCCfg{
Enabled: true,
Dispatchers: &RegistrarCCfg{
RegistrarSConns: []string{"*conn1", "*conn2"},
Hosts: map[string][]*RemoteHost{
utils.MetaDefault: {
@@ -159,7 +155,6 @@ func TestDispatcherHCfgAsMapInterface(t *testing.T) {
cfgJSONStr := `{
"registrarc":{
"rpc":{
"enabled": true,
"registrars_conns": ["*conn1","*conn2"],
"hosts": {
"*default": [
@@ -177,8 +172,7 @@ func TestDispatcherHCfgAsMapInterface(t *testing.T) {
},
"refresh_interval": "0",
},
"dispatcher":{
"enabled": true,
"dispatchers":{
"registrars_conns": ["*conn1","*conn2"],
"hosts": {
"*default": [
@@ -200,7 +194,6 @@ func TestDispatcherHCfgAsMapInterface(t *testing.T) {
}`
eMap := map[string]interface{}{
utils.RPCCfg: map[string]interface{}{
utils.EnabledCfg: true,
utils.RegistrarsConnsCfg: []string{"*conn1", "*conn2"},
utils.HostsCfg: map[string][]map[string]interface{}{
utils.MetaDefault: {
@@ -217,7 +210,6 @@ func TestDispatcherHCfgAsMapInterface(t *testing.T) {
utils.RefreshIntervalCfg: "0",
},
utils.DispatcherCfg: map[string]interface{}{
utils.EnabledCfg: true,
utils.RegistrarsConnsCfg: []string{"*conn1", "*conn2"},
utils.HostsCfg: map[string][]map[string]interface{}{
utils.MetaDefault: {
@@ -256,7 +248,7 @@ func TestDispatcherCfgParseWithNanoSec(t *testing.T) {
func TestDispatcherCfgParseWithNanoSec2(t *testing.T) {
jsonCfg := &RegistrarCJsonCfgs{
Dispatcher: &RegistrarCJsonCfg{
Dispatchers: &RegistrarCJsonCfg{
Refresh_interval: utils.StringPointer("1ss"),
},
}
@@ -273,13 +265,11 @@ func TestDispatcherHCfgAsMapInterface2(t *testing.T) {
}`
eMap := map[string]interface{}{
utils.DispatcherCfg: map[string]interface{}{
utils.EnabledCfg: false,
utils.RegistrarsConnsCfg: []string{},
utils.HostsCfg: map[string][]map[string]interface{}{},
utils.RefreshIntervalCfg: "5m0s",
},
utils.RPCCfg: map[string]interface{}{
utils.EnabledCfg: false,
utils.RegistrarsConnsCfg: []string{},
utils.HostsCfg: map[string][]map[string]interface{}{},
utils.RefreshIntervalCfg: "5m0s",
@@ -294,7 +284,6 @@ func TestDispatcherHCfgAsMapInterface2(t *testing.T) {
func TestDispatcherHCfgClone(t *testing.T) {
ban := &RegistrarCCfg{
Enabled: true,
RegistrarSConns: []string{"*conn1", "*conn2"},
Hosts: map[string][]*RemoteHost{
utils.MetaDefault: {
@@ -338,7 +327,6 @@ func TestDiffRegistrarCJsonCfg(t *testing.T) {
var d *RegistrarCJsonCfg
v1 := &RegistrarCCfg{
Enabled: false,
RegistrarSConns: []string{"*localhost"},
Hosts: map[string][]*RemoteHost{
"HOST_1": {
@@ -355,7 +343,6 @@ func TestDiffRegistrarCJsonCfg(t *testing.T) {
}
v2 := &RegistrarCCfg{
Enabled: true,
RegistrarSConns: []string{"*birpc"},
Hosts: map[string][]*RemoteHost{
"HOST_1": {
@@ -372,7 +359,6 @@ func TestDiffRegistrarCJsonCfg(t *testing.T) {
}
expected := &RegistrarCJsonCfg{
Enabled: utils.BoolPointer(true),
Registrars_conns: &[]string{"*birpc"},
Hosts: map[string][]*RemoteHostJson{
"HOST_1": {
@@ -418,7 +404,6 @@ func TestDiffRegistrarCJsonCfgs(t *testing.T) {
v1 := &RegistrarCCfgs{
RPC: &RegistrarCCfg{
Enabled: false,
RegistrarSConns: []string{"*localhost"},
Hosts: map[string][]*RemoteHost{
"HOST_1": {
@@ -433,8 +418,7 @@ func TestDiffRegistrarCJsonCfgs(t *testing.T) {
},
RefreshInterval: 2 * time.Second,
},
Dispatcher: &RegistrarCCfg{
Enabled: false,
Dispatchers: &RegistrarCCfg{
RegistrarSConns: []string{"*localhost"},
Hosts: map[string][]*RemoteHost{
"HOST_1": {
@@ -453,7 +437,6 @@ func TestDiffRegistrarCJsonCfgs(t *testing.T) {
v2 := &RegistrarCCfgs{
RPC: &RegistrarCCfg{
Enabled: true,
RegistrarSConns: []string{"*birpc"},
Hosts: map[string][]*RemoteHost{
"HOST_1": {
@@ -468,8 +451,7 @@ func TestDiffRegistrarCJsonCfgs(t *testing.T) {
},
RefreshInterval: 4 * time.Second,
},
Dispatcher: &RegistrarCCfg{
Enabled: true,
Dispatchers: &RegistrarCCfg{
RegistrarSConns: []string{"*birpc"},
Hosts: map[string][]*RemoteHost{
"HOST_1": {
@@ -488,7 +470,6 @@ func TestDiffRegistrarCJsonCfgs(t *testing.T) {
expected := &RegistrarCJsonCfgs{
RPC: &RegistrarCJsonCfg{
Enabled: utils.BoolPointer(true),
Registrars_conns: &[]string{"*birpc"},
Hosts: map[string][]*RemoteHostJson{
"HOST_1": {
@@ -503,8 +484,7 @@ func TestDiffRegistrarCJsonCfgs(t *testing.T) {
},
Refresh_interval: utils.StringPointer("4s"),
},
Dispatcher: &RegistrarCJsonCfg{
Enabled: utils.BoolPointer(true),
Dispatchers: &RegistrarCJsonCfg{
Registrars_conns: &[]string{"*birpc"},
Hosts: map[string][]*RemoteHostJson{
"HOST_1": {
@@ -542,7 +522,7 @@ func TestDiffRegistrarCJsonCfgs(t *testing.T) {
},
},
},
Dispatcher: &RegistrarCJsonCfg{
Dispatchers: &RegistrarCJsonCfg{
Hosts: map[string][]*RemoteHostJson{
"HOST_1": {
{

View File

@@ -1013,7 +1013,7 @@
// "hosts": {},
// "refresh_interval": "5m",
// },
// "dispatcher":{
// "dispatchers":{
// "enabled": false,
// "registrars_conns": [],
// "hosts": {},

View File

@@ -105,7 +105,7 @@
"registrarc":{
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["dispConn"],
"hosts": {

View File

@@ -103,7 +103,7 @@
"registrarc":{
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["dispConn"],
"hosts": {

View File

@@ -109,7 +109,7 @@
"registrarc":{
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["dispConn"],
"hosts": {

View File

@@ -107,7 +107,7 @@
"registrarc":{
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["dispConn"],
"hosts": {

View File

@@ -48,11 +48,11 @@ type RegistrarCService struct {
func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{}) {
dTm, rTm := &time.Timer{}, &time.Timer{}
var dTmStarted, rTmStarted bool
if dTmStarted = dhS.cfg.RegistrarCCfg().Dispatcher.Enabled; dTmStarted {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
if len(dhS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns) != 0 {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatchers.RefreshInterval)
dhS.registerDispHosts()
}
if rTmStarted = dhS.cfg.RegistrarCCfg().RPC.Enabled; rTmStarted {
if len(dhS.cfg.RegistrarCCfg().RPC.RegistrarSConns) != 0 {
rTm = time.NewTimer(dhS.cfg.RegistrarCCfg().RPC.RefreshInterval)
dhS.registerRPCHosts()
}
@@ -65,25 +65,25 @@ func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{})
if dTmStarted {
dTm.Stop()
}
if dTmStarted = dhS.cfg.RegistrarCCfg().Dispatcher.Enabled; dTmStarted {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
if len(dhS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns) != 0 {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatchers.RefreshInterval)
dhS.registerDispHosts()
}
if rTmStarted = dhS.cfg.RegistrarCCfg().RPC.Enabled; rTmStarted {
if len(dhS.cfg.RegistrarCCfg().RPC.RegistrarSConns) != 0 {
rTm = time.NewTimer(dhS.cfg.RegistrarCCfg().RPC.RefreshInterval)
dhS.registerRPCHosts()
}
case <-stopChan:
if dhS.cfg.RegistrarCCfg().Dispatcher.Enabled {
if len(dhS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns) != 0 {
dTm.Stop()
}
if dhS.cfg.RegistrarCCfg().RPC.Enabled {
if len(dhS.cfg.RegistrarCCfg().RPC.RegistrarSConns) != 0 {
rTm.Stop()
}
return
case <-dTm.C:
dhS.registerDispHosts()
dTm.Reset(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
dTm.Reset(dhS.cfg.RegistrarCCfg().Dispatchers.RefreshInterval)
case <-rTm.C:
dhS.registerRPCHosts()
rTm.Reset(dhS.cfg.RegistrarCCfg().RPC.RefreshInterval)
@@ -94,11 +94,11 @@ func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{})
// Shutdown is called to shutdown the service
func (dhS *RegistrarCService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RegistrarC))
if dhS.cfg.RegistrarCCfg().Dispatcher.Enabled {
unregisterHosts(dhS.connMgr, dhS.cfg.RegistrarCCfg().Dispatcher,
if len(dhS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns) != 0 {
unregisterHosts(dhS.connMgr, dhS.cfg.RegistrarCCfg().Dispatchers,
dhS.cfg.GeneralCfg().DefaultTenant, utils.RegistrarSv1UnregisterDispatcherHosts)
}
if dhS.cfg.RegistrarCCfg().RPC.Enabled {
if len(dhS.cfg.RegistrarCCfg().RPC.RegistrarSConns) != 0 {
unregisterHosts(dhS.connMgr, dhS.cfg.RegistrarCCfg().RPC,
dhS.cfg.GeneralCfg().DefaultTenant, utils.RegistrarSv1UnregisterRPCHosts)
}
@@ -106,8 +106,8 @@ func (dhS *RegistrarCService) Shutdown() {
}
func (dhS *RegistrarCService) registerDispHosts() {
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts {
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatchers.Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}

View File

@@ -46,8 +46,7 @@ func TestDispatcherHostsService(t *testing.T) {
Transport: rpcclient.HTTPjson,
}},
}
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
cfg.RegistrarCCfg().Dispatchers.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host1",
@@ -55,8 +54,8 @@ func TestDispatcherHostsService(t *testing.T) {
},
},
}
cfg.RegistrarCCfg().Dispatcher.RefreshInterval = 100 * time.Millisecond
cfg.RegistrarCCfg().Dispatcher.RegistrarSConns = []string{"conn1"}
cfg.RegistrarCCfg().Dispatchers.RefreshInterval = 100 * time.Millisecond
cfg.RegistrarCCfg().Dispatchers.RegistrarSConns = []string{"conn1"}
ds := NewRegistrarCService(cfg, engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{}))
@@ -76,7 +75,7 @@ func TestDispatcherHostsService(t *testing.T) {
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
cfg.RegistrarCCfg().Dispatchers.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host2",
@@ -93,7 +92,7 @@ func TestDispatcherHostsService(t *testing.T) {
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
unregisterHosts(ds.connMgr, cfg.RegistrarCCfg().Dispatcher, "cgrates.org", utils.RegistrarSv1UnregisterDispatcherHosts)
unregisterHosts(ds.connMgr, cfg.RegistrarCCfg().Dispatchers, "cgrates.org", utils.RegistrarSv1UnregisterDispatcherHosts)
if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
t.Errorf("Expected to not find Host2 in cache")
}
@@ -102,7 +101,7 @@ func TestDispatcherHostsService(t *testing.T) {
config.CgrConfig().CacheCfg().ReplicationConns = []string{}
host1.ID = "Host1"
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
cfg.RegistrarCCfg().Dispatchers.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host1",
@@ -128,8 +127,6 @@ func TestDispatcherHostsService(t *testing.T) {
func TestRegistrarcListenAndServe(t *testing.T) {
//cover purposes only
cfg := config.NewDefaultCGRConfig()
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().RPC.Enabled = true
regStSrv := NewRegistrarCService(cfg, nil)
stopChan := make(chan struct{}, 1)
rldChan := make(chan struct{}, 1)
@@ -195,9 +192,7 @@ func TestRegisterRPCHosts(t *testing.T) {
func TestRegistrarcListenAndServedTmCDispatcher(t *testing.T) {
//cover purposes only
cfg := config.NewDefaultCGRConfig()
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().Dispatcher.RefreshInterval = 1
cfg.RegistrarCCfg().RPC.Enabled = true
cfg.RegistrarCCfg().Dispatchers.RefreshInterval = 1
regStSrv := NewRegistrarCService(cfg, nil)
stopChan := make(chan struct{}, 1)
rldChan := make(chan struct{}, 1)
@@ -212,8 +207,6 @@ func TestRegistrarcListenAndServedTmCDispatcher(t *testing.T) {
func TestRegistrarcListenAndServedTmCRPC(t *testing.T) {
//cover purposes only
cfg := config.NewDefaultCGRConfig()
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().RPC.Enabled = true
cfg.RegistrarCCfg().RPC.RefreshInterval = 1
regStSrv := NewRegistrarCService(cfg, nil)
stopChan := make(chan struct{}, 1)

View File

@@ -103,5 +103,11 @@ func (dspS *RegistrarCService) ServiceName() string {
// ShouldRun returns if the service should be running
func (dspS *RegistrarCService) ShouldRun() bool {
return dspS.cfg.RegistrarCCfg().Dispatcher.Enabled || dspS.cfg.RegistrarCCfg().RPC.Enabled
if len(dspS.cfg.RegistrarCCfg().RPC.RegistrarSConns) != 0 {
return true
}
if len(dspS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns) != 0 {
return true
}
return false
}

View File

@@ -91,7 +91,7 @@ func TestDispatcherHReload(t *testing.T) {
if err != nil {
t.Errorf("\nExpecting <nil>,\n Received <%+v>", err)
}
cfg.RegistrarCCfg().Dispatcher.Enabled = false
cfg.RegistrarCCfg().Dispatchers.Enabled = false
cfg.GetReloadChan(config.RegistrarCJSON) <- struct{}{}
time.Sleep(10 * time.Millisecond)
if srv.IsRunning() {

View File

@@ -2104,7 +2104,7 @@ const (
// RegistrarCCfg
const (
RPCCfg = "rpc"
DispatcherCfg = "dispatcher"
DispatcherCfg = "dispatchers"
RegistrarsConnsCfg = "registrars_conns"
HostsCfg = "hosts"
RefreshIntervalCfg = "refresh_interval"