From 520dca3204fc55d7db0dfe27f7d5e30cd8569419 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 9 Oct 2024 20:19:14 +0300 Subject: [PATCH] Add tests for load balancing to multiple workers --- general_tests/dsp_routes_it_test.go | 367 ++++++++++++++++++++++------ 1 file changed, 294 insertions(+), 73 deletions(-) diff --git a/general_tests/dsp_routes_it_test.go b/general_tests/dsp_routes_it_test.go index ffd9fb684..222e8c1f9 100644 --- a/general_tests/dsp_routes_it_test.go +++ b/general_tests/dsp_routes_it_test.go @@ -20,6 +20,7 @@ along with this program. If not, see package general_tests import ( + "bytes" "fmt" "reflect" "strconv" @@ -35,15 +36,15 @@ import ( ) const ( - host1Cfg = `{ + hostCfg = `{ "general": { - "node_id": "host1", + "node_id": "%s", "log_level": 7 }, "listen": { - "rpc_json": ":4012", - "rpc_gob": ":4013", - "http": ":4080" + "rpc_json": ":%[2]d12", + "rpc_gob": ":%[2]d13", + "http": ":%[2]d80" }, "dispatchers":{ "enabled": true, @@ -81,52 +82,7 @@ const ( "enabled": true } }` - host2Cfg = `{ -"general": { - "node_id": "host2", - "log_level": 7 -}, -"listen": { - "rpc_json": ":6012", - "rpc_gob": ":6013", - "http": ":6080" -}, -"dispatchers":{ - "enabled": true, - "prevent_loop": true -}, -"caches":{ - "partitions": { - "*dispatcher_profiles": { - "limit": -1, - "remote":true - }, - "*dispatcher_routes": { - "limit": -1, - "remote":true - }, - "*dispatchers": { - "limit": -1, - "remote":true - } - }, - "remote_conns": ["gob_cache"] -}, -"apiers": { - "enabled": true -}, -"rpc_conns": { - "gob_cache": { - "strategy": "*first", - "conns": [ - { - "address": "127.0.0.1:6013", - "transport":"*gob" - } - ] - } -} -}` + hostSetterCfg = `{ "general": { "node_id": "setter", @@ -160,25 +116,25 @@ const ( func TestDispatcherRoutesNotFound(t *testing.T) { switch *utils.DBType { - case utils.MetaInternal: - case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + case utils.MetaMySQL: + case utils.MetaInternal, utils.MetaMongo, utils.MetaPostgres: t.SkipNow() default: t.Fatal("unsupported dbtype value") } host1 := TestEngine{ // first engine, port 4012 - ConfigJSON: host1Cfg, + ConfigJSON: fmt.Sprintf(hostCfg, "host1", 40), } ng1Client, _ := host1.Run(t) host2 := TestEngine{ // second engine, port 6012 - ConfigJSON: host2Cfg, + ConfigJSON: fmt.Sprintf(hostCfg, "host2", 60), } ng2Client, _ := host2.Run(t) // Send Status requests with *dispatchers on false. - checkStatus(t, ng1Client, false, "account#dan.bogos", "host1") - checkStatus(t, ng2Client, false, "account#dan.bogos", "host2") + checkStatus(t, ng1Client, false, "account#dan.bogos", "", "host1") + checkStatus(t, ng2Client, false, "account#dan.bogos", "", "host2") // Check that dispatcher routes were not cached due to *dispatchers being false. getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil) @@ -187,8 +143,8 @@ func TestDispatcherRoutesNotFound(t *testing.T) { func TestDispatcherRoutes(t *testing.T) { switch *utils.DBType { - case utils.MetaInternal: - case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + case utils.MetaMySQL: + case utils.MetaInternal, utils.MetaMongo, utils.MetaPostgres: t.SkipNow() default: t.Fatal("unsupported dbtype value") @@ -201,7 +157,7 @@ func TestDispatcherRoutes(t *testing.T) { // Starting only the second dispatcher engine, for now. 
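	// hostCfg is a shared template: the first Sprintf argument fills node_id
	// and the second is a two-digit port prefix consumed by the %[2]d verbs,
	// so passing 60 yields :6012 (json), :6013 (gob) and :6080 (http).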
host2 := TestEngine{ - ConfigJSON: host2Cfg, + ConfigJSON: fmt.Sprintf(hostCfg, "host2", 60), PreserveDataDB: true, PreserveStorDB: true, } @@ -209,11 +165,11 @@ func TestDispatcherRoutes(t *testing.T) { setDispatcherHost(t, setterClient, "host1", 4012) setDispatcherHost(t, setterClient, "host2", 6012) - setDispatcherProfile(t, setterClient, "dsp_test", "host1;10", "host2;5") + setDispatcherProfile(t, setterClient, "dsp_test", utils.MetaWeight, "host1;10", "host2;5") // Send status request to the second engine. "host2" will match, even though "host1" has the bigger weight. // That's because the first engine has not been started yet. - checkStatus(t, ng2Client, true, "account#dan.bogos", "host2") + checkStatus(t, ng2Client, true, "account#dan.bogos", "", "host2") // Check that the dispatcher route has been cached (same for the profile and the dispatcher itself). getCacheItem(t, ng2Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", map[string]any{ @@ -253,21 +209,21 @@ func TestDispatcherRoutes(t *testing.T) { // Start the first engine. host1 := TestEngine{ - ConfigJSON: host1Cfg, + ConfigJSON: fmt.Sprintf(hostCfg, "host1", 40), PreserveDataDB: true, PreserveStorDB: true, } ng1Client, _ := host1.Run(t) // "host2" will match again due to being cached previously. - checkStatus(t, ng1Client, true, "account#dan.bogos", "host2") + checkStatus(t, ng1Client, true, "account#dan.bogos", "", "host2") // Clear cache and try again. - clearCache(t, ng1Client) - clearCache(t, ng2Client) + clearCache(t, ng1Client, "") + clearCache(t, ng2Client, "") // This time it will match "host1" which has the bigger weight. - checkStatus(t, ng1Client, true, "account#dan.bogos", "host1") + checkStatus(t, ng1Client, true, "account#dan.bogos", "", "host1") // Check the relevant cache items. Should be the same as before, the difference being the HostID // from *dispatcher_routes ("host1" instead of "host2"). @@ -307,7 +263,7 @@ func TestDispatcherRoutes(t *testing.T) { getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test", map[string]any{}) // Overwrite the DispatcherProfile (removed host1). - setDispatcherProfile(t, setterClient, "dsp_test", "host2;5") + setDispatcherProfile(t, setterClient, "dsp_test", utils.MetaWeight, "host2;5") time.Sleep(5 * time.Millisecond) // wait for cache updates to reach all external engines // Check that related cache items have been updated automatically. @@ -338,19 +294,283 @@ func TestDispatcherRoutes(t *testing.T) { getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test", nil) // Nothing happens when setting a different dispatcher profile that's using the same hosts as before. 
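	// (Presumably nothing is cached or invalidated here because dsp_test2 has
	// never dispatched a request; the nil expectations below verify that no
	// route, profile or dispatcher entries exist for it.)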
- setDispatcherProfile(t, setterClient, "dsp_test2", "host1;50", "host2;150") + setDispatcherProfile(t, setterClient, "dsp_test2", utils.MetaWeight, "host1;50", "host2;150") getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil) getCacheItem(t, ng1Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test2", nil) getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test2", nil) } -func checkStatus(t *testing.T, client *birpc.Client, dispatch bool, routeID, expNodeID string) { +func TestDispatchersLoadBalanceWithAuth(t *testing.T) { + switch *utils.DBType { + case utils.MetaMySQL: + case utils.MetaInternal, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + const ( + dspCfg = `{ +"general": { + "node_id": "dispatcher", + "log_level": 7 +}, +"apiers": { + "enabled": true +}, +"attributes": { + "enabled": true +}, +"dispatchers": { + "enabled": true, + "attributes_conns": ["*internal"] +} +}` + hostCfg = `{ +"general": { + "node_id": "host%s", + "log_level": 7 +}, +"listen": { + "rpc_json": ":%[2]d12", + "rpc_gob": ":%[2]d13", + "http": ":%[2]d80" +}, +"apiers": { + "enabled": true +} +}` + ) + + dsp := TestEngine{ // dispatcher engine + ConfigJSON: dspCfg, + } + clientDsp, _ := dsp.Run(t) + hostA := TestEngine{ // first worker engine (additionally loads the tps), ports 210xx + ConfigJSON: fmt.Sprintf(hostCfg, "A", 210), + PreserveDataDB: true, + PreserveStorDB: true, + TpFiles: map[string]string{ + utils.DispatcherProfilesCsv: `#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight +cgrates.org,dsp_test,,,,*round_robin,,,,,,, +cgrates.org,dsp_test,,,,,,hostA,,30,,, +cgrates.org,dsp_test,,,,,,hostB,,20,,, +cgrates.org,dsp_test,,,,,,hostC,,10,,,`, + utils.DispatcherHostsCsv: `#Tenant[0],ID[1],Address[2],Transport[3],ConnectAttempts[4],Reconnects[5],MaxReconnectInterval[6],ConnectTimeout[7],ReplyTimeout[8],Tls[9],ClientKey[10],ClientCertificate[11],CaCertificate[12] +cgrates.org,hostA,127.0.0.1:21012,*json,1,1,,2s,2s,,,, +cgrates.org,hostB,127.0.0.1:22012,*json,1,1,,2s,2s,,,, +cgrates.org,hostC,127.0.0.1:23012,*json,1,1,,2s,2s,,,,`, + utils.AttributesCsv: `#Tenant,ID,Contexts,FilterIDs,ActivationInterval,AttributeFilterIDs,Path,Type,Value,Blocker,Weight +cgrates.org,attr_auth,*auth,*string:~*req.ApiKey:12345,,,*req.APIMethods,*constant,CacheSv1.Clear&CoreSv1.Status,false,20`, + }, + } + _, _ = hostA.Run(t) + hostB := TestEngine{ // second worker engine, ports 220xx + ConfigJSON: fmt.Sprintf(hostCfg, "B", 220), + PreserveDataDB: true, + PreserveStorDB: true, + } + _, _ = hostB.Run(t) + hostC := TestEngine{ // third worker engine, ports 230xx + PreserveDataDB: true, + PreserveStorDB: true, + ConfigJSON: fmt.Sprintf(hostCfg, "C", 230), + } + _, _ = hostC.Run(t) + + // Initial check for dispatcher status. + checkStatus(t, clientDsp, false, "account#1001", "12345", "dispatcher") + + // Test setup: + // - 3 CGR engine workers (hostA, hostB, hostC) + // - 4 accounts (1001, 1002, 1003, 1004) + // - using round-robin load strategy + + // First round (dispatcher routes not yet cached) + // Each account is assigned to a host in order, wrapping around to hostA for the 4th account. 
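+	// (The expectations below assume the *round_robin strategy walks the
+	// profile's connections in ConnWeight order, i.e. hostA(30) -> hostB(20) -> hostC(10).)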
+ checkStatus(t, clientDsp, true, "account#1001", "12345", "hostA") + checkStatus(t, clientDsp, true, "account#1002", "12345", "hostB") + checkStatus(t, clientDsp, true, "account#1003", "12345", "hostC") + checkStatus(t, clientDsp, true, "account#1004", "12345", "hostA") + + // Second round (previous dispatcher routes are cached) + // Each account maintains its previously assigned host, regardless of the round-robin order. + checkStatus(t, clientDsp, true, "account#1001", "12345", "hostA") // without routeID: hostB + checkStatus(t, clientDsp, true, "account#1002", "12345", "hostB") // without routeID: hostC + checkStatus(t, clientDsp, true, "account#1003", "12345", "hostC") // without routeID: hostA + checkStatus(t, clientDsp, true, "account#1004", "12345", "hostA") // without routeID: hostB + + // Third round (clearing cache inbetween status requests) + checkStatus(t, clientDsp, true, "account#1001", "12345", "hostA") // Without routeID: hostC + checkStatus(t, clientDsp, true, "account#1002", "12345", "hostB") // Without routeID: hostA + + // Clearing cache resets both the cached dispatcher routes and the + // round-robin load dispatcher. The assignment will now start over from + // the beginning. + clearCache(t, clientDsp, "12345") + checkStatus(t, clientDsp, true, "account#1003", "12345", "hostA") + checkStatus(t, clientDsp, true, "account#1004", "12345", "hostB") +} + +func TestDispatchersRoutingOnAcc(t *testing.T) { + t.Skip("skip until we find a way to mention nodeID of the worker processing the request inside the CDR") + switch *utils.DBType { + case utils.MetaMySQL: + case utils.MetaInternal, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + const ( + dspCfg = `{ +"general": { + "node_id": "dispatcher", + "log_level": 7 +}, +"apiers": { + "enabled": true +}, +"dispatchers": { + "enabled": true +} +}` + hostCfg = `{ +"general": { + "node_id": "host%s", + "log_level": 7 +}, +"listen": { + "rpc_json": ":%[2]d12", + "rpc_gob": ":%[2]d13", + "http": ":%[2]d80" +}, +"rals": { + "enabled": true +}, +"cdrs": { + "enabled": true, + "rals_conns": ["*internal"] +}, +"schedulers": { + "enabled": true, + "cdrs_conns": ["*internal"] +}, +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"] +}, +"sessions": { + "enabled": true, + "listen_bijson": "127.0.0.1:%[2]d14", + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "rals_conns": ["*internal"] +}, +"chargers": { + "enabled": true +} +}` + ) + + buf := &bytes.Buffer{} + dsp := TestEngine{ // dispatcher engine + LogBuffer: buf, + ConfigJSON: dspCfg, + } + clientDsp, _ := dsp.Run(t) + hostA := TestEngine{ // first worker engine (additionally loads the tps), ports 210xx + ConfigJSON: fmt.Sprintf(hostCfg, "A", 210), + PreserveDataDB: true, + PreserveStorDB: true, + TpFiles: map[string]string{ + utils.DispatcherProfilesCsv: `#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight +cgrates.org,dsp_test,,,,*round_robin,,,,,,, +cgrates.org,dsp_test,,,,,,hostA,,30,,, +cgrates.org,dsp_test,,,,,,hostB,,20,,, +cgrates.org,dsp_test,,,,,,hostC,,10,,,`, + utils.DispatcherHostsCsv: `#Tenant[0],ID[1],Address[2],Transport[3],ConnectAttempts[4],Reconnects[5],MaxReconnectInterval[6],ConnectTimeout[7],ReplyTimeout[8],Tls[9],ClientKey[10],ClientCertificate[11],CaCertificate[12] +cgrates.org,hostA,127.0.0.1:21012,*json,1,1,,2s,2s,,,, +cgrates.org,hostB,127.0.0.1:22012,*json,1,1,,2s,2s,,,, 
+cgrates.org,hostC,127.0.0.1:23012,*json,1,1,,2s,2s,,,,`, + utils.AccountActionsCsv: `#Tenant,Account,ActionPlanId,ActionTriggersId,AllowNegative,Disabled +cgrates.org,1001,package_topup,,, +cgrates.org,1002,package_topup,,, +cgrates.org,1003,package_topup,,, +cgrates.org,1004,package_topup,,,`, + utils.ActionPlansCsv: `#Id,ActionsId,TimingId,Weight +package_topup,act_topup,*asap,10`, + utils.ActionsCsv: `#ActionsId[0],Action[1],ExtraParameters[2],Filter[3],BalanceId[4],BalanceType[5],Categories[6],DestinationIds[7],RatingSubject[8],SharedGroup[9],ExpiryTime[10],TimingIds[11],Units[12],BalanceWeight[13],BalanceBlocker[14],BalanceDisabled[15],Weight[16] +act_topup,*topup_reset,,,main_balance,*sms,,,,,*unlimited,,10,,,,`, + }, + } + _, _ = hostA.Run(t) + hostB := TestEngine{ // second worker engine, ports 220xx + ConfigJSON: fmt.Sprintf(hostCfg, "B", 220), + PreserveDataDB: true, + PreserveStorDB: true, + } + _, _ = hostB.Run(t) + hostC := TestEngine{ // third worker engine, ports 230xx + PreserveDataDB: true, + PreserveStorDB: true, + ConfigJSON: fmt.Sprintf(hostCfg, "C", 230), + } + _, _ = hostC.Run(t) + + idx := 0 + processCDR := func(t *testing.T, client *birpc.Client, acc string) { + idx++ + routeID := "account#:" + acc + var reply string + if err := client.Call(context.Background(), utils.SessionSv1ProcessCDR, + &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]any{ + utils.RunID: utils.MetaDefault, + utils.Tenant: "cgrates.org", + utils.Category: "sms", + utils.ToR: utils.MetaSMS, + utils.OriginID: fmt.Sprintf("processCDR%d", idx), + utils.OriginHost: "127.0.0.1", + utils.RequestType: utils.MetaPostpaid, + utils.AccountField: acc, + utils.Destination: "9000", + utils.SetupTime: time.Date(2024, time.October, 9, 16, 14, 50, 0, time.UTC), + utils.AnswerTime: time.Date(2024, time.October, 9, 16, 15, 0, 0, time.UTC), + utils.Usage: 1, + }, + APIOpts: map[string]any{ + utils.OptsRouteID: routeID, + }, + }, &reply); err != nil { + t.Errorf("SessionSv1.ProcessCDR(acc: %s, idx: %d) unexpected err: %v", acc, idx, err) + } + } + + for range 3 { + processCDR(t, clientDsp, "1001") + processCDR(t, clientDsp, "1002") + processCDR(t, clientDsp, "1003") + processCDR(t, clientDsp, "1004") + } + + var cdrs []*engine.CDR + if err := clientDsp.Call(context.Background(), utils.CDRsV1GetCDRs, &utils.RPCCDRsFilterWithAPIOpts{ + RPCCDRsFilter: &utils.RPCCDRsFilter{}}, &cdrs); err != nil { + t.Fatal(err) + } + // fmt.Println(utils.ToJSON(cdrs)) +} + +func checkStatus(t *testing.T, client *birpc.Client, dispatch bool, routeID, apiKey, expNodeID string) { t.Helper() args := &utils.TenantWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]any{ utils.OptsRouteID: routeID, utils.MetaDispatchers: dispatch, + utils.OptsAPIKey: apiKey, }, } var reply map[string]any @@ -391,13 +611,14 @@ func getCacheItem(t *testing.T, client *birpc.Client, dispatch bool, cacheID, it } } -func clearCache(t *testing.T, client *birpc.Client) { +func clearCache(t *testing.T, client *birpc.Client, apiKey string) { t.Helper() var reply string if err := client.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ APIOpts: map[string]any{ utils.MetaDispatchers: false, + utils.OptsAPIKey: apiKey, }, }, &reply); err != nil { t.Fatalf("CacheSv1.Clear unexpected err: %v", err) @@ -429,7 +650,7 @@ func setDispatcherHost(t *testing.T, client *birpc.Client, id string, port int) } } -func setDispatcherProfile(t *testing.T, client *birpc.Client, id string, hosts ...string) { +func setDispatcherProfile(t 
*testing.T, client *birpc.Client, id, strategy string, hosts ...string) {
 	t.Helper()
 	hostPrfs := make(engine.DispatcherHostProfiles, 0, len(hosts))
 	for _, host := range hosts {
@@ -452,7 +673,7 @@ func setDispatcherProfile(t *testing.T, client *birpc.Client, id string, hosts .
 		DispatcherProfile: &engine.DispatcherProfile{
 			Tenant:     "cgrates.org",
 			ID:         id,
-			Strategy:   "*weight",
+			Strategy:   strategy,
 			Subsystems: []string{utils.MetaAny},
 			Hosts:      hostPrfs,
 		},