mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 12:49:54 +05:00
100% Coverage in Dispatchers 1.0
This commit is contained in:
committed by
Dan Christian Bogos
parent
a2748e846d
commit
6ffae42a2f
@@ -116,6 +116,10 @@ func (dbM *dataDBKeys) SetAttributeProfileDrv(*context.Context, *engine.Attribut
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dbM *dataDBKeys) GetKeysForPrefix(ctx *context.Context, prf string) ([]string, error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *dataDBKeys) RemoveAttributeProfileDrv(*context.Context, string, string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,17 +1,14 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
@@ -19,10 +16,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package dispatchers
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
@@ -650,3 +652,664 @@ func TestLibDispatcherRoundRobinDispatch(t *testing.T) {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherSingleResultstrategyDispatcherDispatch(t *testing.T) {
|
||||
wgDsp := &singleResultstrategyDispatcher{}
|
||||
dataDB := engine.NewInternalDB(nil, nil, true)
|
||||
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
|
||||
err := wgDsp.dispatch(dM, "", "", "", []string{""}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherSingleResultstrategyDispatcherDispatchRouteID(t *testing.T) {
|
||||
wgDsp := &singleResultstrategyDispatcher{}
|
||||
dataDB := engine.NewInternalDB(nil, nil, true)
|
||||
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
|
||||
err := wgDsp.dispatch(dM, "routeID", "", "", []string{""}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherBroadcastStrategyDispatcherDispatch(t *testing.T) {
|
||||
wgDsp := &broadcastStrategyDispatcher{}
|
||||
dataDB := engine.NewInternalDB(nil, nil, true)
|
||||
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
|
||||
err := wgDsp.dispatch(dM, "", "", "", []string{""}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherBroadcastStrategyDispatcherDispatchRouteID(t *testing.T) {
|
||||
wgDsp := &broadcastStrategyDispatcher{}
|
||||
dataDB := engine.NewInternalDB(nil, nil, true)
|
||||
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
|
||||
err := wgDsp.dispatch(dM, "routeID", "", "", []string{""}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatcherDispatch(t *testing.T) {
|
||||
wgDsp := &loadStrategyDispatcher{}
|
||||
dataDB := engine.NewInternalDB(nil, nil, true)
|
||||
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
|
||||
err := wgDsp.dispatch(dM, "", "", "", []string{""}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatcherDispatchHostsID(t *testing.T) {
|
||||
wgDsp := &loadStrategyDispatcher{}
|
||||
dataDB := engine.NewInternalDB(nil, nil, true)
|
||||
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
|
||||
err := wgDsp.dispatch(dM, "routeID", "", "", []string{"hostID1", "hostID2"}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatchCaseHosts(t *testing.T) {
|
||||
wgDsp := &loadStrategyDispatcher{
|
||||
hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "testID",
|
||||
FilterIDs: []string{"filterID"},
|
||||
Weight: 4,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: 1,
|
||||
},
|
||||
Blocker: false,
|
||||
},
|
||||
},
|
||||
defaultRatio: 1,
|
||||
}
|
||||
dataDB := engine.NewInternalDB(nil, nil, true)
|
||||
dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil)
|
||||
err := wgDsp.dispatch(dM, "", "", "", []string{"testID"}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatchCaseHostsError(t *testing.T) {
|
||||
wgDsp := &loadStrategyDispatcher{
|
||||
hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "testID2",
|
||||
FilterIDs: []string{"filterID"},
|
||||
Weight: 4,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: 1,
|
||||
},
|
||||
Blocker: false,
|
||||
},
|
||||
},
|
||||
defaultRatio: 1,
|
||||
}
|
||||
err := wgDsp.dispatch(nil, "", "", "", []string{"testID2"}, "", "", "")
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
newCache := engine.NewCacheS(cfg, nil, nil)
|
||||
engine.Cache = newCache
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherLoads, "testID",
|
||||
false, nil, true, utils.NonTransactional)
|
||||
wgDsp := &loadStrategyDispatcher{
|
||||
tntID: "testID",
|
||||
hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "testID",
|
||||
FilterIDs: []string{"filterID"},
|
||||
Weight: 4,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: 1,
|
||||
},
|
||||
Blocker: false,
|
||||
},
|
||||
},
|
||||
defaultRatio: 1,
|
||||
}
|
||||
err := wgDsp.dispatch(nil, "", "", "", []string{"testID"}, "", "", "")
|
||||
expected := "cannot cast false to *LoadMetrics"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError2(t *testing.T) {
|
||||
wgDsp := &loadStrategyDispatcher{
|
||||
tntID: "testID",
|
||||
hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "testID",
|
||||
FilterIDs: []string{"filterID"},
|
||||
Weight: 4,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: false,
|
||||
},
|
||||
Blocker: false,
|
||||
},
|
||||
},
|
||||
defaultRatio: 1,
|
||||
}
|
||||
err := wgDsp.dispatch(nil, "", "", "", []string{"testID"}, "", "", "")
|
||||
expected := "cannot convert field<bool>: false to int"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibDispatcherSingleResultStrategyDispatcherCastError(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: "",
|
||||
Transport: "",
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &singleResultstrategyDispatcher{}
|
||||
err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "")
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
}
|
||||
|
||||
type mockTypeCon struct{}
|
||||
|
||||
func (*mockTypeCon) Call(ctx *context.Context, method string, args interface{}, reply interface{}) error {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
|
||||
func TestLibDispatcherSingleResultStrategyDispatcherCastError2(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: utils.MetaInternal,
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
|
||||
tmp := engine.IntRPC
|
||||
engine.IntRPC = map[string]*rpcclient.RPCClient{}
|
||||
chanRPC := make(chan birpc.ClientConnector, 1)
|
||||
chanRPC <- new(mockTypeCon)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1Ping, chanRPC)
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &singleResultstrategyDispatcher{}
|
||||
err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
|
||||
expected := "UNSUPPORTED_SERVICE_METHOD"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
engine.IntRPC = tmp
|
||||
}
|
||||
|
||||
func TestLibDispatcherBroadcastStrategyDispatcherDispatchError1(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: "",
|
||||
Transport: "",
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &broadcastStrategyDispatcher{}
|
||||
err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "")
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
}
|
||||
|
||||
func TestLibDispatcherBroadcastStrategyDispatcherDispatchError2(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
|
||||
nil, nil, true, utils.NonTransactional)
|
||||
wgDsp := &broadcastStrategyDispatcher{}
|
||||
err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
}
|
||||
|
||||
func TestLibDispatcherBroadcastStrategyDispatcherDispatchError3(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: "",
|
||||
Transport: "",
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &broadcastStrategyDispatcher{}
|
||||
err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "")
|
||||
if err != nil {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatcherCacheError(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: "",
|
||||
Transport: "",
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &loadStrategyDispatcher{}
|
||||
err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "")
|
||||
expected := "HOST_NOT_FOUND"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatcherCacheError2(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: utils.MetaInternal,
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
|
||||
tmp := engine.IntRPC
|
||||
engine.IntRPC = map[string]*rpcclient.RPCClient{}
|
||||
chanRPC := make(chan birpc.ClientConnector, 1)
|
||||
chanRPC <- new(mockTypeCon)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1Ping, chanRPC)
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &loadStrategyDispatcher{}
|
||||
err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
|
||||
expected := "UNSUPPORTED_SERVICE_METHOD"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
engine.IntRPC = tmp
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatcherCacheError3(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: utils.MetaInternal,
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
|
||||
tmp := engine.IntRPC
|
||||
engine.IntRPC = map[string]*rpcclient.RPCClient{}
|
||||
chanRPC := make(chan birpc.ClientConnector, 1)
|
||||
chanRPC <- new(mockTypeCon)
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTENANT:testID",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1Ping, chanRPC)
|
||||
wgDsp := &loadStrategyDispatcher{
|
||||
tntID: "testTENANT",
|
||||
hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "testID",
|
||||
FilterIDs: []string{"filterID1", "filterID2"},
|
||||
Weight: 3,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: 1,
|
||||
},
|
||||
Blocker: true,
|
||||
},
|
||||
{
|
||||
ID: "testID2",
|
||||
FilterIDs: []string{"filterID1", "filterID2"},
|
||||
Weight: 3,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: 2,
|
||||
},
|
||||
Blocker: true,
|
||||
},
|
||||
},
|
||||
defaultRatio: 0,
|
||||
}
|
||||
err := wgDsp.dispatch(dm, "testID", utils.MetaAttributes, "testTENANT", []string{"testID", "testID2"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
|
||||
if err != nil {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
engine.IntRPC = tmp
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatcherCacheError4(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.CacheCfg().ReplicationConns = []string{"con"}
|
||||
cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate = true
|
||||
cfg.RPCConns()["con"] = &config.RPCConn{
|
||||
Strategy: "",
|
||||
PoolSize: 0,
|
||||
Conns: []*config.RemoteHost{
|
||||
{
|
||||
ID: "testID",
|
||||
Address: "",
|
||||
Transport: "",
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
rpcCl := map[string]chan birpc.ClientConnector{}
|
||||
connMng := engine.NewConnManager(cfg, rpcCl)
|
||||
dm := engine.NewDataManager(nil, nil, connMng)
|
||||
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: utils.MetaInternal,
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTENANT:testID",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &loadStrategyDispatcher{
|
||||
tntID: "testTENANT",
|
||||
hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "testID",
|
||||
FilterIDs: []string{"filterID1", "filterID2"},
|
||||
Weight: 3,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: 1,
|
||||
},
|
||||
Blocker: true,
|
||||
},
|
||||
{
|
||||
ID: "testID2",
|
||||
FilterIDs: []string{"filterID1", "filterID2"},
|
||||
Weight: 3,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: 2,
|
||||
},
|
||||
Blocker: true,
|
||||
},
|
||||
},
|
||||
defaultRatio: 0,
|
||||
}
|
||||
err := wgDsp.dispatch(dm, "testID", utils.MetaAttributes, "testTENANT", []string{"testID", "testID2"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
|
||||
expected := "DISCONNECTED"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
}
|
||||
|
||||
type mockTypeConDispatch struct{}
|
||||
|
||||
func (*mockTypeConDispatch) Call(ctx *context.Context, serviceMethod string, args, reply interface{}) error {
|
||||
return rpc.ErrShutdown
|
||||
}
|
||||
|
||||
func TestLibDispatcherLoadStrategyDispatcherCacheError5(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: utils.MetaInternal,
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
|
||||
tmp := engine.IntRPC
|
||||
engine.IntRPC = map[string]*rpcclient.RPCClient{}
|
||||
chanRPC := make(chan birpc.ClientConnector, 1)
|
||||
chanRPC <- new(mockTypeConDispatch)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC)
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &loadStrategyDispatcher{
|
||||
tntID: "testTenant",
|
||||
hosts: engine.DispatcherHostProfiles{
|
||||
{
|
||||
ID: "testID",
|
||||
Weight: 3,
|
||||
Params: map[string]interface{}{
|
||||
utils.MetaRatio: 1,
|
||||
},
|
||||
Blocker: true,
|
||||
},
|
||||
},
|
||||
defaultRatio: 0,
|
||||
}
|
||||
err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
|
||||
if err == nil {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
engine.IntRPC = tmp
|
||||
}
|
||||
func TestLibDispatcherSingleResultstrategyDispatcherCase1(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: utils.MetaInternal,
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
tmp := engine.IntRPC
|
||||
engine.IntRPC = map[string]*rpcclient.RPCClient{}
|
||||
chanRPC := make(chan birpc.ClientConnector, 1)
|
||||
chanRPC <- new(mockTypeConDispatch)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC)
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &singleResultstrategyDispatcher{}
|
||||
err := wgDsp.dispatch(dm, "", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
|
||||
if err == nil {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
engine.IntRPC = tmp
|
||||
}
|
||||
|
||||
type mockTypeConDispatch2 struct{}
|
||||
|
||||
func (*mockTypeConDispatch2) Call(ctx *context.Context, serviceMethod string, args, reply interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestLibDispatcherSingleResultstrategyDispatcherCase2(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
dm := engine.NewDataManager(nil, nil, nil)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: utils.MetaInternal,
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
tmp := engine.IntRPC
|
||||
engine.IntRPC = map[string]*rpcclient.RPCClient{}
|
||||
chanRPC := make(chan birpc.ClientConnector, 1)
|
||||
chanRPC <- new(mockTypeConDispatch2)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC)
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &singleResultstrategyDispatcher{}
|
||||
err := wgDsp.dispatch(dm, "routeID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
|
||||
if err != nil {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
engine.IntRPC = tmp
|
||||
}
|
||||
|
||||
func TestLibDispatcherSingleResultstrategyDispatcherCase3(t *testing.T) {
|
||||
cacheInit := engine.Cache
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.CacheCfg().ReplicationConns = []string{"con"}
|
||||
cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate = true
|
||||
cfg.RPCConns()["con"] = &config.RPCConn{
|
||||
Strategy: "",
|
||||
PoolSize: 0,
|
||||
Conns: []*config.RemoteHost{
|
||||
{
|
||||
ID: "testID",
|
||||
Address: "",
|
||||
Transport: "",
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
rpcCl := map[string]chan birpc.ClientConnector{}
|
||||
connMng := engine.NewConnManager(cfg, rpcCl)
|
||||
dm := engine.NewDataManager(nil, nil, connMng)
|
||||
newCache := engine.NewCacheS(cfg, dm, nil)
|
||||
engine.Cache = newCache
|
||||
value := &engine.DispatcherHost{
|
||||
Tenant: "testTenant",
|
||||
RemoteHost: &config.RemoteHost{
|
||||
ID: "testID",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: utils.MetaInternal,
|
||||
Synchronous: false,
|
||||
TLS: false,
|
||||
},
|
||||
}
|
||||
tmp := engine.IntRPC
|
||||
engine.IntRPC = map[string]*rpcclient.RPCClient{}
|
||||
chanRPC := make(chan birpc.ClientConnector, 1)
|
||||
chanRPC <- new(mockTypeConDispatch2)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC)
|
||||
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
|
||||
value, nil, true, utils.NonTransactional)
|
||||
wgDsp := &singleResultstrategyDispatcher{}
|
||||
err := wgDsp.dispatch(dm, "routeID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
|
||||
expected := "DISCONNECTED"
|
||||
if err == nil || err.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
|
||||
}
|
||||
engine.Cache = cacheInit
|
||||
engine.IntRPC = tmp
|
||||
}
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
@@ -62,7 +60,7 @@ func TestDspServiceManagerV1PingNil(t *testing.T) {
|
||||
}
|
||||
var reply *string
|
||||
result := dspSrv.ServiceManagerV1Ping(CGREvent, reply)
|
||||
expected := "DISPATCHER_ERROR:NOT_FOUND"
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
}
|
||||
|
||||
@@ -23,7 +23,12 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type DataDBMock struct{}
|
||||
type DataDBMock struct {
|
||||
GetKeysForPrefixF func(*context.Context, string) ([]string, error)
|
||||
GetChargerProfileDrvF func(string, string) (*ChargerProfile, error)
|
||||
GetFilterDrvF func(string, string) (*Filter, error)
|
||||
GetIndexesDrvF func(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error)
|
||||
}
|
||||
|
||||
//Storage methods
|
||||
func (dbM *DataDBMock) Close() {}
|
||||
@@ -32,7 +37,10 @@ func (dbM *DataDBMock) Flush(string) error {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) GetKeysForPrefix(_ *context.Context, _ string) ([]string, error) {
|
||||
func (dbM *DataDBMock) GetKeysForPrefix(ctx *context.Context, prf string) ([]string, error) {
|
||||
if dbM.GetKeysForPrefixF != nil {
|
||||
return dbM.GetKeysForPrefixF(ctx, prf)
|
||||
}
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user