From 1ed6d8256bea456ad2e112fec3ef2816a2c26cf0 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 9 Sep 2022 18:21:31 +0300 Subject: [PATCH] Fix dispatcher tests --- dispatchers/libdispatcher.go | 80 +++++----- dispatchers/libdispatcher_test.go | 240 ++++-------------------------- 2 files changed, 69 insertions(+), 251 deletions(-) diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 63ecf52a6..521bf723b 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -176,7 +176,7 @@ func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string] } // singleResultDispatcher routes the event to a single host -// implements Dispatcher interface +// implements Dispatcher interface type singleResultDispatcher struct { sorter hostSorter hosts engine.DispatcherHostProfiles @@ -202,32 +202,30 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine. } else if len(hostIDs) == 0 { // in case we do not match any host return utils.ErrHostNotFound } - for _, hostID := range hostIDs { - var dRh *DispatcherRoute - if routeID != utils.EmptyString { - dRh = &DispatcherRoute{ - Tenant: dR.Tenant, - ProfileID: dR.ProfileID, - HostID: hostID, - } - } - if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm, - cfg, iPRCCh, serviceMethod, args, reply); err == nil || - (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors - return - } - if err != nil { - // not found or network errors will continue with standard dispatching - utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with identity <%s>", - utils.DispatcherS, err.Error(), hostID)) + hostID := hostIDs[0] + var dRh *DispatcherRoute + if routeID != utils.EmptyString { + dRh = &DispatcherRoute{ + Tenant: dR.Tenant, + ProfileID: dR.ProfileID, + HostID: hostID, } + } + if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm, + cfg, iPRCCh, serviceMethod, args, reply); err == nil || + (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors return } + if err != nil { + // not found or network errors will continue with standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with identity <%s>", + utils.DispatcherS, err.Error(), hostID)) + } return } // broadcastDispatcher routes the event to multiple hosts in a pool -// implements the Dispatcher interface +// implements the Dispatcher interface type broadcastDispatcher struct { strategy string hosts engine.DispatcherHostProfiles @@ -317,30 +315,28 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, } else if len(hostIDs) == 0 { // in case we do not match any host return utils.ErrHostNotFound } - for _, hostID := range hostIDs { - var dRh *DispatcherRoute - if routeID != utils.EmptyString { - dRh = &DispatcherRoute{ - Tenant: dR.Tenant, - ProfileID: dR.ProfileID, - HostID: hostID, - } - } - lM.incrementLoad(ctx, hostID, ld.tntID) - err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm, - cfg, iPRCCh, serviceMethod, args, reply) - lM.decrementLoad(ctx, hostID, ld.tntID) // call ended - if err == nil || - (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors - return - } - if err != nil { - // not found or network errors will continue with standard dispatching - utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>", - utils.DispatcherS, err.Error(), hostID)) + hostID := hostIDs[0] + var dRh *DispatcherRoute + if routeID != utils.EmptyString { + dRh = &DispatcherRoute{ + Tenant: dR.Tenant, + ProfileID: dR.ProfileID, + HostID: hostID, } + } + lM.incrementLoad(ctx, hostID, ld.tntID) + err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm, + cfg, iPRCCh, serviceMethod, args, reply) + lM.decrementLoad(ctx, hostID, ld.tntID) // call ended + if err == nil || + (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors return } + if err != nil { + // not found or network errors will continue with standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>", + utils.DispatcherS, err.Error(), hostID)) + } return } @@ -420,7 +416,7 @@ func (lM *LoadMetrics) decrementLoad(ctx *context.Context, hostID, tntID string) } // callDHwithID is a wrapper on callDH using ID of the host which the other cannot do due to lazyDH -// if routeID provided, will also cache once the call is successful +// if routeID provided, will also cache once the call is successful func callDHwithID(ctx *context.Context, tnt, hostID, routeID string, dR *DispatcherRoute, dm *engine.DataManager, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector, serviceMethod string, args, reply interface{}) (err error) { diff --git a/dispatchers/libdispatcher_test.go b/dispatchers/libdispatcher_test.go index 23b1c2f2c..ee0a267ec 100644 --- a/dispatchers/libdispatcher_test.go +++ b/dispatchers/libdispatcher_test.go @@ -31,7 +31,7 @@ import ( "github.com/cgrates/rpcclient" ) -func TestLoadMetricsGetHosts(t *testing.T) { +func TestLibDispatcherLoadMetricsGetHosts(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}}, {ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 1}}, @@ -66,7 +66,7 @@ func TestLoadMetricsGetHosts(t *testing.T) { } } -func TestNewSingleDispatcher(t *testing.T) { +func TestLibDispatcherNewSingleDispatcher(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1"}, {ID: "DSP_2"}, @@ -132,7 +132,7 @@ func TestNewSingleDispatcher(t *testing.T) { } } -func TestNewLoadMetrics(t *testing.T) { +func TestLibDispatcherNewLoadMetrics(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}}, {ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 0}}, @@ -159,7 +159,7 @@ func TestNewLoadMetrics(t *testing.T) { } } -func TestLoadMetricsGetHosts2(t *testing.T) { +func TestLibDispatcherLoadMetricsGetHosts2(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 2}}, {ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 3}}, @@ -389,7 +389,7 @@ func TestLibDispatcherNewDispatcherError(t *testing.T) { func TestLibDispatcherSingleResultDispatcherDispatch(t *testing.T) { wgDsp := &singleResultDispatcher{sorter: new(noSort)} dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) - err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "") + err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -399,7 +399,7 @@ func TestLibDispatcherSingleResultDispatcherDispatch(t *testing.T) { func TestLibDispatcherSingleResultDispatcherDispatchRouteID(t *testing.T) { wgDsp := &singleResultDispatcher{sorter: new(roundRobinSort)} dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) - err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", "", "", "", "") + err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -409,7 +409,7 @@ func TestLibDispatcherSingleResultDispatcherDispatchRouteID(t *testing.T) { func TestLibDispatcherBroadcastDispatcherDispatch(t *testing.T) { wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) - err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "") + err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -419,7 +419,7 @@ func TestLibDispatcherBroadcastDispatcherDispatch(t *testing.T) { func TestLibDispatcherBroadcastDispatcherDispatchRouteID(t *testing.T) { wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) - err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", "", "", "", "") + err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -429,7 +429,7 @@ func TestLibDispatcherBroadcastDispatcherDispatchRouteID(t *testing.T) { func TestLibDispatcherLoadDispatcherDispatch(t *testing.T) { wgDsp := &loadDispatcher{sorter: new(randomSort)} dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) - err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "") + err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -445,55 +445,25 @@ func TestLibDispatcherLoadDispatcherDispatchHostsID(t *testing.T) { sorter: new(noSort), } dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) - err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", "", "", "", "") + err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } -func TestLibDispatcherLoadStrategyDispatchCaseHosts(t *testing.T) { +func TestLibDispatcherLoadStrategyDispatchCaseCallError(t *testing.T) { wgDsp := &loadDispatcher{ hosts: engine.DispatcherHostProfiles{ { - ID: "testID", - // FilterIDs: []string{"filterID"}, - Weight: 4, - Params: map[string]interface{}{ - utils.MetaRatio: 1, - }, - Blocker: false, + ID: "hostID", }, }, defaultRatio: 1, sorter: new(noSort), } - dM := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) - err := wgDsp.Dispatch(dM, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "") - expected := "HOST_NOT_FOUND" - if err == nil || err.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) - } -} - -func TestLibDispatcherLoadStrategyDispatchCaseHostsError(t *testing.T) { - wgDsp := &loadDispatcher{ - hosts: engine.DispatcherHostProfiles{ - { - ID: "testID2", - // FilterIDs: []string{"filterID"}, - Weight: 4, - Params: map[string]interface{}{ - utils.MetaRatio: 1, - }, - Blocker: false, - }, - }, - defaultRatio: 1, - sorter: new(noSort), - } - err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "") - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" + err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "") + expected := "NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } @@ -522,7 +492,7 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError(t *testing.T) { defaultRatio: 1, sorter: new(noSort), } - err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "") + err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "cannot cast false to *LoadMetrics" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -547,7 +517,7 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError2(t *testing.T) { defaultRatio: 1, sorter: new(noSort), } - err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", "", "", "", "") + err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -572,8 +542,8 @@ func TestLibDispatcherSingleResultDispatcherCastError(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "testID", utils.MetaAttributes, "", "", "") - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" + err := wgDsp.Dispatch(nil, nil, config.CgrConfig(), context.Background(), nil, nil, "", "testID", &DispatcherRoute{}, "", "", "") + expected := "NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } @@ -586,34 +556,6 @@ func (*mockTypeCon) Call(ctx *context.Context, method string, args interface{}, return utils.ErrNotFound } -func TestLibDispatcherSingleResultDispatcherCastError2(t *testing.T) { - cacheInit := engine.Cache - cfg := config.NewDefaultCGRConfig() - dm := engine.NewDataManager(nil, nil, nil) - newCache := engine.NewCacheS(cfg, dm, nil) - engine.Cache = newCache - value := &engine.DispatcherHost{ - Tenant: "testTenant", - RemoteHost: &config.RemoteHost{ - ID: "testID", - Address: rpcclient.InternalRPC, - Transport: utils.MetaInternal, - TLS: false, - }, - } - - chanRPC := make(chan birpc.ClientConnector, 1) - chanRPC <- new(mockTypeCon) - engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", - value, nil, true, utils.NonTransactional) - wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) - if err != utils.ErrNotFound { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err) - } - engine.Cache = cacheInit -} - func TestLibDispatcherBroadcastDispatcherDispatchError1(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() @@ -632,7 +574,7 @@ func TestLibDispatcherBroadcastDispatcherDispatchError1(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "") + err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "") expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -650,7 +592,7 @@ func TestLibDispatcherBroadcastDispatcherDispatchError2(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", nil, nil, true, utils.NonTransactional) wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "") + err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -676,113 +618,7 @@ func TestLibDispatcherBroadcastDispatcherDispatchError3(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "") - if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) - } - engine.Cache = cacheInit -} - -func TestLibDispatcherLoadDispatcherCacheError(t *testing.T) { - cacheInit := engine.Cache - cfg := config.NewDefaultCGRConfig() - dm := engine.NewDataManager(nil, nil, nil) - newCache := engine.NewCacheS(cfg, dm, nil) - engine.Cache = newCache - value := &engine.DispatcherHost{ - Tenant: "testTenant", - RemoteHost: &config.RemoteHost{ - ID: "testID", - Address: "", - Transport: "", - TLS: false, - }, - } - engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", - value, nil, true, utils.NonTransactional) - wgDsp := &loadDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "") - expected := "HOST_NOT_FOUND" - if err == nil || err.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) - } - engine.Cache = cacheInit -} - -func TestLibDispatcherLoadDispatcherCacheError2(t *testing.T) { - cacheInit := engine.Cache - cfg := config.NewDefaultCGRConfig() - dm := engine.NewDataManager(nil, nil, nil) - newCache := engine.NewCacheS(cfg, dm, nil) - engine.Cache = newCache - value := &engine.DispatcherHost{ - Tenant: "testTenant", - RemoteHost: &config.RemoteHost{ - ID: "testID", - Address: rpcclient.InternalRPC, - Transport: utils.MetaInternal, - TLS: false, - }, - } - - chanRPC := make(chan birpc.ClientConnector, 1) - chanRPC <- new(mockTypeCon) - engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", - value, nil, true, utils.NonTransactional) - wgDsp := &loadDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) - if err != utils.ErrNotFound { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err) - } - engine.Cache = cacheInit -} - -func TestLibDispatcherLoadDispatcherCacheError3(t *testing.T) { - cacheInit := engine.Cache - cfg := config.NewDefaultCGRConfig() - dm := engine.NewDataManager(nil, nil, nil) - newCache := engine.NewCacheS(cfg, dm, nil) - engine.Cache = newCache - value := &engine.DispatcherHost{ - Tenant: "testTenant", - RemoteHost: &config.RemoteHost{ - ID: "testID", - Address: rpcclient.InternalRPC, - Transport: utils.MetaInternal, - TLS: false, - }, - } - - chanRPC := make(chan birpc.ClientConnector, 1) - chanRPC <- new(mockTypeCon) - engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTENANT:testID", - value, nil, true, utils.NonTransactional) - wgDsp := &loadDispatcher{ - tntID: "testTENANT", - hosts: engine.DispatcherHostProfiles{ - { - ID: "testID", - // FilterIDs: []string{"filterID1", "filterID2"}, - Weight: 3, - Params: map[string]interface{}{ - utils.MetaRatio: 1, - }, - Blocker: true, - }, - { - ID: "testID2", - // FilterIDs: []string{"filterID1", "filterID2"}, - Weight: 3, - Params: map[string]interface{}{ - utils.MetaRatio: 2, - }, - Blocker: true, - }, - }, - defaultRatio: 0, - sorter: new(noSort), - } - err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTENANT", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "") if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -807,7 +643,8 @@ func TestLibDispatcherLoadDispatcherCacheError4(t *testing.T) { }, } connMng := engine.NewConnManager(cfg) - dm := engine.NewDataManager(nil, nil, connMng) + dataDB := engine.NewInternalDB(nil, nil, nil) + dm := engine.NewDataManager(dataDB, nil, connMng) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache @@ -848,7 +685,7 @@ func TestLibDispatcherLoadDispatcherCacheError4(t *testing.T) { defaultRatio: 0, sorter: new(noSort), } - err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), nil, nil, "testTENANT", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), nil, nil, "testTENANT", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "DISCONNECTED" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -898,7 +735,7 @@ func TestLibDispatcherLoadDispatcherCacheError5(t *testing.T) { defaultRatio: 0, sorter: new(noSort), } - err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(nil, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err == nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err) } @@ -924,7 +761,7 @@ func TestLibDispatcherSingleResultDispatcherCase1(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err == nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err) } @@ -957,7 +794,7 @@ func TestLibDispatcherSingleResultDispatcherCase2(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "routeID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "routeID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -999,7 +836,7 @@ func TestLibDispatcherSingleResultDispatcherCase3(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} - err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "routeID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(dm, nil, cfg, context.Background(), chanRPC, nil, "testTenant", "routeID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "DISCONNECTED" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -1018,7 +855,7 @@ func TestLibDispatcherDispatchFilterError(t *testing.T) { }}, } expErrMsg := "inline parse error for string: <*wrongType>" - if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", "", "", "", ""); err == nil || err.Error() != expErrMsg { + if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg { t.Errorf("Expected error: %s received: %v", expErrMsg, err) } dsp = &loadDispatcher{ @@ -1029,7 +866,7 @@ func TestLibDispatcherDispatchFilterError(t *testing.T) { }}, defaultRatio: 1, } - if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", "", "", "", ""); err == nil || err.Error() != expErrMsg { + if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg { t.Errorf("Expected error: %s received: %v", expErrMsg, err) } dsp = &broadcastDispatcher{ @@ -1038,26 +875,11 @@ func TestLibDispatcherDispatchFilterError(t *testing.T) { FilterIDs: []string{"*wrongType"}, }}, } - if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", "", "", "", ""); err == nil || err.Error() != expErrMsg { + if err := dsp.Dispatch(nil, flts, cfg, context.Background(), nil, nil, "", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg { t.Errorf("Expected error: %s received: %v", expErrMsg, err) } } -func TestLibDispatcherDispatchHostNotFound(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - flts := engine.NewFilterS(cfg, nil, nil) - db := engine.NewDataManager(engine.NewInternalDB(nil, nil, config.CgrConfig().DataDbCfg().Items), nil, nil) - var dsp Dispatcher = &singleResultDispatcher{ - sorter: new(noSort), - hosts: engine.DispatcherHostProfiles{{ - ID: "testID", - }}, - } - if err := dsp.Dispatch(db, flts, cfg, context.Background(), nil, nil, "", "", "", "", "", ""); err != utils.ErrHostNotFound { - t.Errorf("Expected error: %s received: %v", utils.ErrHostNotFound, err) - } -} - func TestLibDispatcherRandomSort(t *testing.T) { cfg := config.NewDefaultCGRConfig() flts := engine.NewFilterS(cfg, nil, nil)