From f1f974d60ee076db6d7829351ec0015f1d82b2e5 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 19 Mar 2019 11:56:58 +0200 Subject: [PATCH] Added the *broadcast strategy to DispatcherS --- dispatchers/dispatchers.go | 31 +--------- dispatchers/libdispatcher.go | 117 +++++++++++++++++++++++++++++++++++ sessions/sessions.go | 7 +-- 3 files changed, 121 insertions(+), 34 deletions(-) diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 046342f59..2367e2079 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -143,36 +143,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID if errDsp != nil { return utils.NewErrDispatcherS(errDsp) } - var connID string - if routeID != nil && - *routeID != "" { - // use previously discovered route - if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, - *routeID); ok && x != nil { - connID = x.(string) - if err = dS.conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { - return - } - } - } - for _, connID = range d.ConnIDs() { - conn, has := dS.conns[connID] - if !has { - err = utils.NewErrDispatcherS( - fmt.Errorf("no connection with id: <%s>", connID)) - continue - } - if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { - continue - } - if routeID != nil && - *routeID != "" { // cache the discovered route - engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, connID, - nil, true, utils.EmptyString) - } - break - } - return + return d.Dispatch(dS.conns, routeID, serviceMethod, args, reply) } func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent, diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 657f1fc89..370786f59 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -20,10 +20,12 @@ package dispatchers import ( "fmt" + "reflect" "sync" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // Dispatcher is responsible for routing requests to pool of connections @@ -34,6 +36,9 @@ type Dispatcher interface { SetProfile(pfl *engine.DispatcherProfile) // ConnIDs returns the ordered list of hosts IDs ConnIDs() (conns []string) + // Dispatch is used to send the method over the connections given + Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, + serviceMethod string, args interface{}, reply interface{}) (err error) } // newDispatcher constructs instances of Dispatcher @@ -46,12 +51,49 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { d = &RandomDispatcher{conns: pfl.Conns.Clone()} case utils.MetaRoundRobin: d = &RoundRobinDispatcher{conns: pfl.Conns.Clone()} + case utils.MetaBroadcast: + d = &BroadcastDispatcher{conns: pfl.Conns.Clone()} default: err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy) } return } +// Dispatch is used to send the method over the connections given until one is send corectly +func DispatchOne(d Dispatcher, conns map[string]*rpcclient.RpcClientPool, routeID *string, + serviceMethod string, args interface{}, reply interface{}) (err error) { + var connID string + if routeID != nil && + *routeID != "" { + // use previously discovered route + if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, + *routeID); ok && x != nil { + connID = x.(string) + if err = conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { + return + } + } + } + for _, connID = range d.ConnIDs() { + conn, has := conns[connID] + if !has { + err = utils.NewErrDispatcherS( + fmt.Errorf("no connection with id: <%s>", connID)) + continue + } + if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { + continue + } + if routeID != nil && + *routeID != "" { // cache the discovered route + engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, connID, + nil, true, utils.EmptyString) + } + break + } + return +} + // WeightDispatcher selects the next connection based on weight type WeightDispatcher struct { sync.RWMutex @@ -73,6 +115,11 @@ func (wd *WeightDispatcher) ConnIDs() (connIDs []string) { return } +func (wd *WeightDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, + serviceMethod string, args interface{}, reply interface{}) (err error) { + return DispatchOne(wd, conns, routeID, serviceMethod, args, reply) +} + // RandomDispatcher selects the next connection randomly // together with RouteID can serve as load-balancer type RandomDispatcher struct { @@ -95,6 +142,11 @@ func (d *RandomDispatcher) ConnIDs() (connIDs []string) { return conns.ConnIDs() } +func (d *RandomDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, + serviceMethod string, args interface{}, reply interface{}) (err error) { + return DispatchOne(d, conns, routeID, serviceMethod, args, reply) +} + // RoundRobinDispatcher selects the next connection in round-robin fashion type RoundRobinDispatcher struct { sync.RWMutex @@ -120,3 +172,68 @@ func (d *RoundRobinDispatcher) ConnIDs() (connIDs []string) { d.RUnlock() return conns.ConnIDs() } + +func (d *RoundRobinDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, + serviceMethod string, args interface{}, reply interface{}) (err error) { + return DispatchOne(d, conns, routeID, serviceMethod, args, reply) +} + +// RoundRobinDispatcher selects the next connection in round-robin fashion +type BroadcastDispatcher struct { + sync.RWMutex + conns engine.DispatcherConns + connIdx int // used for the next connection +} + +func (d *BroadcastDispatcher) SetProfile(pfl *engine.DispatcherProfile) { + d.Lock() + pfl.Conns.Sort() + d.conns = pfl.Conns.Clone() // avoid concurrency on profile + d.Unlock() + return +} + +func (d *BroadcastDispatcher) ConnIDs() (connIDs []string) { + d.RLock() + connIDs = d.conns.ConnIDs() + d.RUnlock() + return +} + +func (d *BroadcastDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, + serviceMethod string, args interface{}, reply interface{}) (lastErr error) { // no cache needed for this strategy because we need to call all connections + var firstReply interface{} = nil + var err error + for _, connID := range d.ConnIDs() { + conn, has := conns[connID] + if !has { + err = utils.NewErrDispatcherS( + fmt.Errorf("no connection with id: <%s>", connID)) + utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s", + utils.DispatcherS, utils.MetaBroadcast, connID, err.Error())) + lastErr = err + continue + } + if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { + utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s", + utils.DispatcherS, utils.MetaBroadcast, connID, err.Error())) + lastErr = err + continue + } else if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s", + utils.DispatcherS, utils.MetaBroadcast, connID, err.Error())) + lastErr = err + } + if firstReply == nil { // save first value + firstReply = reflect.ValueOf(reply).Elem().Interface() + } + } + if firstReply == nil { // do not rewrite lastErr if no call was succcesful + return + } + reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(firstReply)) // set reply value to the first succesfuly call + if lastErr != nil { // rewrite err if not all call were succesfull + lastErr = utils.ErrPartiallyExecuted + } + return +} diff --git a/sessions/sessions.go b/sessions/sessions.go index 37d0dca4a..42c0013e3 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -42,10 +42,9 @@ const ( ) var ( - ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED") - ErrActiveSession = errors.New("ACTIVE_SESSION") - ErrForcedDisconnect = errors.New("FORCED_DISCONNECT") - debug bool + ErrActiveSession = errors.New("ACTIVE_SESSION") + ErrForcedDisconnect = errors.New("FORCED_DISCONNECT") + debug bool ) // NewSReplConns initiates the connections configured for session replication