From 3f8af2f106063dac69e386a5888b7965de659677 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 3 Apr 2025 20:08:51 +0300 Subject: [PATCH] add benchmark for remote+replication (incomplete) --- agents/diambench_test.go | 348 ++++++++++++++++------- data/conf/samples/diambench/cgrates.json | 7 +- 2 files changed, 245 insertions(+), 110 deletions(-) diff --git a/agents/diambench_test.go b/agents/diambench_test.go index 297d1bc36..9a37f3f92 100644 --- a/agents/diambench_test.go +++ b/agents/diambench_test.go @@ -21,10 +21,10 @@ along with this program. If not, see package agents import ( + "bytes" "flag" "fmt" "path/filepath" - "sync" "sync/atomic" "testing" "time" @@ -45,20 +45,6 @@ var ( ) func BenchmarkDiameterCaps(b *testing.B) { - // b.Skip("still incomplete") - var dbCfg engine.DBCfg - switch *utils.DBType { - case utils.MetaInternal: - dbCfg = engine.InternalDBCfg - case utils.MetaMySQL: - case utils.MetaMongo: - dbCfg = engine.MongoDBCfg - case utils.MetaPostgres: - dbCfg = engine.PostgresDBCfg - default: - b.Fatal("unsupported dbtype value") - } - // CoreS config is dynamic for this benchmark. jsonCfg := fmt.Sprintf(`{ "cores": { @@ -83,7 +69,7 @@ ACT_TOPUP,*topup_reset,,,balance_sms,*sms,,,,,*unlimited,,1000000,,,,`, utils.ChargersCsv: `#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight cgrates.org,DEFAULT,,,*default,*none,0`, }, - DBCfg: dbCfg, + DBCfg: engine.InternalDBCfg, } client, cfg := ng.Run(b) @@ -96,107 +82,21 @@ cgrates.org,DEFAULT,,,*default,*none,0`, b.Fatal(err) } - var mu sync.Mutex // to ensure sessionID is unique - var sent int64 - var answered, completed atomic.Int64 - sendCCR := func(t testing.TB, replyTimeout time.Duration, wantResultCode string) { - mu.Lock() - sent++ - sessionID := fmt.Sprintf("session%d", sent) - mu.Unlock() - ccr := diam.NewRequest(diam.CreditControl, 4, nil) - ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(sessionID)) - ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) - ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) - ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) - ccr.NewAVP(avp.ServiceContextID, avp.Mbit, 0, datatype.UTF8String("message@DiamItCCRSMS")) - ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(4)) - ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(0)) - ccr.NewAVP(avp.EventTimestamp, avp.Mbit, 0, datatype.Time(time.Date(2018, 10, 5, 11, 43, 10, 0, time.UTC))) - ccr.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ - AVP: []*diam.AVP{ - diam.NewAVP(avp.SubscriptionIDType, avp.Mbit, 0, datatype.Enumerated(0)), - diam.NewAVP(avp.SubscriptionIDData, avp.Mbit, 0, datatype.UTF8String("1001")), // Subscription-Id-Data - }}) - ccr.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ - AVP: []*diam.AVP{ - diam.NewAVP(avp.SubscriptionIDType, avp.Mbit, 0, datatype.Enumerated(1)), - diam.NewAVP(avp.SubscriptionIDData, avp.Mbit, 0, datatype.UTF8String("104502200011")), // Subscription-Id-Data - }}) - ccr.NewAVP(avp.ServiceIdentifier, avp.Mbit, 0, datatype.Unsigned32(0)) - ccr.NewAVP(avp.RequestedAction, avp.Mbit, 0, datatype.Enumerated(0)) - ccr.NewAVP(avp.RequestedServiceUnit, avp.Mbit, 0, &diam.GroupedAVP{ - AVP: []*diam.AVP{ - diam.NewAVP(avp.CCTime, avp.Mbit, 0, datatype.Unsigned32(1))}}) - ccr.NewAVP(873, avp.Mbit, 10415, &diam.GroupedAVP{ // - AVP: []*diam.AVP{ - diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information - AVP: []*diam.AVP{ - diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String("22509")), // Calling-Vlr-Number - diam.NewAVP(20385, avp.Mbit, 2011, datatype.UTF8String("4002")), // Called-Party-NP - }, - }), - diam.NewAVP(2000, avp.Mbit, 10415, &diam.GroupedAVP{ // SMS-Information - AVP: []*diam.AVP{ - diam.NewAVP(886, avp.Mbit, 10415, &diam.GroupedAVP{ // Originator-Address - AVP: []*diam.AVP{ - diam.NewAVP(899, avp.Mbit, 10415, datatype.Enumerated(1)), // Address-Type - diam.NewAVP(897, avp.Mbit, 10415, datatype.UTF8String("1001")), // Address-Data - }}), - diam.NewAVP(1201, avp.Mbit, 10415, &diam.GroupedAVP{ // Recipient-Address - AVP: []*diam.AVP{ - diam.NewAVP(899, avp.Mbit, 10415, datatype.Enumerated(1)), // Address-Type - diam.NewAVP(897, avp.Mbit, 10415, datatype.UTF8String("1003")), // Address-Data - }}), - }, - }), - }}) - - if err := diamClient.SendMessage(ccr); err != nil { - t.Errorf("failed to send diameter message: %v", err) - return - } - - reply := diamClient.ReceivedMessage(replyTimeout) - if reply == nil { - t.Error("received empty reply") - return - } - answered.Add(1) - - avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID) - if err != nil { - t.Error(err) - return - } - if len(avps) == 0 { - t.Error("missing AVPs in reply") - return - } - - resultCode, err := diamAVPAsString(avps[0]) - if err != nil { - t.Error(err) - return - } - if resultCode != wantResultCode { - t.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode) - return - } - completed.Add(1) - } - // actual benchmark + var sent, completed atomic.Int64 b.SetParallelism(*parallelism) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - sendCCR(b, cfg.CoreSCfg().ShutdownTimeout, "2001") + sent.Add(1) + if sendDiamCCR(b, diamClient, cfg.CoreSCfg().ShutdownTimeout, "2001") { + completed.Add(1) + } } }) // check results - b.Logf("sent %d, answered %d, completed %d", sent, answered.Load(), completed.Load()) + b.Logf("sent %d, completed %d", sent.Load(), completed.Load()) var acnt *engine.Account attrsAcnt := &utils.AttrGetAccount{ Tenant: "cgrates.org", @@ -209,3 +109,235 @@ cgrates.org,DEFAULT,,,*default,*none,0`, b.Errorf("APIerSv1.GetAccount: sms_balance: %f, want: %f", rply, expBalance) } } + +func BenchmarkDiameterReplication(b *testing.B) { + // Setup for storage engine. + rplNgJSONCfg := `{ +"listen": { + "rpc_json": "127.0.0.1:22012", + "rpc_gob": "127.0.0.1:22013", + "http": "127.0.0.1:22080", +}, +"data_db": { + "db_host": "192.168.122.164" +}, +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"] +}, +"schedulers": { + "enabled": true +} +}` + + rplNg := engine.TestEngine{ + TpFiles: map[string]string{ + utils.AccountActionsCsv: `#Tenant,Account,ActionPlanId,ActionTriggersId,AllowNegative,Disabled +cgrates.org,1001,PACKAGE_1001,,,`, + utils.ActionPlansCsv: `#Id,ActionsId,TimingId,Weight +PACKAGE_1001,ACT_TOPUP,*asap,10`, + utils.ActionsCsv: `#ActionsId[0],Action[1],ExtraParameters[2],Filter[3],BalanceId[4],BalanceType[5],Categories[6],DestinationIds[7],RatingSubject[8],SharedGroup[9],ExpiryTime[10],TimingIds[11],Units[12],BalanceWeight[13],BalanceBlocker[14],BalanceDisabled[15],Weight[16] +ACT_TOPUP,*topup_reset,,,balance_sms,*sms,,,,,*unlimited,,1000000,,,,`, + utils.ChargersCsv: `#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight +cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10 +cgrates.org,Raw,,,*raw,*constant:*req.RequestType:*none,0`, + }, + ConfigJSON: rplNgJSONCfg, + DBCfg: engine.DBCfg{ + DataDB: &engine.DBParams{ + Host: utils.StringPointer("192.168.122.164"), + }, + StorDB: engine.InternalDBCfg.StorDB, + }, + LogBuffer: new(bytes.Buffer), + } + // defer fmt.Println(rplNg.LogBuffer) + rplClient, _ := rplNg.Run(b) + + // Setup for rater. + rplInterval := 5 * time.Millisecond + failedDir := b.TempDir() + jsonCfg := fmt.Sprintf(`{ +"data_db": { + "db_type": "*internal", + "remote_conns": ["db_conn"], + "replication_conns": ["db_conn"], + "replication_filtered": false, + "replication_cache": "", + "replication_failed_dir": "%s", + "replication_interval": "%s", + "items":{ + "*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": true}, + "*reverse_destinations": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*destinations": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*rating_plans": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*rating_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*account_action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*action_triggers": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*shared_groups": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*timings": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*versions": {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote": false, "replicate": false}, + "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote": true, "replicate": false}, + "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote": false, "replicate": false}, + } +}, +"rpc_conns": { + "db_conn": { + "conns": [ + { + "address": "127.0.0.1:22013", + "transport":"*gob", + "connect_attempts": 5, + "reconnects": 5, + "max_reconnect_interval": "", + "connect_timeout":"1s", + "reply_timeout":"2s" + } + ] + } +} +}`, failedDir, rplInterval) + + ng := engine.TestEngine{ + ConfigJSON: jsonCfg, + ConfigPath: filepath.Join(*utils.DataDir, "conf", "samples", "diambench"), + LogBuffer: new(bytes.Buffer), + } + // defer fmt.Println(ng.LogBuffer) + client, cfg := ng.Run(b) + + time.Sleep(50 * time.Millisecond) // wait for DiameterAgent service to start + diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost", + cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID, + cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision, + cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet) + if err != nil { + b.Fatal(err) + } + + // actual benchmark + var sent, completed atomic.Int64 + b.SetParallelism(*parallelism) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sent.Add(1) + if sendDiamCCR(b, diamClient, 5*time.Second, "2001") { + completed.Add(1) + } + } + }) + + // Wait for last batch of replications to execute. + time.Sleep(rplInterval) + + // check results + b.Logf("sent %d, completed %d", sent.Load(), completed.Load()) + var acnt *engine.Account + attrsAcnt := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1001", + } + expBalance := float64(1000000 - completed.Load()) + if err = client.Call(context.Background(), utils.APIerSv2GetAccount, attrsAcnt, &acnt); err != nil { + b.Errorf("APIerSv1.GetAccount unexpected err: %v", err) + } else if rply := acnt.BalanceMap[utils.MetaSMS].GetTotalValue(); rply != expBalance { + b.Errorf("APIerSv1.GetAccount: sms_balance: %f, want: %f", rply, expBalance) + } + if err = rplClient.Call(context.Background(), utils.APIerSv2GetAccount, attrsAcnt, &acnt); err != nil { + b.Errorf("APIerSv1.GetAccount unexpected err: %v", err) + } else if rply := acnt.BalanceMap[utils.MetaSMS].GetTotalValue(); rply != expBalance { + b.Errorf("APIerSv1.GetAccount: sms_balance: %f, want: %f", rply, expBalance) + } +} + +// sendDiamCCR sends a CCR and verifies the expected result code, returning success status +func sendDiamCCR(tb testing.TB, client *DiameterClient, replyTimeout time.Duration, wantResultCode string) bool { + tb.Helper() + ccr := diam.NewRequest(diam.CreditControl, 4, nil) + ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(utils.UUIDSha1Prefix())) + ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) + ccr.NewAVP(avp.ServiceContextID, avp.Mbit, 0, datatype.UTF8String("message@DiamItCCRSMS")) + ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(4)) + ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(0)) + ccr.NewAVP(avp.EventTimestamp, avp.Mbit, 0, datatype.Time(time.Date(2018, 10, 5, 11, 43, 10, 0, time.UTC))) + ccr.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.SubscriptionIDType, avp.Mbit, 0, datatype.Enumerated(0)), + diam.NewAVP(avp.SubscriptionIDData, avp.Mbit, 0, datatype.UTF8String("1001")), // Subscription-Id-Data + }}) + ccr.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.SubscriptionIDType, avp.Mbit, 0, datatype.Enumerated(1)), + diam.NewAVP(avp.SubscriptionIDData, avp.Mbit, 0, datatype.UTF8String("104502200011")), // Subscription-Id-Data + }}) + ccr.NewAVP(avp.ServiceIdentifier, avp.Mbit, 0, datatype.Unsigned32(0)) + ccr.NewAVP(avp.RequestedAction, avp.Mbit, 0, datatype.Enumerated(0)) + ccr.NewAVP(avp.RequestedServiceUnit, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.CCTime, avp.Mbit, 0, datatype.Unsigned32(1))}}) + ccr.NewAVP(873, avp.Mbit, 10415, &diam.GroupedAVP{ // + AVP: []*diam.AVP{ + diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information + AVP: []*diam.AVP{ + diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String("22509")), // Calling-Vlr-Number + diam.NewAVP(20385, avp.Mbit, 2011, datatype.UTF8String("4002")), // Called-Party-NP + }, + }), + diam.NewAVP(2000, avp.Mbit, 10415, &diam.GroupedAVP{ // SMS-Information + AVP: []*diam.AVP{ + diam.NewAVP(886, avp.Mbit, 10415, &diam.GroupedAVP{ // Originator-Address + AVP: []*diam.AVP{ + diam.NewAVP(899, avp.Mbit, 10415, datatype.Enumerated(1)), // Address-Type + diam.NewAVP(897, avp.Mbit, 10415, datatype.UTF8String("1001")), // Address-Data + }}), + diam.NewAVP(1201, avp.Mbit, 10415, &diam.GroupedAVP{ // Recipient-Address + AVP: []*diam.AVP{ + diam.NewAVP(899, avp.Mbit, 10415, datatype.Enumerated(1)), // Address-Type + diam.NewAVP(897, avp.Mbit, 10415, datatype.UTF8String("1003")), // Address-Data + }}), + }, + }), + }}) + + if err := client.SendMessage(ccr); err != nil { + tb.Errorf("failed to send diameter message: %v", err) + return false + } + + reply := client.ReceivedMessage(replyTimeout) + if reply == nil { + tb.Error("received empty reply") + return false + } + + avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID) + if err != nil { + tb.Error(err) + return false + } + if len(avps) == 0 { + tb.Error("missing AVPs in reply") + return false + } + + resultCode, err := diamAVPAsString(avps[0]) + if err != nil { + tb.Error(err) + return false + } + if resultCode != wantResultCode { + tb.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode) + return false + } + return true +} diff --git a/data/conf/samples/diambench/cgrates.json b/data/conf/samples/diambench/cgrates.json index 5a31f0e0d..b273b5f58 100644 --- a/data/conf/samples/diambench/cgrates.json +++ b/data/conf/samples/diambench/cgrates.json @@ -27,7 +27,9 @@ "cdrs_conns": ["*internal"] }, "chargers": { - "enabled": true + "enabled": true, + "attributes_conns": ["*internal"], + "string_indexed_fields": ["*req.Account"] }, "rals": { "enabled": true @@ -36,7 +38,8 @@ "enabled": true, "attributes_conns": ["*internal"], "chargers_conns": ["*internal"], - "rals_conns": ["*internal"] + "rals_conns": ["*internal"], + "store_cdrs": false }, "diameter_agent": { "enabled": true,