Reorganising dispatcherS routeID functionality to avoid extra processing of event

This commit is contained in:
DanB
2022-08-27 15:44:26 +02:00
parent b1c6f57c0c
commit 8d37467b63
5 changed files with 151 additions and 103 deletions

View File

@@ -47,7 +47,9 @@ func RunCGREngine(fs []string) (err error) {
return
}
if *flags.PidFile != utils.EmptyString {
services.CgrWritePid(*flags.PidFile)
if err = services.CgrWritePid(*flags.PidFile); err != nil {
return
}
}
if *flags.Singlecpu {
runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases

View File

@@ -156,6 +156,30 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent,
if tnt == utils.EmptyString {
tnt = dS.cfg.GeneralCfg().DefaultTenant
}
var dR *DispatcherRoute
var dPrfls engine.DispatcherProfiles
routeID := utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID])
if routeID != utils.EmptyString { // overwrite routeID with RouteID:Subsystem for subsystem correct routing
routeID = utils.ConcatenatedKey(routeID, subsys)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
routeID); ok && x != nil {
dR = x.(*DispatcherRoute)
// query the profile out of cached route
var dPrfl *engine.DispatcherProfile
if dPrfl, err = dS.dm.GetDispatcherProfile(ctx, dR.Tenant, dR.ProfileID,
true, true, utils.NonTransactional); err != nil {
if err != utils.ErrNotFound {
return
}
// profile was not found
utils.Logger.Warning(fmt.Sprintf("<%s> could not find profile with tenant: <%s> and ID <%s> for routeID: <%s>",
utils.DispatcherS, dR.Tenant, dR.ProfileID, routeID))
} else {
dPrfls = engine.DispatcherProfiles{dPrfl} // will be used as the list of routes
}
}
}
evNm := utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
@@ -164,9 +188,13 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent,
utils.MetaMethod: serviceMethod,
},
}
var dPrfls engine.DispatcherProfiles
if dPrfls, err = dS.dispatcherProfilesForEvent(ctx, tnt, ev, evNm); err != nil {
return utils.NewErrDispatcherS(err)
if dPrfls == nil { // did not discover it yet
if dPrfls, err = dS.dispatcherProfilesForEvent(ctx, tnt, ev, evNm); err != nil {
return utils.NewErrDispatcherS(err)
}
}
if len(dPrfls) == 0 {
return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE"))
}
for _, dPrfl := range dPrfls {
tntID := dPrfl.TenantID()
@@ -177,13 +205,13 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent,
d = x.(Dispatcher)
} else if d, err = newDispatcher(dPrfl); err != nil {
return utils.NewErrDispatcherS(err)
}
if err = engine.Cache.Set(ctx, utils.CacheDispatchers, tntID, d, nil, true, utils.EmptyString); err != nil {
} else if err = engine.Cache.Set(ctx, utils.CacheDispatchers, tntID, d, // cache the built Dispatcher
nil, true, utils.EmptyString); err != nil {
return utils.NewErrDispatcherS(err)
}
if err = d.Dispatch(dS.dm, dS.fltrS, dS.cfg,
ctx, dS.connMgr.GetDispInternalChan(), evNm, tnt, utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]),
subsys, serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
ctx, dS.connMgr.GetDispInternalChan(), evNm, tnt, routeID, dR,
serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
return
}
}

View File

@@ -38,13 +38,18 @@ func init() {
}
// DispatcherRoute is bounded to a routeID
type DispatcherRoute struct {
Tenant, ProfileID, HostID string
}
// Dispatcher is responsible for routing requests to pool of connections
// there will be different implementations based on strategy
type Dispatcher interface {
// Dispatch is used to send the method over the connections given
Dispatch(dm *engine.DataManager, flts *engine.FilterS, cfg *config.CGRConfig,
ctx *context.Context, iPRCCh chan birpc.ClientConnector,
ev utils.DataProvider, tnt, routeID, subsystem,
ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute,
serviceMethod string, args interface{}, reply interface{}) (err error)
}
@@ -72,7 +77,9 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
return
}
func getDispatcherHosts(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
// getDispatcherHosts returns a list of host IDs matching the event with filters
func getDispatcherHosts(fltrs *engine.FilterS, ev utils.DataProvider,
ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
hostIDs = make(engine.DispatcherHostIDs, 0, len(hosts))
for _, host := range hosts {
var pass bool
@@ -89,28 +96,37 @@ func getDispatcherHosts(fltrs *engine.FilterS, ev utils.DataProvider, ctx *conte
return
}
// hostSorted is the sorting interface used by singleDispatcher
type hostSorter interface {
Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error)
Sort(fltrs *engine.FilterS, ev utils.DataProvider,
ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error)
}
// noSort will just return the matching hosts for the event.
type noSort struct{}
func (noSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
func (noSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider,
ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
return getDispatcherHosts(fltrs, ev, ctx, tnt, hosts)
}
// randomSort will randomize the matching hosts for the event
type randomSort struct{}
func (randomSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
func (randomSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider,
ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
rand.Shuffle(len(hosts), func(i, j int) {
hosts[i], hosts[j] = hosts[j], hosts[i]
})
return getDispatcherHosts(fltrs, ev, ctx, tnt, hosts)
}
// roundRoinSort will sort the matching hosts for the event in a round-robin fashion via nextIDx
// which will be increased on each Sort iteration
type roundRobinSort struct{ nextIDx int }
func (rs *roundRobinSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
func (rs *roundRobinSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider,
ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) {
dh := make(engine.DispatcherHostProfiles, len(hosts))
idx := rs.nextIDx
for i := 0; i < len(dh); i++ {
@@ -127,7 +143,9 @@ func (rs *roundRobinSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx
return getDispatcherHosts(fltrs, ev, ctx, tnt, dh)
}
func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string, sorter hostSorter) (_ Dispatcher, err error) {
// newSingleDispatcher is the constructor for singleDispatcher struct.
func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{},
tntID string, sorter hostSorter) (_ Dispatcher, err error) {
if dflt, has := params[utils.MetaDefaultRatio]; has {
var ratio int64
if ratio, err = utils.IfaceAsTInt64(dflt); err != nil {
@@ -156,6 +174,8 @@ func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string]
}, nil
}
// singleResultDispatcher routes the event to a single host
// implements Dispatcher interface
type singleResultDispatcher struct {
sorter hostSorter
hosts engine.DispatcherHostProfiles
@@ -163,56 +183,46 @@ type singleResultDispatcher struct {
func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, cfg *config.CGRConfig,
ctx *context.Context, iPRCCh chan birpc.ClientConnector,
ev utils.DataProvider, tnt, routeID, subsystem string,
ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var dH *engine.DispatcherHost
if routeID != utils.EmptyString {
// overwrite routeID with RouteID:Subsystem
routeID = utils.ConcatenatedKey(routeID, subsystem)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
routeID); ok && x != nil {
dH = x.(*engine.DispatcherHost)
if err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
return
}
if routeID != utils.EmptyString { // route to previously discovered route
if err = callDHwithID(ctx, tnt, dR.HostID, routeID, dR, dm,
cfg, iPRCCh, serviceMethod, args, reply); err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
// not found or network errors will continue
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>",
utils.DispatcherS, err.Error(), dR.HostID))
}
var hostIDs []string
if hostIDs, err = sd.sorter.Sort(flts, ev, ctx, tnt, sd.hosts); err != nil {
return
} else if len(hostIDs) == 0 { // in case we do not match any host
return utils.ErrHostNotFound
}
var called bool
for _, hostID := range hostIDs {
if dH, err = dm.GetDispatcherHost(ctx, tnt, hostID, true, true, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q",
utils.DispatcherS, hostID))
err = nil
continue
}
err = utils.NewErrDispatcherS(err)
dRh := dR
if routeID != utils.EmptyString {
dRh.HostID = hostID
}
if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm,
cfg, iPRCCh, serviceMethod, args, reply); err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
called = true
if err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply); rpcclient.IsNetworkError(err) {
continue
if err != nil {
// not found or network errors will continue with standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>",
utils.DispatcherS, err.Error(), hostID))
}
if routeID != utils.EmptyString { // cache the discovered route
if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dH,
nil, true, utils.EmptyString); err != nil {
return
}
}
break
}
if !called { // in case we do not match any host
err = utils.ErrHostNotFound
return
}
return
}
// broadcastDispatcher routes the event to multiple hosts in a pool
// implements the Dispatcher interface
type broadcastDispatcher struct {
strategy string
hosts engine.DispatcherHostProfiles
@@ -220,7 +230,7 @@ type broadcastDispatcher struct {
func (b *broadcastDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, cfg *config.CGRConfig,
ctx *context.Context, iPRCCh chan birpc.ClientConnector,
ev utils.DataProvider, tnt, routeID, subsystem string,
ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var hostIDs []string
if hostIDs, err = getDispatcherHosts(flts, ev, ctx, tnt, b.hosts); err != nil {
@@ -240,7 +250,7 @@ func (b *broadcastDispatcher) Dispatch(dm *engine.DataManager, flts *engine.Filt
return utils.NewErrDispatcherS(err)
}
hasHosts = true
pool.AddClient(&lazzyDH{
pool.AddClient(&lazyDH{
dh: dH,
cfg: cfg,
iPRCCh: iPRCCh,
@@ -261,9 +271,9 @@ type loadDispatcher struct {
func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, cfg *config.CGRConfig,
ctx *context.Context, iPRCCh chan birpc.ClientConnector,
ev utils.DataProvider, tnt, routeID, subsystem string,
ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var dH *engine.DispatcherHost
var lM *LoadMetrics
if x, ok := engine.Cache.Get(utils.CacheDispatcherLoads, ld.tntID); ok && x != nil {
var canCast bool
@@ -273,57 +283,42 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
} else if lM, err = newLoadMetrics(ld.hosts, ld.defaultRatio); err != nil {
return
}
if routeID != utils.EmptyString {
// overwrite routeID with RouteID:Subsystem
routeID = utils.ConcatenatedKey(routeID, subsystem)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
routeID); ok && x != nil {
dH = x.(*engine.DispatcherHost)
lM.incrementLoad(ctx, dH.ID, ld.tntID)
err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply)
lM.decrementLoad(ctx, dH.ID, ld.tntID) // call ended
if !rpcclient.IsNetworkError(err) {
return
}
if routeID != utils.EmptyString { // route to previously discovered route
lM.incrementLoad(ctx, dR.HostID, ld.tntID)
err = callDHwithID(ctx, tnt, dR.HostID, routeID, dR, dm,
cfg, iPRCCh, serviceMethod, args, reply)
lM.decrementLoad(ctx, dR.HostID, ld.tntID) // call ended
if err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
// not found or network errors will continue with standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>",
utils.DispatcherS, err.Error(), dR.HostID))
}
var hostIDs []string
if hostIDs, err = ld.sorter.Sort(flts, ev, ctx, tnt, lM.getHosts(ld.hosts)); err != nil {
return
} else if len(hostIDs) == 0 { // in case we do not match any host
return utils.ErrHostNotFound
}
var called bool
for _, hostID := range hostIDs {
if dH, err = dm.GetDispatcherHost(ctx, tnt, hostID, true, true, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q",
utils.DispatcherS, hostID))
err = nil
continue
}
err = utils.NewErrDispatcherS(err)
lM.incrementLoad(ctx, hostID, ld.tntID)
err = callDHwithID(ctx, tnt, hostID, routeID, dR, dm,
cfg, iPRCCh, serviceMethod, args, reply)
lM.decrementLoad(ctx, hostID, ld.tntID) // call ended
if err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
called = true
lM.incrementLoad(ctx, hostID, ld.tntID)
err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply)
lM.decrementLoad(ctx, hostID, ld.tntID) // call ended
if rpcclient.IsNetworkError(err) {
continue
if err != nil {
// not found or network errors will continue with standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>",
utils.DispatcherS, err.Error(), hostID))
}
if routeID != utils.EmptyString { // cache the discovered route
if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dH,
nil, true, utils.EmptyString); err != nil {
return
}
}
break
}
if !called { // in case we do not match any host
err = utils.ErrHostNotFound
return
}
return
}
@@ -401,6 +396,27 @@ func (lM *LoadMetrics) decrementLoad(ctx *context.Context, hostID, tntID string)
lM.mutex.Unlock()
}
// callDHwithID is a wrapper on callDH using ID of the host which the other cannot do due to lazyDH
// if routeID provided, will also cache once the call is successful
func callDHwithID(ctx *context.Context, tnt, hostID, routeID string, dR *DispatcherRoute,
dm *engine.DataManager, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector,
serviceMethod string, args, reply interface{}) (err error) {
var dH *engine.DispatcherHost
if dH, err = dm.GetDispatcherHost(ctx, tnt, hostID, true, true, utils.NonTransactional); err != nil {
return
}
if err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply); err != nil {
return
}
if routeID != utils.EmptyString { // cache the discovered route
if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dR,
nil, true, utils.EmptyString); err != nil {
return
}
}
return
}
func callDH(ctx *context.Context,
dh *engine.DispatcherHost, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector,
method string, args, reply interface{}) (err error) {
@@ -411,12 +427,13 @@ func callDH(ctx *context.Context,
return conn.Call(ctx, method, args, reply)
}
type lazzyDH struct {
// lazyDH is created for the broadcast strategy so we can make sure host exists during setup phase
type lazyDH struct {
dh *engine.DispatcherHost
cfg *config.CGRConfig
iPRCCh chan birpc.ClientConnector
}
func (l *lazzyDH) Call(ctx *context.Context, method string, args, reply interface{}) error {
func (l *lazyDH) Call(ctx *context.Context, method string, args, reply interface{}) error {
return callDH(ctx, l.dh, l.cfg, l.iPRCCh, method, args, reply)
}

View File

@@ -40,12 +40,13 @@ import (
"github.com/cgrates/rpcclient"
)
func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.WaitGroup, server *cores.Server, caps *engine.Caps) *CGREngine {
func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.WaitGroup,
server *cores.Server, caps *engine.Caps) *CGREngine {
return &CGREngine{
cfg: cfg,
cM: cM,
caps: caps,
shdWg: shdWg,
cfg: cfg, // Engine configuration
cM: cM, // connection manager
caps: caps, // caps is used to limit RPC CPS
shdWg: shdWg, // wait for shutdown
srvManager: servmanager.NewServiceManager(shdWg, cM, cfg.GetReloadChan()),
server: server, // Rpc/http server
srvDep: map[string]*sync.WaitGroup{

View File

@@ -40,12 +40,12 @@ func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, rldCh
// ServiceManager handles service management ran by the engine
type ServiceManager struct {
sync.RWMutex // lock access to any shared data
subsystems map[string]Service
sync.RWMutex // lock access to any shared data
subsystems map[string]Service // active subsystems managed by SM
shdWg *sync.WaitGroup
shdWg *sync.WaitGroup // list of shutdown items
connMgr *engine.ConnManager
rldChan <-chan string
rldChan <-chan string // reload signals come over this channelc
}
// StartServices starts all enabled services