3 Commits

Author SHA1 Message Date
ionutboangiu
fabdf9702e fix ExtraFields not being cloned in CallDescriptor.Clone 2026-02-09 11:18:53 +01:00
ionutboangiu
65a7f1c145 add guardian lock for dispatcher route race condition 2026-01-27 17:31:56 +01:00
ionutboangiu
ada413c856 add dispatcher concurrent sessions test 2026-01-27 16:53:13 +01:00
3 changed files with 981 additions and 1 deletions

View File

@@ -24,7 +24,9 @@ import (
"strconv"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
@@ -230,6 +232,9 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
if routeID != nil && *routeID != "" {
// overwrite routeID with RouteID:Subsystem
*routeID = utils.ConcatenatedKey(*routeID, subsystem)
guardID := utils.ConcatenatedKey(utils.DispatcherS, utils.RouteID, *routeID)
refID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, guardID)
defer guardian.Guardian.UnguardIDs(refID)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
@@ -316,6 +321,9 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri
if routeID != nil && *routeID != "" {
// overwrite routeID with RouteID:Subsystem
*routeID = utils.ConcatenatedKey(*routeID, subsystem)
guardID := utils.ConcatenatedKey(utils.DispatcherS, utils.RouteID, *routeID)
refID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, guardID)
defer guardian.Guardian.UnguardIDs(refID)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {

View File

@@ -21,6 +21,7 @@ package engine
import (
"errors"
"fmt"
"maps"
"net"
"sync"
"time"
@@ -1035,8 +1036,8 @@ func (cd *CallDescriptor) Clone() *CallDescriptor {
PerformRounding: cd.PerformRounding,
CgrID: cd.CgrID,
RunID: cd.RunID,
ExtraFields: maps.Clone(cd.ExtraFields),
}
}
// AccountSummary returns the AccountSummary for cached account

View File

@@ -0,0 +1,971 @@
//go:build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package general_tests
import (
"flag"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var async = flag.Bool("async", false, "Run dispatcher sessions concurrently")
// TestDispatcherMultiChain tests concurrent sessions going through dispatchers.
// Tests that requests for the same account always get routed to the same RALs engine.
// Uses 2 HTTPAgents, 2 Dispatchers, 4 Sessions, 4 RALs, 1 shared engine.
func TestDispatcherMultiChain(t *testing.T) {
// Shared engine: Chargers, CDRs, Apiers (port 6012/6013/6080)
cfgShared := `{
"general": {
"node_id": "shared"
},
"listen": {
"rpc_json": "127.0.0.1:6012",
"rpc_gob": "127.0.0.1:6013",
"http": "127.0.0.1:6080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"chargers": {
"enabled": true
},
"cdrs": {
"enabled": true,
"chargers_conns": ["*internal"],
"rals_conns": ["*internal"]
},
"rals": {
"enabled": true
},
"apiers": {
"enabled": true
}
}`
// RALS1: port 5012/5013/5080
cfgRALS1 := `{
"general": {
"node_id": "rals1"
},
"listen": {
"rpc_json": "127.0.0.1:5012",
"rpc_gob": "127.0.0.1:5013",
"http": "127.0.0.1:5080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
}
}`
// RALS2: port 5112/5113/5180
cfgRALS2 := `{
"general": {
"node_id": "ral2"
},
"listen": {
"rpc_json": "127.0.0.1:5112",
"rpc_gob": "127.0.0.1:5113",
"http": "127.0.0.1:5180"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
}
}`
// RALS3: port 5212/5213/5280
cfgRALS3 := `{
"general": {
"node_id": "rals3"
},
"listen": {
"rpc_json": "127.0.0.1:5212",
"rpc_gob": "127.0.0.1:5213",
"http": "127.0.0.1:5280"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
}
}`
// RALS4: port 5312/5313/5380
cfgRALS4 := `{
"general": {
"node_id": "rals4"
},
"listen": {
"rpc_json": "127.0.0.1:5312",
"rpc_gob": "127.0.0.1:5313",
"http": "127.0.0.1:5380"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
}
}`
// SM1: port 4012/4013/4080, routes RALs through DSP1
cfgSM1 := `{
"general": {
"node_id": "sm1"
},
"listen": {
"rpc_json": "127.0.0.1:4012",
"rpc_gob": "127.0.0.1:4013",
"http": "127.0.0.1:4080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp1": {
"conns": [{"address": "127.0.0.1:3012", "transport": "*json"}]
},
"conn_shared": {
"conns": [{"address": "127.0.0.1:6012", "transport": "*json"}]
}
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2014",
"chargers_conns": ["conn_shared"],
"rals_conns": ["conn_dsp1"],
"cdrs_conns": ["conn_shared"]
}
}`
// SM2: port 4112/4113/4180, routes RALs through DSP1
cfgSM2 := `{
"general": {
"node_id": "sm2"
},
"listen": {
"rpc_json": "127.0.0.1:4112",
"rpc_gob": "127.0.0.1:4113",
"http": "127.0.0.1:4180"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp1": {
"conns": [{"address": "127.0.0.1:3012", "transport": "*json"}]
},
"conn_shared": {
"conns": [{"address": "127.0.0.1:6012", "transport": "*json"}]
}
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2114",
"chargers_conns": ["conn_shared"],
"rals_conns": ["conn_dsp1"],
"cdrs_conns": ["conn_shared"]
}
}`
// SM3: port 4212/4213/4280, routes RALs through DSP2
cfgSM3 := `{
"general": {
"node_id": "sm3"
},
"listen": {
"rpc_json": "127.0.0.1:4212",
"rpc_gob": "127.0.0.1:4213",
"http": "127.0.0.1:4280"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp2": {
"conns": [{"address": "127.0.0.1:3112", "transport": "*json"}]
},
"conn_shared": {
"conns": [{"address": "127.0.0.1:6012", "transport": "*json"}]
}
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2214",
"chargers_conns": ["conn_shared"],
"rals_conns": ["conn_dsp2"],
"cdrs_conns": ["conn_shared"]
}
}`
// SM4: port 4312/4313/4380, routes RALs through DSP2
cfgSM4 := `{
"general": {
"node_id": "sm4"
},
"listen": {
"rpc_json": "127.0.0.1:4312",
"rpc_gob": "127.0.0.1:4313",
"http": "127.0.0.1:4380"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp2": {
"conns": [{"address": "127.0.0.1:3112", "transport": "*json"}]
},
"conn_shared": {
"conns": [{"address": "127.0.0.1:6012", "transport": "*json"}]
}
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2314",
"chargers_conns": ["conn_shared"],
"rals_conns": ["conn_dsp2"],
"cdrs_conns": ["conn_shared"]
}
}`
// DSP1: port 3012/3013/3080, routes sessions to SM1/SM2, RALs to RALS1/RALS2
cfgDSP1 := `{
"general": {
"node_id": "dsp1"
},
"listen": {
"rpc_json": "127.0.0.1:3012",
"rpc_gob": "127.0.0.1:3013",
"http": "127.0.0.1:3080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"dispatchers": {
"enabled": true,
"string_indexed_fields": ["Agent"]
}
}`
// DSP2: port 3112/3113/3180, routes sessions to SM3/SM4, RALs to RALS3/RALS4
cfgDSP2 := `{
"general": {
"node_id": "dsp2"
},
"listen": {
"rpc_json": "127.0.0.1:3112",
"rpc_gob": "127.0.0.1:3113",
"http": "127.0.0.1:3180"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"dispatchers": {
"enabled": true,
"string_indexed_fields": ["Agent"]
}
}`
// HA1: HTTPAgent on port 2012/2013/2080, sends to DSP1
cfgHA1 := `{
"general": {
"node_id": "ha1"
},
"listen": {
"rpc_json": "127.0.0.1:2012",
"rpc_gob": "127.0.0.1:2013",
"http": "127.0.0.1:2080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp1": {
"conns": [{"address": "127.0.0.1:3012", "transport": "*json"}]
}
},
"http_agent": [
{
"id": "HTTPAgent_HA1",
"url": "/sessions",
"sessions_conns": ["conn_dsp1"],
"request_payload": "*url",
"reply_payload": "*xml",
"request_processors": [
{
"id": "auth",
"filters": ["*string:~*req.request_type:auth"],
"flags": ["*auth", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha1"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "init",
"filters": ["*string:~*req.request_type:init"],
"flags": ["*initiate", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha1"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "update",
"filters": ["*string:~*req.request_type:update"],
"flags": ["*update", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha1"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "terminate",
"filters": ["*string:~*req.request_type:terminate"],
"flags": ["*terminate", "*accounts", "*cdrs"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha1"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "Status", "path": "*rep.Status", "type": "*constant", "value": "OK"}
]
}
]
}
]
}`
// HA2: HTTPAgent on port 2112/2113/2180, sends to DSP2
cfgHA2 := `{
"general": {
"node_id": "ha2"
},
"listen": {
"rpc_json": "127.0.0.1:2112",
"rpc_gob": "127.0.0.1:2113",
"http": "127.0.0.1:2180"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp2": {
"conns": [{"address": "127.0.0.1:3112", "transport": "*json"}]
}
},
"http_agent": [
{
"id": "HTTPAgent_HA2",
"url": "/sessions",
"sessions_conns": ["conn_dsp2"],
"request_payload": "*url",
"reply_payload": "*xml",
"request_processors": [
{
"id": "auth",
"filters": ["*string:~*req.request_type:auth"],
"flags": ["*auth", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha2"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "init",
"filters": ["*string:~*req.request_type:init"],
"flags": ["*initiate", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha2"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "update",
"filters": ["*string:~*req.request_type:update"],
"flags": ["*update", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha2"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "terminate",
"filters": ["*string:~*req.request_type:terminate"],
"flags": ["*terminate", "*accounts", "*cdrs"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha2"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "Status", "path": "*rep.Status", "type": "*constant", "value": "OK"}
]
}
]
}
]
}`
// Tariff plan files with dispatcher profiles for both DSP1 and DSP2
tpFiles := map[string]string{
utils.DestinationRatesCsv: `#Id,DestinationId,RatesTag,RoundingMethod,RoundingDecimals,MaxCost,MaxCostStrategy
DR_ANY,*any,RT_ANY,*up,0,0,`,
utils.RatesCsv: `#Id,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart
RT_ANY,0,1,1s,1s,0s`,
utils.RatingPlansCsv: `#Id,DestinationRatesId,TimingTag,Weight
RP_ANY,DR_ANY,*any,10`,
utils.RatingProfilesCsv: `#Tenant,Category,Subject,ActivationTime,RatingPlanId,RatesFallbackSubject
cgrates.org,call,1001,,RP_ANY,
cgrates.org,call,1002,,RP_ANY,
cgrates.org,call,1003,,RP_ANY,
cgrates.org,call,1004,,RP_ANY,
cgrates.org,call,1005,,RP_ANY,
cgrates.org,call,2001,,RP_ANY,
cgrates.org,call,2002,,RP_ANY,
cgrates.org,call,2003,,RP_ANY,
cgrates.org,call,2004,,RP_ANY,
cgrates.org,call,2005,,RP_ANY,`,
utils.ChargersCsv: `#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight
cgrates.org,DEFAULT,,,*default,*none,0`,
utils.FiltersCsv: `#Tenant,ID,Type,Path,Values,ActivationInterval
cgrates.org,FLTR_HA1,*string,~*req.Agent,ha1,
cgrates.org,FLTR_HA2,*string,~*req.Agent,ha2,`,
utils.DispatcherHostsCsv: `#Tenant,ID,Address,Transport,TLS
cgrates.org,SM1,127.0.0.1:4012,*json,false
cgrates.org,SM2,127.0.0.1:4112,*json,false
cgrates.org,SM3,127.0.0.1:4212,*json,false
cgrates.org,SM4,127.0.0.1:4312,*json,false
cgrates.org,RALS1,127.0.0.1:5012,*json,false
cgrates.org,RALS2,127.0.0.1:5112,*json,false
cgrates.org,RALS3,127.0.0.1:5212,*json,false
cgrates.org,RALS4,127.0.0.1:5312,*json,false`,
utils.DispatcherProfilesCsv: `#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,DSP1_SM,*sessions,FLTR_HA1,,*round_robin,,SM1,,10,false,,10
cgrates.org,DSP1_SM,,,,,,SM2,,10,,,
cgrates.org,DSP1_RALS,*responder,FLTR_HA1,,*round_robin,,RALS1,,10,false,,10
cgrates.org,DSP1_RALS,,,,,,RALS2,,10,,,
cgrates.org,DSP2_SM,*sessions,FLTR_HA2,,*round_robin,,SM3,,10,false,,10
cgrates.org,DSP2_SM,,,,,,SM4,,10,,,
cgrates.org,DSP2_RALS,*responder,FLTR_HA2,,*round_robin,,RALS3,,10,false,,10
cgrates.org,DSP2_RALS,,,,,,RALS4,,10,,,`,
}
// Start engines in correct order:
// 1. Shared engine first (load tariff plan through it)
// 2. RALS1-4
// 3. DSP1-2 (must be up before SM engines since SM connects to DSP for rals_conns)
// 4. SM1-4
// 5. HA1-2
ngShared := TestEnvironment{
ConfigJSON: cfgShared,
TpFiles: tpFiles,
}
clientShared, _ := ngShared.Setup(t, 0)
ngRALS1 := TestEnvironment{
ConfigJSON: cfgRALS1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALS1.Setup(t, 0)
ngRALS2 := TestEnvironment{
ConfigJSON: cfgRALS2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALS2.Setup(t, 0)
ngRALS3 := TestEnvironment{
ConfigJSON: cfgRALS3,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALS3.Setup(t, 0)
ngRALS4 := TestEnvironment{
ConfigJSON: cfgRALS4,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALS4.Setup(t, 0)
ngDSP1 := TestEnvironment{
ConfigJSON: cfgDSP1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngDSP1.Setup(t, 0)
ngDSP2 := TestEnvironment{
ConfigJSON: cfgDSP2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngDSP2.Setup(t, 0)
ngSM1 := TestEnvironment{
ConfigJSON: cfgSM1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngSM1.Setup(t, 0)
ngSM2 := TestEnvironment{
ConfigJSON: cfgSM2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngSM2.Setup(t, 0)
ngSM3 := TestEnvironment{
ConfigJSON: cfgSM3,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngSM3.Setup(t, 0)
ngSM4 := TestEnvironment{
ConfigJSON: cfgSM4,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngSM4.Setup(t, 0)
ngHA1 := TestEnvironment{
ConfigJSON: cfgHA1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngHA1.Setup(t, 0)
ngHA2 := TestEnvironment{
ConfigJSON: cfgHA2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngHA2.Setup(t, 0)
httpC := &http.Client{}
var sessionNo atomic.Int64
sendRequest := func(t *testing.T, port int, reqType, sessionID, account, destination, usage string) string {
t.Helper()
resp, err := httpC.PostForm(fmt.Sprintf("http://127.0.0.1:%d/sessions", port),
url.Values{
"request_type": {reqType},
"session_id": {sessionID},
"account": {account},
"destination": {destination},
"usage": {usage},
})
if err != nil {
t.Fatalf("HTTP request failed: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
t.Fatalf("HTTP request returned %d: %s", resp.StatusCode, string(body))
}
return string(body)
}
runHTTPSession := func(t *testing.T, port int, account, destination string, initUsage time.Duration, updateUsages ...time.Duration) {
t.Helper()
sessionID := fmt.Sprintf("session_%s_%d", account, sessionNo.Add(1))
sendRequest(t, port, "auth", sessionID, account, destination, initUsage.String())
sendRequest(t, port, "init", sessionID, account, destination, initUsage.String())
for _, u := range updateUsages {
sendRequest(t, port, "update", sessionID, account, destination, u.String())
}
sendRequest(t, port, "terminate", sessionID, account, destination, "")
}
setBalance := func(t *testing.T, acc string, value float64) {
t.Helper()
var reply string
if err := clientShared.Call(utils.APIerSv2SetBalance,
utils.AttrSetBalance{
Tenant: "cgrates.org",
Account: acc,
Value: value,
BalanceType: utils.MONETARY,
Balance: map[string]any{utils.ID: "test"},
}, &reply); err != nil {
t.Fatal(err)
}
}
checkBalance := func(t *testing.T, acc string, want float64) {
t.Helper()
var acnt engine.Account
if err := clientShared.Call(utils.APIerSv2GetAccount,
&utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: acc,
}, &acnt); err != nil {
t.Fatalf("GetAccount failed: %v", err)
}
if bal := acnt.BalanceMap[utils.MONETARY][0]; bal == nil {
t.Errorf("balance not found for account %q", acc)
} else if bal.Value != want {
t.Errorf("account %q balance = %v, want %v", acc, bal.Value, want)
}
}
// Set balances for all 10 accounts
setBalance(t, "1001", 100)
setBalance(t, "1002", 100)
setBalance(t, "1003", 100)
setBalance(t, "1004", 100)
setBalance(t, "1005", 100)
setBalance(t, "2001", 100)
setBalance(t, "2002", 100)
setBalance(t, "2003", 100)
setBalance(t, "2004", 100)
setBalance(t, "2005", 100)
type sessionParams struct {
port int
account string
destination string
initUsage time.Duration
updates []time.Duration
}
// 2 sessions per account, all run concurrently when -async flag is set.
// 18s + 10s = 28s total per account, expected balance: 100 - 28 = 72
sessions := []sessionParams{
{2080, "1001", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1001", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2080, "1002", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1002", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2080, "1003", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1003", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2080, "1004", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1004", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2080, "1005", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1005", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2001", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2001", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2002", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2002", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2003", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2003", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2004", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2004", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2005", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2005", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
}
if *async {
var wg sync.WaitGroup
for _, s := range sessions {
wg.Add(1)
go func(sp sessionParams) {
defer wg.Done()
runHTTPSession(t, sp.port, sp.account, sp.destination, sp.initUsage, sp.updates...)
}(s)
}
wg.Wait()
} else {
for _, s := range sessions {
runHTTPSession(t, s.port, s.account, s.destination, s.initUsage, s.updates...)
}
}
checkBalance(t, "1001", 72)
checkBalance(t, "1002", 72)
checkBalance(t, "1003", 72)
checkBalance(t, "1004", 72)
checkBalance(t, "1005", 72)
checkBalance(t, "2001", 72)
checkBalance(t, "2002", 72)
checkBalance(t, "2003", 72)
checkBalance(t, "2004", 72)
checkBalance(t, "2005", 72)
}