Dispatcher.GetConnIDs implementation, profile ReorderFromIndex and Shuffle methods

This commit is contained in:
DanB
2019-02-15 15:27:11 +01:00
parent 07f337dfe7
commit 46a919d9f3
4 changed files with 142 additions and 36 deletions

View File

@@ -125,7 +125,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
// get or build the Dispatcher for the config
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
tntID); ok && x != nil {
d = x.(Dispatcher).GetInstance()
d = x.(Dispatcher)
return
}
if d, err = newDispatcher(matchedPrlf); err != nil {
@@ -133,30 +133,29 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
}
engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil,
true, utils.EmptyString)
return d.GetInstance(), nil
return
}
// Dispatch is the method forwarding the request towards the right
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, RouteID *string,
// Dispatch is the method forwarding the request towards the right connection
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
d, errDsp := dS.dispatcherForEvent(ev, subsys)
if errDsp != nil {
return utils.NewErrDispatcherS(errDsp)
}
var connID string
if RouteID != nil &&
*RouteID != "" {
if routeID != nil &&
*routeID != "" {
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*RouteID); ok && x != nil {
*routeID); ok && x != nil {
connID = x.(string)
if err = dS.conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
return
}
}
}
for i := 0; i < d.MaxConns(); i++ {
connID := d.NextConnID()
for _, connID = range d.ConnIDs() {
conn, has := dS.conns[connID]
if !has {
err = utils.NewErrDispatcherS(
@@ -166,9 +165,9 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, RouteID
if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
continue
}
if RouteID != nil &&
*RouteID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *RouteID, connID,
if routeID != nil &&
*routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, connID,
nil, true, utils.EmptyString)
}
break

View File

@@ -29,16 +29,11 @@ import (
// Dispatcher is responsible for routing requests to pool of connections
// there will be different implementations based on strategy
type Dispatcher interface {
// SetConfig is used to update the configuration information within dispatcher
// SetProfile is used to update the configuration information within dispatcher
// to make sure we take decisions based on latest config
SetProfile(pfl *engine.DispatcherProfile)
// GetInstance will clone Dispatcher and update the internal states of original
// it is needed so the dispatcher logic can be apply per request
GetInstance() (d Dispatcher)
// GetConnID returns an ordered list of connection IDs for the event
NextConnID() (connID string)
// MaxConns returns the maximum number of connections available in the pool
MaxConns() int
// ConnIDs returns the ordered list of hosts IDs
ConnIDs() (conns []string)
}
// newDispatcher constructs instances of Dispatcher
@@ -56,41 +51,30 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
// WeightDispatcher selects the next connection based on weight
type WeightDispatcher struct {
sync.RWMutex
conns engine.DispatcherConns
nextConnIdx int // last returned connection index
conns engine.DispatcherConns
}
// incrNextConnIdx will increment the nextConnIidx
// not thread safe, it needs to be locked in a layer above
func (wd *WeightDispatcher) incrNextConnIdx() {
/*func (wd *WeightDispatcher) incrNextConnIdx() {
wd.nextConnIdx++
if wd.nextConnIdx > len(wd.conns)-1 {
wd.nextConnIdx = 0 // start from beginning
}
}
*/
func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
pfl.Conns.Sort()
wd.Lock()
wd.conns = pfl.Conns.Clone()
wd.nextConnIdx = 0
wd.Unlock()
return
}
func (wd *WeightDispatcher) GetInstance() (d Dispatcher) {
func (wd *WeightDispatcher) ConnIDs() (connIDs []string) {
wd.RLock()
wdInst := &WeightDispatcher{conns: wd.conns.Clone()}
connIDs = wd.conns.ConnIDs()
wd.RUnlock()
return wdInst
}
func (wd *WeightDispatcher) NextConnID() (connID string) {
connID = wd.conns[wd.nextConnIdx].ID
wd.incrNextConnIdx()
return
}
func (wd *WeightDispatcher) MaxConns() int {
return len(wd.conns)
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"math/rand"
"sort"
"github.com/cgrates/cgrates/utils"
@@ -60,6 +61,27 @@ func (dConns DispatcherConns) Sort() {
sort.Slice(dConns, func(i, j int) bool { return dConns[i].Weight > dConns[j].Weight })
}
// ReorderFromIndex will consider idx as starting point for the reordered slice
func (dConns DispatcherConns) ReorderFromIndex(idx int) {
initConns := dConns.Clone()
for i := 0; i < len(dConns); i++ {
if idx > len(dConns)-1 {
idx = 0
}
dConns[i] = initConns[idx]
idx++
}
return
}
// Shuffle will mix the connections in place
func (dConns DispatcherConns) Shuffle() {
rand.Shuffle(len(dConns), func(i, j int) {
dConns[i], dConns[j] = dConns[j], dConns[i]
})
return
}
func (dConns DispatcherConns) Clone() (cln DispatcherConns) {
cln = make(DispatcherConns, len(dConns))
for i, dConn := range dConns {
@@ -68,6 +90,14 @@ func (dConns DispatcherConns) Clone() (cln DispatcherConns) {
return
}
func (dConns DispatcherConns) ConnIDs() (connIDs []string) {
connIDs = make([]string, len(dConns))
for i, conn := range dConns {
connIDs[i] = conn.ID
}
return
}
// DispatcherProfile is the config for one Dispatcher
type DispatcherProfile struct {
Tenant string

View File

@@ -0,0 +1,93 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"reflect"
"testing"
"github.com/cgrates/cgrates/utils"
)
func TestDispatcherConnsReorderFromIndex(t *testing.T) {
dConns := DispatcherConns{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
eConns := DispatcherConns{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
if dConns.ReorderFromIndex(0); !reflect.DeepEqual(eConns, dConns) {
t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
}
dConns = DispatcherConns{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
if dConns.ReorderFromIndex(3); !reflect.DeepEqual(eConns, dConns) {
t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
}
dConns = DispatcherConns{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
eConns = DispatcherConns{
{ID: "DSP_3", Weight: 10},
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
}
if dConns.ReorderFromIndex(2); !reflect.DeepEqual(eConns, dConns) {
t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
}
dConns = DispatcherConns{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
eConns = DispatcherConns{
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
{ID: "DSP_1", Weight: 30},
}
if dConns.ReorderFromIndex(1); !reflect.DeepEqual(eConns, dConns) {
t.Errorf("expecting: %+v, received: %+v",
utils.ToJSON(eConns), utils.ToJSON(dConns))
}
}
func TestDispatcherConnsShuffle(t *testing.T) {
dConns := DispatcherConns{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
oConns := DispatcherConns{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
if dConns.Shuffle(); dConns[0] == oConns[0] ||
dConns[1] == oConns[1] || dConns[2] == oConns[2] {
t.Errorf("received: %s", utils.ToJSON(dConns))
}
}