diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index c86438522..5963c3763 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -125,7 +125,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, // get or build the Dispatcher for the config if x, ok := engine.Cache.Get(utils.CacheDispatchers, tntID); ok && x != nil { - d = x.(Dispatcher).GetInstance() + d = x.(Dispatcher) return } if d, err = newDispatcher(matchedPrlf); err != nil { @@ -133,30 +133,29 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, } engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil, true, utils.EmptyString) - return d.GetInstance(), nil + return } -// Dispatch is the method forwarding the request towards the right -func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, RouteID *string, +// Dispatch is the method forwarding the request towards the right connection +func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID *string, serviceMethod string, args interface{}, reply interface{}) (err error) { d, errDsp := dS.dispatcherForEvent(ev, subsys) if errDsp != nil { return utils.NewErrDispatcherS(errDsp) } var connID string - if RouteID != nil && - *RouteID != "" { + if routeID != nil && + *routeID != "" { // use previously discovered route if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, - *RouteID); ok && x != nil { + *routeID); ok && x != nil { connID = x.(string) if err = dS.conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { return } } } - for i := 0; i < d.MaxConns(); i++ { - connID := d.NextConnID() + for _, connID = range d.ConnIDs() { conn, has := dS.conns[connID] if !has { err = utils.NewErrDispatcherS( @@ -166,9 +165,9 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, RouteID if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { continue } - if RouteID != nil && - *RouteID != "" { // cache the discovered route - engine.Cache.Set(utils.CacheDispatcherRoutes, *RouteID, connID, + if routeID != nil && + *routeID != "" { // cache the discovered route + engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, connID, nil, true, utils.EmptyString) } break diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 8dc131e70..96afa3f8c 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -29,16 +29,11 @@ import ( // Dispatcher is responsible for routing requests to pool of connections // there will be different implementations based on strategy type Dispatcher interface { - // SetConfig is used to update the configuration information within dispatcher + // SetProfile is used to update the configuration information within dispatcher // to make sure we take decisions based on latest config SetProfile(pfl *engine.DispatcherProfile) - // GetInstance will clone Dispatcher and update the internal states of original - // it is needed so the dispatcher logic can be apply per request - GetInstance() (d Dispatcher) - // GetConnID returns an ordered list of connection IDs for the event - NextConnID() (connID string) - // MaxConns returns the maximum number of connections available in the pool - MaxConns() int + // ConnIDs returns the ordered list of hosts IDs + ConnIDs() (conns []string) } // newDispatcher constructs instances of Dispatcher @@ -56,41 +51,30 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { // WeightDispatcher selects the next connection based on weight type WeightDispatcher struct { sync.RWMutex - conns engine.DispatcherConns - nextConnIdx int // last returned connection index + conns engine.DispatcherConns } // incrNextConnIdx will increment the nextConnIidx // not thread safe, it needs to be locked in a layer above -func (wd *WeightDispatcher) incrNextConnIdx() { +/*func (wd *WeightDispatcher) incrNextConnIdx() { wd.nextConnIdx++ if wd.nextConnIdx > len(wd.conns)-1 { wd.nextConnIdx = 0 // start from beginning } } +*/ func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) { pfl.Conns.Sort() wd.Lock() wd.conns = pfl.Conns.Clone() - wd.nextConnIdx = 0 wd.Unlock() return } -func (wd *WeightDispatcher) GetInstance() (d Dispatcher) { +func (wd *WeightDispatcher) ConnIDs() (connIDs []string) { wd.RLock() - wdInst := &WeightDispatcher{conns: wd.conns.Clone()} + connIDs = wd.conns.ConnIDs() wd.RUnlock() - return wdInst -} - -func (wd *WeightDispatcher) NextConnID() (connID string) { - connID = wd.conns[wd.nextConnIdx].ID - wd.incrNextConnIdx() return } - -func (wd *WeightDispatcher) MaxConns() int { - return len(wd.conns) -} diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go index 282553159..d9e409d38 100644 --- a/engine/dispatcherprfl.go +++ b/engine/dispatcherprfl.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "math/rand" "sort" "github.com/cgrates/cgrates/utils" @@ -60,6 +61,27 @@ func (dConns DispatcherConns) Sort() { sort.Slice(dConns, func(i, j int) bool { return dConns[i].Weight > dConns[j].Weight }) } +// ReorderFromIndex will consider idx as starting point for the reordered slice +func (dConns DispatcherConns) ReorderFromIndex(idx int) { + initConns := dConns.Clone() + for i := 0; i < len(dConns); i++ { + if idx > len(dConns)-1 { + idx = 0 + } + dConns[i] = initConns[idx] + idx++ + } + return +} + +// Shuffle will mix the connections in place +func (dConns DispatcherConns) Shuffle() { + rand.Shuffle(len(dConns), func(i, j int) { + dConns[i], dConns[j] = dConns[j], dConns[i] + }) + return +} + func (dConns DispatcherConns) Clone() (cln DispatcherConns) { cln = make(DispatcherConns, len(dConns)) for i, dConn := range dConns { @@ -68,6 +90,14 @@ func (dConns DispatcherConns) Clone() (cln DispatcherConns) { return } +func (dConns DispatcherConns) ConnIDs() (connIDs []string) { + connIDs = make([]string, len(dConns)) + for i, conn := range dConns { + connIDs[i] = conn.ID + } + return +} + // DispatcherProfile is the config for one Dispatcher type DispatcherProfile struct { Tenant string diff --git a/engine/dispatcherprfl_test.go b/engine/dispatcherprfl_test.go new file mode 100644 index 000000000..eb0ccb185 --- /dev/null +++ b/engine/dispatcherprfl_test.go @@ -0,0 +1,93 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "reflect" + "testing" + + "github.com/cgrates/cgrates/utils" +) + +func TestDispatcherConnsReorderFromIndex(t *testing.T) { + dConns := DispatcherConns{ + {ID: "DSP_1", Weight: 30}, + {ID: "DSP_2", Weight: 20}, + {ID: "DSP_3", Weight: 10}, + } + eConns := DispatcherConns{ + {ID: "DSP_1", Weight: 30}, + {ID: "DSP_2", Weight: 20}, + {ID: "DSP_3", Weight: 10}, + } + if dConns.ReorderFromIndex(0); !reflect.DeepEqual(eConns, dConns) { + t.Errorf("expecting: %+v, received: %+v", eConns, dConns) + } + dConns = DispatcherConns{ + {ID: "DSP_1", Weight: 30}, + {ID: "DSP_2", Weight: 20}, + {ID: "DSP_3", Weight: 10}, + } + if dConns.ReorderFromIndex(3); !reflect.DeepEqual(eConns, dConns) { + t.Errorf("expecting: %+v, received: %+v", eConns, dConns) + } + dConns = DispatcherConns{ + {ID: "DSP_1", Weight: 30}, + {ID: "DSP_2", Weight: 20}, + {ID: "DSP_3", Weight: 10}, + } + eConns = DispatcherConns{ + {ID: "DSP_3", Weight: 10}, + {ID: "DSP_1", Weight: 30}, + {ID: "DSP_2", Weight: 20}, + } + if dConns.ReorderFromIndex(2); !reflect.DeepEqual(eConns, dConns) { + t.Errorf("expecting: %+v, received: %+v", eConns, dConns) + } + dConns = DispatcherConns{ + {ID: "DSP_1", Weight: 30}, + {ID: "DSP_2", Weight: 20}, + {ID: "DSP_3", Weight: 10}, + } + eConns = DispatcherConns{ + {ID: "DSP_2", Weight: 20}, + {ID: "DSP_3", Weight: 10}, + {ID: "DSP_1", Weight: 30}, + } + if dConns.ReorderFromIndex(1); !reflect.DeepEqual(eConns, dConns) { + t.Errorf("expecting: %+v, received: %+v", + utils.ToJSON(eConns), utils.ToJSON(dConns)) + } +} + +func TestDispatcherConnsShuffle(t *testing.T) { + dConns := DispatcherConns{ + {ID: "DSP_1", Weight: 30}, + {ID: "DSP_2", Weight: 20}, + {ID: "DSP_3", Weight: 10}, + } + oConns := DispatcherConns{ + {ID: "DSP_1", Weight: 30}, + {ID: "DSP_2", Weight: 20}, + {ID: "DSP_3", Weight: 10}, + } + if dConns.Shuffle(); dConns[0] == oConns[0] || + dConns[1] == oConns[1] || dConns[2] == oConns[2] { + t.Errorf("received: %s", utils.ToJSON(dConns)) + } +}