Updated dispatcher conns sorting

This commit is contained in:
Trial97
2021-06-11 09:59:38 +03:00
committed by Dan Christian Bogos
parent 0dbc372ed6
commit 6ae624538b
5 changed files with 451 additions and 708 deletions

View File

@@ -103,17 +103,13 @@ func (dS *DispatcherService) authorize(method, tenant string, apiKey string, evT
// dispatcherForEvent returns a dispatcher instance configured for specific event
// or utils.ErrNotFound if none present
func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CGREvent,
subsys string) (dPrlfs engine.DispatcherProfiles, err error) {
evNm utils.MapStorage, subsys string) (dPrlfs engine.DispatcherProfiles, err error) {
// find out the matching profiles
anyIdxPrfx := utils.ConcatenatedKey(tnt, utils.MetaAny)
idxKeyPrfx := anyIdxPrfx
if subsys != "" {
idxKeyPrfx = utils.ConcatenatedKey(tnt, subsys)
}
evNm := utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}
var prflIDs utils.StringSet
if prflIDs, err = engine.MatchingItemIDsForEvent(evNm,
dS.cfg.DispatcherSCfg().StringIndexedFields,
@@ -194,8 +190,12 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
if tnt == utils.EmptyString {
tnt = dS.cfg.GeneralCfg().DefaultTenant
}
evNm := utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}
var dPrfls engine.DispatcherProfiles
if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil {
if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, evNm, subsys); err != nil {
return utils.NewErrDispatcherS(err)
}
for _, dPrfl := range dPrfls {
@@ -205,13 +205,13 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
tntID); ok && x != nil {
d = x.(Dispatcher)
} else if d, err = newDispatcher(dS.dm, dPrfl); err != nil {
} else if d, err = newDispatcher(dPrfl); err != nil {
return utils.NewErrDispatcherS(err)
}
if err = engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil, true, utils.EmptyString); err != nil {
return utils.NewErrDispatcherS(err)
}
if err = d.Dispatch(utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]), subsys, serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
if err = d.Dispatch(dS.dm, dS.fltrS, evNm, tnt, utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]), subsys, serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
return
}
}
@@ -224,7 +224,10 @@ func (dS *DispatcherService) V1GetProfilesForEvent(ev *utils.CGREvent,
if tnt == utils.EmptyString {
tnt = dS.cfg.GeneralCfg().DefaultTenant
}
retDPfl, errDpfl := dS.dispatcherProfilesForEvent(tnt, ev, utils.IfaceAsString(ev.APIOpts[utils.Subsys]))
retDPfl, errDpfl := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, utils.IfaceAsString(ev.APIOpts[utils.Subsys]))
if errDpfl != nil {
return utils.NewErrDispatcherS(errDpfl)
}

View File

@@ -187,7 +187,10 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatchertWithoutAuthenti
},
}
tnt := ev.Tenant
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MetaAccounts)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, utils.MetaAccounts)
expected := utils.ErrNotImplemented
if err == nil || err != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)

View File

@@ -84,7 +84,10 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatcherProfileNF(t *tes
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
expected := utils.ErrNotImplemented
if err == nil || err != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -102,7 +105,10 @@ func TestDispatcherServiceDispatcherProfileForEventMIIDENotFound(t *testing.T) {
ev := &utils.CGREvent{}
tnt := ""
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err := dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err := dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err == nil || err != utils.ErrNotFound {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err)
}
@@ -571,7 +577,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNil(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
@@ -615,7 +624,10 @@ func TestDispatcherV1GetProfileForEventReturn(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
@@ -665,7 +677,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFound(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err == nil || err != utils.ErrNotFound {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err)
}
@@ -709,7 +724,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFound2(t *testing.T) {
}
tnt := ""
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err == nil || err != utils.ErrNotFound {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err)
}
@@ -757,7 +775,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFoundTime(t *testing.T)
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err == nil || err != utils.ErrNotFound {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err)
}
@@ -801,7 +822,10 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFoundFilter(t *testing.
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err == nil || err.Error() != "NOT_FOUND:filter" {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "NOT_FOUND:filter", err)
}
@@ -871,7 +895,7 @@ func TestDispatcherServiceDispatchDspErrHostNotFound(t *testing.T) {
Hosts: nil,
}
newCache := engine.NewCacheS(cfg, dm, nil)
value, errDsp := newDispatcher(dm, dsp)
value, errDsp := newDispatcher(dsp)
if errDsp != nil {
t.Fatal(errDsp)
}
@@ -956,7 +980,10 @@ func TestDispatcherServiceDispatcherProfileForEventFoundFilter(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err == nil || err.Error() != "NOT_FOUND" {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "NOT_FOUND:filter", err)
}
@@ -996,7 +1023,10 @@ func TestDispatcherServiceDispatcherProfileForEventNotNotFound(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err := dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err := dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
expected := utils.ErrNotImplemented
if err == nil || err != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -1054,7 +1084,10 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatcherError(t *testing
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, subsys)
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys)
if err == nil || err.Error() != "NOT_FOUND" {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "NOT_FOUND:filter", err)
}
@@ -1080,7 +1113,7 @@ func TestDispatcherServiceDispatchDspErrHostNotFound2(t *testing.T) {
Hosts: nil,
}
newCache := engine.NewCacheS(cfg, dm, nil)
value, errDsp := newDispatcher(dm, dsp)
value, errDsp := newDispatcher(dsp)
if errDsp != nil {
t.Fatal(errDsp)
}
@@ -1329,7 +1362,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalses(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil {
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys); err != nil {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
} else if len(rcv) != 1 {
t.Errorf("Unexpected number of profiles:%v", len(rcv))
@@ -1348,7 +1384,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalses(t *testing.T) {
t.Error(err)
}
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil {
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys); err != nil {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
} else if len(rcv) != 1 {
t.Errorf("Unexpected number of profiles:%v", len(rcv))
@@ -1413,7 +1452,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseFirstNotFound(t *testing.
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil {
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys); err != nil {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
} else if len(rcv) != 1 {
t.Errorf("Unexpected number of profiles:%v", len(rcv))
@@ -1478,7 +1520,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseFound(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil {
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys); err != nil {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
} else if len(rcv) != 1 {
t.Errorf("Unexpected number of profiles:%v", len(rcv))
@@ -1543,7 +1588,10 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseNotFound(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err == nil || err != utils.ErrNotFound {
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys); err == nil || err != utils.ErrNotFound {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err)
} else if rcv != nil {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, rcv)
@@ -1605,7 +1653,10 @@ func TestDispatchersdispatcherProfileForEventAnySStrueNotFound(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err == nil || err != utils.ErrNotFound {
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys); err == nil || err != utils.ErrNotFound {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err)
} else if rcv != nil {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, rcv)
@@ -1667,7 +1718,10 @@ func TestDispatchersdispatcherProfileForEventAnySStrueBothFound(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil {
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys); err != nil {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
} else if len(rcv) != 1 {
t.Errorf("Unexpected number of profiles:%v", len(rcv))
@@ -1681,7 +1735,10 @@ func TestDispatchersdispatcherProfileForEventAnySStrueBothFound(t *testing.T) {
t.Error(err)
}
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, subsys); err != nil {
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}, subsys); err != nil {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
} else if len(rcv) != 1 {
t.Errorf("Unexpected number of profiles:%v", len(rcv))

View File

@@ -21,6 +21,7 @@ package dispatchers
import (
"encoding/gob"
"fmt"
"math/rand"
"sort"
"sync"
@@ -35,187 +36,131 @@ func init() {
}
// Dispatcher is responsible for routing requests to pool of connections
// there will be different implementations based on strategy
// Dispatcher is responsible for routing requests to pool of connections
// there will be different implementations based on strategy
type Dispatcher interface {
// SetProfile is used to update the configuration information within dispatcher
// to make sure we take decisions based on latest config
SetProfile(pfl *engine.DispatcherProfile)
// HostIDs returns the ordered list of host IDs
HostIDs() (hostIDs engine.DispatcherHostIDs)
// Dispatch is used to send the method over the connections given
Dispatch(routeID string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error)
}
type strategyDispatcher interface {
// dispatch is used to send the method over the connections given
dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
Dispatch(dm *engine.DataManager, flts *engine.FilterS,
ev utils.DataProvider, tnt, routeID, subsystem string,
serviceMethod string, args interface{}, reply interface{}) (err error)
}
// newDispatcher constructs instances of Dispatcher
func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
pfl.Hosts.Sort() // make sure the connections are sorted
func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
hosts := pfl.Hosts.Clone()
hosts.Sort() // make sure the connections are sorted
switch pfl.Strategy {
case utils.MetaWeight:
var strDsp strategyDispatcher
if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil {
return
}
d = &WeightDispatcher{
dm: dm,
tnt: pfl.Tenant,
hosts: hosts,
strategy: strDsp,
}
return newSingleDispatcher(hosts, pfl.StrategyParams, pfl.TenantID(), new(noSort))
case utils.MetaRandom:
var strDsp strategyDispatcher
if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil {
return
}
d = &RandomDispatcher{
dm: dm,
tnt: pfl.Tenant,
hosts: hosts,
strategy: strDsp,
}
return newSingleDispatcher(hosts, pfl.StrategyParams, pfl.TenantID(), new(randomSort))
case utils.MetaRoundRobin:
var strDsp strategyDispatcher
if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil {
return
}
d = &RoundRobinDispatcher{
dm: dm,
tnt: pfl.Tenant,
hosts: hosts,
strategy: strDsp,
}
return newSingleDispatcher(hosts, pfl.StrategyParams, pfl.TenantID(), new(roundRobinSort))
case rpcclient.PoolBroadcast,
rpcclient.PoolBroadcastSync,
rpcclient.PoolBroadcastAsync:
d = &WeightDispatcher{
dm: dm,
tnt: pfl.Tenant,
return &broadcastDispatcher{
strategy: pfl.Strategy,
hosts: hosts,
strategy: &broadcastStrategyDispatcher{strategy: pfl.Strategy},
}
}, nil
default:
err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
}
return
}
// WeightDispatcher selects the next connection based on weight
type WeightDispatcher struct {
sync.RWMutex
dm *engine.DataManager
tnt string
hosts engine.DispatcherHostProfiles
strategy strategyDispatcher
}
// SetProfile used to implement Dispatcher interface
func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
wd.Lock()
pfl.Hosts.Sort()
wd.hosts = pfl.Hosts.Clone() // avoid concurrency on profile
wd.Unlock()
}
// HostIDs used to implement Dispatcher interface
func (wd *WeightDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
wd.RLock()
hostIDs = wd.hosts.HostIDs()
wd.RUnlock()
return
}
// Dispatch used to implement Dispatcher interface
func (wd *WeightDispatcher) Dispatch(routeID string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return wd.strategy.dispatch(wd.dm, routeID, subsystem, wd.tnt, wd.HostIDs(),
serviceMethod, args, reply)
}
// RandomDispatcher selects the next connection randomly
// together with RouteID can serve as load-balancer
type RandomDispatcher struct {
sync.RWMutex
dm *engine.DataManager
tnt string
hosts engine.DispatcherHostProfiles
strategy strategyDispatcher
}
// SetProfile used to implement Dispatcher interface
func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
d.Lock()
d.hosts = pfl.Hosts.Clone()
d.Unlock()
}
// HostIDs used to implement Dispatcher interface
func (d *RandomDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
d.RLock()
hostIDs = d.hosts.HostIDs()
d.RUnlock()
hostIDs.Shuffle() // randomize the connections
return
}
// Dispatch used to implement Dispatcher interface
func (d *RandomDispatcher) Dispatch(routeID string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
serviceMethod, args, reply)
}
// RoundRobinDispatcher selects the next connection in round-robin fashion
type RoundRobinDispatcher struct {
sync.RWMutex
dm *engine.DataManager
tnt string
hosts engine.DispatcherHostProfiles
hostIdx int // used for the next connection
strategy strategyDispatcher
}
// SetProfile used to implement Dispatcher interface
func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
d.Lock()
d.hosts = pfl.Hosts.Clone()
d.Unlock()
}
// HostIDs used to implement Dispatcher interface
func (d *RoundRobinDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
d.RLock()
hostIDs = d.hosts.HostIDs()
hostIDs.ReorderFromIndex(d.hostIdx)
d.hostIdx++
if d.hostIdx >= len(d.hosts) {
d.hostIdx = 0
func getDispatcherHosts(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
hostIDs = make(engine.DispatcherHostIDs, 0, len(hosts))
for _, host := range hosts {
var pass bool
if pass, err = fltrs.Pass(tnt, host.FilterIDs, ev); err != nil {
return
}
if pass {
hostIDs = append(hostIDs, host.ID)
if host.Blocker {
break
}
}
}
d.RUnlock()
return
}
// Dispatch used to implement Dispatcher interface
func (d *RoundRobinDispatcher) Dispatch(routeID string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
serviceMethod, args, reply)
type hostSorter interface {
Sort(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error)
}
type singleResultstrategyDispatcher struct{}
type noSort struct{}
func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string,
hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) {
func (noSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
return getDispatcherHosts(fltrs, ev, tnt, hosts)
}
type randomSort struct{}
func (randomSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
rand.Shuffle(len(hosts), func(i, j int) {
hosts[i], hosts[j] = hosts[j], hosts[i]
})
return getDispatcherHosts(fltrs, ev, tnt, hosts)
}
type roundRobinSort struct{ nextIDx int }
func (rs *roundRobinSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
dh := make(engine.DispatcherHostProfiles, len(hosts))
idx := rs.nextIDx
for i := 0; i < len(dh); i++ {
if idx > len(dh)-1 {
idx = 0
}
dh[i] = hosts[idx]
idx++
}
rs.nextIDx++
if rs.nextIDx >= len(hosts) {
rs.nextIDx = 0
}
return getDispatcherHosts(fltrs, ev, tnt, dh)
}
func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string, sorter hostSorter) (_ Dispatcher, err error) {
if dflt, has := params[utils.MetaDefaultRatio]; has {
var ratio int64
if ratio, err = utils.IfaceAsTInt64(dflt); err != nil {
return
}
return &loadDispatcher{
tntID: tntID,
defaultRatio: ratio,
sorter: sorter,
hosts: hosts,
}, nil
}
for _, host := range hosts {
if _, has := host.Params[utils.MetaRatio]; has {
return &loadDispatcher{
tntID: tntID,
defaultRatio: 1,
sorter: sorter,
hosts: hosts,
}, nil
}
}
return &singleResultDispatcher{
sorter: sorter,
hosts: hosts,
}, nil
}
type singleResultDispatcher struct {
sorter hostSorter
hosts engine.DispatcherHostProfiles
}
func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
ev utils.DataProvider, tnt, routeID, subsystem string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var dH *engine.DispatcherHost
if routeID != utils.EmptyString {
// overwrite routeID with RouteID:Subsystem
@@ -229,6 +174,10 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
}
}
}
var hostIDs []string
if hostIDs, err = sd.sorter.Sort(flts, ev, tnt, sd.hosts); err != nil {
return
}
var called bool
for _, hostID := range hostIDs {
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
@@ -260,12 +209,18 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
return
}
type broadcastStrategyDispatcher struct {
type broadcastDispatcher struct {
strategy string
hosts engine.DispatcherHostProfiles
}
func (b *broadcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
func (b *broadcastDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
ev utils.DataProvider, tnt, routeID, subsystem string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var hostIDs []string
if hostIDs, err = getDispatcherHosts(flts, ev, tnt, b.hosts); err != nil {
return
}
var hasHosts bool
pool := rpcclient.NewRPCPool(b.strategy, config.CgrConfig().GeneralCfg().ReplyTimeout)
for _, hostID := range hostIDs {
@@ -288,61 +243,15 @@ func (b *broadcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID s
return pool.Call(serviceMethod, args, reply)
}
func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string) (ls strategyDispatcher, err error) {
if dflt, has := params[utils.MetaDefaultRatio]; has {
var ratio int64
if ratio, err = utils.IfaceAsTInt64(dflt); err != nil {
return nil, err
}
return &loadStrategyDispatcher{
tntID: tntID,
hosts: hosts.Clone(),
defaultRatio: ratio,
}, nil
}
for _, host := range hosts {
if _, has := host.Params[utils.MetaRatio]; has {
return &loadStrategyDispatcher{
tntID: tntID,
hosts: hosts.Clone(),
defaultRatio: 1,
}, nil
}
}
return new(singleResultstrategyDispatcher), nil
}
type loadStrategyDispatcher struct {
type loadDispatcher struct {
tntID string
hosts engine.DispatcherHostProfiles
defaultRatio int64
sorter hostSorter
hosts engine.DispatcherHostProfiles
}
func newLoadMetrics(hosts engine.DispatcherHostProfiles, dfltRatio int64) (*LoadMetrics, error) {
lM := &LoadMetrics{
HostsLoad: make(map[string]int64),
HostsRatio: make(map[string]int64),
}
for _, host := range hosts {
if strRatio, has := host.Params[utils.MetaRatio]; !has {
lM.HostsRatio[host.ID] = dfltRatio
} else if ratio, err := utils.IfaceAsTInt64(strRatio); err != nil {
return nil, err
} else {
lM.HostsRatio[host.ID] = ratio
}
}
return lM, nil
}
// LoadMetrics the structure to save the metrix for load strategy
type LoadMetrics struct {
mutex sync.RWMutex
HostsLoad map[string]int64
HostsRatio map[string]int64
}
func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
ev utils.DataProvider, tnt, routeID, subsystem string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var dH *engine.DispatcherHost
var lM *LoadMetrics
@@ -370,8 +279,12 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
}
}
}
var hostIDs []string
if hostIDs, err = ld.sorter.Sort(flts, ev, tnt, lM.getHosts(ld.hosts)); err != nil {
return
}
var called bool
for _, hostID := range lM.getHosts(hostIDs) {
for _, hostID := range hostIDs {
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q",
@@ -404,40 +317,64 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
return
}
func newLoadMetrics(hosts engine.DispatcherHostProfiles, dfltRatio int64) (*LoadMetrics, error) {
lM := &LoadMetrics{
HostsLoad: make(map[string]int64),
HostsRatio: make(map[string]int64),
}
for _, host := range hosts {
if strRatio, has := host.Params[utils.MetaRatio]; !has {
lM.HostsRatio[host.ID] = dfltRatio
} else if ratio, err := utils.IfaceAsTInt64(strRatio); err != nil {
return nil, err
} else {
lM.HostsRatio[host.ID] = ratio
}
}
return lM, nil
}
// LoadMetrics the structure to save the metrix for load strategy
type LoadMetrics struct {
mutex sync.RWMutex
HostsLoad map[string]int64
HostsRatio map[string]int64
}
// used to sort the host IDs based on costs
type hostCosts struct {
ids []string
multiple []int64
hosts engine.DispatcherHostProfiles
load []int64
}
func (hc *hostCosts) Len() int { return len(hc.ids) }
func (hc *hostCosts) Less(i, j int) bool { return hc.multiple[i] < hc.multiple[j] }
func (hc *hostCosts) Len() int { return len(hc.hosts) }
func (hc *hostCosts) Less(i, j int) bool { return hc.load[i] < hc.load[j] }
func (hc *hostCosts) Swap(i, j int) {
hc.multiple[i], hc.multiple[j] = hc.multiple[j], hc.multiple[i]
hc.ids[i], hc.ids[j] = hc.ids[j], hc.ids[i]
hc.load[i], hc.load[j] = hc.load[j], hc.load[i]
hc.hosts[i], hc.hosts[j] = hc.hosts[j], hc.hosts[i]
}
func (lM *LoadMetrics) getHosts(hostIDs []string) []string {
func (lM *LoadMetrics) getHosts(hosts engine.DispatcherHostProfiles) engine.DispatcherHostProfiles {
hlp := &hostCosts{
ids: make([]string, 0, len(hostIDs)),
multiple: make([]int64, 0, len(hostIDs)),
hosts: make(engine.DispatcherHostProfiles, 0, len(hosts)),
load: make([]int64, 0, len(hosts)),
}
lM.mutex.RLock()
for _, id := range hostIDs {
for _, host := range hosts {
switch {
case lM.HostsRatio[id] < 0:
hlp.multiple = append(hlp.multiple, 0)
case lM.HostsRatio[id] == 0:
case lM.HostsRatio[host.ID] < 0:
hlp.load = append(hlp.load, 0)
case lM.HostsRatio[host.ID] == 0:
continue
default:
hlp.multiple = append(hlp.multiple, lM.HostsLoad[id]/lM.HostsRatio[id])
hlp.load = append(hlp.load, lM.HostsLoad[host.ID]/lM.HostsRatio[host.ID])
}
hlp.ids = append(hlp.ids, id)
hlp.hosts = append(hlp.hosts, host)
}
lM.mutex.RUnlock()
sort.Stable(hlp)
return hlp.ids
return hlp.hosts
}
func (lM *LoadMetrics) incrementLoad(hostID, tntID string) {

File diff suppressed because it is too large Load Diff