Added the *broadcast strategy to DispatcherS

This commit is contained in:
Trial97
2019-03-19 11:56:58 +02:00
committed by Dan Christian Bogos
parent 2c2860ae82
commit f1f974d60e
3 changed files with 121 additions and 34 deletions

View File

@@ -143,36 +143,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID
if errDsp != nil {
return utils.NewErrDispatcherS(errDsp)
}
var connID string
if routeID != nil &&
*routeID != "" {
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
connID = x.(string)
if err = dS.conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
return
}
}
}
for _, connID = range d.ConnIDs() {
conn, has := dS.conns[connID]
if !has {
err = utils.NewErrDispatcherS(
fmt.Errorf("no connection with id: <%s>", connID))
continue
}
if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
continue
}
if routeID != nil &&
*routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, connID,
nil, true, utils.EmptyString)
}
break
}
return
return d.Dispatch(dS.conns, routeID, serviceMethod, args, reply)
}
func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent,

View File

@@ -20,10 +20,12 @@ package dispatchers
import (
"fmt"
"reflect"
"sync"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// Dispatcher is responsible for routing requests to pool of connections
@@ -34,6 +36,9 @@ type Dispatcher interface {
SetProfile(pfl *engine.DispatcherProfile)
// ConnIDs returns the ordered list of hosts IDs
ConnIDs() (conns []string)
// Dispatch is used to send the method over the connections given
Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error)
}
// newDispatcher constructs instances of Dispatcher
@@ -46,12 +51,49 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
d = &RandomDispatcher{conns: pfl.Conns.Clone()}
case utils.MetaRoundRobin:
d = &RoundRobinDispatcher{conns: pfl.Conns.Clone()}
case utils.MetaBroadcast:
d = &BroadcastDispatcher{conns: pfl.Conns.Clone()}
default:
err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
}
return
}
// Dispatch is used to send the method over the connections given until one is send corectly
func DispatchOne(d Dispatcher, conns map[string]*rpcclient.RpcClientPool, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var connID string
if routeID != nil &&
*routeID != "" {
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
connID = x.(string)
if err = conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
return
}
}
}
for _, connID = range d.ConnIDs() {
conn, has := conns[connID]
if !has {
err = utils.NewErrDispatcherS(
fmt.Errorf("no connection with id: <%s>", connID))
continue
}
if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
continue
}
if routeID != nil &&
*routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, connID,
nil, true, utils.EmptyString)
}
break
}
return
}
// WeightDispatcher selects the next connection based on weight
type WeightDispatcher struct {
sync.RWMutex
@@ -73,6 +115,11 @@ func (wd *WeightDispatcher) ConnIDs() (connIDs []string) {
return
}
func (wd *WeightDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return DispatchOne(wd, conns, routeID, serviceMethod, args, reply)
}
// RandomDispatcher selects the next connection randomly
// together with RouteID can serve as load-balancer
type RandomDispatcher struct {
@@ -95,6 +142,11 @@ func (d *RandomDispatcher) ConnIDs() (connIDs []string) {
return conns.ConnIDs()
}
func (d *RandomDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return DispatchOne(d, conns, routeID, serviceMethod, args, reply)
}
// RoundRobinDispatcher selects the next connection in round-robin fashion
type RoundRobinDispatcher struct {
sync.RWMutex
@@ -120,3 +172,68 @@ func (d *RoundRobinDispatcher) ConnIDs() (connIDs []string) {
d.RUnlock()
return conns.ConnIDs()
}
func (d *RoundRobinDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return DispatchOne(d, conns, routeID, serviceMethod, args, reply)
}
// RoundRobinDispatcher selects the next connection in round-robin fashion
type BroadcastDispatcher struct {
sync.RWMutex
conns engine.DispatcherConns
connIdx int // used for the next connection
}
func (d *BroadcastDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
d.Lock()
pfl.Conns.Sort()
d.conns = pfl.Conns.Clone() // avoid concurrency on profile
d.Unlock()
return
}
func (d *BroadcastDispatcher) ConnIDs() (connIDs []string) {
d.RLock()
connIDs = d.conns.ConnIDs()
d.RUnlock()
return
}
func (d *BroadcastDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (lastErr error) { // no cache needed for this strategy because we need to call all connections
var firstReply interface{} = nil
var err error
for _, connID := range d.ConnIDs() {
conn, has := conns[connID]
if !has {
err = utils.NewErrDispatcherS(
fmt.Errorf("no connection with id: <%s>", connID))
utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s",
utils.DispatcherS, utils.MetaBroadcast, connID, err.Error()))
lastErr = err
continue
}
if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s",
utils.DispatcherS, utils.MetaBroadcast, connID, err.Error()))
lastErr = err
continue
} else if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s",
utils.DispatcherS, utils.MetaBroadcast, connID, err.Error()))
lastErr = err
}
if firstReply == nil { // save first value
firstReply = reflect.ValueOf(reply).Elem().Interface()
}
}
if firstReply == nil { // do not rewrite lastErr if no call was succcesful
return
}
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(firstReply)) // set reply value to the first succesfuly call
if lastErr != nil { // rewrite err if not all call were succesfull
lastErr = utils.ErrPartiallyExecuted
}
return
}

View File

@@ -42,10 +42,9 @@ const (
)
var (
ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED")
ErrActiveSession = errors.New("ACTIVE_SESSION")
ErrForcedDisconnect = errors.New("FORCED_DISCONNECT")
debug bool
ErrActiveSession = errors.New("ACTIVE_SESSION")
ErrForcedDisconnect = errors.New("FORCED_DISCONNECT")
debug bool
)
// NewSReplConns initiates the connections configured for session replication