diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 15dc30632..c86438522 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -125,8 +125,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, // get or build the Dispatcher for the config if x, ok := engine.Cache.Get(utils.CacheDispatchers, tntID); ok && x != nil { - d = x.(Dispatcher) - d.SetProfile(matchedPrlf) + d = x.(Dispatcher).GetInstance() return } if d, err = newDispatcher(matchedPrlf); err != nil { @@ -134,7 +133,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, } engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil, true, utils.EmptyString) - return + return d.GetInstance(), nil } // Dispatch is the method forwarding the request towards the right diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 1b4ab260f..8dc131e70 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -20,6 +20,7 @@ package dispatchers import ( "fmt" + "sync" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -31,6 +32,9 @@ type Dispatcher interface { // SetConfig is used to update the configuration information within dispatcher // to make sure we take decisions based on latest config SetProfile(pfl *engine.DispatcherProfile) + // GetInstance will clone Dispatcher and update the internal states of original + // it is needed so the dispatcher logic can be apply per request + GetInstance() (d Dispatcher) // GetConnID returns an ordered list of connection IDs for the event NextConnID() (connID string) // MaxConns returns the maximum number of connections available in the pool @@ -42,7 +46,7 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { pfl.Conns.Sort() // make sure the connections are sorted switch pfl.Strategy { case utils.MetaWeight: - d = &WeightDispatcher{pfl: pfl} + d = &WeightDispatcher{conns: pfl.Conns.Clone()} default: err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy) } @@ -51,26 +55,42 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { // WeightDispatcher selects the next connection based on weight type WeightDispatcher struct { - pfl *engine.DispatcherProfile + sync.RWMutex + conns engine.DispatcherConns nextConnIdx int // last returned connection index } +// incrNextConnIdx will increment the nextConnIidx +// not thread safe, it needs to be locked in a layer above +func (wd *WeightDispatcher) incrNextConnIdx() { + wd.nextConnIdx++ + if wd.nextConnIdx > len(wd.conns)-1 { + wd.nextConnIdx = 0 // start from beginning + } +} + func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) { pfl.Conns.Sort() - wd.pfl = pfl + wd.Lock() + wd.conns = pfl.Conns.Clone() wd.nextConnIdx = 0 + wd.Unlock() return } +func (wd *WeightDispatcher) GetInstance() (d Dispatcher) { + wd.RLock() + wdInst := &WeightDispatcher{conns: wd.conns.Clone()} + wd.RUnlock() + return wdInst +} + func (wd *WeightDispatcher) NextConnID() (connID string) { - connID = wd.pfl.Conns[wd.nextConnIdx].ID - wd.nextConnIdx++ - if wd.nextConnIdx > len(wd.pfl.Conns)-1 { - wd.nextConnIdx = 0 // start from beginning - } + connID = wd.conns[wd.nextConnIdx].ID + wd.incrNextConnIdx() return } func (wd *WeightDispatcher) MaxConns() int { - return len(wd.pfl.Conns) + return len(wd.conns) } diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go index 5633a07c7..282553159 100644 --- a/engine/dispatcherprfl.go +++ b/engine/dispatcherprfl.go @@ -32,6 +32,27 @@ type DispatcherConn struct { Blocker bool // no connection after this one } +func (dC *DispatcherConn) Clone() (cln *DispatcherConn) { + cln = &DispatcherConn{ + ID: dC.ID, + Weight: dC.Weight, + Blocker: dC.Blocker, + } + if dC.FilterIDs != nil { + cln.FilterIDs = make([]string, len(dC.FilterIDs)) + for i, fltr := range dC.FilterIDs { + cln.FilterIDs[i] = fltr + } + } + if dC.Params != nil { + cln.Params = make(map[string]interface{}) + for k, v := range dC.Params { + cln.Params[k] = v + } + } + return +} + type DispatcherConns []*DispatcherConn // Sort is part of sort interface, sort based on Weight @@ -39,6 +60,14 @@ func (dConns DispatcherConns) Sort() { sort.Slice(dConns, func(i, j int) bool { return dConns[i].Weight > dConns[j].Weight }) } +func (dConns DispatcherConns) Clone() (cln DispatcherConns) { + cln = make(DispatcherConns, len(dConns)) + for i, dConn := range dConns { + cln[i] = dConn.Clone() + } + return +} + // DispatcherProfile is the config for one Dispatcher type DispatcherProfile struct { Tenant string