mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Adding Dispatcher.GetInstance method for per request Dispatcher cloning
This commit is contained in:
@@ -125,8 +125,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
|
||||
// get or build the Dispatcher for the config
|
||||
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
|
||||
tntID); ok && x != nil {
|
||||
d = x.(Dispatcher)
|
||||
d.SetProfile(matchedPrlf)
|
||||
d = x.(Dispatcher).GetInstance()
|
||||
return
|
||||
}
|
||||
if d, err = newDispatcher(matchedPrlf); err != nil {
|
||||
@@ -134,7 +133,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
|
||||
}
|
||||
engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil,
|
||||
true, utils.EmptyString)
|
||||
return
|
||||
return d.GetInstance(), nil
|
||||
}
|
||||
|
||||
// Dispatch is the method forwarding the request towards the right
|
||||
|
||||
@@ -20,6 +20,7 @@ package dispatchers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -31,6 +32,9 @@ type Dispatcher interface {
|
||||
// SetConfig is used to update the configuration information within dispatcher
|
||||
// to make sure we take decisions based on latest config
|
||||
SetProfile(pfl *engine.DispatcherProfile)
|
||||
// GetInstance will clone Dispatcher and update the internal states of original
|
||||
// it is needed so the dispatcher logic can be apply per request
|
||||
GetInstance() (d Dispatcher)
|
||||
// GetConnID returns an ordered list of connection IDs for the event
|
||||
NextConnID() (connID string)
|
||||
// MaxConns returns the maximum number of connections available in the pool
|
||||
@@ -42,7 +46,7 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
|
||||
pfl.Conns.Sort() // make sure the connections are sorted
|
||||
switch pfl.Strategy {
|
||||
case utils.MetaWeight:
|
||||
d = &WeightDispatcher{pfl: pfl}
|
||||
d = &WeightDispatcher{conns: pfl.Conns.Clone()}
|
||||
default:
|
||||
err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
|
||||
}
|
||||
@@ -51,26 +55,42 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
|
||||
|
||||
// WeightDispatcher selects the next connection based on weight
|
||||
type WeightDispatcher struct {
|
||||
pfl *engine.DispatcherProfile
|
||||
sync.RWMutex
|
||||
conns engine.DispatcherConns
|
||||
nextConnIdx int // last returned connection index
|
||||
}
|
||||
|
||||
// incrNextConnIdx will increment the nextConnIidx
|
||||
// not thread safe, it needs to be locked in a layer above
|
||||
func (wd *WeightDispatcher) incrNextConnIdx() {
|
||||
wd.nextConnIdx++
|
||||
if wd.nextConnIdx > len(wd.conns)-1 {
|
||||
wd.nextConnIdx = 0 // start from beginning
|
||||
}
|
||||
}
|
||||
|
||||
func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
|
||||
pfl.Conns.Sort()
|
||||
wd.pfl = pfl
|
||||
wd.Lock()
|
||||
wd.conns = pfl.Conns.Clone()
|
||||
wd.nextConnIdx = 0
|
||||
wd.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (wd *WeightDispatcher) GetInstance() (d Dispatcher) {
|
||||
wd.RLock()
|
||||
wdInst := &WeightDispatcher{conns: wd.conns.Clone()}
|
||||
wd.RUnlock()
|
||||
return wdInst
|
||||
}
|
||||
|
||||
func (wd *WeightDispatcher) NextConnID() (connID string) {
|
||||
connID = wd.pfl.Conns[wd.nextConnIdx].ID
|
||||
wd.nextConnIdx++
|
||||
if wd.nextConnIdx > len(wd.pfl.Conns)-1 {
|
||||
wd.nextConnIdx = 0 // start from beginning
|
||||
}
|
||||
connID = wd.conns[wd.nextConnIdx].ID
|
||||
wd.incrNextConnIdx()
|
||||
return
|
||||
}
|
||||
|
||||
func (wd *WeightDispatcher) MaxConns() int {
|
||||
return len(wd.pfl.Conns)
|
||||
return len(wd.conns)
|
||||
}
|
||||
|
||||
@@ -32,6 +32,27 @@ type DispatcherConn struct {
|
||||
Blocker bool // no connection after this one
|
||||
}
|
||||
|
||||
func (dC *DispatcherConn) Clone() (cln *DispatcherConn) {
|
||||
cln = &DispatcherConn{
|
||||
ID: dC.ID,
|
||||
Weight: dC.Weight,
|
||||
Blocker: dC.Blocker,
|
||||
}
|
||||
if dC.FilterIDs != nil {
|
||||
cln.FilterIDs = make([]string, len(dC.FilterIDs))
|
||||
for i, fltr := range dC.FilterIDs {
|
||||
cln.FilterIDs[i] = fltr
|
||||
}
|
||||
}
|
||||
if dC.Params != nil {
|
||||
cln.Params = make(map[string]interface{})
|
||||
for k, v := range dC.Params {
|
||||
cln.Params[k] = v
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type DispatcherConns []*DispatcherConn
|
||||
|
||||
// Sort is part of sort interface, sort based on Weight
|
||||
@@ -39,6 +60,14 @@ func (dConns DispatcherConns) Sort() {
|
||||
sort.Slice(dConns, func(i, j int) bool { return dConns[i].Weight > dConns[j].Weight })
|
||||
}
|
||||
|
||||
func (dConns DispatcherConns) Clone() (cln DispatcherConns) {
|
||||
cln = make(DispatcherConns, len(dConns))
|
||||
for i, dConn := range dConns {
|
||||
cln[i] = dConn.Clone()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DispatcherProfile is the config for one Dispatcher
|
||||
type DispatcherProfile struct {
|
||||
Tenant string
|
||||
|
||||
Reference in New Issue
Block a user