Updated dispatcher host matching based on load if *ratio is specified
Committed by: Dan Christian Bogos
Parent: bbe9c9ca4b
Commit: ffb51d7d88
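In short: the single-result strategies (*weight, *random, *round_robin) now order hosts by their current load whenever a host declares *ratio in its ConnParameters, through the new newSingleStrategyDispatcher factory and the LoadMetrics tracked in the *dispatcher_loads cache. A minimal sketch of the new ordering, written as it would run inside the dispatchers package (it mirrors the new dispatchers/libdispatcher_test.go added below; nothing outside this commit is assumed):

	// two hosts, each allowed a ratio of 1 before being pushed behind the others
	dhp := engine.DispatcherHostProfiles{
		{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}},
		{ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 1}},
	}
	lm, err := newLoadMetrics(dhp)
	if err != nil {
		panic(err)
	}
	lm.incrementLoad("DSP_1", utils.EmptyString) // DSP_1 now carries one active request
	hostIDs := engine.DispatcherHostIDs(dhp.HostIDs())
	ordered := lm.getHosts(hostIDs.Clone())
	// ordered[0] == "DSP_2": the least loaded host is tried first
	_ = ordered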
@@ -1507,6 +1507,8 @@ func testApierResetDataAfterLoadFromFolder(t *testing.T) {
	expStats[utils.CacheResourceFilterIndexes].Groups = 1
	expStats[utils.CacheAttributeFilterIndexes].Items = 4
	expStats[utils.CacheAttributeFilterIndexes].Groups = 1
	expStats[utils.CacheReverseFilterIndexes].Items = 10
	expStats[utils.CacheReverseFilterIndexes].Groups = 7

	if err := rater.Call(utils.CacheSv1GetCacheStats, new(utils.AttrCacheIDsWithOpts), &rcvStats); err != nil {
		t.Error(err)
@@ -172,6 +172,8 @@ func testCacheSAfterLoadFromFolder(t *testing.T) {
	expStats[utils.CacheResourceFilterIndexes].Groups = 1
	expStats[utils.CacheAttributeFilterIndexes].Items = 4
	expStats[utils.CacheAttributeFilterIndexes].Groups = 1
	expStats[utils.CacheReverseFilterIndexes].Items = 10
	expStats[utils.CacheReverseFilterIndexes].Groups = 7

	if err := chcRPC.Call(utils.CacheSv1GetCacheStats, &utils.AttrCacheIDsWithOpts{}, &rcvStats); err != nil {
		t.Error(err)
@@ -238,6 +240,8 @@ func testCacheSReload(t *testing.T) {
	expStats[utils.CacheResourceFilterIndexes].Groups = 1
	expStats[utils.CacheAttributeFilterIndexes].Items = 4
	expStats[utils.CacheAttributeFilterIndexes].Groups = 1
	expStats[utils.CacheReverseFilterIndexes].Items = 10
	expStats[utils.CacheReverseFilterIndexes].Groups = 7

	if err := chcRPC.Call(utils.CacheSv1GetCacheStats, &utils.AttrCacheIDsWithOpts{}, &rcvStats); err != nil {
		t.Error(err)
@@ -225,7 +225,7 @@ const CGRATES_CFG_JSON = `
	"*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control rate filter indexes caching
	"*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control reverse filter indexes caching used only for set and remove filters
	"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher routes caching
	"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load ( in case of *load strategy )
	"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load( in case of *ratio ConnParams is present)
	"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher interface
	"*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // diameter messages caching
	"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "replicate": false}, // RPC responses caching
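Side note (not part of the diff): the *dispatcher_loads partition named above is where the load-aware strategy keeps its LoadMetrics; every incrementLoad/decrementLoad further down replicates the value with

	engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)

which is why the comment now points at the *ratio ConnParams that activates this caching rather than the old *load strategy.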
@@ -204,7 +204,7 @@
	// "*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control rate filter indexes caching
	// "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control reverse filter indexes caching used only for set and remove filters
	// "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher routes caching
	// "*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load ( in case of *load strategy )
	// "*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load( in case of *ratio ConnParams is present)
	// "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher interface
	// "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // diameter messages caching
	// "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "replicate": false}, // RPC responses caching
@@ -1,3 +1,3 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*load,,Engine1,,20,false,,20
cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*weight,,Engine1,,20,false,*ratio:1,20
@@ -1,4 +1,4 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,,,*load,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,,,*weight,,Engine1,,20,false,*ratio:1,10
@@ -1,4 +1,4 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*load,,Engine1,,20,false,,20
cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*weight,,Engine1,,20,false,*ratio:1,20
@@ -1,4 +1,4 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,,,*load,,Engine1,,20,false,,10
cgrates.org,Engine2,*chargers,,,*weight,,Engine1,,20,false,*ratio:1,10
@@ -159,6 +159,8 @@ func testDspChcLoadAfterFolder(t *testing.T) {
	expStats[utils.CacheChargerFilterIndexes].Groups = 1
	expStats[utils.CacheAttributeFilterIndexes].Items = 10
	expStats[utils.CacheAttributeFilterIndexes].Groups = 4
	expStats[utils.CacheReverseFilterIndexes].Items = 8
	expStats[utils.CacheReverseFilterIndexes].Groups = 6
	if err := dispEngine.RPC.Call(utils.CacheSv1GetCacheStats, &args, &rcvStats); err != nil {
		t.Error(err)
	} else if !reflect.DeepEqual(expStats, rcvStats) {
@@ -22,7 +22,6 @@ import (
	"encoding/gob"
	"fmt"
	"sort"
	"strconv"
	"sync"

	"github.com/cgrates/cgrates/engine"
@@ -44,7 +43,7 @@ type Dispatcher interface {
	// to make sure we take decisions based on latest config
	SetProfile(pfl *engine.DispatcherProfile)
	// HostIDs returns the ordered list of host IDs
	HostIDs() (hostIDs []string)
	HostIDs() (hostIDs engine.DispatcherHostIDs)
	// Dispatch is used to send the method over the connections given
	Dispatch(routeID string, subsystem,
		serviceMethod string, args interface{}, reply interface{}) (err error)
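With the signature change above, every Dispatcher now returns an engine.DispatcherHostIDs value (the type is added further down in this commit), so the ordering helpers live on the ID slice itself. Illustrative only, given some Dispatcher d:

	ids := d.HostIDs()      // engine.DispatcherHostIDs
	ids.Shuffle()           // RandomDispatcher randomizes the IDs in place
	ids.ReorderFromIndex(1) // RoundRobinDispatcher rotates them from its current index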
@@ -59,46 +58,35 @@ type strategyDispatcher interface {
// newDispatcher constructs instances of Dispatcher
func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
	pfl.Hosts.Sort() // make sure the connections are sorted
	hosts := pfl.Hosts.Clone()
	switch pfl.Strategy {
	case utils.MetaWeight:
		d = &WeightDispatcher{
			dm: dm,
			tnt: pfl.Tenant,
			hosts: pfl.Hosts.Clone(),
			strategy: new(singleResultstrategyDispatcher),
			hosts: hosts,
			strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
		}
	case utils.MetaRandom:
		d = &RandomDispatcher{
			dm: dm,
			tnt: pfl.Tenant,
			hosts: pfl.Hosts.Clone(),
			strategy: new(singleResultstrategyDispatcher),
			hosts: hosts,
			strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
		}
	case utils.MetaRoundRobin:
		d = &RoundRobinDispatcher{
			dm: dm,
			tnt: pfl.Tenant,
			hosts: pfl.Hosts.Clone(),
			strategy: new(singleResultstrategyDispatcher),
			hosts: hosts,
			strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
		}
	case utils.MetaBroadcast:
		d = &BroadcastDispatcher{
			dm: dm,
			tnt: pfl.Tenant,
			hosts: pfl.Hosts.Clone(),
			strategy: new(brodcastStrategyDispatcher),
		}
	case utils.MetaLoad:
		hosts := pfl.Hosts.Clone()
		ls, err := newLoadStrategyDispatcher(hosts, pfl.TenantID())
		if err != nil {
			return nil, err
		}
		d = &WeightDispatcher{
			dm: dm,
			tnt: pfl.Tenant,
			hosts: hosts,
			strategy: ls,
			strategy: new(brodcastStrategyDispatcher),
		}
	default:
		err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
@@ -123,7 +111,7 @@ func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
	return
}

func (wd *WeightDispatcher) HostIDs() (hostIDs []string) {
func (wd *WeightDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
	wd.RLock()
	hostIDs = wd.hosts.HostIDs()
	wd.RUnlock()
@@ -153,12 +141,12 @@ func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
	return
}

func (d *RandomDispatcher) HostIDs() (hostIDs []string) {
func (d *RandomDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
	d.RLock()
	hosts := d.hosts.Clone()
	hostIDs = d.hosts.HostIDs()
	d.RUnlock()
	hosts.Shuffle() // randomize the connections
	return hosts.HostIDs()
	hostIDs.Shuffle() // randomize the connections
	return
}

func (d *RandomDispatcher) Dispatch(routeID string, subsystem,
@@ -184,16 +172,16 @@ func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
	return
}

func (d *RoundRobinDispatcher) HostIDs() (hostIDs []string) {
func (d *RoundRobinDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
	d.RLock()
	hosts := d.hosts.Clone()
	hosts.ReorderFromIndex(d.hostIdx)
	hostIDs = d.hosts.HostIDs()
	hostIDs.ReorderFromIndex(d.hostIdx)
	d.hostIdx++
	if d.hostIdx >= len(d.hosts) {
		d.hostIdx = 0
	}
	d.RUnlock()
	return hosts.HostIDs()
	return
}

func (d *RoundRobinDispatcher) Dispatch(routeID string, subsystem,
@@ -202,39 +190,9 @@ func (d *RoundRobinDispatcher) Dispatch(routeID string, subsystem,
		serviceMethod, args, reply)
}

// BroadcastDispatcher will send the request to multiple hosts simultaneously
type BroadcastDispatcher struct {
	sync.RWMutex
	dm *engine.DataManager
	tnt string
	hosts engine.DispatcherHostProfiles
	strategy strategyDispatcher
}

func (d *BroadcastDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
	d.Lock()
	pfl.Hosts.Sort()
	d.hosts = pfl.Hosts.Clone()
	d.Unlock()
	return
}

func (d *BroadcastDispatcher) HostIDs() (hostIDs []string) {
	d.RLock()
	hostIDs = d.hosts.HostIDs()
	d.RUnlock()
	return
}

func (d *BroadcastDispatcher) Dispatch(routeID string, subsystem,
	serviceMethod string, args interface{}, reply interface{}) (lastErr error) { // no cache needed for this strategy because we need to call all connections
	return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
		serviceMethod, args, reply)
}

type singleResultstrategyDispatcher struct{}

func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string,
func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string,
	hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) {
	var dH *engine.DispatcherHost
	if routeID != utils.EmptyString {
@@ -270,7 +228,7 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI

type brodcastStrategyDispatcher struct{}

func (_ *brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
	serviceMethod string, args interface{}, reply interface{}) (err error) {
	var hasErrors bool
	for _, hostID := range hostIDs {
@@ -295,13 +253,16 @@ func (_ *brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID st
	return
}

func newLoadStrategyDispatcher(hosts engine.DispatcherHostProfiles, tntID string) (ls *loadStrategyDispatcher, err error) {
	ls = &loadStrategyDispatcher{
		tntID: tntID,
		hosts: hosts,
func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, tntID string) (ls strategyDispatcher) {
	for _, host := range hosts {
		if _, has := host.Params[utils.MetaRatio]; has {
			return &loadStrategyDispatcher{
				tntID: tntID,
				hosts: hosts.Clone(),
			}
		}
	}

	return
	return new(singleResultstrategyDispatcher)
}

type loadStrategyDispatcher struct {
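The factory only switches to the load-aware strategy when at least one host declares *ratio; otherwise the plain single-result behaviour is kept. A hedged illustration (hypothetical host profiles and tenant ID, inside the dispatchers package):

	withRatio := engine.DispatcherHostProfiles{
		{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 2}},
		{ID: "DSP_2"},
	}
	newSingleStrategyDispatcher(withRatio, "cgrates.org:DSP1") // returns *loadStrategyDispatcher
	newSingleStrategyDispatcher(engine.DispatcherHostProfiles{
		{ID: "DSP_1"}, {ID: "DSP_2"},
	}, "cgrates.org:DSP1") // returns *singleResultstrategyDispatcher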
@@ -318,8 +279,8 @@ func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) {
	for _, host := range hosts {
		if strRatio, has := host.Params[utils.MetaRatio]; !has {
			lM.HostsRatio[host.ID] = 1
			lM.SumRatio += 1
		} else if ratio, err := strconv.ParseInt(utils.IfaceAsString(strRatio), 10, 64); err != nil {
			lM.SumRatio++
		} else if ratio, err := utils.IfaceAsTInt64(strRatio); err != nil {
			return nil, err
		} else {
			lM.HostsRatio[host.ID] = ratio
@@ -329,6 +290,7 @@ func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) {
	return lM, nil
}

// LoadMetrics is the structure that stores the metrics for the load strategy
type LoadMetrics struct {
	mutex sync.RWMutex
	HostsLoad map[string]int64
@@ -386,32 +348,45 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
	return
}

// used to sort the host IDs based on costs
type hostCosts struct {
	ids []string
	costs []int64
}

func (hc *hostCosts) Len() int { return len(hc.ids) }
func (hc *hostCosts) Less(i, j int) bool { return hc.costs[i] < hc.costs[j] }
func (hc *hostCosts) Swap(i, j int) {
	hc.costs[i], hc.costs[j], hc.ids[i], hc.ids[j] = hc.costs[j], hc.costs[i], hc.ids[j], hc.ids[i]
}

func (lM *LoadMetrics) getHosts(hostIDs []string) []string {
	costs := make([]int64, len(hostIDs))
	hlp := &hostCosts{
		ids: hostIDs,
		costs: make([]int64, len(hostIDs)),
	}
	lM.mutex.RLock()
	for i, id := range hostIDs {
		costs[i] = lM.HostsLoad[id]
		if costs[i] >= lM.HostsRatio[id] {
			costs[i] += lM.SumRatio
		hlp.costs[i] = lM.HostsLoad[id]
		if hlp.costs[i] >= lM.HostsRatio[id] {
			hlp.costs[i] += lM.SumRatio
		}
	}
	lM.mutex.RUnlock()
	sort.Slice(hostIDs, func(i, j int) bool {
		return costs[i] < costs[j]
	})
	return hostIDs
	sort.Sort(hlp)
	return hlp.ids
}
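A worked example of the cost function above, assuming three hosts with ratio 1 each (SumRatio = 3): loads {DSP_1: 1, DSP_2: 0, DSP_3: 2} yield costs {DSP_1: 1+3=4, DSP_2: 0, DSP_3: 2+3=5}, because any host whose load has reached its ratio is penalised by SumRatio. The sort then returns DSP_2, DSP_1, DSP_3: hosts with spare ratio come first and, among the penalised ones, the least loaded wins.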
func (lM *LoadMetrics) incrementLoad(hostID, tntID string) {
	lM.mutex.Lock()
	lM.HostsLoad[hostID] += 1
	lM.HostsLoad[hostID]++
	engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
	lM.mutex.Unlock()
}

func (lM *LoadMetrics) decrementLoad(hostID, tntID string) {
	lM.mutex.Lock()
	lM.HostsLoad[hostID] -= 1
	lM.HostsLoad[hostID]--
	engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
	lM.mutex.Unlock()
}
dispatchers/libdispatcher_test.go (new file, 61 lines)
@@ -0,0 +1,61 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/

package dispatchers

import (
	"testing"

	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/utils"
)

func TestLoadMetricsGetHosts(t *testing.T) {
	dhp := engine.DispatcherHostProfiles{
		{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}},
		{ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 1}},
		{ID: "DSP_3", Params: map[string]interface{}{utils.MetaRatio: 1}},
		{ID: "DSP_4", Params: map[string]interface{}{utils.MetaRatio: 1}},
		{ID: "DSP_5", Params: map[string]interface{}{utils.MetaRatio: 1}},
	}
	lm, err := newLoadMetrics(dhp)
	if err != nil {
		t.Fatal(err)
	}
	hostsIDs := engine.DispatcherHostIDs(dhp.HostIDs())
	// to prevent randomness we increment all loads except the first one
	for _, hst := range hostsIDs[1:] {
		lm.incrementLoad(hst, utils.EmptyString)
	}
	// check only the first host because the rest may be in a random order
	// because they share the same cost
	if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_1" {
		t.Errorf("Expected: %q ,received: %q", "DSP_1", rply[0])
	}
	lm.incrementLoad(hostsIDs[0], utils.EmptyString)
	lm.decrementLoad(hostsIDs[1], utils.EmptyString)
	if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_2" {
		t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0])
	}
	for _, hst := range hostsIDs {
		lm.incrementLoad(hst, utils.EmptyString)
	}
	if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_2" {
		t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0])
	}
}
@@ -35,6 +35,8 @@ var (
		utils.DispatcherFilterIndexes: {},
		utils.RateProfilesFilterIndexPrfx: {},
		utils.RateFilterIndexPrfx: {},
		utils.ActionPlanIndexes: {},
		utils.FilterIndexPrfx: {},
	}
	cachePrefixMap = utils.StringSet{
		utils.DESTINATION_PREFIX: {},
@@ -271,8 +273,12 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
		}
		_, err = dm.GetIndexes(utils.CacheRateFilterIndexes, tntCtx, idxKey, false, true)
	case utils.FilterIndexPrfx:
		tntID := utils.NewTenantID(dataID)
		_, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntID.Tenant, tntID.ID, false, true)
		idx := strings.LastIndexByte(dataID, utils.InInFieldSep[0])
		if idx < 0 {
			err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID)
			return
		}
		_, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, dataID[:idx], dataID[idx+1:], false, true)
	case utils.LoadIDPrefix:
		_, err = dm.GetItemLoadIDs(utils.EmptyString, true)
	}
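For illustration (hedged, assuming utils.InInFieldSep is the ":" separator used elsewhere in CGRateS): a hypothetical reverse-filter-index cache key such as "cgrates.org:FLTR_1" is now split at the last separator into tntCtx "cgrates.org" and idxKey "FLTR_1", while a key with no separator fails fast with WRONG_IDX_KEY_FORMAT instead of being silently mis-parsed by NewTenantID.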
@@ -168,3 +168,34 @@ func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply int
	}
	return dH.rpcConn.Call(serviceMethod, args, reply)
}

type DispatcherHostIDs []string

// ReorderFromIndex will consider idx as starting point for the reordered slice
func (dHPrflIDs DispatcherHostIDs) ReorderFromIndex(idx int) {
	initConns := dHPrflIDs.Clone()
	for i := 0; i < len(dHPrflIDs); i++ {
		if idx > len(dHPrflIDs)-1 {
			idx = 0
		}
		dHPrflIDs[i] = initConns[idx]
		idx++
	}
	return
}

// Shuffle will mix the connections in place
func (dHPrflIDs DispatcherHostIDs) Shuffle() {
	rand.Shuffle(len(dHPrflIDs), func(i, j int) {
		dHPrflIDs[i], dHPrflIDs[j] = dHPrflIDs[j], dHPrflIDs[i]
	})
	return
}

func (dHPrflIDs DispatcherHostIDs) Clone() (cln DispatcherHostIDs) {
	cln = make(DispatcherHostIDs, len(dHPrflIDs))
	for i, dhID := range dHPrflIDs {
		cln[i] = dhID
	}
	return
}
@@ -252,3 +252,46 @@ func TestDispatcherHostCall(t *testing.T) {
		t.Errorf("Expected: %s , received: %s", utils.ToJSON(etRPC), utils.ToJSON(tRPC))
	}
}

func TestDispatcherHostIDsProfilesReorderFromIndex(t *testing.T) {
	dConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	eConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	if dConns.ReorderFromIndex(0); !reflect.DeepEqual(eConns, dConns) {
		t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
	}
	dConns = DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	if dConns.ReorderFromIndex(3); !reflect.DeepEqual(eConns, dConns) {
		t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
	}
	dConns = DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	eConns = DispatcherHostIDs{"DSP_3", "DSP_1", "DSP_2"}
	if dConns.ReorderFromIndex(2); !reflect.DeepEqual(eConns, dConns) {
		t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
	}
	dConns = DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	eConns = DispatcherHostIDs{"DSP_2", "DSP_3", "DSP_1"}
	if dConns.ReorderFromIndex(1); !reflect.DeepEqual(eConns, dConns) {
		t.Errorf("expecting: %+v, received: %+v",
			utils.ToJSON(eConns), utils.ToJSON(dConns))
	}
}

func TestDispatcherHostIDsProfilesShuffle(t *testing.T) {
	dConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3", "DSP_4"}
	oConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3", "DSP_4"}
	if dConns.Shuffle(); dConns[0] == oConns[0] ||
		dConns[1] == oConns[1] || dConns[2] == oConns[2] ||
		dConns[3] == oConns[3] {
		t.Errorf("received: %s", utils.ToJSON(dConns))
	}
}

func TestDispatcherHostIDsProfilesClone(t *testing.T) {
	dConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	eConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	d2Conns := dConns.Clone()
	d2Conns[0] = "DSP_4"
	if !reflect.DeepEqual(eConns, dConns) {
		t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eConns), utils.ToJSON(dConns))
	}
}
@@ -699,6 +699,14 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
		result, err = ms.getField3(sctx, ColIndx, utils.ChargerFilterIndexes, "key")
	case utils.DispatcherFilterIndexes:
		result, err = ms.getField3(sctx, ColIndx, utils.DispatcherFilterIndexes, "key")
	case utils.ActionPlanIndexes:
		result, err = ms.getField3(sctx, ColIndx, utils.ActionPlanIndexes, "key")
	case utils.RateProfilesFilterIndexPrfx:
		result, err = ms.getField3(sctx, ColIndx, utils.RateProfilesFilterIndexPrfx, "key")
	case utils.RateFilterIndexPrfx:
		result, err = ms.getField3(sctx, ColIndx, utils.RateFilterIndexPrfx, "key")
	case utils.FilterIndexPrfx:
		result, err = ms.getField3(sctx, ColIndx, utils.FilterIndexPrfx, "key")
	default:
		err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix)
	}
@@ -459,7 +459,7 @@ func testGetCDRs(cfg *config.CGRConfig) error {
		if err := cdrStorage.SetCDR(cdr, false); err != nil {
			return fmt.Errorf("testGetCDRs #4 CDR: %+v, err: %v", cdr, err)
		}
		if *dbType == utils.MetaMySQL {
		if *dbType == utils.MetaMySQL || *dbType == utils.MetaPostgres {
			cdr.OrderID = int64(i + 1)
		}
	}
@@ -40,15 +40,15 @@ func TestRPCObjectPointer(t *testing.T) {
		t.Errorf("error converting to struct: %+v (%v)", a, err)
	}
	/*
	//TODO: make pointer in arguments usable
	x, found = rpcParamsMap["RpcStruct.Tropa"]
	if !found {
		t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x)
	}
	b := x.InParam
	// log.Printf("T: %+v", b)
	if err := mapstructure.Decode(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, b); err != nil || b.(*Attr).Name != "a" || b.(*Attr).Surname != "b" || b.(*Attr).Age != 10.2 {
		t.Errorf("error converting to struct: %+v (%v)", b, err)
	}
	//TODO: make pointer in arguments usable
	x, found = rpcParamsMap["RpcStruct.Tropa"]
	if !found {
		t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x)
	}
	b := x.InParam
	// log.Printf("T: %+v", b)
	if err := mapstructure.Decode(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, b); err != nil || b.(*Attr).Name != "a" || b.(*Attr).Surname != "b" || b.(*Attr).Age != 10.2 {
		t.Errorf("error converting to struct: %+v (%v)", b, err)
	}
	*/
}