Fix dispatcher tests

This commit is contained in:
ionutboangiu
2022-09-09 18:21:31 +03:00
committed by Dan Christian Bogos
parent ba35d39f2c
commit 1ed6d8256b
2 changed files with 69 additions and 251 deletions

View File

@@ -176,7 +176,7 @@ func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string]
}
// singleResultDispatcher routes the event to a single host
// implements Dispatcher interface
// implements Dispatcher interface
type singleResultDispatcher struct {
sorter hostSorter
hosts engine.DispatcherHostProfiles
@@ -202,32 +202,30 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.
} else if len(hostIDs) == 0 { // in case we do not match any host
return utils.ErrHostNotFound
}
for _, hostID := range hostIDs {
var dRh *DispatcherRoute
if routeID != utils.EmptyString {
dRh = &DispatcherRoute{
Tenant: dR.Tenant,
ProfileID: dR.ProfileID,
HostID: hostID,
}
}
if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm,
cfg, iPRCCh, serviceMethod, args, reply); err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
if err != nil {
// not found or network errors will continue with standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with identity <%s>",
utils.DispatcherS, err.Error(), hostID))
hostID := hostIDs[0]
var dRh *DispatcherRoute
if routeID != utils.EmptyString {
dRh = &DispatcherRoute{
Tenant: dR.Tenant,
ProfileID: dR.ProfileID,
HostID: hostID,
}
}
if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm,
cfg, iPRCCh, serviceMethod, args, reply); err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
if err != nil {
// not found or network errors will continue with standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with identity <%s>",
utils.DispatcherS, err.Error(), hostID))
}
return
}
// broadcastDispatcher routes the event to multiple hosts in a pool
// implements the Dispatcher interface
// implements the Dispatcher interface
type broadcastDispatcher struct {
strategy string
hosts engine.DispatcherHostProfiles
@@ -317,30 +315,28 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
} else if len(hostIDs) == 0 { // in case we do not match any host
return utils.ErrHostNotFound
}
for _, hostID := range hostIDs {
var dRh *DispatcherRoute
if routeID != utils.EmptyString {
dRh = &DispatcherRoute{
Tenant: dR.Tenant,
ProfileID: dR.ProfileID,
HostID: hostID,
}
}
lM.incrementLoad(ctx, hostID, ld.tntID)
err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm,
cfg, iPRCCh, serviceMethod, args, reply)
lM.decrementLoad(ctx, hostID, ld.tntID) // call ended
if err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
if err != nil {
// not found or network errors will continue with standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>",
utils.DispatcherS, err.Error(), hostID))
hostID := hostIDs[0]
var dRh *DispatcherRoute
if routeID != utils.EmptyString {
dRh = &DispatcherRoute{
Tenant: dR.Tenant,
ProfileID: dR.ProfileID,
HostID: hostID,
}
}
lM.incrementLoad(ctx, hostID, ld.tntID)
err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm,
cfg, iPRCCh, serviceMethod, args, reply)
lM.decrementLoad(ctx, hostID, ld.tntID) // call ended
if err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
if err != nil {
// not found or network errors will continue with standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>",
utils.DispatcherS, err.Error(), hostID))
}
return
}
@@ -420,7 +416,7 @@ func (lM *LoadMetrics) decrementLoad(ctx *context.Context, hostID, tntID string)
}
// callDHwithID is a wrapper on callDH using ID of the host which the other cannot do due to lazyDH
// if routeID provided, will also cache once the call is successful
// if routeID provided, will also cache once the call is successful
func callDHwithID(ctx *context.Context, tnt, hostID, routeID string, dR *DispatcherRoute,
dm *engine.DataManager, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector,
serviceMethod string, args, reply interface{}) (err error) {

View File

@@ -31,7 +31,7 @@ import (
"github.com/cgrates/rpcclient"
)
func TestLoadMetricsGetHosts(t *testing.T) {
func TestLibDispatcherLoadMetricsGetHosts(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}},
{ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 1}},
@@ -66,7 +66,7 @@ func TestLoadMetricsGetHosts(t *testing.T) {
}
}
func TestNewSingleDispatcher(t *testing.T) {
func TestLibDispatcherNewSingleDispatcher(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1"},
{ID: "DSP_2"},
@@ -132,7 +132,7 @@ func TestNewSingleDispatcher(t *testing.T) {
}
}
func TestNewLoadMetrics(t *testing.T) {
func TestLibDispatcherNewLoadMetrics(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}},
{ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 0}},
@@ -159,7 +159,7 @@ func TestNewLoadMetrics(t *testing.T) {
}
}
func TestLoadMetricsGetHosts2(t *testing.T) {
func TestLibDispatcherLoadMetricsGetHosts2(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 2}},
{ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 3}},
@@ -389,7 +389,7 @@ func TestLibDispatcherNewDispatcherError(t *testing.T) {
func TestLibDispatcherSingleResultDispatcherDispatch(t *testing.T) {
wgDsp := &singleResultDispatcher{sorter: new(noSort)}
dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "")
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -399,7 +399,7 @@ func TestLibDispatcherSingleResultDispatcherDispatch(t *testing.T) {
func TestLibDispatcherSingleResultDispatcherDispatchRouteID(t *testing.T) {
wgDsp := &singleResultDispatcher{sorter: new(roundRobinSort)}
dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", "", "", "", "")
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -409,7 +409,7 @@ func TestLibDispatcherSingleResultDispatcherDispatchRouteID(t *testing.T) {
func TestLibDispatcherBroadcastDispatcherDispatch(t *testing.T) {
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "")
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -419,7 +419,7 @@ func TestLibDispatcherBroadcastDispatcherDispatch(t *testing.T) {
func TestLibDispatcherBroadcastDispatcherDispatchRouteID(t *testing.T) {
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", "", "", "", "")
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -429,7 +429,7 @@ func TestLibDispatcherBroadcastDispatcherDispatchRouteID(t *testing.T) {
func TestLibDispatcherLoadDispatcherDispatch(t *testing.T) {
wgDsp := &loadDispatcher{sorter: new(randomSort)}
dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "")
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -445,55 +445,25 @@ func TestLibDispatcherLoadDispatcherDispatchHostsID(t *testing.T) {
sorter: new(noSort),
}
dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", "", "", "", "")
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherLoadStrategyDispatchCaseHosts(t *testing.T) {
func TestLibDispatcherLoadStrategyDispatchCaseCallError(t *testing.T) {
wgDsp := &loadDispatcher{
hosts: engine.DispatcherHostProfiles{
{
ID: "testID",
// FilterIDs: []string{"filterID"},
Weight: 4,
Params: map[string]interface{}{
utils.MetaRatio: 1,
},
Blocker: false,
ID: "hostID",
},
},
defaultRatio: 1,
sorter: new(noSort),
}
dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil)
err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestLibDispatcherLoadStrategyDispatchCaseHostsError(t *testing.T) {
wgDsp := &loadDispatcher{
hosts: engine.DispatcherHostProfiles{
{
ID: "testID2",
// FilterIDs: []string{"filterID"},
Weight: 4,
Params: map[string]interface{}{
utils.MetaRatio: 1,
},
Blocker: false,
},
},
defaultRatio: 1,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "")
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
@@ -522,7 +492,7 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError(t *testing.T) {
defaultRatio: 1,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "")
err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "cannot cast false to *LoadMetrics"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -547,7 +517,7 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError2(t *testing.T) {
defaultRatio: 1,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "")
err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "")
expected := "cannot convert field<bool>: false to int"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -572,8 +542,8 @@ func TestLibDispatcherSingleResultDispatcherCastError(t *testing.T) {
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "testID", utils.MetaAttributes, "", "", "")
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "testID", &DispatcherRoute{}, "", "", "")
expected := "NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
@@ -586,34 +556,6 @@ func (*mockTypeCon) Call(ctx *context.Context, method string, args interface{},
return utils.ErrNotFound
}
func TestLibDispatcherSingleResultDispatcherCastError2(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: rpcclient.InternalRPC,
Transport: utils.MetaInternal,
TLS: false,
},
}
chanRPC := make(chan birpc.ClientConnector, 1)
chanRPC <- new(mockTypeCon)
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
if err != utils.ErrNotFound {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherBroadcastDispatcherDispatchError1(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
@@ -632,7 +574,7 @@ func TestLibDispatcherBroadcastDispatcherDispatchError1(t *testing.T) {
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
value, nil, true, utils.NonTransactional)
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "")
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "")
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -650,7 +592,7 @@ func TestLibDispatcherBroadcastDispatcherDispatchError2(t *testing.T) {
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
nil, nil, true, utils.NonTransactional)
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "")
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -676,113 +618,7 @@ func TestLibDispatcherBroadcastDispatcherDispatchError3(t *testing.T) {
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "")
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherLoadDispatcherCacheError(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: "",
Transport: "",
TLS: false,
},
}
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
value, nil, true, utils.NonTransactional)
wgDsp := &loadDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "")
expected := "HOST_NOT_FOUND"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherLoadDispatcherCacheError2(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: rpcclient.InternalRPC,
Transport: utils.MetaInternal,
TLS: false,
},
}
chanRPC := make(chan birpc.ClientConnector, 1)
chanRPC <- new(mockTypeCon)
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes",
value, nil, true, utils.NonTransactional)
wgDsp := &loadDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
if err != utils.ErrNotFound {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err)
}
engine.Cache = cacheInit
}
func TestLibDispatcherLoadDispatcherCacheError3(t *testing.T) {
cacheInit := engine.Cache
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(nil, nil, nil)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
value := &engine.DispatcherHost{
Tenant: "testTenant",
RemoteHost: &config.RemoteHost{
ID: "testID",
Address: rpcclient.InternalRPC,
Transport: utils.MetaInternal,
TLS: false,
},
}
chanRPC := make(chan birpc.ClientConnector, 1)
chanRPC <- new(mockTypeCon)
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTENANT:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &loadDispatcher{
tntID: "testTENANT",
hosts: engine.DispatcherHostProfiles{
{
ID: "testID",
// FilterIDs: []string{"filterID1", "filterID2"},
Weight: 3,
Params: map[string]interface{}{
utils.MetaRatio: 1,
},
Blocker: true,
},
{
ID: "testID2",
// FilterIDs: []string{"filterID1", "filterID2"},
Weight: 3,
Params: map[string]interface{}{
utils.MetaRatio: 2,
},
Blocker: true,
},
},
defaultRatio: 0,
sorter: new(noSort),
}
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTENANT", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "")
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
@@ -807,7 +643,8 @@ func TestLibDispatcherLoadDispatcherCacheError4(t *testing.T) {
},
}
connMng := engine.NewConnManager(cfg)
dm := engine.NewDataManager(nil, nil, connMng)
dataDB := engine.NewInternalDB(nil, nil, nil)
dm := engine.NewDataManager(dataDB, nil, connMng)
newCache := engine.NewCacheS(cfg, dm, nil)
engine.Cache = newCache
@@ -848,7 +685,7 @@ func TestLibDispatcherLoadDispatcherCacheError4(t *testing.T) {
defaultRatio: 0,
sorter: new(noSort),
}
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), nil, nil, "testTENANT", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), nil, nil, "testTENANT", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
expected := "DISCONNECTED"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -898,7 +735,7 @@ func TestLibDispatcherLoadDispatcherCacheError5(t *testing.T) {
defaultRatio: 0,
sorter: new(noSort),
}
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
if err == nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err)
}
@@ -924,7 +761,7 @@ func TestLibDispatcherSingleResultDispatcherCase1(t *testing.T) {
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
if err == nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err)
}
@@ -957,7 +794,7 @@ func TestLibDispatcherSingleResultDispatcherCase2(t *testing.T) {
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "routeID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "routeID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
@@ -999,7 +836,7 @@ func TestLibDispatcherSingleResultDispatcherCase3(t *testing.T) {
engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID",
value, nil, true, utils.NonTransactional)
wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}}
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "routeID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "routeID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp)
expected := "DISCONNECTED"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -1018,7 +855,7 @@ func TestLibDispatcherDispatchFilterError(t *testing.T) {
}},
}
expErrMsg := "inline parse error for string: <*wrongType>"
if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", "", "", "", ""); err == nil || err.Error() != expErrMsg {
if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg {
t.Errorf("Expected error: %s received: %v", expErrMsg, err)
}
dsp = &loadDispatcher{
@@ -1029,7 +866,7 @@ func TestLibDispatcherDispatchFilterError(t *testing.T) {
}},
defaultRatio: 1,
}
if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", "", "", "", ""); err == nil || err.Error() != expErrMsg {
if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg {
t.Errorf("Expected error: %s received: %v", expErrMsg, err)
}
dsp = &broadcastDispatcher{
@@ -1038,26 +875,11 @@ func TestLibDispatcherDispatchFilterError(t *testing.T) {
FilterIDs: []string{"*wrongType"},
}},
}
if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", "", "", "", ""); err == nil || err.Error() != expErrMsg {
if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg {
t.Errorf("Expected error: %s received: %v", expErrMsg, err)
}
}
func TestLibDispatcherDispatchHostNotFound(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
flts := engine.NewFilterS(cfg, nil, nil)
db := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), nil, nil)
var dsp Dispatcher = &singleResultDispatcher{
sorter: new(noSort),
hosts: engine.DispatcherHostProfiles{{
ID: "testID",
}},
}
if err := dsp.Dispatch(db, flts, cfg, context.Background(), nil, nil, "", "", "", "", "", ""); err != utils.ErrHostNotFound {
t.Errorf("Expected error: %s received: %v", utils.ErrHostNotFound, err)
}
}
func TestLibDispatcherRandomSort(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
flts := engine.NewFilterS(cfg, nil, nil)