mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Improve dispatcher tests (incomplete)
This commit is contained in:
committed by
Dan Christian Bogos
parent
1ab7f80d50
commit
796b508353
@@ -1,53 +0,0 @@
|
||||
{
|
||||
|
||||
"general": {
|
||||
"node_id": "HOST1",
|
||||
"log_level": 7
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080"
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "redis",
|
||||
"db_port": 6379,
|
||||
"db_name": "10"
|
||||
},
|
||||
|
||||
"stor_db": {
|
||||
"db_password": "CGRateS.org"
|
||||
},
|
||||
|
||||
"dispatchers":{
|
||||
"enabled": true,
|
||||
"prevent_loop": true
|
||||
},
|
||||
|
||||
"caches":{
|
||||
"partitions": {
|
||||
"*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false},
|
||||
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false},
|
||||
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}
|
||||
},
|
||||
//"replication_conns": ["gob_cache"] ,
|
||||
"remote_conns": ["gob_cache"]
|
||||
},
|
||||
|
||||
"rpc_conns": {
|
||||
"gob_cache": {
|
||||
"strategy": "*first",
|
||||
"conns": [
|
||||
{"address": "127.0.0.1:4013", "transport":"*gob"}
|
||||
]
|
||||
}
|
||||
},
|
||||
|
||||
"apiers": {
|
||||
"enabled": true
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
{
|
||||
|
||||
"general": {
|
||||
"node_id": "HOST2",
|
||||
"log_level": 7
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":4012",
|
||||
"rpc_gob": ":4013",
|
||||
"http": ":4080"
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "redis",
|
||||
"db_port": 6379,
|
||||
"db_name": "10"
|
||||
},
|
||||
|
||||
"stor_db": {
|
||||
"db_password": "CGRateS.org"
|
||||
},
|
||||
|
||||
"dispatchers":{
|
||||
"enabled": true,
|
||||
"prevent_loop": true
|
||||
},
|
||||
|
||||
"caches":{
|
||||
"partitions": {
|
||||
"*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false},
|
||||
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false},
|
||||
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}
|
||||
},
|
||||
//"replication_conns": ["gob_cache"],
|
||||
"remote_conns": ["gob_cache"]
|
||||
},
|
||||
|
||||
"apiers": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
"rpc_conns": {
|
||||
"gob_cache": {
|
||||
"strategy": "*first",
|
||||
"conns": [
|
||||
{"address": "127.0.0.1:4013", "transport":"*gob"}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
{
|
||||
|
||||
"general": {
|
||||
"node_id": "DispatcherOpts_Setter",
|
||||
"log_level": 7
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":6012",
|
||||
"rpc_gob": ":6013",
|
||||
"http": ":6080"
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "redis",
|
||||
"db_port": 6379,
|
||||
"db_name": "10"
|
||||
},
|
||||
|
||||
"stor_db": {
|
||||
"db_password": "CGRateS.org"
|
||||
},
|
||||
|
||||
"dispatchers":{
|
||||
"enabled": false
|
||||
},
|
||||
|
||||
"apiers": {
|
||||
"enabled": true,
|
||||
"caches_conns":["broadcast_cache"]
|
||||
},
|
||||
|
||||
"rpc_conns": {
|
||||
"broadcast_cache": {
|
||||
"strategy": "*broadcast",
|
||||
"conns": [
|
||||
{"address": "127.0.0.1:2012", "transport":"*json"},
|
||||
{"address": "127.0.0.1:4012", "transport":"*json"},
|
||||
{"address": "127.0.0.1:6012", "transport":"*json"}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,830 +0,0 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package general_tests
|
||||
|
||||
import (
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
setterCfgPath string
|
||||
cfg2CfgPath string
|
||||
cfg1CfgPath string
|
||||
setterCfg *config.CGRConfig
|
||||
cfg2OptsCfg *config.CGRConfig
|
||||
cfg1Cfg *config.CGRConfig
|
||||
setterRPC *birpc.Client
|
||||
cgr2RPC *birpc.Client
|
||||
cgr1RPC *birpc.Client
|
||||
cfg1ConfigDIR string
|
||||
cfg2ConfigDIR string
|
||||
setterConfigDIR string
|
||||
dpsOptsTest = []func(t *testing.T){
|
||||
// FIRST PART OF THE TEST
|
||||
// Start engine with Dispatcher on engine 2012
|
||||
testDispatcherCgr1InitCfg,
|
||||
testDispatcherCgr1InitDataDb,
|
||||
testDispatcherCgr1StartEngine,
|
||||
testDispatcherCgr1RPCConn,
|
||||
|
||||
// Sending Status requests in both engines, with *dispatchers:false
|
||||
testDispatcherCgr2InitCfg,
|
||||
testDispatcherCgr2StartEngine,
|
||||
testDispatcherCgr2RPCConn,
|
||||
|
||||
testDispatcherCgr1CoreStatus, // *disaptchers:false
|
||||
testDispatcherCgr2CoreStatus, // *disaptchers:false
|
||||
|
||||
testDispatcherGetItemBothEnginesFirstAttempt, // NOT FOUND
|
||||
|
||||
testDispatcherCgr1StopEngine,
|
||||
testDispatcherCgr2StopEngine,
|
||||
|
||||
// SECOND PART OF THE TEST
|
||||
// START HOST2 engine
|
||||
|
||||
testDispatcherSetterInitCfg,
|
||||
testDispatcherSetterStartEngine,
|
||||
testDispatcherSetterRPCConn,
|
||||
|
||||
testDispatcherCgr2StartEngine,
|
||||
testDispatcherCgr2RPCConn,
|
||||
|
||||
testDispatcherSetterSetDispatcherProfile, // contains both hosts, HOST1 prio, host2 backup
|
||||
|
||||
testDispatcherCgr2CoreStatusWithRouteID, // HOST2 matched because HOST1 is not started yet
|
||||
testDispatcherCgr2GetItemHOST2,
|
||||
|
||||
// START HOST1 engine
|
||||
testDispatcherCgr1StartEngine,
|
||||
testDispatcherCgr1RPCConn,
|
||||
testDispatcherCgr1CoreStatusWithRouteIDSecondAttempt, // same HOST2 will be matched, due to routeID
|
||||
|
||||
// clear cache in order to remove routeID
|
||||
testDisaptcherCacheClear,
|
||||
testDispatcherCgr1CoreStatusWithRouteIDButHost1, // due to clearing cache, HOST1 will be matched
|
||||
|
||||
// verify cache of dispatchers, SetDispatcherProfile API should reload the dispatchers cache (instance, profile and route)
|
||||
testDispatcherCgr1CheckCacheAfterRouting,
|
||||
testDispatcherSetterSetDispatcherProfileOverwrite,
|
||||
testDispatcherCheckCacheAfterSetDispatcherDSP1,
|
||||
testDispatcherSetterSetAnotherProifle, //DSP2
|
||||
testDispatcherCheckCacheAfterSetDispatcherDSP1, //we set DSP2, so for DSP1 nothing changed
|
||||
testDispatcherCheckCacheAfterSetDispatcherDSP2, //NOT_FOUND for every get, cause it was not used that profile before
|
||||
|
||||
testDispatcherCgr1StopEngine,
|
||||
testDispatcherCgr2StopEngine,
|
||||
}
|
||||
)
|
||||
|
||||
func TestDispatcherOpts(t *testing.T) {
|
||||
for _, test := range dpsOptsTest {
|
||||
t.Run("dispatcher-opts", test)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr1InitCfg(t *testing.T) {
|
||||
cfg1ConfigDIR = "dispatcher_opts_host1"
|
||||
var err error
|
||||
cfg1CfgPath = path.Join(*utils.DataDir, "conf", "samples", cfg1ConfigDIR)
|
||||
cfg1Cfg, err = config.NewCGRConfigFromPath(cfg1CfgPath)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr1InitDataDb(t *testing.T) {
|
||||
if err := engine.InitDataDb(cfg1Cfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start CGR Engine woth Dispatcher enabled
|
||||
func testDispatcherCgr1StartEngine(t *testing.T) {
|
||||
if _, err := engine.StartEngine(cfg1CfgPath, *utils.WaitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr1RPCConn(t *testing.T) {
|
||||
var err error
|
||||
cgr1RPC, err = newRPCClient(cfg1Cfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr2InitCfg(t *testing.T) {
|
||||
cfg2ConfigDIR = "dispatcher_opts_host2" //changed with the cfg with dispatcher on
|
||||
var err error
|
||||
cfg2CfgPath = path.Join(*utils.DataDir, "conf", "samples", cfg2ConfigDIR)
|
||||
cfg2OptsCfg, err = config.NewCGRConfigFromPath(cfg2CfgPath)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start CGR Engine woth Dispatcher enabled
|
||||
func testDispatcherCgr2StartEngine(t *testing.T) {
|
||||
if _, err := engine.StartEngine(cfg2CfgPath, *utils.WaitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr2RPCConn(t *testing.T) {
|
||||
var err error
|
||||
cgr2RPC, err = newRPCClient(cfg2OptsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr1CoreStatus(t *testing.T) {
|
||||
// HOST1 host matched :2012
|
||||
var reply map[string]any
|
||||
ev := utils.TenantWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.OptsRouteID: "account#dan.bogos",
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply[utils.NodeID] != "HOST1" {
|
||||
t.Errorf("Expected HOST1, received %v", reply[utils.NodeID])
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr2CoreStatus(t *testing.T) {
|
||||
// HOST2 host matched because it was called from engine with port :4012 -> host2
|
||||
var reply map[string]any
|
||||
ev := utils.TenantWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.OptsRouteID: "account#dan.bogos",
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply[utils.NodeID] != "HOST2" {
|
||||
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) {
|
||||
// get for *dispatcher_routes
|
||||
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherRoutes,
|
||||
ItemID: "account#dan.bogos:*core",
|
||||
},
|
||||
}
|
||||
var reply any
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// get for *dispatcher_profiles
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherProfiles,
|
||||
ItemID: "cgrates.org:DSP1",
|
||||
},
|
||||
}
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// get for *dispatchers
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatchers,
|
||||
ItemID: "cgrates.org:DSP1",
|
||||
},
|
||||
}
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherSetterInitCfg(t *testing.T) {
|
||||
setterConfigDIR = "dispatcher_opts_setter"
|
||||
var err error
|
||||
setterCfgPath = path.Join(*utils.DataDir, "conf", "samples", setterConfigDIR)
|
||||
setterCfg, err = config.NewCGRConfigFromPath(setterCfgPath)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherSetterStartEngine(t *testing.T) {
|
||||
if _, err := engine.StartEngine(setterCfgPath, *utils.WaitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherSetterRPCConn(t *testing.T) {
|
||||
var err error
|
||||
setterRPC, err = newRPCClient(setterCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherSetterSetDispatcherProfile(t *testing.T) {
|
||||
// Set DispatcherHost
|
||||
var replyStr string
|
||||
setDispatcherHost := &engine.DispatcherHostWithAPIOpts{
|
||||
DispatcherHost: &engine.DispatcherHost{
|
||||
Tenant: "cgrates.org",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "HOST1",
|
||||
Address: "127.0.0.1:2012", // CGR1
|
||||
Transport: "*json",
|
||||
ConnectAttempts: 1,
|
||||
Reconnects: 3,
|
||||
ConnectTimeout: time.Minute,
|
||||
ReplyTimeout: 2 * time.Minute,
|
||||
},
|
||||
},
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}
|
||||
if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
|
||||
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
|
||||
} else if replyStr != utils.OK {
|
||||
t.Error("Unexpected reply returned", replyStr)
|
||||
}
|
||||
|
||||
setDispatcherHost = &engine.DispatcherHostWithAPIOpts{
|
||||
DispatcherHost: &engine.DispatcherHost{
|
||||
Tenant: "cgrates.org",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "HOST2",
|
||||
Address: "127.0.0.1:4012", // CGR2
|
||||
Transport: "*json",
|
||||
ConnectAttempts: 1,
|
||||
Reconnects: 3,
|
||||
ConnectTimeout: time.Minute,
|
||||
ReplyTimeout: 2 * time.Minute,
|
||||
},
|
||||
},
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}
|
||||
if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
|
||||
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
|
||||
} else if replyStr != utils.OK {
|
||||
t.Error("Unexpected reply returned", replyStr)
|
||||
}
|
||||
|
||||
// Set DispatcherProfile
|
||||
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
|
||||
DispatcherProfile: &engine.DispatcherProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "DSP1",
|
||||
Strategy: "*weight",
|
||||
Weight: 10,
|
||||
Subsystems: []string{utils.MetaAny},
|
||||
Hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "HOST1",
|
||||
Weight: 10,
|
||||
},
|
||||
{
|
||||
ID: "HOST2",
|
||||
Weight: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}
|
||||
if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
|
||||
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
|
||||
} else if replyStr != utils.OK {
|
||||
t.Error("Unexpected reply returned", replyStr)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr2CoreStatusWithRouteID(t *testing.T) {
|
||||
var reply map[string]any
|
||||
ev := utils.TenantWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.OptsRouteID: "account#dan.bogos",
|
||||
},
|
||||
}
|
||||
// even if HOST1 is prio, this engine was not staretd yet, so HOST2 matched
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply[utils.NodeID] != "HOST2" {
|
||||
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr1CoreStatusWithRouteIDSecondAttempt(t *testing.T) {
|
||||
var reply map[string]any
|
||||
ev := utils.TenantWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.OptsRouteID: "account#dan.bogos",
|
||||
},
|
||||
}
|
||||
// same HOST2 will be matched, due to routeID
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply[utils.NodeID] != "HOST2" {
|
||||
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr2GetItemHOST2(t *testing.T) {
|
||||
// get for *dispatcher_routes
|
||||
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherRoutes,
|
||||
ItemID: "account#dan.bogos:*core",
|
||||
},
|
||||
}
|
||||
var reply any
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
expected := map[string]any{
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.ProfileID: "DSP1",
|
||||
"HostID": "HOST2",
|
||||
}
|
||||
if !reflect.DeepEqual(expected, reply) {
|
||||
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
|
||||
}
|
||||
}
|
||||
|
||||
// get for *dispatcher_profiles
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherProfiles,
|
||||
ItemID: "cgrates.org:DSP1",
|
||||
},
|
||||
}
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
expected := map[string]any{
|
||||
utils.FilterIDs: nil,
|
||||
"Hosts": []any{
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "HOST1",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 10.,
|
||||
},
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "HOST2",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 5.,
|
||||
},
|
||||
},
|
||||
utils.ActivationIntervalString: nil,
|
||||
utils.ID: "DSP1",
|
||||
utils.Strategy: "*weight",
|
||||
utils.Subsystems: []any{"*any"},
|
||||
"StrategyParams": nil,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Weight: 10.,
|
||||
}
|
||||
if !reflect.DeepEqual(expected, reply) {
|
||||
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
|
||||
}
|
||||
}
|
||||
|
||||
// get for *dispatchers
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatchers,
|
||||
ItemID: "cgrates.org:DSP1",
|
||||
},
|
||||
}
|
||||
// reply here is an interface type(singleResultDispatcher), it exists
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDisaptcherCacheClear(t *testing.T) {
|
||||
var reply string
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Unexpected reply returned")
|
||||
}
|
||||
|
||||
if err := cgr2RPC.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Unexpected reply returned")
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr1CoreStatusWithRouteIDButHost1(t *testing.T) {
|
||||
var reply map[string]any
|
||||
ev := utils.TenantWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.OptsRouteID: "account#dan.bogos",
|
||||
},
|
||||
}
|
||||
// as the cache was cleared, HOST1 will match due to his high prio, and it will be set as *dispatcher_routes as HOST1
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply[utils.NodeID] != "HOST1" {
|
||||
t.Errorf("Expected HOST1, received %v", reply[utils.NodeID])
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr1CheckCacheAfterRouting(t *testing.T) {
|
||||
// get for *dispatcher_routes
|
||||
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherRoutes,
|
||||
ItemID: "account#dan.bogos:*core",
|
||||
},
|
||||
}
|
||||
var reply any
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
expected := map[string]any{
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.ProfileID: "DSP1",
|
||||
"HostID": "HOST1",
|
||||
}
|
||||
if !reflect.DeepEqual(expected, reply) {
|
||||
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
|
||||
}
|
||||
}
|
||||
|
||||
// get for *dispatcher_profiles
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherProfiles,
|
||||
ItemID: "cgrates.org:DSP1",
|
||||
},
|
||||
}
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
expected := map[string]any{
|
||||
utils.ActivationIntervalString: nil,
|
||||
utils.FilterIDs: nil,
|
||||
"Hosts": []any{
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "HOST1",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 10.,
|
||||
},
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "HOST2",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 5.,
|
||||
},
|
||||
},
|
||||
utils.ID: "DSP1",
|
||||
utils.Strategy: "*weight",
|
||||
utils.Subsystems: []any{"*any"},
|
||||
"StrategyParams": nil,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Weight: 10.,
|
||||
}
|
||||
if !reflect.DeepEqual(expected, reply) {
|
||||
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
|
||||
}
|
||||
}
|
||||
|
||||
// get for *dispatchers
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatchers,
|
||||
ItemID: "cgrates.org:DSP1",
|
||||
},
|
||||
}
|
||||
// reply here is an interface type(singleResultDispatcher), it exists
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherSetterSetDispatcherProfileOverwrite(t *testing.T) {
|
||||
// as the cache was cleared, and previously the HOST1 matched, setting the profile with only HOST2 will remove the
|
||||
// DispatcherRoutes, DispatcherProfile and the DispatcherInstance
|
||||
var replyStr string
|
||||
// Set DispatcherProfile
|
||||
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
|
||||
DispatcherProfile: &engine.DispatcherProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "DSP1",
|
||||
Strategy: "*weight",
|
||||
Weight: 10,
|
||||
Subsystems: []string{utils.MetaAny},
|
||||
Hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "HOST2",
|
||||
Weight: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}
|
||||
if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
|
||||
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
|
||||
} else if replyStr != utils.OK {
|
||||
t.Error("Unexpected reply returned", replyStr)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) {
|
||||
// get for *dispatcher_routes
|
||||
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
"adi3": "nu",
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherRoutes,
|
||||
ItemID: "account#dan.bogos:*core",
|
||||
},
|
||||
}
|
||||
var reply any // Should receive NOT_FOUND, as CallCache that was called in API will remove the DispatcherRoute
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Errorf("Unexpected error returned: %v", err)
|
||||
}
|
||||
|
||||
// get for *dispatcher_profiles
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
"adi2": "nu",
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherProfiles,
|
||||
ItemID: "cgrates.org:DSP1",
|
||||
},
|
||||
}
|
||||
// as the DSP1 profile was overwritten, only HOST2 in profile will be contained
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
expected := map[string]any{
|
||||
utils.FilterIDs: nil,
|
||||
"Hosts": []any{
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "HOST2",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 5.,
|
||||
},
|
||||
},
|
||||
"ActivationInterval": nil,
|
||||
"Subsystems": []any{"*any"},
|
||||
utils.ID: "DSP1",
|
||||
utils.Strategy: "*weight",
|
||||
"StrategyParams": nil,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Weight: 10.,
|
||||
}
|
||||
if !reflect.DeepEqual(expected, reply) {
|
||||
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
|
||||
}
|
||||
}
|
||||
|
||||
// get for *dispatchers
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
"adi1": "nu",
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatchers,
|
||||
ItemID: "cgrates.org:DSP1",
|
||||
},
|
||||
}
|
||||
// DispatcherInstance should also be removed, so it will be NOT_FOUND
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Errorf("Unexpected error returned: %v and reply: %v", err, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherSetterSetAnotherProifle(t *testing.T) {
|
||||
var replyStr string
|
||||
// Set DispatcherProfile DSP2 with the existing hosts
|
||||
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
|
||||
DispatcherProfile: &engine.DispatcherProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "DSP2",
|
||||
Strategy: "*weight",
|
||||
Weight: 20,
|
||||
Subsystems: []string{utils.MetaAny},
|
||||
Hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "HOST1",
|
||||
Weight: 50,
|
||||
},
|
||||
{
|
||||
ID: "HOST2",
|
||||
Weight: 125,
|
||||
},
|
||||
},
|
||||
},
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}
|
||||
if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
|
||||
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
|
||||
} else if replyStr != utils.OK {
|
||||
t.Error("Unexpected reply returned", replyStr)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) {
|
||||
// get for *dispatcher_routes
|
||||
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherRoutes,
|
||||
ItemID: "account#dan.bogos:*core",
|
||||
},
|
||||
}
|
||||
var reply any
|
||||
// NOT_FOUND
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Errorf("Unexpected error returned: %v", err)
|
||||
}
|
||||
|
||||
// get for *dispatcher_profiles
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatcherProfiles,
|
||||
ItemID: "cgrates.org:DSP2",
|
||||
},
|
||||
}
|
||||
// NOT_FOUND
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Errorf("Unexpected error returned: %v", err)
|
||||
}
|
||||
|
||||
// get for *dispatchers
|
||||
argsCache = &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheDispatchers,
|
||||
ItemID: "cgrates.org:DSP2",
|
||||
},
|
||||
}
|
||||
// NOT_FOUND
|
||||
if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache,
|
||||
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Errorf("Unexpected error returned: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr1StopEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(*utils.WaitRater); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDispatcherCgr2StopEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(*utils.WaitRater); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
465
general_tests/dsp_routes_it_test.go
Normal file
465
general_tests/dsp_routes_it_test.go
Normal file
@@ -0,0 +1,465 @@
|
||||
//go:build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package general_tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
host1Cfg = `{
|
||||
"general": {
|
||||
"node_id": "host1",
|
||||
"log_level": 7
|
||||
},
|
||||
"listen": {
|
||||
"rpc_json": ":4012",
|
||||
"rpc_gob": ":4013",
|
||||
"http": ":4080"
|
||||
},
|
||||
"dispatchers":{
|
||||
"enabled": true,
|
||||
"prevent_loop": true
|
||||
},
|
||||
"caches":{
|
||||
"partitions": {
|
||||
"*dispatcher_profiles": {
|
||||
"limit": -1,
|
||||
"remote":true
|
||||
},
|
||||
"*dispatcher_routes": {
|
||||
"limit": -1,
|
||||
"remote":true
|
||||
},
|
||||
"*dispatchers": {
|
||||
"limit": -1,
|
||||
"remote":true
|
||||
}
|
||||
},
|
||||
"remote_conns": ["gob_cache"]
|
||||
},
|
||||
"rpc_conns": {
|
||||
"gob_cache": {
|
||||
"strategy": "*first",
|
||||
"conns": [
|
||||
{
|
||||
"address": "127.0.0.1:6013",
|
||||
"transport": "*gob"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"apiers": {
|
||||
"enabled": true
|
||||
}
|
||||
}`
|
||||
host2Cfg = `{
|
||||
"general": {
|
||||
"node_id": "host2",
|
||||
"log_level": 7
|
||||
},
|
||||
"listen": {
|
||||
"rpc_json": ":6012",
|
||||
"rpc_gob": ":6013",
|
||||
"http": ":6080"
|
||||
},
|
||||
"dispatchers":{
|
||||
"enabled": true,
|
||||
"prevent_loop": true
|
||||
},
|
||||
"caches":{
|
||||
"partitions": {
|
||||
"*dispatcher_profiles": {
|
||||
"limit": -1,
|
||||
"remote":true
|
||||
},
|
||||
"*dispatcher_routes": {
|
||||
"limit": -1,
|
||||
"remote":true
|
||||
},
|
||||
"*dispatchers": {
|
||||
"limit": -1,
|
||||
"remote":true
|
||||
}
|
||||
},
|
||||
"remote_conns": ["gob_cache"]
|
||||
},
|
||||
"apiers": {
|
||||
"enabled": true
|
||||
},
|
||||
"rpc_conns": {
|
||||
"gob_cache": {
|
||||
"strategy": "*first",
|
||||
"conns": [
|
||||
{
|
||||
"address": "127.0.0.1:6013",
|
||||
"transport":"*gob"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}`
|
||||
hostSetterCfg = `{
|
||||
"general": {
|
||||
"node_id": "setter",
|
||||
"log_level": 7
|
||||
},
|
||||
"apiers": {
|
||||
"enabled": true,
|
||||
"caches_conns": ["broadcast_cache"]
|
||||
},
|
||||
"rpc_conns": {
|
||||
"broadcast_cache": {
|
||||
"strategy": "*broadcast",
|
||||
"conns": [
|
||||
{
|
||||
"address": "127.0.0.1:2012",
|
||||
"transport": "*json"
|
||||
},
|
||||
{
|
||||
"address": "127.0.0.1:4012",
|
||||
"transport": "*json"
|
||||
},
|
||||
{
|
||||
"address": "127.0.0.1:6012",
|
||||
"transport": "*json"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}`
|
||||
)
|
||||
|
||||
func TestDispatcherRoutesNotFound(t *testing.T) {
|
||||
switch *utils.DBType {
|
||||
case utils.MetaInternal:
|
||||
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
|
||||
t.SkipNow()
|
||||
default:
|
||||
t.Fatal("unsupported dbtype value")
|
||||
}
|
||||
|
||||
host1 := TestEngine{ // first engine, port 4012
|
||||
ConfigJSON: host1Cfg,
|
||||
}
|
||||
ng1Client, _ := host1.Run(t)
|
||||
host2 := TestEngine{ // second engine, port 6012
|
||||
ConfigJSON: host2Cfg,
|
||||
}
|
||||
ng2Client, _ := host2.Run(t)
|
||||
|
||||
// Send Status requests with *dispatchers on false.
|
||||
checkStatus(t, ng1Client, false, "account#dan.bogos", "host1")
|
||||
checkStatus(t, ng2Client, false, "account#dan.bogos", "host2")
|
||||
|
||||
// Check that dispatcher routes were not cached due to *dispatchers being false.
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil)
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil)
|
||||
}
|
||||
|
||||
func TestDispatcherRoutes(t *testing.T) {
|
||||
switch *utils.DBType {
|
||||
case utils.MetaInternal:
|
||||
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
|
||||
t.SkipNow()
|
||||
default:
|
||||
t.Fatal("unsupported dbtype value")
|
||||
}
|
||||
|
||||
setter := TestEngine{ // engine used to set dispatcher hosts/profiles (:2012)
|
||||
ConfigJSON: hostSetterCfg,
|
||||
}
|
||||
setterClient, _ := setter.Run(t)
|
||||
|
||||
// Starting only the second dispatcher engine, for now.
|
||||
host2 := TestEngine{
|
||||
ConfigJSON: host2Cfg,
|
||||
PreserveDataDB: true,
|
||||
PreserveStorDB: true,
|
||||
}
|
||||
ng2Client, _ := host2.Run(t)
|
||||
|
||||
setDispatcherHost(t, setterClient, "host1", 4012)
|
||||
setDispatcherHost(t, setterClient, "host2", 6012)
|
||||
setDispatcherProfile(t, setterClient, "dsp_test", "host1;10", "host2;5")
|
||||
|
||||
// Send status request to the second engine. "host2" will match, even though "host1" has the bigger weight.
|
||||
// That's because the first engine has not been started yet.
|
||||
checkStatus(t, ng2Client, true, "account#dan.bogos", "host2")
|
||||
|
||||
// Check that the dispatcher route has been cached (same for the profile and the dispatcher itself).
|
||||
getCacheItem(t, ng2Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", map[string]any{
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.ProfileID: "dsp_test",
|
||||
"HostID": "host2",
|
||||
})
|
||||
getCacheItem(t, ng2Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test", map[string]any{
|
||||
utils.FilterIDs: nil,
|
||||
"Hosts": []any{
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "host1",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 10.,
|
||||
},
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "host2",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 5.,
|
||||
},
|
||||
},
|
||||
utils.ActivationIntervalString: nil,
|
||||
utils.ID: "dsp_test",
|
||||
utils.Strategy: "*weight",
|
||||
utils.Subsystems: []any{"*any"},
|
||||
"StrategyParams": nil,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Weight: 0.,
|
||||
})
|
||||
|
||||
// Reply represents a singleResultDispatcher. Unexported, so it's enough to check if it exists.
|
||||
getCacheItem(t, ng2Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test", map[string]any{})
|
||||
|
||||
// Start the first engine.
|
||||
host1 := TestEngine{
|
||||
ConfigJSON: host1Cfg,
|
||||
PreserveDataDB: true,
|
||||
PreserveStorDB: true,
|
||||
}
|
||||
ng1Client, _ := host1.Run(t)
|
||||
|
||||
// "host2" will match again due to being cached previously.
|
||||
checkStatus(t, ng1Client, true, "account#dan.bogos", "host2")
|
||||
|
||||
// Clear cache and try again.
|
||||
clearCache(t, ng1Client)
|
||||
clearCache(t, ng2Client)
|
||||
|
||||
// This time it will match "host1" which has the bigger weight.
|
||||
checkStatus(t, ng1Client, true, "account#dan.bogos", "host1")
|
||||
|
||||
// Check the relevant cache items. Should be the same as before, the difference being the HostID
|
||||
// from *dispatcher_routes ("host1" instead of "host2").
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core",
|
||||
map[string]any{
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.ProfileID: "dsp_test",
|
||||
"HostID": "host1",
|
||||
})
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test",
|
||||
map[string]any{
|
||||
utils.ActivationIntervalString: nil,
|
||||
utils.FilterIDs: nil,
|
||||
"Hosts": []any{
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "host1",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 10.,
|
||||
},
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "host2",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 5.,
|
||||
},
|
||||
},
|
||||
utils.ID: "dsp_test",
|
||||
utils.Strategy: "*weight",
|
||||
utils.Subsystems: []any{"*any"},
|
||||
"StrategyParams": nil,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Weight: 0.,
|
||||
})
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test", map[string]any{})
|
||||
|
||||
// Overwrite the DispatcherProfile (removed host1).
|
||||
setDispatcherProfile(t, setterClient, "dsp_test", "host2;5")
|
||||
time.Sleep(5 * time.Millisecond) // wait for cache updates to reach all external engines
|
||||
|
||||
// Check that related cache items have been updated automatically.
|
||||
|
||||
// Check that cache dispatcher route/ dispatcher instance was cleared,
|
||||
// as previously "host1" matched (which is now removed).
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil)
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test",
|
||||
map[string]any{
|
||||
utils.ActivationIntervalString: nil,
|
||||
utils.FilterIDs: nil,
|
||||
"Hosts": []any{
|
||||
map[string]any{
|
||||
utils.Blocker: false,
|
||||
utils.FilterIDs: nil,
|
||||
utils.ID: "host2",
|
||||
utils.Params: nil,
|
||||
utils.Weight: 5.,
|
||||
},
|
||||
},
|
||||
utils.ID: "dsp_test",
|
||||
utils.Strategy: "*weight",
|
||||
utils.Subsystems: []any{"*any"},
|
||||
"StrategyParams": nil,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Weight: 0.,
|
||||
})
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test", nil)
|
||||
|
||||
// Nothing happens when setting a different dispatcher profile that's using the same hosts as before.
|
||||
setDispatcherProfile(t, setterClient, "dsp_test2", "host1;50", "host2;150")
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil)
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test2", nil)
|
||||
getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test2", nil)
|
||||
}
|
||||
|
||||
func checkStatus(t *testing.T, client *birpc.Client, dispatch bool, routeID, expNodeID string) {
|
||||
t.Helper()
|
||||
args := &utils.TenantWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
APIOpts: map[string]any{
|
||||
utils.OptsRouteID: routeID,
|
||||
utils.MetaDispatchers: dispatch,
|
||||
},
|
||||
}
|
||||
var reply map[string]any
|
||||
if err := client.Call(context.Background(), utils.CoreSv1Status, args, &reply); err != nil {
|
||||
t.Errorf("CoreSv1.Status unexpected err: %v", err)
|
||||
} else if nodeID := reply[utils.NodeID]; nodeID != expNodeID {
|
||||
t.Errorf("CoreSv1.Status NodeID=%q, want %q", nodeID, expNodeID)
|
||||
}
|
||||
}
|
||||
|
||||
func getCacheItem(t *testing.T, client *birpc.Client, dispatch bool, cacheID, itemID string, expItem any) {
|
||||
t.Helper()
|
||||
args := &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: cacheID,
|
||||
ItemID: itemID,
|
||||
},
|
||||
}
|
||||
if !dispatch {
|
||||
args.APIOpts = map[string]any{
|
||||
utils.MetaDispatchers: dispatch,
|
||||
}
|
||||
}
|
||||
var reply any
|
||||
err := client.Call(context.Background(), utils.CacheSv1GetItem, args, &reply)
|
||||
if expItem != nil {
|
||||
if err != nil {
|
||||
t.Fatalf("CacheSv1.GetItem unexpected err: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(reply, expItem) {
|
||||
t.Errorf("CacheSv1.GetItem = %s, want %s", utils.ToJSON(reply), utils.ToJSON(expItem))
|
||||
}
|
||||
return
|
||||
}
|
||||
if err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Errorf("CacheSv1.GetItem err=%v, want %v", err, utils.ErrNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
func clearCache(t *testing.T, client *birpc.Client) {
|
||||
t.Helper()
|
||||
var reply string
|
||||
if err := client.Call(context.Background(), utils.CacheSv1Clear,
|
||||
&utils.AttrCacheIDsWithAPIOpts{
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Fatalf("CacheSv1.Clear unexpected err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func setDispatcherHost(t *testing.T, client *birpc.Client, id string, port int) {
|
||||
t.Helper()
|
||||
var reply string
|
||||
if err := client.Call(context.Background(), utils.APIerSv1SetDispatcherHost,
|
||||
&engine.DispatcherHostWithAPIOpts{
|
||||
DispatcherHost: &engine.DispatcherHost{
|
||||
Tenant: "cgrates.org",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: id,
|
||||
Address: fmt.Sprintf("127.0.0.1:%d", port),
|
||||
Transport: "*json",
|
||||
ConnectAttempts: 1,
|
||||
Reconnects: 3,
|
||||
ConnectTimeout: time.Second,
|
||||
ReplyTimeout: 2 * time.Second,
|
||||
},
|
||||
},
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Errorf("APIerSv1.SetDispatcherHost unexpected err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func setDispatcherProfile(t *testing.T, client *birpc.Client, id string, hosts ...string) {
|
||||
t.Helper()
|
||||
hostPrfs := make(engine.DispatcherHostProfiles, 0, len(hosts))
|
||||
for _, host := range hosts {
|
||||
host, weightStr, found := strings.Cut(host, ";")
|
||||
if !found {
|
||||
t.Fatal("hosts don't respect the 'host;weight' format")
|
||||
}
|
||||
weight, err := strconv.ParseFloat(weightStr, 64)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
hostPrfs = append(hostPrfs, &engine.DispatcherHostProfile{
|
||||
ID: host,
|
||||
Weight: weight,
|
||||
})
|
||||
}
|
||||
|
||||
var reply string
|
||||
if err := client.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, &engine.DispatcherProfileWithAPIOpts{
|
||||
DispatcherProfile: &engine.DispatcherProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: id,
|
||||
Strategy: "*weight",
|
||||
Subsystems: []string{utils.MetaAny},
|
||||
Hosts: hostPrfs,
|
||||
},
|
||||
APIOpts: map[string]any{
|
||||
utils.MetaDispatchers: false,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Errorf("APIerSv1.SetDispatcherProfile unexpected err: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user