From 14ddc00fb86a61ed692134c45ee047cd9468d2eb Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 26 Aug 2020 10:39:17 +0300 Subject: [PATCH] Updated DispatcherH configuration --- config/config_defaults.go | 4 +- data/conf/cgrates/cgrates.json | 23 +++-- .../dispatcherh/all2_mongo/cgrates.json | 6 +- .../dispatcherh/all2_mysql/cgrates.json | 7 +- .../dispatcherh/all_mongo/cgrates.json | 6 +- .../dispatcherh/all_mysql/cgrates.json | 6 +- dispatcherh/dispatcherh.go | 41 +++------ dispatcherh/dispatcherh_test.go | 57 ++++++------ dispatcherh/libdispatcherh.go | 89 ++++++++++++++----- dispatcherh/libdispatcherh_test.go | 78 +++++++++++----- services/dispatcherh.go | 14 +-- 11 files changed, 197 insertions(+), 134 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 481f961f2..64333bf06 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -935,10 +935,8 @@ const CGRATES_CFG_JSON = ` "dispatcherh":{ "enabled": false, "dispatchers_conns": [], - "host_ids": {}, + "hosts": {}, "register_interval": "5m", - "register_transport": "*json", - "register_tls": false, }, diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index ec376705a..a74f9e95e 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -150,13 +150,14 @@ // }, -// "http": { // HTTP server configuration -// "json_rpc_url": "/jsonrpc", // JSON RPC relative URL ("" to disable) -// "ws_url": "/ws", // WebSockets relative URL ("" to disable) -// "freeswitch_cdrs_url": "/freeswitch_json", // Freeswitch CDRS relative URL ("" to disable) -// "http_cdrs": "/cdr_http", // CDRS relative URL ("" to disable) -// "use_basic_auth": false, // use basic authentication -// "auth_users": {}, // basic authentication usernames and base64-encoded passwords (eg: { "username1": "cGFzc3dvcmQ=", "username2": "cGFzc3dvcmQy "}) +// "http": { // HTTP server configuration +// "json_rpc_url": "/jsonrpc", // JSON RPC relative URL ("" to disable) +// "dispatchers_registrar_url": "/dispatchers_registrar", // dispatcherH registrar service relative URL +// "ws_url": "/ws", // WebSockets relative URL ("" to disable) +// "freeswitch_cdrs_url": "/freeswitch_json", // Freeswitch CDRS relative URL ("" to disable) +// "http_cdrs": "/cdr_http", // CDRS relative URL ("" to disable) +// "use_basic_auth": false, // use basic authentication +// "auth_users": {}, // basic authentication usernames and base64-encoded passwords (eg: { "username1": "cGFzc3dvcmQ=", "username2": "cGFzc3dvcmQy "}) // }, @@ -910,6 +911,14 @@ // }, +// "dispatcherh":{ +// "enabled": false, +// "dispatchers_conns": [], +// "hosts": {}, +// "register_interval": "5m", +// }, + + // "analyzers":{ // AnalyzerS config // "enabled":false // starts AnalyzerS service: . // }, diff --git a/data/conf/samples/dispatcherh/all2_mongo/cgrates.json b/data/conf/samples/dispatcherh/all2_mongo/cgrates.json index 38d02a909..915d76308 100644 --- a/data/conf/samples/dispatcherh/all2_mongo/cgrates.json +++ b/data/conf/samples/dispatcherh/all2_mongo/cgrates.json @@ -107,10 +107,10 @@ "dispatcherh":{ "enabled": false, "dispatchers_conns": ["dispConn"], - "host_ids": {"*default":["ALL"]}, + "hosts": { + "*default":[{"ID":"ALL", "register_transport": "*json", "register_tls": false}] + }, "register_interval": "1s", - "register_transport": "*json", - "register_tls": false, }, diff --git a/data/conf/samples/dispatcherh/all2_mysql/cgrates.json b/data/conf/samples/dispatcherh/all2_mysql/cgrates.json index d26a530c6..896f46033 100644 --- a/data/conf/samples/dispatcherh/all2_mysql/cgrates.json +++ b/data/conf/samples/dispatcherh/all2_mysql/cgrates.json @@ -102,13 +102,14 @@ }, + "dispatcherh":{ "enabled": false, "dispatchers_conns": ["dispConn"], - "host_ids": {"*default":["ALL"]}, + "hosts": { + "*default":[{"ID":"ALL2", "register_transport": "*json", "register_tls": false}] + }, "register_interval": "1s", - "register_transport": "*json", - "register_tls": false, }, diff --git a/data/conf/samples/dispatcherh/all_mongo/cgrates.json b/data/conf/samples/dispatcherh/all_mongo/cgrates.json index 4319740e4..437841e99 100644 --- a/data/conf/samples/dispatcherh/all_mongo/cgrates.json +++ b/data/conf/samples/dispatcherh/all_mongo/cgrates.json @@ -111,10 +111,10 @@ "dispatcherh":{ "enabled": false, "dispatchers_conns": ["dispConn"], - "host_ids": {"*default":["ALL"]}, + "hosts": { + "*default":[{"ID":"ALL", "register_transport": "*json", "register_tls": false}] + }, "register_interval": "1s", - "register_transport": "*json", - "register_tls": false, }, diff --git a/data/conf/samples/dispatcherh/all_mysql/cgrates.json b/data/conf/samples/dispatcherh/all_mysql/cgrates.json index 8304276a6..04250527c 100644 --- a/data/conf/samples/dispatcherh/all_mysql/cgrates.json +++ b/data/conf/samples/dispatcherh/all_mysql/cgrates.json @@ -109,10 +109,10 @@ "dispatcherh":{ "enabled": false, "dispatchers_conns": ["dispConn"], - "host_ids": {"*default":["ALL"]}, + "hosts": { + "*default":[{"ID":"ALL2", "register_transport": "*json", "register_tls": false}] + }, "register_interval": "1s", - "register_transport": "*json", - "register_tls": false, }, diff --git a/dispatcherh/dispatcherh.go b/dispatcherh/dispatcherh.go index 9213bf650..b19b968b6 100644 --- a/dispatcherh/dispatcherh.go +++ b/dispatcherh/dispatcherh.go @@ -46,12 +46,10 @@ type DispatcherHostsService struct { } // ListenAndServe will initialize the service -func (dhS *DispatcherHostsService) ListenAndServe() (err error) { +func (dhS *DispatcherHostsService) ListenAndServe() { utils.Logger.Info("Starting DispatcherH service") for { - if err = dhS.registerHosts(); err != nil { - return - } + dhS.registerHosts() select { case <-dhS.stop: return @@ -61,36 +59,26 @@ func (dhS *DispatcherHostsService) ListenAndServe() (err error) { } // Shutdown is called to shutdown the service -func (dhS *DispatcherHostsService) Shutdown() error { +func (dhS *DispatcherHostsService) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH)) dhS.unregisterHosts() close(dhS.stop) utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH)) - return nil + return } -func (dhS *DispatcherHostsService) registerHosts() (err error) { - var port string - if port, err = getConnPort(dhS.cfg, - dhS.cfg.DispatcherHCfg().RegisterTransport, - dhS.cfg.DispatcherHCfg().RegisterTLS); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the port because : %s", - utils.DispatcherH, err)) - return - } +func (dhS *DispatcherHostsService) registerHosts() { for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns { - for tnt, ids := range dhS.cfg.DispatcherHCfg().Hosts { + for tnt, hostCfgs := range dhS.cfg.DispatcherHCfg().Hosts { if tnt == utils.MetaDefault { tnt = dhS.cfg.GeneralCfg().DefaultTenant } + args, err := NewRegisterArgs(dhS.cfg, tnt, hostCfgs) + if err != nil { + continue + } var rply string - if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, &RegisterArgs{ - Tenant: tnt, - IDs: ids, - Port: port, - Transport: dhS.cfg.DispatcherHCfg().RegisterTransport, - TLS: dhS.cfg.DispatcherHCfg().RegisterTLS, - }, &rply); err != nil { + if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, args, &rply); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s", utils.DispatcherH, connID, err)) continue @@ -103,14 +91,11 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) { func (dhS *DispatcherHostsService) unregisterHosts() { var rply string for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns { - for tnt, ids := range dhS.cfg.DispatcherHCfg().Hosts { + for tnt, hostCfgs := range dhS.cfg.DispatcherHCfg().Hosts { if tnt == utils.MetaDefault { tnt = dhS.cfg.GeneralCfg().DefaultTenant } - if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, &UnregisterArgs{ - Tenant: tnt, - IDs: ids, - }, &rply); err != nil { + if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, NewUnregisterArgs(tnt, hostCfgs), &rply); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts with tenant<%s> to the conn with ID <%s> because : %s", utils.DispatcherH, tnt, connID, err)) continue diff --git a/dispatcherh/dispatcherh_test.go b/dispatcherh/dispatcherh_test.go index f68802325..7db8a2d08 100644 --- a/dispatcherh/dispatcherh_test.go +++ b/dispatcherh/dispatcherh_test.go @@ -55,16 +55,20 @@ func TestDispatcherHostsService(t *testing.T) { }}, } cfg.DispatcherHCfg().Enabled = true - cfg.DispatcherHCfg().Hosts = map[string][]string{utils.MetaDefault: {"Host1"}} + cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{ + utils.MetaDefault: { + { + ID: "Host1", + RegisterTransport: utils.MetaJSON, + }, + }, + } cfg.DispatcherHCfg().RegisterInterval = 100 * time.Millisecond - cfg.DispatcherHCfg().RegisterTransport = utils.MetaJSON cfg.DispatcherHCfg().DispatchersConns = []string{"conn1"} ds := NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) - if err = ds.registerHosts(); err != nil { - t.Fatal(err) - } + ds.registerHosts() host1 := &engine.DispatcherHost{ Tenant: "cgrates.org", @@ -80,12 +84,17 @@ func TestDispatcherHostsService(t *testing.T) { } else if !reflect.DeepEqual(host1, x) { t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x)) } - cfg.DispatcherHCfg().Hosts = map[string][]string{utils.MetaDefault: {"Host2"}} + cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{ + utils.MetaDefault: { + { + ID: "Host2", + RegisterTransport: utils.MetaJSON, + }, + }, + } config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"} - if err = ds.registerHosts(); err != nil { - t.Fatal(err) - } + ds.registerHosts() host1.ID = "Host2" if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok { t.Errorf("Expected to find Host2 in cache") @@ -101,33 +110,23 @@ func TestDispatcherHostsService(t *testing.T) { config.CgrConfig().CacheCfg().ReplicationConns = []string{} host1.ID = "Host1" - cfg.DispatcherHCfg().Hosts = map[string][]string{utils.MetaDefault: {"Host1"}} - if err = ds.Shutdown(); err != nil { - t.Fatal(err) + cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{ + utils.MetaDefault: { + { + ID: "Host1", + RegisterTransport: utils.MetaJSON, + }, + }, } + ds.Shutdown() if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok { t.Errorf("Expected to not find Host2 in cache") } cfg.ListenCfg().RPCJSONListen = "2012" - if err = ds.registerHosts(); err == nil { - t.Fatal("Expected error received nil") - } - - ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) - config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true - config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"} - if err = ds.ListenAndServe(); err == nil { - t.Fatal("Expected error received nil") - } - - config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = false - config.CgrConfig().CacheCfg().ReplicationConns = []string{} - cfg.ListenCfg().RPCJSONListen = "127.0.0.1:2012" + ds.registerHosts() ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) ds.Shutdown() - if err = ds.ListenAndServe(); err != nil { - t.Fatal(err) - } + ds.ListenAndServe() } diff --git a/dispatcherh/libdispatcherh.go b/dispatcherh/libdispatcherh.go index 15f5565bb..86cfa487e 100644 --- a/dispatcherh/libdispatcherh.go +++ b/dispatcherh/libdispatcherh.go @@ -32,16 +32,82 @@ import ( "github.com/cgrates/rpcclient" ) +// NewRegisterArgs creates the arguments for register hosts API +func NewRegisterArgs(cfg *config.CGRConfig, tnt string, hostCfgs []*config.DispatcherHRegistarCfg) (rargs *RegisterArgs, err error) { + rargs = &RegisterArgs{ + Tenant: tnt, + Opts: make(map[string]interface{}), + Hosts: make([]*RegisterHostCfg, len(hostCfgs)), + } + for i, hostCfg := range hostCfgs { + var port string + if port, err = getConnPort(cfg, + hostCfg.RegisterTransport, + hostCfg.RegisterTLS); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the port because : %s", + utils.DispatcherH, err)) + return + } + rargs.Hosts[i] = &RegisterHostCfg{ + ID: hostCfg.ID, + Port: port, + Transport: hostCfg.RegisterTransport, + TLS: hostCfg.RegisterTLS, + } + } + return +} + // RegisterArgs the arguments to register the dispacher host type RegisterArgs struct { - Tenant string - Opts map[string]interface{} - IDs []string + Tenant string + Opts map[string]interface{} + Hosts []*RegisterHostCfg +} + +// RegisterHostCfg the host config used to register +type RegisterHostCfg struct { + ID string Port string Transport string TLS bool } +// AsDispatcherHosts converts the arguments to DispatcherHosts +func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.DispatcherHost) { + dHs = make([]*engine.DispatcherHost, len(rargs.Hosts)) + for i, hCfg := range rargs.Hosts { + dHs[i] = hCfg.AsDispatcherHost(rargs.Tenant, ip) + } + return +} + +// AsDispatcherHost converts the arguments to DispatcherHosts +func (rhc *RegisterHostCfg) AsDispatcherHost(tnt, ip string) *engine.DispatcherHost { + return &engine.DispatcherHost{ + Tenant: tnt, + ID: rhc.ID, + Conns: []*config.RemoteHost{{ + Address: ip + ":" + rhc.Port, + Transport: rhc.Transport, + TLS: rhc.TLS, + }}, + } +} + +// NewUnregisterArgs creates the arguments for unregister hosts API +func NewUnregisterArgs(tnt string, hostCfgs []*config.DispatcherHRegistarCfg) (uargs *UnregisterArgs) { + uargs = &UnregisterArgs{ + Tenant: tnt, + Opts: make(map[string]interface{}), + IDs: make([]string, len(hostCfgs)), + } + for i, hostCfg := range hostCfgs { + uargs.IDs[i] = hostCfg.ID + } + return +} + // UnregisterArgs the arguments to unregister the dispacher host type UnregisterArgs struct { Tenant string @@ -49,23 +115,6 @@ type UnregisterArgs struct { IDs []string } -// AsDispatcherHosts converts the arguments to DispatcherHosts -func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.DispatcherHost) { - dHs = make([]*engine.DispatcherHost, len(rargs.IDs)) - for i, id := range rargs.IDs { - dHs[i] = &engine.DispatcherHost{ - Tenant: rargs.Tenant, - ID: id, - Conns: []*config.RemoteHost{{ - Address: ip + ":" + rargs.Port, - Transport: rargs.Transport, - TLS: rargs.TLS, - }}, - } - } - return -} - // Registar handdle for httpServer to register the dispatcher hosts func Registar(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() diff --git a/dispatcherh/libdispatcherh_test.go b/dispatcherh/libdispatcherh_test.go index 80233d8a8..50d2d2eaa 100644 --- a/dispatcherh/libdispatcherh_test.go +++ b/dispatcherh/libdispatcherh_test.go @@ -36,12 +36,22 @@ import ( func TestRegisterArgsAsDispatcherHosts(t *testing.T) { args := &RegisterArgs{ - Tenant: "cgrates.org", - IDs: []string{"Host1", "Host2"}, - Opts: make(map[string]interface{}), - Port: "2012", - TLS: true, - Transport: utils.MetaJSON, + Tenant: "cgrates.org", + Hosts: []*RegisterHostCfg{ + { + ID: "Host1", + Port: "2012", + TLS: true, + Transport: utils.MetaJSON, + }, + { + ID: "Host2", + Port: "2013", + TLS: false, + Transport: utils.MetaGOB, + }, + }, + Opts: make(map[string]interface{}), } exp := []*engine.DispatcherHost{ { @@ -57,9 +67,9 @@ func TestRegisterArgsAsDispatcherHosts(t *testing.T) { Tenant: "cgrates.org", ID: "Host2", Conns: []*config.RemoteHost{{ - Address: "127.0.0.1:2012", - TLS: true, - Transport: utils.MetaJSON, + Address: "127.0.0.1:2013", + TLS: false, + Transport: utils.MetaGOB, }}, }, } @@ -162,12 +172,22 @@ func TestGetRemoteIP(t *testing.T) { func TestRegister(t *testing.T) { ra := &RegisterArgs{ - Tenant: "cgrates.org", - IDs: []string{"Host1", "Host2"}, - Opts: make(map[string]interface{}), - Port: "2012", - TLS: true, - Transport: utils.MetaJSON, + Tenant: "cgrates.org", + Hosts: []*RegisterHostCfg{ + { + ID: "Host1", + Port: "2012", + TLS: true, + Transport: utils.MetaJSON, + }, + { + ID: "Host2", + Port: "2013", + TLS: false, + Transport: utils.MetaGOB, + }, + }, + Opts: make(map[string]interface{}), } raJSON, err := json.Marshal([]interface{}{ra}) id := json.RawMessage("1") @@ -204,9 +224,9 @@ func TestRegister(t *testing.T) { Tenant: "cgrates.org", ID: "Host2", Conns: []*config.RemoteHost{{ - Address: "127.0.0.1:2012", - TLS: true, - Transport: utils.MetaJSON, + Address: "127.0.0.1:2013", + TLS: false, + Transport: utils.MetaGOB, }}, } @@ -319,12 +339,22 @@ func (*errRecorder) WriteHeader(statusCode int) {} func TestRegistar(t *testing.T) { w := httptest.NewRecorder() ra := &RegisterArgs{ - Tenant: "cgrates.org", - IDs: []string{"Host1", "Host2"}, - Opts: make(map[string]interface{}), - Port: "2012", - TLS: true, - Transport: utils.MetaJSON, + Tenant: "cgrates.org", + Hosts: []*RegisterHostCfg{ + { + ID: "Host1", + Port: "2012", + TLS: true, + Transport: utils.MetaJSON, + }, + { + ID: "Host2", + Port: "2013", + TLS: false, + Transport: utils.MetaGOB, + }, + }, + Opts: make(map[string]interface{}), } raJSON, err := json.Marshal([]interface{}{ra}) id := json.RawMessage("1") diff --git a/services/dispatcherh.go b/services/dispatcherh.go index 6b9447b7c..1b00b1d98 100644 --- a/services/dispatcherh.go +++ b/services/dispatcherh.go @@ -19,7 +19,6 @@ along with this program. If not, see package services import ( - "fmt" "sync" "github.com/cgrates/cgrates/config" @@ -66,12 +65,7 @@ func (dspS *DispatcherHostsService) Start() (err error) { defer dspS.Unlock() dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr) - go func(ds *dispatcherh.DispatcherHostsService, ext chan bool) { - if err := ds.ListenAndServe(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DispatcherH, err.Error())) - ext <- true - } - }(dspS.dspS, dspS.exitChan) + go dspS.dspS.ListenAndServe() dspS.connChan <- dspS.dspS return @@ -85,13 +79,11 @@ func (dspS *DispatcherHostsService) Reload() (err error) { // Shutdown stops the service func (dspS *DispatcherHostsService) Shutdown() (err error) { dspS.Lock() - defer dspS.Unlock() - if err = dspS.dspS.Shutdown(); err != nil { - return - } + dspS.dspS.Shutdown() dspS.dspS = nil // dspS.rpc = nil <-dspS.connChan + dspS.Unlock() return }