mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added unit tests for dispatcherh
This commit is contained in:
committed by
Dan Christian Bogos
parent
d8b19f5972
commit
2637687e22
@@ -530,7 +530,7 @@ func main() {
|
||||
srvManager := servmanager.NewServiceManager(cfg, exitChan)
|
||||
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan)
|
||||
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager)
|
||||
dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager)
|
||||
dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager, exitChan)
|
||||
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
|
||||
internalChargerSChan, connManager)
|
||||
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan)
|
||||
|
||||
@@ -29,12 +29,12 @@ import (
|
||||
|
||||
// NewDispatcherHService constructs a DispatcherHService
|
||||
func NewDispatcherHService(cfg *config.CGRConfig,
|
||||
connMgr *engine.ConnManager) (*DispatcherHostsService, error) {
|
||||
connMgr *engine.ConnManager) *DispatcherHostsService {
|
||||
return &DispatcherHostsService{
|
||||
cfg: cfg,
|
||||
connMgr: connMgr,
|
||||
stop: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// DispatcherHostsService is the service handling dispatching towards internal components
|
||||
@@ -46,7 +46,7 @@ type DispatcherHostsService struct {
|
||||
}
|
||||
|
||||
// ListenAndServe will initialize the service
|
||||
func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error) {
|
||||
func (dhS *DispatcherHostsService) ListenAndServe() (err error) {
|
||||
utils.Logger.Info("Starting DispatcherH service")
|
||||
for {
|
||||
if err = dhS.registerHosts(); err != nil {
|
||||
@@ -55,9 +55,6 @@ func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error
|
||||
select {
|
||||
case <-dhS.stop:
|
||||
return
|
||||
case e := <-exitChan:
|
||||
exitChan <- e // put back for the others listening for shutdown request
|
||||
return
|
||||
case <-time.After(dhS.cfg.DispatcherHCfg().RegisterInterval):
|
||||
}
|
||||
}
|
||||
@@ -97,10 +94,6 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
|
||||
utils.DispatcherH, connID, err))
|
||||
continue
|
||||
} else if rply != utils.OK {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s",
|
||||
utils.DispatcherH, rply))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -121,15 +114,12 @@ func (dhS *DispatcherHostsService) unregisterHosts() {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts with tenant<%s> to the conn with ID <%s> because : %s",
|
||||
utils.DispatcherH, tnt, connID, err))
|
||||
continue
|
||||
} else if rply != utils.OK {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts for tenant<%s>: %s",
|
||||
utils.DispatcherH, tnt, rply))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dhS *DispatcherHostsService) Call(_ string, _, _ interface{}) error {
|
||||
// Call only to implement rpcclient.ClientConnector interface
|
||||
func (*DispatcherHostsService) Call(_ string, _, _ interface{}) error {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
133
dispatcherh/dispatcherh_test.go
Normal file
133
dispatcherh/dispatcherh_test.go
Normal file
@@ -0,0 +1,133 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package dispatcherh
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func TestDispatcherHostsServiceCall(t *testing.T) {
|
||||
if err := new(DispatcherHostsService).Call("", nil, nil); err != utils.ErrNotImplemented {
|
||||
t.Errorf("Expected error: %s ,received: %v", utils.ErrNotImplemented, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcherHostsService(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(Registar))
|
||||
defer ts.Close()
|
||||
cfg, err := config.NewDefaultCGRConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cfg.RPCConns()["conn1"] = &config.RPCConn{
|
||||
Strategy: rpcclient.PoolFirst,
|
||||
Conns: []*config.RemoteHost{{
|
||||
Address: ts.URL,
|
||||
Synchronous: true,
|
||||
TLS: false,
|
||||
Transport: rpcclient.HTTPjson,
|
||||
}},
|
||||
}
|
||||
cfg.DispatcherHCfg().Enabled = true
|
||||
cfg.DispatcherHCfg().HostIDs = map[string][]string{utils.MetaDefault: {"Host1"}}
|
||||
cfg.DispatcherHCfg().RegisterInterval = 100 * time.Millisecond
|
||||
cfg.DispatcherHCfg().RegisterTransport = utils.MetaJSON
|
||||
cfg.DispatcherHCfg().DispatchersConns = []string{"conn1"}
|
||||
|
||||
ds := NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
|
||||
|
||||
if err = ds.registerHosts(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
host1 := &engine.DispatcherHost{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Host1",
|
||||
Conns: []*config.RemoteHost{{
|
||||
Address: "127.0.0.1:2012",
|
||||
Transport: utils.MetaJSON,
|
||||
}},
|
||||
}
|
||||
|
||||
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok {
|
||||
t.Errorf("Expected to find Host1 in cache")
|
||||
} else if !reflect.DeepEqual(host1, x) {
|
||||
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
|
||||
}
|
||||
cfg.DispatcherHCfg().HostIDs = map[string][]string{utils.MetaDefault: {"Host2"}}
|
||||
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true
|
||||
config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"}
|
||||
if err = ds.registerHosts(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
host1.ID = "Host2"
|
||||
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok {
|
||||
t.Errorf("Expected to find Host2 in cache")
|
||||
} else if !reflect.DeepEqual(host1, x) {
|
||||
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
|
||||
}
|
||||
ds.unregisterHosts()
|
||||
if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
|
||||
t.Errorf("Expected to not find Host2 in cache")
|
||||
}
|
||||
|
||||
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = false
|
||||
config.CgrConfig().CacheCfg().ReplicationConns = []string{}
|
||||
|
||||
host1.ID = "Host1"
|
||||
cfg.DispatcherHCfg().HostIDs = map[string][]string{utils.MetaDefault: {"Host1"}}
|
||||
if err = ds.Shutdown(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
|
||||
t.Errorf("Expected to not find Host2 in cache")
|
||||
}
|
||||
|
||||
cfg.ListenCfg().RPCJSONListen = "2012"
|
||||
if err = ds.registerHosts(); err == nil {
|
||||
t.Fatal("Expected error received nil")
|
||||
}
|
||||
|
||||
ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
|
||||
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true
|
||||
config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"}
|
||||
if err = ds.ListenAndServe(); err == nil {
|
||||
t.Fatal("Expected error received nil")
|
||||
}
|
||||
|
||||
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = false
|
||||
config.CgrConfig().CacheCfg().ReplicationConns = []string{}
|
||||
cfg.ListenCfg().RPCJSONListen = "127.0.0.1:2012"
|
||||
|
||||
ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
|
||||
ds.Shutdown()
|
||||
if err = ds.ListenAndServe(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -70,11 +70,12 @@ func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.Dispatche
|
||||
func Registar(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
result, errMessage := utils.OK, utils.EmptyString
|
||||
var result interface{} = utils.OK
|
||||
var errMessage interface{}
|
||||
var err error
|
||||
var id *json.RawMessage
|
||||
if id, err = register(r); err != nil {
|
||||
result, errMessage = utils.EmptyString, err.Error()
|
||||
result, errMessage = nil, err.Error()
|
||||
}
|
||||
if err := utils.WriteServerResponse(w, id, result, errMessage); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write resonse because: %s",
|
||||
@@ -83,12 +84,14 @@ func Registar(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func register(req *http.Request) (*json.RawMessage, error) {
|
||||
id := json.RawMessage("0")
|
||||
sReq, err := utils.DecodeServerRequest(req.Body)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode request because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return nil, err
|
||||
return &id, err
|
||||
}
|
||||
var hasErrors bool
|
||||
switch sReq.Method {
|
||||
default:
|
||||
err = errors.New("rpc: can't find service " + sReq.Method)
|
||||
@@ -104,12 +107,14 @@ func register(req *http.Request) (*json.RawMessage, error) {
|
||||
return sReq.Id, err
|
||||
}
|
||||
for _, id := range args.IDs {
|
||||
if err = engine.Cache.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(args.Tenant, id), false, utils.NonTransactional); err != nil {
|
||||
if err = engine.Cache.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(args.Tenant, id), true, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove DispatcherHost <%s> from cache because: %s",
|
||||
utils.DispatcherH, id, err))
|
||||
hasErrors = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
case utils.DispatcherHv1RegisterHosts:
|
||||
var dHs RegisterArgs
|
||||
params := []interface{}{&dHs}
|
||||
@@ -126,14 +131,18 @@ func register(req *http.Request) (*json.RawMessage, error) {
|
||||
}
|
||||
|
||||
for _, dH := range dHs.AsDispatcherHosts(addr) {
|
||||
if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil,
|
||||
false, utils.NonTransactional); err != nil {
|
||||
if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.TenantID(), dH, nil,
|
||||
true, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s",
|
||||
utils.DispatcherH, dH.TenantID(), err))
|
||||
hasErrors = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
if hasErrors {
|
||||
return sReq.Id, utils.ErrPartiallyExecuted
|
||||
}
|
||||
return sReq.Id, nil
|
||||
}
|
||||
|
||||
|
||||
359
dispatcherh/libdispatcherh_test.go
Normal file
359
dispatcherh/libdispatcherh_test.go
Normal file
@@ -0,0 +1,359 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package dispatcherh
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func TestRegisterArgsAsDispatcherHosts(t *testing.T) {
|
||||
args := &RegisterArgs{
|
||||
Tenant: "cgrates.org",
|
||||
IDs: []string{"Host1", "Host2"},
|
||||
Opts: make(map[string]interface{}),
|
||||
Port: "2012",
|
||||
TLS: true,
|
||||
Transport: utils.MetaJSON,
|
||||
}
|
||||
exp := []*engine.DispatcherHost{
|
||||
{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Host1",
|
||||
Conns: []*config.RemoteHost{{
|
||||
Address: "127.0.0.1:2012",
|
||||
TLS: true,
|
||||
Transport: utils.MetaJSON,
|
||||
}},
|
||||
},
|
||||
{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Host2",
|
||||
Conns: []*config.RemoteHost{{
|
||||
Address: "127.0.0.1:2012",
|
||||
TLS: true,
|
||||
Transport: utils.MetaJSON,
|
||||
}},
|
||||
},
|
||||
}
|
||||
if rply := args.AsDispatcherHosts("127.0.0.1"); !reflect.DeepEqual(exp, rply) {
|
||||
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(rply))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetConnPort(t *testing.T) {
|
||||
cfg, err := config.NewDefaultCGRConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cfg.ListenCfg().RPCJSONTLSListen = ":2072"
|
||||
cfg.ListenCfg().RPCJSONListen = ":2012"
|
||||
cfg.ListenCfg().RPCGOBTLSListen = ":2073"
|
||||
cfg.ListenCfg().RPCGOBListen = ":2013"
|
||||
cfg.ListenCfg().HTTPTLSListen = ":2081"
|
||||
cfg.ListenCfg().HTTPListen = ":2080"
|
||||
cfg.HTTPCfg().HTTPJsonRPCURL = "/json_rpc"
|
||||
|
||||
if port, err := getConnPort(cfg, utils.MetaJSON, false); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if port != "2012" {
|
||||
t.Errorf("Expected: %q ,received: %q", "2012", port)
|
||||
}
|
||||
if port, err := getConnPort(cfg, utils.MetaJSON, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if port != "2072" {
|
||||
t.Errorf("Expected: %q ,received: %q", "2072", port)
|
||||
}
|
||||
if port, err := getConnPort(cfg, utils.MetaGOB, false); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if port != "2013" {
|
||||
t.Errorf("Expected: %q ,received: %q", "2013", port)
|
||||
}
|
||||
if port, err := getConnPort(cfg, utils.MetaGOB, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if port != "2073" {
|
||||
t.Errorf("Expected: %q ,received: %q", "2073", port)
|
||||
}
|
||||
if port, err := getConnPort(cfg, rpcclient.HTTPjson, false); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if port != "2080/json_rpc" {
|
||||
t.Errorf("Expected: %q ,received: %q", "2080/json_rpc", port)
|
||||
}
|
||||
if port, err := getConnPort(cfg, rpcclient.HTTPjson, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if port != "2081/json_rpc" {
|
||||
t.Errorf("Expected: %q ,received: %q", "2081/json_rpc", port)
|
||||
}
|
||||
cfg.ListenCfg().RPCJSONListen = "2012"
|
||||
if _, err := getConnPort(cfg, utils.MetaJSON, false); err == nil {
|
||||
t.Fatal("Expected error received nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRemoteIP(t *testing.T) {
|
||||
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(nil))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.RemoteAddr = "127.0.0.1:2356"
|
||||
exp := "127.0.0.1"
|
||||
if rply, err := getRemoteIP(req); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if rply != exp {
|
||||
t.Errorf("Expected: %q ,received: %q", exp, rply)
|
||||
}
|
||||
req.RemoteAddr = "notAnIP"
|
||||
if _, err := getRemoteIP(req); err == nil {
|
||||
t.Fatal("Expected error received nil")
|
||||
}
|
||||
req.RemoteAddr = "127.0.0:2012"
|
||||
if _, err := getRemoteIP(req); err == nil {
|
||||
t.Fatal("Expected error received nil")
|
||||
}
|
||||
|
||||
req.Header.Set("X-FORWARDED-FOR", "127.0.0.2,127.0.0.3")
|
||||
exp = "127.0.0.2"
|
||||
if rply, err := getRemoteIP(req); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if rply != exp {
|
||||
t.Errorf("Expected: %q ,received: %q", exp, rply)
|
||||
}
|
||||
req.Header.Set("X-FORWARDED-FOR", "127.0.0.")
|
||||
if _, err := getRemoteIP(req); err == nil {
|
||||
t.Fatal("Expected error received nil")
|
||||
}
|
||||
|
||||
req.Header.Set("X-REAL-IP", "127.0.0.4")
|
||||
exp = "127.0.0.4"
|
||||
if rply, err := getRemoteIP(req); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if rply != exp {
|
||||
t.Errorf("Expected: %q ,received: %q", exp, rply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegister(t *testing.T) {
|
||||
ra := &RegisterArgs{
|
||||
Tenant: "cgrates.org",
|
||||
IDs: []string{"Host1", "Host2"},
|
||||
Opts: make(map[string]interface{}),
|
||||
Port: "2012",
|
||||
TLS: true,
|
||||
Transport: utils.MetaJSON,
|
||||
}
|
||||
raJSON, err := json.Marshal([]interface{}{ra})
|
||||
id := json.RawMessage("1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
args := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, raJSON, id)
|
||||
argsJSON, err := json.Marshal(args)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(argsJSON))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.RemoteAddr = "127.0.0.1:2356"
|
||||
engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil))
|
||||
if rplyID, err := register(req); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(id, *rplyID) {
|
||||
t.Errorf("Expected: %q ,received: %q", string(id), string(*rplyID))
|
||||
}
|
||||
|
||||
host1 := &engine.DispatcherHost{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Host1",
|
||||
Conns: []*config.RemoteHost{{
|
||||
Address: "127.0.0.1:2012",
|
||||
TLS: true,
|
||||
Transport: utils.MetaJSON,
|
||||
}},
|
||||
}
|
||||
host2 := &engine.DispatcherHost{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Host2",
|
||||
Conns: []*config.RemoteHost{{
|
||||
Address: "127.0.0.1:2012",
|
||||
TLS: true,
|
||||
Transport: utils.MetaJSON,
|
||||
}},
|
||||
}
|
||||
|
||||
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok {
|
||||
t.Errorf("Expected to find Host1 in cache")
|
||||
} else if !reflect.DeepEqual(host1, x) {
|
||||
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
|
||||
}
|
||||
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host2.TenantID()); !ok {
|
||||
t.Errorf("Expected to find Host2 in cache")
|
||||
} else if !reflect.DeepEqual(host2, x) {
|
||||
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host2), utils.ToJSON(x))
|
||||
}
|
||||
|
||||
if _, err := register(req); err != io.EOF {
|
||||
t.Errorf("Expected error: %s ,received: %v", io.EOF, err)
|
||||
}
|
||||
|
||||
ua := &UnregisterArgs{
|
||||
Tenant: "cgrates.org",
|
||||
IDs: []string{"Host1", "Host2"},
|
||||
Opts: make(map[string]interface{}),
|
||||
}
|
||||
uaJSON, err := json.Marshal([]interface{}{ua})
|
||||
id = json.RawMessage("2")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
uargs := utils.NewServerRequest(utils.DispatcherHv1UnregisterHosts, uaJSON, id)
|
||||
uargsJSON, err := json.Marshal(uargs)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req, err = http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(uargsJSON))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.RemoteAddr = "127.0.0.1:2356"
|
||||
if rplyID, err := register(req); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(id, *rplyID) {
|
||||
t.Errorf("Expected: %q ,received: %q", string(id), string(*rplyID))
|
||||
}
|
||||
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
|
||||
t.Errorf("Expected to not find Host1 in cache %+v", x)
|
||||
}
|
||||
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host2.TenantID()); ok {
|
||||
t.Errorf("Expected to not find Host2 in cache %+v", x)
|
||||
}
|
||||
errCfg, err := config.NewDefaultCGRConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
engine.NewConnManager(errCfg, map[string]chan rpcclient.ClientConnector{})
|
||||
errCfg.CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true
|
||||
errCfg.CacheCfg().ReplicationConns = []string{"*localhost"}
|
||||
engine.SetCache(engine.NewCacheS(errCfg, nil))
|
||||
req.Body = ioutil.NopCloser(bytes.NewBuffer(uargsJSON))
|
||||
if _, err := register(req); err != utils.ErrPartiallyExecuted {
|
||||
t.Errorf("Expected error: %s ,received: %v", utils.ErrPartiallyExecuted, err)
|
||||
}
|
||||
|
||||
req.Body = ioutil.NopCloser(bytes.NewBuffer(argsJSON))
|
||||
if _, err := register(req); err != utils.ErrPartiallyExecuted {
|
||||
t.Errorf("Expected error: %s ,received: %v", utils.ErrPartiallyExecuted, err)
|
||||
}
|
||||
|
||||
req.RemoteAddr = "127.0.0"
|
||||
req.Body = ioutil.NopCloser(bytes.NewBuffer(argsJSON))
|
||||
if _, err := register(req); err == nil {
|
||||
t.Errorf("Expected error,received: nil")
|
||||
}
|
||||
args2 := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, id, id)
|
||||
args2JSON, err := json.Marshal(args2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON))
|
||||
if _, err := register(req); err == nil {
|
||||
t.Errorf("Expected error,received: nil")
|
||||
}
|
||||
args2 = utils.NewServerRequest(utils.DispatcherHv1UnregisterHosts, id, id)
|
||||
args2JSON, err = json.Marshal(args2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON))
|
||||
if _, err := register(req); err == nil {
|
||||
t.Errorf("Expected error,received: nil")
|
||||
}
|
||||
args2 = utils.NewServerRequest(utils.DispatcherSv1GetProfileForEvent, id, id)
|
||||
args2JSON, err = json.Marshal(args2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON))
|
||||
if _, err := register(req); err == nil {
|
||||
t.Errorf("Expected error,received: nil")
|
||||
}
|
||||
|
||||
engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil))
|
||||
}
|
||||
|
||||
type errRecorder struct{}
|
||||
|
||||
func (*errRecorder) Header() http.Header { return make(http.Header) }
|
||||
func (*errRecorder) Write([]byte) (int, error) { return 0, io.EOF }
|
||||
func (*errRecorder) WriteHeader(statusCode int) {}
|
||||
|
||||
func TestRegistar(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
ra := &RegisterArgs{
|
||||
Tenant: "cgrates.org",
|
||||
IDs: []string{"Host1", "Host2"},
|
||||
Opts: make(map[string]interface{}),
|
||||
Port: "2012",
|
||||
TLS: true,
|
||||
Transport: utils.MetaJSON,
|
||||
}
|
||||
raJSON, err := json.Marshal([]interface{}{ra})
|
||||
id := json.RawMessage("1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
args := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, raJSON, id)
|
||||
argsJSON, err := json.Marshal(args)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(argsJSON))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.RemoteAddr = "127.0.0.1:2356"
|
||||
|
||||
Registar(w, req)
|
||||
exp := "{\"id\":1,\"result\":\"OK\",\"error\":null}\n"
|
||||
if w.Body.String() != exp {
|
||||
t.Errorf("Expected: %q ,received: %q", exp, w.Body.String())
|
||||
}
|
||||
|
||||
w = httptest.NewRecorder()
|
||||
Registar(w, req)
|
||||
exp = "{\"id\":0,\"result\":null,\"error\":\"EOF\"}\n"
|
||||
if w.Body.String() != exp {
|
||||
t.Errorf("Expected: %q ,received: %q", exp, w.Body.String())
|
||||
}
|
||||
|
||||
Registar(new(errRecorder), req)
|
||||
}
|
||||
@@ -32,21 +32,24 @@ import (
|
||||
|
||||
// NewDispatcherHostsService returns the Dispatcher Service
|
||||
func NewDispatcherHostsService(cfg *config.CGRConfig, server *utils.Server,
|
||||
internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service {
|
||||
internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager,
|
||||
exitChan chan bool) servmanager.Service {
|
||||
return &DispatcherHostsService{
|
||||
connChan: internalChan,
|
||||
cfg: cfg,
|
||||
server: server,
|
||||
connMgr: connMgr,
|
||||
exitChan: exitChan,
|
||||
}
|
||||
}
|
||||
|
||||
// DispatcherHostsService implements Service interface
|
||||
type DispatcherHostsService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
server *utils.Server
|
||||
connMgr *engine.ConnManager
|
||||
cfg *config.CGRConfig
|
||||
server *utils.Server
|
||||
connMgr *engine.ConnManager
|
||||
exitChan chan bool
|
||||
|
||||
dspS *dispatcherh.DispatcherHostsService
|
||||
// rpc *v1.DispatcherHSv1
|
||||
@@ -62,11 +65,13 @@ func (dspS *DispatcherHostsService) Start() (err error) {
|
||||
dspS.Lock()
|
||||
defer dspS.Unlock()
|
||||
|
||||
if dspS.dspS, err = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherH, err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr)
|
||||
go func(ds *dispatcherh.DispatcherHostsService, ext chan bool) {
|
||||
if err := ds.ListenAndServe(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DispatcherH, err.Error()))
|
||||
ext <- true
|
||||
}
|
||||
}(dspS.dspS, dspS.exitChan)
|
||||
dspS.connChan <- dspS.dspS
|
||||
|
||||
return
|
||||
|
||||
@@ -71,6 +71,15 @@ func DecodeServerRequest(r io.Reader) (req *serverRequest, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// NewServerRequest used in dispatcherh tests
|
||||
func NewServerRequest(method string, params, id json.RawMessage) *serverRequest {
|
||||
return &serverRequest{
|
||||
Method: method,
|
||||
Params: ¶ms,
|
||||
Id: &id,
|
||||
}
|
||||
}
|
||||
|
||||
type serverRequest struct {
|
||||
Method string `json:"method"`
|
||||
Params *json.RawMessage `json:"params"`
|
||||
|
||||
@@ -382,7 +382,7 @@ func (r *rpcRequest) Call() io.Reader {
|
||||
}
|
||||
|
||||
func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
serverName string) (config tls.Config, err error) {
|
||||
serverName string) (config *tls.Config, err error) {
|
||||
cert, err := tls.LoadX509KeyPair(serverCrt, serverKey)
|
||||
if err != nil {
|
||||
log.Fatalf("Error: %s when load server keys", err)
|
||||
@@ -409,7 +409,7 @@ func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
}
|
||||
}
|
||||
|
||||
config = tls.Config{
|
||||
config = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
ClientAuth: tls.ClientAuthType(serverPolicy),
|
||||
ClientCAs: rootCAs,
|
||||
@@ -432,7 +432,7 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string,
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
listener, err := tls.Listen(TCP, addr, &config)
|
||||
listener, err := tls.Listen(TCP, addr, config)
|
||||
if err != nil {
|
||||
log.Println(fmt.Sprintf("Error: %s when listening", err))
|
||||
exitChan <- true
|
||||
@@ -474,7 +474,7 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string,
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
listener, err := tls.Listen(TCP, addr, &config)
|
||||
listener, err := tls.Listen(TCP, addr, config)
|
||||
if err != nil {
|
||||
log.Println(fmt.Sprintf("Error: %s when listening", err))
|
||||
exitChan <- true
|
||||
@@ -561,7 +561,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
|
||||
httpSrv := http.Server{
|
||||
Addr: addr,
|
||||
Handler: s.httpsMux,
|
||||
TLSConfig: &config,
|
||||
TLSConfig: config,
|
||||
}
|
||||
Logger.Info(fmt.Sprintf("<HTTPS> start listening at <%s>", addr))
|
||||
if err := httpSrv.ListenAndServeTLS(serverCrt, serverKey); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user