From 8f416cdbcad637c85d82af5ce505ab1c3ebe6d8b Mon Sep 17 00:00:00 2001 From: adi Date: Fri, 21 Oct 2022 16:25:27 +0300 Subject: [PATCH] Improved dispatcher consts and opts test working --- .../cgrates.json | 8 +- .../cgrates.json | 6 +- dispatchers/dispatchers.go | 5 +- dispatchers/libdispatcher.go | 15 +- engine/caches.go | 5 + engine/datamanager.go | 1 - general_tests/dispatcher_opts_it_test.go | 247 ++++++++++-------- utils/consts.go | 1 - 8 files changed, 152 insertions(+), 136 deletions(-) rename data/conf/samples/{dispatcher_opts => dispatcher_opts_host1}/cgrates.json (86%) rename data/conf/samples/{dispatcher_opts_apier => dispatcher_opts_host2}/cgrates.json (92%) diff --git a/data/conf/samples/dispatcher_opts/cgrates.json b/data/conf/samples/dispatcher_opts_host1/cgrates.json similarity index 86% rename from data/conf/samples/dispatcher_opts/cgrates.json rename to data/conf/samples/dispatcher_opts_host1/cgrates.json index 9d84cca7a..31b3e6602 100644 --- a/data/conf/samples/dispatcher_opts/cgrates.json +++ b/data/conf/samples/dispatcher_opts_host1/cgrates.json @@ -21,10 +21,6 @@ "db_password": "CGRateS.org" }, -"attributes": { - "enabled": true -}, - "dispatchers":{ "enabled": true, "prevent_loop": true @@ -36,7 +32,7 @@ "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} }, - "replication_conns": ["gob_cache"] , + //"replication_conns": ["gob_cache"] , "remote_conns": ["gob_cache"] }, @@ -44,7 +40,7 @@ "gob_cache": { "strategy": "*first", "conns": [ - {"address": "127.0.0.1:2013", "transport":"*gob"} + {"address": "127.0.0.1:4013", "transport":"*gob"} ] } }, diff --git a/data/conf/samples/dispatcher_opts_apier/cgrates.json b/data/conf/samples/dispatcher_opts_host2/cgrates.json similarity index 92% rename from data/conf/samples/dispatcher_opts_apier/cgrates.json rename to data/conf/samples/dispatcher_opts_host2/cgrates.json index 6fdac1a13..46ce4b2ae 100644 --- a/data/conf/samples/dispatcher_opts_apier/cgrates.json +++ b/data/conf/samples/dispatcher_opts_host2/cgrates.json @@ -32,14 +32,10 @@ "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} }, - "replication_conns": ["gob_cache"], + //"replication_conns": ["gob_cache"], "remote_conns": ["gob_cache"] }, -"attributes": { - "enabled": true, -}, - "apiers": { "enabled": true }, diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 70f1e3d7f..d1dc3bfd1 100644 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -104,7 +104,7 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG evNm utils.MapStorage, subsys string) (dPrlfs engine.DispatcherProfiles, err error) { // make sure dispatching is allowed var shouldDispatch bool - if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.OptsDispatchers); err != nil { + if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.MetaDispatchers); err != nil { return } else { var subsys string @@ -200,7 +200,6 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG // Dispatch is the method forwarding the request towards the right connection func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, serviceMethod string, args interface{}, reply interface{}) (err error) { - // fix missing tenant tnt := ev.Tenant if tnt == utils.EmptyString { tnt = dS.cfg.GeneralCfg().DefaultTenant @@ -222,7 +221,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, } // avoid further processing if the request is internal var shouldDispatch bool - if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.OptsDispatchers); err != nil { + if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.MetaDispatchers); err != nil { return utils.NewErrDispatcherS(err) } else { var subsys string diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 06e8b3ed3..036eaacd0 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -455,16 +455,13 @@ func callDH(dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute, Value: dR, GroupIDs: []string{utils.ConcatenatedKey(utils.CacheDispatcherProfiles, dR.Tenant, dR.ProfileID)}, } - if len(config.CgrConfig().CacheCfg().ReplicationConns) != 0 && - config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate { - if err = engine.Cache.SetWithReplicate(argsCache); err != nil { - if !rpcclient.IsNetworkError(err) { - return - } - // did not dispatch properly, fail-back to standard dispatching - utils.Logger.Warning(fmt.Sprintf("<%s> ignoring cache network error <%s> setting route dR %+v", - utils.DispatcherS, err.Error(), dR)) + if err = engine.Cache.SetWithReplicate(argsCache); err != nil { + if !rpcclient.IsNetworkError(err) { + return } + // did not dispatch properly, fail-back to standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> ignoring cache network error <%s> setting route dR %+v", + utils.DispatcherS, err.Error(), dR)) } } if err = dh.Call(method, args, reply); err != nil { diff --git a/engine/caches.go b/engine/caches.go index ee21d6b4a..a9ebbbf05 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -178,7 +178,12 @@ func (chS *CacheS) SetWithoutReplicate(chID, itmID string, value interface{}, // SetWithReplicate combines local set with replicate, receiving the arguments needed by dispatcher func (chS *CacheS) SetWithReplicate(args *utils.ArgCacheReplicateSet) (err error) { + // normal cache set chS.tCache.Set(args.CacheID, args.ItemID, args.Value, args.GroupIDs, true, utils.EmptyString) + if len(chS.cfg.CacheCfg().ReplicationConns) == 0 || + !chS.cfg.CacheCfg().Partitions[args.CacheID].Replicate { + return + } var reply string return connMgr.Call(chS.cfg.CacheCfg().ReplicationConns, nil, utils.CacheSv1ReplicateSet, args, &reply) diff --git a/engine/datamanager.go b/engine/datamanager.go index 1b2d392ad..95ddc91e7 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -2798,7 +2798,6 @@ func (dm *DataManager) GetDispatcherProfile(tenant, id string, cacheRead, cacheW cacheCommit(transactionID), transactionID); errCh != nil { return nil, errCh } - } return nil, err } diff --git a/general_tests/dispatcher_opts_it_test.go b/general_tests/dispatcher_opts_it_test.go index 450b42f91..53aca2328 100644 --- a/general_tests/dispatcher_opts_it_test.go +++ b/general_tests/dispatcher_opts_it_test.go @@ -34,36 +34,38 @@ import ( ) var ( - setterCfgPath string - dspOptsCfgPath string - apierCfgPath string - setterCfg *config.CGRConfig - dspOptsCfg *config.CGRConfig - apierCfg *config.CGRConfig - setterRPC *rpc.Client - dspOptsRPC *rpc.Client - apierRPC *rpc.Client - dspOptsConfigDIR string - dpsOptsTest = []func(t *testing.T){ - // FIRST APRT OF THE TEST - // Start engine without Dispatcher on engine 4012 - testDispatcherOptsAdminInitCfg, - testDispatcherOptsAdminInitDataDb, - testDispatcherOptsAdminStartEngine, - testDispatcherOptsAdminRPCConn, + setterCfgPath string + cfg2CfgPath string + cfg1CfgPath string + setterCfg *config.CGRConfig + cfg2OptsCfg *config.CGRConfig + cfg1Cfg *config.CGRConfig + setterRPC *rpc.Client + cgr2RPC *rpc.Client + cgr1RPC *rpc.Client + cfg1ConfigDIR string + cfg2ConfigDIR string + setterConfigDIR string + dpsOptsTest = []func(t *testing.T){ + // FIRST PART OF THE TEST + // Start engine with Dispatcher on engine 2012 + testDispatcherCgr1InitCfg, + testDispatcherCgr1InitDataDb, + testDispatcherCgr1StartEngine, + testDispatcherCgr1RPCConn, // Sending Status requests in both engines, with *dispatchers:false - testDispatcherOptsDSPInitCfg, - testDispatcherOptsDSPStartEngine, - testDispatcherOptsDSPRPCConn, + testDispatcherCgr2InitCfg, + testDispatcherCgr2StartEngine, + testDispatcherCgr2RPCConn, - testDispatcherOptsCoreStatus, // *disaptchers:false - testDispatcherAdminCoreStatus, // *disaptchers:false + testDispatcherCgr1CoreStatus, // *disaptchers:false + testDispatcherCgr2CoreStatus, // *disaptchers:false testDispatcherGetItemBothEnginesFirstAttempt, // NOT FOUND - testDispatcherOptsDSPStopEngine, - testDispatcherOptsAdminStopEngine, + testDispatcherCgr1StopEngine, + testDispatcherCgr2StopEngine, // SECOND PART OF THE TEST // START HOST2 engine @@ -72,125 +74,126 @@ var ( testDispatcherSetterStartEngine, testDispatcherSetterRPCConn, - testDispatcherOptsAdminStartEngine, - testDispatcherOptsAdminRPCConn, + testDispatcherCgr2StartEngine, + testDispatcherCgr2RPCConn, - testDispatcherOptsSetterSetDispatcherProfile, // contains both hosts, HOST1 prio, host2 backup + testDispatcherSetterSetDispatcherProfile, // contains both hosts, HOST1 prio, host2 backup - testDispatcherAdminCoreStatusWithRouteID, // HOST2 matched because HOST1 is not started yet - testDispatcherAdminGetItemHOST2, + testDispatcherCgr2CoreStatusWithRouteID, // HOST2 matched because HOST1 is not started yet + testDispatcherCgr2GetItemHOST2, // START HOST1 engine - testDispatcherOptsDSPStartEngine, - testDispatcherOptsDSPRPCConn, - testDispatcherAdminCoreStatusWithRouteID, // same HOST2 will be matched, due to routeID + testDispatcherCgr1StartEngine, + testDispatcherCgr1RPCConn, + testDispatcherCgr1CoreStatusWithRouteIDSecondAttempt, // same HOST2 will be matched, due to routeID // clear cache in order to remove routeID testDisaptcherCacheClear, - testDispatcherAdminCoreStatusWithRouteIDButHost1, // due to clearing cache, HOST1 will be matched + testDispatcherCgr1CoreStatusWithRouteIDButHost1, // due to clearing cache, HOST1 will be matched // verify cache of dispatchers, SetDispatcherProfile API should reload the dispatchers cache (instance, profile and route) - testDispatcherAdminCheckCacheAfterRouting, + testDispatcherCgr1CheckCacheAfterRouting, testDispatcherSetterSetDispatcherProfileOverwrite, testDispatcherCheckCacheAfterSetDispatcherDSP1, - /* testDispatcherSetterSetAnotherProifle, //DSP2 + testDispatcherSetterSetAnotherProifle, //DSP2 testDispatcherCheckCacheAfterSetDispatcherDSP1, //we set DSP2, so for DSP1 nothing changed - testDispatcherCheckCacheAfterSetDispatcherDSP2, */ //NOT_FOUND for every get, cause it was not used that profile before + testDispatcherCheckCacheAfterSetDispatcherDSP2, //NOT_FOUND for every get, cause it was not used that profile before - testDispatcherOptsDSPStopEngine, - testDispatcherOptsAdminStopEngine, + testDispatcherCgr1StopEngine, + testDispatcherCgr2StopEngine, } ) func TestDispatcherOpts(t *testing.T) { for _, test := range dpsOptsTest { - t.Run(dspOptsConfigDIR, test) + t.Run("dispatcher-opts", test) } } -func testDispatcherOptsAdminInitCfg(t *testing.T) { - dspOptsConfigDIR = "dispatcher_opts_apier" +func testDispatcherCgr1InitCfg(t *testing.T) { + cfg1ConfigDIR = "dispatcher_opts_host1" var err error - apierCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR) - apierCfg, err = config.NewCGRConfigFromPath(apierCfgPath) + cfg1CfgPath = path.Join(*dataDir, "conf", "samples", cfg1ConfigDIR) + cfg1Cfg, err = config.NewCGRConfigFromPath(cfg1CfgPath) if err != nil { t.Error(err) } } -func testDispatcherOptsAdminInitDataDb(t *testing.T) { - if err := engine.InitDataDb(apierCfg); err != nil { +func testDispatcherCgr1InitDataDb(t *testing.T) { + if err := engine.InitDataDb(cfg1Cfg); err != nil { t.Fatal(err) } } // Start CGR Engine woth Dispatcher enabled -func testDispatcherOptsAdminStartEngine(t *testing.T) { - if _, err := engine.StartEngine(apierCfgPath, *waitRater); err != nil { +func testDispatcherCgr1StartEngine(t *testing.T) { + if _, err := engine.StartEngine(cfg1CfgPath, *waitRater); err != nil { t.Fatal(err) } } -func testDispatcherOptsAdminRPCConn(t *testing.T) { +func testDispatcherCgr1RPCConn(t *testing.T) { var err error - apierRPC, err = newRPCClient(apierCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + cgr1RPC, err = newRPCClient(cfg1Cfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal(err) } } -func testDispatcherOptsDSPInitCfg(t *testing.T) { - dspOptsConfigDIR = "dispatcher_opts" //changed with the cfg with dispatcher on +func testDispatcherCgr2InitCfg(t *testing.T) { + cfg2ConfigDIR = "dispatcher_opts_host2" //changed with the cfg with dispatcher on var err error - dspOptsCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR) - dspOptsCfg, err = config.NewCGRConfigFromPath(dspOptsCfgPath) + cfg2CfgPath = path.Join(*dataDir, "conf", "samples", cfg2ConfigDIR) + cfg2OptsCfg, err = config.NewCGRConfigFromPath(cfg2CfgPath) if err != nil { t.Error(err) } } // Start CGR Engine woth Dispatcher enabled -func testDispatcherOptsDSPStartEngine(t *testing.T) { - if _, err := engine.StartEngine(dspOptsCfgPath, *waitRater); err != nil { +func testDispatcherCgr2StartEngine(t *testing.T) { + if _, err := engine.StartEngine(cfg2CfgPath, *waitRater); err != nil { t.Fatal(err) } } -func testDispatcherOptsDSPRPCConn(t *testing.T) { +func testDispatcherCgr2RPCConn(t *testing.T) { var err error - dspOptsRPC, err = newRPCClient(dspOptsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + cgr2RPC, err = newRPCClient(cfg2OptsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal(err) } } -func testDispatcherOptsCoreStatus(t *testing.T) { - // HOST1 host matched +func testDispatcherCgr1CoreStatus(t *testing.T) { + // HOST1 host matched :2012 var reply map[string]interface{} ev := utils.TenantWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.OptsRouteID: "account#dan.bogos", + utils.MetaDispatchers: false, }, } - if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + if err := cgr1RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { t.Error(err) } else if reply[utils.NodeID] != "HOST1" { t.Errorf("Expected HOST1, received %v", reply[utils.NodeID]) } } -func testDispatcherAdminCoreStatus(t *testing.T) { +func testDispatcherCgr2CoreStatus(t *testing.T) { // HOST2 host matched because it was called from engine with port :4012 -> host2 var reply map[string]interface{} ev := utils.TenantWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ utils.OptsRouteID: "account#dan.bogos", - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, } - if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + if err := cgr2RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { t.Error(err) } else if reply[utils.NodeID] != "HOST2" { t.Errorf("Expected HOST2, received %v", reply[utils.NodeID]) @@ -202,7 +205,7 @@ func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) { argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatcherRoutes, @@ -210,11 +213,11 @@ func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) { }, } var reply interface{} - if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } @@ -223,18 +226,18 @@ func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) { argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatcherProfiles, ItemID: "cgrates.org:DSP1", }, } - if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } @@ -243,27 +246,27 @@ func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) { argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatchers, ItemID: "cgrates.org:DSP1", }, } - if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } func testDispatcherSetterInitCfg(t *testing.T) { - dspOptsConfigDIR = "dispatcher_opts_setter" + setterConfigDIR = "dispatcher_opts_setter" var err error - setterCfgPath = path.Join(*dataDir, "conf", "samples", dspOptsConfigDIR) + setterCfgPath = path.Join(*dataDir, "conf", "samples", setterConfigDIR) setterCfg, err = config.NewCGRConfigFromPath(setterCfgPath) if err != nil { t.Error(err) @@ -284,7 +287,7 @@ func testDispatcherSetterRPCConn(t *testing.T) { } } -func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) { +func testDispatcherSetterSetDispatcherProfile(t *testing.T) { // Set DispatcherHost var replyStr string setDispatcherHost := &engine.DispatcherHostWithAPIOpts{ @@ -301,7 +304,7 @@ func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) { }, }, APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, } if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { @@ -324,7 +327,7 @@ func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) { }, }, APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, } if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { @@ -353,7 +356,7 @@ func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) { }, }, APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, } if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { @@ -363,7 +366,7 @@ func testDispatcherOptsSetterSetDispatcherProfile(t *testing.T) { } } -func testDispatcherAdminCoreStatusWithRouteID(t *testing.T) { +func testDispatcherCgr2CoreStatusWithRouteID(t *testing.T) { var reply map[string]interface{} ev := utils.TenantWithAPIOpts{ Tenant: "cgrates.org", @@ -371,19 +374,36 @@ func testDispatcherAdminCoreStatusWithRouteID(t *testing.T) { utils.OptsRouteID: "account#dan.bogos", }, } - if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + // even if HOST1 is prio, this engine was not staretd yet, so HOST2 matched + if err := cgr2RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { t.Error(err) } else if reply[utils.NodeID] != "HOST2" { t.Errorf("Expected HOST2, received %v", reply[utils.NodeID]) } } -func testDispatcherAdminGetItemHOST2(t *testing.T) { +func testDispatcherCgr1CoreStatusWithRouteIDSecondAttempt(t *testing.T) { + var reply map[string]interface{} + ev := utils.TenantWithAPIOpts{ + Tenant: "cgrates.org", + APIOpts: map[string]interface{}{ + utils.OptsRouteID: "account#dan.bogos", + }, + } + // same HOST2 will be matched, due to routeID + if err := cgr1RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + t.Error(err) + } else if reply[utils.NodeID] != "HOST2" { + t.Errorf("Expected HOST2, received %v", reply[utils.NodeID]) + } +} + +func testDispatcherCgr2GetItemHOST2(t *testing.T) { // get for *dispatcher_routes argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatcherRoutes, @@ -391,7 +411,7 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) { }, } var reply interface{} - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err != nil { t.Error(err) } else { @@ -409,14 +429,14 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) { argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatcherProfiles, ItemID: "cgrates.org:DSP1", }, } - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err != nil { t.Error(err) } else { @@ -455,7 +475,7 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) { argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatchers, @@ -463,7 +483,7 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) { }, } // reply here is an interface type(singleResultDispatcher), it exists - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr2RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err != nil { t.Error(err) } @@ -471,9 +491,9 @@ func testDispatcherAdminGetItemHOST2(t *testing.T) { func testDisaptcherCacheClear(t *testing.T) { var reply string - if err := apierRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ + if err := cgr1RPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, }, &reply); err != nil { t.Fatal(err) @@ -481,9 +501,9 @@ func testDisaptcherCacheClear(t *testing.T) { t.Errorf("Unexpected reply returned") } - if err := dspOptsRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ + if err := cgr2RPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, + utils.MetaDispatchers: false, }, }, &reply); err != nil { t.Fatal(err) @@ -492,7 +512,7 @@ func testDisaptcherCacheClear(t *testing.T) { } } -func testDispatcherAdminCoreStatusWithRouteIDButHost1(t *testing.T) { +func testDispatcherCgr1CoreStatusWithRouteIDButHost1(t *testing.T) { var reply map[string]interface{} ev := utils.TenantWithAPIOpts{ Tenant: "cgrates.org", @@ -500,14 +520,15 @@ func testDispatcherAdminCoreStatusWithRouteIDButHost1(t *testing.T) { utils.OptsRouteID: "account#dan.bogos", }, } - if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + // as the cache was cleared, HOST1 will match due to his high prio, and it will be set as *dispatcher_routes as HOST1 + if err := cgr1RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { t.Error(err) } else if reply[utils.NodeID] != "HOST1" { t.Errorf("Expected HOST1, received %v", reply[utils.NodeID]) } } -func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) { +func testDispatcherCgr1CheckCacheAfterRouting(t *testing.T) { // get for *dispatcher_routes argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", @@ -520,7 +541,7 @@ func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) { }, } var reply interface{} - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err != nil { t.Error(err) } else { @@ -545,7 +566,7 @@ func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) { ItemID: "cgrates.org:DSP1", }, } - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err != nil { t.Error(err) } else { @@ -592,14 +613,14 @@ func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) { }, } // reply here is an interface type(singleResultDispatcher), it exists - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err != nil { t.Error(err) } } func testDispatcherSetterSetDispatcherProfileOverwrite(t *testing.T) { - // as the cache was cleard, now that previously the HOST1 was matched, setting the profile wiht only HOST2 will remove the + // as the cache was cleared, and previously the HOST1 matched, setting the profile with only HOST2 will remove the // DispatcherRoutes, DispatcherProfile and the DispatcherInstance var replyStr string // Set DispatcherProfile @@ -626,6 +647,7 @@ func testDispatcherSetterSetDispatcherProfileOverwrite(t *testing.T) { } else if replyStr != utils.OK { t.Error("Unexpected reply returned", replyStr) } + time.Sleep(100 * time.Millisecond) } func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { @@ -634,7 +656,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { Tenant: "cgrates.org", APIOpts: map[string]interface{}{ utils.MetaDispatchers: false, - "adi": "nu", + "adi3": "nu", }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatcherRoutes, @@ -642,16 +664,17 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { }, } var reply interface{} // Should receive NOT_FOUND, as CallCache that was called in API will remove the DispatcherRoute - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Errorf("Unexpected error returned: %v", err) } - /* // get for *dispatcher_profiles + // get for *dispatcher_profiles argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ utils.MetaDispatchers: false, + "adi2": "nu", }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatcherProfiles, @@ -659,7 +682,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { }, } // as the DSP1 profile was overwritten, only HOST2 in profile will be contained - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err != nil { t.Error(err) } else { @@ -692,6 +715,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { Tenant: "cgrates.org", APIOpts: map[string]interface{}{ utils.MetaDispatchers: false, + "adi1": "nu", }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatchers, @@ -699,10 +723,10 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { }, } // DispatcherInstance should also be removed, so it will be NOT_FOUND - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Errorf("Unexpected error returned: %v", err) - } */ + t.Errorf("Unexpected error returned: %v and reply: %v", err, reply) + } } func testDispatcherSetterSetAnotherProifle(t *testing.T) { @@ -735,6 +759,7 @@ func testDispatcherSetterSetAnotherProifle(t *testing.T) { } else if replyStr != utils.OK { t.Error("Unexpected reply returned", replyStr) } + time.Sleep(100 * time.Millisecond) } func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) { @@ -751,7 +776,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) { } var reply interface{} // NOT_FOUND - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Errorf("Unexpected error returned: %v", err) } @@ -768,7 +793,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) { }, } // NOT_FOUND - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Errorf("Unexpected error returned: %v", err) } @@ -785,19 +810,19 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) { }, } // NOT_FOUND - if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + if err := cgr1RPC.Call(utils.CacheSv1GetItem, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Errorf("Unexpected error returned: %v", err) } } -func testDispatcherOptsDSPStopEngine(t *testing.T) { +func testDispatcherCgr1StopEngine(t *testing.T) { if err := engine.KillEngine(*waitRater); err != nil { t.Error(err) } } -func testDispatcherOptsAdminStopEngine(t *testing.T) { +func testDispatcherCgr2StopEngine(t *testing.T) { if err := engine.KillEngine(*waitRater); err != nil { t.Error(err) } diff --git a/utils/consts.go b/utils/consts.go index 2a166db11..cfd061cb6 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2482,7 +2482,6 @@ const ( OptsAPIKey = "*apiKey" OptsRouteID = "*routeID" OptsDispatchersProfilesCount = "*dispatchersProfilesCount" - OptsDispatchers = "*dispatcherS" // EEs OptsEEsVerbose = "*eesVerbose" // Resources