Add test for Cache Replicate Active-Active

This commit is contained in:
TeoV
2020-04-23 15:16:21 +03:00
committed by Dan Christian Bogos
parent 10833ec312
commit e243956d77
8 changed files with 488 additions and 0 deletions

View File

@@ -0,0 +1,53 @@
{
"general": {
"node_id": "DispatcherEngine",
"log_level": 7,
"reconnects": 1,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"stor_db": {
"db_type": "*internal",
},
"caches":{
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true},
"*dispatcher_loads": {"limit": -1, "replicate": true}
},
"replication_conns": ["cacheReplication"],
},
"rpc_conns": {
"cacheReplication": {
"conns": [{"address": "127.0.0.1:3013", "transport":"*gob"}],
},
},
"schedulers": {
"enabled": true,
},
"dispatchers":{
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,60 @@
{
"general": {
"node_id": "DispatcherEngine2",
"log_level": 7,
"reconnects": 1,
},
"listen": {
"rpc_json": ":3012",
"rpc_gob": ":3013",
"http": ":3080",
},
"data_db": {
"db_type": "redis",
"db_port": 6379,
"db_name": "11",
},
"stor_db": {
"db_type": "*internal",
},
"caches":{
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true},
"*dispatcher_loads": {"limit": -1, "replicate": true}
},
"replication_conns": ["cacheReplication"],
},
"rpc_conns": {
"cacheReplication": {
"conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}],
},
},
"schedulers": {
"enabled": true,
},
"dispatchers":{
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,59 @@
{
// CGRateS Configuration file
//
"general": {
"node_id": "Engine1",
"log_level": 7
},
"listen": {
"rpc_json": ":6012",
"rpc_gob": ":6013",
"http": ":6080",
},
"data_db": {
"db_type": "*internal",
},
"stor_db": {
"db_type": "*internal"
},
"rpc_conns": {
"conn1": {
"strategy": "*first",
"conns": [{"address": "127.0.0.1:6012", "transport":"*json"}],
},
},
"attributes": {
"enabled": true
},
"rals": {
"enabled": true,
},
"schedulers": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"apiers": {
"enabled": true,
"caches_conns":["conn1"],
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,3 @@
#Tenant[0],ID[1],Address[2],Transport[3],TLS[4]
cgrates.org,Self,*internal,,
cgrates.org,Engine1,127.0.0.1:6012,*json,false
1 #Tenant[0] ID[1] Address[2] Transport[3] TLS[4]
2 cgrates.org Self *internal
3 cgrates.org Engine1 127.0.0.1:6012 *json false

View File

@@ -0,0 +1,4 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*load,,Engine1,,20,false,,20
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
2 cgrates.org InternalDispatcher *caches;*core *weight Self 20 false 30
3 cgrates.org Engine1 *any *weight Engine1 20 false 10
4 cgrates.org Engine2 *chargers *string:~*req.EventName:TestLoad *load Engine1 20 false 20

View File

@@ -0,0 +1,3 @@
#Tenant[0],ID[1],Address[2],Transport[3],TLS[4]
cgrates.org,Self,*internal,,
cgrates.org,Engine1,127.0.0.1:6012,*json,false
1 #Tenant[0] ID[1] Address[2] Transport[3] TLS[4]
2 cgrates.org Self *internal
3 cgrates.org Engine1 127.0.0.1:6012 *json false

View File

@@ -0,0 +1,4 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,,,*load,,Engine1,,20,false,,10
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
2 cgrates.org InternalDispatcher *caches;*core *weight Self 20 false 30
3 cgrates.org ExternalDispatcher *attributes *weight Engine1 20 false 10
4 cgrates.org Engine2 *chargers *load Engine1 20 false 10

View File

@@ -59,6 +59,18 @@ var (
testCacheRplStopEngine,
}
sTestsCacheRplAA = []func(t *testing.T){
testCacheRplAAInitCfg,
testCacheRplInitDataDb,
testCacheRplStartEngine,
testCacheRplRpcConn,
testCacheRplAAAddData,
testCacheRplAACheckReplication,
testCacheRplAACheckLoadReplication,
testCacheRplStopEngine,
}
)
func TestCacheReplications(t *testing.T) {
@@ -79,6 +91,23 @@ func TestCacheReplications(t *testing.T) {
}
func TestCacheReplicationActiveActive(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
t.SkipNow()
case utils.MetaMySQL:
for _, stest := range sTestsCacheRplAA {
t.Run("TestCacheReplicationActiveActive", stest)
}
case utils.MetaMongo:
t.SkipNow()
case utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
}
func testCacheRplInitCfg(t *testing.T) {
var err error
dspEngine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_replicate", "dispatcher_engine")
@@ -100,6 +129,27 @@ func testCacheRplInitCfg(t *testing.T) {
}
}
func testCacheRplAAInitCfg(t *testing.T) {
var err error
dspEngine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_rpl_active_active", "dispatcher_engine")
dspEngine1Cfg, err = config.NewCGRConfigFromPath(dspEngine1CfgPath)
if err != nil {
t.Error(err)
}
dspEngine2CfgPath = path.Join(*dataDir, "conf", "samples", "cache_rpl_active_active", "dispatcher_engine2")
dspEngine2Cfg, err = config.NewCGRConfigFromPath(dspEngine2CfgPath)
if err != nil {
t.Error(err)
}
engine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_rpl_active_active", "engine1")
engine1Cfg, err = config.NewCGRConfigFromPath(engine1CfgPath)
if err != nil {
t.Error(err)
}
}
func testCacheRplInitDataDb(t *testing.T) {
if err := engine.InitDataDb(dspEngine1Cfg); err != nil {
t.Fatal(err)
@@ -196,6 +246,65 @@ func testCacheRplAddData(t *testing.T) {
}
}
func testCacheRplAAAddData(t *testing.T) {
wchan := make(chan struct{}, 1)
go func() {
loaderPath, err := exec.LookPath("cgr-loader")
if err != nil {
t.Error(err)
}
loader := exec.Command(loaderPath, "-config_path", dspEngine1CfgPath, "-path",
path.Join(*dataDir, "tariffplans", "cache_rpl_active_active", "dispatcher_engine"))
if err := loader.Start(); err != nil {
t.Error(err)
}
loader.Wait()
wchan <- struct{}{}
}()
select {
case <-wchan:
case <-time.After(2 * time.Second):
t.Errorf("cgr-loader failed: ")
}
go func() {
loaderPath, err := exec.LookPath("cgr-loader")
if err != nil {
t.Error(err)
}
loader := exec.Command(loaderPath, "-config_path", dspEngine2CfgPath, "-path",
path.Join(*dataDir, "tariffplans", "cache_rpl_active_active", "dispatcher_engine2"))
if err := loader.Start(); err != nil {
t.Error(err)
}
loader.Wait()
wchan <- struct{}{}
}()
select {
case <-wchan:
case <-time.After(2 * time.Second):
t.Errorf("cgr-loader failed: ")
}
chargerProfile := &v1.ChargerWithCache{
ChargerProfile: &engine.ChargerProfile{
Tenant: "cgrates.org",
ID: "DefaultCharger",
RunID: utils.MetaDefault,
AttributeIDs: []string{utils.META_NONE},
Weight: 20,
},
}
var result string
if err := engine1RPC.Call(utils.APIerSv1SetChargerProfile, chargerProfile, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
}
func testCacheRplPing(t *testing.T) {
var reply map[string]interface{}
ev := utils.TenantWithArgDispatcher{
@@ -273,6 +382,199 @@ func testCacheRplCheckReplication(t *testing.T) {
}
}
func testCacheRplAACheckReplication(t *testing.T) {
var rcvKeys []string
argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
CacheID: utils.CacheDispatcherRoutes,
},
}
if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
var rpl string
if err := dspEngine2RPC.Call(utils.AttributeSv1Ping, &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer("testRouteFromDispatcher2"),
},
}, &rpl); err != nil {
t.Error(err)
} else if rpl != utils.Pong {
t.Errorf("Received: %s", rpl)
}
if err := dspEngine1RPC.Call(utils.AttributeSv1Ping, &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer("testRouteFromDispatcher1"),
},
}, &rpl); err != nil {
t.Error(err)
} else if rpl != utils.Pong {
t.Errorf("Received: %s", rpl)
}
expKeys := []string{"testRouteFromDispatcher2:*attributes", "testRouteFromDispatcher1:*attributes"}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
}
func testCacheRplAACheckLoadReplication(t *testing.T) {
var rcvKeys []string
argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
CacheID: utils.CacheDispatcherLoads,
},
}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
var rpl []*engine.ChrgSProcessEventReply
var wgDisp1 sync.WaitGroup
var wgDisp2 sync.WaitGroup
for i := 0; i < 10; i++ {
wgDisp1.Add(1)
wgDisp2.Add(1)
go func() {
if err := dspEngine1RPC.Call(utils.ChargerSv1ProcessEvent, &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testCacheRplAACheckLoadReplication",
Event: map[string]interface{}{
utils.Account: "1007",
utils.Destination: "+491511231234",
"EventName": "TestLoad",
},
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer("testRouteFromDispatcher1"),
},
}, &rpl); err != nil {
t.Error(err)
} else if rpl[0].ChargerSProfile != "DefaultCharger" {
t.Errorf("Received: %+v", utils.ToJSON(rpl))
}
wgDisp1.Done()
}()
go func() {
if err := dspEngine2RPC.Call(utils.ChargerSv1ProcessEvent, &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testCacheRplAACheckLoadReplication",
Event: map[string]interface{}{
utils.Account: "1007",
utils.Destination: "+491511231234",
"EventName": "TestLoad",
},
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer("testRouteFromDispatcher2"),
},
}, &rpl); err != nil {
t.Error(err)
} else if rpl[0].ChargerSProfile != "DefaultCharger" {
t.Errorf("Received: %+v", utils.ToJSON(rpl))
}
wgDisp2.Done()
}()
}
wgDisp1.Wait()
wgDisp2.Wait()
expKeys := []string{"testRouteFromDispatcher1:*attributes",
"testRouteFromDispatcher1:*chargers", "testRouteFromDispatcher2:*attributes",
"testRouteFromDispatcher2:*chargers"}
argsAPI = utils.ArgsGetCacheItemIDsWithArgDispatcher{
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
CacheID: utils.CacheDispatcherRoutes,
},
}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
expKeys = []string{"cgrates.org:Engine2"}
argsAPI = utils.ArgsGetCacheItemIDsWithArgDispatcher{
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
CacheID: utils.CacheDispatcherLoads,
},
}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
sort.Strings(rcvKeys)
sort.Strings(expKeys)
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
}
func testCacheRplCheckLoadReplication(t *testing.T) {
var rcvKeys []string
argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{