Files
cgrates/dispatchers/libdispatcher_test.go
2025-10-29 19:42:40 +01:00

1049 lines
34 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package dispatchers
import (
"net/rpc"
"reflect"
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/google/go-cmp/cmp"
)
func TestLoadMetricsGetHosts(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_2", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_3", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_4", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_5", Params: map[string]any{utils.MetaRatio: 1}},
}
lm, err := newLoadMetrics(dhp, 1)
if err != nil {
t.Fatal(err)
}
hostsIDs := engine.DispatcherHostIDs(dhp.HostIDs())
// to prevent randomness we increment all loads exept the first one
for _, hst := range hostsIDs[1:] {
lm.incrementLoad(hst, utils.EmptyString)
}
// check only the first host because the rest may be in a random order
// because they share the same cost
if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_1" {
t.Errorf("Expected: %q ,received: %q", "DSP_1", rply[0].ID)
}
lm.incrementLoad(hostsIDs[0], utils.EmptyString)
lm.decrementLoad(hostsIDs[1], utils.EmptyString)
if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_2" {
t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0].ID)
}
for _, hst := range hostsIDs {
lm.incrementLoad(hst, utils.EmptyString)
}
if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_2" {
t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0].ID)
}
}
func TestNewSingleDispatcher(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1"},
{ID: "DSP_2"},
{ID: "DSP_3"},
{ID: "DSP_4"},
{ID: "DSP_5"},
}
var exp Dispatcher = &singleResultDispatcher{hosts: dhp}
if rply, err := newSingleDispatcher(dhp, map[string]any{}, utils.EmptyString, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected: singleResultDispatcher structure,received: %s", utils.ToJSON(rply))
}
dhp = engine.DispatcherHostProfiles{
{ID: "DSP_1"},
{ID: "DSP_2"},
{ID: "DSP_3"},
{ID: "DSP_4"},
{ID: "DSP_5", Params: map[string]any{utils.MetaRatio: 1}},
}
exp = &loadDispatcher{
hosts: dhp,
tntID: "cgrates.org",
defaultRatio: 1,
}
if rply, err := newSingleDispatcher(dhp, map[string]any{}, "cgrates.org", nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply))
}
dhp = engine.DispatcherHostProfiles{
{ID: "DSP_1"},
{ID: "DSP_2"},
{ID: "DSP_3"},
{ID: "DSP_4"},
}
exp = &loadDispatcher{
hosts: dhp,
tntID: "cgrates.org",
defaultRatio: 2,
}
if rply, err := newSingleDispatcher(dhp, map[string]any{utils.MetaDefaultRatio: 2}, "cgrates.org", nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply))
}
exp = &loadDispatcher{
hosts: dhp,
tntID: "cgrates.org",
defaultRatio: 0,
}
if rply, err := newSingleDispatcher(dhp, map[string]any{utils.MetaDefaultRatio: 0}, "cgrates.org", nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply))
}
if _, err := newSingleDispatcher(dhp, map[string]any{utils.MetaDefaultRatio: "A"}, "cgrates.org", nil); err == nil {
t.Fatalf("Expected error received: %v", err)
}
}
func TestNewLoadMetrics(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_2", Params: map[string]any{utils.MetaRatio: 0}},
{ID: "DSP_3"},
}
exp := &LoadMetrics{
HostsLoad: map[string]int64{},
HostsRatio: map[string]int64{
"DSP_1": 1,
"DSP_2": 0,
"DSP_3": 2,
},
}
if lm, err := newLoadMetrics(dhp, 2); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, lm) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(lm))
}
dhp = engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]any{utils.MetaRatio: "A"}},
}
if _, err := newLoadMetrics(dhp, 2); err == nil {
t.Errorf("Expected error received: %v", err)
}
}
func TestLoadMetricsGetHosts2(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]any{utils.MetaRatio: 2}},
{ID: "DSP_2", Params: map[string]any{utils.MetaRatio: 3}},
{ID: "DSP_3", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_4", Params: map[string]any{utils.MetaRatio: 5}},
{ID: "DSP_5", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_6", Params: map[string]any{utils.MetaRatio: 0}},
}
lm, err := newLoadMetrics(dhp, 1)
if err != nil {
t.Fatal(err)
}
hostsIDs := engine.DispatcherHostIDs(dhp.HostIDs())
exp := []string(hostsIDs.Clone())[:5]
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected: %+v ,received: %+v", exp, rply)
}
for i := 0; i < 100; i++ {
for _, dh := range dhp {
for j := int64(0); j < lm.HostsRatio[dh.ID]; j++ {
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected for id<%s>: %+v ,received: %+v", dh.ID, exp, rply)
}
lm.incrementLoad(dh.ID, utils.EmptyString)
}
exp = append(exp[1:], exp[0])
}
exp = []string{"DSP_1", "DSP_2", "DSP_3", "DSP_4", "DSP_5"}
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected: %+v ,received: %+v", exp, rply)
}
lm.decrementLoad("DSP_4", utils.EmptyString)
lm.decrementLoad("DSP_4", utils.EmptyString)
lm.decrementLoad("DSP_2", utils.EmptyString)
exp = []string{"DSP_2", "DSP_4", "DSP_1", "DSP_3", "DSP_5"}
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected: %+v ,received: %+v", exp, rply)
}
lm.incrementLoad("DSP_2", utils.EmptyString)
exp = []string{"DSP_4", "DSP_1", "DSP_2", "DSP_3", "DSP_5"}
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected: %+v ,received: %+v", exp, rply)
}
lm.incrementLoad("DSP_4", utils.EmptyString)
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected: %+v ,received: %+v", exp, rply)
}
lm.incrementLoad("DSP_4", utils.EmptyString)
exp = []string{"DSP_1", "DSP_2", "DSP_3", "DSP_4", "DSP_5"}
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected: %+v ,received: %+v", exp, rply)
}
}
dhp = engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]any{utils.MetaRatio: -1}},
{ID: "DSP_2", Params: map[string]any{utils.MetaRatio: 3}},
{ID: "DSP_3", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_4", Params: map[string]any{utils.MetaRatio: 5}},
{ID: "DSP_5", Params: map[string]any{utils.MetaRatio: 1}},
{ID: "DSP_6", Params: map[string]any{utils.MetaRatio: 0}},
}
lm, err = newLoadMetrics(dhp, 1)
if err != nil {
t.Fatal(err)
}
hostsIDs = engine.DispatcherHostIDs(dhp.HostIDs())
exp = []string(hostsIDs.Clone())[:5]
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected: %+v ,received: %+v", exp, rply)
}
for i := 0; i < 100; i++ {
if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) {
t.Errorf("Expected: %+v ,received: %+v", exp, rply)
}
lm.incrementLoad(exp[0], utils.EmptyString)
}
}
func TestLibDispatcherNewDispatcherMetaWeight(t *testing.T) {
pfl := &engine.DispatcherProfile{
Hosts: engine.DispatcherHostProfiles{},
Strategy: utils.MetaWeight,
}
result, err := newDispatcher(pfl)
if err != nil {
t.Errorf("\nExpected <nil>, \nReceived <%+v>", err)
}
expected := &singleResultDispatcher{
hosts: engine.DispatcherHostProfiles{},
sorter: new(noSort),
}
if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts)
}
if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", result.(*singleResultDispatcher).sorter, expected.sorter)
}
}
func TestLibDispatcherNewDispatcherMetaWeightErr(t *testing.T) {
pfl := &engine.DispatcherProfile{
Hosts: engine.DispatcherHostProfiles{},
StrategyParams: map[string]any{
utils.MetaDefaultRatio: false,
},
Strategy: utils.MetaWeight,
}
_, err := newDispatcher(pfl)
expected := "cannot convert field<bool>: false to int"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherNewDispatcherMetaRandom(t *testing.T) {
pfl := &engine.DispatcherProfile{
Hosts: engine.DispatcherHostProfiles{},
Strategy: utils.MetaRandom,
}
result, err := newDispatcher(pfl)
if err != nil {
t.Errorf("\nExpected <nil>, \nReceived <%+v>", err)
}
expected := &singleResultDispatcher{
hosts: engine.DispatcherHostProfiles{},
sorter: new(randomSort),
}
if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.sorter, result.(*singleResultDispatcher).sorter)
}
if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts)
}
}
func TestLibDispatcherNewDispatcherMetaRandomErr(t *testing.T) {
pfl := &engine.DispatcherProfile{
Hosts: engine.DispatcherHostProfiles{},
StrategyParams: map[string]any{
utils.MetaDefaultRatio: false,
},
Strategy: utils.MetaRandom,
}
_, err := newDispatcher(pfl)
expected := "cannot convert field<bool>: false to int"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherNewDispatcherMetaRoundRobin(t *testing.T) {
pfl := &engine.DispatcherProfile{
Hosts: engine.DispatcherHostProfiles{},
Strategy: utils.MetaRoundRobin,
}
result, err := newDispatcher(pfl)
if err != nil {
t.Errorf("\nExpected <nil>, \nReceived <%+v>", err)
}
expected := &singleResultDispatcher{
hosts: engine.DispatcherHostProfiles{},
sorter: new(roundRobinSort),
}
if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.sorter, result.(*singleResultDispatcher).sorter)
}
if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts)
}
}
func TestLibDispatcherNewDispatcherMetaRoundRobinErr(t *testing.T) {
pfl := &engine.DispatcherProfile{
Hosts: engine.DispatcherHostProfiles{},
StrategyParams: map[string]any{
utils.MetaDefaultRatio: false,
},
Strategy: utils.MetaRoundRobin,
}
_, err := newDispatcher(pfl)
expected := "cannot convert field<bool>: false to int"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherNewDispatcherPoolBroadcast(t *testing.T) {
pfl := &engine.DispatcherProfile{
Hosts: engine.DispatcherHostProfiles{},
Strategy: rpcclient.PoolBroadcast,
}
result, err := newDispatcher(pfl)
if err != nil {
t.Errorf("\nExpected <nil>, \nReceived <%+v>", err)
}
expected := &broadcastDispatcher{
hosts: engine.DispatcherHostProfiles{},
strategy: pfl.Strategy,
}
if !reflect.DeepEqual(result.(*broadcastDispatcher).strategy, expected.strategy) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.strategy, result.(*broadcastDispatcher).strategy)
}
if !reflect.DeepEqual(result.(*broadcastDispatcher).hosts, expected.hosts) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*broadcastDispatcher).hosts)
}
}
func TestLibDispatcherNewDispatcherError(t *testing.T) {
pfl := &engine.DispatcherProfile{
Hosts: engine.DispatcherHostProfiles{},
Strategy: "badStrategy",
}
expected := "unsupported dispatch strategy: <badStrategy>"
_, err := newDispatcher(pfl)
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherSingleResultDispatcherDispatch(t *testing.T) {
wgDsp := &singleResultDispatcher{sorter: new(noSort)}
dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items)
if dErr != nil {
t.Error(dErr)
}
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "DSP_HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherSingleResultDispatcherDispatchRouteID(t *testing.T) {
wgDsp := &singleResultDispatcher{sorter: new(roundRobinSort)}
dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items)
if dErr != nil {
t.Error(dErr)
}
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "")
expected := "DSP_HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherBroadcastDispatcherDispatch(t *testing.T) {
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items)
if dErr != nil {
t.Error(dErr)
}
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "DSP_HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherBroadcastDispatcherDispatchRouteID(t *testing.T) {
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items)
if dErr != nil {
t.Error(dErr)
}
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "")
expected := "DSP_HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherLoadDispatcherDispatch(t *testing.T) {
wgDsp := &loadDispatcher{sorter: new(randomSort)}
dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items)
if dErr != nil {
t.Error(dErr)
}
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "DSP_HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherLoadDispatcherDispatchHostsID(t *testing.T) {
wgDsp := &loadDispatcher{
hosts: engine.DispatcherHostProfiles{
{ID: "hostID1"},
{ID: "hostID2"},
},
sorter: new(noSort),
}
dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items)
if dErr != nil {
t.Error(dErr)
}
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "")
expected := "DSP_HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherLoadStrategyDispatchCaseCallError2(t *testing.T) {
wgDsp := &loadDispatcher{
hosts: engine.DispatcherHostProfiles{
{
ID: "testID2",
// FilterIDs: []string{"filterID"},
Weight: 4,
Params: map[string]any{
utils.MetaRatio: 1,
},
Blocker: false,
},
},
defaultRatio: 1,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
newCache := engine.NewCacheS(cfg, nil, nil)
engine.Cache = newCache
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherLoads, "testID",
false, nil, true, utils.NonTransactional)
wgDsp := &loadDispatcher{
tntID: "testID",
hosts: engine.DispatcherHostProfiles{
{
ID: "testID",
// FilterIDs: []string{"filterID"},
Weight: 4,
Params: map[string]any{
utils.MetaRatio: 1,
},
Blocker: false,
},
},
defaultRatio: 1,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "cannot cast false to *LoadMetrics"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError2(t *testing.T) {
wgDsp := &loadDispatcher{
tntID: "testID",
hosts: engine.DispatcherHostProfiles{
{
ID: "testID",
// FilterIDs: []string{"filterID"},
Weight: 4,
Params: map[string]any{
utils.MetaRatio: false,
},
Blocker: false,
},
},
defaultRatio: 1,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "cannot convert field<bool>: false to int"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherSingleResultDispatcherCastError(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: "",
Transport: "",
TLS: false,
},
}
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, nil, "", "testID", &DispatcherRoute{}, "", "", "")
expected := "NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherBroadcastDispatcherDispatchError1(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: "",
Transport: "",
TLS: false,
},
}
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
value, nil, true, utils.NonTransactional)
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "")
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherBroadcastDispatcherDispatchError2(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
nil, nil, true, utils.NonTransactional)
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "")
expected := "DSP_HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherBroadcastDispatcherDispatchError3(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: "",
Transport: "",
TLS: false,
},
}
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "")
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherLoadDispatcherCacheError(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: "",
Transport: "",
TLS: false,
},
}
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
value, nil, true, utils.NonTransactional)
wgDsp := &loadDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "")
expected := "DSP_HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherLoadDispatcherCacheError7(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
cfg.CacheCfg().ReplicationConns = []string{"con"}
cfg.CacheCfg().RemoteConns = []string{"con1"}
cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate = true
cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Remote = true
cfg.RPCConns()["con"] = &config.RPCConn{
Strategy: "",
PoolSize: 0,
Conns: []*config.RemoteHost{
{
ID: "testID",
Address: "",
Transport: "",
TLS: false,
},
},
}
cfg.RPCConns()["con1"] = &config.RPCConn{
Strategy: "*first",
PoolSize: 0,
Conns: []*config.RemoteHost{
{
ID: "conn_internal",
Address: "*internal",
Transport: "",
TLS: false,
},
},
}
//rpcCl := map[string]chan birpc.ClientConnector{}
connMng := engine.NewConnManager(cfg, nil)
dm := engine.NewDataManager(nil, nil, connMng)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: rpcclient.InternalRPC,
Transport: utils.MetaInternal,
TLS: false,
},
}
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTENANT:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &loadDispatcher{
tntID: "testTENANT",
hosts: engine.DispatcherHostProfiles{
{
ID: "testID",
// FilterIDs: []string{"filterID1", "filterID2"},
Weight: 3,
Params: map[string]any{
utils.MetaRatio: 1,
},
Blocker: true,
},
{
ID: "testID2",
// FilterIDs: []string{"filterID1", "filterID2"},
Weight: 3,
Params: map[string]any{
utils.MetaRatio: 2,
},
Blocker: true,
},
},
defaultRatio: 0,
sorter: new(noSort),
}
err := wgDsp.Dispatch(dm, nil, nil, "testTENANT", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
expected := "UNSUPPORTED_SERVICE_METHOD"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
engine.Cache = cacheInit
}
type mockTypeConDispatch struct{}
func (*mockTypeConDispatch) Call(ctx *context.Context, serviceMethod string, args, reply any) error {
return rpc.ErrShutdown
}
func TestLibDispatcherLoadDispatcherCacheError5(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: rpcclient.InternalRPC,
Transport: utils.MetaInternal,
TLS: false,
},
}
tmp := engine.IntRPC
engine.IntRPC = map[string]*rpcclient.RPCClient{}
chanRPC := make(chan birpc.ClientConnector, 1)
chanRPC <- new(mockTypeConDispatch)
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC)
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &loadDispatcher{
tntID: "testTenant",
hosts: engine.DispatcherHostProfiles{
{
ID: "testID",
Weight: 3,
Params: map[string]any{
utils.MetaRatio: 1,
},
Blocker: true,
},
},
defaultRatio: 0,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
if err == nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err)
}
engine.Cache = cacheInit
engine.IntRPC = tmp
}
func TestLibDispatcherSingleResultDispatcherCase1(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: rpcclient.InternalRPC,
Transport: utils.MetaInternal,
TLS: false,
},
}
tmp := engine.IntRPC
engine.IntRPC = map[string]*rpcclient.RPCClient{}
chanRPC := make(chan birpc.ClientConnector, 1)
chanRPC <- new(mockTypeConDispatch)
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC)
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
if err == nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err)
}
engine.Cache = cacheInit
engine.IntRPC = tmp
}
type mockTypeConDispatch2 struct{}
func (*mockTypeConDispatch2) Call(ctx *context.Context, serviceMethod string, args, reply any) error {
return nil
}
func TestLibDispatcherSingleResultDispatcherCase2(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: rpcclient.InternalRPC,
Transport: utils.MetaInternal,
TLS: false,
},
}
tmp := engine.IntRPC
engine.IntRPC = map[string]*rpcclient.RPCClient{}
chanRPC := make(chan birpc.ClientConnector, 1)
chanRPC <- new(mockTypeConDispatch2)
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC)
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "routeID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
engine.Cache = cacheInit
engine.IntRPC = tmp
}
func TestLibDispatcherSingleResultDispatcherCase3(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
cfg.CacheCfg().ReplicationConns = []string{"con"}
cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate = true
cfg.RPCConns()["con"] = &config.RPCConn{
Strategy: "",
PoolSize: 0,
Conns: []*config.RemoteHost{
{
ID: "testID",
Address: "",
Transport: "",
TLS: false,
},
},
}
rpcCl := map[string]chan birpc.ClientConnector{}
connMng := engine.NewConnManager(cfg, rpcCl)
dm := engine.NewDataManager(nil, nil, connMng)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: "",
Transport: utils.MetaInternal,
TLS: false,
},
}
tmp := engine.IntRPC
engine.IntRPC = map[string]*rpcclient.RPCClient{}
chanRPC := make(chan birpc.ClientConnector, 1)
chanRPC <- new(mockTypeConDispatch2)
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC)
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "routeID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
expected := "INTERNALLY_DISCONNECTED"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
engine.Cache = cacheInit
engine.IntRPC = tmp
}
func TestLibDispatcherRandomSort(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
flts := engine.NewFilterS(cfg, nil, nil)
sorter := new(randomSort)
hosts := engine.DispatcherHostProfiles{
{ID: "testID1"},
{ID: "testID2"},
}
expHostIDs1 := engine.DispatcherHostIDs{"testID1", "testID2"}
expHostIDs2 := engine.DispatcherHostIDs{"testID2", "testID1"}
if hostIDs, err := sorter.Sort(flts, nil, "", hosts); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expHostIDs1, hostIDs) &&
!reflect.DeepEqual(expHostIDs2, hostIDs) {
t.Errorf("Expected: %q or %q, received: %q", expHostIDs1, expHostIDs2, hostIDs)
}
}
func TestLibDispatcherRoundRobinSort(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
flts := engine.NewFilterS(cfg, nil, nil)
sorter := new(roundRobinSort)
hosts := engine.DispatcherHostProfiles{
{ID: "testID1"},
{ID: "testID2"},
}
expHostIDs1 := engine.DispatcherHostIDs{"testID1", "testID2"}
if hostIDs, err := sorter.Sort(flts, nil, "", hosts); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expHostIDs1, hostIDs) {
t.Errorf("Expected: %q, received: %q", expHostIDs1, hostIDs)
}
expHostIDs2 := engine.DispatcherHostIDs{"testID2", "testID1"}
if hostIDs, err := sorter.Sort(flts, nil, "", hosts); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expHostIDs2, hostIDs) {
t.Errorf("Expected: %q, received: %q", expHostIDs2, hostIDs)
}
}
func TestLibDispatcherLoadStrategyDispatchCaseCallError(t *testing.T) {
wgDsp := &loadDispatcher{
hosts: engine.DispatcherHostProfiles{
{
ID: "hostID",
},
},
defaultRatio: 1,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, nil, "cgrates.org", "", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
expected := "NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherDispatchFilterError(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
flts := engine.NewFilterS(cfg, nil, nil)
data, err := engine.NewInternalDB(nil, nil, true, nil, cfg.DataDbCfg().Items)
if err != nil {
t.Error(err)
}
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
var dsp Dispatcher = &singleResultDispatcher{
sorter: new(noSort),
hosts: engine.DispatcherHostProfiles{{
ID: "testID",
FilterIDs: []string{"*wrongType"},
}},
}
expErrMsg := "inline parse error for string: <*wrongType>"
if err := dsp.Dispatch(dm, flts, nil, "cgrates.org", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg {
t.Errorf("Expected error: %s received: %v", expErrMsg, err)
}
dsp = &loadDispatcher{
sorter: new(noSort),
hosts: engine.DispatcherHostProfiles{{
ID: "testID2",
FilterIDs: []string{"*wrongType"},
}},
defaultRatio: 1,
}
if err := dsp.Dispatch(dm, flts, nil, "cgrates.org", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg {
t.Errorf("Expected error: %s received: %v", expErrMsg, err)
}
dsp = &broadcastDispatcher{
hosts: engine.DispatcherHostProfiles{{
ID: "testID",
FilterIDs: []string{"*wrongType"},
}},
}
if err := dsp.Dispatch(dm, flts, nil, "cgrates.org", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg {
t.Errorf("Expected error: %s received: %v", expErrMsg, err)
}
}
func TestLibDispatcherNewInternalHost(t *testing.T) {
tnt := "cgrates.org"
want := &engine.DispatcherHost{
Tenant: tnt,
RemoteHost: &config.RemoteHost{
ID: utils.MetaInternal,
Address: utils.MetaInternal,
ConnectAttempts: 1,
Reconnects: 1,
ConnectTimeout: time.Second,
ReplyTimeout: time.Second,
},
}
got := newInternalHost(tnt)
if diff := cmp.Diff(want, got, cmp.AllowUnexported(engine.DispatcherHost{})); diff != "" {
t.Errorf("newInternalHost(%q) returned an unexpected value(-want +got): \n%s", tnt, diff)
}
}