From 6ae624538b78d8558fc312926b4f735c228ce400 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 11 Jun 2021 09:59:38 +0300 Subject: [PATCH] Updated dispatcher conns sorting --- dispatchers/dispatchers.go | 21 +- dispatchers/dispatchers_it_test.go | 5 +- dispatchers/dispatchers_test.go | 99 ++++- dispatchers/libdispatcher.go | 379 +++++++---------- dispatchers/libdispatcher_test.go | 655 +++++++++-------------------- 5 files changed, 451 insertions(+), 708 deletions(-) diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 6aaee0ae8..902185b43 100644 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -103,17 +103,13 @@ func (dS *DispatcherService) authorize(method, tenant string, apiKey string, evT // dispatcherForEvent returns a dispatcher instance configured for specific event // or utils.ErrNotFound if none present func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CGREvent, - subsys string) (dPrlfs engine.DispatcherProfiles, err error) { + evNm utils.MapStorage, subsys string) (dPrlfs engine.DispatcherProfiles, err error) { // find out the matching profiles anyIdxPrfx := utils.ConcatenatedKey(tnt, utils.MetaAny) idxKeyPrfx := anyIdxPrfx if subsys != "" { idxKeyPrfx = utils.ConcatenatedKey(tnt, subsys) } - evNm := utils.MapStorage{ - utils.MetaReq: ev.Event, - utils.MetaOpts: ev.APIOpts, - } var prflIDs utils.StringSet if prflIDs, err = engine.MatchingItemIDsForEvent(evNm, dS.cfg.DispatcherSCfg().StringIndexedFields, @@ -194,8 +190,12 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, if tnt == utils.EmptyString { tnt = dS.cfg.GeneralCfg().DefaultTenant } + evNm := utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + } var dPrfls engine.DispatcherProfiles - if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil { + if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, evNm, subsys); err != nil { return utils.NewErrDispatcherS(err) } for _, dPrfl := range dPrfls { @@ -205,13 +205,13 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, if x, ok := engine.Cache.Get(utils.CacheDispatchers, tntID); ok && x != nil { d = x.(Dispatcher) - } else if d, err = newDispatcher(dS.dm, dPrfl); err != nil { + } else if d, err = newDispatcher(dPrfl); err != nil { return utils.NewErrDispatcherS(err) } if err = engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil, true, utils.EmptyString); err != nil { return utils.NewErrDispatcherS(err) } - if err = d.Dispatch(utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]), subsys, serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { + if err = d.Dispatch(dS.dm, dS.fltrS, evNm, tnt, utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]), subsys, serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { return } } @@ -224,7 +224,10 @@ func (dS *DispatcherService) V1GetProfilesForEvent(ev *utils.CGREvent, if tnt == utils.EmptyString { tnt = dS.cfg.GeneralCfg().DefaultTenant } - retDPfl, errDpfl := dS.dispatcherProfilesForEvent(tnt, ev, utils.IfaceAsString(ev.APIOpts[utils.Subsys])) + retDPfl, errDpfl := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, utils.IfaceAsString(ev.APIOpts[utils.Subsys])) if errDpfl != nil { return utils.NewErrDispatcherS(errDpfl) } diff --git a/dispatchers/dispatchers_it_test.go b/dispatchers/dispatchers_it_test.go index 29c734761..770864699 100644 --- a/dispatchers/dispatchers_it_test.go +++ b/dispatchers/dispatchers_it_test.go @@ -187,7 +187,10 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatchertWithoutAuthenti }, } tnt := ev.Tenant - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MetaAccounts) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, utils.MetaAccounts) expected := utils.ErrNotImplemented if err == nil || err != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) diff --git a/dispatchers/dispatchers_test.go b/dispatchers/dispatchers_test.go index 6f8009e9d..3da920b83 100644 --- a/dispatchers/dispatchers_test.go +++ b/dispatchers/dispatchers_test.go @@ -84,7 +84,10 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatcherProfileNF(t *tes } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) expected := utils.ErrNotImplemented if err == nil || err != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -102,7 +105,10 @@ func TestDispatcherServiceDispatcherProfileForEventMIIDENotFound(t *testing.T) { ev := &utils.CGREvent{} tnt := "" subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err := dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err := dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err == nil || err != utils.ErrNotFound { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err) } @@ -571,7 +577,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNil(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -615,7 +624,10 @@ func TestDispatcherV1GetProfileForEventReturn(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -665,7 +677,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFound(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err == nil || err != utils.ErrNotFound { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err) } @@ -709,7 +724,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFound2(t *testing.T) { } tnt := "" subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err == nil || err != utils.ErrNotFound { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err) } @@ -757,7 +775,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFoundTime(t *testing.T) } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err == nil || err != utils.ErrNotFound { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err) } @@ -801,7 +822,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFoundFilter(t *testing. } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err == nil || err.Error() != "NOT_FOUND:filter" { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "NOT_FOUND:filter", err) } @@ -871,7 +895,7 @@ func TestDispatcherServiceDispatchDspErrHostNotFound(t *testing.T) { Hosts: nil, } newCache := engine.NewCacheS(cfg, dm, nil) - value, errDsp := newDispatcher(dm, dsp) + value, errDsp := newDispatcher(dsp) if errDsp != nil { t.Fatal(errDsp) } @@ -956,7 +980,10 @@ func TestDispatcherServiceDispatcherProfileForEventFoundFilter(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err == nil || err.Error() != "NOT_FOUND" { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "NOT_FOUND:filter", err) } @@ -996,7 +1023,10 @@ func TestDispatcherServiceDispatcherProfileForEventNotNotFound(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err := dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err := dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) expected := utils.ErrNotImplemented if err == nil || err != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -1054,7 +1084,10 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatcherError(t *testing } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys) + _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys) if err == nil || err.Error() != "NOT_FOUND" { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "NOT_FOUND:filter", err) } @@ -1080,7 +1113,7 @@ func TestDispatcherServiceDispatchDspErrHostNotFound2(t *testing.T) { Hosts: nil, } newCache := engine.NewCacheS(cfg, dm, nil) - value, errDsp := newDispatcher(dm, dsp) + value, errDsp := newDispatcher(dsp) if errDsp != nil { t.Fatal(errDsp) } @@ -1329,7 +1362,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalses(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil { + if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys); err != nil { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) } else if len(rcv) != 1 { t.Errorf("Unexpected number of profiles:%v", len(rcv)) @@ -1348,7 +1384,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalses(t *testing.T) { t.Error(err) } - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil { + if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys); err != nil { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) } else if len(rcv) != 1 { t.Errorf("Unexpected number of profiles:%v", len(rcv)) @@ -1413,7 +1452,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseFirstNotFound(t *testing. } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil { + if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys); err != nil { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) } else if len(rcv) != 1 { t.Errorf("Unexpected number of profiles:%v", len(rcv)) @@ -1478,7 +1520,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseFound(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil { + if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys); err != nil { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) } else if len(rcv) != 1 { t.Errorf("Unexpected number of profiles:%v", len(rcv)) @@ -1543,7 +1588,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseNotFound(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err == nil || err != utils.ErrNotFound { + if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys); err == nil || err != utils.ErrNotFound { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err) } else if rcv != nil { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, rcv) @@ -1605,7 +1653,10 @@ func TestDispatchersdispatcherProfileForEventAnySStrueNotFound(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err == nil || err != utils.ErrNotFound { + if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys); err == nil || err != utils.ErrNotFound { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err) } else if rcv != nil { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, rcv) @@ -1667,7 +1718,10 @@ func TestDispatchersdispatcherProfileForEventAnySStrueBothFound(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil { + if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys); err != nil { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) } else if len(rcv) != 1 { t.Errorf("Unexpected number of profiles:%v", len(rcv)) @@ -1681,7 +1735,10 @@ func TestDispatchersdispatcherProfileForEventAnySStrueBothFound(t *testing.T) { t.Error(err) } - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil { + if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + }, subsys); err != nil { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) } else if len(rcv) != 1 { t.Errorf("Unexpected number of profiles:%v", len(rcv)) diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 62f388f4b..5a65d0daa 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -21,6 +21,7 @@ package dispatchers import ( "encoding/gob" "fmt" + "math/rand" "sort" "sync" @@ -35,187 +36,131 @@ func init() { } -// Dispatcher is responsible for routing requests to pool of connections -// there will be different implementations based on strategy - // Dispatcher is responsible for routing requests to pool of connections // there will be different implementations based on strategy type Dispatcher interface { - // SetProfile is used to update the configuration information within dispatcher - // to make sure we take decisions based on latest config - SetProfile(pfl *engine.DispatcherProfile) - // HostIDs returns the ordered list of host IDs - HostIDs() (hostIDs engine.DispatcherHostIDs) // Dispatch is used to send the method over the connections given - Dispatch(routeID string, subsystem, - serviceMethod string, args interface{}, reply interface{}) (err error) -} - -type strategyDispatcher interface { - // dispatch is used to send the method over the connections given - dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string, + Dispatch(dm *engine.DataManager, flts *engine.FilterS, + ev utils.DataProvider, tnt, routeID, subsystem string, serviceMethod string, args interface{}, reply interface{}) (err error) } // newDispatcher constructs instances of Dispatcher -func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dispatcher, err error) { - pfl.Hosts.Sort() // make sure the connections are sorted +func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { hosts := pfl.Hosts.Clone() + hosts.Sort() // make sure the connections are sorted switch pfl.Strategy { case utils.MetaWeight: - var strDsp strategyDispatcher - if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil { - return - } - d = &WeightDispatcher{ - dm: dm, - tnt: pfl.Tenant, - hosts: hosts, - strategy: strDsp, - } + return newSingleDispatcher(hosts, pfl.StrategyParams, pfl.TenantID(), new(noSort)) case utils.MetaRandom: - var strDsp strategyDispatcher - if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil { - return - } - d = &RandomDispatcher{ - dm: dm, - tnt: pfl.Tenant, - hosts: hosts, - strategy: strDsp, - } + return newSingleDispatcher(hosts, pfl.StrategyParams, pfl.TenantID(), new(randomSort)) case utils.MetaRoundRobin: - var strDsp strategyDispatcher - if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil { - return - } - d = &RoundRobinDispatcher{ - dm: dm, - tnt: pfl.Tenant, - hosts: hosts, - strategy: strDsp, - } + return newSingleDispatcher(hosts, pfl.StrategyParams, pfl.TenantID(), new(roundRobinSort)) case rpcclient.PoolBroadcast, rpcclient.PoolBroadcastSync, rpcclient.PoolBroadcastAsync: - d = &WeightDispatcher{ - dm: dm, - tnt: pfl.Tenant, + return &broadcastDispatcher{ + strategy: pfl.Strategy, hosts: hosts, - strategy: &broadcastStrategyDispatcher{strategy: pfl.Strategy}, - } + }, nil default: err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy) } return } -// WeightDispatcher selects the next connection based on weight -type WeightDispatcher struct { - sync.RWMutex - dm *engine.DataManager - tnt string - hosts engine.DispatcherHostProfiles - strategy strategyDispatcher -} - -// SetProfile used to implement Dispatcher interface -func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) { - wd.Lock() - pfl.Hosts.Sort() - wd.hosts = pfl.Hosts.Clone() // avoid concurrency on profile - wd.Unlock() -} - -// HostIDs used to implement Dispatcher interface -func (wd *WeightDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { - wd.RLock() - hostIDs = wd.hosts.HostIDs() - wd.RUnlock() - return -} - -// Dispatch used to implement Dispatcher interface -func (wd *WeightDispatcher) Dispatch(routeID string, subsystem, - serviceMethod string, args interface{}, reply interface{}) (err error) { - return wd.strategy.dispatch(wd.dm, routeID, subsystem, wd.tnt, wd.HostIDs(), - serviceMethod, args, reply) -} - -// RandomDispatcher selects the next connection randomly -// together with RouteID can serve as load-balancer -type RandomDispatcher struct { - sync.RWMutex - dm *engine.DataManager - tnt string - hosts engine.DispatcherHostProfiles - strategy strategyDispatcher -} - -// SetProfile used to implement Dispatcher interface -func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) { - d.Lock() - d.hosts = pfl.Hosts.Clone() - d.Unlock() -} - -// HostIDs used to implement Dispatcher interface -func (d *RandomDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { - d.RLock() - hostIDs = d.hosts.HostIDs() - d.RUnlock() - hostIDs.Shuffle() // randomize the connections - return -} - -// Dispatch used to implement Dispatcher interface -func (d *RandomDispatcher) Dispatch(routeID string, subsystem, - serviceMethod string, args interface{}, reply interface{}) (err error) { - return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(), - serviceMethod, args, reply) -} - -// RoundRobinDispatcher selects the next connection in round-robin fashion -type RoundRobinDispatcher struct { - sync.RWMutex - dm *engine.DataManager - tnt string - hosts engine.DispatcherHostProfiles - hostIdx int // used for the next connection - strategy strategyDispatcher -} - -// SetProfile used to implement Dispatcher interface -func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) { - d.Lock() - d.hosts = pfl.Hosts.Clone() - d.Unlock() -} - -// HostIDs used to implement Dispatcher interface -func (d *RoundRobinDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { - d.RLock() - hostIDs = d.hosts.HostIDs() - hostIDs.ReorderFromIndex(d.hostIdx) - d.hostIdx++ - if d.hostIdx >= len(d.hosts) { - d.hostIdx = 0 +func getDispatcherHosts(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { + hostIDs = make(engine.DispatcherHostIDs, 0, len(hosts)) + for _, host := range hosts { + var pass bool + if pass, err = fltrs.Pass(tnt, host.FilterIDs, ev); err != nil { + return + } + if pass { + hostIDs = append(hostIDs, host.ID) + if host.Blocker { + break + } + } } - d.RUnlock() return } -// Dispatch used to implement Dispatcher interface -func (d *RoundRobinDispatcher) Dispatch(routeID string, subsystem, - serviceMethod string, args interface{}, reply interface{}) (err error) { - return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(), - serviceMethod, args, reply) +type hostSorter interface { + Sort(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) } -type singleResultstrategyDispatcher struct{} +type noSort struct{} -func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, - hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) { +func (noSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { + return getDispatcherHosts(fltrs, ev, tnt, hosts) +} + +type randomSort struct{} + +func (randomSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { + rand.Shuffle(len(hosts), func(i, j int) { + hosts[i], hosts[j] = hosts[j], hosts[i] + }) + return getDispatcherHosts(fltrs, ev, tnt, hosts) +} + +type roundRobinSort struct{ nextIDx int } + +func (rs *roundRobinSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { + dh := make(engine.DispatcherHostProfiles, len(hosts)) + idx := rs.nextIDx + for i := 0; i < len(dh); i++ { + if idx > len(dh)-1 { + idx = 0 + } + dh[i] = hosts[idx] + idx++ + } + rs.nextIDx++ + if rs.nextIDx >= len(hosts) { + rs.nextIDx = 0 + } + return getDispatcherHosts(fltrs, ev, tnt, dh) +} + +func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string, sorter hostSorter) (_ Dispatcher, err error) { + if dflt, has := params[utils.MetaDefaultRatio]; has { + var ratio int64 + if ratio, err = utils.IfaceAsTInt64(dflt); err != nil { + return + } + return &loadDispatcher{ + tntID: tntID, + defaultRatio: ratio, + sorter: sorter, + hosts: hosts, + }, nil + } + for _, host := range hosts { + if _, has := host.Params[utils.MetaRatio]; has { + return &loadDispatcher{ + tntID: tntID, + defaultRatio: 1, + sorter: sorter, + hosts: hosts, + }, nil + } + } + return &singleResultDispatcher{ + sorter: sorter, + hosts: hosts, + }, nil +} + +type singleResultDispatcher struct { + sorter hostSorter + hosts engine.DispatcherHostProfiles +} + +func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, + ev utils.DataProvider, tnt, routeID, subsystem string, + serviceMethod string, args interface{}, reply interface{}) (err error) { var dH *engine.DispatcherHost if routeID != utils.EmptyString { // overwrite routeID with RouteID:Subsystem @@ -229,6 +174,10 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID } } } + var hostIDs []string + if hostIDs, err = sd.sorter.Sort(flts, ev, tnt, sd.hosts); err != nil { + return + } var called bool for _, hostID := range hostIDs { if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil { @@ -260,12 +209,18 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID return } -type broadcastStrategyDispatcher struct { +type broadcastDispatcher struct { strategy string + hosts engine.DispatcherHostProfiles } -func (b *broadcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string, +func (b *broadcastDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, + ev utils.DataProvider, tnt, routeID, subsystem string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var hostIDs []string + if hostIDs, err = getDispatcherHosts(flts, ev, tnt, b.hosts); err != nil { + return + } var hasHosts bool pool := rpcclient.NewRPCPool(b.strategy, config.CgrConfig().GeneralCfg().ReplyTimeout) for _, hostID := range hostIDs { @@ -288,61 +243,15 @@ func (b *broadcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID s return pool.Call(serviceMethod, args, reply) } -func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string) (ls strategyDispatcher, err error) { - if dflt, has := params[utils.MetaDefaultRatio]; has { - var ratio int64 - if ratio, err = utils.IfaceAsTInt64(dflt); err != nil { - return nil, err - } - return &loadStrategyDispatcher{ - tntID: tntID, - hosts: hosts.Clone(), - defaultRatio: ratio, - }, nil - } - for _, host := range hosts { - if _, has := host.Params[utils.MetaRatio]; has { - return &loadStrategyDispatcher{ - tntID: tntID, - hosts: hosts.Clone(), - defaultRatio: 1, - }, nil - } - } - return new(singleResultstrategyDispatcher), nil -} - -type loadStrategyDispatcher struct { +type loadDispatcher struct { tntID string - hosts engine.DispatcherHostProfiles defaultRatio int64 + sorter hostSorter + hosts engine.DispatcherHostProfiles } -func newLoadMetrics(hosts engine.DispatcherHostProfiles, dfltRatio int64) (*LoadMetrics, error) { - lM := &LoadMetrics{ - HostsLoad: make(map[string]int64), - HostsRatio: make(map[string]int64), - } - for _, host := range hosts { - if strRatio, has := host.Params[utils.MetaRatio]; !has { - lM.HostsRatio[host.ID] = dfltRatio - } else if ratio, err := utils.IfaceAsTInt64(strRatio); err != nil { - return nil, err - } else { - lM.HostsRatio[host.ID] = ratio - } - } - return lM, nil -} - -// LoadMetrics the structure to save the metrix for load strategy -type LoadMetrics struct { - mutex sync.RWMutex - HostsLoad map[string]int64 - HostsRatio map[string]int64 -} - -func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string, +func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, + ev utils.DataProvider, tnt, routeID, subsystem string, serviceMethod string, args interface{}, reply interface{}) (err error) { var dH *engine.DispatcherHost var lM *LoadMetrics @@ -370,8 +279,12 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin } } } + var hostIDs []string + if hostIDs, err = ld.sorter.Sort(flts, ev, tnt, lM.getHosts(ld.hosts)); err != nil { + return + } var called bool - for _, hostID := range lM.getHosts(hostIDs) { + for _, hostID := range hostIDs { if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil { if err == utils.ErrNotFound { utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q", @@ -404,40 +317,64 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin return } +func newLoadMetrics(hosts engine.DispatcherHostProfiles, dfltRatio int64) (*LoadMetrics, error) { + lM := &LoadMetrics{ + HostsLoad: make(map[string]int64), + HostsRatio: make(map[string]int64), + } + for _, host := range hosts { + if strRatio, has := host.Params[utils.MetaRatio]; !has { + lM.HostsRatio[host.ID] = dfltRatio + } else if ratio, err := utils.IfaceAsTInt64(strRatio); err != nil { + return nil, err + } else { + lM.HostsRatio[host.ID] = ratio + } + } + return lM, nil +} + +// LoadMetrics the structure to save the metrix for load strategy +type LoadMetrics struct { + mutex sync.RWMutex + HostsLoad map[string]int64 + HostsRatio map[string]int64 +} + // used to sort the host IDs based on costs type hostCosts struct { - ids []string - multiple []int64 + hosts engine.DispatcherHostProfiles + load []int64 } -func (hc *hostCosts) Len() int { return len(hc.ids) } -func (hc *hostCosts) Less(i, j int) bool { return hc.multiple[i] < hc.multiple[j] } +func (hc *hostCosts) Len() int { return len(hc.hosts) } +func (hc *hostCosts) Less(i, j int) bool { return hc.load[i] < hc.load[j] } func (hc *hostCosts) Swap(i, j int) { - hc.multiple[i], hc.multiple[j] = hc.multiple[j], hc.multiple[i] - hc.ids[i], hc.ids[j] = hc.ids[j], hc.ids[i] + hc.load[i], hc.load[j] = hc.load[j], hc.load[i] + hc.hosts[i], hc.hosts[j] = hc.hosts[j], hc.hosts[i] } -func (lM *LoadMetrics) getHosts(hostIDs []string) []string { +func (lM *LoadMetrics) getHosts(hosts engine.DispatcherHostProfiles) engine.DispatcherHostProfiles { hlp := &hostCosts{ - ids: make([]string, 0, len(hostIDs)), - multiple: make([]int64, 0, len(hostIDs)), + hosts: make(engine.DispatcherHostProfiles, 0, len(hosts)), + load: make([]int64, 0, len(hosts)), } lM.mutex.RLock() - for _, id := range hostIDs { + for _, host := range hosts { switch { - case lM.HostsRatio[id] < 0: - hlp.multiple = append(hlp.multiple, 0) - case lM.HostsRatio[id] == 0: + case lM.HostsRatio[host.ID] < 0: + hlp.load = append(hlp.load, 0) + case lM.HostsRatio[host.ID] == 0: continue default: - hlp.multiple = append(hlp.multiple, lM.HostsLoad[id]/lM.HostsRatio[id]) + hlp.load = append(hlp.load, lM.HostsLoad[host.ID]/lM.HostsRatio[host.ID]) } - hlp.ids = append(hlp.ids, id) + hlp.hosts = append(hlp.hosts, host) } lM.mutex.RUnlock() sort.Stable(hlp) - return hlp.ids + return hlp.hosts } func (lM *LoadMetrics) incrementLoad(hostID, tntID string) { diff --git a/dispatchers/libdispatcher_test.go b/dispatchers/libdispatcher_test.go index 53adb6fd3..33f496f8b 100644 --- a/dispatchers/libdispatcher_test.go +++ b/dispatchers/libdispatcher_test.go @@ -21,11 +21,9 @@ package dispatchers import ( "net/rpc" "reflect" - "sync" "testing" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -50,23 +48,23 @@ func TestLoadMetricsGetHosts(t *testing.T) { } // check only the first host because the rest may be in a random order // because they share the same cost - if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_1" { - t.Errorf("Expected: %q ,received: %q", "DSP_1", rply[0]) + if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_1" { + t.Errorf("Expected: %q ,received: %q", "DSP_1", rply[0].ID) } lm.incrementLoad(hostsIDs[0], utils.EmptyString) lm.decrementLoad(hostsIDs[1], utils.EmptyString) - if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_2" { - t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0]) + if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_2" { + t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0].ID) } for _, hst := range hostsIDs { lm.incrementLoad(hst, utils.EmptyString) } - if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_2" { - t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0]) + if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_2" { + t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0].ID) } } -func TestNewSingleStrategyDispatcher(t *testing.T) { +func TestNewSingleDispatcher(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1"}, {ID: "DSP_2"}, @@ -74,11 +72,11 @@ func TestNewSingleStrategyDispatcher(t *testing.T) { {ID: "DSP_4"}, {ID: "DSP_5"}, } - var exp strategyDispatcher = new(singleResultstrategyDispatcher) - if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{}, utils.EmptyString); err != nil { + var exp Dispatcher = &singleResultDispatcher{hosts: dhp} + if rply, err := newSingleDispatcher(dhp, map[string]interface{}{}, utils.EmptyString, nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, rply) { - t.Errorf("Expected: singleResultstrategyDispatcher structure,received: %s", utils.ToJSON(rply)) + t.Errorf("Expected: singleResultDispatcher structure,received: %s", utils.ToJSON(rply)) } dhp = engine.DispatcherHostProfiles{ @@ -88,15 +86,15 @@ func TestNewSingleStrategyDispatcher(t *testing.T) { {ID: "DSP_4"}, {ID: "DSP_5", Params: map[string]interface{}{utils.MetaRatio: 1}}, } - exp = &loadStrategyDispatcher{ + exp = &loadDispatcher{ hosts: dhp, tntID: "cgrates.org", defaultRatio: 1, } - if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{}, "cgrates.org"); err != nil { + if rply, err := newSingleDispatcher(dhp, map[string]interface{}{}, "cgrates.org", nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, rply) { - t.Errorf("Expected: loadStrategyDispatcher structure,received: %s", utils.ToJSON(rply)) + t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply)) } dhp = engine.DispatcherHostProfiles{ @@ -105,29 +103,29 @@ func TestNewSingleStrategyDispatcher(t *testing.T) { {ID: "DSP_3"}, {ID: "DSP_4"}, } - exp = &loadStrategyDispatcher{ + exp = &loadDispatcher{ hosts: dhp, tntID: "cgrates.org", defaultRatio: 2, } - if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: 2}, "cgrates.org"); err != nil { + if rply, err := newSingleDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: 2}, "cgrates.org", nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, rply) { - t.Errorf("Expected: loadStrategyDispatcher structure,received: %s", utils.ToJSON(rply)) + t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply)) } - exp = &loadStrategyDispatcher{ + exp = &loadDispatcher{ hosts: dhp, tntID: "cgrates.org", defaultRatio: 0, } - if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: 0}, "cgrates.org"); err != nil { + if rply, err := newSingleDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: 0}, "cgrates.org", nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, rply) { - t.Errorf("Expected: loadStrategyDispatcher structure,received: %s", utils.ToJSON(rply)) + t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply)) } - if _, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: "A"}, "cgrates.org"); err == nil { + if _, err := newSingleDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: "A"}, "cgrates.org", nil); err == nil { t.Fatalf("Expected error received: %v", err) } } @@ -174,13 +172,13 @@ func TestLoadMetricsGetHosts2(t *testing.T) { } hostsIDs := engine.DispatcherHostIDs(dhp.HostIDs()) exp := []string(hostsIDs.Clone())[:5] - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } for i := 0; i < 100; i++ { for _, dh := range dhp { for j := int64(0); j < lm.HostsRatio[dh.ID]; j++ { - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected for id<%s>: %+v ,received: %+v", dh.ID, exp, rply) } lm.incrementLoad(dh.ID, utils.EmptyString) @@ -188,30 +186,30 @@ func TestLoadMetricsGetHosts2(t *testing.T) { exp = append(exp[1:], exp[0]) } exp = []string{"DSP_1", "DSP_2", "DSP_3", "DSP_4", "DSP_5"} - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.decrementLoad("DSP_4", utils.EmptyString) lm.decrementLoad("DSP_4", utils.EmptyString) lm.decrementLoad("DSP_2", utils.EmptyString) exp = []string{"DSP_2", "DSP_4", "DSP_1", "DSP_3", "DSP_5"} - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.incrementLoad("DSP_2", utils.EmptyString) exp = []string{"DSP_4", "DSP_1", "DSP_2", "DSP_3", "DSP_5"} - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.incrementLoad("DSP_4", utils.EmptyString) - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.incrementLoad("DSP_4", utils.EmptyString) exp = []string{"DSP_1", "DSP_2", "DSP_3", "DSP_4", "DSP_5"} - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } } @@ -230,11 +228,11 @@ func TestLoadMetricsGetHosts2(t *testing.T) { } hostsIDs = engine.DispatcherHostIDs(dhp.HostIDs()) exp = []string(hostsIDs.Clone())[:5] - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } for i := 0; i < 100; i++ { - if rply := lm.getHosts(hostsIDs.Clone()); !reflect.DeepEqual(exp, rply) { + if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.incrementLoad(exp[0], utils.EmptyString) @@ -242,40 +240,27 @@ func TestLoadMetricsGetHosts2(t *testing.T) { } func TestLibDispatcherNewDispatcherMetaWeight(t *testing.T) { - dataMng := &engine.DataManager{} pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: utils.MetaWeight, } - result, err := newDispatcher(dataMng, pfl) + result, err := newDispatcher(pfl) if err != nil { t.Errorf("\nExpected , \nReceived <%+v>", err) } - strategy, err := newSingleStrategyDispatcher(pfl.Hosts, pfl.StrategyParams, pfl.TenantID()) - if err != nil { - t.Errorf("\nExpected , \nReceived <%+v>", err) + expected := &singleResultDispatcher{ + hosts: engine.DispatcherHostProfiles{}, + sorter: new(noSort), } - expected := &WeightDispatcher{ - hosts: engine.DispatcherHostProfiles{}, - dm: dataMng, - strategy: strategy, + if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts) } - if !reflect.DeepEqual(result.(*WeightDispatcher).dm, expected.dm) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.dm, result.(*WeightDispatcher).dm) - } - if !reflect.DeepEqual(result.(*WeightDispatcher).strategy, expected.strategy) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.strategy, result.(*WeightDispatcher).strategy) - } - if !reflect.DeepEqual(result.(*WeightDispatcher).hosts, expected.hosts) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*WeightDispatcher).hosts) - } - if !reflect.DeepEqual(result.(*WeightDispatcher).tnt, expected.tnt) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", result.(*WeightDispatcher).tnt, expected.tnt) + if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", result.(*singleResultDispatcher).sorter, expected.sorter) } } func TestLibDispatcherNewDispatcherMetaWeightErr(t *testing.T) { - dataMng := &engine.DataManager{} pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, StrategyParams: map[string]interface{}{ @@ -283,7 +268,7 @@ func TestLibDispatcherNewDispatcherMetaWeightErr(t *testing.T) { }, Strategy: utils.MetaWeight, } - _, err := newDispatcher(dataMng, pfl) + _, err := newDispatcher(pfl) expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -292,40 +277,27 @@ func TestLibDispatcherNewDispatcherMetaWeightErr(t *testing.T) { } func TestLibDispatcherNewDispatcherMetaRandom(t *testing.T) { - dataMng := &engine.DataManager{} pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: utils.MetaRandom, } - result, err := newDispatcher(dataMng, pfl) + result, err := newDispatcher(pfl) if err != nil { t.Errorf("\nExpected , \nReceived <%+v>", err) } - strategy, err := newSingleStrategyDispatcher(pfl.Hosts, pfl.StrategyParams, pfl.TenantID()) - if err != nil { - t.Errorf("\nExpected , \nReceived <%+v>", err) + expected := &singleResultDispatcher{ + hosts: engine.DispatcherHostProfiles{}, + sorter: new(randomSort), } - expected := &RandomDispatcher{ - hosts: engine.DispatcherHostProfiles{}, - dm: dataMng, - strategy: strategy, + if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.sorter, result.(*singleResultDispatcher).sorter) } - if !reflect.DeepEqual(result.(*RandomDispatcher).dm, expected.dm) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.dm, result.(*WeightDispatcher).dm) - } - if !reflect.DeepEqual(result.(*RandomDispatcher).strategy, expected.strategy) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.strategy, result.(*WeightDispatcher).strategy) - } - if !reflect.DeepEqual(result.(*RandomDispatcher).hosts, expected.hosts) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*WeightDispatcher).hosts) - } - if !reflect.DeepEqual(result.(*RandomDispatcher).tnt, expected.tnt) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", result.(*WeightDispatcher).tnt, expected.tnt) + if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts) } } func TestLibDispatcherNewDispatcherMetaRandomErr(t *testing.T) { - dataMng := &engine.DataManager{} pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, StrategyParams: map[string]interface{}{ @@ -333,7 +305,7 @@ func TestLibDispatcherNewDispatcherMetaRandomErr(t *testing.T) { }, Strategy: utils.MetaRandom, } - _, err := newDispatcher(dataMng, pfl) + _, err := newDispatcher(pfl) expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -342,40 +314,27 @@ func TestLibDispatcherNewDispatcherMetaRandomErr(t *testing.T) { } func TestLibDispatcherNewDispatcherMetaRoundRobin(t *testing.T) { - dataMng := &engine.DataManager{} pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: utils.MetaRoundRobin, } - result, err := newDispatcher(dataMng, pfl) + result, err := newDispatcher(pfl) if err != nil { t.Errorf("\nExpected , \nReceived <%+v>", err) } - strategy, err := newSingleStrategyDispatcher(pfl.Hosts, pfl.StrategyParams, pfl.TenantID()) - if err != nil { - t.Errorf("\nExpected , \nReceived <%+v>", err) + expected := &singleResultDispatcher{ + hosts: engine.DispatcherHostProfiles{}, + sorter: new(roundRobinSort), } - expected := &RoundRobinDispatcher{ - hosts: engine.DispatcherHostProfiles{}, - dm: dataMng, - strategy: strategy, + if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.sorter, result.(*singleResultDispatcher).sorter) } - if !reflect.DeepEqual(result.(*RoundRobinDispatcher).dm, expected.dm) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.dm, result.(*WeightDispatcher).dm) - } - if !reflect.DeepEqual(result.(*RoundRobinDispatcher).strategy, expected.strategy) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.strategy, result.(*WeightDispatcher).strategy) - } - if !reflect.DeepEqual(result.(*RoundRobinDispatcher).hosts, expected.hosts) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*WeightDispatcher).hosts) - } - if !reflect.DeepEqual(result.(*RoundRobinDispatcher).tnt, expected.tnt) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", result.(*WeightDispatcher).tnt, expected.tnt) + if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts) } } func TestLibDispatcherNewDispatcherMetaRoundRobinErr(t *testing.T) { - dataMng := &engine.DataManager{} pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, StrategyParams: map[string]interface{}{ @@ -383,7 +342,7 @@ func TestLibDispatcherNewDispatcherMetaRoundRobinErr(t *testing.T) { }, Strategy: utils.MetaRoundRobin, } - _, err := newDispatcher(dataMng, pfl) + _, err := newDispatcher(pfl) expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -392,328 +351,105 @@ func TestLibDispatcherNewDispatcherMetaRoundRobinErr(t *testing.T) { } func TestLibDispatcherNewDispatcherPoolBroadcast(t *testing.T) { - dataMng := &engine.DataManager{} pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: rpcclient.PoolBroadcast, } - result, err := newDispatcher(dataMng, pfl) + result, err := newDispatcher(pfl) if err != nil { t.Errorf("\nExpected , \nReceived <%+v>", err) } - strategy := &broadcastStrategyDispatcher{strategy: pfl.Strategy} - expected := &WeightDispatcher{ + expected := &broadcastDispatcher{ hosts: engine.DispatcherHostProfiles{}, - dm: dataMng, - strategy: strategy, + strategy: pfl.Strategy, } - if !reflect.DeepEqual(result.(*WeightDispatcher).dm, expected.dm) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.dm, result.(*WeightDispatcher).dm) + if !reflect.DeepEqual(result.(*broadcastDispatcher).strategy, expected.strategy) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.strategy, result.(*broadcastDispatcher).strategy) } - if !reflect.DeepEqual(result.(*WeightDispatcher).strategy, expected.strategy) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.strategy, result.(*WeightDispatcher).strategy) - } - if !reflect.DeepEqual(result.(*WeightDispatcher).hosts, expected.hosts) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*WeightDispatcher).hosts) - } - if !reflect.DeepEqual(result.(*WeightDispatcher).tnt, expected.tnt) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", result.(*WeightDispatcher).tnt, expected.tnt) + if !reflect.DeepEqual(result.(*broadcastDispatcher).hosts, expected.hosts) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*broadcastDispatcher).hosts) } } func TestLibDispatcherNewDispatcherError(t *testing.T) { - dataMng := &engine.DataManager{} pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: "badStrategy", } expected := "unsupported dispatch strategy: " - _, err := newDispatcher(dataMng, pfl) + _, err := newDispatcher(pfl) if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } -func TestLibDispatcherSetProfile(t *testing.T) { - pfl := &engine.DispatcherProfile{ - Hosts: engine.DispatcherHostProfiles{ - { - ID: "0", - FilterIDs: []string{"FilterTest1"}, - Weight: 1, - Params: nil, - Blocker: false, - }, - }, - } - wgDsp := &WeightDispatcher{} - wgDsp.SetProfile(pfl) - expected := &engine.DispatcherProfile{ - Hosts: engine.DispatcherHostProfiles{ - { - ID: "0", - FilterIDs: []string{"FilterTest1"}, - Weight: 1, - Params: nil, - Blocker: false, - }, - }, - } - if !reflect.DeepEqual(expected, pfl) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ToJSON(expected), utils.ToJSON(pfl)) +func TestLibDispatcherSingleResultDispatcherDispatch(t *testing.T) { + wgDsp := &singleResultDispatcher{sorter: new(noSort)} + dataDB := engine.NewInternalDB(nil, nil, true) + dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) + err := wgDsp.Dispatch(dM, nil, nil, "", "", "", "", "", "") + expected := "HOST_NOT_FOUND" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } -func TestLibDispatcherHostIDs(t *testing.T) { - expected := engine.DispatcherHostIDs{"5", "10", "1"} +func TestLibDispatcherSingleResultDispatcherDispatchRouteID(t *testing.T) { + wgDsp := &singleResultDispatcher{sorter: new(roundRobinSort)} + dataDB := engine.NewInternalDB(nil, nil, true) + dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) + err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", "", "", "", "") + expected := "HOST_NOT_FOUND" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} - wgDsp := &WeightDispatcher{ - RWMutex: sync.RWMutex{}, - dm: nil, - tnt: "", +func TestLibDispatcherBroadcastDispatcherDispatch(t *testing.T) { + wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + dataDB := engine.NewInternalDB(nil, nil, true) + dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) + err := wgDsp.Dispatch(dM, nil, nil, "", "", "", "", "", "") + expected := "HOST_NOT_FOUND" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} + +func TestLibDispatcherBroadcastDispatcherDispatchRouteID(t *testing.T) { + wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + dataDB := engine.NewInternalDB(nil, nil, true) + dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) + err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", "", "", "", "") + expected := "HOST_NOT_FOUND" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} + +func TestLibDispatcherLoadDispatcherDispatch(t *testing.T) { + wgDsp := &loadDispatcher{sorter: new(randomSort)} + dataDB := engine.NewInternalDB(nil, nil, true) + dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) + err := wgDsp.Dispatch(dM, nil, nil, "", "", "", "", "", "") + expected := "HOST_NOT_FOUND" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} + +func TestLibDispatcherLoadDispatcherDispatchHostsID(t *testing.T) { + wgDsp := &loadDispatcher{ hosts: engine.DispatcherHostProfiles{ - { - ID: "5", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, - { - ID: "10", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, - { - ID: "1", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, + {ID: "hostID1"}, + {ID: "hostID2"}, }, - strategy: nil, + sorter: new(noSort), } - result := wgDsp.HostIDs() - if !reflect.DeepEqual(expected, result) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -type strategyMockDispatcher struct{} - -func (stMck strategyMockDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string, - serviceMethod string, args interface{}, reply interface{}) (err error) { - return -} - -func TestLibDispatcherDispatch(t *testing.T) { - wgDsp := &WeightDispatcher{ - strategy: strategyMockDispatcher{}, - } - result := wgDsp.Dispatch("", "", "", "", "") - if !reflect.DeepEqual(nil, result) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) - } -} - -func TestLibDispatcherRandomSetProfile(t *testing.T) { - pfl := &engine.DispatcherProfile{ - Hosts: []*engine.DispatcherHostProfile{ - { - ID: "0", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, - }, - } - wgDsp := &RandomDispatcher{ - strategy: strategyMockDispatcher{}, - } - wgDsp.SetProfile(pfl) - expected := &engine.DispatcherProfile{ - Hosts: []*engine.DispatcherHostProfile{ - { - ID: "0", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, - }, - } - - if !reflect.DeepEqual(pfl, expected) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, pfl) - } -} - -func TestLibDispatcherRandomHostIDs(t *testing.T) { - expected := engine.DispatcherHostIDs{"5"} - wgDsp := &RandomDispatcher{ - RWMutex: sync.RWMutex{}, - dm: nil, - tnt: "", - hosts: engine.DispatcherHostProfiles{ - { - ID: "5", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, - }, - strategy: nil, - } - result := wgDsp.HostIDs() - - if !reflect.DeepEqual(expected, result) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestLibDispatcherRandomDispatch(t *testing.T) { - wgDsp := &RandomDispatcher{ - strategy: strategyMockDispatcher{}, - } - result := wgDsp.Dispatch("", "", "", "", "") - if !reflect.DeepEqual(nil, result) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) - } -} - -func TestLibDispatcherRoundRobinSetProfile(t *testing.T) { - pfl := &engine.DispatcherProfile{ - Hosts: []*engine.DispatcherHostProfile{ - { - ID: "0", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, - }, - } - wgDsp := &RoundRobinDispatcher{ - strategy: strategyMockDispatcher{}, - } - wgDsp.SetProfile(pfl) - expected := &engine.DispatcherProfile{ - Hosts: []*engine.DispatcherHostProfile{ - { - ID: "0", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, - }, - } - - if !reflect.DeepEqual(pfl, expected) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, pfl) - } -} - -func TestLibDispatcherRoundRobinHostIDs(t *testing.T) { - expected := engine.DispatcherHostIDs{"5"} - wgDsp := &RoundRobinDispatcher{ - RWMutex: sync.RWMutex{}, - dm: nil, - tnt: "", - hosts: engine.DispatcherHostProfiles{ - { - ID: "5", - FilterIDs: nil, - Weight: 0, - Params: nil, - Blocker: false, - }, - }, - strategy: nil, - } - result := wgDsp.HostIDs() - - if !reflect.DeepEqual(expected, result) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestLibDispatcherRoundRobinDispatch(t *testing.T) { - wgDsp := &RoundRobinDispatcher{ - strategy: strategyMockDispatcher{}, - } - result := wgDsp.Dispatch("", "", "", "", "") - if !reflect.DeepEqual(nil, result) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) - } -} - -func TestLibDispatcherSingleResultstrategyDispatcherDispatch(t *testing.T) { - wgDsp := &singleResultstrategyDispatcher{} dataDB := engine.NewInternalDB(nil, nil, true) dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) - err := wgDsp.dispatch(dM, "", "", "", []string{""}, "", "", "") - expected := "HOST_NOT_FOUND" - if err == nil || err.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) - } -} - -func TestLibDispatcherSingleResultstrategyDispatcherDispatchRouteID(t *testing.T) { - wgDsp := &singleResultstrategyDispatcher{} - dataDB := engine.NewInternalDB(nil, nil, true) - dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) - err := wgDsp.dispatch(dM, "routeID", "", "", []string{""}, "", "", "") - expected := "HOST_NOT_FOUND" - if err == nil || err.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) - } -} - -func TestLibDispatcherBroadcastStrategyDispatcherDispatch(t *testing.T) { - wgDsp := &broadcastStrategyDispatcher{} - dataDB := engine.NewInternalDB(nil, nil, true) - dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) - err := wgDsp.dispatch(dM, "", "", "", []string{""}, "", "", "") - expected := "HOST_NOT_FOUND" - if err == nil || err.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) - } -} - -func TestLibDispatcherBroadcastStrategyDispatcherDispatchRouteID(t *testing.T) { - wgDsp := &broadcastStrategyDispatcher{} - dataDB := engine.NewInternalDB(nil, nil, true) - dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) - err := wgDsp.dispatch(dM, "routeID", "", "", []string{""}, "", "", "") - expected := "HOST_NOT_FOUND" - if err == nil || err.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) - } -} - -func TestLibDispatcherLoadStrategyDispatcherDispatch(t *testing.T) { - wgDsp := &loadStrategyDispatcher{} - dataDB := engine.NewInternalDB(nil, nil, true) - dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) - err := wgDsp.dispatch(dM, "", "", "", []string{""}, "", "", "") - expected := "HOST_NOT_FOUND" - if err == nil || err.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) - } -} - -func TestLibDispatcherLoadStrategyDispatcherDispatchHostsID(t *testing.T) { - wgDsp := &loadStrategyDispatcher{} - dataDB := engine.NewInternalDB(nil, nil, true) - dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) - err := wgDsp.dispatch(dM, "routeID", "", "", []string{"hostID1", "hostID2"}, "", "", "") + err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", "", "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -721,12 +457,12 @@ func TestLibDispatcherLoadStrategyDispatcherDispatchHostsID(t *testing.T) { } func TestLibDispatcherLoadStrategyDispatchCaseHosts(t *testing.T) { - wgDsp := &loadStrategyDispatcher{ + wgDsp := &loadDispatcher{ hosts: engine.DispatcherHostProfiles{ { - ID: "testID", - FilterIDs: []string{"filterID"}, - Weight: 4, + ID: "testID", + // FilterIDs: []string{"filterID"}, + Weight: 4, Params: map[string]interface{}{ utils.MetaRatio: 1, }, @@ -734,10 +470,11 @@ func TestLibDispatcherLoadStrategyDispatchCaseHosts(t *testing.T) { }, }, defaultRatio: 1, + sorter: new(noSort), } dataDB := engine.NewInternalDB(nil, nil, true) dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) - err := wgDsp.dispatch(dM, "", "", "", []string{"testID"}, "", "", "") + err := wgDsp.Dispatch(dM, nil, nil, "", "", "", "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -745,12 +482,12 @@ func TestLibDispatcherLoadStrategyDispatchCaseHosts(t *testing.T) { } func TestLibDispatcherLoadStrategyDispatchCaseHostsError(t *testing.T) { - wgDsp := &loadStrategyDispatcher{ + wgDsp := &loadDispatcher{ hosts: engine.DispatcherHostProfiles{ { - ID: "testID2", - FilterIDs: []string{"filterID"}, - Weight: 4, + ID: "testID2", + // FilterIDs: []string{"filterID"}, + Weight: 4, Params: map[string]interface{}{ utils.MetaRatio: 1, }, @@ -758,8 +495,9 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsError(t *testing.T) { }, }, defaultRatio: 1, + sorter: new(noSort), } - err := wgDsp.dispatch(nil, "", "", "", []string{"testID2"}, "", "", "") + err := wgDsp.Dispatch(nil, nil, nil, "", "", "", "", "", "") expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -773,13 +511,13 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError(t *testing.T) { engine.Cache = newCache engine.Cache.SetWithoutReplicate(utils.CacheDispatcherLoads, "testID", false, nil, true, utils.NonTransactional) - wgDsp := &loadStrategyDispatcher{ + wgDsp := &loadDispatcher{ tntID: "testID", hosts: engine.DispatcherHostProfiles{ { - ID: "testID", - FilterIDs: []string{"filterID"}, - Weight: 4, + ID: "testID", + // FilterIDs: []string{"filterID"}, + Weight: 4, Params: map[string]interface{}{ utils.MetaRatio: 1, }, @@ -787,8 +525,9 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError(t *testing.T) { }, }, defaultRatio: 1, + sorter: new(noSort), } - err := wgDsp.dispatch(nil, "", "", "", []string{"testID"}, "", "", "") + err := wgDsp.Dispatch(nil, nil, nil, "", "", "", "", "", "") expected := "cannot cast false to *LoadMetrics" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -797,13 +536,13 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError(t *testing.T) { } func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError2(t *testing.T) { - wgDsp := &loadStrategyDispatcher{ + wgDsp := &loadDispatcher{ tntID: "testID", hosts: engine.DispatcherHostProfiles{ { - ID: "testID", - FilterIDs: []string{"filterID"}, - Weight: 4, + ID: "testID", + // FilterIDs: []string{"filterID"}, + Weight: 4, Params: map[string]interface{}{ utils.MetaRatio: false, }, @@ -811,15 +550,16 @@ func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError2(t *testing.T) { }, }, defaultRatio: 1, + sorter: new(noSort), } - err := wgDsp.dispatch(nil, "", "", "", []string{"testID"}, "", "", "") + err := wgDsp.Dispatch(nil, nil, nil, "", "", "", "", "", "") expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } -func TestLibDispatcherSingleResultStrategyDispatcherCastError(t *testing.T) { +func TestLibDispatcherSingleResultDispatcherCastError(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -837,8 +577,8 @@ func TestLibDispatcherSingleResultStrategyDispatcherCastError(t *testing.T) { } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) - wgDsp := &singleResultstrategyDispatcher{} - err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "") + wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(nil, nil, nil, "", "testID", utils.MetaAttributes, "", "", "") expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -852,7 +592,7 @@ func (*mockTypeCon) Call(serviceMethod string, args, reply interface{}) error { return utils.ErrNotFound } -func TestLibDispatcherSingleResultStrategyDispatcherCastError2(t *testing.T) { +func TestLibDispatcherSingleResultDispatcherCastError2(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -876,8 +616,8 @@ func TestLibDispatcherSingleResultStrategyDispatcherCastError2(t *testing.T) { engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1Ping, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) - wgDsp := &singleResultstrategyDispatcher{} - err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "UNSUPPORTED_SERVICE_METHOD" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -886,7 +626,7 @@ func TestLibDispatcherSingleResultStrategyDispatcherCastError2(t *testing.T) { engine.IntRPC = tmp } -func TestLibDispatcherBroadcastStrategyDispatcherDispatchError1(t *testing.T) { +func TestLibDispatcherBroadcastDispatcherDispatchError1(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -904,8 +644,8 @@ func TestLibDispatcherBroadcastStrategyDispatcherDispatchError1(t *testing.T) { } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) - wgDsp := &broadcastStrategyDispatcher{} - err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "") + wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "") expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -913,7 +653,7 @@ func TestLibDispatcherBroadcastStrategyDispatcherDispatchError1(t *testing.T) { engine.Cache = cacheInit } -func TestLibDispatcherBroadcastStrategyDispatcherDispatchError2(t *testing.T) { +func TestLibDispatcherBroadcastDispatcherDispatchError2(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -922,8 +662,8 @@ func TestLibDispatcherBroadcastStrategyDispatcherDispatchError2(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", nil, nil, true, utils.NonTransactional) - wgDsp := &broadcastStrategyDispatcher{} - err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "") + wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -931,7 +671,7 @@ func TestLibDispatcherBroadcastStrategyDispatcherDispatchError2(t *testing.T) { engine.Cache = cacheInit } -func TestLibDispatcherBroadcastStrategyDispatcherDispatchError3(t *testing.T) { +func TestLibDispatcherBroadcastDispatcherDispatchError3(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -949,15 +689,15 @@ func TestLibDispatcherBroadcastStrategyDispatcherDispatchError3(t *testing.T) { } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) - wgDsp := &broadcastStrategyDispatcher{} - err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "") + wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "") if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } engine.Cache = cacheInit } -func TestLibDispatcherLoadStrategyDispatcherCacheError(t *testing.T) { +func TestLibDispatcherLoadDispatcherCacheError(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -975,8 +715,8 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError(t *testing.T) { } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) - wgDsp := &loadStrategyDispatcher{} - err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, "", "", "") + wgDsp := &loadDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", utils.MetaAttributes, "", "", "") expected := "HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -984,7 +724,7 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError(t *testing.T) { engine.Cache = cacheInit } -func TestLibDispatcherLoadStrategyDispatcherCacheError2(t *testing.T) { +func TestLibDispatcherLoadDispatcherCacheError2(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -1008,8 +748,8 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError2(t *testing.T) { engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1Ping, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) - wgDsp := &loadStrategyDispatcher{} - err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + wgDsp := &loadDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "UNSUPPORTED_SERVICE_METHOD" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -1018,7 +758,7 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError2(t *testing.T) { engine.IntRPC = tmp } -func TestLibDispatcherLoadStrategyDispatcherCacheError3(t *testing.T) { +func TestLibDispatcherLoadDispatcherCacheError3(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -1042,22 +782,22 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError3(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTENANT:testID", value, nil, true, utils.NonTransactional) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1Ping, chanRPC) - wgDsp := &loadStrategyDispatcher{ + wgDsp := &loadDispatcher{ tntID: "testTENANT", hosts: engine.DispatcherHostProfiles{ { - ID: "testID", - FilterIDs: []string{"filterID1", "filterID2"}, - Weight: 3, + ID: "testID", + // FilterIDs: []string{"filterID1", "filterID2"}, + Weight: 3, Params: map[string]interface{}{ utils.MetaRatio: 1, }, Blocker: true, }, { - ID: "testID2", - FilterIDs: []string{"filterID1", "filterID2"}, - Weight: 3, + ID: "testID2", + // FilterIDs: []string{"filterID1", "filterID2"}, + Weight: 3, Params: map[string]interface{}{ utils.MetaRatio: 2, }, @@ -1065,8 +805,9 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError3(t *testing.T) { }, }, defaultRatio: 0, + sorter: new(noSort), } - err := wgDsp.dispatch(dm, "testID", utils.MetaAttributes, "testTENANT", []string{"testID", "testID2"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(dm, nil, nil, "testTENANT", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -1074,7 +815,7 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError3(t *testing.T) { engine.IntRPC = tmp } -func TestLibDispatcherLoadStrategyDispatcherCacheError4(t *testing.T) { +func TestLibDispatcherLoadDispatcherCacheError4(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() cfg.CacheCfg().ReplicationConns = []string{"con"} @@ -1111,22 +852,22 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError4(t *testing.T) { engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTENANT:testID", value, nil, true, utils.NonTransactional) - wgDsp := &loadStrategyDispatcher{ + wgDsp := &loadDispatcher{ tntID: "testTENANT", hosts: engine.DispatcherHostProfiles{ { - ID: "testID", - FilterIDs: []string{"filterID1", "filterID2"}, - Weight: 3, + ID: "testID", + // FilterIDs: []string{"filterID1", "filterID2"}, + Weight: 3, Params: map[string]interface{}{ utils.MetaRatio: 1, }, Blocker: true, }, { - ID: "testID2", - FilterIDs: []string{"filterID1", "filterID2"}, - Weight: 3, + ID: "testID2", + // FilterIDs: []string{"filterID1", "filterID2"}, + Weight: 3, Params: map[string]interface{}{ utils.MetaRatio: 2, }, @@ -1134,8 +875,9 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError4(t *testing.T) { }, }, defaultRatio: 0, + sorter: new(noSort), } - err := wgDsp.dispatch(dm, "testID", utils.MetaAttributes, "testTENANT", []string{"testID", "testID2"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(dm, nil, nil, "testTENANT", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "DISCONNECTED" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -1149,7 +891,7 @@ func (*mockTypeConDispatch) Call(serviceMethod string, args, reply interface{}) return rpc.ErrShutdown } -func TestLibDispatcherLoadStrategyDispatcherCacheError5(t *testing.T) { +func TestLibDispatcherLoadDispatcherCacheError5(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() @@ -1174,7 +916,7 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError5(t *testing.T) { engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) - wgDsp := &loadStrategyDispatcher{ + wgDsp := &loadDispatcher{ tntID: "testTenant", hosts: engine.DispatcherHostProfiles{ { @@ -1187,15 +929,16 @@ func TestLibDispatcherLoadStrategyDispatcherCacheError5(t *testing.T) { }, }, defaultRatio: 0, + sorter: new(noSort), } - err := wgDsp.dispatch(nil, "testID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err == nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err) } engine.Cache = cacheInit engine.IntRPC = tmp } -func TestLibDispatcherSingleResultstrategyDispatcherCase1(t *testing.T) { +func TestLibDispatcherSingleResultDispatcherCase1(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -1218,8 +961,8 @@ func TestLibDispatcherSingleResultstrategyDispatcherCase1(t *testing.T) { engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) - wgDsp := &singleResultstrategyDispatcher{} - err := wgDsp.dispatch(dm, "", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err == nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err) } @@ -1233,7 +976,7 @@ func (*mockTypeConDispatch2) Call(serviceMethod string, args, reply interface{}) return nil } -func TestLibDispatcherSingleResultstrategyDispatcherCase2(t *testing.T) { +func TestLibDispatcherSingleResultDispatcherCase2(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) @@ -1256,8 +999,8 @@ func TestLibDispatcherSingleResultstrategyDispatcherCase2(t *testing.T) { engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) - wgDsp := &singleResultstrategyDispatcher{} - err := wgDsp.dispatch(dm, "routeID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "routeID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -1265,7 +1008,7 @@ func TestLibDispatcherSingleResultstrategyDispatcherCase2(t *testing.T) { engine.IntRPC = tmp } -func TestLibDispatcherSingleResultstrategyDispatcherCase3(t *testing.T) { +func TestLibDispatcherSingleResultDispatcherCase3(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() cfg.CacheCfg().ReplicationConns = []string{"con"} @@ -1305,8 +1048,8 @@ func TestLibDispatcherSingleResultstrategyDispatcherCase3(t *testing.T) { engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) - wgDsp := &singleResultstrategyDispatcher{} - err := wgDsp.dispatch(dm, "routeID", utils.MetaAttributes, "testTenant", []string{"testID"}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) + wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} + err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "routeID", utils.MetaAttributes, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "DISCONNECTED" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)