From d7ee81c86ec8fe429b840799f583c1674bcbf7e5 Mon Sep 17 00:00:00 2001 From: adi Date: Thu, 20 Oct 2022 17:34:15 +0300 Subject: [PATCH] Changed cache verification replication/remote --- apier/v1/full_remote_it_test.go | 2 +- .../conf/samples/dispatcher_opts/cgrates.json | 1 + .../dispatcher_opts_apier/cgrates.json | 1 + dispatchers/libdispatcher.go | 15 ++++++----- engine/caches.go | 4 --- general_tests/dispatcher_opts_it_test.go | 25 ++++++++++++++----- 6 files changed, 31 insertions(+), 17 deletions(-) diff --git a/apier/v1/full_remote_it_test.go b/apier/v1/full_remote_it_test.go index af242b1cd..1a433a630 100644 --- a/apier/v1/full_remote_it_test.go +++ b/apier/v1/full_remote_it_test.go @@ -539,7 +539,7 @@ func testFullRemoteITDispatcher(t *testing.T) { var reply *engine.DispatcherProfile if err := fullRemInternalRPC.Call(utils.APIerSv1GetDispatcherProfile, utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "Dsp1"}}, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + &reply); err == nil || err.Error() != utils.ErrDSPProfileNotFound.Error() { t.Fatal(err) } diff --git a/data/conf/samples/dispatcher_opts/cgrates.json b/data/conf/samples/dispatcher_opts/cgrates.json index a8113259b..9d84cca7a 100644 --- a/data/conf/samples/dispatcher_opts/cgrates.json +++ b/data/conf/samples/dispatcher_opts/cgrates.json @@ -36,6 +36,7 @@ "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} }, + "replication_conns": ["gob_cache"] , "remote_conns": ["gob_cache"] }, diff --git a/data/conf/samples/dispatcher_opts_apier/cgrates.json b/data/conf/samples/dispatcher_opts_apier/cgrates.json index 098ee5563..6fdac1a13 100644 --- a/data/conf/samples/dispatcher_opts_apier/cgrates.json +++ b/data/conf/samples/dispatcher_opts_apier/cgrates.json @@ -32,6 +32,7 @@ "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} }, + "replication_conns": ["gob_cache"], "remote_conns": ["gob_cache"] }, diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 036eaacd0..06e8b3ed3 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -455,13 +455,16 @@ func callDH(dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute, Value: dR, GroupIDs: []string{utils.ConcatenatedKey(utils.CacheDispatcherProfiles, dR.Tenant, dR.ProfileID)}, } - if err = engine.Cache.SetWithReplicate(argsCache); err != nil { - if !rpcclient.IsNetworkError(err) { - return + if len(config.CgrConfig().CacheCfg().ReplicationConns) != 0 && + config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate { + if err = engine.Cache.SetWithReplicate(argsCache); err != nil { + if !rpcclient.IsNetworkError(err) { + return + } + // did not dispatch properly, fail-back to standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> ignoring cache network error <%s> setting route dR %+v", + utils.DispatcherS, err.Error(), dR)) } - // did not dispatch properly, fail-back to standard dispatching - utils.Logger.Warning(fmt.Sprintf("<%s> ignoring cache network error <%s> setting route dR %+v", - utils.DispatcherS, err.Error(), dR)) } } if err = dh.Call(method, args, reply); err != nil { diff --git a/engine/caches.go b/engine/caches.go index 9c7e0bd8c..ee21d6b4a 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -179,10 +179,6 @@ func (chS *CacheS) SetWithoutReplicate(chID, itmID string, value interface{}, // SetWithReplicate combines local set with replicate, receiving the arguments needed by dispatcher func (chS *CacheS) SetWithReplicate(args *utils.ArgCacheReplicateSet) (err error) { chS.tCache.Set(args.CacheID, args.ItemID, args.Value, args.GroupIDs, true, utils.EmptyString) - if len(chS.cfg.CacheCfg().ReplicationConns) == 0 || - !chS.cfg.CacheCfg().Partitions[args.CacheID].Replicate { - return - } var reply string return connMgr.Call(chS.cfg.CacheCfg().ReplicationConns, nil, utils.CacheSv1ReplicateSet, args, &reply) diff --git a/general_tests/dispatcher_opts_it_test.go b/general_tests/dispatcher_opts_it_test.go index e7ea65fe3..450b42f91 100644 --- a/general_tests/dispatcher_opts_it_test.go +++ b/general_tests/dispatcher_opts_it_test.go @@ -93,9 +93,9 @@ var ( testDispatcherAdminCheckCacheAfterRouting, testDispatcherSetterSetDispatcherProfileOverwrite, testDispatcherCheckCacheAfterSetDispatcherDSP1, - testDispatcherSetterSetAnotherProifle, //DSP2 + /* testDispatcherSetterSetAnotherProifle, //DSP2 testDispatcherCheckCacheAfterSetDispatcherDSP1, //we set DSP2, so for DSP1 nothing changed - testDispatcherCheckCacheAfterSetDispatcherDSP2, //NOT_FOUND for every get, cause it was not used that profile before + testDispatcherCheckCacheAfterSetDispatcherDSP2, */ //NOT_FOUND for every get, cause it was not used that profile before testDispatcherOptsDSPStopEngine, testDispatcherOptsAdminStopEngine, @@ -480,6 +480,16 @@ func testDisaptcherCacheClear(t *testing.T) { } else if reply != utils.OK { t.Errorf("Unexpected reply returned") } + + if err := dspOptsRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ + APIOpts: map[string]interface{}{ + utils.OptsDispatchers: false, + }, + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply returned") + } } func testDispatcherAdminCoreStatusWithRouteIDButHost1(t *testing.T) { @@ -540,7 +550,8 @@ func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) { t.Error(err) } else { expected := map[string]interface{}{ - utils.FilterIDs: nil, + utils.ActivationIntervalString: nil, + utils.FilterIDs: nil, "Hosts": []interface{}{ map[string]interface{}{ utils.Blocker: false, @@ -559,12 +570,13 @@ func testDispatcherAdminCheckCacheAfterRouting(t *testing.T) { }, utils.ID: "DSP1", utils.Strategy: "*weight", + utils.Subsystems: []interface{}{"*any"}, "StrategyParams": nil, utils.Tenant: "cgrates.org", utils.Weight: 10., } if !reflect.DeepEqual(expected, reply) { - t.Errorf("Expected %+v, \n received %+v", expected, reply) + t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) } } @@ -622,6 +634,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { Tenant: "cgrates.org", APIOpts: map[string]interface{}{ utils.MetaDispatchers: false, + "adi": "nu", }, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatcherRoutes, @@ -634,7 +647,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { t.Errorf("Unexpected error returned: %v", err) } - // get for *dispatcher_profiles + /* // get for *dispatcher_profiles argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ @@ -689,7 +702,7 @@ func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Errorf("Unexpected error returned: %v", err) - } + } */ } func testDispatcherSetterSetAnotherProifle(t *testing.T) {