From d4d9939febf6f16ba13583be3038b77c5cc78d55 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 31 Jan 2019 20:27:01 +0100 Subject: [PATCH] DispatcherS.Dispatch method together with sample call in dispatchers/attributes.go --- dispatchers/attributes.go | 24 +++++++++---------- dispatchers/dispatchers.go | 24 +++++++++++++++++++ dispatchers/libdispatcher.go | 6 +++++ utils/errors.go | 46 ++++++++++++++++++++++++++++++++---- 4 files changed, 82 insertions(+), 18 deletions(-) diff --git a/dispatchers/attributes.go b/dispatchers/attributes.go index 55ef76395..40b295ec1 100755 --- a/dispatchers/attributes.go +++ b/dispatchers/attributes.go @@ -18,32 +18,30 @@ along with this program. If not, see package dispatchers -/* import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) +/* func (dS *DispatcherService) AttributeSv1Ping(ign string, reply *string) error { - if dS.attrS == nil { - return utils.NewErrNotConnected(utils.AttributeS) - } - return dS.attrS.Call(utils.AttributeSv1Ping, ign, reply) + return dS.Dispatch(nil, utils.MetaAttributes, + utils.AttributeSv1Ping, ign, reply) } +*/ func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *ArgsAttrProcessEventWithApiKey, reply *engine.AttributeProfile) (err error) { - if dS.attrS == nil { - return utils.NewErrNotConnected(utils.AttributeS) - } - if err = dS.authorize(utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent.CGREvent.Tenant, - args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil { - return - } - return dS.attrS.Call(utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent, reply) + //if err = dS.authorize(utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent.CGREvent.Tenant, + // args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil { + // return + //} + return dS.Dispatch(&args.CGREvent, utils.MetaAttributes, + utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent, reply) } +/* func (dS *DispatcherService) AttributeSv1ProcessEvent(args *ArgsAttrProcessEventWithApiKey, reply *engine.AttrSProcessEventReply) (err error) { if dS.attrS == nil { diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index bdb239d8b..29f6d5ec9 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -24,6 +24,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // NewDispatcherService initializes a DispatcherService @@ -40,6 +41,7 @@ type DispatcherService struct { filterS *engine.FilterS stringIndexedFields *[]string prefixIndexedFields *[]string + conns map[string]*rpcclient.RpcClientPool // available connections, accessed based on connID } // ListenAndServe will initialize the service @@ -89,6 +91,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, } return nil, err } + } if prfl.ActivationInterval != nil && ev.Time != nil && !prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active @@ -129,3 +132,24 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, true, utils.EmptyString) return } + +// Dispatch is the method forwarding the request towards the right +func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, + serviceMethod string, args interface{}, reply interface{}) (err error) { + d, errDsp := dS.dispatcherForEvent(ev, subsys) + if errDsp != nil { + return utils.NewErrDispatcherS(errDsp) + } + for i := 0; i < d.MaxConns(); i++ { + connID := d.NextConnID() + conn, has := dS.conns[connID] + if !has { + utils.NewErrDispatcherS( + fmt.Errorf("no connection with id: <%s>", connID)) + } + if err = conn.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { + break + } + } + return +} diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 82c9d5325..fe3ee5a82 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -33,6 +33,8 @@ type Dispatcher interface { SetProfile(pfl *engine.DispatcherProfile) // GetConnID returns an ordered list of connection IDs for the event NextConnID() (connID string) + // MaxConns returns the maximum number of connections available in the pool + MaxConns() int } // newDispatcher constructs instances of Dispatcher @@ -67,3 +69,7 @@ func (wd *WeightDispatcher) NextConnID() (connID string) { } return } + +func (wd *WeightDispatcher) MaxConns() int { + return len(wd.pfl.Conns) +} diff --git a/utils/errors.go b/utils/errors.go index d460017ae..5a52e916f 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -21,7 +21,10 @@ package utils import ( "errors" "fmt" + "net" + "net/rpc" "strings" + "syscall" ) var ( @@ -57,12 +60,22 @@ var ( ErrUnauthorizedApi = errors.New("UNAUTHORIZED_API") ErrUnknownApiKey = errors.New("UNKNOWN_API_KEY") ErrIncompatible = errors.New("INCOMPATIBLE") + ErrReqUnsynchronized = errors.New("REQ_UNSYNCHRONIZED") + ErrUnsupporteServiceMethod = errors.New("UNSUPPORTED_SERVICE_METHOD") + ErrWrongArgsType = errors.New("WRONG_ARGS_TYPE") + ErrWrongReplyType = errors.New("WRONG_REPLY_TYPE") + ErrDisconnected = errors.New("DISCONNECTED") + ErrReplyTimeout = errors.New("REPLY_TIMEOUT") + ErrFailedReconnect = errors.New("FAILED_RECONNECT") + ErrInternallyDisconnected = errors.New("INTERNALLY_DISCONNECTED") + ErrUnsupportedCodec = errors.New("UNSUPPORTED_CODEC") + ErrSessionNotFound = errors.New("SESSION_NOT_FOUND") + ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT") + ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID") + ErrCDRCNoInDir = errors.New("CDRC_PROFILE_WITHOUT_IN_DIR") + ErrNotEnoughParameters = errors.New("NotEnoughParameters") RalsErrorPrfx = "RALS_ERROR" - - ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT") - ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID") - ErrCDRCNoInDir = errors.New("CDRC_PROFILE_WITHOUT_IN_DIR") - ErrNotEnoughParameters = errors.New("NotEnoughParameters") + DispatcherErrorPrefix = "DISPATCHER_ERROR" ) // NewCGRError initialises a new CGRError @@ -133,6 +146,10 @@ func NewErrAttributeS(err error) error { return fmt.Errorf("ATTRIBUTES_ERROR:%s", err) } +func NewErrDispatcherS(err error) error { + return fmt.Errorf("%s:%s", DispatcherErrorPrefix, err.Error()) +} + // Centralized returns for APIs func APIErrorHandler(errIn error) (err error) { cgrErr, ok := errIn.(*CGRError) @@ -177,3 +194,22 @@ func ErrPrefixNotErrNotImplemented(reason string) error { func ErrEnvNotFound(key string) error { return ErrPrefix(ErrNotFound, "ENV_VAR:"+key) } + +// IsNetworkError will decide if an error is network generated or RPC one +// used by Dispatcher to figure out whether it should try another connection +func IsNetworkError(err error) bool { + if err == nil { + return false + } + if operr, ok := err.(*net.OpError); ok && + strings.HasSuffix(operr.Err.Error(), + syscall.ECONNRESET.Error()) { // connection reset + return true + } + return err == rpc.ErrShutdown || + err == ErrReqUnsynchronized || + err == ErrDisconnected || + err == ErrReplyTimeout || + err.Error() == ErrSessionNotFound.Error() || + strings.HasPrefix(err.Error(), "rpc: can't find service") +}