diff --git a/data/tariffplans/dispatchers/DispatcherProfiles.csv b/data/tariffplans/dispatchers/DispatcherProfiles.csv index 5214c0e2f..8740f54fd 100644 --- a/data/tariffplans/dispatchers/DispatcherProfiles.csv +++ b/data/tariffplans/dispatchers/DispatcherProfiles.csv @@ -13,5 +13,5 @@ cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20, cgrates.org,EVENT4,,,,,,ALL,,10,,, cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20 cgrates.org,EVENT6,*any,*string:~*opts.*method:DispatcherSv1.GetProfileForEvent,,*weight,,SELF,,20,false,,20 -cgrates.org,EVENT7,*any,*string:~*opts.EventType:LoadDispatcher,,*weight,,ALL,,20,false,,207 -cgrates.org,EVENT7,*any,,,*weight,,ALL2,,20,false,*ratio:1,207 \ No newline at end of file +cgrates.org,EVENT7,*any,*string:~*opts.EventType:LoadDispatcher,,*weight,*default_ratio:1,ALL,,20,false,,20 +cgrates.org,EVENT7,,,,,,ALL2,,20,,*ratio:1,20 \ No newline at end of file diff --git a/data/tariffplans/dispatchers_gob/DispatcherProfiles.csv b/data/tariffplans/dispatchers_gob/DispatcherProfiles.csv index 5214c0e2f..8740f54fd 100644 --- a/data/tariffplans/dispatchers_gob/DispatcherProfiles.csv +++ b/data/tariffplans/dispatchers_gob/DispatcherProfiles.csv @@ -13,5 +13,5 @@ cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20, cgrates.org,EVENT4,,,,,,ALL,,10,,, cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20 cgrates.org,EVENT6,*any,*string:~*opts.*method:DispatcherSv1.GetProfileForEvent,,*weight,,SELF,,20,false,,20 -cgrates.org,EVENT7,*any,*string:~*opts.EventType:LoadDispatcher,,*weight,,ALL,,20,false,,207 -cgrates.org,EVENT7,*any,,,*weight,,ALL2,,20,false,*ratio:1,207 \ No newline at end of file +cgrates.org,EVENT7,*any,*string:~*opts.EventType:LoadDispatcher,,*weight,*default_ratio:1,ALL,,20,false,,20 +cgrates.org,EVENT7,,,,,,ALL2,,20,,*ratio:1,20 \ No newline at end of file diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 0e8d57661..2763a1d2e 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -61,25 +61,37 @@ func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dis hosts := pfl.Hosts.Clone() switch pfl.Strategy { case utils.MetaWeight: + var strDsp strategyDispatcher + if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil { + return + } d = &WeightDispatcher{ dm: dm, tnt: pfl.Tenant, hosts: hosts, - strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()), + strategy: strDsp, } case utils.MetaRandom: + var strDsp strategyDispatcher + if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil { + return + } d = &RandomDispatcher{ dm: dm, tnt: pfl.Tenant, hosts: hosts, - strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()), + strategy: strDsp, } case utils.MetaRoundRobin: + var strDsp strategyDispatcher + if strDsp, err = newSingleStrategyDispatcher(hosts, pfl.StrategyParams, pfl.TenantID()); err != nil { + return + } d = &RoundRobinDispatcher{ dm: dm, tnt: pfl.Tenant, hosts: hosts, - strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()), + strategy: strDsp, } case utils.MetaBroadcast: d = &WeightDispatcher{ @@ -103,6 +115,7 @@ type WeightDispatcher struct { strategy strategyDispatcher } +// SetProfile used to implement Dispatcher interface func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) { wd.Lock() pfl.Hosts.Sort() @@ -111,6 +124,7 @@ func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) { return } +// HostIDs used to implement Dispatcher interface func (wd *WeightDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { wd.RLock() hostIDs = wd.hosts.HostIDs() @@ -118,6 +132,7 @@ func (wd *WeightDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { return } +// Dispatch used to implement Dispatcher interface func (wd *WeightDispatcher) Dispatch(routeID string, subsystem, serviceMethod string, args interface{}, reply interface{}) (err error) { return wd.strategy.dispatch(wd.dm, routeID, subsystem, wd.tnt, wd.HostIDs(), @@ -134,6 +149,7 @@ type RandomDispatcher struct { strategy strategyDispatcher } +// SetProfile used to implement Dispatcher interface func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) { d.Lock() d.hosts = pfl.Hosts.Clone() @@ -141,6 +157,7 @@ func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) { return } +// HostIDs used to implement Dispatcher interface func (d *RandomDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { d.RLock() hostIDs = d.hosts.HostIDs() @@ -149,6 +166,7 @@ func (d *RandomDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { return } +// Dispatch used to implement Dispatcher interface func (d *RandomDispatcher) Dispatch(routeID string, subsystem, serviceMethod string, args interface{}, reply interface{}) (err error) { return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(), @@ -165,6 +183,7 @@ type RoundRobinDispatcher struct { strategy strategyDispatcher } +// SetProfile used to implement Dispatcher interface func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) { d.Lock() d.hosts = pfl.Hosts.Clone() @@ -172,6 +191,7 @@ func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) { return } +// HostIDs used to implement Dispatcher interface func (d *RoundRobinDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { d.RLock() hostIDs = d.hosts.HostIDs() @@ -184,6 +204,7 @@ func (d *RoundRobinDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) { return } +// Dispatch used to implement Dispatcher interface func (d *RoundRobinDispatcher) Dispatch(routeID string, subsystem, serviceMethod string, args interface{}, reply interface{}) (err error) { return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(), @@ -253,24 +274,37 @@ func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID stri return } -func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, tntID string) (ls strategyDispatcher) { +func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string) (ls strategyDispatcher, err error) { + if dflt, has := params[utils.MetaDefaultRatio]; has { + var ratio int64 + if ratio, err = utils.IfaceAsTInt64(dflt); err != nil { + return nil, err + } + return &loadStrategyDispatcher{ + tntID: tntID, + hosts: hosts.Clone(), + defaultRatio: ratio, + }, nil + } for _, host := range hosts { if _, has := host.Params[utils.MetaRatio]; has { return &loadStrategyDispatcher{ - tntID: tntID, - hosts: hosts.Clone(), - } + tntID: tntID, + hosts: hosts.Clone(), + defaultRatio: 1, + }, nil } } - return new(singleResultstrategyDispatcher) + return new(singleResultstrategyDispatcher), nil } type loadStrategyDispatcher struct { - tntID string - hosts engine.DispatcherHostProfiles + tntID string + hosts engine.DispatcherHostProfiles + defaultRatio int64 } -func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) { +func newLoadMetrics(hosts engine.DispatcherHostProfiles, dfltRatio int64) (*LoadMetrics, error) { lM := &LoadMetrics{ HostsLoad: make(map[string]int64), HostsRatio: make(map[string]int64), @@ -278,8 +312,8 @@ func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) { } for _, host := range hosts { if strRatio, has := host.Params[utils.MetaRatio]; !has { - lM.HostsRatio[host.ID] = 1 - lM.SumRatio++ + lM.HostsRatio[host.ID] = dfltRatio + lM.SumRatio += dfltRatio } else if ratio, err := utils.IfaceAsTInt64(strRatio); err != nil { return nil, err } else { @@ -307,7 +341,7 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin if lM, canCast = x.(*LoadMetrics); !canCast { return fmt.Errorf("cannot cast %+v to *LoadMetrics", x) } - } else if lM, err = newLoadMetrics(ld.hosts); err != nil { + } else if lM, err = newLoadMetrics(ld.hosts, ld.defaultRatio); err != nil { return } diff --git a/dispatchers/libdispatcher_test.go b/dispatchers/libdispatcher_test.go index 23dbb601f..d1bae2cb2 100644 --- a/dispatchers/libdispatcher_test.go +++ b/dispatchers/libdispatcher_test.go @@ -34,7 +34,7 @@ func TestLoadMetricsGetHosts(t *testing.T) { {ID: "DSP_4", Params: map[string]interface{}{utils.MetaRatio: 1}}, {ID: "DSP_5", Params: map[string]interface{}{utils.MetaRatio: 1}}, } - lm, err := newLoadMetrics(dhp) + lm, err := newLoadMetrics(dhp, 1) if err != nil { t.Fatal(err) } @@ -70,7 +70,9 @@ func TestNewSingleStrategyDispatcher(t *testing.T) { {ID: "DSP_5"}, } var exp strategyDispatcher = new(singleResultstrategyDispatcher) - if rply := newSingleStrategyDispatcher(dhp, utils.EmptyString); !reflect.DeepEqual(exp, rply) { + if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{}, utils.EmptyString); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(exp, rply) { t.Errorf("Expected: singleResultstrategyDispatcher structure,received: %s", utils.ToJSON(rply)) } @@ -82,11 +84,62 @@ func TestNewSingleStrategyDispatcher(t *testing.T) { {ID: "DSP_5", Params: map[string]interface{}{utils.MetaRatio: 1}}, } exp = &loadStrategyDispatcher{ - hosts: dhp, - tntID: "cgrates.org", + hosts: dhp, + tntID: "cgrates.org", + defaultRatio: 1, } - if rply := newSingleStrategyDispatcher(dhp, "cgrates.org"); !reflect.DeepEqual(exp, rply) { + if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{}, "cgrates.org"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(exp, rply) { t.Errorf("Expected: loadStrategyDispatcher structure,received: %s", utils.ToJSON(rply)) + } + dhp = engine.DispatcherHostProfiles{ + {ID: "DSP_1"}, + {ID: "DSP_2"}, + {ID: "DSP_3"}, + {ID: "DSP_4"}, + } + exp = &loadStrategyDispatcher{ + hosts: dhp, + tntID: "cgrates.org", + defaultRatio: 2, + } + if rply, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: 2}, "cgrates.org"); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(exp, rply) { + t.Errorf("Expected: loadStrategyDispatcher structure,received: %s", utils.ToJSON(rply)) + } + + if _, err := newSingleStrategyDispatcher(dhp, map[string]interface{}{utils.MetaDefaultRatio: "A"}, "cgrates.org"); err == nil { + t.Fatalf("Expected error received: %v", err) + } +} + +func TestNewLoadMetrics(t *testing.T) { + dhp := engine.DispatcherHostProfiles{ + {ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}}, + {ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 1}}, + {ID: "DSP_3"}, + } + exp := &LoadMetrics{ + HostsLoad: map[string]int64{}, + HostsRatio: map[string]int64{ + "DSP_1": 1, + "DSP_2": 1, + "DSP_3": 2, + }, + SumRatio: 4, + } + if lm, err := newLoadMetrics(dhp, 2); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(exp, lm) { + t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(lm)) + } + dhp = engine.DispatcherHostProfiles{ + {ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: "A"}}, + } + if _, err := newLoadMetrics(dhp, 2); err == nil { + t.Errorf("Expected error received: %v", err) } } diff --git a/engine/dispatcherprfl_test.go b/engine/dispatcherprfl_test.go index 06b7369cb..bc383dce1 100644 --- a/engine/dispatcherprfl_test.go +++ b/engine/dispatcherprfl_test.go @@ -279,9 +279,7 @@ func TestDispatcherHostIDsProfilesReorderFromIndex(t *testing.T) { func TestDispatcherHostIDsProfilesShuffle(t *testing.T) { dConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3", "DSP_4"} oConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3", "DSP_4"} - if dConns.Shuffle(); dConns[0] == oConns[0] || - dConns[1] == oConns[1] || dConns[2] == oConns[2] || - dConns[3] == oConns[3] { + if dConns.Shuffle(); reflect.DeepEqual(dConns, oConns) { t.Errorf("received: %s", utils.ToJSON(dConns)) } } diff --git a/utils/consts.go b/utils/consts.go index ffb9286ac..475466126 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1031,6 +1031,7 @@ const ( MetaBroadcast = "*broadcast" MetaRoundRobin = "*round_robin" MetaRatio = "*ratio" + MetaDefaultRatio = "*default_ratio" ThresholdSv1 = "ThresholdSv1" StatSv1 = "StatSv1" ResourceSv1 = "ResourceSv1"