diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 572ae10f2..9d53092a6 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -530,7 +530,7 @@ func main() { srvManager := servmanager.NewServiceManager(cfg, exitChan) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan) dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager) - dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager) + dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager, exitChan) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, internalChargerSChan, connManager) tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan) diff --git a/dispatcherh/dispatcherh.go b/dispatcherh/dispatcherh.go index e3a885589..7fdc2180b 100644 --- a/dispatcherh/dispatcherh.go +++ b/dispatcherh/dispatcherh.go @@ -29,12 +29,12 @@ import ( // NewDispatcherHService constructs a DispatcherHService func NewDispatcherHService(cfg *config.CGRConfig, - connMgr *engine.ConnManager) (*DispatcherHostsService, error) { + connMgr *engine.ConnManager) *DispatcherHostsService { return &DispatcherHostsService{ cfg: cfg, connMgr: connMgr, stop: make(chan struct{}), - }, nil + } } // DispatcherHostsService is the service handling dispatching towards internal components @@ -46,7 +46,7 @@ type DispatcherHostsService struct { } // ListenAndServe will initialize the service -func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error) { +func (dhS *DispatcherHostsService) ListenAndServe() (err error) { utils.Logger.Info("Starting DispatcherH service") for { if err = dhS.registerHosts(); err != nil { @@ -55,9 +55,6 @@ func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error select { case <-dhS.stop: return - case e := <-exitChan: - exitChan <- e // put back for the others listening for shutdown request - return case <-time.After(dhS.cfg.DispatcherHCfg().RegisterInterval): } } @@ -97,10 +94,6 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) { utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s", utils.DispatcherH, connID, err)) continue - } else if rply != utils.OK { - utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s", - utils.DispatcherH, rply)) - continue } } } @@ -121,15 +114,12 @@ func (dhS *DispatcherHostsService) unregisterHosts() { utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts with tenant<%s> to the conn with ID <%s> because : %s", utils.DispatcherH, tnt, connID, err)) continue - } else if rply != utils.OK { - utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts for tenant<%s>: %s", - utils.DispatcherH, tnt, rply)) - continue } } } } -func (dhS *DispatcherHostsService) Call(_ string, _, _ interface{}) error { +// Call only to implement rpcclient.ClientConnector interface +func (*DispatcherHostsService) Call(_ string, _, _ interface{}) error { return utils.ErrNotImplemented } diff --git a/dispatcherh/dispatcherh_test.go b/dispatcherh/dispatcherh_test.go new file mode 100644 index 000000000..502d7fb59 --- /dev/null +++ b/dispatcherh/dispatcherh_test.go @@ -0,0 +1,133 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatcherh + +import ( + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +func TestDispatcherHostsServiceCall(t *testing.T) { + if err := new(DispatcherHostsService).Call("", nil, nil); err != utils.ErrNotImplemented { + t.Errorf("Expected error: %s ,received: %v", utils.ErrNotImplemented, err) + } +} + +func TestDispatcherHostsService(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(Registar)) + defer ts.Close() + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + + cfg.RPCConns()["conn1"] = &config.RPCConn{ + Strategy: rpcclient.PoolFirst, + Conns: []*config.RemoteHost{{ + Address: ts.URL, + Synchronous: true, + TLS: false, + Transport: rpcclient.HTTPjson, + }}, + } + cfg.DispatcherHCfg().Enabled = true + cfg.DispatcherHCfg().HostIDs = map[string][]string{utils.MetaDefault: {"Host1"}} + cfg.DispatcherHCfg().RegisterInterval = 100 * time.Millisecond + cfg.DispatcherHCfg().RegisterTransport = utils.MetaJSON + cfg.DispatcherHCfg().DispatchersConns = []string{"conn1"} + + ds := NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) + + if err = ds.registerHosts(); err != nil { + t.Fatal(err) + } + + host1 := &engine.DispatcherHost{ + Tenant: "cgrates.org", + ID: "Host1", + Conns: []*config.RemoteHost{{ + Address: "127.0.0.1:2012", + Transport: utils.MetaJSON, + }}, + } + + if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok { + t.Errorf("Expected to find Host1 in cache") + } else if !reflect.DeepEqual(host1, x) { + t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x)) + } + cfg.DispatcherHCfg().HostIDs = map[string][]string{utils.MetaDefault: {"Host2"}} + config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true + config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"} + if err = ds.registerHosts(); err != nil { + t.Fatal(err) + } + host1.ID = "Host2" + if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok { + t.Errorf("Expected to find Host2 in cache") + } else if !reflect.DeepEqual(host1, x) { + t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x)) + } + ds.unregisterHosts() + if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok { + t.Errorf("Expected to not find Host2 in cache") + } + + config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = false + config.CgrConfig().CacheCfg().ReplicationConns = []string{} + + host1.ID = "Host1" + cfg.DispatcherHCfg().HostIDs = map[string][]string{utils.MetaDefault: {"Host1"}} + if err = ds.Shutdown(); err != nil { + t.Fatal(err) + } + if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok { + t.Errorf("Expected to not find Host2 in cache") + } + + cfg.ListenCfg().RPCJSONListen = "2012" + if err = ds.registerHosts(); err == nil { + t.Fatal("Expected error received nil") + } + + ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) + config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true + config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"} + if err = ds.ListenAndServe(); err == nil { + t.Fatal("Expected error received nil") + } + + config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = false + config.CgrConfig().CacheCfg().ReplicationConns = []string{} + cfg.ListenCfg().RPCJSONListen = "127.0.0.1:2012" + + ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) + ds.Shutdown() + if err = ds.ListenAndServe(); err != nil { + t.Fatal(err) + } +} diff --git a/dispatcherh/libdispatcherh.go b/dispatcherh/libdispatcherh.go index 32ce43c0d..15f5565bb 100644 --- a/dispatcherh/libdispatcherh.go +++ b/dispatcherh/libdispatcherh.go @@ -70,11 +70,12 @@ func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.Dispatche func Registar(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() w.Header().Set("Content-Type", "application/json") - result, errMessage := utils.OK, utils.EmptyString + var result interface{} = utils.OK + var errMessage interface{} var err error var id *json.RawMessage if id, err = register(r); err != nil { - result, errMessage = utils.EmptyString, err.Error() + result, errMessage = nil, err.Error() } if err := utils.WriteServerResponse(w, id, result, errMessage); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write resonse because: %s", @@ -83,12 +84,14 @@ func Registar(w http.ResponseWriter, r *http.Request) { } func register(req *http.Request) (*json.RawMessage, error) { + id := json.RawMessage("0") sReq, err := utils.DecodeServerRequest(req.Body) if err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode request because: %s", utils.DispatcherH, err)) - return nil, err + return &id, err } + var hasErrors bool switch sReq.Method { default: err = errors.New("rpc: can't find service " + sReq.Method) @@ -104,12 +107,14 @@ func register(req *http.Request) (*json.RawMessage, error) { return sReq.Id, err } for _, id := range args.IDs { - if err = engine.Cache.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(args.Tenant, id), false, utils.NonTransactional); err != nil { + if err = engine.Cache.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(args.Tenant, id), true, utils.NonTransactional); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove DispatcherHost <%s> from cache because: %s", utils.DispatcherH, id, err)) + hasErrors = true continue } } + case utils.DispatcherHv1RegisterHosts: var dHs RegisterArgs params := []interface{}{&dHs} @@ -126,14 +131,18 @@ func register(req *http.Request) (*json.RawMessage, error) { } for _, dH := range dHs.AsDispatcherHosts(addr) { - if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil, - false, utils.NonTransactional); err != nil { + if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.TenantID(), dH, nil, + true, utils.NonTransactional); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s", utils.DispatcherH, dH.TenantID(), err)) + hasErrors = true continue } } } + if hasErrors { + return sReq.Id, utils.ErrPartiallyExecuted + } return sReq.Id, nil } diff --git a/dispatcherh/libdispatcherh_test.go b/dispatcherh/libdispatcherh_test.go new file mode 100644 index 000000000..80233d8a8 --- /dev/null +++ b/dispatcherh/libdispatcherh_test.go @@ -0,0 +1,359 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatcherh + +import ( + "bytes" + "encoding/json" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "reflect" + "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +func TestRegisterArgsAsDispatcherHosts(t *testing.T) { + args := &RegisterArgs{ + Tenant: "cgrates.org", + IDs: []string{"Host1", "Host2"}, + Opts: make(map[string]interface{}), + Port: "2012", + TLS: true, + Transport: utils.MetaJSON, + } + exp := []*engine.DispatcherHost{ + { + Tenant: "cgrates.org", + ID: "Host1", + Conns: []*config.RemoteHost{{ + Address: "127.0.0.1:2012", + TLS: true, + Transport: utils.MetaJSON, + }}, + }, + { + Tenant: "cgrates.org", + ID: "Host2", + Conns: []*config.RemoteHost{{ + Address: "127.0.0.1:2012", + TLS: true, + Transport: utils.MetaJSON, + }}, + }, + } + if rply := args.AsDispatcherHosts("127.0.0.1"); !reflect.DeepEqual(exp, rply) { + t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(rply)) + } +} + +func TestGetConnPort(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + + cfg.ListenCfg().RPCJSONTLSListen = ":2072" + cfg.ListenCfg().RPCJSONListen = ":2012" + cfg.ListenCfg().RPCGOBTLSListen = ":2073" + cfg.ListenCfg().RPCGOBListen = ":2013" + cfg.ListenCfg().HTTPTLSListen = ":2081" + cfg.ListenCfg().HTTPListen = ":2080" + cfg.HTTPCfg().HTTPJsonRPCURL = "/json_rpc" + + if port, err := getConnPort(cfg, utils.MetaJSON, false); err != nil { + t.Fatal(err) + } else if port != "2012" { + t.Errorf("Expected: %q ,received: %q", "2012", port) + } + if port, err := getConnPort(cfg, utils.MetaJSON, true); err != nil { + t.Fatal(err) + } else if port != "2072" { + t.Errorf("Expected: %q ,received: %q", "2072", port) + } + if port, err := getConnPort(cfg, utils.MetaGOB, false); err != nil { + t.Fatal(err) + } else if port != "2013" { + t.Errorf("Expected: %q ,received: %q", "2013", port) + } + if port, err := getConnPort(cfg, utils.MetaGOB, true); err != nil { + t.Fatal(err) + } else if port != "2073" { + t.Errorf("Expected: %q ,received: %q", "2073", port) + } + if port, err := getConnPort(cfg, rpcclient.HTTPjson, false); err != nil { + t.Fatal(err) + } else if port != "2080/json_rpc" { + t.Errorf("Expected: %q ,received: %q", "2080/json_rpc", port) + } + if port, err := getConnPort(cfg, rpcclient.HTTPjson, true); err != nil { + t.Fatal(err) + } else if port != "2081/json_rpc" { + t.Errorf("Expected: %q ,received: %q", "2081/json_rpc", port) + } + cfg.ListenCfg().RPCJSONListen = "2012" + if _, err := getConnPort(cfg, utils.MetaJSON, false); err == nil { + t.Fatal("Expected error received nil") + } +} + +func TestGetRemoteIP(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(nil)) + if err != nil { + t.Fatal(err) + } + req.RemoteAddr = "127.0.0.1:2356" + exp := "127.0.0.1" + if rply, err := getRemoteIP(req); err != nil { + t.Fatal(err) + } else if rply != exp { + t.Errorf("Expected: %q ,received: %q", exp, rply) + } + req.RemoteAddr = "notAnIP" + if _, err := getRemoteIP(req); err == nil { + t.Fatal("Expected error received nil") + } + req.RemoteAddr = "127.0.0:2012" + if _, err := getRemoteIP(req); err == nil { + t.Fatal("Expected error received nil") + } + + req.Header.Set("X-FORWARDED-FOR", "127.0.0.2,127.0.0.3") + exp = "127.0.0.2" + if rply, err := getRemoteIP(req); err != nil { + t.Fatal(err) + } else if rply != exp { + t.Errorf("Expected: %q ,received: %q", exp, rply) + } + req.Header.Set("X-FORWARDED-FOR", "127.0.0.") + if _, err := getRemoteIP(req); err == nil { + t.Fatal("Expected error received nil") + } + + req.Header.Set("X-REAL-IP", "127.0.0.4") + exp = "127.0.0.4" + if rply, err := getRemoteIP(req); err != nil { + t.Fatal(err) + } else if rply != exp { + t.Errorf("Expected: %q ,received: %q", exp, rply) + } +} + +func TestRegister(t *testing.T) { + ra := &RegisterArgs{ + Tenant: "cgrates.org", + IDs: []string{"Host1", "Host2"}, + Opts: make(map[string]interface{}), + Port: "2012", + TLS: true, + Transport: utils.MetaJSON, + } + raJSON, err := json.Marshal([]interface{}{ra}) + id := json.RawMessage("1") + if err != nil { + t.Fatal(err) + } + args := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, raJSON, id) + argsJSON, err := json.Marshal(args) + if err != nil { + t.Fatal(err) + } + req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(argsJSON)) + if err != nil { + t.Fatal(err) + } + req.RemoteAddr = "127.0.0.1:2356" + engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil)) + if rplyID, err := register(req); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(id, *rplyID) { + t.Errorf("Expected: %q ,received: %q", string(id), string(*rplyID)) + } + + host1 := &engine.DispatcherHost{ + Tenant: "cgrates.org", + ID: "Host1", + Conns: []*config.RemoteHost{{ + Address: "127.0.0.1:2012", + TLS: true, + Transport: utils.MetaJSON, + }}, + } + host2 := &engine.DispatcherHost{ + Tenant: "cgrates.org", + ID: "Host2", + Conns: []*config.RemoteHost{{ + Address: "127.0.0.1:2012", + TLS: true, + Transport: utils.MetaJSON, + }}, + } + + if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok { + t.Errorf("Expected to find Host1 in cache") + } else if !reflect.DeepEqual(host1, x) { + t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x)) + } + if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host2.TenantID()); !ok { + t.Errorf("Expected to find Host2 in cache") + } else if !reflect.DeepEqual(host2, x) { + t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host2), utils.ToJSON(x)) + } + + if _, err := register(req); err != io.EOF { + t.Errorf("Expected error: %s ,received: %v", io.EOF, err) + } + + ua := &UnregisterArgs{ + Tenant: "cgrates.org", + IDs: []string{"Host1", "Host2"}, + Opts: make(map[string]interface{}), + } + uaJSON, err := json.Marshal([]interface{}{ua}) + id = json.RawMessage("2") + if err != nil { + t.Fatal(err) + } + uargs := utils.NewServerRequest(utils.DispatcherHv1UnregisterHosts, uaJSON, id) + uargsJSON, err := json.Marshal(uargs) + if err != nil { + t.Fatal(err) + } + req, err = http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(uargsJSON)) + if err != nil { + t.Fatal(err) + } + req.RemoteAddr = "127.0.0.1:2356" + if rplyID, err := register(req); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(id, *rplyID) { + t.Errorf("Expected: %q ,received: %q", string(id), string(*rplyID)) + } + if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok { + t.Errorf("Expected to not find Host1 in cache %+v", x) + } + if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host2.TenantID()); ok { + t.Errorf("Expected to not find Host2 in cache %+v", x) + } + errCfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + engine.NewConnManager(errCfg, map[string]chan rpcclient.ClientConnector{}) + errCfg.CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true + errCfg.CacheCfg().ReplicationConns = []string{"*localhost"} + engine.SetCache(engine.NewCacheS(errCfg, nil)) + req.Body = ioutil.NopCloser(bytes.NewBuffer(uargsJSON)) + if _, err := register(req); err != utils.ErrPartiallyExecuted { + t.Errorf("Expected error: %s ,received: %v", utils.ErrPartiallyExecuted, err) + } + + req.Body = ioutil.NopCloser(bytes.NewBuffer(argsJSON)) + if _, err := register(req); err != utils.ErrPartiallyExecuted { + t.Errorf("Expected error: %s ,received: %v", utils.ErrPartiallyExecuted, err) + } + + req.RemoteAddr = "127.0.0" + req.Body = ioutil.NopCloser(bytes.NewBuffer(argsJSON)) + if _, err := register(req); err == nil { + t.Errorf("Expected error,received: nil") + } + args2 := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, id, id) + args2JSON, err := json.Marshal(args2) + if err != nil { + t.Fatal(err) + } + req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON)) + if _, err := register(req); err == nil { + t.Errorf("Expected error,received: nil") + } + args2 = utils.NewServerRequest(utils.DispatcherHv1UnregisterHosts, id, id) + args2JSON, err = json.Marshal(args2) + if err != nil { + t.Fatal(err) + } + req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON)) + if _, err := register(req); err == nil { + t.Errorf("Expected error,received: nil") + } + args2 = utils.NewServerRequest(utils.DispatcherSv1GetProfileForEvent, id, id) + args2JSON, err = json.Marshal(args2) + if err != nil { + t.Fatal(err) + } + req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON)) + if _, err := register(req); err == nil { + t.Errorf("Expected error,received: nil") + } + + engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil)) +} + +type errRecorder struct{} + +func (*errRecorder) Header() http.Header { return make(http.Header) } +func (*errRecorder) Write([]byte) (int, error) { return 0, io.EOF } +func (*errRecorder) WriteHeader(statusCode int) {} + +func TestRegistar(t *testing.T) { + w := httptest.NewRecorder() + ra := &RegisterArgs{ + Tenant: "cgrates.org", + IDs: []string{"Host1", "Host2"}, + Opts: make(map[string]interface{}), + Port: "2012", + TLS: true, + Transport: utils.MetaJSON, + } + raJSON, err := json.Marshal([]interface{}{ra}) + id := json.RawMessage("1") + if err != nil { + t.Fatal(err) + } + args := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, raJSON, id) + argsJSON, err := json.Marshal(args) + if err != nil { + t.Fatal(err) + } + req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(argsJSON)) + if err != nil { + t.Fatal(err) + } + req.RemoteAddr = "127.0.0.1:2356" + + Registar(w, req) + exp := "{\"id\":1,\"result\":\"OK\",\"error\":null}\n" + if w.Body.String() != exp { + t.Errorf("Expected: %q ,received: %q", exp, w.Body.String()) + } + + w = httptest.NewRecorder() + Registar(w, req) + exp = "{\"id\":0,\"result\":null,\"error\":\"EOF\"}\n" + if w.Body.String() != exp { + t.Errorf("Expected: %q ,received: %q", exp, w.Body.String()) + } + + Registar(new(errRecorder), req) +} diff --git a/services/dispatcherh.go b/services/dispatcherh.go index 0dce19156..6b9447b7c 100644 --- a/services/dispatcherh.go +++ b/services/dispatcherh.go @@ -32,21 +32,24 @@ import ( // NewDispatcherHostsService returns the Dispatcher Service func NewDispatcherHostsService(cfg *config.CGRConfig, server *utils.Server, - internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service { + internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager, + exitChan chan bool) servmanager.Service { return &DispatcherHostsService{ connChan: internalChan, cfg: cfg, server: server, connMgr: connMgr, + exitChan: exitChan, } } // DispatcherHostsService implements Service interface type DispatcherHostsService struct { sync.RWMutex - cfg *config.CGRConfig - server *utils.Server - connMgr *engine.ConnManager + cfg *config.CGRConfig + server *utils.Server + connMgr *engine.ConnManager + exitChan chan bool dspS *dispatcherh.DispatcherHostsService // rpc *v1.DispatcherHSv1 @@ -62,11 +65,13 @@ func (dspS *DispatcherHostsService) Start() (err error) { dspS.Lock() defer dspS.Unlock() - if dspS.dspS, err = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherH, err.Error())) - return - } - + dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr) + go func(ds *dispatcherh.DispatcherHostsService, ext chan bool) { + if err := ds.ListenAndServe(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DispatcherH, err.Error())) + ext <- true + } + }(dspS.dspS, dspS.exitChan) dspS.connChan <- dspS.dspS return diff --git a/utils/json_codec.go b/utils/json_codec.go index 473ecf356..8534530a8 100644 --- a/utils/json_codec.go +++ b/utils/json_codec.go @@ -71,6 +71,15 @@ func DecodeServerRequest(r io.Reader) (req *serverRequest, err error) { return } +// NewServerRequest used in dispatcherh tests +func NewServerRequest(method string, params, id json.RawMessage) *serverRequest { + return &serverRequest{ + Method: method, + Params: ¶ms, + Id: &id, + } +} + type serverRequest struct { Method string `json:"method"` Params *json.RawMessage `json:"params"` diff --git a/utils/server.go b/utils/server.go index 487fa0d13..eaca80b8f 100644 --- a/utils/server.go +++ b/utils/server.go @@ -382,7 +382,7 @@ func (r *rpcRequest) Call() io.Reader { } func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int, - serverName string) (config tls.Config, err error) { + serverName string) (config *tls.Config, err error) { cert, err := tls.LoadX509KeyPair(serverCrt, serverKey) if err != nil { log.Fatalf("Error: %s when load server keys", err) @@ -409,7 +409,7 @@ func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int, } } - config = tls.Config{ + config = &tls.Config{ Certificates: []tls.Certificate{cert}, ClientAuth: tls.ClientAuthType(serverPolicy), ClientCAs: rootCAs, @@ -432,7 +432,7 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, if err != nil { return } - listener, err := tls.Listen(TCP, addr, &config) + listener, err := tls.Listen(TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) exitChan <- true @@ -474,7 +474,7 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, if err != nil { return } - listener, err := tls.Listen(TCP, addr, &config) + listener, err := tls.Listen(TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) exitChan <- true @@ -561,7 +561,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP httpSrv := http.Server{ Addr: addr, Handler: s.httpsMux, - TLSConfig: &config, + TLSConfig: config, } Logger.Info(fmt.Sprintf(" start listening at <%s>", addr)) if err := httpSrv.ListenAndServeTLS(serverCrt, serverKey); err != nil {