Add test for cache replication for dispatcher_load

This commit is contained in:
TeoV
2020-04-16 17:01:27 +03:00
committed by Dan Christian Bogos
parent 9e9dfa8d99
commit 758b506cfe
6 changed files with 131 additions and 12 deletions

View File

@@ -20,7 +20,8 @@
"caches":{
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true}
"*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true},
"*dispatcher_loads": {"limit": -1, "replicate": true}
},
"replication_conns": ["cacheReplication"],
},

View File

@@ -44,6 +44,12 @@
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"apiers": {
"enabled": true,
"caches_conns":["conn1"],

View File

@@ -1,2 +1,3 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*load,,Engine1,,20,false,,20
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
2 cgrates.org Engine1 *any *weight Engine1 20 false 10
3 cgrates.org Engine2 *chargers *string:~*req.EventName:TestLoad *load Engine1 20 false 20

View File

@@ -1,3 +1,4 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,,,*load,,Engine1,,20,false,,10
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
2 cgrates.org InternalDispatcher *caches;*core *weight Self 20 false 30
3 cgrates.org ExternalDispatcher *attributes *weight Engine1 20 false 10
4 cgrates.org Engine2 *chargers *load Engine1 20 false 10

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
import (
"encoding/gob"
"fmt"
"sort"
"strconv"
@@ -28,6 +29,11 @@ import (
"github.com/cgrates/cgrates/utils"
)
func init() {
gob.Register(new(LoadMetrics))
}
// Dispatcher is responsible for routing requests to pool of connections
// there will be different implementations based on strategy
@@ -324,7 +330,7 @@ func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) {
}
type LoadMetrics struct {
sync.RWMutex
mutex sync.RWMutex
HostsLoad map[string]int64
HostsRatio map[string]int64
SumRatio int64
@@ -370,8 +376,10 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri
continue
}
if routeID != nil && *routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
nil, true, utils.EmptyString)
if err = engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
nil, true, utils.EmptyString); err != nil {
return
}
}
break
}
@@ -380,14 +388,14 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri
func (lM *LoadMetrics) getHosts(hostIDs []string) []string {
costs := make([]int64, len(hostIDs))
lM.RLock()
lM.mutex.RLock()
for i, id := range hostIDs {
costs[i] = lM.HostsLoad[id]
if costs[i] >= lM.HostsRatio[id] {
costs[i] += lM.SumRatio
}
}
lM.RUnlock()
lM.mutex.RUnlock()
sort.Slice(hostIDs, func(i, j int) bool {
return costs[i] < costs[j]
})
@@ -395,15 +403,15 @@ func (lM *LoadMetrics) getHosts(hostIDs []string) []string {
}
func (lM *LoadMetrics) incrementLoad(hostID, tntID string) {
lM.Lock()
lM.mutex.Lock()
lM.HostsLoad[hostID] += 1
engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
lM.Unlock()
lM.mutex.Unlock()
}
func (lM *LoadMetrics) decrementLoad(hostID, tntID string) {
lM.Lock()
lM.mutex.Lock()
lM.HostsLoad[hostID] -= 1
engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
lM.Unlock()
lM.mutex.Unlock()
}

View File

@@ -24,9 +24,13 @@ import (
"os/exec"
"path"
"reflect"
"sort"
"sync"
"testing"
"time"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -51,6 +55,7 @@ var (
testCacheRplAddData,
testCacheRplPing,
testCacheRplCheckReplication,
testCacheRplCheckLoadReplication,
testCacheRplStopEngine,
}
@@ -173,6 +178,22 @@ func testCacheRplAddData(t *testing.T) {
case <-time.After(2 * time.Second):
t.Errorf("cgr-loader failed: ")
}
chargerProfile := &v1.ChargerWithCache{
ChargerProfile: &engine.ChargerProfile{
Tenant: "cgrates.org",
ID: "DefaultCharger",
RunID: utils.MetaDefault,
AttributeIDs: []string{utils.META_NONE},
Weight: 20,
},
}
var result string
if err := engine1RPC.Call(utils.APIerSv1SetChargerProfile, chargerProfile, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
}
func testCacheRplPing(t *testing.T) {
@@ -231,6 +252,8 @@ func testCacheRplCheckReplication(t *testing.T) {
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
@@ -250,6 +273,85 @@ func testCacheRplCheckReplication(t *testing.T) {
}
}
func testCacheRplCheckLoadReplication(t *testing.T) {
var rcvKeys []string
argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
CacheID: utils.CacheDispatcherLoads,
},
}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
var rpl []*engine.ChrgSProcessEventReply
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
if err := dspEngine1RPC.Call(utils.ChargerSv1ProcessEvent, &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testCacheRplCheckLoadReplication",
Event: map[string]interface{}{
utils.Account: "1007",
utils.Destination: "+491511231234",
"EventName": "TestLoad",
},
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer("testRoute123"),
},
}, &rpl); err != nil {
t.Error(err)
} else if rpl[0].ChargerSProfile != "DefaultCharger" {
t.Errorf("Received: %+v", utils.ToJSON(rpl))
}
wg.Done()
}()
}
wg.Wait()
expKeys := []string{"testRoute123:*core", "testRoute123:*attributes", "testRoute123:*chargers"}
argsAPI = utils.ArgsGetCacheItemIDsWithArgDispatcher{
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
CacheID: utils.CacheDispatcherRoutes,
},
}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
expKeys = []string{"cgrates.org:Engine2"}
argsAPI = utils.ArgsGetCacheItemIDsWithArgDispatcher{
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
CacheID: utils.CacheDispatcherLoads,
},
}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
}
func testCacheRplStopEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)