5 Commits

Author SHA1 Message Date
ionutboangiu
fabdf9702e fix ExtraFields not being cloned in CallDescriptor.Clone 2026-02-09 11:18:53 +01:00
ionutboangiu
65a7f1c145 add guardian lock for dispatcher route race condition 2026-01-27 17:31:56 +01:00
ionutboangiu
ada413c856 add dispatcher concurrent sessions test 2026-01-27 16:53:13 +01:00
ionutboangiu
aca201f13b test HTTPAgent sessions with dispatcher round-robin 2026-01-15 17:00:02 +01:00
ionutboangiu
4203abaf02 add dispatcher sessions round-robin test 2026-01-15 12:02:40 +01:00
6 changed files with 1790 additions and 7 deletions

View File

@@ -24,7 +24,9 @@ import (
"strconv"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
@@ -230,6 +232,9 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
if routeID != nil && *routeID != "" {
// overwrite routeID with RouteID:Subsystem
*routeID = utils.ConcatenatedKey(*routeID, subsystem)
guardID := utils.ConcatenatedKey(utils.DispatcherS, utils.RouteID, *routeID)
refID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, guardID)
defer guardian.Guardian.UnguardIDs(refID)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
@@ -316,6 +321,9 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri
if routeID != nil && *routeID != "" {
// overwrite routeID with RouteID:Subsystem
*routeID = utils.ConcatenatedKey(*routeID, subsystem)
guardID := utils.ConcatenatedKey(utils.DispatcherS, utils.RouteID, *routeID)
refID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, guardID)
defer guardian.Guardian.UnguardIDs(refID)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {

View File

@@ -21,6 +21,7 @@ package engine
import (
"errors"
"fmt"
"maps"
"net"
"sync"
"time"
@@ -1035,8 +1036,8 @@ func (cd *CallDescriptor) Clone() *CallDescriptor {
PerformRounding: cd.PerformRounding,
CgrID: cd.CgrID,
RunID: cd.RunID,
ExtraFields: maps.Clone(cd.ExtraFields),
}
}
// AccountSummary returns the AccountSummary for cached account

View File

@@ -0,0 +1,971 @@
//go:build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package general_tests
import (
"flag"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var async = flag.Bool("async", false, "Run dispatcher sessions concurrently")
// TestDispatcherMultiChain tests concurrent sessions going through dispatchers.
// Tests that requests for the same account always get routed to the same RALs engine.
// Uses 2 HTTPAgents, 2 Dispatchers, 4 Sessions, 4 RALs, 1 shared engine.
func TestDispatcherMultiChain(t *testing.T) {
// Shared engine: Chargers, CDRs, Apiers (port 6012/6013/6080)
cfgShared := `{
"general": {
"node_id": "shared"
},
"listen": {
"rpc_json": "127.0.0.1:6012",
"rpc_gob": "127.0.0.1:6013",
"http": "127.0.0.1:6080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"chargers": {
"enabled": true
},
"cdrs": {
"enabled": true,
"chargers_conns": ["*internal"],
"rals_conns": ["*internal"]
},
"rals": {
"enabled": true
},
"apiers": {
"enabled": true
}
}`
// RALS1: port 5012/5013/5080
cfgRALS1 := `{
"general": {
"node_id": "rals1"
},
"listen": {
"rpc_json": "127.0.0.1:5012",
"rpc_gob": "127.0.0.1:5013",
"http": "127.0.0.1:5080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
}
}`
// RALS2: port 5112/5113/5180
cfgRALS2 := `{
"general": {
"node_id": "ral2"
},
"listen": {
"rpc_json": "127.0.0.1:5112",
"rpc_gob": "127.0.0.1:5113",
"http": "127.0.0.1:5180"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
}
}`
// RALS3: port 5212/5213/5280
cfgRALS3 := `{
"general": {
"node_id": "rals3"
},
"listen": {
"rpc_json": "127.0.0.1:5212",
"rpc_gob": "127.0.0.1:5213",
"http": "127.0.0.1:5280"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
}
}`
// RALS4: port 5312/5313/5380
cfgRALS4 := `{
"general": {
"node_id": "rals4"
},
"listen": {
"rpc_json": "127.0.0.1:5312",
"rpc_gob": "127.0.0.1:5313",
"http": "127.0.0.1:5380"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
}
}`
// SM1: port 4012/4013/4080, routes RALs through DSP1
cfgSM1 := `{
"general": {
"node_id": "sm1"
},
"listen": {
"rpc_json": "127.0.0.1:4012",
"rpc_gob": "127.0.0.1:4013",
"http": "127.0.0.1:4080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp1": {
"conns": [{"address": "127.0.0.1:3012", "transport": "*json"}]
},
"conn_shared": {
"conns": [{"address": "127.0.0.1:6012", "transport": "*json"}]
}
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2014",
"chargers_conns": ["conn_shared"],
"rals_conns": ["conn_dsp1"],
"cdrs_conns": ["conn_shared"]
}
}`
// SM2: port 4112/4113/4180, routes RALs through DSP1
cfgSM2 := `{
"general": {
"node_id": "sm2"
},
"listen": {
"rpc_json": "127.0.0.1:4112",
"rpc_gob": "127.0.0.1:4113",
"http": "127.0.0.1:4180"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp1": {
"conns": [{"address": "127.0.0.1:3012", "transport": "*json"}]
},
"conn_shared": {
"conns": [{"address": "127.0.0.1:6012", "transport": "*json"}]
}
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2114",
"chargers_conns": ["conn_shared"],
"rals_conns": ["conn_dsp1"],
"cdrs_conns": ["conn_shared"]
}
}`
// SM3: port 4212/4213/4280, routes RALs through DSP2
cfgSM3 := `{
"general": {
"node_id": "sm3"
},
"listen": {
"rpc_json": "127.0.0.1:4212",
"rpc_gob": "127.0.0.1:4213",
"http": "127.0.0.1:4280"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp2": {
"conns": [{"address": "127.0.0.1:3112", "transport": "*json"}]
},
"conn_shared": {
"conns": [{"address": "127.0.0.1:6012", "transport": "*json"}]
}
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2214",
"chargers_conns": ["conn_shared"],
"rals_conns": ["conn_dsp2"],
"cdrs_conns": ["conn_shared"]
}
}`
// SM4: port 4312/4313/4380, routes RALs through DSP2
cfgSM4 := `{
"general": {
"node_id": "sm4"
},
"listen": {
"rpc_json": "127.0.0.1:4312",
"rpc_gob": "127.0.0.1:4313",
"http": "127.0.0.1:4380"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp2": {
"conns": [{"address": "127.0.0.1:3112", "transport": "*json"}]
},
"conn_shared": {
"conns": [{"address": "127.0.0.1:6012", "transport": "*json"}]
}
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2314",
"chargers_conns": ["conn_shared"],
"rals_conns": ["conn_dsp2"],
"cdrs_conns": ["conn_shared"]
}
}`
// DSP1: port 3012/3013/3080, routes sessions to SM1/SM2, RALs to RALS1/RALS2
cfgDSP1 := `{
"general": {
"node_id": "dsp1"
},
"listen": {
"rpc_json": "127.0.0.1:3012",
"rpc_gob": "127.0.0.1:3013",
"http": "127.0.0.1:3080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"dispatchers": {
"enabled": true,
"string_indexed_fields": ["Agent"]
}
}`
// DSP2: port 3112/3113/3180, routes sessions to SM3/SM4, RALs to RALS3/RALS4
cfgDSP2 := `{
"general": {
"node_id": "dsp2"
},
"listen": {
"rpc_json": "127.0.0.1:3112",
"rpc_gob": "127.0.0.1:3113",
"http": "127.0.0.1:3180"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"dispatchers": {
"enabled": true,
"string_indexed_fields": ["Agent"]
}
}`
// HA1: HTTPAgent on port 2012/2013/2080, sends to DSP1
cfgHA1 := `{
"general": {
"node_id": "ha1"
},
"listen": {
"rpc_json": "127.0.0.1:2012",
"rpc_gob": "127.0.0.1:2013",
"http": "127.0.0.1:2080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp1": {
"conns": [{"address": "127.0.0.1:3012", "transport": "*json"}]
}
},
"http_agent": [
{
"id": "HTTPAgent_HA1",
"url": "/sessions",
"sessions_conns": ["conn_dsp1"],
"request_payload": "*url",
"reply_payload": "*xml",
"request_processors": [
{
"id": "auth",
"filters": ["*string:~*req.request_type:auth"],
"flags": ["*auth", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha1"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "init",
"filters": ["*string:~*req.request_type:init"],
"flags": ["*initiate", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha1"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "update",
"filters": ["*string:~*req.request_type:update"],
"flags": ["*update", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha1"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "terminate",
"filters": ["*string:~*req.request_type:terminate"],
"flags": ["*terminate", "*accounts", "*cdrs"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha1"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "Status", "path": "*rep.Status", "type": "*constant", "value": "OK"}
]
}
]
}
]
}`
// HA2: HTTPAgent on port 2112/2113/2180, sends to DSP2
cfgHA2 := `{
"general": {
"node_id": "ha2"
},
"listen": {
"rpc_json": "127.0.0.1:2112",
"rpc_gob": "127.0.0.1:2113",
"http": "127.0.0.1:2180"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp2": {
"conns": [{"address": "127.0.0.1:3112", "transport": "*json"}]
}
},
"http_agent": [
{
"id": "HTTPAgent_HA2",
"url": "/sessions",
"sessions_conns": ["conn_dsp2"],
"request_payload": "*url",
"reply_payload": "*xml",
"request_processors": [
{
"id": "auth",
"filters": ["*string:~*req.request_type:auth"],
"flags": ["*auth", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha2"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "init",
"filters": ["*string:~*req.request_type:init"],
"flags": ["*initiate", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha2"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "update",
"filters": ["*string:~*req.request_type:update"],
"flags": ["*update", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha2"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "terminate",
"filters": ["*string:~*req.request_type:terminate"],
"flags": ["*terminate", "*accounts", "*cdrs"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Agent", "path": "*cgreq.Agent", "type": "*constant", "value": "ha2"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "Status", "path": "*rep.Status", "type": "*constant", "value": "OK"}
]
}
]
}
]
}`
// Tariff plan files with dispatcher profiles for both DSP1 and DSP2
tpFiles := map[string]string{
utils.DestinationRatesCsv: `#Id,DestinationId,RatesTag,RoundingMethod,RoundingDecimals,MaxCost,MaxCostStrategy
DR_ANY,*any,RT_ANY,*up,0,0,`,
utils.RatesCsv: `#Id,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart
RT_ANY,0,1,1s,1s,0s`,
utils.RatingPlansCsv: `#Id,DestinationRatesId,TimingTag,Weight
RP_ANY,DR_ANY,*any,10`,
utils.RatingProfilesCsv: `#Tenant,Category,Subject,ActivationTime,RatingPlanId,RatesFallbackSubject
cgrates.org,call,1001,,RP_ANY,
cgrates.org,call,1002,,RP_ANY,
cgrates.org,call,1003,,RP_ANY,
cgrates.org,call,1004,,RP_ANY,
cgrates.org,call,1005,,RP_ANY,
cgrates.org,call,2001,,RP_ANY,
cgrates.org,call,2002,,RP_ANY,
cgrates.org,call,2003,,RP_ANY,
cgrates.org,call,2004,,RP_ANY,
cgrates.org,call,2005,,RP_ANY,`,
utils.ChargersCsv: `#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight
cgrates.org,DEFAULT,,,*default,*none,0`,
utils.FiltersCsv: `#Tenant,ID,Type,Path,Values,ActivationInterval
cgrates.org,FLTR_HA1,*string,~*req.Agent,ha1,
cgrates.org,FLTR_HA2,*string,~*req.Agent,ha2,`,
utils.DispatcherHostsCsv: `#Tenant,ID,Address,Transport,TLS
cgrates.org,SM1,127.0.0.1:4012,*json,false
cgrates.org,SM2,127.0.0.1:4112,*json,false
cgrates.org,SM3,127.0.0.1:4212,*json,false
cgrates.org,SM4,127.0.0.1:4312,*json,false
cgrates.org,RALS1,127.0.0.1:5012,*json,false
cgrates.org,RALS2,127.0.0.1:5112,*json,false
cgrates.org,RALS3,127.0.0.1:5212,*json,false
cgrates.org,RALS4,127.0.0.1:5312,*json,false`,
utils.DispatcherProfilesCsv: `#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,DSP1_SM,*sessions,FLTR_HA1,,*round_robin,,SM1,,10,false,,10
cgrates.org,DSP1_SM,,,,,,SM2,,10,,,
cgrates.org,DSP1_RALS,*responder,FLTR_HA1,,*round_robin,,RALS1,,10,false,,10
cgrates.org,DSP1_RALS,,,,,,RALS2,,10,,,
cgrates.org,DSP2_SM,*sessions,FLTR_HA2,,*round_robin,,SM3,,10,false,,10
cgrates.org,DSP2_SM,,,,,,SM4,,10,,,
cgrates.org,DSP2_RALS,*responder,FLTR_HA2,,*round_robin,,RALS3,,10,false,,10
cgrates.org,DSP2_RALS,,,,,,RALS4,,10,,,`,
}
// Start engines in correct order:
// 1. Shared engine first (load tariff plan through it)
// 2. RALS1-4
// 3. DSP1-2 (must be up before SM engines since SM connects to DSP for rals_conns)
// 4. SM1-4
// 5. HA1-2
ngShared := TestEnvironment{
ConfigJSON: cfgShared,
TpFiles: tpFiles,
}
clientShared, _ := ngShared.Setup(t, 0)
ngRALS1 := TestEnvironment{
ConfigJSON: cfgRALS1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALS1.Setup(t, 0)
ngRALS2 := TestEnvironment{
ConfigJSON: cfgRALS2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALS2.Setup(t, 0)
ngRALS3 := TestEnvironment{
ConfigJSON: cfgRALS3,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALS3.Setup(t, 0)
ngRALS4 := TestEnvironment{
ConfigJSON: cfgRALS4,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALS4.Setup(t, 0)
ngDSP1 := TestEnvironment{
ConfigJSON: cfgDSP1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngDSP1.Setup(t, 0)
ngDSP2 := TestEnvironment{
ConfigJSON: cfgDSP2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngDSP2.Setup(t, 0)
ngSM1 := TestEnvironment{
ConfigJSON: cfgSM1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngSM1.Setup(t, 0)
ngSM2 := TestEnvironment{
ConfigJSON: cfgSM2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngSM2.Setup(t, 0)
ngSM3 := TestEnvironment{
ConfigJSON: cfgSM3,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngSM3.Setup(t, 0)
ngSM4 := TestEnvironment{
ConfigJSON: cfgSM4,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngSM4.Setup(t, 0)
ngHA1 := TestEnvironment{
ConfigJSON: cfgHA1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngHA1.Setup(t, 0)
ngHA2 := TestEnvironment{
ConfigJSON: cfgHA2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngHA2.Setup(t, 0)
httpC := &http.Client{}
var sessionNo atomic.Int64
sendRequest := func(t *testing.T, port int, reqType, sessionID, account, destination, usage string) string {
t.Helper()
resp, err := httpC.PostForm(fmt.Sprintf("http://127.0.0.1:%d/sessions", port),
url.Values{
"request_type": {reqType},
"session_id": {sessionID},
"account": {account},
"destination": {destination},
"usage": {usage},
})
if err != nil {
t.Fatalf("HTTP request failed: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
t.Fatalf("HTTP request returned %d: %s", resp.StatusCode, string(body))
}
return string(body)
}
runHTTPSession := func(t *testing.T, port int, account, destination string, initUsage time.Duration, updateUsages ...time.Duration) {
t.Helper()
sessionID := fmt.Sprintf("session_%s_%d", account, sessionNo.Add(1))
sendRequest(t, port, "auth", sessionID, account, destination, initUsage.String())
sendRequest(t, port, "init", sessionID, account, destination, initUsage.String())
for _, u := range updateUsages {
sendRequest(t, port, "update", sessionID, account, destination, u.String())
}
sendRequest(t, port, "terminate", sessionID, account, destination, "")
}
setBalance := func(t *testing.T, acc string, value float64) {
t.Helper()
var reply string
if err := clientShared.Call(utils.APIerSv2SetBalance,
utils.AttrSetBalance{
Tenant: "cgrates.org",
Account: acc,
Value: value,
BalanceType: utils.MONETARY,
Balance: map[string]any{utils.ID: "test"},
}, &reply); err != nil {
t.Fatal(err)
}
}
checkBalance := func(t *testing.T, acc string, want float64) {
t.Helper()
var acnt engine.Account
if err := clientShared.Call(utils.APIerSv2GetAccount,
&utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: acc,
}, &acnt); err != nil {
t.Fatalf("GetAccount failed: %v", err)
}
if bal := acnt.BalanceMap[utils.MONETARY][0]; bal == nil {
t.Errorf("balance not found for account %q", acc)
} else if bal.Value != want {
t.Errorf("account %q balance = %v, want %v", acc, bal.Value, want)
}
}
// Set balances for all 10 accounts
setBalance(t, "1001", 100)
setBalance(t, "1002", 100)
setBalance(t, "1003", 100)
setBalance(t, "1004", 100)
setBalance(t, "1005", 100)
setBalance(t, "2001", 100)
setBalance(t, "2002", 100)
setBalance(t, "2003", 100)
setBalance(t, "2004", 100)
setBalance(t, "2005", 100)
type sessionParams struct {
port int
account string
destination string
initUsage time.Duration
updates []time.Duration
}
// 2 sessions per account, all run concurrently when -async flag is set.
// 18s + 10s = 28s total per account, expected balance: 100 - 28 = 72
sessions := []sessionParams{
{2080, "1001", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1001", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2080, "1002", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1002", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2080, "1003", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1003", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2080, "1004", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1004", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2080, "1005", "1099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2080, "1005", "1099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2001", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2001", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2002", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2002", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2003", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2003", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2004", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2004", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
{2180, "2005", "2099", 10 * time.Second, []time.Duration{5 * time.Second, 3 * time.Second}},
{2180, "2005", "2099", 5 * time.Second, []time.Duration{3 * time.Second, 2 * time.Second}},
}
if *async {
var wg sync.WaitGroup
for _, s := range sessions {
wg.Add(1)
go func(sp sessionParams) {
defer wg.Done()
runHTTPSession(t, sp.port, sp.account, sp.destination, sp.initUsage, sp.updates...)
}(s)
}
wg.Wait()
} else {
for _, s := range sessions {
runHTTPSession(t, s.port, s.account, s.destination, s.initUsage, s.updates...)
}
}
checkBalance(t, "1001", 72)
checkBalance(t, "1002", 72)
checkBalance(t, "1003", 72)
checkBalance(t, "1004", 72)
checkBalance(t, "1005", 72)
checkBalance(t, "2001", 72)
checkBalance(t, "2002", 72)
checkBalance(t, "2003", 72)
checkBalance(t, "2004", 72)
checkBalance(t, "2005", 72)
}

View File

@@ -0,0 +1,384 @@
//go:build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package general_tests
import (
"fmt"
"io"
"net/http"
"net/url"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// TestDispatcherSessionsHTTP tests HTTPAgent + Sessions + Dispatcher in the same engine,
// with round-robin routing to multiple RALs engines via HTTP requests.
func TestDispatcherSessionsHTTP(t *testing.T) {
cfgMain := `{
"listen": {
"rpc_json": "127.0.0.1:2012",
"rpc_gob": "127.0.0.1:2013",
"http": "127.0.0.1:2080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"http_agent": [
{
"id": "HTTPAgent",
"url": "/sessions",
"sessions_conns": ["*internal"],
"request_payload": "*url",
"reply_payload": "*xml",
"request_processors": [
{
"id": "auth",
"filters": ["*string:~*req.request_type:auth"],
"flags": ["*authorize", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "init",
"filters": ["*string:~*req.request_type:init"],
"flags": ["*initiate", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "update",
"filters": ["*string:~*req.request_type:update"],
"flags": ["*update", "*accounts"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "MaxUsage", "path": "*rep.MaxUsage", "type": "*variable", "value": "~*cgrep.MaxUsage{*duration_seconds}"}
]
},
{
"id": "terminate",
"filters": ["*string:~*req.request_type:terminate"],
"flags": ["*terminate", "*accounts", "*cdrs"],
"request_fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.session_id"},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*prepaid"},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org"},
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call"},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account"},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.account"},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination"},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*constant", "value": "*now"},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*constant", "value": "*now"},
{"tag": "RouteID", "path": "*cgreq.*route_id", "type": "*variable", "value": "~*req.account"}
],
"reply_fields": [
{"tag": "Status", "path": "*rep.Status", "type": "*constant", "value": "OK"}
]
}
]
}
],
"sessions": {
"enabled": true,
"chargers_conns": ["*internal"],
"rals_conns": ["*localhost"],
"cdrs_conns": ["*localhost"]
},
"dispatchers": {
"enabled": true
},
"chargers": {
"enabled": true
}
}`
cfgRALs1 := `{
"listen": {
"rpc_json": "127.0.0.1:4012",
"rpc_gob": "127.0.0.1:4013",
"http": "127.0.0.1:4080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
},
"cdrs": {
"enabled": true,
"chargers_conns": ["*internal"],
"rals_conns": ["*internal"]
},
"chargers": {
"enabled": true
},
"apiers": {
"enabled": true
}
}`
cfgRALs2 := `{
"listen": {
"rpc_json": "127.0.0.1:5012",
"rpc_gob": "127.0.0.1:5013",
"http": "127.0.0.1:5080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
},
"cdrs": {
"enabled": true,
"chargers_conns": ["*internal"],
"rals_conns": ["*internal"]
},
"chargers": {
"enabled": true
},
"apiers": {
"enabled": true
}
}`
tpFiles := map[string]string{
utils.DestinationRatesCsv: `#Id,DestinationId,RatesTag,RoundingMethod,RoundingDecimals,MaxCost,MaxCostStrategy
DR_ANY,*any,RT_ANY,*up,0,0,`,
utils.RatesCsv: `#Id,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart
RT_ANY,0,1,1s,1s,0s`,
utils.RatingPlansCsv: `#Id,DestinationRatesId,TimingTag,Weight
RP_ANY,DR_ANY,*any,10`,
utils.RatingProfilesCsv: `#Tenant,Category,Subject,ActivationTime,RatingPlanId,RatesFallbackSubject
cgrates.org,call,1001,2014-01-14T00:00:00Z,RP_ANY,
cgrates.org,call,1002,2014-01-14T00:00:00Z,RP_ANY,
cgrates.org,call,1003,2014-01-14T00:00:00Z,RP_ANY,`,
utils.ChargersCsv: `#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight
cgrates.org,DEFAULT,,,*default,*none,0`,
utils.DispatcherHostsCsv: `#Tenant,ID,Address,Transport,TLS
cgrates.org,RALS1,127.0.0.1:4012,*json,false
cgrates.org,RALS2,127.0.0.1:5012,*json,false`,
utils.DispatcherProfilesCsv: `#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,DSP_RALS,,,,*round_robin,,RALS1,,10,false,,10
cgrates.org,DSP_RALS,,,,,,RALS2,,10,,,`,
}
// Start RALs1 first and load tariff plan data through it.
// We cannot load via Main engine because Dispatchers is enabled there and
// intercepts all API calls (including APIerSv1.LoadTariffPlanFromFolder),
// causing them to fail.
// All engines share the same Redis/MySQL, so the data will be available
// everywhere anyway.
ngRALs1 := TestEnvironment{
ConfigJSON: cfgRALs1,
TpFiles: tpFiles,
}
clientRALs1, _ := ngRALs1.Setup(t, 0)
ngRALs2 := TestEnvironment{
ConfigJSON: cfgRALs2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALs2.Setup(t, 0)
ngMain := TestEnvironment{
ConfigJSON: cfgMain,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngMain.Setup(t, 0)
httpC := &http.Client{}
var sessionNo int
sendRequest := func(t *testing.T, reqType, sessionID, account, destination, usage string) string {
t.Helper()
// GET also works since HTTPAgent's *url payload uses FormValue()
//
// reqUrl := fmt.Sprintf(
// "http://127.0.0.1:2080/sessions?request_type=%s&session_id=%s&account=%s&destination=%s&usage=%s",
// reqType, sessionID, account, destination, usage)
// resp, err := httpC.Get(reqUrl)
resp, err := httpC.PostForm("http://127.0.0.1:2080/sessions",
url.Values{
"request_type": {reqType},
"session_id": {sessionID},
"account": {account},
"destination": {destination},
"usage": {usage},
})
if err != nil {
t.Fatalf("HTTP request failed: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
t.Fatalf("HTTP request returned %d: %s", resp.StatusCode, string(body))
}
return string(body)
}
runHTTPSession := func(t *testing.T, account, destination string, initUsage time.Duration, updateUsages ...time.Duration) {
t.Helper()
sessionNo++
sessionID := fmt.Sprintf("session_%s_%d", account, sessionNo)
sendRequest(t, "auth", sessionID, account, destination, initUsage.String())
sendRequest(t, "init", sessionID, account, destination, initUsage.String())
for _, u := range updateUsages {
sendRequest(t, "update", sessionID, account, destination, u.String())
}
sendRequest(t, "terminate", sessionID, account, destination, "")
}
setBalance := func(t *testing.T, acc string, value float64) {
t.Helper()
// Use clientRALs1 for API calls since Main engine's Dispatcher would intercept them.
var reply string
if err := clientRALs1.Call(utils.APIerSv2SetBalance,
utils.AttrSetBalance{
Tenant: "cgrates.org",
Account: acc,
Value: value,
BalanceType: utils.MONETARY,
Balance: map[string]any{utils.ID: "test"},
}, &reply); err != nil {
t.Fatal(err)
}
}
checkBalance := func(t *testing.T, acc string, want float64) {
t.Helper()
var acnt engine.Account
if err := clientRALs1.Call(utils.APIerSv2GetAccount,
&utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: acc,
}, &acnt); err != nil {
t.Fatalf("GetAccount failed: %v", err)
}
if bal := acnt.BalanceMap[utils.MONETARY][0]; bal == nil {
t.Errorf("balance not found for account %q", acc)
} else if bal.Value != want {
t.Errorf("account %q balance = %v, want %v", acc, bal.Value, want)
}
}
setBalance(t, "1001", 100)
setBalance(t, "1002", 100)
setBalance(t, "1003", 100)
// First cycle: 10s + 5s + 3s = 18s per account
runHTTPSession(t, "1001", "1099", 10*time.Second, 5*time.Second, 3*time.Second)
runHTTPSession(t, "1002", "1099", 10*time.Second, 5*time.Second, 3*time.Second)
runHTTPSession(t, "1003", "1099", 10*time.Second, 5*time.Second, 3*time.Second)
// Second cycle: 5s + 3s + 2s = 10s per account
runHTTPSession(t, "1001", "1099", 5*time.Second, 3*time.Second, 2*time.Second)
runHTTPSession(t, "1002", "1099", 5*time.Second, 3*time.Second, 2*time.Second)
runHTTPSession(t, "1003", "1099", 5*time.Second, 3*time.Second, 2*time.Second)
checkBalance(t, "1001", 72) // 100 - 18 - 10
checkBalance(t, "1002", 72)
checkBalance(t, "1003", 72)
}

View File

@@ -0,0 +1,417 @@
//go:build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package general_tests
import (
"fmt"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
func TestDspSessionsPrepaidRoundRobin(t *testing.T) {
cfgMain := `{
"listen": {
"rpc_json": "127.0.0.1:2012",
"rpc_gob": "127.0.0.1:2013",
"http": "127.0.0.1:2080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rpc_conns": {
"conn_dsp": {
"conns": [{"address": "127.0.0.1:3012", "transport":"*json"}]
}
},
"sessions": {
"enabled": true,
"chargers_conns": ["*internal"],
"rals_conns": ["conn_dsp"],
"cdrs_conns": ["conn_dsp"]
},
"chargers": {
"enabled": true
},
"apiers": {
"enabled": true
}
}`
cfgDispatcher := `{
"listen": {
"rpc_json": "127.0.0.1:3012",
"rpc_gob": "127.0.0.1:3013",
"http": "127.0.0.1:3080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"dispatchers": {
"enabled": true
}
}`
cfgRALs1 := `{
"listen": {
"rpc_json": "127.0.0.1:4012",
"rpc_gob": "127.0.0.1:4013",
"http": "127.0.0.1:4080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
},
"cdrs": {
"enabled": true,
"chargers_conns": ["*internal"],
"rals_conns": ["*internal"]
},
"chargers": {
"enabled": true
},
"apiers": {
"enabled": true
}
}`
cfgRALs2 := `{
"listen": {
"rpc_json": "127.0.0.1:5012",
"rpc_gob": "127.0.0.1:5013",
"http": "127.0.0.1:5080"
},
"data_db": {
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_type": "*mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
},
"rals": {
"enabled": true
},
"cdrs": {
"enabled": true,
"chargers_conns": ["*internal"],
"rals_conns": ["*internal"]
},
"chargers": {
"enabled": true
},
"apiers": {
"enabled": true
}
}`
tpFiles := map[string]string{
utils.DestinationRatesCsv: `#Id,DestinationId,RatesTag,RoundingMethod,RoundingDecimals,MaxCost,MaxCostStrategy
DR_ANY,*any,RT_ANY,*up,0,0,`,
utils.RatesCsv: `#Id,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart
RT_ANY,0,1,1s,1s,0s`,
utils.RatingPlansCsv: `#Id,DestinationRatesId,TimingTag,Weight
RP_ANY,DR_ANY,*any,10`,
utils.RatingProfilesCsv: `#Tenant,Category,Subject,ActivationTime,RatingPlanId,RatesFallbackSubject
cgrates.org,call,1001,2014-01-14T00:00:00Z,RP_ANY,
cgrates.org,call,1002,2014-01-14T00:00:00Z,RP_ANY,
cgrates.org,call,1003,2014-01-14T00:00:00Z,RP_ANY,`,
utils.ChargersCsv: `#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight
cgrates.org,DEFAULT,,,*default,*none,0`,
utils.DispatcherHostsCsv: `#Tenant,ID,Address,Transport,TLS
cgrates.org,RALS1,127.0.0.1:4012,*json,false
cgrates.org,RALS2,127.0.0.1:5012,*json,false`,
// empty Subsystems field matches any subsystem
utils.DispatcherProfilesCsv: `#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,DSP_RALS,,,,*round_robin,,RALS1,,10,false,,10
cgrates.org,DSP_RALS,,,,,,RALS2,,10,,,`,
}
ngMain := TestEnvironment{
ConfigJSON: cfgMain,
TpFiles: tpFiles,
}
clientMain, _ := ngMain.Setup(t, 0)
ngRALs1 := TestEnvironment{
ConfigJSON: cfgRALs1,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALs1.Setup(t, 0)
ngRALs2 := TestEnvironment{
ConfigJSON: cfgRALs2,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngRALs2.Setup(t, 0)
ngDispatcher := TestEnvironment{
ConfigJSON: cfgDispatcher,
PreserveDataDB: true,
PreserveStorDB: true,
}
ngDispatcher.Setup(t, 0)
var (
sessionNo = 0
balanceID = "test"
)
runPrepaidSession := func(t *testing.T, account, destination string, initUsage time.Duration, updateUsages ...time.Duration) {
t.Helper()
sessionNo++
originID := fmt.Sprintf("session_%s_%d", account, sessionNo)
setupTime := time.Date(2024, time.January, 1, 12, 0, 0, 0, time.UTC)
answerTime := time.Date(2024, time.January, 1, 12, 0, 10, 0, time.UTC)
// AuthorizeEvent extracts RouteID from the event via *route_id field.
authArgs := &sessions.V1AuthorizeArgs{
GetMaxUsage: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: utils.GenUUID(),
Event: map[string]any{
utils.OriginID: originID,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: account,
utils.Subject: account,
utils.Destination: destination,
utils.SetupTime: setupTime,
utils.Usage: initUsage,
utils.MetaRouteID: account,
},
},
}
var authRpl sessions.V1AuthorizeReply
if err := clientMain.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authRpl); err != nil {
t.Fatalf("AuthorizeEvent failed: %v", err)
}
// InitiateSession stores ArgDispatcher on the session for subsequent calls.
initArgs := &sessions.V1InitSessionArgs{
InitSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: utils.GenUUID(),
Event: map[string]any{
utils.OriginID: originID,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: account,
utils.Subject: account,
utils.Destination: destination,
utils.SetupTime: setupTime,
utils.AnswerTime: answerTime,
utils.Usage: initUsage,
},
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer(account),
},
}
var initRpl sessions.V1InitSessionReply
if err := clientMain.Call(utils.SessionSv1InitiateSession, initArgs, &initRpl); err != nil {
t.Fatalf("InitiateSession failed: %v", err)
}
for i, updateUsage := range updateUsages {
updateArgs := &sessions.V1UpdateSessionArgs{
UpdateSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: utils.GenUUID(),
Event: map[string]any{
utils.OriginID: originID,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: account,
utils.Subject: account,
utils.Destination: destination,
utils.SetupTime: setupTime,
utils.AnswerTime: answerTime,
utils.Usage: updateUsage,
},
},
}
var updateRpl sessions.V1UpdateSessionReply
if err := clientMain.Call(utils.SessionSv1UpdateSession, updateArgs, &updateRpl); err != nil {
t.Fatalf("UpdateSession %d failed: %v", i+1, err)
}
}
termArgs := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: utils.GenUUID(),
Event: map[string]any{
utils.OriginID: originID,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: account,
utils.Subject: account,
utils.Destination: destination,
utils.SetupTime: setupTime,
utils.AnswerTime: answerTime,
},
},
}
var termRpl string
if err := clientMain.Call(utils.SessionSv1TerminateSession, termArgs, &termRpl); err != nil {
t.Fatalf("TerminateSession failed: %v", err)
}
// ProcessCDR requires explicit ArgDispatcher since it doesn't depend on the session.
cdrArgs := &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: utils.GenUUID(),
Event: map[string]any{
utils.OriginID: originID,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: account,
utils.Subject: account,
utils.Destination: destination,
utils.SetupTime: setupTime,
utils.AnswerTime: answerTime,
utils.Usage: 0,
},
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer(account),
},
}
var cdrRpl string
if err := clientMain.Call(utils.SessionSv1ProcessCDR, cdrArgs, &cdrRpl); err != nil {
t.Fatalf("ProcessCDR failed: %v", err)
}
}
setBalance := func(t *testing.T, acc string, value float64) {
t.Helper()
var reply string
if err := clientMain.Call(utils.APIerSv2SetBalance,
utils.AttrSetBalance{
Tenant: "cgrates.org",
Account: acc,
Value: value,
BalanceType: utils.MONETARY,
Balance: map[string]any{
utils.ID: balanceID,
},
}, &reply); err != nil {
t.Fatal(err)
}
}
checkBalance := func(t *testing.T, acc string, want float64) {
t.Helper()
var acnt engine.Account
if err := clientMain.Call(utils.APIerSv2GetAccount,
&utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: acc,
}, &acnt); err != nil {
t.Fatalf("GetAccount failed: %v", err)
}
if bal := acnt.BalanceMap[utils.MONETARY][0]; bal == nil {
t.Errorf("balance not found for account %q", acc)
} else if bal.Value != want {
t.Errorf("account %q balance = %v, want %v", acc, bal.Value, want)
}
}
setBalance(t, "1001", 100)
setBalance(t, "1002", 100)
setBalance(t, "1003", 100)
// First cycle establishes route bindings per account.
runPrepaidSession(t, "1001", "1099", 10*time.Second, 5*time.Second, 3*time.Second)
runPrepaidSession(t, "1002", "1099", 10*time.Second, 5*time.Second, 3*time.Second)
runPrepaidSession(t, "1003", "1099", 10*time.Second, 5*time.Second, 3*time.Second)
// Second cycle verifies same account hits the same engine.
runPrepaidSession(t, "1001", "1099", 5*time.Second, 3*time.Second, 2*time.Second)
runPrepaidSession(t, "1002", "1099", 5*time.Second, 3*time.Second, 2*time.Second)
runPrepaidSession(t, "1003", "1099", 5*time.Second, 3*time.Second, 2*time.Second)
checkBalance(t, "1001", 72) // 100 - 18 - 10
checkBalance(t, "1002", 72)
checkBalance(t, "1003", 72)
}

View File

@@ -50,11 +50,13 @@ func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
// TestEnvironment holds the setup parameters and configurations
// required for running integration tests.
type TestEnvironment struct {
ConfigPath string // file path to the main configuration file
ConfigJSON string // contains the configuration JSON content if ConfigPath is missing
TpPath string // specifies the path to the tariff plans
TpFiles map[string]string // maps CSV filenames to their content for tariff plan loading
LogBuffer io.Writer // captures the log output of the test environment
ConfigPath string // file path to the main configuration file
ConfigJSON string // contains the configuration JSON content if ConfigPath is missing
TpPath string // specifies the path to the tariff plans
TpFiles map[string]string // maps CSV filenames to their content for tariff plan loading
PreserveDataDB bool // prevents automatic data_db flush when set
PreserveStorDB bool // prevents automatic stor_db flush when set
LogBuffer io.Writer // captures the log output of the test environment
}
// Setup initializes the testing environment using the provided configuration. It loads the configuration
@@ -76,7 +78,7 @@ func (env TestEnvironment) Setup(t *testing.T, engineDelay int) (*rpc.Client, *c
cfg, env.ConfigPath = initCfg(t, env.ConfigJSON)
}
flushDBs(t, cfg, true, true)
flushDBs(t, cfg, !env.PreserveDataDB, !env.PreserveStorDB)
startEngine(t, cfg, env.ConfigPath, engineDelay, env.LogBuffer)
client, err := newRPCClient(cfg.ListenCfg())