Updated DispatcherH configuration

This commit is contained in:
Trial97
2020-08-26 10:39:17 +03:00
committed by Dan Christian Bogos
parent 7f832da40d
commit 14ddc00fb8
11 changed files with 197 additions and 134 deletions

View File

@@ -935,10 +935,8 @@ const CGRATES_CFG_JSON = `
"dispatcherh":{
"enabled": false,
"dispatchers_conns": [],
"host_ids": {},
"hosts": {},
"register_interval": "5m",
"register_transport": "*json",
"register_tls": false,
},

View File

@@ -150,13 +150,14 @@
// },
// "http": { // HTTP server configuration
// "json_rpc_url": "/jsonrpc", // JSON RPC relative URL ("" to disable)
// "ws_url": "/ws", // WebSockets relative URL ("" to disable)
// "freeswitch_cdrs_url": "/freeswitch_json", // Freeswitch CDRS relative URL ("" to disable)
// "http_cdrs": "/cdr_http", // CDRS relative URL ("" to disable)
// "use_basic_auth": false, // use basic authentication
// "auth_users": {}, // basic authentication usernames and base64-encoded passwords (eg: { "username1": "cGFzc3dvcmQ=", "username2": "cGFzc3dvcmQy "})
// "http": { // HTTP server configuration
// "json_rpc_url": "/jsonrpc", // JSON RPC relative URL ("" to disable)
// "dispatchers_registrar_url": "/dispatchers_registrar", // dispatcherH registrar service relative URL
// "ws_url": "/ws", // WebSockets relative URL ("" to disable)
// "freeswitch_cdrs_url": "/freeswitch_json", // Freeswitch CDRS relative URL ("" to disable)
// "http_cdrs": "/cdr_http", // CDRS relative URL ("" to disable)
// "use_basic_auth": false, // use basic authentication
// "auth_users": {}, // basic authentication usernames and base64-encoded passwords (eg: { "username1": "cGFzc3dvcmQ=", "username2": "cGFzc3dvcmQy "})
// },
@@ -910,6 +911,14 @@
// },
// "dispatcherh":{
// "enabled": false,
// "dispatchers_conns": [],
// "hosts": {},
// "register_interval": "5m",
// },
// "analyzers":{ // AnalyzerS config
// "enabled":false // starts AnalyzerS service: <true|false>.
// },

View File

@@ -107,10 +107,10 @@
"dispatcherh":{
"enabled": false,
"dispatchers_conns": ["dispConn"],
"host_ids": {"*default":["ALL"]},
"hosts": {
"*default":[{"ID":"ALL", "register_transport": "*json", "register_tls": false}]
},
"register_interval": "1s",
"register_transport": "*json",
"register_tls": false,
},

View File

@@ -102,13 +102,14 @@
},
"dispatcherh":{
"enabled": false,
"dispatchers_conns": ["dispConn"],
"host_ids": {"*default":["ALL"]},
"hosts": {
"*default":[{"ID":"ALL2", "register_transport": "*json", "register_tls": false}]
},
"register_interval": "1s",
"register_transport": "*json",
"register_tls": false,
},

View File

@@ -111,10 +111,10 @@
"dispatcherh":{
"enabled": false,
"dispatchers_conns": ["dispConn"],
"host_ids": {"*default":["ALL"]},
"hosts": {
"*default":[{"ID":"ALL", "register_transport": "*json", "register_tls": false}]
},
"register_interval": "1s",
"register_transport": "*json",
"register_tls": false,
},

View File

@@ -109,10 +109,10 @@
"dispatcherh":{
"enabled": false,
"dispatchers_conns": ["dispConn"],
"host_ids": {"*default":["ALL"]},
"hosts": {
"*default":[{"ID":"ALL2", "register_transport": "*json", "register_tls": false}]
},
"register_interval": "1s",
"register_transport": "*json",
"register_tls": false,
},

View File

@@ -46,12 +46,10 @@ type DispatcherHostsService struct {
}
// ListenAndServe will initialize the service
func (dhS *DispatcherHostsService) ListenAndServe() (err error) {
func (dhS *DispatcherHostsService) ListenAndServe() {
utils.Logger.Info("Starting DispatcherH service")
for {
if err = dhS.registerHosts(); err != nil {
return
}
dhS.registerHosts()
select {
case <-dhS.stop:
return
@@ -61,36 +59,26 @@ func (dhS *DispatcherHostsService) ListenAndServe() (err error) {
}
// Shutdown is called to shutdown the service
func (dhS *DispatcherHostsService) Shutdown() error {
func (dhS *DispatcherHostsService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH))
dhS.unregisterHosts()
close(dhS.stop)
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH))
return nil
return
}
func (dhS *DispatcherHostsService) registerHosts() (err error) {
var port string
if port, err = getConnPort(dhS.cfg,
dhS.cfg.DispatcherHCfg().RegisterTransport,
dhS.cfg.DispatcherHCfg().RegisterTLS); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the port because : %s",
utils.DispatcherH, err))
return
}
func (dhS *DispatcherHostsService) registerHosts() {
for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns {
for tnt, ids := range dhS.cfg.DispatcherHCfg().Hosts {
for tnt, hostCfgs := range dhS.cfg.DispatcherHCfg().Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}
args, err := NewRegisterArgs(dhS.cfg, tnt, hostCfgs)
if err != nil {
continue
}
var rply string
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, &RegisterArgs{
Tenant: tnt,
IDs: ids,
Port: port,
Transport: dhS.cfg.DispatcherHCfg().RegisterTransport,
TLS: dhS.cfg.DispatcherHCfg().RegisterTLS,
}, &rply); err != nil {
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, args, &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
utils.DispatcherH, connID, err))
continue
@@ -103,14 +91,11 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) {
func (dhS *DispatcherHostsService) unregisterHosts() {
var rply string
for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns {
for tnt, ids := range dhS.cfg.DispatcherHCfg().Hosts {
for tnt, hostCfgs := range dhS.cfg.DispatcherHCfg().Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, &UnregisterArgs{
Tenant: tnt,
IDs: ids,
}, &rply); err != nil {
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, NewUnregisterArgs(tnt, hostCfgs), &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts with tenant<%s> to the conn with ID <%s> because : %s",
utils.DispatcherH, tnt, connID, err))
continue

View File

@@ -55,16 +55,20 @@ func TestDispatcherHostsService(t *testing.T) {
}},
}
cfg.DispatcherHCfg().Enabled = true
cfg.DispatcherHCfg().Hosts = map[string][]string{utils.MetaDefault: {"Host1"}}
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
utils.MetaDefault: {
{
ID: "Host1",
RegisterTransport: utils.MetaJSON,
},
},
}
cfg.DispatcherHCfg().RegisterInterval = 100 * time.Millisecond
cfg.DispatcherHCfg().RegisterTransport = utils.MetaJSON
cfg.DispatcherHCfg().DispatchersConns = []string{"conn1"}
ds := NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
if err = ds.registerHosts(); err != nil {
t.Fatal(err)
}
ds.registerHosts()
host1 := &engine.DispatcherHost{
Tenant: "cgrates.org",
@@ -80,12 +84,17 @@ func TestDispatcherHostsService(t *testing.T) {
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
cfg.DispatcherHCfg().Hosts = map[string][]string{utils.MetaDefault: {"Host2"}}
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
utils.MetaDefault: {
{
ID: "Host2",
RegisterTransport: utils.MetaJSON,
},
},
}
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true
config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"}
if err = ds.registerHosts(); err != nil {
t.Fatal(err)
}
ds.registerHosts()
host1.ID = "Host2"
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok {
t.Errorf("Expected to find Host2 in cache")
@@ -101,33 +110,23 @@ func TestDispatcherHostsService(t *testing.T) {
config.CgrConfig().CacheCfg().ReplicationConns = []string{}
host1.ID = "Host1"
cfg.DispatcherHCfg().Hosts = map[string][]string{utils.MetaDefault: {"Host1"}}
if err = ds.Shutdown(); err != nil {
t.Fatal(err)
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
utils.MetaDefault: {
{
ID: "Host1",
RegisterTransport: utils.MetaJSON,
},
},
}
ds.Shutdown()
if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
t.Errorf("Expected to not find Host2 in cache")
}
cfg.ListenCfg().RPCJSONListen = "2012"
if err = ds.registerHosts(); err == nil {
t.Fatal("Expected error received nil")
}
ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true
config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"}
if err = ds.ListenAndServe(); err == nil {
t.Fatal("Expected error received nil")
}
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = false
config.CgrConfig().CacheCfg().ReplicationConns = []string{}
cfg.ListenCfg().RPCJSONListen = "127.0.0.1:2012"
ds.registerHosts()
ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
ds.Shutdown()
if err = ds.ListenAndServe(); err != nil {
t.Fatal(err)
}
ds.ListenAndServe()
}

View File

@@ -32,16 +32,82 @@ import (
"github.com/cgrates/rpcclient"
)
// NewRegisterArgs creates the arguments for register hosts API
func NewRegisterArgs(cfg *config.CGRConfig, tnt string, hostCfgs []*config.DispatcherHRegistarCfg) (rargs *RegisterArgs, err error) {
rargs = &RegisterArgs{
Tenant: tnt,
Opts: make(map[string]interface{}),
Hosts: make([]*RegisterHostCfg, len(hostCfgs)),
}
for i, hostCfg := range hostCfgs {
var port string
if port, err = getConnPort(cfg,
hostCfg.RegisterTransport,
hostCfg.RegisterTLS); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the port because : %s",
utils.DispatcherH, err))
return
}
rargs.Hosts[i] = &RegisterHostCfg{
ID: hostCfg.ID,
Port: port,
Transport: hostCfg.RegisterTransport,
TLS: hostCfg.RegisterTLS,
}
}
return
}
// RegisterArgs the arguments to register the dispacher host
type RegisterArgs struct {
Tenant string
Opts map[string]interface{}
IDs []string
Tenant string
Opts map[string]interface{}
Hosts []*RegisterHostCfg
}
// RegisterHostCfg the host config used to register
type RegisterHostCfg struct {
ID string
Port string
Transport string
TLS bool
}
// AsDispatcherHosts converts the arguments to DispatcherHosts
func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.DispatcherHost) {
dHs = make([]*engine.DispatcherHost, len(rargs.Hosts))
for i, hCfg := range rargs.Hosts {
dHs[i] = hCfg.AsDispatcherHost(rargs.Tenant, ip)
}
return
}
// AsDispatcherHost converts the arguments to DispatcherHosts
func (rhc *RegisterHostCfg) AsDispatcherHost(tnt, ip string) *engine.DispatcherHost {
return &engine.DispatcherHost{
Tenant: tnt,
ID: rhc.ID,
Conns: []*config.RemoteHost{{
Address: ip + ":" + rhc.Port,
Transport: rhc.Transport,
TLS: rhc.TLS,
}},
}
}
// NewUnregisterArgs creates the arguments for unregister hosts API
func NewUnregisterArgs(tnt string, hostCfgs []*config.DispatcherHRegistarCfg) (uargs *UnregisterArgs) {
uargs = &UnregisterArgs{
Tenant: tnt,
Opts: make(map[string]interface{}),
IDs: make([]string, len(hostCfgs)),
}
for i, hostCfg := range hostCfgs {
uargs.IDs[i] = hostCfg.ID
}
return
}
// UnregisterArgs the arguments to unregister the dispacher host
type UnregisterArgs struct {
Tenant string
@@ -49,23 +115,6 @@ type UnregisterArgs struct {
IDs []string
}
// AsDispatcherHosts converts the arguments to DispatcherHosts
func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.DispatcherHost) {
dHs = make([]*engine.DispatcherHost, len(rargs.IDs))
for i, id := range rargs.IDs {
dHs[i] = &engine.DispatcherHost{
Tenant: rargs.Tenant,
ID: id,
Conns: []*config.RemoteHost{{
Address: ip + ":" + rargs.Port,
Transport: rargs.Transport,
TLS: rargs.TLS,
}},
}
}
return
}
// Registar handdle for httpServer to register the dispatcher hosts
func Registar(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

View File

@@ -36,12 +36,22 @@ import (
func TestRegisterArgsAsDispatcherHosts(t *testing.T) {
args := &RegisterArgs{
Tenant: "cgrates.org",
IDs: []string{"Host1", "Host2"},
Opts: make(map[string]interface{}),
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
Tenant: "cgrates.org",
Hosts: []*RegisterHostCfg{
{
ID: "Host1",
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
},
{
ID: "Host2",
Port: "2013",
TLS: false,
Transport: utils.MetaGOB,
},
},
Opts: make(map[string]interface{}),
}
exp := []*engine.DispatcherHost{
{
@@ -57,9 +67,9 @@ func TestRegisterArgsAsDispatcherHosts(t *testing.T) {
Tenant: "cgrates.org",
ID: "Host2",
Conns: []*config.RemoteHost{{
Address: "127.0.0.1:2012",
TLS: true,
Transport: utils.MetaJSON,
Address: "127.0.0.1:2013",
TLS: false,
Transport: utils.MetaGOB,
}},
},
}
@@ -162,12 +172,22 @@ func TestGetRemoteIP(t *testing.T) {
func TestRegister(t *testing.T) {
ra := &RegisterArgs{
Tenant: "cgrates.org",
IDs: []string{"Host1", "Host2"},
Opts: make(map[string]interface{}),
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
Tenant: "cgrates.org",
Hosts: []*RegisterHostCfg{
{
ID: "Host1",
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
},
{
ID: "Host2",
Port: "2013",
TLS: false,
Transport: utils.MetaGOB,
},
},
Opts: make(map[string]interface{}),
}
raJSON, err := json.Marshal([]interface{}{ra})
id := json.RawMessage("1")
@@ -204,9 +224,9 @@ func TestRegister(t *testing.T) {
Tenant: "cgrates.org",
ID: "Host2",
Conns: []*config.RemoteHost{{
Address: "127.0.0.1:2012",
TLS: true,
Transport: utils.MetaJSON,
Address: "127.0.0.1:2013",
TLS: false,
Transport: utils.MetaGOB,
}},
}
@@ -319,12 +339,22 @@ func (*errRecorder) WriteHeader(statusCode int) {}
func TestRegistar(t *testing.T) {
w := httptest.NewRecorder()
ra := &RegisterArgs{
Tenant: "cgrates.org",
IDs: []string{"Host1", "Host2"},
Opts: make(map[string]interface{}),
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
Tenant: "cgrates.org",
Hosts: []*RegisterHostCfg{
{
ID: "Host1",
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
},
{
ID: "Host2",
Port: "2013",
TLS: false,
Transport: utils.MetaGOB,
},
},
Opts: make(map[string]interface{}),
}
raJSON, err := json.Marshal([]interface{}{ra})
id := json.RawMessage("1")

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"fmt"
"sync"
"github.com/cgrates/cgrates/config"
@@ -66,12 +65,7 @@ func (dspS *DispatcherHostsService) Start() (err error) {
defer dspS.Unlock()
dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr)
go func(ds *dispatcherh.DispatcherHostsService, ext chan bool) {
if err := ds.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DispatcherH, err.Error()))
ext <- true
}
}(dspS.dspS, dspS.exitChan)
go dspS.dspS.ListenAndServe()
dspS.connChan <- dspS.dspS
return
@@ -85,13 +79,11 @@ func (dspS *DispatcherHostsService) Reload() (err error) {
// Shutdown stops the service
func (dspS *DispatcherHostsService) Shutdown() (err error) {
dspS.Lock()
defer dspS.Unlock()
if err = dspS.dspS.Shutdown(); err != nil {
return
}
dspS.dspS.Shutdown()
dspS.dspS = nil
// dspS.rpc = nil
<-dspS.connChan
dspS.Unlock()
return
}