diff --git a/data/conf/samples/cache_rpl_active_active/dispatcher_engine/cgrates.json b/data/conf/samples/cache_rpl_active_active/dispatcher_engine/cgrates.json new file mode 100644 index 000000000..7a4c38a6d --- /dev/null +++ b/data/conf/samples/cache_rpl_active_active/dispatcher_engine/cgrates.json @@ -0,0 +1,53 @@ +{ + +"general": { + "node_id": "DispatcherEngine", + "log_level": 7, + "reconnects": 1, +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + +"stor_db": { + "db_type": "*internal", +}, + + +"caches":{ + "partitions": { + "*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true}, + "*dispatcher_loads": {"limit": -1, "replicate": true} + }, + "replication_conns": ["cacheReplication"], +}, + + +"rpc_conns": { + "cacheReplication": { + "conns": [{"address": "127.0.0.1:3013", "transport":"*gob"}], + }, +}, + + +"schedulers": { + "enabled": true, +}, + + +"dispatchers":{ + "enabled": true, +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +} \ No newline at end of file diff --git a/data/conf/samples/cache_rpl_active_active/dispatcher_engine2/cgrates.json b/data/conf/samples/cache_rpl_active_active/dispatcher_engine2/cgrates.json new file mode 100644 index 000000000..d7b0a917f --- /dev/null +++ b/data/conf/samples/cache_rpl_active_active/dispatcher_engine2/cgrates.json @@ -0,0 +1,60 @@ +{ + +"general": { + "node_id": "DispatcherEngine2", + "log_level": 7, + "reconnects": 1, +}, + + +"listen": { + "rpc_json": ":3012", + "rpc_gob": ":3013", + "http": ":3080", +}, + + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "11", +}, + + +"stor_db": { + "db_type": "*internal", +}, + +"caches":{ + "partitions": { + "*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true}, + "*dispatcher_loads": {"limit": -1, "replicate": true} + }, + "replication_conns": ["cacheReplication"], +}, + + +"rpc_conns": { + "cacheReplication": { + "conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}], + }, +}, + + +"schedulers": { + "enabled": true, +}, + + +"dispatchers":{ + "enabled": true, +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +} \ No newline at end of file diff --git a/data/conf/samples/cache_rpl_active_active/engine1/cgrates.json b/data/conf/samples/cache_rpl_active_active/engine1/cgrates.json new file mode 100644 index 000000000..f7ba37b85 --- /dev/null +++ b/data/conf/samples/cache_rpl_active_active/engine1/cgrates.json @@ -0,0 +1,59 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "node_id": "Engine1", + "log_level": 7 +}, + + +"listen": { + "rpc_json": ":6012", + "rpc_gob": ":6013", + "http": ":6080", +}, + +"data_db": { + "db_type": "*internal", +}, + +"stor_db": { + "db_type": "*internal" +}, + + +"rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:6012", "transport":"*json"}], + }, +}, + +"attributes": { + "enabled": true +}, + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"apiers": { + "enabled": true, + "caches_conns":["conn1"], + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherHosts.csv b/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherHosts.csv new file mode 100644 index 000000000..a5b9e604b --- /dev/null +++ b/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherHosts.csv @@ -0,0 +1,3 @@ +#Tenant[0],ID[1],Address[2],Transport[3],TLS[4] +cgrates.org,Self,*internal,, +cgrates.org,Engine1,127.0.0.1:6012,*json,false diff --git a/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherProfiles.csv b/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherProfiles.csv new file mode 100644 index 000000000..b5f51282f --- /dev/null +++ b/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherProfiles.csv @@ -0,0 +1,4 @@ +#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight +cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30 +cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10 +cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*load,,Engine1,,20,false,,20 \ No newline at end of file diff --git a/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherHosts.csv b/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherHosts.csv new file mode 100644 index 000000000..a5b9e604b --- /dev/null +++ b/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherHosts.csv @@ -0,0 +1,3 @@ +#Tenant[0],ID[1],Address[2],Transport[3],TLS[4] +cgrates.org,Self,*internal,, +cgrates.org,Engine1,127.0.0.1:6012,*json,false diff --git a/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherProfiles.csv b/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherProfiles.csv new file mode 100644 index 000000000..1a665991f --- /dev/null +++ b/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherProfiles.csv @@ -0,0 +1,4 @@ +#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight +cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30 +cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10 +cgrates.org,Engine2,*chargers,,,*load,,Engine1,,20,false,,10 \ No newline at end of file diff --git a/general_tests/cacherpl_it_test.go b/general_tests/cacherpl_it_test.go index 8d0ee6296..157736d6a 100644 --- a/general_tests/cacherpl_it_test.go +++ b/general_tests/cacherpl_it_test.go @@ -59,6 +59,18 @@ var ( testCacheRplStopEngine, } + + sTestsCacheRplAA = []func(t *testing.T){ + testCacheRplAAInitCfg, + testCacheRplInitDataDb, + testCacheRplStartEngine, + testCacheRplRpcConn, + testCacheRplAAAddData, + testCacheRplAACheckReplication, + testCacheRplAACheckLoadReplication, + + testCacheRplStopEngine, + } ) func TestCacheReplications(t *testing.T) { @@ -79,6 +91,23 @@ func TestCacheReplications(t *testing.T) { } +func TestCacheReplicationActiveActive(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + t.SkipNow() + case utils.MetaMySQL: + for _, stest := range sTestsCacheRplAA { + t.Run("TestCacheReplicationActiveActive", stest) + } + case utils.MetaMongo: + t.SkipNow() + case utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } +} + func testCacheRplInitCfg(t *testing.T) { var err error dspEngine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_replicate", "dispatcher_engine") @@ -100,6 +129,27 @@ func testCacheRplInitCfg(t *testing.T) { } } +func testCacheRplAAInitCfg(t *testing.T) { + var err error + dspEngine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_rpl_active_active", "dispatcher_engine") + dspEngine1Cfg, err = config.NewCGRConfigFromPath(dspEngine1CfgPath) + if err != nil { + t.Error(err) + } + + dspEngine2CfgPath = path.Join(*dataDir, "conf", "samples", "cache_rpl_active_active", "dispatcher_engine2") + dspEngine2Cfg, err = config.NewCGRConfigFromPath(dspEngine2CfgPath) + if err != nil { + t.Error(err) + } + + engine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_rpl_active_active", "engine1") + engine1Cfg, err = config.NewCGRConfigFromPath(engine1CfgPath) + if err != nil { + t.Error(err) + } +} + func testCacheRplInitDataDb(t *testing.T) { if err := engine.InitDataDb(dspEngine1Cfg); err != nil { t.Fatal(err) @@ -196,6 +246,65 @@ func testCacheRplAddData(t *testing.T) { } } +func testCacheRplAAAddData(t *testing.T) { + wchan := make(chan struct{}, 1) + go func() { + loaderPath, err := exec.LookPath("cgr-loader") + if err != nil { + t.Error(err) + } + loader := exec.Command(loaderPath, "-config_path", dspEngine1CfgPath, "-path", + path.Join(*dataDir, "tariffplans", "cache_rpl_active_active", "dispatcher_engine")) + + if err := loader.Start(); err != nil { + t.Error(err) + } + loader.Wait() + wchan <- struct{}{} + }() + select { + case <-wchan: + case <-time.After(2 * time.Second): + t.Errorf("cgr-loader failed: ") + } + + go func() { + loaderPath, err := exec.LookPath("cgr-loader") + if err != nil { + t.Error(err) + } + loader := exec.Command(loaderPath, "-config_path", dspEngine2CfgPath, "-path", + path.Join(*dataDir, "tariffplans", "cache_rpl_active_active", "dispatcher_engine2")) + + if err := loader.Start(); err != nil { + t.Error(err) + } + loader.Wait() + wchan <- struct{}{} + }() + select { + case <-wchan: + case <-time.After(2 * time.Second): + t.Errorf("cgr-loader failed: ") + } + + chargerProfile := &v1.ChargerWithCache{ + ChargerProfile: &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "DefaultCharger", + RunID: utils.MetaDefault, + AttributeIDs: []string{utils.META_NONE}, + Weight: 20, + }, + } + var result string + if err := engine1RPC.Call(utils.APIerSv1SetChargerProfile, chargerProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + func testCacheRplPing(t *testing.T) { var reply map[string]interface{} ev := utils.TenantWithArgDispatcher{ @@ -273,6 +382,199 @@ func testCacheRplCheckReplication(t *testing.T) { } } +func testCacheRplAACheckReplication(t *testing.T) { + var rcvKeys []string + argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{ + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{ + CacheID: utils.CacheDispatcherRoutes, + }, + } + if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + + var rpl string + if err := dspEngine2RPC.Call(utils.AttributeSv1Ping, &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + }, + ArgDispatcher: &utils.ArgDispatcher{ + RouteID: utils.StringPointer("testRouteFromDispatcher2"), + }, + }, &rpl); err != nil { + t.Error(err) + } else if rpl != utils.Pong { + t.Errorf("Received: %s", rpl) + } + + if err := dspEngine1RPC.Call(utils.AttributeSv1Ping, &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + }, + ArgDispatcher: &utils.ArgDispatcher{ + RouteID: utils.StringPointer("testRouteFromDispatcher1"), + }, + }, &rpl); err != nil { + t.Error(err) + } else if rpl != utils.Pong { + t.Errorf("Received: %s", rpl) + } + + expKeys := []string{"testRouteFromDispatcher2:*attributes", "testRouteFromDispatcher1:*attributes"} + + if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { + t.Error(err.Error()) + } + sort.Strings(rcvKeys) + sort.Strings(expKeys) + if !reflect.DeepEqual(expKeys, rcvKeys) { + t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) + } + + if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { + t.Error(err.Error()) + } + sort.Strings(rcvKeys) + sort.Strings(expKeys) + if !reflect.DeepEqual(expKeys, rcvKeys) { + t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) + } + +} + +func testCacheRplAACheckLoadReplication(t *testing.T) { + var rcvKeys []string + argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{ + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{ + CacheID: utils.CacheDispatcherLoads, + }, + } + if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + + var rpl []*engine.ChrgSProcessEventReply + var wgDisp1 sync.WaitGroup + var wgDisp2 sync.WaitGroup + for i := 0; i < 10; i++ { + wgDisp1.Add(1) + wgDisp2.Add(1) + go func() { + if err := dspEngine1RPC.Call(utils.ChargerSv1ProcessEvent, &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testCacheRplAACheckLoadReplication", + Event: map[string]interface{}{ + utils.Account: "1007", + utils.Destination: "+491511231234", + "EventName": "TestLoad", + }, + }, + ArgDispatcher: &utils.ArgDispatcher{ + RouteID: utils.StringPointer("testRouteFromDispatcher1"), + }, + }, &rpl); err != nil { + t.Error(err) + } else if rpl[0].ChargerSProfile != "DefaultCharger" { + t.Errorf("Received: %+v", utils.ToJSON(rpl)) + } + wgDisp1.Done() + }() + go func() { + if err := dspEngine2RPC.Call(utils.ChargerSv1ProcessEvent, &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testCacheRplAACheckLoadReplication", + Event: map[string]interface{}{ + utils.Account: "1007", + utils.Destination: "+491511231234", + "EventName": "TestLoad", + }, + }, + ArgDispatcher: &utils.ArgDispatcher{ + RouteID: utils.StringPointer("testRouteFromDispatcher2"), + }, + }, &rpl); err != nil { + t.Error(err) + } else if rpl[0].ChargerSProfile != "DefaultCharger" { + t.Errorf("Received: %+v", utils.ToJSON(rpl)) + } + wgDisp2.Done() + }() + } + wgDisp1.Wait() + wgDisp2.Wait() + expKeys := []string{"testRouteFromDispatcher1:*attributes", + "testRouteFromDispatcher1:*chargers", "testRouteFromDispatcher2:*attributes", + "testRouteFromDispatcher2:*chargers"} + argsAPI = utils.ArgsGetCacheItemIDsWithArgDispatcher{ + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{ + CacheID: utils.CacheDispatcherRoutes, + }, + } + if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { + t.Error(err.Error()) + } + sort.Strings(rcvKeys) + sort.Strings(expKeys) + if !reflect.DeepEqual(expKeys, rcvKeys) { + t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) + } + if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { + t.Error(err.Error()) + } + sort.Strings(rcvKeys) + sort.Strings(expKeys) + if !reflect.DeepEqual(expKeys, rcvKeys) { + t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) + } + + expKeys = []string{"cgrates.org:Engine2"} + argsAPI = utils.ArgsGetCacheItemIDsWithArgDispatcher{ + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{ + CacheID: utils.CacheDispatcherLoads, + }, + } + if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { + t.Error(err.Error()) + } + sort.Strings(rcvKeys) + sort.Strings(expKeys) + if !reflect.DeepEqual(expKeys, rcvKeys) { + t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) + } + if err := dspEngine1RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil { + t.Error(err.Error()) + } + sort.Strings(rcvKeys) + sort.Strings(expKeys) + if !reflect.DeepEqual(expKeys, rcvKeys) { + t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) + } +} + func testCacheRplCheckLoadReplication(t *testing.T) { var rcvKeys []string argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{