DispatcherS.Dispatch method together with sample call in dispatchers/attributes.go

This commit is contained in:
DanB
2019-01-31 20:27:01 +01:00
parent e07ba0a3bb
commit d4d9939feb
4 changed files with 82 additions and 18 deletions

View File

@@ -18,32 +18,30 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
/*
func (dS *DispatcherService) AttributeSv1Ping(ign string, reply *string) error {
if dS.attrS == nil {
return utils.NewErrNotConnected(utils.AttributeS)
}
return dS.attrS.Call(utils.AttributeSv1Ping, ign, reply)
return dS.Dispatch(nil, utils.MetaAttributes,
utils.AttributeSv1Ping, ign, reply)
}
*/
func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *ArgsAttrProcessEventWithApiKey,
reply *engine.AttributeProfile) (err error) {
if dS.attrS == nil {
return utils.NewErrNotConnected(utils.AttributeS)
}
if err = dS.authorize(utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent.CGREvent.Tenant,
args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil {
return
}
return dS.attrS.Call(utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent, reply)
//if err = dS.authorize(utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent.CGREvent.Tenant,
// args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil {
// return
//}
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes,
utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent, reply)
}
/*
func (dS *DispatcherService) AttributeSv1ProcessEvent(args *ArgsAttrProcessEventWithApiKey,
reply *engine.AttrSProcessEventReply) (err error) {
if dS.attrS == nil {

View File

@@ -24,6 +24,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewDispatcherService initializes a DispatcherService
@@ -40,6 +41,7 @@ type DispatcherService struct {
filterS *engine.FilterS
stringIndexedFields *[]string
prefixIndexedFields *[]string
conns map[string]*rpcclient.RpcClientPool // available connections, accessed based on connID
}
// ListenAndServe will initialize the service
@@ -89,6 +91,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
}
return nil, err
}
}
if prfl.ActivationInterval != nil && ev.Time != nil &&
!prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
@@ -129,3 +132,24 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
true, utils.EmptyString)
return
}
// Dispatch is the method forwarding the request towards the right
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
d, errDsp := dS.dispatcherForEvent(ev, subsys)
if errDsp != nil {
return utils.NewErrDispatcherS(errDsp)
}
for i := 0; i < d.MaxConns(); i++ {
connID := d.NextConnID()
conn, has := dS.conns[connID]
if !has {
utils.NewErrDispatcherS(
fmt.Errorf("no connection with id: <%s>", connID))
}
if err = conn.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
break
}
}
return
}

View File

@@ -33,6 +33,8 @@ type Dispatcher interface {
SetProfile(pfl *engine.DispatcherProfile)
// GetConnID returns an ordered list of connection IDs for the event
NextConnID() (connID string)
// MaxConns returns the maximum number of connections available in the pool
MaxConns() int
}
// newDispatcher constructs instances of Dispatcher
@@ -67,3 +69,7 @@ func (wd *WeightDispatcher) NextConnID() (connID string) {
}
return
}
func (wd *WeightDispatcher) MaxConns() int {
return len(wd.pfl.Conns)
}

View File

@@ -21,7 +21,10 @@ package utils
import (
"errors"
"fmt"
"net"
"net/rpc"
"strings"
"syscall"
)
var (
@@ -57,12 +60,22 @@ var (
ErrUnauthorizedApi = errors.New("UNAUTHORIZED_API")
ErrUnknownApiKey = errors.New("UNKNOWN_API_KEY")
ErrIncompatible = errors.New("INCOMPATIBLE")
ErrReqUnsynchronized = errors.New("REQ_UNSYNCHRONIZED")
ErrUnsupporteServiceMethod = errors.New("UNSUPPORTED_SERVICE_METHOD")
ErrWrongArgsType = errors.New("WRONG_ARGS_TYPE")
ErrWrongReplyType = errors.New("WRONG_REPLY_TYPE")
ErrDisconnected = errors.New("DISCONNECTED")
ErrReplyTimeout = errors.New("REPLY_TIMEOUT")
ErrFailedReconnect = errors.New("FAILED_RECONNECT")
ErrInternallyDisconnected = errors.New("INTERNALLY_DISCONNECTED")
ErrUnsupportedCodec = errors.New("UNSUPPORTED_CODEC")
ErrSessionNotFound = errors.New("SESSION_NOT_FOUND")
ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT")
ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID")
ErrCDRCNoInDir = errors.New("CDRC_PROFILE_WITHOUT_IN_DIR")
ErrNotEnoughParameters = errors.New("NotEnoughParameters")
RalsErrorPrfx = "RALS_ERROR"
ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT")
ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID")
ErrCDRCNoInDir = errors.New("CDRC_PROFILE_WITHOUT_IN_DIR")
ErrNotEnoughParameters = errors.New("NotEnoughParameters")
DispatcherErrorPrefix = "DISPATCHER_ERROR"
)
// NewCGRError initialises a new CGRError
@@ -133,6 +146,10 @@ func NewErrAttributeS(err error) error {
return fmt.Errorf("ATTRIBUTES_ERROR:%s", err)
}
func NewErrDispatcherS(err error) error {
return fmt.Errorf("%s:%s", DispatcherErrorPrefix, err.Error())
}
// Centralized returns for APIs
func APIErrorHandler(errIn error) (err error) {
cgrErr, ok := errIn.(*CGRError)
@@ -177,3 +194,22 @@ func ErrPrefixNotErrNotImplemented(reason string) error {
func ErrEnvNotFound(key string) error {
return ErrPrefix(ErrNotFound, "ENV_VAR:"+key)
}
// IsNetworkError will decide if an error is network generated or RPC one
// used by Dispatcher to figure out whether it should try another connection
func IsNetworkError(err error) bool {
if err == nil {
return false
}
if operr, ok := err.(*net.OpError); ok &&
strings.HasSuffix(operr.Err.Error(),
syscall.ECONNRESET.Error()) { // connection reset
return true
}
return err == rpc.ErrShutdown ||
err == ErrReqUnsynchronized ||
err == ErrDisconnected ||
err == ErrReplyTimeout ||
err.Error() == ErrSessionNotFound.Error() ||
strings.HasPrefix(err.Error(), "rpc: can't find service")
}