IMproved sipatcher test cases and cache conn bug

This commit is contained in:
adi
2022-10-05 16:43:05 +03:00
committed by Dan Christian Bogos
parent 8db98f1408
commit 25cd303be6
5 changed files with 275 additions and 362 deletions

View File

@@ -33,6 +33,7 @@
"caches":{
"partitions": {
"*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false},
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, // control dispatcher routes caching
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} // control dispatcher interface
},

View File

@@ -30,6 +30,7 @@
"caches":{
"partitions": {
"*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false},
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, // control dispatcher routes caching
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} // control dispatcher interface
},

View File

@@ -665,7 +665,9 @@ func TestLibDispatcherLoadDispatcherCacheError4(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
cfg.CacheCfg().ReplicationConns = []string{"con"}
cfg.CacheCfg().RemoteConns = []string{"con1"}
cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate = true
cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Remote = true
cfg.RPCConns()["con"] = &config.RPCConn{
Strategy: "",
PoolSize: 0,
@@ -678,8 +680,20 @@ func TestLibDispatcherLoadDispatcherCacheError4(t *testing.T) {
},
},
}
rpcCl := map[string]chan rpcclient.ClientConnector{}
connMng := engine.NewConnManager(cfg, rpcCl)
cfg.RPCConns()["con1"] = &config.RPCConn{
Strategy: "*first",
PoolSize: 0,
Conns: []*config.RemoteHost{
{
ID: "conn_internal",
Address: "*internal",
Transport: "",
TLS: false,
},
},
}
//rpcCl := map[string]chan rpcclient.ClientConnector{}
connMng := engine.NewConnManager(cfg, nil)
dm := engine.NewDataManager(nil, nil, connMng)
newCache := engine.NewCacheS(cfg, dm, nil)
@@ -722,7 +736,7 @@ func TestLibDispatcherLoadDispatcherCacheError4(t *testing.T) {
sorter: new(noSort),
}
err := wgDsp.Dispatch(dm, nil, nil, "testTENANT", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
expected := "DISCONNECTED"
expected := "UNSUPPORTED_SERVICE_METHOD"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}

View File

@@ -206,7 +206,7 @@ func (chS *CacheS) GetWithRemote(args *utils.ArgsGetCacheItemWithAPIOpts) (itm i
}
if len(chS.cfg.CacheCfg().RemoteConns) == 0 ||
!chS.cfg.CacheCfg().Partitions[args.CacheID].Remote {
return
return nil, utils.ErrNotFound
}
// item was not found locally, query from remote
if err = connMgr.Call(chS.cfg.CacheCfg().RemoteConns, nil,

View File

@@ -4,14 +4,17 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
@@ -31,46 +34,65 @@ import (
)
var (
setterCfgPath string
dspOptsCfgPath string
apierCfgPath string
setterCfgPath string
setterCfg *config.CGRConfig
dspOptsCfg *config.CGRConfig
apierCfg *config.CGRConfig
setterCfg *config.CGRConfig
setterRPC *rpc.Client
dspOptsRPC *rpc.Client
apierRPC *rpc.Client
setterRPC *rpc.Client
dspOptsConfigDIR string
dpsOptsTest = []func(t *testing.T){
testDispatcherOptsSetterInitCfg,
testDispatcherOptsSetterInitDataDb,
testDispatcherOptsSetterStartEngine,
testDispatcherOptsSetterRPCConn,
// FIRST APRT OF THE TEST
// Start engine without Dispatcher on engine 4012
testDispatcherOptsAPIerInitCfg,
testDispatcherOptsAPIerInitDataDb,
testDispatcherOptsAPIerStartEngine,
testDispatcherOptsAPIerRPCConn,
testDispatcherOptsAPIerSetDispatcherProfile,
testDispatcherOptsAdminInitCfg,
testDispatcherOptsAdminInitDataDb,
testDispatcherOptsAdminStartEngine,
testDispatcherOptsAdminRPCConn,
// Start engine without Dispatcher on engine 2012 with profiles in database (*dispatchers:false)
// Sending Status requests in both engines, with *dispatchers:false
testDispatcherOptsDSPInitCfg,
testDispatcherOptsDSPStartEngine,
testDispatcherOptsDSPRPCConn,
testDispatcherOptsCoreStatus, // localhost(:2012) CoresV1Status
testDispatcherGetItemBothEngines1,
testDispatcherAPIerCoreStatus,
testDispatcherOptsCoreStatus, // *disaptchers:false
testDispatcherAdminCoreStatus, // *disaptchers:false
testDispatcherOptsAPIerSetDispatcherHost4012,
testDispatcherCacheClearBothEngines,
testDispatcherOptsCoreStatusHost4012,
testDispatcherGetItemBothEngines2,
testDispatcherGetItemBothEnginesFirstAttempt, // NOT FOUND
testDispatcherOptsDSPStopEngine,
testDispatcherOptsAPIerStopEngine,
testDispatcherOptsAdminStopEngine,
// SECOND PART OF THE TEST
// START HOST2 engine
testDispatcherSetterInitCfg,
testDispatcherSetterStartEngine,
testDispatcherSetterRPCConn,
testDispatcherOptsAdminStartEngine,
testDispatcherOptsAdminRPCConn,
testDispatcherOptsSetterSetDispatcherProfile, // contains both hosts, HOST1 prio, host2 backup
testDispatcherAdminCoreStatusWithRouteID, // HOST2 matched because HOST1 is not started yet
testDispatcherAdminGetItemHOST2,
// START HOST1 engine
testDispatcherOptsDSPStartEngine,
testDispatcherOptsDSPRPCConn,
testDispatcherAdminCoreStatusWithRouteID, // same HOST2 will be matched, due to routeID
// clear cache in order to remove routeID
testDisaptcherCacheClear,
testDispatcherAdminCoreStatusWithRouteIDButHost1, // due to clearing cache, HOST1 will be matched
// verify cache of dispatchers, SetDispatcherProfile API should reload the dispatchers cache (instance, profile and route)
testDispatcherOptsDSPStopEngine,
testDispatcherOptsAdminStopEngine,
}
)
@@ -80,7 +102,7 @@ func TestDispatcherOpts(t *testing.T) {
}
}
func testDispatcherOptsAPIerInitCfg(t *testing.T) {
func testDispatcherOptsAdminInitCfg(t *testing.T) {
dspOptsConfigDIR = "dispatcher_opts_apier"
var err error
apierCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR)
@@ -90,20 +112,20 @@ func testDispatcherOptsAPIerInitCfg(t *testing.T) {
}
}
func testDispatcherOptsAPIerInitDataDb(t *testing.T) {
func testDispatcherOptsAdminInitDataDb(t *testing.T) {
if err := engine.InitDataDb(apierCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine woth Dispatcher enabled
func testDispatcherOptsAPIerStartEngine(t *testing.T) {
func testDispatcherOptsAdminStartEngine(t *testing.T) {
if _, err := engine.StartEngine(apierCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsAPIerRPCConn(t *testing.T) {
func testDispatcherOptsAdminRPCConn(t *testing.T) {
var err error
apierRPC, err = newRPCClient(apierCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
@@ -111,58 +133,6 @@ func testDispatcherOptsAPIerRPCConn(t *testing.T) {
}
}
func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) {
// Set DispatcherHost
var replyStr string
setDispatcherHost := &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "HOST1",
Address: "127.0.0.1:2012",
Transport: "*json",
ConnectAttempts: 1,
Reconnects: 3,
ConnectTimeout: time.Minute,
ReplyTimeout: 2 * time.Minute,
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
// Set DispatcherProfile
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
DispatcherProfile: &engine.DispatcherProfile{
Tenant: "cgrates.org",
ID: "DSP1",
Strategy: "*weight",
Subsystems: []string{utils.MetaAny},
Weight: 10,
Hosts: engine.DispatcherHostProfiles{
{
ID: "HOST1",
Weight: 5,
},
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
}
func testDispatcherOptsDSPInitCfg(t *testing.T) {
dspOptsConfigDIR = "dispatcher_opts" //changed with the cfg with dispatcher on
var err error
@@ -194,8 +164,7 @@ func testDispatcherOptsCoreStatus(t *testing.T) {
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
"*host": "HOST2",
utils.OptsRouteID: "account#dan.bogos",
utils.OptsDispatchers: false,
},
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
@@ -205,7 +174,25 @@ func testDispatcherOptsCoreStatus(t *testing.T) {
}
}
func testDispatcherGetItemBothEngines1(t *testing.T) {
func testDispatcherAdminCoreStatus(t *testing.T) {
// HOST2 host matched because it was called from engine with port :4012 -> host2
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsRouteID: "account#dan.bogos",
utils.OptsDispatchers: false,
},
}
if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST2" {
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
}
}
func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) {
// get for *dispatcher_routes
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
@@ -218,50 +205,111 @@ func testDispatcherGetItemBothEngines1(t *testing.T) {
}
var reply interface{}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err != nil {
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
} else {
expected := map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.ProfileID: "DSP1",
"HostID": "HOST1",
}
if !reflect.DeepEqual(expected, reply) {
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
}
}
// get for *dispatcher_profiles
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherProfiles,
ItemID: "cgrates.org:DSP1",
},
}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
// get for *dispatchers
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatchers,
ItemID: "cgrates.org:DSP1",
},
}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func testDispatcherAPIerCoreStatus(t *testing.T) {
// HOST2 host matched because it was called from engine with port :4012 -> host2
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
func testDispatcherSetterInitCfg(t *testing.T) {
dspOptsConfigDIR = "dispatcher_opts_setter"
var err error
setterCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR)
setterCfg, err = config.NewCGRConfigFromPath(setterCfgPath)
if err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST2" {
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
}
}
func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) {
// Set DispatcherHost on 4012 host
func testDispatcherSetterStartEngine(t *testing.T) {
if _, err := engine.StartEngine(setterCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testDispatcherSetterRPCConn(t *testing.T) {
var err error
setterRPC, err = newRPCClient(setterCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) {
// Set DispatcherHost
var replyStr string
setDispatcherHost := &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "HOST1",
Address: "127.0.0.1:2012", // CGR1
Transport: "*json",
ConnectAttempts: 1,
Reconnects: 3,
ConnectTimeout: time.Minute,
ReplyTimeout: 2 * time.Minute,
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
setDispatcherHost = &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "HOST2",
Address: "127.0.0.1:4012",
Address: "127.0.0.1:4012", // CGR2
Transport: "*json",
ConnectAttempts: 1,
Reconnects: 3,
@@ -285,13 +333,17 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) {
Tenant: "cgrates.org",
ID: "DSP1",
Strategy: "*weight",
Subsystems: []string{utils.MetaAny},
Weight: 10,
Subsystems: []string{utils.MetaAny},
Hosts: engine.DispatcherHostProfiles{
{
ID: "HOST2",
ID: "HOST1",
Weight: 10,
},
{
ID: "HOST2",
Weight: 5,
},
},
},
APIOpts: map[string]interface{}{
@@ -305,47 +357,23 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) {
}
}
func testDispatcherOptsCoreStatusHost4012(t *testing.T) {
// status just for HOST4012
func testDispatcherAdminCoreStatusWithRouteID(t *testing.T) {
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
"*host": "HOST2",
utils.OptsRouteID: "account#dan.bogos",
},
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST2" {
t.Errorf("Expected HOST1, received %v", reply[utils.NodeID])
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
}
}
func testDispatcherCacheClearBothEngines(t *testing.T) {
var reply string
if err := dspOptsRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}, &reply); err != nil {
t.Fatal(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply returned")
}
if err := apierRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}, &reply); err != nil {
t.Fatal(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply returned")
}
}
func testDispatcherGetItemBothEngines2(t *testing.T) {
func testDispatcherAdminGetItemHOST2(t *testing.T) {
// get for *dispatcher_routes
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
@@ -357,7 +385,7 @@ func testDispatcherGetItemBothEngines2(t *testing.T) {
},
}
var reply interface{}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err != nil {
t.Error(err)
} else {
@@ -371,9 +399,95 @@ func testDispatcherGetItemBothEngines2(t *testing.T) {
}
}
// get for *dispatcher_profiles
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherProfiles,
ItemID: "cgrates.org:DSP1",
},
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
&reply); err != nil {
t.Error(err)
} else {
expected := map[string]interface{}{
utils.FilterIDs: nil,
"Hosts": []interface{}{
map[string]interface{}{
utils.Blocker: false,
utils.FilterIDs: nil,
utils.ID: "HOST1",
utils.Params: nil,
utils.Weight: 10.,
},
map[string]interface{}{
utils.Blocker: false,
utils.FilterIDs: nil,
utils.ID: "HOST2",
utils.Params: nil,
utils.Weight: 5.,
},
},
utils.ActivationIntervalString: nil,
utils.ID: "DSP1",
utils.Strategy: "*weight",
utils.Subsystems: []interface{}{"*any"},
"StrategyParams": nil,
utils.Tenant: "cgrates.org",
utils.Weight: 10.,
}
if !reflect.DeepEqual(expected, reply) {
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
}
}
// get for *dispatchers
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatchers,
ItemID: "cgrates.org:DSP1",
},
}
// reply here is an interface type(singleResultDispatcher), it exists
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err != nil {
t.Error(err)
}
}
func testDisaptcherCacheClear(t *testing.T) {
var reply string
if err := apierRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}, &reply); err != nil {
t.Fatal(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply returned")
}
}
func testDispatcherAdminCoreStatusWithRouteIDButHost1(t *testing.T) {
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsRouteID: "account#dan.bogos",
},
}
if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST1" {
t.Errorf("Expected HOST1, received %v", reply[utils.NodeID])
}
}
@@ -383,225 +497,8 @@ func testDispatcherOptsDSPStopEngine(t *testing.T) {
}
}
func testDispatcherOptsAPIerStopEngine(t *testing.T) {
func testDispatcherOptsAdminStopEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}
// ----------------------------
func testDispatcherOptsSetterInitCfg(t *testing.T) {
dspOptsConfigDIR = "dispatcher_opts_setter"
var err error
setterCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR)
setterCfg, err = config.NewCGRConfigFromPath(setterCfgPath)
if err != nil {
t.Error(err)
}
}
func testDispatcherOptsSetterInitDataDb(t *testing.T) {
if err := engine.InitDataDb(setterCfg); err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsSetterStartEngine(t *testing.T) {
if _, err := engine.StartEngine(setterCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsSetterRPCConn(t *testing.T) {
var err error
setterRPC, err = newRPCClient(setterCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) {
// Set DispatcherHost
var replyStr string
setDispatcherHost := &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "SELF_ENGINE",
Address: "127.0.0.1:4012",
Transport: "*json",
ConnectAttempts: 1,
Reconnects: 3,
ConnectTimeout: time.Minute,
ReplyTimeout: 2 * time.Minute,
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
// Set DispatcherProfile
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
DispatcherProfile: &engine.DispatcherProfile{
Tenant: "cgrates.org",
ID: "DSP1",
Strategy: "*weight",
Subsystems: []string{utils.MetaAny},
Weight: 10,
Hosts: engine.DispatcherHostProfiles{
{
ID: "SELF_ENGINE",
Weight: 5,
},
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
}
func testDispatcherOptsSetterSetDispatcherHost4012(t *testing.T) {
// Set DispatcherHost on 4012 host
var replyStr string
setDispatcherHost := &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "HOST4012",
Address: "127.0.0.1:4012",
Transport: "*json",
ConnectAttempts: 1,
Reconnects: 3,
ConnectTimeout: time.Minute,
ReplyTimeout: 2 * time.Minute,
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
// Set DispatcherProfile
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
DispatcherProfile: &engine.DispatcherProfile{
Tenant: "cgrates.org",
ID: "DSP1",
Strategy: "*weight",
Subsystems: []string{utils.MetaAny},
Weight: 10,
Hosts: engine.DispatcherHostProfiles{
{
ID: "HOST4012",
Weight: 10,
},
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
}
func testDispatcherOptsSetterSetDispatcherProfileDoubleHost(t *testing.T) {
// Set DispatcherProfile with both engines
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
DispatcherProfile: &engine.DispatcherProfile{
Tenant: "cgrates.org",
ID: "DSP1",
Strategy: "*weight",
Subsystems: []string{utils.MetaAny},
Weight: 10,
Hosts: engine.DispatcherHostProfiles{
{
ID: "SELF_ENGINE",
Weight: 5,
},
{
ID: "HOST4012",
Weight: 10,
},
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
var replyStr string
if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
}
func testDispatcherOptsSetterSetDispatcherHostInexistent(t *testing.T) {
// Set DispatcherHost on 4012 host
var replyStr string
setDispatcherHost := &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "INEXISTENT",
Address: "127.0.0.1:1223",
Transport: "*json",
ConnectAttempts: 1,
Reconnects: 3,
ConnectTimeout: time.Minute,
ReplyTimeout: 2 * time.Minute,
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
// Set DispatcherProfile Different with an inexistent engine opened, but with a bigger weight(this should match now)
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
DispatcherProfile: &engine.DispatcherProfile{
Tenant: "cgrates.org",
ID: "DSP1",
Strategy: "*weight",
Subsystems: []string{utils.MetaAny},
Weight: 20,
Hosts: engine.DispatcherHostProfiles{
{
ID: "INEXISTENT",
Weight: 10,
},
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
}
func testDispatcherOptsSetterStopEngine(t *testing.T) {
}