Changed dispatcher in registrarc to dispatchers

This commit is contained in:
andronache
2021-06-10 09:37:34 +03:00
committed by Dan Christian Bogos
parent e418773f73
commit 213787ee80
18 changed files with 63 additions and 63 deletions

View File

@@ -158,9 +158,9 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) {
cfg.dispatcherSCfg = new(DispatcherSCfg)
cfg.registrarCCfg = new(RegistrarCCfgs)
cfg.registrarCCfg.RPC = new(RegistrarCCfg)
cfg.registrarCCfg.Dispatcher = new(RegistrarCCfg)
cfg.registrarCCfg.Dispatchers = new(RegistrarCCfg)
cfg.registrarCCfg.RPC.Hosts = make(map[string][]*RemoteHost)
cfg.registrarCCfg.Dispatcher.Hosts = make(map[string][]*RemoteHost)
cfg.registrarCCfg.Dispatchers.Hosts = make(map[string][]*RemoteHost)
cfg.loaderCgrCfg = new(LoaderCgrCfg)
cfg.migratorCgrCfg = new(MigratorCgrCfg)
cfg.migratorCgrCfg.OutDataDBOpts = make(map[string]interface{})

View File

@@ -1014,7 +1014,7 @@ const CGRATES_CFG_JSON = `
"hosts": {},
"refresh_interval": "5m",
},
"dispatcher":{
"dispatchers":{
"enabled": false,
"registrars_conns": [],
"hosts": {},

File diff suppressed because one or more lines are too long

View File

@@ -1016,24 +1016,24 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
}
if cfg.registrarCCfg.Dispatcher.Enabled {
if len(cfg.registrarCCfg.Dispatcher.Hosts) == 0 {
if cfg.registrarCCfg.Dispatchers.Enabled {
if len(cfg.registrarCCfg.Dispatchers.Hosts) == 0 {
return fmt.Errorf("<%s> missing dispatcher host IDs", utils.RegistrarC)
}
if cfg.registrarCCfg.Dispatcher.RefreshInterval <= 0 {
if cfg.registrarCCfg.Dispatchers.RefreshInterval <= 0 {
return fmt.Errorf("<%s> the register imterval needs to be bigger than 0", utils.RegistrarC)
}
for tnt, hosts := range cfg.registrarCCfg.Dispatcher.Hosts {
for tnt, hosts := range cfg.registrarCCfg.Dispatchers.Hosts {
for _, host := range hosts {
if !utils.SliceHasMember([]string{utils.MetaGOB, rpcclient.HTTPjson, utils.MetaJSON, rpcclient.BiRPCJSON, rpcclient.BiRPCGOB}, host.Transport) {
return fmt.Errorf("<%s> unsupported transport <%s> for host <%s>", utils.RegistrarC, host.Transport, utils.ConcatenatedKey(tnt, host.ID))
}
}
}
if len(cfg.registrarCCfg.Dispatcher.RegistrarSConns) == 0 {
if len(cfg.registrarCCfg.Dispatchers.RegistrarSConns) == 0 {
return fmt.Errorf("<%s> missing dispatcher connection IDs", utils.RegistrarC)
}
for _, connID := range cfg.registrarCCfg.Dispatcher.RegistrarSConns {
for _, connID := range cfg.registrarCCfg.Dispatchers.RegistrarSConns {
if connID == utils.MetaInternal {
return fmt.Errorf("<%s> internal connection IDs are not supported", utils.RegistrarC)
}

View File

@@ -1519,7 +1519,7 @@ func TestConfigSanityRegistrarCRPC(t *testing.T) {
"hosts": {},
},
},
Dispatcher: &RegistrarCCfg{},
Dispatchers: &RegistrarCCfg{},
}
expected := "<RegistrarC> the register imterval needs to be bigger than 0"
@@ -1583,7 +1583,7 @@ func TestConfigSanityRegistrarCDispatcher(t *testing.T) {
cfg := NewDefaultCGRConfig()
cfg.registrarCCfg = &RegistrarCCfgs{
Dispatcher: &RegistrarCCfg{
Dispatchers: &RegistrarCCfg{
Enabled: true,
Hosts: map[string][]*RemoteHost{
"hosts": {},
@@ -1597,14 +1597,14 @@ func TestConfigSanityRegistrarCDispatcher(t *testing.T) {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.Hosts = nil
cfg.registrarCCfg.Dispatchers.Hosts = nil
expected = "<RegistrarC> missing dispatcher host IDs"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.RefreshInterval = 2
cfg.registrarCCfg.Dispatcher.Hosts = map[string][]*RemoteHost{
cfg.registrarCCfg.Dispatchers.RefreshInterval = 2
cfg.registrarCCfg.Dispatchers.Hosts = map[string][]*RemoteHost{
"hosts": {
{
ID: "randomID",
@@ -1616,25 +1616,25 @@ func TestConfigSanityRegistrarCDispatcher(t *testing.T) {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.Hosts["hosts"][0].Transport = utils.MetaJSON
cfg.registrarCCfg.Dispatchers.Hosts["hosts"][0].Transport = utils.MetaJSON
expected = "<RegistrarC> missing dispatcher connection IDs"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{utils.MetaInternal}
cfg.registrarCCfg.Dispatchers.RegistrarSConns = []string{utils.MetaInternal}
expected = "<RegistrarC> internal connection IDs are not supported"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{utils.MetaLocalHost}
cfg.registrarCCfg.Dispatchers.RegistrarSConns = []string{utils.MetaLocalHost}
expected = "<RegistrarC> connection with id: <*localhost> unsupported transport <*json>"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{"*conn1"}
cfg.registrarCCfg.Dispatchers.RegistrarSConns = []string{"*conn1"}
expected = "<RegistrarC> connection with id: <*conn1> not defined"
if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)

View File

@@ -544,8 +544,8 @@ type RegistrarCJsonCfg struct {
}
type RegistrarCJsonCfgs struct {
RPC *RegistrarCJsonCfg
Dispatcher *RegistrarCJsonCfg
RPC *RegistrarCJsonCfg
Dispatchers *RegistrarCJsonCfg
}
type LoaderCfgJson struct {

View File

@@ -26,8 +26,8 @@ import (
// RegistrarCCfgs is the configuration of registrarc rpc and dispatcher
type RegistrarCCfgs struct {
RPC *RegistrarCCfg
Dispatcher *RegistrarCCfg
RPC *RegistrarCCfg
Dispatchers *RegistrarCCfg
}
func (dps *RegistrarCCfgs) loadFromJSONCfg(jsnCfg *RegistrarCJsonCfgs) (err error) {
@@ -37,22 +37,22 @@ func (dps *RegistrarCCfgs) loadFromJSONCfg(jsnCfg *RegistrarCJsonCfgs) (err erro
if err = dps.RPC.loadFromJSONCfg(jsnCfg.RPC); err != nil {
return
}
return dps.Dispatcher.loadFromJSONCfg(jsnCfg.Dispatcher)
return dps.Dispatchers.loadFromJSONCfg(jsnCfg.Dispatchers)
}
// AsMapInterface returns the config as a map[string]interface{}
func (dps *RegistrarCCfgs) AsMapInterface() (initialMP map[string]interface{}) {
return map[string]interface{}{
utils.RPCCfg: dps.RPC.AsMapInterface(),
utils.DispatcherCfg: dps.Dispatcher.AsMapInterface(),
utils.DispatcherCfg: dps.Dispatchers.AsMapInterface(),
}
}
// Clone returns a deep copy of DispatcherHCfg
func (dps RegistrarCCfgs) Clone() (cln *RegistrarCCfgs) {
return &RegistrarCCfgs{
RPC: dps.RPC.Clone(),
Dispatcher: dps.Dispatcher.Clone(),
RPC: dps.RPC.Clone(),
Dispatchers: dps.Dispatchers.Clone(),
}
}

View File

@@ -56,7 +56,7 @@ func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) {
},
Refresh_interval: utils.StringPointer("5"),
},
Dispatcher: &RegistrarCJsonCfg{
Dispatchers: &RegistrarCJsonCfg{
Enabled: utils.BoolPointer(true),
Registrars_conns: &[]string{"*conn1", "*conn2"},
Hosts: map[string][]*RemoteHostJson{
@@ -116,7 +116,7 @@ func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) {
},
RefreshInterval: 5,
},
Dispatcher: &RegistrarCCfg{
Dispatchers: &RegistrarCCfg{
Enabled: true,
RegistrarSConns: []string{"*conn1", "*conn2"},
Hosts: map[string][]*RemoteHost{
@@ -176,7 +176,7 @@ func TestDispatcherHCfgAsMapInterface(t *testing.T) {
},
"refresh_interval": "0",
},
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["*conn1","*conn2"],
"hosts": {
@@ -255,7 +255,7 @@ func TestDispatcherCfgParseWithNanoSec(t *testing.T) {
func TestDispatcherCfgParseWithNanoSec2(t *testing.T) {
jsonCfg := &RegistrarCJsonCfgs{
Dispatcher: &RegistrarCJsonCfg{
Dispatchers: &RegistrarCJsonCfg{
Refresh_interval: utils.StringPointer("1ss"),
},
}

View File

@@ -990,7 +990,7 @@
// "hosts": {},
// "refresh_interval": "5m",
// },
// "dispatcher":{
// "dispatchers":{
// "enabled": false,
// "registrars_conns": [],
// "hosts": {},

View File

@@ -99,7 +99,7 @@
"registrarc":{
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["dispConn"],
"hosts": {

View File

@@ -98,7 +98,7 @@
"registrarc":{
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["dispConn"],
"hosts": {

View File

@@ -104,7 +104,7 @@
"registrarc":{
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["dispConn"],
"hosts": {

View File

@@ -102,7 +102,7 @@
"registrarc":{
"dispatcher":{
"dispatchers":{
"enabled": true,
"registrars_conns": ["dispConn"],
"hosts": {

View File

@@ -47,8 +47,8 @@ type RegistrarCService struct {
func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{}) {
dTm, rTm := &time.Timer{}, &time.Timer{}
var dTmStarted, rTmStarted bool
if dTmStarted = dhS.cfg.RegistrarCCfg().Dispatcher.Enabled; dTmStarted {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
if dTmStarted = dhS.cfg.RegistrarCCfg().Dispatchers.Enabled; dTmStarted {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatchers.RefreshInterval)
dhS.registerDispHosts()
}
if rTmStarted = dhS.cfg.RegistrarCCfg().RPC.Enabled; rTmStarted {
@@ -64,8 +64,8 @@ func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{})
if dTmStarted {
dTm.Stop()
}
if dTmStarted = dhS.cfg.RegistrarCCfg().Dispatcher.Enabled; dTmStarted {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
if dTmStarted = dhS.cfg.RegistrarCCfg().Dispatchers.Enabled; dTmStarted {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatchers.RefreshInterval)
dhS.registerDispHosts()
}
if rTmStarted = dhS.cfg.RegistrarCCfg().RPC.Enabled; rTmStarted {
@@ -73,7 +73,7 @@ func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{})
dhS.registerRPCHosts()
}
case <-stopChan:
if dhS.cfg.RegistrarCCfg().Dispatcher.Enabled {
if dhS.cfg.RegistrarCCfg().Dispatchers.Enabled {
dTm.Stop()
}
if dhS.cfg.RegistrarCCfg().RPC.Enabled {
@@ -82,7 +82,7 @@ func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{})
return
case <-dTm.C:
dhS.registerDispHosts()
dTm.Reset(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
dTm.Reset(dhS.cfg.RegistrarCCfg().Dispatchers.RefreshInterval)
case <-rTm.C:
dhS.registerRPCHosts()
rTm.Reset(dhS.cfg.RegistrarCCfg().RPC.RefreshInterval)
@@ -93,8 +93,8 @@ func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{})
// Shutdown is called to shutdown the service
func (dhS *RegistrarCService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RegistrarC))
if dhS.cfg.RegistrarCCfg().Dispatcher.Enabled {
unregisterHosts(dhS.connMgr, dhS.cfg.RegistrarCCfg().Dispatcher,
if dhS.cfg.RegistrarCCfg().Dispatchers.Enabled {
unregisterHosts(dhS.connMgr, dhS.cfg.RegistrarCCfg().Dispatchers,
dhS.cfg.GeneralCfg().DefaultTenant, utils.RegistrarSv1UnregisterDispatcherHosts)
}
if dhS.cfg.RegistrarCCfg().RPC.Enabled {
@@ -105,8 +105,8 @@ func (dhS *RegistrarCService) Shutdown() {
}
func (dhS *RegistrarCService) registerDispHosts() {
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts {
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatchers.Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}

View File

@@ -45,8 +45,8 @@ func TestDispatcherHostsService(t *testing.T) {
Transport: rpcclient.HTTPjson,
}},
}
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
cfg.RegistrarCCfg().Dispatchers.Enabled = true
cfg.RegistrarCCfg().Dispatchers.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host1",
@@ -54,8 +54,8 @@ func TestDispatcherHostsService(t *testing.T) {
},
},
}
cfg.RegistrarCCfg().Dispatcher.RefreshInterval = 100 * time.Millisecond
cfg.RegistrarCCfg().Dispatcher.RegistrarSConns = []string{"conn1"}
cfg.RegistrarCCfg().Dispatchers.RefreshInterval = 100 * time.Millisecond
cfg.RegistrarCCfg().Dispatchers.RegistrarSConns = []string{"conn1"}
ds := NewRegistrarCService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
@@ -75,7 +75,7 @@ func TestDispatcherHostsService(t *testing.T) {
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
cfg.RegistrarCCfg().Dispatchers.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host2",
@@ -92,7 +92,7 @@ func TestDispatcherHostsService(t *testing.T) {
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
unregisterHosts(ds.connMgr, cfg.RegistrarCCfg().Dispatcher, "cgrates.org", utils.RegistrarSv1UnregisterDispatcherHosts)
unregisterHosts(ds.connMgr, cfg.RegistrarCCfg().Dispatchers, "cgrates.org", utils.RegistrarSv1UnregisterDispatcherHosts)
if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
t.Errorf("Expected to not find Host2 in cache")
}
@@ -101,7 +101,7 @@ func TestDispatcherHostsService(t *testing.T) {
config.CgrConfig().CacheCfg().ReplicationConns = []string{}
host1.ID = "Host1"
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
cfg.RegistrarCCfg().Dispatchers.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host1",
@@ -127,7 +127,7 @@ func TestDispatcherHostsService(t *testing.T) {
func TestRegistrarcListenAndServe(t *testing.T) {
//cover purposes only
cfg := config.NewDefaultCGRConfig()
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().Dispatchers.Enabled = true
cfg.RegistrarCCfg().RPC.Enabled = true
regStSrv := NewRegistrarCService(cfg, nil)
stopChan := make(chan struct{}, 1)
@@ -194,8 +194,8 @@ func TestRegisterRPCHosts(t *testing.T) {
func TestRegistrarcListenAndServedTmCDispatcher(t *testing.T) {
//cover purposes only
cfg := config.NewDefaultCGRConfig()
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().Dispatcher.RefreshInterval = 1
cfg.RegistrarCCfg().Dispatchers.Enabled = true
cfg.RegistrarCCfg().Dispatchers.RefreshInterval = 1
cfg.RegistrarCCfg().RPC.Enabled = true
regStSrv := NewRegistrarCService(cfg, nil)
stopChan := make(chan struct{}, 1)
@@ -211,7 +211,7 @@ func TestRegistrarcListenAndServedTmCDispatcher(t *testing.T) {
func TestRegistrarcListenAndServedTmCRPC(t *testing.T) {
//cover purposes only
cfg := config.NewDefaultCGRConfig()
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().Dispatchers.Enabled = true
cfg.RegistrarCCfg().RPC.Enabled = true
cfg.RegistrarCCfg().RPC.RefreshInterval = 1
regStSrv := NewRegistrarCService(cfg, nil)

View File

@@ -103,5 +103,5 @@ func (dspS *RegistrarCService) ServiceName() string {
// ShouldRun returns if the service should be running
func (dspS *RegistrarCService) ShouldRun() bool {
return dspS.cfg.RegistrarCCfg().Dispatcher.Enabled || dspS.cfg.RegistrarCCfg().RPC.Enabled
return dspS.cfg.RegistrarCCfg().Dispatchers.Enabled || dspS.cfg.RegistrarCCfg().RPC.Enabled
}

View File

@@ -89,7 +89,7 @@ func TestDispatcherHReload(t *testing.T) {
if err != nil {
t.Errorf("\nExpecting <nil>,\n Received <%+v>", err)
}
cfg.RegistrarCCfg().Dispatcher.Enabled = false
cfg.RegistrarCCfg().Dispatchers.Enabled = false
cfg.GetReloadChan(config.RegistrarCJson) <- struct{}{}
time.Sleep(10 * time.Millisecond)
if srv.IsRunning() {

View File

@@ -2364,7 +2364,7 @@ const (
// RegistrarCCfg
const (
RPCCfg = "rpc"
DispatcherCfg = "dispatcher"
DispatcherCfg = "dispatchers"
RegistrarsConnsCfg = "registrars_conns"
HostsCfg = "hosts"
RefreshIntervalCfg = "refresh_interval"