Improved dispatcher consts and opts test working

This commit is contained in:
adi
2022-10-21 16:25:27 +03:00
committed by Dan Christian Bogos
parent 4a537e73fa
commit 8f416cdbca
8 changed files with 152 additions and 136 deletions

View File

@@ -21,10 +21,6 @@
"db_password": "CGRateS.org"
},
"attributes": {
"enabled": true
},
"dispatchers":{
"enabled": true,
"prevent_loop": true
@@ -36,7 +32,7 @@
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false},
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}
},
"replication_conns": ["gob_cache"] ,
//"replication_conns": ["gob_cache"] ,
"remote_conns": ["gob_cache"]
},
@@ -44,7 +40,7 @@
"gob_cache": {
"strategy": "*first",
"conns": [
{"address": "127.0.0.1:2013", "transport":"*gob"}
{"address": "127.0.0.1:4013", "transport":"*gob"}
]
}
},

View File

@@ -32,14 +32,10 @@
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false},
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}
},
"replication_conns": ["gob_cache"],
//"replication_conns": ["gob_cache"],
"remote_conns": ["gob_cache"]
},
"attributes": {
"enabled": true,
},
"apiers": {
"enabled": true
},

View File

@@ -104,7 +104,7 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG
evNm utils.MapStorage, subsys string) (dPrlfs engine.DispatcherProfiles, err error) {
// make sure dispatching is allowed
var shouldDispatch bool
if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.OptsDispatchers); err != nil {
if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.MetaDispatchers); err != nil {
return
} else {
var subsys string
@@ -200,7 +200,6 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG
// Dispatch is the method forwarding the request towards the right connection
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
// fix missing tenant
tnt := ev.Tenant
if tnt == utils.EmptyString {
tnt = dS.cfg.GeneralCfg().DefaultTenant
@@ -222,7 +221,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
}
// avoid further processing if the request is internal
var shouldDispatch bool
if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.OptsDispatchers); err != nil {
if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.MetaDispatchers); err != nil {
return utils.NewErrDispatcherS(err)
} else {
var subsys string

View File

@@ -455,16 +455,13 @@ func callDH(dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute,
Value: dR,
GroupIDs: []string{utils.ConcatenatedKey(utils.CacheDispatcherProfiles, dR.Tenant, dR.ProfileID)},
}
if len(config.CgrConfig().CacheCfg().ReplicationConns) != 0 &&
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate {
if err = engine.Cache.SetWithReplicate(argsCache); err != nil {
if !rpcclient.IsNetworkError(err) {
return
}
// did not dispatch properly, fail-back to standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> ignoring cache network error <%s> setting route dR %+v",
utils.DispatcherS, err.Error(), dR))
if err = engine.Cache.SetWithReplicate(argsCache); err != nil {
if !rpcclient.IsNetworkError(err) {
return
}
// did not dispatch properly, fail-back to standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> ignoring cache network error <%s> setting route dR %+v",
utils.DispatcherS, err.Error(), dR))
}
}
if err = dh.Call(method, args, reply); err != nil {

View File

@@ -178,7 +178,12 @@ func (chS *CacheS) SetWithoutReplicate(chID, itmID string, value interface{},
// SetWithReplicate combines local set with replicate, receiving the arguments needed by dispatcher
func (chS *CacheS) SetWithReplicate(args *utils.ArgCacheReplicateSet) (err error) {
// normal cache set
chS.tCache.Set(args.CacheID, args.ItemID, args.Value, args.GroupIDs, true, utils.EmptyString)
if len(chS.cfg.CacheCfg().ReplicationConns) == 0 ||
!chS.cfg.CacheCfg().Partitions[args.CacheID].Replicate {
return
}
var reply string
return connMgr.Call(chS.cfg.CacheCfg().ReplicationConns, nil,
utils.CacheSv1ReplicateSet, args, &reply)

View File

@@ -2798,7 +2798,6 @@ func (dm *DataManager) GetDispatcherProfile(tenant, id string, cacheRead, cacheW
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return nil, err
}

View File

@@ -34,36 +34,38 @@ import (
)
var (
setterCfgPath string
dspOptsCfgPath string
apierCfgPath string
setterCfg *config.CGRConfig
dspOptsCfg *config.CGRConfig
apierCfg *config.CGRConfig
setterRPC *rpc.Client
dspOptsRPC *rpc.Client
apierRPC *rpc.Client
dspOptsConfigDIR string
dpsOptsTest = []func(t *testing.T){
// FIRST APRT OF THE TEST
// Start engine without Dispatcher on engine 4012
testDispatcherOptsAdminInitCfg,
testDispatcherOptsAdminInitDataDb,
testDispatcherOptsAdminStartEngine,
testDispatcherOptsAdminRPCConn,
setterCfgPath string
cfg2CfgPath string
cfg1CfgPath string
setterCfg *config.CGRConfig
cfg2OptsCfg *config.CGRConfig
cfg1Cfg *config.CGRConfig
setterRPC *rpc.Client
cgr2RPC *rpc.Client
cgr1RPC *rpc.Client
cfg1ConfigDIR string
cfg2ConfigDIR string
setterConfigDIR string
dpsOptsTest = []func(t *testing.T){
// FIRST PART OF THE TEST
// Start engine with Dispatcher on engine 2012
testDispatcherCgr1InitCfg,
testDispatcherCgr1InitDataDb,
testDispatcherCgr1StartEngine,
testDispatcherCgr1RPCConn,
// Sending Status requests in both engines, with *dispatchers:false
testDispatcherOptsDSPInitCfg,
testDispatcherOptsDSPStartEngine,
testDispatcherOptsDSPRPCConn,
testDispatcherCgr2InitCfg,
testDispatcherCgr2StartEngine,
testDispatcherCgr2RPCConn,
testDispatcherOptsCoreStatus, // *disaptchers:false
testDispatcherAdminCoreStatus, // *disaptchers:false
testDispatcherCgr1CoreStatus, // *disaptchers:false
testDispatcherCgr2CoreStatus, // *disaptchers:false
testDispatcherGetItemBothEnginesFirstAttempt, // NOT FOUND
testDispatcherOptsDSPStopEngine,
testDispatcherOptsAdminStopEngine,
testDispatcherCgr1StopEngine,
testDispatcherCgr2StopEngine,
// SECOND PART OF THE TEST
// START HOST2 engine
@@ -72,125 +74,126 @@ var (
testDispatcherSetterStartEngine,
testDispatcherSetterRPCConn,
testDispatcherOptsAdminStartEngine,
testDispatcherOptsAdminRPCConn,
testDispatcherCgr2StartEngine,
testDispatcherCgr2RPCConn,
testDispatcherOptsSetterSetDispatcherProfile, // contains both hosts, HOST1 prio, host2 backup
testDispatcherSetterSetDispatcherProfile, // contains both hosts, HOST1 prio, host2 backup
testDispatcherAdminCoreStatusWithRouteID, // HOST2 matched because HOST1 is not started yet
testDispatcherAdminGetItemHOST2,
testDispatcherCgr2CoreStatusWithRouteID, // HOST2 matched because HOST1 is not started yet
testDispatcherCgr2GetItemHOST2,
// START HOST1 engine
testDispatcherOptsDSPStartEngine,
testDispatcherOptsDSPRPCConn,
testDispatcherAdminCoreStatusWithRouteID, // same HOST2 will be matched, due to routeID
testDispatcherCgr1StartEngine,
testDispatcherCgr1RPCConn,
testDispatcherCgr1CoreStatusWithRouteIDSecondAttempt, // same HOST2 will be matched, due to routeID
// clear cache in order to remove routeID
testDisaptcherCacheClear,
testDispatcherAdminCoreStatusWithRouteIDButHost1, // due to clearing cache, HOST1 will be matched
testDispatcherCgr1CoreStatusWithRouteIDButHost1, // due to clearing cache, HOST1 will be matched
// verify cache of dispatchers, SetDispatcherProfile API should reload the dispatchers cache (instance, profile and route)
testDispatcherAdminCheckCacheAfterRouting,
testDispatcherCgr1CheckCacheAfterRouting,
testDispatcherSetterSetDispatcherProfileOverwrite,
testDispatcherCheckCacheAfterSetDispatcherDSP1,
/* testDispatcherSetterSetAnotherProifle, //DSP2
testDispatcherSetterSetAnotherProifle, //DSP2
testDispatcherCheckCacheAfterSetDispatcherDSP1, //we set DSP2, so for DSP1 nothing changed
testDispatcherCheckCacheAfterSetDispatcherDSP2, */ //NOT_FOUND for every get, cause it was not used that profile before
testDispatcherCheckCacheAfterSetDispatcherDSP2, //NOT_FOUND for every get, cause it was not used that profile before
testDispatcherOptsDSPStopEngine,
testDispatcherOptsAdminStopEngine,
testDispatcherCgr1StopEngine,
testDispatcherCgr2StopEngine,
}
)
func TestDispatcherOpts(t *testing.T) {
for _, test := range dpsOptsTest {
t.Run(dspOptsConfigDIR, test)
t.Run("dispatcher-opts", test)
}
}
func testDispatcherOptsAdminInitCfg(t *testing.T) {
dspOptsConfigDIR = "dispatcher_opts_apier"
func testDispatcherCgr1InitCfg(t *testing.T) {
cfg1ConfigDIR = "dispatcher_opts_host1"
var err error
apierCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR)
apierCfg, err = config.NewCGRConfigFromPath(apierCfgPath)
cfg1CfgPath = path.Join(*dataDir, "conf", "samples", cfg1ConfigDIR)
cfg1Cfg, err = config.NewCGRConfigFromPath(cfg1CfgPath)
if err != nil {
t.Error(err)
}
}
func testDispatcherOptsAdminInitDataDb(t *testing.T) {
if err := engine.InitDataDb(apierCfg); err != nil {
func testDispatcherCgr1InitDataDb(t *testing.T) {
if err := engine.InitDataDb(cfg1Cfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine woth Dispatcher enabled
func testDispatcherOptsAdminStartEngine(t *testing.T) {
if _, err := engine.StartEngine(apierCfgPath, *waitRater); err != nil {
func testDispatcherCgr1StartEngine(t *testing.T) {
if _, err := engine.StartEngine(cfg1CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsAdminRPCConn(t *testing.T) {
func testDispatcherCgr1RPCConn(t *testing.T) {
var err error
apierRPC, err = newRPCClient(apierCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
cgr1RPC, err = newRPCClient(cfg1Cfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsDSPInitCfg(t *testing.T) {
dspOptsConfigDIR = "dispatcher_opts" //changed with the cfg with dispatcher on
func testDispatcherCgr2InitCfg(t *testing.T) {
cfg2ConfigDIR = "dispatcher_opts_host2" //changed with the cfg with dispatcher on
var err error
dspOptsCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR)
dspOptsCfg, err = config.NewCGRConfigFromPath(dspOptsCfgPath)
cfg2CfgPath = path.Join(*dataDir, "conf", "samples", cfg2ConfigDIR)
cfg2OptsCfg, err = config.NewCGRConfigFromPath(cfg2CfgPath)
if err != nil {
t.Error(err)
}
}
// Start CGR Engine woth Dispatcher enabled
func testDispatcherOptsDSPStartEngine(t *testing.T) {
if _, err := engine.StartEngine(dspOptsCfgPath, *waitRater); err != nil {
func testDispatcherCgr2StartEngine(t *testing.T) {
if _, err := engine.StartEngine(cfg2CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsDSPRPCConn(t *testing.T) {
func testDispatcherCgr2RPCConn(t *testing.T) {
var err error
dspOptsRPC, err = newRPCClient(dspOptsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
cgr2RPC, err = newRPCClient(cfg2OptsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
func testDispatcherOptsCoreStatus(t *testing.T) {
// HOST1 host matched
func testDispatcherCgr1CoreStatus(t *testing.T) {
// HOST1 host matched :2012
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.OptsRouteID: "account#dan.bogos",
utils.MetaDispatchers: false,
},
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
if err := cgr1RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST1" {
t.Errorf("Expected HOST1, received %v", reply[utils.NodeID])
}
}
func testDispatcherAdminCoreStatus(t *testing.T) {
func testDispatcherCgr2CoreStatus(t *testing.T) {
// HOST2 host matched because it was called from engine with port :4012 -> host2
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsRouteID: "account#dan.bogos",
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
}
if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
if err := cgr2RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST2" {
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
@@ -202,7 +205,7 @@ func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) {
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherRoutes,
@@ -210,11 +213,11 @@ func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) {
},
}
var reply interface{}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
@@ -223,18 +226,18 @@ func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) {
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherProfiles,
ItemID: "cgrates.org:DSP1",
},
}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
@@ -243,27 +246,27 @@ func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) {
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatchers,
ItemID: "cgrates.org:DSP1",
},
}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func testDispatcherSetterInitCfg(t *testing.T) {
dspOptsConfigDIR = "dispatcher_opts_setter"
setterConfigDIR = "dispatcher_opts_setter"
var err error
setterCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR)
setterCfgPath = path.Join(*dataDir, "conf", "samples", setterConfigDIR)
setterCfg, err = config.NewCGRConfigFromPath(setterCfgPath)
if err != nil {
t.Error(err)
@@ -284,7 +287,7 @@ func testDispatcherSetterRPCConn(t *testing.T) {
}
}
func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) {
func testDispatcherSetterSetDispatcherProfile(t *testing.T) {
// Set DispatcherHost
var replyStr string
setDispatcherHost := &engine.DispatcherHostWithAPIOpts{
@@ -301,7 +304,7 @@ func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) {
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
@@ -324,7 +327,7 @@ func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) {
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
@@ -353,7 +356,7 @@ func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) {
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
}
if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
@@ -363,7 +366,7 @@ func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) {
}
}
func testDispatcherAdminCoreStatusWithRouteID(t *testing.T) {
func testDispatcherCgr2CoreStatusWithRouteID(t *testing.T) {
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
@@ -371,19 +374,36 @@ func testDispatcherAdminCoreStatusWithRouteID(t *testing.T) {
utils.OptsRouteID: "account#dan.bogos",
},
}
if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
// even if HOST1 is prio, this engine was not staretd yet, so HOST2 matched
if err := cgr2RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST2" {
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
}
}
func testDispatcherAdminGetItemHOST2(t *testing.T) {
func testDispatcherCgr1CoreStatusWithRouteIDSecondAttempt(t *testing.T) {
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsRouteID: "account#dan.bogos",
},
}
// same HOST2 will be matched, due to routeID
if err := cgr1RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST2" {
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
}
}
func testDispatcherCgr2GetItemHOST2(t *testing.T) {
// get for *dispatcher_routes
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherRoutes,
@@ -391,7 +411,7 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) {
},
}
var reply interface{}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err != nil {
t.Error(err)
} else {
@@ -409,14 +429,14 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) {
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherProfiles,
ItemID: "cgrates.org:DSP1",
},
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err != nil {
t.Error(err)
} else {
@@ -455,7 +475,7 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) {
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatchers,
@@ -463,7 +483,7 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) {
},
}
// reply here is an interface type(singleResultDispatcher), it exists
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err != nil {
t.Error(err)
}
@@ -471,9 +491,9 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) {
func testDisaptcherCacheClear(t *testing.T) {
var reply string
if err := apierRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
if err := cgr1RPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
}, &reply); err != nil {
t.Fatal(err)
@@ -481,9 +501,9 @@ func testDisaptcherCacheClear(t *testing.T) {
t.Errorf("Unexpected reply returned")
}
if err := dspOptsRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
if err := cgr2RPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
utils.MetaDispatchers: false,
},
}, &reply); err != nil {
t.Fatal(err)
@@ -492,7 +512,7 @@ func testDisaptcherCacheClear(t *testing.T) {
}
}
func testDispatcherAdminCoreStatusWithRouteIDButHost1(t *testing.T) {
func testDispatcherCgr1CoreStatusWithRouteIDButHost1(t *testing.T) {
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
@@ -500,14 +520,15 @@ func testDispatcherAdminCoreStatusWithRouteIDButHost1(t *testing.T) {
utils.OptsRouteID: "account#dan.bogos",
},
}
if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
// as the cache was cleared, HOST1 will match due to his high prio, and it will be set as *dispatcher_routes as HOST1
if err := cgr1RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST1" {
t.Errorf("Expected HOST1, received %v", reply[utils.NodeID])
}
}
func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) {
func testDispatcherCgr1CheckCacheAfterRouting(t *testing.T) {
// get for *dispatcher_routes
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
@@ -520,7 +541,7 @@ func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) {
},
}
var reply interface{}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err != nil {
t.Error(err)
} else {
@@ -545,7 +566,7 @@ func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) {
ItemID: "cgrates.org:DSP1",
},
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err != nil {
t.Error(err)
} else {
@@ -592,14 +613,14 @@ func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) {
},
}
// reply here is an interface type(singleResultDispatcher), it exists
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err != nil {
t.Error(err)
}
}
func testDispatcherSetterSetDispatcherProfileOverwrite(t *testing.T) {
// as the cache was cleard, now that previously the HOST1 was matched, setting the profile wiht only HOST2 will remove the
// as the cache was cleared, and previously the HOST1 matched, setting the profile with only HOST2 will remove the
// DispatcherRoutes, DispatcherProfile and the DispatcherInstance
var replyStr string
// Set DispatcherProfile
@@ -626,6 +647,7 @@ func testDispatcherSetterSetDispatcherProfileOverwrite(t *testing.T) {
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
time.Sleep(100 * time.Millisecond)
}
func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) {
@@ -634,7 +656,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) {
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.MetaDispatchers: false,
"adi": "nu",
"adi3": "nu",
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherRoutes,
@@ -642,16 +664,17 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) {
},
}
var reply interface{} // Should receive NOT_FOUND, as CallCache that was called in API will remove the DispatcherRoute
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Unexpected error returned: %v", err)
}
/* // get for *dispatcher_profiles
// get for *dispatcher_profiles
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.MetaDispatchers: false,
"adi2": "nu",
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherProfiles,
@@ -659,7 +682,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) {
},
}
// as the DSP1 profile was overwritten, only HOST2 in profile will be contained
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err != nil {
t.Error(err)
} else {
@@ -692,6 +715,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) {
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.MetaDispatchers: false,
"adi1": "nu",
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatchers,
@@ -699,10 +723,10 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) {
},
}
// DispatcherInstance should also be removed, so it will be NOT_FOUND
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Unexpected error returned: %v", err)
} */
t.Errorf("Unexpected error returned: %v and reply: %v", err, reply)
}
}
func testDispatcherSetterSetAnotherProifle(t *testing.T) {
@@ -735,6 +759,7 @@ func testDispatcherSetterSetAnotherProifle(t *testing.T) {
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
time.Sleep(100 * time.Millisecond)
}
func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) {
@@ -751,7 +776,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) {
}
var reply interface{}
// NOT_FOUND
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Unexpected error returned: %v", err)
}
@@ -768,7 +793,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) {
},
}
// NOT_FOUND
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Unexpected error returned: %v", err)
}
@@ -785,19 +810,19 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) {
},
}
// NOT_FOUND
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Unexpected error returned: %v", err)
}
}
func testDispatcherOptsDSPStopEngine(t *testing.T) {
func testDispatcherCgr1StopEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}
func testDispatcherOptsAdminStopEngine(t *testing.T) {
func testDispatcherCgr2StopEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}

View File

@@ -2482,7 +2482,6 @@ const (
OptsAPIKey = "*apiKey"
OptsRouteID = "*routeID"
OptsDispatchersProfilesCount = "*dispatchersProfilesCount"
OptsDispatchers = "*dispatcherS"
// EEs
OptsEEsVerbose = "*eesVerbose"
// Resources