Added *default_ratio for dispatchers with load strategy

This commit is contained in:
Trial97
2020-08-18 17:09:27 +03:00
committed by Dan Christian Bogos
parent cb21fb0bc3
commit 536b4fa5b2
6 changed files with 112 additions and 26 deletions

View File

@@ -13,5 +13,5 @@ cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20,
cgrates.org,EVENT4,,,,,,ALL,,10,,,
cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20
cgrates.org,EVENT6,*any,*string:~*opts.*method:DispatcherSv1.GetProfileForEvent,,*weight,,SELF,,20,false,,20
cgrates.org,EVENT7,*any,*string:~*opts.EventType:LoadDispatcher,,*weight,,ALL,,20,false,,207
cgrates.org,EVENT7,*any,,,*weight,,ALL2,,20,false,*ratio:1,207
cgrates.org,EVENT7,*any,*string:~*opts.EventType:LoadDispatcher,,*weight,*default_ratio:1,ALL,,20,false,,20
cgrates.org,EVENT7,,,,,,ALL2,,20,,*ratio:1,20
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
13 cgrates.org EVENT4 ALL 10
14 cgrates.org EVENT5 *any *string:~*req.EventName:Internal *weight SELF 20 false 20
15 cgrates.org EVENT6 *any *string:~*opts.*method:DispatcherSv1.GetProfileForEvent *weight SELF 20 false 20
16 cgrates.org EVENT7 *any *string:~*opts.EventType:LoadDispatcher *weight *default_ratio:1 ALL 20 false 207 20
17 cgrates.org EVENT7 *any *weight ALL2 20 false *ratio:1 207 20

View File

@@ -13,5 +13,5 @@ cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20,
cgrates.org,EVENT4,,,,,,ALL,,10,,,
cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20
cgrates.org,EVENT6,*any,*string:~*opts.*method:DispatcherSv1.GetProfileForEvent,,*weight,,SELF,,20,false,,20
cgrates.org,EVENT7,*any,*string:~*opts.EventType:LoadDispatcher,,*weight,,ALL,,20,false,,207
cgrates.org,EVENT7,*any,,,*weight,,ALL2,,20,false,*ratio:1,207
cgrates.org,EVENT7,*any,*string:~*opts.EventType:LoadDispatcher,,*weight,*default_ratio:1,ALL,,20,false,,20
cgrates.org,EVENT7,,,,,,ALL2,,20,,*ratio:1,20
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
13 cgrates.org EVENT4 ALL 10
14 cgrates.org EVENT5 *any *string:~*req.EventName:Internal *weight SELF 20 false 20
15 cgrates.org EVENT6 *any *string:~*opts.*method:DispatcherSv1.GetProfileForEvent *weight SELF 20 false 20
16 cgrates.org EVENT7 *any *string:~*opts.EventType:LoadDispatcher *weight *default_ratio:1 ALL 20 false 207 20
17 cgrates.org EVENT7 *any *weight ALL2 20 false *ratio:1 207 20

View File

@@ -61,25 +61,37 @@ func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dis
hosts := pfl.Hosts.Clone()
switch pfl.Strategy {
case utils.MetaWeight:
var strDsp strategyDispatcher
if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil {
return
}
d = &WeightDispatcher{
dm: dm,
tnt: pfl.Tenant,
hosts: hosts,
strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
strategy: strDsp,
}
case utils.MetaRandom:
var strDsp strategyDispatcher
if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil {
return
}
d = &RandomDispatcher{
dm: dm,
tnt: pfl.Tenant,
hosts: hosts,
strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
strategy: strDsp,
}
case utils.MetaRoundRobin:
var strDsp strategyDispatcher
if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil {
return
}
d = &RoundRobinDispatcher{
dm: dm,
tnt: pfl.Tenant,
hosts: hosts,
strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
strategy: strDsp,
}
case utils.MetaBroadcast:
d = &WeightDispatcher{
@@ -103,6 +115,7 @@ type WeightDispatcher struct {
strategy strategyDispatcher
}
// SetProfile used to implement Dispatcher interface
func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
wd.Lock()
pfl.Hosts.Sort()
@@ -111,6 +124,7 @@ func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
return
}
// HostIDs used to implement Dispatcher interface
func (wd *WeightDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
wd.RLock()
hostIDs = wd.hosts.HostIDs()
@@ -118,6 +132,7 @@ func (wd *WeightDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
return
}
// Dispatch used to implement Dispatcher interface
func (wd *WeightDispatcher) Dispatch(routeID string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return wd.strategy.dispatch(wd.dm, routeID, subsystem, wd.tnt, wd.HostIDs(),
@@ -134,6 +149,7 @@ type RandomDispatcher struct {
strategy strategyDispatcher
}
// SetProfile used to implement Dispatcher interface
func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
d.Lock()
d.hosts = pfl.Hosts.Clone()
@@ -141,6 +157,7 @@ func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
return
}
// HostIDs used to implement Dispatcher interface
func (d *RandomDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
d.RLock()
hostIDs = d.hosts.HostIDs()
@@ -149,6 +166,7 @@ func (d *RandomDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
return
}
// Dispatch used to implement Dispatcher interface
func (d *RandomDispatcher) Dispatch(routeID string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
@@ -165,6 +183,7 @@ type RoundRobinDispatcher struct {
strategy strategyDispatcher
}
// SetProfile used to implement Dispatcher interface
func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
d.Lock()
d.hosts = pfl.Hosts.Clone()
@@ -172,6 +191,7 @@ func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
return
}
// HostIDs used to implement Dispatcher interface
func (d *RoundRobinDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
d.RLock()
hostIDs = d.hosts.HostIDs()
@@ -184,6 +204,7 @@ func (d *RoundRobinDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
return
}
// Dispatch used to implement Dispatcher interface
func (d *RoundRobinDispatcher) Dispatch(routeID string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
@@ -253,24 +274,37 @@ func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID stri
return
}
func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, tntID string) (ls strategyDispatcher) {
func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string) (ls strategyDispatcher, err error) {
if dflt, has := params[utils.MetaDefaultRatio]; has {
var ratio int64
if ratio, err = utils.IfaceAsTInt64(dflt); err != nil {
return nil, err
}
return &loadStrategyDispatcher{
tntID: tntID,
hosts: hosts.Clone(),
defaultRatio: ratio,
}, nil
}
for _, host := range hosts {
if _, has := host.Params[utils.MetaRatio]; has {
return &loadStrategyDispatcher{
tntID: tntID,
hosts: hosts.Clone(),
}
tntID: tntID,
hosts: hosts.Clone(),
defaultRatio: 1,
}, nil
}
}
return new(singleResultstrategyDispatcher)
return new(singleResultstrategyDispatcher), nil
}
type loadStrategyDispatcher struct {
tntID string
hosts engine.DispatcherHostProfiles
tntID string
hosts engine.DispatcherHostProfiles
defaultRatio int64
}
func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) {
func newLoadMetrics(hosts engine.DispatcherHostProfiles, dfltRatio int64) (*LoadMetrics, error) {
lM := &LoadMetrics{
HostsLoad: make(map[string]int64),
HostsRatio: make(map[string]int64),
@@ -278,8 +312,8 @@ func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) {
}
for _, host := range hosts {
if strRatio, has := host.Params[utils.MetaRatio]; !has {
lM.HostsRatio[host.ID] = 1
lM.SumRatio++
lM.HostsRatio[host.ID] = dfltRatio
lM.SumRatio += dfltRatio
} else if ratio, err := utils.IfaceAsTInt64(strRatio); err != nil {
return nil, err
} else {
@@ -307,7 +341,7 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
if lM, canCast = x.(*LoadMetrics); !canCast {
return fmt.Errorf("cannot cast %+v to *LoadMetrics", x)
}
} else if lM, err = newLoadMetrics(ld.hosts); err != nil {
} else if lM, err = newLoadMetrics(ld.hosts, ld.defaultRatio); err != nil {
return
}

View File

@@ -34,7 +34,7 @@ func TestLoadMetricsGetHosts(t *testing.T) {
{ID: "DSP_4", Params: map[string]interface{}{utils.MetaRatio: 1}},
{ID: "DSP_5", Params: map[string]interface{}{utils.MetaRatio: 1}},
}
lm, err := newLoadMetrics(dhp)
lm, err := newLoadMetrics(dhp, 1)
if err != nil {
t.Fatal(err)
}
@@ -70,7 +70,9 @@ func TestNewSingleStrategyDispatcher(t *testing.T) {
{ID: "DSP_5"},
}
var exp strategyDispatcher = new(singleResultstrategyDispatcher)
if rply := newSingleStrategyDispatcher(dhp, utils.EmptyString); !reflect.DeepEqual(exp, rply) {
if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{}, utils.EmptyString); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected: singleResultstrategyDispatcher structure,received: %s", utils.ToJSON(rply))
}
@@ -82,11 +84,62 @@ func TestNewSingleStrategyDispatcher(t *testing.T) {
{ID: "DSP_5", Params: map[string]interface{}{utils.MetaRatio: 1}},
}
exp = &loadStrategyDispatcher{
hosts: dhp,
tntID: "cgrates.org",
hosts: dhp,
tntID: "cgrates.org",
defaultRatio: 1,
}
if rply := newSingleStrategyDispatcher(dhp, "cgrates.org"); !reflect.DeepEqual(exp, rply) {
if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{}, "cgrates.org"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected: loadStrategyDispatcher structure,received: %s", utils.ToJSON(rply))
}
dhp = engine.DispatcherHostProfiles{
{ID: "DSP_1"},
{ID: "DSP_2"},
{ID: "DSP_3"},
{ID: "DSP_4"},
}
exp = &loadStrategyDispatcher{
hosts: dhp,
tntID: "cgrates.org",
defaultRatio: 2,
}
if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: 2}, "cgrates.org"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected: loadStrategyDispatcher structure,received: %s", utils.ToJSON(rply))
}
if _, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: "A"}, "cgrates.org"); err == nil {
t.Fatalf("Expected error received: %v", err)
}
}
func TestNewLoadMetrics(t *testing.T) {
dhp := engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}},
{ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 1}},
{ID: "DSP_3"},
}
exp := &LoadMetrics{
HostsLoad: map[string]int64{},
HostsRatio: map[string]int64{
"DSP_1": 1,
"DSP_2": 1,
"DSP_3": 2,
},
SumRatio: 4,
}
if lm, err := newLoadMetrics(dhp, 2); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, lm) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(lm))
}
dhp = engine.DispatcherHostProfiles{
{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: "A"}},
}
if _, err := newLoadMetrics(dhp, 2); err == nil {
t.Errorf("Expected error received: %v", err)
}
}

View File

@@ -279,9 +279,7 @@ func TestDispatcherHostIDsProfilesReorderFromIndex(t *testing.T) {
func TestDispatcherHostIDsProfilesShuffle(t *testing.T) {
dConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3", "DSP_4"}
oConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3", "DSP_4"}
if dConns.Shuffle(); dConns[0] == oConns[0] ||
dConns[1] == oConns[1] || dConns[2] == oConns[2] ||
dConns[3] == oConns[3] {
if dConns.Shuffle(); reflect.DeepEqual(dConns, oConns) {
t.Errorf("received: %s", utils.ToJSON(dConns))
}
}

View File

@@ -1031,6 +1031,7 @@ const (
MetaBroadcast = "*broadcast"
MetaRoundRobin = "*round_robin"
MetaRatio = "*ratio"
MetaDefaultRatio = "*default_ratio"
ThresholdSv1 = "ThresholdSv1"
StatSv1 = "StatSv1"
ResourceSv1 = "ResourceSv1"