From ffb51d7d88f2558531e5cb4eb2995726d22f12ae Mon Sep 17 00:00:00 2001
From: Trial97
Date: Mon, 17 Aug 2020 18:11:41 +0300
Subject: [PATCH] Updated dispatcher host matching based on load if *ratio is
 specified

---
 apier/v1/apier_it_test.go                          |   2 +
 apier/v1/caches_it_test.go                         |   4 +
 config/config_defaults.go                          |   2 +-
 data/conf/cgrates/cgrates.json                     |   2 +-
 .../dispatcher_engine/DispatcherProfiles.csv       |   2 +-
 .../dispatcher_engine2/DispatcherProfiles.csv      |   2 +-
 .../dispatcher_engine/DispatcherProfiles.csv       |   2 +-
 .../dispatcher_engine2/DispatcherProfiles.csv      |   2 +-
 dispatchers/caches_it_test.go                      |   2 +
 dispatchers/libdispatcher.go                       | 135 +++++++-----------
 dispatchers/libdispatcher_test.go                  |  61 ++++++++
 engine/datamanager.go                              |  10 +-
 engine/dispatcherprfl.go                           |  31 ++++
 engine/dispatcherprfl_test.go                      |  43 ++++++
 engine/storage_mongo_datadb.go                     |   8 ++
 engine/z_storage_cdrs_it_test.go                   |   2 +-
 utils/rpc_params_test.go                           |  20 +--
 17 files changed, 231 insertions(+), 99 deletions(-)
 create mode 100644 dispatchers/libdispatcher_test.go

diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go
index 2c76873ce..413e70f2b 100644
--- a/apier/v1/apier_it_test.go
+++ b/apier/v1/apier_it_test.go
@@ -1507,6 +1507,8 @@ func testApierResetDataAfterLoadFromFolder(t *testing.T) {
 	expStats[utils.CacheResourceFilterIndexes].Groups = 1
 	expStats[utils.CacheAttributeFilterIndexes].Items = 4
 	expStats[utils.CacheAttributeFilterIndexes].Groups = 1
+	expStats[utils.CacheReverseFilterIndexes].Items = 10
+	expStats[utils.CacheReverseFilterIndexes].Groups = 7
 
 	if err := rater.Call(utils.CacheSv1GetCacheStats, new(utils.AttrCacheIDsWithOpts), &rcvStats); err != nil {
 		t.Error(err)
diff --git a/apier/v1/caches_it_test.go b/apier/v1/caches_it_test.go
index 0861f8fd7..6a15f33c5 100644
--- a/apier/v1/caches_it_test.go
+++ b/apier/v1/caches_it_test.go
@@ -172,6 +172,8 @@ func testCacheSAfterLoadFromFolder(t *testing.T) {
 	expStats[utils.CacheResourceFilterIndexes].Groups = 1
 	expStats[utils.CacheAttributeFilterIndexes].Items = 4
 	expStats[utils.CacheAttributeFilterIndexes].Groups = 1
+	expStats[utils.CacheReverseFilterIndexes].Items = 10
+	expStats[utils.CacheReverseFilterIndexes].Groups = 7
 
 	if err := chcRPC.Call(utils.CacheSv1GetCacheStats, &utils.AttrCacheIDsWithOpts{}, &rcvStats); err != nil {
 		t.Error(err)
@@ -238,6 +240,8 @@ func testCacheSReload(t *testing.T) {
 	expStats[utils.CacheResourceFilterIndexes].Groups = 1
 	expStats[utils.CacheAttributeFilterIndexes].Items = 4
 	expStats[utils.CacheAttributeFilterIndexes].Groups = 1
+	expStats[utils.CacheReverseFilterIndexes].Items = 10
+	expStats[utils.CacheReverseFilterIndexes].Groups = 7
 
 	if err := chcRPC.Call(utils.CacheSv1GetCacheStats, &utils.AttrCacheIDsWithOpts{}, &rcvStats); err != nil {
 		t.Error(err)
diff --git a/config/config_defaults.go b/config/config_defaults.go
index a2e083bb9..992c348ff 100755
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -225,7 +225,7 @@ const CGRATES_CFG_JSON = `
 "*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control rate filter indexes caching
 "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control reverse filter indexes caching used only for set and remove filters
 "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control dispatcher routes caching
-"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control dispatcher load ( in case of *load strategy )
+"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control dispatcher load (in case *ratio ConnParams is present)
 "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control dispatcher interface
 "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false},	// diameter messages caching
 "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "replicate": false},	// RPC responses caching
diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json
index ba2d43cca..ec376705a 100755
--- a/data/conf/cgrates/cgrates.json
+++ b/data/conf/cgrates/cgrates.json
@@ -204,7 +204,7 @@
 //	"*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control rate filter indexes caching
 //	"*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control reverse filter indexes caching used only for set and remove filters
 //	"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control dispatcher routes caching
-//	"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control dispatcher load ( in case of *load strategy )
+//	"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control dispatcher load (in case *ratio ConnParams is present)
 //	"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},	// control dispatcher interface
 //	"*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false},	// diameter messages caching
 //	"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "replicate": false},	// RPC responses caching
diff --git a/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv b/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv
index a4c4c28d5..143f6b356 100644
--- a/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv
+++ b/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv
@@ -1,3 +1,3 @@
 #Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
 cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
-cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*load,,Engine1,,20,false,,20
\ No newline at end of file
+cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*weight,,Engine1,,20,false,*ratio:1,20
\ No newline at end of file
diff --git a/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv b/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv
index 1a665991f..86f464c4f 100644
--- a/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv
+++ b/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv
@@ -1,4 +1,4 @@
 #Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
 cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
 cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
-cgrates.org,Engine2,*chargers,,,*load,,Engine1,,20,false,,10
\ No newline at end of file
+cgrates.org,Engine2,*chargers,,,*weight,,Engine1,,20,false,*ratio:1,10
\ No newline at end of file
diff --git a/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherProfiles.csv b/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherProfiles.csv
index b5f51282f..752100425 100644
--- a/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherProfiles.csv
+++ b/data/tariffplans/cache_rpl_active_active/dispatcher_engine/DispatcherProfiles.csv
@@ -1,4 +1,4 @@
 #Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
 cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
 cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
-cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*load,,Engine1,,20,false,,20
\ No newline at end of file
+cgrates.org,Engine2,*chargers,*string:~*req.EventName:TestLoad,,*weight,,Engine1,,20,false,*ratio:1,20
\ No newline at end of file
diff --git a/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherProfiles.csv b/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherProfiles.csv
index 1a665991f..86f464c4f 100644
--- a/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherProfiles.csv
+++ b/data/tariffplans/cache_rpl_active_active/dispatcher_engine2/DispatcherProfiles.csv
@@ -1,4 +1,4 @@
 #Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
 cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
 cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
-cgrates.org,Engine2,*chargers,,,*load,,Engine1,,20,false,,10
\ No newline at end of file
+cgrates.org,Engine2,*chargers,,,*weight,,Engine1,,20,false,*ratio:1,10
\ No newline at end of file
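The tariff-plan rows above show the migration path: the Strategy column moves from *load back to *weight and the load behaviour is now requested per host through the ConnParameters column as *ratio:1. A hedged sketch of roughly how such a row would materialize in memory follows; the host field names mirror how the new unit test below builds engine.DispatcherHostProfiles, but the loader plumbing is elided and the Weight mapping from ConnWeight is an assumption, so treat this as an illustration, not the loader's actual output:

package main

import (
	"fmt"

	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/utils"
)

func main() {
	// rough in-memory equivalent of the updated Engine2 row (sketch)
	prfl := &engine.DispatcherProfile{
		Tenant:   "cgrates.org",
		ID:       "Engine2",
		Strategy: utils.MetaWeight, // was *load before this patch
		Hosts: engine.DispatcherHostProfiles{
			{
				ID:     "Engine1",
				Weight: 20, // from the ConnWeight column (assumed mapping)
				// ConnParameters "*ratio:1" surfaces as the *ratio param
				// that newSingleStrategyDispatcher looks for below
				Params: map[string]interface{}{utils.MetaRatio: "1"},
			},
		},
		Weight: 20,
	}
	fmt.Println(utils.ToJSON(prfl))
}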
diff --git a/dispatchers/caches_it_test.go b/dispatchers/caches_it_test.go
index a0f8a92ac..7e86bf41e 100644
--- a/dispatchers/caches_it_test.go
+++ b/dispatchers/caches_it_test.go
@@ -159,6 +159,8 @@ func testDspChcLoadAfterFolder(t *testing.T) {
 	expStats[utils.CacheChargerFilterIndexes].Groups = 1
 	expStats[utils.CacheAttributeFilterIndexes].Items = 10
 	expStats[utils.CacheAttributeFilterIndexes].Groups = 4
+	expStats[utils.CacheReverseFilterIndexes].Items = 8
+	expStats[utils.CacheReverseFilterIndexes].Groups = 6
 	if err := dispEngine.RPC.Call(utils.CacheSv1GetCacheStats, &args, &rcvStats); err != nil {
 		t.Error(err)
 	} else if !reflect.DeepEqual(expStats, rcvStats) {
diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go
index 042ce2a89..0e8d57661 100644
--- a/dispatchers/libdispatcher.go
+++ b/dispatchers/libdispatcher.go
@@ -22,7 +22,6 @@ import (
 	"encoding/gob"
 	"fmt"
 	"sort"
-	"strconv"
 	"sync"
 
 	"github.com/cgrates/cgrates/engine"
@@ -44,7 +43,7 @@ type Dispatcher interface {
 	// to make sure we take decisions based on latest config
 	SetProfile(pfl *engine.DispatcherProfile)
 	// HostIDs returns the ordered list of host IDs
-	HostIDs() (hostIDs []string)
+	HostIDs() (hostIDs engine.DispatcherHostIDs)
 	// Dispatch is used to send the method over the connections given
 	Dispatch(routeID string, subsystem, serviceMethod string, args interface{}, reply interface{}) (err error)
@@ -59,46 +58,35 @@ type strategyDispatcher interface {
 // newDispatcher constructs instances of Dispatcher
 func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
 	pfl.Hosts.Sort() // make sure the connections are sorted
+	hosts := pfl.Hosts.Clone()
 	switch pfl.Strategy {
 	case utils.MetaWeight:
 		d = &WeightDispatcher{
 			dm:       dm,
 			tnt:      pfl.Tenant,
-			hosts:    pfl.Hosts.Clone(),
-			strategy: new(singleResultstrategyDispatcher),
+			hosts:    hosts,
+			strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
 		}
 	case utils.MetaRandom:
 		d = &RandomDispatcher{
 			dm:       dm,
 			tnt:      pfl.Tenant,
-			hosts:    pfl.Hosts.Clone(),
-			strategy: new(singleResultstrategyDispatcher),
+			hosts:    hosts,
+			strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
 		}
 	case utils.MetaRoundRobin:
 		d = &RoundRobinDispatcher{
 			dm:       dm,
 			tnt:      pfl.Tenant,
-			hosts:    pfl.Hosts.Clone(),
-			strategy: new(singleResultstrategyDispatcher),
+			hosts:    hosts,
+			strategy: newSingleStrategyDispatcher(hosts, pfl.TenantID()),
 		}
 	case utils.MetaBroadcast:
-		d = &BroadcastDispatcher{
-			dm:       dm,
-			tnt:      pfl.Tenant,
-			hosts:    pfl.Hosts.Clone(),
-			strategy: new(brodcastStrategyDispatcher),
-		}
-	case utils.MetaLoad:
-		hosts := pfl.Hosts.Clone()
-		ls, err := newLoadStrategyDispatcher(hosts, pfl.TenantID())
-		if err != nil {
-			return nil, err
-		}
 		d = &WeightDispatcher{
 			dm:       dm,
 			tnt:      pfl.Tenant,
 			hosts:    hosts,
-			strategy: ls,
+			strategy: new(brodcastStrategyDispatcher),
 		}
 	default:
 		err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
@@ -123,7 +111,7 @@ func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
 	return
 }
 
-func (wd *WeightDispatcher) HostIDs() (hostIDs []string) {
+func (wd *WeightDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
 	wd.RLock()
 	hostIDs = wd.hosts.HostIDs()
 	wd.RUnlock()
@@ -153,12 +141,12 @@ func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
 	return
 }
 
-func (d *RandomDispatcher) HostIDs() (hostIDs []string) {
+func (d *RandomDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
 	d.RLock()
-	hosts := d.hosts.Clone()
+	hostIDs = d.hosts.HostIDs()
 	d.RUnlock()
-	hosts.Shuffle() // randomize the connections
-	return hosts.HostIDs()
+	hostIDs.Shuffle() // randomize the connections
+	return
 }
 
 func (d *RandomDispatcher) Dispatch(routeID string, subsystem,
@@ -184,16 +172,16 @@ func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
 	return
 }
 
-func (d *RoundRobinDispatcher) HostIDs() (hostIDs []string) {
+func (d *RoundRobinDispatcher) HostIDs() (hostIDs engine.DispatcherHostIDs) {
 	d.RLock()
-	hosts := d.hosts.Clone()
-	hosts.ReorderFromIndex(d.hostIdx)
+	hostIDs = d.hosts.HostIDs()
+	hostIDs.ReorderFromIndex(d.hostIdx)
 	d.hostIdx++
 	if d.hostIdx >= len(d.hosts) {
 		d.hostIdx = 0
 	}
 	d.RUnlock()
-	return hosts.HostIDs()
+	return
 }
 
 func (d *RoundRobinDispatcher) Dispatch(routeID string, subsystem,
@@ -202,39 +190,9 @@ func (d *RoundRobinDispatcher) Dispatch(routeID string, subsystem,
 		serviceMethod, args, reply)
 }
 
-// BroadcastDispatcher will send the request to multiple hosts simultaneously
-type BroadcastDispatcher struct {
-	sync.RWMutex
-	dm       *engine.DataManager
-	tnt      string
-	hosts    engine.DispatcherHostProfiles
-	strategy strategyDispatcher
-}
-
-func (d *BroadcastDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
-	d.Lock()
-	pfl.Hosts.Sort()
-	d.hosts = pfl.Hosts.Clone()
-	d.Unlock()
-	return
-}
-
-func (d *BroadcastDispatcher) HostIDs() (hostIDs []string) {
-	d.RLock()
-	hostIDs = d.hosts.HostIDs()
-	d.RUnlock()
-	return
-}
-
-func (d *BroadcastDispatcher) Dispatch(routeID string, subsystem,
-	serviceMethod string, args interface{}, reply interface{}) (lastErr error) { // no cache needed for this strategy because we need to call all connections
-	return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
-		serviceMethod, args, reply)
-}
 
 type singleResultstrategyDispatcher struct{}
 
-func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string,
+func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string,
 	hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) {
 	var dH *engine.DispatcherHost
 	if routeID != utils.EmptyString {
@@ -270,7 +228,7 @@
 type brodcastStrategyDispatcher struct{}
 
-func (_ *brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
+func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
 	serviceMethod string, args interface{}, reply interface{}) (err error) {
 	var hasErrors bool
 	for _, hostID := range hostIDs {
@@ -295,13 +253,16 @@
 	return
 }
 
-func newLoadStrategyDispatcher(hosts engine.DispatcherHostProfiles, tntID string) (ls *loadStrategyDispatcher, err error) {
-	ls = &loadStrategyDispatcher{
-		tntID: tntID,
-		hosts: hosts,
+func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, tntID string) (ls strategyDispatcher) {
+	for _, host := range hosts {
+		if _, has := host.Params[utils.MetaRatio]; has {
+			return &loadStrategyDispatcher{
+				tntID: tntID,
+				hosts: hosts.Clone(),
+			}
+		}
 	}
-
-	return
+	return new(singleResultstrategyDispatcher)
 }
 
 type loadStrategyDispatcher struct {
@@ -318,8 +279,8 @@ func newLoadMetrics(hosts engine.DispatcherHostProfiles) (*LoadMetrics, error) {
 	for _, host := range hosts {
 		if strRatio, has := host.Params[utils.MetaRatio]; !has {
 			lM.HostsRatio[host.ID] = 1
-			lM.SumRatio += 1
-		} else if ratio, err := strconv.ParseInt(utils.IfaceAsString(strRatio), 10, 64); err != nil {
+			lM.SumRatio++
+		} else if ratio, err := utils.IfaceAsTInt64(strRatio); err != nil {
 			return nil, err
 		} else {
 			lM.HostsRatio[host.ID] = ratio
@@ -329,6 +290,7 @@
 	return lM, nil
 }
 
+// LoadMetrics is the structure used to store the metrics for the load strategy
 type LoadMetrics struct {
 	mutex     sync.RWMutex
 	HostsLoad map[string]int64
@@ -386,32 +348,45 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
 	return
 }
 
+// hostCosts is a sort.Interface helper used to order the host IDs by cost
+type hostCosts struct {
+	ids   []string
+	costs []int64
+}
+
+func (hc *hostCosts) Len() int           { return len(hc.ids) }
+func (hc *hostCosts) Less(i, j int) bool { return hc.costs[i] < hc.costs[j] }
+func (hc *hostCosts) Swap(i, j int) {
+	hc.costs[i], hc.costs[j], hc.ids[i], hc.ids[j] = hc.costs[j], hc.costs[i], hc.ids[j], hc.ids[i]
+}
+
 func (lM *LoadMetrics) getHosts(hostIDs []string) []string {
-	costs := make([]int64, len(hostIDs))
+	hlp := &hostCosts{
+		ids:   hostIDs,
+		costs: make([]int64, len(hostIDs)),
+	}
 	lM.mutex.RLock()
 	for i, id := range hostIDs {
-		costs[i] = lM.HostsLoad[id]
-		if costs[i] >= lM.HostsRatio[id] {
-			costs[i] += lM.SumRatio
+		hlp.costs[i] = lM.HostsLoad[id]
+		if hlp.costs[i] >= lM.HostsRatio[id] {
+			hlp.costs[i] += lM.SumRatio
 		}
 	}
 	lM.mutex.RUnlock()
-	sort.Slice(hostIDs, func(i, j int) bool {
-		return costs[i] < costs[j]
-	})
-	return hostIDs
+	sort.Sort(hlp)
+	return hlp.ids
 }
 
 func (lM *LoadMetrics) incrementLoad(hostID, tntID string) {
 	lM.mutex.Lock()
-	lM.HostsLoad[hostID] += 1
+	lM.HostsLoad[hostID]++
 	engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
 	lM.mutex.Unlock()
}
 
 func (lM *LoadMetrics) decrementLoad(hostID, tntID string) {
 	lM.mutex.Lock()
-	lM.HostsLoad[hostID] -= 1
+	lM.HostsLoad[hostID]--
 	engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
 	lM.mutex.Unlock()
 }
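The heart of the change is getHosts: a host's cost starts at its current load and, once the host has consumed its configured ratio, is bumped by SumRatio, which always sorts it behind every host still under its ratio. A standalone restatement of that arithmetic in plain Go (not the cgrates implementation itself), with worked numbers:

package main

import (
	"fmt"
	"sort"
)

// order restates LoadMetrics.getHosts' cost rule so the arithmetic is
// visible; names and structure here are illustrative only.
func order(ids []string, load, ratio map[string]int64, sumRatio int64) []string {
	costs := make(map[string]int64, len(ids))
	for _, id := range ids {
		costs[id] = load[id]
		if costs[id] >= ratio[id] { // ratio consumed:
			costs[id] += sumRatio // sort behind every host still under its ratio
		}
	}
	sort.SliceStable(ids, func(i, j int) bool { return costs[ids[i]] < costs[ids[j]] })
	return ids
}

func main() {
	ratio := map[string]int64{"A": 2, "B": 1} // SumRatio = 3
	// A holds 2 in-flight calls (ratio consumed), B is idle: B wins
	fmt.Println(order([]string{"A", "B"}, map[string]int64{"A": 2}, ratio, 3)) // [B A]
	// one call each: A is still under its ratio (1 < 2) while B has
	// consumed its own (1 >= 1) and takes the +3 penalty: A wins
	fmt.Println(order([]string{"A", "B"}, map[string]int64{"A": 1, "B": 1}, ratio, 3)) // [A B]
}

One consequence worth noting: a host with *ratio:0 never satisfies cost < ratio, so it is penalized from the start and only receives traffic once every other host has exhausted its ratio.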
diff --git a/dispatchers/libdispatcher_test.go b/dispatchers/libdispatcher_test.go
new file mode 100644
index 000000000..b21359ef7
--- /dev/null
+++ b/dispatchers/libdispatcher_test.go
@@ -0,0 +1,61 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>
+*/
+
+package dispatchers
+
+import (
+	"testing"
+
+	"github.com/cgrates/cgrates/engine"
+	"github.com/cgrates/cgrates/utils"
+)
+
+func TestLoadMetricsGetHosts(t *testing.T) {
+	dhp := engine.DispatcherHostProfiles{
+		{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 1}},
+		{ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 1}},
+		{ID: "DSP_3", Params: map[string]interface{}{utils.MetaRatio: 1}},
+		{ID: "DSP_4", Params: map[string]interface{}{utils.MetaRatio: 1}},
+		{ID: "DSP_5", Params: map[string]interface{}{utils.MetaRatio: 1}},
+	}
+	lm, err := newLoadMetrics(dhp)
+	if err != nil {
+		t.Fatal(err)
+	}
+	hostsIDs := engine.DispatcherHostIDs(dhp.HostIDs())
+	// to prevent randomness we increment all loads except the first one
+	for _, hst := range hostsIDs[1:] {
+		lm.incrementLoad(hst, utils.EmptyString)
+	}
+	// check only the first host, as the rest may come in any order
+	// since they share the same cost
+	if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_1" {
+		t.Errorf("Expected: %q, received: %q", "DSP_1", rply[0])
+	}
+	lm.incrementLoad(hostsIDs[0], utils.EmptyString)
+	lm.decrementLoad(hostsIDs[1], utils.EmptyString)
+	if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_2" {
+		t.Errorf("Expected: %q, received: %q", "DSP_2", rply[0])
+	}
+	for _, hst := range hostsIDs {
+		lm.incrementLoad(hst, utils.EmptyString)
+	}
+	if rply := lm.getHosts(hostsIDs.Clone()); rply[0] != "DSP_2" {
+		t.Errorf("Expected: %q, received: %q", "DSP_2", rply[0])
+	}
+}
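The test above pins the ordering for uniform ratios; a companion case along the following lines (hypothetical, not part of the patch — it relies on the same unexported helpers, so it would live in the same package) would also pin the penalty threshold with mixed ratios:

package dispatchers

import (
	"testing"

	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/utils"
)

func TestLoadMetricsGetHostsMixedRatio(t *testing.T) {
	dhp := engine.DispatcherHostProfiles{
		{ID: "DSP_1", Params: map[string]interface{}{utils.MetaRatio: 2}},
		{ID: "DSP_2", Params: map[string]interface{}{utils.MetaRatio: 1}},
	}
	lm, err := newLoadMetrics(dhp)
	if err != nil {
		t.Fatal(err)
	}
	// one call on each host: DSP_1 stays under its ratio (1 < 2) while
	// DSP_2 reaches its own (1 >= 1) and is pushed behind by SumRatio
	lm.incrementLoad("DSP_1", utils.EmptyString)
	lm.incrementLoad("DSP_2", utils.EmptyString)
	if rply := lm.getHosts([]string{"DSP_1", "DSP_2"}); rply[0] != "DSP_1" {
		t.Errorf("Expected: %q, received: %q", "DSP_1", rply[0])
	}
}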
diff --git a/engine/datamanager.go b/engine/datamanager.go
index c6c8ff3a4..3869531d2 100644
--- a/engine/datamanager.go
+++ b/engine/datamanager.go
@@ -35,6 +35,8 @@ var (
 		utils.DispatcherFilterIndexes:     {},
 		utils.RateProfilesFilterIndexPrfx: {},
 		utils.RateFilterIndexPrfx:         {},
+		utils.ActionPlanIndexes:           {},
+		utils.FilterIndexPrfx:             {},
 	}
 	cachePrefixMap = utils.StringSet{
 		utils.DESTINATION_PREFIX: {},
@@ -271,8 +273,12 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
 		}
 		_, err = dm.GetIndexes(utils.CacheRateFilterIndexes, tntCtx, idxKey, false, true)
 	case utils.FilterIndexPrfx:
-		tntID := utils.NewTenantID(dataID)
-		_, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntID.Tenant, tntID.ID, false, true)
+		idx := strings.LastIndexByte(dataID, utils.InInFieldSep[0])
+		if idx < 0 {
+			err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID)
+			return
+		}
+		_, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, dataID[:idx], dataID[idx+1:], false, true)
 	case utils.LoadIDPrefix:
 		_, err = dm.GetItemLoadIDs(utils.EmptyString, true)
 	}
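The FilterIndexPrfx branch previously delegated to utils.NewTenantID, which splits at the first separator; splitting at the last one keeps composite tenant-plus-context prefixes intact. The mechanics, with a purely illustrative key (the patch uses utils.InInFieldSep[0], assumed here to be ':'):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// hypothetical reverse-filter-index cache key: only the part after
	// the LAST ':' becomes the index key, everything before it stays
	// together as the tenant context
	dataID := "cgrates.org:*attributes:FLTR_1"
	idx := strings.LastIndexByte(dataID, ':')
	if idx < 0 {
		fmt.Printf("WRONG_IDX_KEY_FORMAT<%s>\n", dataID)
		return
	}
	fmt.Printf("tntCtx=%q idxKey=%q\n", dataID[:idx], dataID[idx+1:])
	// tntCtx="cgrates.org:*attributes" idxKey="FLTR_1"
}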
diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go
index 6b3ba2ff6..318b874b6 100644
--- a/engine/dispatcherprfl.go
+++ b/engine/dispatcherprfl.go
@@ -168,3 +168,34 @@ func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply int
 	}
 	return dH.rpcConn.Call(serviceMethod, args, reply)
 }
+
+type DispatcherHostIDs []string
+
+// ReorderFromIndex will consider idx as starting point for the reordered slice
+func (dHPrflIDs DispatcherHostIDs) ReorderFromIndex(idx int) {
+	initConns := dHPrflIDs.Clone()
+	for i := 0; i < len(dHPrflIDs); i++ {
+		if idx > len(dHPrflIDs)-1 {
+			idx = 0
+		}
+		dHPrflIDs[i] = initConns[idx]
+		idx++
+	}
+	return
+}
+
+// Shuffle will mix the connections in place
+func (dHPrflIDs DispatcherHostIDs) Shuffle() {
+	rand.Shuffle(len(dHPrflIDs), func(i, j int) {
+		dHPrflIDs[i], dHPrflIDs[j] = dHPrflIDs[j], dHPrflIDs[i]
+	})
+	return
+}
+
+func (dHPrflIDs DispatcherHostIDs) Clone() (cln DispatcherHostIDs) {
+	cln = make(DispatcherHostIDs, len(dHPrflIDs))
+	for i, dhID := range dHPrflIDs {
+		cln[i] = dhID
+	}
+	return
+}
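The new engine.DispatcherHostIDs type carries the rotation and shuffling that RandomDispatcher and RoundRobinDispatcher previously performed by cloning full host profiles; cloning a plain []string is cheaper. Expected behaviour, mirrored from the unit tests below:

package main

import (
	"fmt"

	"github.com/cgrates/cgrates/engine"
)

func main() {
	ids := engine.DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	ids.ReorderFromIndex(1) // rotate so the element at index 1 leads
	fmt.Println(ids)        // [DSP_2 DSP_3 DSP_1]

	ids = engine.DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
	ids.Shuffle() // random in-place permutation
	fmt.Println(ids)
}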
diff --git a/engine/dispatcherprfl_test.go b/engine/dispatcherprfl_test.go
index b04b8f443..06b7369cb 100644
--- a/engine/dispatcherprfl_test.go
+++ b/engine/dispatcherprfl_test.go
@@ -252,3 +252,46 @@ func TestDispatcherHostCall(t *testing.T) {
 		t.Errorf("Expected: %s , received: %s", utils.ToJSON(etRPC), utils.ToJSON(tRPC))
 	}
 }
+
+func TestDispatcherHostIDsProfilesReorderFromIndex(t *testing.T) {
+	dConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
+	eConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
+	if dConns.ReorderFromIndex(0); !reflect.DeepEqual(eConns, dConns) {
+		t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
+	}
+	dConns = DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
+	if dConns.ReorderFromIndex(3); !reflect.DeepEqual(eConns, dConns) {
+		t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
+	}
+	dConns = DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
+	eConns = DispatcherHostIDs{"DSP_3", "DSP_1", "DSP_2"}
+	if dConns.ReorderFromIndex(2); !reflect.DeepEqual(eConns, dConns) {
+		t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
+	}
+	dConns = DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
+	eConns = DispatcherHostIDs{"DSP_2", "DSP_3", "DSP_1"}
+	if dConns.ReorderFromIndex(1); !reflect.DeepEqual(eConns, dConns) {
+		t.Errorf("expecting: %+v, received: %+v",
+			utils.ToJSON(eConns), utils.ToJSON(dConns))
+	}
+}
+
+func TestDispatcherHostIDsProfilesShuffle(t *testing.T) {
+	dConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3", "DSP_4"}
+	oConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3", "DSP_4"}
+	if dConns.Shuffle(); dConns[0] == oConns[0] ||
+		dConns[1] == oConns[1] || dConns[2] == oConns[2] ||
+		dConns[3] == oConns[3] {
+		t.Errorf("received: %s", utils.ToJSON(dConns))
+	}
+}
+
+func TestDispatcherHostIDsProfilesClone(t *testing.T) {
+	dConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
+	eConns := DispatcherHostIDs{"DSP_1", "DSP_2", "DSP_3"}
+	d2Conns := dConns.Clone()
+	d2Conns[0] = "DSP_4"
+	if !reflect.DeepEqual(eConns, dConns) {
+		t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eConns), utils.ToJSON(dConns))
+	}
+}
diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go
index b73879534..236f8bf5f 100644
--- a/engine/storage_mongo_datadb.go
+++ b/engine/storage_mongo_datadb.go
@@ -699,6 +699,14 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
 		result, err = ms.getField3(sctx, ColIndx, utils.ChargerFilterIndexes, "key")
 	case utils.DispatcherFilterIndexes:
 		result, err = ms.getField3(sctx, ColIndx, utils.DispatcherFilterIndexes, "key")
+	case utils.ActionPlanIndexes:
+		result, err = ms.getField3(sctx, ColIndx, utils.ActionPlanIndexes, "key")
+	case utils.RateProfilesFilterIndexPrfx:
+		result, err = ms.getField3(sctx, ColIndx, utils.RateProfilesFilterIndexPrfx, "key")
+	case utils.RateFilterIndexPrfx:
+		result, err = ms.getField3(sctx, ColIndx, utils.RateFilterIndexPrfx, "key")
+	case utils.FilterIndexPrfx:
+		result, err = ms.getField3(sctx, ColIndx, utils.FilterIndexPrfx, "key")
 	default:
 		err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix)
 	}
diff --git a/engine/z_storage_cdrs_it_test.go b/engine/z_storage_cdrs_it_test.go
index ce335f918..993a5be66 100644
--- a/engine/z_storage_cdrs_it_test.go
+++ b/engine/z_storage_cdrs_it_test.go
@@ -459,7 +459,7 @@ func testGetCDRs(cfg *config.CGRConfig) error {
 		if err := cdrStorage.SetCDR(cdr, false); err != nil {
 			return fmt.Errorf("testGetCDRs #4 CDR: %+v, err: %v", cdr, err)
 		}
-		if *dbType == utils.MetaMySQL {
+		if *dbType == utils.MetaMySQL || *dbType == utils.MetaPostgres {
 			cdr.OrderID = int64(i + 1)
 		}
 	}
diff --git a/utils/rpc_params_test.go b/utils/rpc_params_test.go
index 556d05086..809afd5ea 100644
--- a/utils/rpc_params_test.go
+++ b/utils/rpc_params_test.go
@@ -40,15 +40,15 @@ func TestRPCObjectPointer(t *testing.T) {
 		t.Errorf("error converting to struct: %+v (%v)", a, err)
 	}
 	/*
-	//TODO: make pointer in arguments usable
-	x, found = rpcParamsMap["RpcStruct.Tropa"]
-	if !found {
-		t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x)
-	}
-	b := x.InParam
-	// log.Printf("T: %+v", b)
-	if err := mapstructure.Decode(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, b); err != nil || b.(*Attr).Name != "a" || b.(*Attr).Surname != "b" || b.(*Attr).Age != 10.2 {
-		t.Errorf("error converting to struct: %+v (%v)", b, err)
-	}
+		//TODO: make pointer in arguments usable
+		x, found = rpcParamsMap["RpcStruct.Tropa"]
+		if !found {
+			t.Errorf("error getting rpcobject: %v (%+v)", rpcParamsMap, x)
+		}
+		b := x.InParam
+		// log.Printf("T: %+v", b)
+		if err := mapstructure.Decode(map[string]interface{}{"Name": "a", "Surname": "b", "Age": 10.2}, b); err != nil || b.(*Attr).Name != "a" || b.(*Attr).Surname != "b" || b.(*Attr).Age != 10.2 {
+			t.Errorf("error converting to struct: %+v (%v)", b, err)
+		}
 	*/
 }