Renamed DispatcherH in RegistarC

This commit is contained in:
Trial97
2021-02-26 16:24:38 +02:00
committed by Dan Christian Bogos
parent fe80a80e47
commit 038aa5f2ea
34 changed files with 966 additions and 669 deletions

102
registrarc/dispatcherh.go Normal file
View File

@@ -0,0 +1,102 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"fmt"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// NewRegistrarCService constructs a DispatcherHService
func NewRegistrarCService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *DispatcherHostsService {
return &DispatcherHostsService{
cfg: cfg,
connMgr: connMgr,
}
}
// DispatcherHostsService is the service handling dispatching towards internal components
// designed to handle automatic partitioning and failover
type DispatcherHostsService struct {
cfg *config.CGRConfig
connMgr *engine.ConnManager
}
// ListenAndServe will initialize the service
func (dhS *DispatcherHostsService) ListenAndServe(stopChan chan struct{}) {
utils.Logger.Info("Starting DispatcherH service")
for {
dhS.registerHosts()
select {
case <-stopChan:
return
case <-time.After(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval):
}
}
}
// Shutdown is called to shutdown the service
func (dhS *DispatcherHostsService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RegistrarC))
dhS.unregisterHosts()
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.RegistrarC))
return
}
func (dhS *DispatcherHostsService) registerHosts() {
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}
args, err := NewRegisterArgs(dhS.cfg, tnt, hostCfgs)
if err != nil {
continue
}
var rply string
if err := dhS.connMgr.Call([]string{connID}, nil, utils.RegistrarSv1RegisterDispatcherHosts, args, &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
utils.RegistrarC, connID, err))
continue
}
}
}
return
}
func (dhS *DispatcherHostsService) unregisterHosts() {
var rply string
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}
if err := dhS.connMgr.Call([]string{connID}, nil, utils.RegistrarSv1UnregisterDispatcherHosts, NewUnregisterArgs(tnt, hostCfgs), &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts with tenant<%s> to the conn with ID <%s> because : %s",
utils.RegistrarC, tnt, connID, err))
continue
}
}
}
}

View File

@@ -0,0 +1,186 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"bytes"
"net/rpc"
"os/exec"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
dspDir string
dspCfgPath string
dspCfg *config.CGRConfig
dspCmd *exec.Cmd
dspRPC *rpc.Client
allDir string
allCfgPath string
allCmd *exec.Cmd
all2Dir string
all2CfgPath string
all2Cmd *exec.Cmd
dsphTest = []func(t *testing.T){
testDsphInitCfg,
testDsphInitDB,
testDsphStartEngine,
testDsphLoadData,
testDsphBeforeDsphStart,
testDsphStartAll2,
testDsphStartAll,
testDsphStopEngines,
testDsphStopDispatcher,
}
)
func TestDspHosts(t *testing.T) {
switch *dbType {
case utils.MetaMySQL:
allDir = "all_mysql"
all2Dir = "all2_mysql"
dspDir = "dispatchers_mysql"
case utils.MetaMongo:
allDir = "all_mongo"
all2Dir = "all2_mongo"
dspDir = "dispatchers_mongo"
case utils.MetaInternal, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
for _, stest := range dsphTest {
t.Run(dspDir, stest)
}
}
func testDsphInitCfg(t *testing.T) {
dspCfgPath = path.Join(*dataDir, "conf", "samples", "dispatcherh", dspDir)
allCfgPath = path.Join(*dataDir, "conf", "samples", "dispatcherh", allDir)
all2CfgPath = path.Join(*dataDir, "conf", "samples", "dispatcherh", all2Dir)
var err error
if dspCfg, err = config.NewCGRConfigFromPath(dspCfgPath); err != nil {
t.Error(err)
}
}
func testDsphInitDB(t *testing.T) {
if err := engine.InitDataDb(dspCfg); err != nil {
t.Fatal(err)
}
if err := engine.InitStorDb(dspCfg); err != nil {
t.Fatal(err)
}
}
func testDsphStartEngine(t *testing.T) {
var err error
if dspCmd, err = engine.StopStartEngine(dspCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
dspRPC, err = newRPCClient(dspCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func testDsphLoadData(t *testing.T) {
loader := exec.Command("cgr-loader", "-config_path", dspCfgPath, "-path", path.Join(*dataDir, "tariffplans", "dispatcherh"), "-caches_address=")
output := bytes.NewBuffer(nil)
outerr := bytes.NewBuffer(nil)
loader.Stdout = output
loader.Stderr = outerr
if err := loader.Run(); err != nil {
t.Log(loader.Args)
t.Log(output.String())
t.Log(outerr.String())
t.Fatal(err)
}
}
func testDsphGetNodeID() (id string, err error) {
var status map[string]interface{}
if err = dspRPC.Call(utils.CoreSv1Status, utils.TenantWithOpts{
Tenant: "cgrates.org",
Opts: map[string]interface{}{},
}, &status); err != nil {
return
}
return utils.IfaceAsString(status[utils.NodeID]), nil
}
func testDsphBeforeDsphStart(t *testing.T) {
if _, err := testDsphGetNodeID(); err == nil || err.Error() != utils.ErrHostNotFound.Error() {
t.Errorf("Expected error: %s received: %v", utils.ErrHostNotFound, err)
}
}
func testDsphStartAll2(t *testing.T) {
var err error
if all2Cmd, err = engine.StartEngine(all2CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
if nodeID, err := testDsphGetNodeID(); err != nil {
t.Fatal(err)
} else if nodeID != "ALL2" {
t.Errorf("Expected nodeID: %q ,received: %q", "ALL2", nodeID)
}
}
func testDsphStartAll(t *testing.T) {
var err error
if allCmd, err = engine.StartEngine(allCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
if nodeID, err := testDsphGetNodeID(); err != nil {
t.Fatal(err)
} else if nodeID != "ALL" {
t.Errorf("Expected nodeID: %q ,received: %q", "ALL", nodeID)
}
}
func testDsphStopEngines(t *testing.T) {
if err := allCmd.Process.Kill(); err != nil {
t.Fatal(err)
}
if err := all2Cmd.Process.Kill(); err != nil {
t.Fatal(err)
}
time.Sleep(2 * time.Second)
if _, err := testDsphGetNodeID(); err == nil || err.Error() != utils.ErrHostNotFound.Error() {
t.Errorf("Expected error: %s received: %v", utils.ErrHostNotFound, err)
}
}
func testDsphStopDispatcher(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}

View File

@@ -0,0 +1,125 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func TestDispatcherHostsService(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(Registar))
defer ts.Close()
cfg := config.NewDefaultCGRConfig()
cfg.RPCConns()["conn1"] = &config.RPCConn{
Strategy: rpcclient.PoolFirst,
Conns: []*config.RemoteHost{{
Address: ts.URL,
Synchronous: true,
TLS: false,
Transport: rpcclient.HTTPjson,
}},
}
cfg.DispatcherHCfg().Enabled = true
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
utils.MetaDefault: {
{
ID: "Host1",
RegisterTransport: utils.MetaJSON,
},
},
}
cfg.DispatcherHCfg().RefreshInterval = 100 * time.Millisecond
cfg.DispatcherHCfg().RegistrarSConns = []string{"conn1"}
ds := NewRegistrarCService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
ds.registerHosts()
host1 := &engine.DispatcherHost{
Tenant: "cgrates.org",
ID: "Host1",
Conn: &config.RemoteHost{
Address: "127.0.0.1:2012",
Transport: utils.MetaJSON,
},
}
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok {
t.Errorf("Expected to find Host1 in cache")
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
utils.MetaDefault: {
{
ID: "Host2",
RegisterTransport: utils.MetaJSON,
},
},
}
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true
config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"}
ds.registerHosts()
host1.ID = "Host2"
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok {
t.Errorf("Expected to find Host2 in cache")
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
ds.unregisterHosts()
if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
t.Errorf("Expected to not find Host2 in cache")
}
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = false
config.CgrConfig().CacheCfg().ReplicationConns = []string{}
host1.ID = "Host1"
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
utils.MetaDefault: {
{
ID: "Host1",
RegisterTransport: utils.MetaJSON,
},
},
}
ds.Shutdown()
if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
t.Errorf("Expected to not find Host2 in cache")
}
cfg.ListenCfg().RPCJSONListen = "2012"
ds.registerHosts()
ds = NewRegistrarCService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
ds.Shutdown()
stopChan := make(chan struct{})
close(stopChan)
ds.ListenAndServe(stopChan)
}

46
registrarc/lib_test.go Normal file
View File

@@ -0,0 +1,46 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"errors"
"flag"
"net/rpc"
"net/rpc/jsonrpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
var (
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
waitRater = flag.Int("wait_rater", 100, "Number of milliseconds to wait for rater to start and cache")
encoding = flag.String("rpc", utils.MetaJSON, "what encoding would be used for rpc communication")
dbType = flag.String("dbtype", utils.MetaInternal, "The type of DataBase (Internal/Mongo/mySql)")
)
func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
switch *encoding {
case utils.MetaJSON:
return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
case utils.MetaGOB:
return rpc.Dial(utils.TCP, cfg.RPCGOBListen)
default:
return nil, errors.New("UNSUPPORTED_RPC")
}
}

View File

@@ -0,0 +1,275 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewRegisterArgs creates the arguments for register hosts API
func NewRegisterArgs(cfg *config.CGRConfig, tnt string, hostCfgs []*config.RemoteHost) (rargs *RegisterArgs, err error) {
rargs = &RegisterArgs{
Tenant: tnt,
Opts: make(map[string]interface{}),
Hosts: make([]*RegisterHostCfg, len(hostCfgs)),
}
for i, hostCfg := range hostCfgs {
var port string
if port, err = getConnPort(cfg,
hostCfg.Transport,
hostCfg.TLS); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the port because : %s",
utils.RegistrarC, err))
return
}
rargs.Hosts[i] = &RegisterHostCfg{
ID: hostCfg.ID,
Port: port,
Transport: hostCfg.Transport,
TLS: hostCfg.TLS,
}
}
return
}
// RegisterArgs the arguments to register the dispacher host
type RegisterArgs struct {
Tenant string
Opts map[string]interface{}
Hosts []*RegisterHostCfg
}
// RegisterHostCfg the host config used to register
type RegisterHostCfg struct {
ID string
Port string
Transport string
TLS bool
Synchronous bool
}
// AsDispatcherHosts converts the arguments to DispatcherHosts
func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.DispatcherHost) {
dHs = make([]*engine.DispatcherHost, len(rargs.Hosts))
for i, hCfg := range rargs.Hosts {
dHs[i] = hCfg.AsDispatcherHost(rargs.Tenant, ip)
}
return
}
// AsDispatcherHost converts the arguments to DispatcherHosts
func (rhc *RegisterHostCfg) AsDispatcherHost(tnt, ip string) *engine.DispatcherHost {
return &engine.DispatcherHost{
Tenant: tnt,
RemoteHost: &config.RemoteHost{
ID: rhc.ID,
Address: ip + ":" + rhc.Port,
Transport: rhc.Transport,
TLS: rhc.TLS,
},
}
}
// NewUnregisterArgs creates the arguments for unregister hosts API
func NewUnregisterArgs(tnt string, hostCfgs []*config.RemoteHost) (uargs *UnregisterArgs) {
uargs = &UnregisterArgs{
Tenant: tnt,
Opts: make(map[string]interface{}),
IDs: make([]string, len(hostCfgs)),
}
for i, hostCfg := range hostCfgs {
uargs.IDs[i] = hostCfg.ID
}
return
}
// UnregisterArgs the arguments to unregister the dispacher host
type UnregisterArgs struct {
Tenant string
Opts map[string]interface{}
IDs []string
}
// Registrar handdle for httpServer to register the dispatcher hosts
func Registrar(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
var result interface{} = utils.OK
var errMessage interface{}
var err error
var id *json.RawMessage
if id, err = register(r); err != nil {
result, errMessage = nil, err.Error()
}
if err := utils.WriteServerResponse(w, id, result, errMessage); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write resonse because: %s",
utils.RegistrarC, err))
}
}
func register(req *http.Request) (*json.RawMessage, error) {
id := json.RawMessage("0")
sReq, err := utils.DecodeServerRequest(req.Body)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode request because: %s",
utils.RegistrarC, err))
return &id, err
}
var hasErrors bool
switch sReq.Method {
default:
err = errors.New("rpc: can't find service " + sReq.Method)
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to register hosts because: %s",
utils.RegistrarC, err))
return sReq.Id, err
case utils.RegistrarSv1UnregisterDispatcherHosts:
var args UnregisterArgs
params := []interface{}{&args}
if err = json.Unmarshal(*sReq.Params, &params); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
utils.RegistrarC, err))
return sReq.Id, err
}
for _, id := range args.IDs {
if err = engine.Cache.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(args.Tenant, id), true, utils.NonTransactional); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove DispatcherHost <%s> from cache because: %s",
utils.RegistrarC, id, err))
hasErrors = true
continue
}
}
case utils.RegistrarSv1RegisterDispatcherHosts:
var dHs RegisterArgs
params := []interface{}{&dHs}
if err = json.Unmarshal(*sReq.Params, &params); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
utils.RegistrarC, err))
return sReq.Id, err
}
var addr string
if addr, err = utils.GetRemoteIP(req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
utils.RegistrarC, err))
return sReq.Id, err
}
for _, dH := range dHs.AsDispatcherHosts(addr) {
if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.TenantID(), dH, nil,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s",
utils.RegistrarC, dH.TenantID(), err))
hasErrors = true
continue
}
}
case utils.RegistrarSv1UnregisterRPCHosts:
var args UnregisterArgs
params := []interface{}{&args}
if err = json.Unmarshal(*sReq.Params, &params); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
utils.RegistrarC, err))
return sReq.Id, err
}
config.CgrConfig().LockSections(config.RPCConnsJsonName)
for _, connID := range args.IDs {
if err = engine.Cache.Remove(utils.CacheRPCConnections, connID,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove connection <%s> in cache because: %s",
utils.RegistrarC, connID, err))
hasErrors = true
}
}
config.CgrConfig().UnlockSections(config.RPCConnsJsonName)
case utils.RegistrarSv1RegisterRPCHosts:
var dHs RegisterArgs
params := []interface{}{&dHs}
if err = json.Unmarshal(*sReq.Params, &params); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
utils.RegistrarC, err))
return sReq.Id, err
}
var addr string
if addr, err = utils.GetRemoteIP(req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
utils.RegistrarC, err))
return sReq.Id, err
}
cfgHosts := make(map[string]*config.RemoteHost)
for _, dH := range dHs.AsDispatcherHosts(addr) {
cfgHosts[dH.ID] = dH.RemoteHost
}
config.CgrConfig().LockSections(config.RPCConnsJsonName)
for connID := range config.UpdateRPCCons(config.CgrConfig().RPCConns(), cfgHosts) {
if err = engine.Cache.Remove(utils.CacheRPCConnections, connID,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove connection <%s> in cache because: %s",
utils.RegistrarC, connID, err))
hasErrors = true
}
}
config.CgrConfig().UnlockSections(config.RPCConnsJsonName)
}
if hasErrors {
return sReq.Id, utils.ErrPartiallyExecuted
}
return sReq.Id, nil
}
func getConnPort(cfg *config.CGRConfig, transport string, tls bool) (port string, err error) {
var address string
var extraPath string
switch transport {
case utils.MetaJSON:
if tls {
address = cfg.ListenCfg().RPCJSONTLSListen
} else {
address = cfg.ListenCfg().RPCJSONListen
}
case utils.MetaGOB:
if tls {
address = cfg.ListenCfg().RPCGOBTLSListen
} else {
address = cfg.ListenCfg().RPCGOBListen
}
case rpcclient.HTTPjson:
if tls {
address = cfg.ListenCfg().HTTPTLSListen
} else {
address = cfg.ListenCfg().HTTPListen
}
extraPath = cfg.HTTPCfg().HTTPJsonRPCURL
}
if _, port, err = net.SplitHostPort(address); err != nil {
return
}
port += extraPath
return
}

View File

@@ -0,0 +1,354 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func TestRegisterArgsAsDispatcherHosts(t *testing.T) {
args := &RegisterArgs{
Tenant: "cgrates.org",
Hosts: []*RegisterHostCfg{
{
ID: "Host1",
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
},
{
ID: "Host2",
Port: "2013",
TLS: false,
Transport: utils.MetaGOB,
},
},
Opts: make(map[string]interface{}),
}
exp := []*engine.DispatcherHost{
{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "Host1",
Address: "127.0.0.1:2012",
TLS: true,
Transport: utils.MetaJSON,
},
},
{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "Host2",
Address: "127.0.0.1:2013",
TLS: false,
Transport: utils.MetaGOB,
},
},
}
if rply := args.AsDispatcherHosts("127.0.0.1"); !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(rply))
}
}
func TestGetConnPort(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ListenCfg().RPCJSONTLSListen = ":2072"
cfg.ListenCfg().RPCJSONListen = ":2012"
cfg.ListenCfg().RPCGOBTLSListen = ":2073"
cfg.ListenCfg().RPCGOBListen = ":2013"
cfg.ListenCfg().HTTPTLSListen = ":2081"
cfg.ListenCfg().HTTPListen = ":2080"
cfg.HTTPCfg().HTTPJsonRPCURL = "/json_rpc"
if port, err := getConnPort(cfg, utils.MetaJSON, false); err != nil {
t.Fatal(err)
} else if port != "2012" {
t.Errorf("Expected: %q ,received: %q", "2012", port)
}
if port, err := getConnPort(cfg, utils.MetaJSON, true); err != nil {
t.Fatal(err)
} else if port != "2072" {
t.Errorf("Expected: %q ,received: %q", "2072", port)
}
if port, err := getConnPort(cfg, utils.MetaGOB, false); err != nil {
t.Fatal(err)
} else if port != "2013" {
t.Errorf("Expected: %q ,received: %q", "2013", port)
}
if port, err := getConnPort(cfg, utils.MetaGOB, true); err != nil {
t.Fatal(err)
} else if port != "2073" {
t.Errorf("Expected: %q ,received: %q", "2073", port)
}
if port, err := getConnPort(cfg, rpcclient.HTTPjson, false); err != nil {
t.Fatal(err)
} else if port != "2080/json_rpc" {
t.Errorf("Expected: %q ,received: %q", "2080/json_rpc", port)
}
if port, err := getConnPort(cfg, rpcclient.HTTPjson, true); err != nil {
t.Fatal(err)
} else if port != "2081/json_rpc" {
t.Errorf("Expected: %q ,received: %q", "2081/json_rpc", port)
}
cfg.ListenCfg().RPCJSONListen = "2012"
if _, err := getConnPort(cfg, utils.MetaJSON, false); err == nil {
t.Fatal("Expected error received nil")
}
}
func TestRegister(t *testing.T) {
ra := &RegisterArgs{
Tenant: "cgrates.org",
Hosts: []*RegisterHostCfg{
{
ID: "Host1",
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
},
{
ID: "Host2",
Port: "2013",
TLS: false,
Transport: utils.MetaGOB,
},
},
Opts: make(map[string]interface{}),
}
raJSON, err := json.Marshal([]interface{}{ra})
id := json.RawMessage("1")
if err != nil {
t.Fatal(err)
}
args := utils.NewServerRequest(utils.RegistrarSv1RegisterDispatcherHosts, raJSON, id)
argsJSON, err := json.Marshal(args)
if err != nil {
t.Fatal(err)
}
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(argsJSON))
if err != nil {
t.Fatal(err)
}
req.RemoteAddr = "127.0.0.1:2356"
engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil, nil))
if rplyID, err := register(req); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(id, *rplyID) {
t.Errorf("Expected: %q ,received: %q", string(id), string(*rplyID))
}
host1 := &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "Host1",
Address: "127.0.0.1:2012",
TLS: true,
Transport: utils.MetaJSON,
},
}
host2 := &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "Host2",
Address: "127.0.0.1:2013",
TLS: false,
Transport: utils.MetaGOB,
},
}
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok {
t.Errorf("Expected to find Host1 in cache")
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host2.TenantID()); !ok {
t.Errorf("Expected to find Host2 in cache")
} else if !reflect.DeepEqual(host2, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host2), utils.ToJSON(x))
}
if _, err := register(req); err != io.EOF {
t.Errorf("Expected error: %s ,received: %v", io.EOF, err)
}
ua := &UnregisterArgs{
Tenant: "cgrates.org",
IDs: []string{"Host1", "Host2"},
Opts: make(map[string]interface{}),
}
uaJSON, err := json.Marshal([]interface{}{ua})
id = json.RawMessage("2")
if err != nil {
t.Fatal(err)
}
uargs := utils.NewServerRequest(utils.RegistrarSv1UnregisterDispatcherHosts, uaJSON, id)
uargsJSON, err := json.Marshal(uargs)
if err != nil {
t.Fatal(err)
}
req, err = http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(uargsJSON))
if err != nil {
t.Fatal(err)
}
req.RemoteAddr = "127.0.0.1:2356"
if rplyID, err := register(req); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(id, *rplyID) {
t.Errorf("Expected: %q ,received: %q", string(id), string(*rplyID))
}
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
t.Errorf("Expected to not find Host1 in cache %+v", x)
}
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host2.TenantID()); ok {
t.Errorf("Expected to not find Host2 in cache %+v", x)
}
errCfg := config.NewDefaultCGRConfig()
engine.NewConnManager(errCfg, map[string]chan rpcclient.ClientConnector{})
errCfg.CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true
errCfg.RPCConns()["errCon"] = &config.RPCConn{
Strategy: utils.MetaFirst,
PoolSize: 1,
Conns: []*config.RemoteHost{
{
Address: "127.0.0.1:5612",
Transport: "*json",
Synchronous: false,
TLS: false,
},
},
}
errCfg.CacheCfg().ReplicationConns = []string{"errCon"}
engine.SetCache(engine.NewCacheS(errCfg, nil, nil))
req.Body = ioutil.NopCloser(bytes.NewBuffer(uargsJSON))
if _, err := register(req); err != utils.ErrPartiallyExecuted {
t.Errorf("Expected error: %s ,received: %v", utils.ErrPartiallyExecuted, err)
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(argsJSON))
if _, err := register(req); err != utils.ErrPartiallyExecuted {
t.Errorf("Expected error: %s ,received: %v", utils.ErrPartiallyExecuted, err)
}
req.RemoteAddr = "127.0.0"
req.Body = ioutil.NopCloser(bytes.NewBuffer(argsJSON))
if _, err := register(req); err == nil {
t.Errorf("Expected error,received: nil")
}
args2 := utils.NewServerRequest(utils.RegistrarSv1RegisterDispatcherHosts, id, id)
args2JSON, err := json.Marshal(args2)
if err != nil {
t.Fatal(err)
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON))
if _, err := register(req); err == nil {
t.Errorf("Expected error,received: nil")
}
args2 = utils.NewServerRequest(utils.RegistrarSv1UnregisterDispatcherHosts, id, id)
args2JSON, err = json.Marshal(args2)
if err != nil {
t.Fatal(err)
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON))
if _, err := register(req); err == nil {
t.Errorf("Expected error,received: nil")
}
args2 = utils.NewServerRequest(utils.DispatcherSv1GetProfileForEvent, id, id)
args2JSON, err = json.Marshal(args2)
if err != nil {
t.Fatal(err)
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(args2JSON))
if _, err := register(req); err == nil {
t.Errorf("Expected error,received: nil")
}
engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil, nil))
}
type errRecorder struct{}
func (*errRecorder) Header() http.Header { return make(http.Header) }
func (*errRecorder) Write([]byte) (int, error) { return 0, io.EOF }
func (*errRecorder) WriteHeader(statusCode int) {}
func TestRegistrar(t *testing.T) {
w := httptest.NewRecorder()
ra := &RegisterArgs{
Tenant: "cgrates.org",
Hosts: []*RegisterHostCfg{
{
ID: "Host1",
Port: "2012",
TLS: true,
Transport: utils.MetaJSON,
},
{
ID: "Host2",
Port: "2013",
TLS: false,
Transport: utils.MetaGOB,
},
},
Opts: make(map[string]interface{}),
}
raJSON, err := json.Marshal([]interface{}{ra})
id := json.RawMessage("1")
if err != nil {
t.Fatal(err)
}
args := utils.NewServerRequest(utils.RegistrarSv1RegisterDispatcherHosts, raJSON, id)
argsJSON, err := json.Marshal(args)
if err != nil {
t.Fatal(err)
}
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(argsJSON))
if err != nil {
t.Fatal(err)
}
req.RemoteAddr = "127.0.0.1:2356"
Registrar(w, req)
exp := "{\"id\":1,\"result\":\"OK\",\"error\":null}\n"
if w.Body.String() != exp {
t.Errorf("Expected: %q ,received: %q", exp, w.Body.String())
}
w = httptest.NewRecorder()
Registrar(w, req)
exp = "{\"id\":0,\"result\":null,\"error\":\"EOF\"}\n"
if w.Body.String() != exp {
t.Errorf("Expected: %q ,received: %q", exp, w.Body.String())
}
Registrar(new(errRecorder), req)
}