From 758b506cfe5598b25da51c9c5f23a51deccc1451 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 16 Apr 2020 17:01:27 +0300 Subject: [PATCH] Add test for cache replication for dispatcher_load --- .../dispatcher_engine/cgrates.json | 3 +- .../cache_replicate/engine1/cgrates.json | 6 ++ .../dispatcher_engine/DispatcherProfiles.csv | 3 +- .../dispatcher_engine2/DispatcherProfiles.csv | 3 +- dispatchers/libdispatcher.go | 26 +++-- general_tests/cacherpl_it_test.go | 102 ++++++++++++++++++ 6 files changed, 131 insertions(+), 12 deletions(-) diff --git a/data/conf/samples/cache_replicate/dispatcher_engine/cgrates.json b/data/conf/samples/cache_replicate/dispatcher_engine/cgrates.json index 70925f2fe..7a4c38a6d 100644 --- a/data/conf/samples/cache_replicate/dispatcher_engine/cgrates.json +++ b/data/conf/samples/cache_replicate/dispatcher_engine/cgrates.json @@ -20,7 +20,8 @@ "caches":{ "partitions": { - "*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true} + "*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true}, + "*dispatcher_loads": {"limit": -1, "replicate": true} }, "replication_conns": ["cacheReplication"], }, diff --git a/data/conf/samples/cache_replicate/engine1/cgrates.json b/data/conf/samples/cache_replicate/engine1/cgrates.json index 8293a6448..f7ba37b85 100644 --- a/data/conf/samples/cache_replicate/engine1/cgrates.json +++ b/data/conf/samples/cache_replicate/engine1/cgrates.json @@ -44,6 +44,12 @@ }, +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + "apiers": { "enabled": true, "caches_conns":["conn1"], diff --git a/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv b/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv index 623c9f9d7..a4c4c28d5 100644 --- a/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv +++ b/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv @@ -1,2 +1,3 @@ #Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight -cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10 \ No newline at end of file +cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10 +cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*load,,Engine1,,20,false,,20 \ No newline at end of file diff --git a/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv b/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv index 321c4af89..1a665991f 100644 --- a/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv +++ b/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv @@ -1,3 +1,4 @@ #Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30 -cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10 \ No newline at end of file +cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10 +cgrates.org,Engine2,*chargers,,,*load,,Engine1,,20,false,,10 \ No newline at end of file diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 8cfa4825e..b02050819 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -19,6 +19,7 @@ along with this program. If not, see package dispatchers import ( + "encoding/gob" "fmt" "sort" "strconv" @@ -28,6 +29,11 @@ import ( "github.com/cgrates/cgrates/utils" ) +func init() { + gob.Register(new(LoadMetrics)) + +} + // Dispatcher is responsible for routing requests to pool of connections // there will be different implementations based on strategy @@ -324,7 +330,7 @@ func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) { } type LoadMetrics struct { - sync.RWMutex + mutex sync.RWMutex HostsLoad map[string]int64 HostsRatio map[string]int64 SumRatio int64 @@ -370,8 +376,10 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri continue } if routeID != nil && *routeID != "" { // cache the discovered route - engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH, - nil, true, utils.EmptyString) + if err = engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH, + nil, true, utils.EmptyString); err != nil { + return + } } break } @@ -380,14 +388,14 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri func (lM *LoadMetrics) getHosts(hostIDs []string) []string { costs := make([]int64, len(hostIDs)) - lM.RLock() + lM.mutex.RLock() for i, id := range hostIDs { costs[i] = lM.HostsLoad[id] if costs[i] >= lM.HostsRatio[id] { costs[i] += lM.SumRatio } } - lM.RUnlock() + lM.mutex.RUnlock() sort.Slice(hostIDs, func(i, j int) bool { return costs[i] < costs[j] }) @@ -395,15 +403,15 @@ func (lM *LoadMetrics) getHosts(hostIDs []string) []string { } func (lM *LoadMetrics) incrementLoad(hostID, tntID string) { - lM.Lock() + lM.mutex.Lock() lM.HostsLoad[hostID] += 1 engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM) - lM.Unlock() + lM.mutex.Unlock() } func (lM *LoadMetrics) decrementLoad(hostID, tntID string) { - lM.Lock() + lM.mutex.Lock() lM.HostsLoad[hostID] -= 1 engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM) - lM.Unlock() + lM.mutex.Unlock() } diff --git a/general_tests/cacherpl_it_test.go b/general_tests/cacherpl_it_test.go index b544a0985..8d0ee6296 100644 --- a/general_tests/cacherpl_it_test.go +++ b/general_tests/cacherpl_it_test.go @@ -24,9 +24,13 @@ import ( "os/exec" "path" "reflect" + "sort" + "sync" "testing" "time" + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -51,6 +55,7 @@ var ( testCacheRplAddData, testCacheRplPing, testCacheRplCheckReplication, + testCacheRplCheckLoadReplication, testCacheRplStopEngine, } @@ -173,6 +178,22 @@ func testCacheRplAddData(t *testing.T) { case <-time.After(2 * time.Second): t.Errorf("cgr-loader failed: ") } + + chargerProfile := &v1.ChargerWithCache{ + ChargerProfile: &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "DefaultCharger", + RunID: utils.MetaDefault, + AttributeIDs: []string{utils.META_NONE}, + Weight: 20, + }, + } + var result string + if err := engine1RPC.Call(utils.APIerSv1SetChargerProfile, chargerProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } } func testCacheRplPing(t *testing.T) { @@ -231,6 +252,8 @@ func testCacheRplCheckReplication(t *testing.T) { if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { t.Error(err.Error()) } + sort.Strings(rcvKeys) + sort.Strings(expKeys) if !reflect.DeepEqual(expKeys, rcvKeys) { t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) } @@ -250,6 +273,85 @@ func testCacheRplCheckReplication(t *testing.T) { } } +func testCacheRplCheckLoadReplication(t *testing.T) { + var rcvKeys []string + argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{ + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{ + CacheID: utils.CacheDispatcherLoads, + }, + } + if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + + var rpl []*engine.ChrgSProcessEventReply + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + if err := dspEngine1RPC.Call(utils.ChargerSv1ProcessEvent, &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testCacheRplCheckLoadReplication", + Event: map[string]interface{}{ + utils.Account: "1007", + utils.Destination: "+491511231234", + "EventName": "TestLoad", + }, + }, + ArgDispatcher: &utils.ArgDispatcher{ + RouteID: utils.StringPointer("testRoute123"), + }, + }, &rpl); err != nil { + t.Error(err) + } else if rpl[0].ChargerSProfile != "DefaultCharger" { + t.Errorf("Received: %+v", utils.ToJSON(rpl)) + } + wg.Done() + }() + } + wg.Wait() + expKeys := []string{"testRoute123:*core", "testRoute123:*attributes", "testRoute123:*chargers"} + argsAPI = utils.ArgsGetCacheItemIDsWithArgDispatcher{ + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{ + CacheID: utils.CacheDispatcherRoutes, + }, + } + if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { + t.Error(err.Error()) + } + sort.Strings(rcvKeys) + sort.Strings(expKeys) + if !reflect.DeepEqual(expKeys, rcvKeys) { + t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) + } + + expKeys = []string{"cgrates.org:Engine2"} + argsAPI = utils.ArgsGetCacheItemIDsWithArgDispatcher{ + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{ + CacheID: utils.CacheDispatcherLoads, + }, + } + if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { + t.Error(err.Error()) + } + sort.Strings(rcvKeys) + sort.Strings(expKeys) + if !reflect.DeepEqual(expKeys, rcvKeys) { + t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) + } + +} + func testCacheRplStopEngine(t *testing.T) { if err := engine.KillEngine(*waitRater); err != nil { t.Error(err)