From 8d37467b63d589f443a5ca4c3a80e48c2ac16697 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 27 Aug 2022 15:44:26 +0200 Subject: [PATCH] Reorganising dispatcherS routeID functionality to avoid extra processing of event --- cmd/cgr-engine/cgr-engine.go | 4 +- dispatchers/dispatchers.go | 42 ++++++-- dispatchers/libdispatcher.go | 189 +++++++++++++++++++---------------- services/cgr-engine.go | 11 +- servmanager/servmanager.go | 8 +- 5 files changed, 151 insertions(+), 103 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 4a716f7e8..73ca7af25 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -47,7 +47,9 @@ func RunCGREngine(fs []string) (err error) { return } if *flags.PidFile != utils.EmptyString { - services.CgrWritePid(*flags.PidFile) + if err = services.CgrWritePid(*flags.PidFile); err != nil { + return + } } if *flags.Singlecpu { runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 8ed90370f..33e10dfc3 100644 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -156,6 +156,30 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent, if tnt == utils.EmptyString { tnt = dS.cfg.GeneralCfg().DefaultTenant } + var dR *DispatcherRoute + var dPrfls engine.DispatcherProfiles + routeID := utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]) + if routeID != utils.EmptyString { // overwrite routeID with RouteID:Subsystem for subsystem correct routing + routeID = utils.ConcatenatedKey(routeID, subsys) + // use previously discovered route + if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, + routeID); ok && x != nil { + dR = x.(*DispatcherRoute) + // query the profile out of cached route + var dPrfl *engine.DispatcherProfile + if dPrfl, err = dS.dm.GetDispatcherProfile(ctx, dR.Tenant, dR.ProfileID, + true, true, utils.NonTransactional); err != nil { + if err != utils.ErrNotFound { + return + } + // profile was not found + utils.Logger.Warning(fmt.Sprintf("<%s> could not find profile with tenant: <%s> and ID <%s> for routeID: <%s>", + utils.DispatcherS, dR.Tenant, dR.ProfileID, routeID)) + } else { + dPrfls = engine.DispatcherProfiles{dPrfl} // will be used as the list of routes + } + } + } evNm := utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, @@ -164,9 +188,13 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent, utils.MetaMethod: serviceMethod, }, } - var dPrfls engine.DispatcherProfiles - if dPrfls, err = dS.dispatcherProfilesForEvent(ctx, tnt, ev, evNm); err != nil { - return utils.NewErrDispatcherS(err) + if dPrfls == nil { // did not discover it yet + if dPrfls, err = dS.dispatcherProfilesForEvent(ctx, tnt, ev, evNm); err != nil { + return utils.NewErrDispatcherS(err) + } + } + if len(dPrfls) == 0 { + return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE")) } for _, dPrfl := range dPrfls { tntID := dPrfl.TenantID() @@ -177,13 +205,13 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent, d = x.(Dispatcher) } else if d, err = newDispatcher(dPrfl); err != nil { return utils.NewErrDispatcherS(err) - } - if err = engine.Cache.Set(ctx, utils.CacheDispatchers, tntID, d, nil, true, utils.EmptyString); err != nil { + } else if err = engine.Cache.Set(ctx, utils.CacheDispatchers, tntID, d, // cache the built Dispatcher + nil, true, utils.EmptyString); err != nil { return utils.NewErrDispatcherS(err) } if err = d.Dispatch(dS.dm, dS.fltrS, dS.cfg, - ctx, dS.connMgr.GetDispInternalChan(), evNm, tnt, utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]), - subsys, serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { + ctx, dS.connMgr.GetDispInternalChan(), evNm, tnt, routeID, dR, + serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { return } } diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index cf6d94053..25653ef8d 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -38,13 +38,18 @@ func init() { } +// DispatcherRoute is bounded to a routeID +type DispatcherRoute struct { + Tenant, ProfileID, HostID string +} + // Dispatcher is responsible for routing requests to pool of connections // there will be different implementations based on strategy type Dispatcher interface { // Dispatch is used to send the method over the connections given Dispatch(dm *engine.DataManager, flts *engine.FilterS, cfg *config.CGRConfig, ctx *context.Context, iPRCCh chan birpc.ClientConnector, - ev utils.DataProvider, tnt, routeID, subsystem, + ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute, serviceMethod string, args interface{}, reply interface{}) (err error) } @@ -72,7 +77,9 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { return } -func getDispatcherHosts(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { +// getDispatcherHosts returns a list of host IDs matching the event with filters +func getDispatcherHosts(fltrs *engine.FilterS, ev utils.DataProvider, + ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { hostIDs = make(engine.DispatcherHostIDs, 0, len(hosts)) for _, host := range hosts { var pass bool @@ -89,28 +96,37 @@ func getDispatcherHosts(fltrs *engine.FilterS, ev utils.DataProvider, ctx *conte return } +// hostSorted is the sorting interface used by singleDispatcher type hostSorter interface { - Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) + Sort(fltrs *engine.FilterS, ev utils.DataProvider, + ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) } +// noSort will just return the matching hosts for the event. type noSort struct{} -func (noSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { +func (noSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, + ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { return getDispatcherHosts(fltrs, ev, ctx, tnt, hosts) } +// randomSort will randomize the matching hosts for the event type randomSort struct{} -func (randomSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { +func (randomSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, + ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { rand.Shuffle(len(hosts), func(i, j int) { hosts[i], hosts[j] = hosts[j], hosts[i] }) return getDispatcherHosts(fltrs, ev, ctx, tnt, hosts) } +// roundRoinSort will sort the matching hosts for the event in a round-robin fashion via nextIDx +// which will be increased on each Sort iteration type roundRobinSort struct{ nextIDx int } -func (rs *roundRobinSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { +func (rs *roundRobinSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, + ctx *context.Context, tnt string, hosts engine.DispatcherHostProfiles) (hostIDs engine.DispatcherHostIDs, err error) { dh := make(engine.DispatcherHostProfiles, len(hosts)) idx := rs.nextIDx for i := 0; i < len(dh); i++ { @@ -127,7 +143,9 @@ func (rs *roundRobinSort) Sort(fltrs *engine.FilterS, ev utils.DataProvider, ctx return getDispatcherHosts(fltrs, ev, ctx, tnt, dh) } -func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string, sorter hostSorter) (_ Dispatcher, err error) { +// newSingleDispatcher is the constructor for singleDispatcher struct. +func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, + tntID string, sorter hostSorter) (_ Dispatcher, err error) { if dflt, has := params[utils.MetaDefaultRatio]; has { var ratio int64 if ratio, err = utils.IfaceAsTInt64(dflt); err != nil { @@ -156,6 +174,8 @@ func newSingleDispatcher(hosts engine.DispatcherHostProfiles, params map[string] }, nil } +// singleResultDispatcher routes the event to a single host +// implements Dispatcher interface type singleResultDispatcher struct { sorter hostSorter hosts engine.DispatcherHostProfiles @@ -163,56 +183,46 @@ type singleResultDispatcher struct { func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, cfg *config.CGRConfig, ctx *context.Context, iPRCCh chan birpc.ClientConnector, - ev utils.DataProvider, tnt, routeID, subsystem string, + ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute, serviceMethod string, args interface{}, reply interface{}) (err error) { - var dH *engine.DispatcherHost - if routeID != utils.EmptyString { - // overwrite routeID with RouteID:Subsystem - routeID = utils.ConcatenatedKey(routeID, subsystem) - // use previously discovered route - if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, - routeID); ok && x != nil { - dH = x.(*engine.DispatcherHost) - if err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { - return - } + if routeID != utils.EmptyString { // route to previously discovered route + if err = callDHwithID(ctx, tnt, dR.HostID, routeID, dR, dm, + cfg, iPRCCh, serviceMethod, args, reply); err == nil || + (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors + return } + // not found or network errors will continue + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>", + utils.DispatcherS, err.Error(), dR.HostID)) } var hostIDs []string if hostIDs, err = sd.sorter.Sort(flts, ev, ctx, tnt, sd.hosts); err != nil { return + } else if len(hostIDs) == 0 { // in case we do not match any host + return utils.ErrHostNotFound } - var called bool for _, hostID := range hostIDs { - if dH, err = dm.GetDispatcherHost(ctx, tnt, hostID, true, true, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q", - utils.DispatcherS, hostID)) - err = nil - continue - } - err = utils.NewErrDispatcherS(err) + dRh := dR + if routeID != utils.EmptyString { + dRh.HostID = hostID + } + if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm, + cfg, iPRCCh, serviceMethod, args, reply); err == nil || + (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors return } - called = true - if err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply); rpcclient.IsNetworkError(err) { - continue + if err != nil { + // not found or network errors will continue with standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>", + utils.DispatcherS, err.Error(), hostID)) } - if routeID != utils.EmptyString { // cache the discovered route - if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dH, - nil, true, utils.EmptyString); err != nil { - return - } - } - break - } - if !called { // in case we do not match any host - err = utils.ErrHostNotFound return } return } +// broadcastDispatcher routes the event to multiple hosts in a pool +// implements the Dispatcher interface type broadcastDispatcher struct { strategy string hosts engine.DispatcherHostProfiles @@ -220,7 +230,7 @@ type broadcastDispatcher struct { func (b *broadcastDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, cfg *config.CGRConfig, ctx *context.Context, iPRCCh chan birpc.ClientConnector, - ev utils.DataProvider, tnt, routeID, subsystem string, + ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute, serviceMethod string, args interface{}, reply interface{}) (err error) { var hostIDs []string if hostIDs, err = getDispatcherHosts(flts, ev, ctx, tnt, b.hosts); err != nil { @@ -240,7 +250,7 @@ func (b *broadcastDispatcher) Dispatch(dm *engine.DataManager, flts *engine.Filt return utils.NewErrDispatcherS(err) } hasHosts = true - pool.AddClient(&lazzyDH{ + pool.AddClient(&lazyDH{ dh: dH, cfg: cfg, iPRCCh: iPRCCh, @@ -261,9 +271,9 @@ type loadDispatcher struct { func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, cfg *config.CGRConfig, ctx *context.Context, iPRCCh chan birpc.ClientConnector, - ev utils.DataProvider, tnt, routeID, subsystem string, + ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute, serviceMethod string, args interface{}, reply interface{}) (err error) { - var dH *engine.DispatcherHost + var lM *LoadMetrics if x, ok := engine.Cache.Get(utils.CacheDispatcherLoads, ld.tntID); ok && x != nil { var canCast bool @@ -273,57 +283,42 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, } else if lM, err = newLoadMetrics(ld.hosts, ld.defaultRatio); err != nil { return } - - if routeID != utils.EmptyString { - // overwrite routeID with RouteID:Subsystem - routeID = utils.ConcatenatedKey(routeID, subsystem) - // use previously discovered route - if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, - routeID); ok && x != nil { - dH = x.(*engine.DispatcherHost) - lM.incrementLoad(ctx, dH.ID, ld.tntID) - err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply) - lM.decrementLoad(ctx, dH.ID, ld.tntID) // call ended - if !rpcclient.IsNetworkError(err) { - return - } + if routeID != utils.EmptyString { // route to previously discovered route + lM.incrementLoad(ctx, dR.HostID, ld.tntID) + err = callDHwithID(ctx, tnt, dR.HostID, routeID, dR, dm, + cfg, iPRCCh, serviceMethod, args, reply) + lM.decrementLoad(ctx, dR.HostID, ld.tntID) // call ended + if err == nil || + (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors + return } + // not found or network errors will continue with standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>", + utils.DispatcherS, err.Error(), dR.HostID)) } var hostIDs []string if hostIDs, err = ld.sorter.Sort(flts, ev, ctx, tnt, lM.getHosts(ld.hosts)); err != nil { return + } else if len(hostIDs) == 0 { // in case we do not match any host + return utils.ErrHostNotFound } - var called bool for _, hostID := range hostIDs { - if dH, err = dm.GetDispatcherHost(ctx, tnt, hostID, true, true, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q", - utils.DispatcherS, hostID)) - err = nil - continue - } - err = utils.NewErrDispatcherS(err) + lM.incrementLoad(ctx, hostID, ld.tntID) + err = callDHwithID(ctx, tnt, hostID, routeID, dR, dm, + cfg, iPRCCh, serviceMethod, args, reply) + lM.decrementLoad(ctx, hostID, ld.tntID) // call ended + if err == nil || + (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors return } - called = true - lM.incrementLoad(ctx, hostID, ld.tntID) - err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply) - lM.decrementLoad(ctx, hostID, ld.tntID) // call ended - if rpcclient.IsNetworkError(err) { - continue + if err != nil { + // not found or network errors will continue with standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>", + utils.DispatcherS, err.Error(), hostID)) } - if routeID != utils.EmptyString { // cache the discovered route - if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dH, - nil, true, utils.EmptyString); err != nil { - return - } - } - break - } - if !called { // in case we do not match any host - err = utils.ErrHostNotFound return } + return } @@ -401,6 +396,27 @@ func (lM *LoadMetrics) decrementLoad(ctx *context.Context, hostID, tntID string) lM.mutex.Unlock() } +// callDHwithID is a wrapper on callDH using ID of the host which the other cannot do due to lazyDH +// if routeID provided, will also cache once the call is successful +func callDHwithID(ctx *context.Context, tnt, hostID, routeID string, dR *DispatcherRoute, + dm *engine.DataManager, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector, + serviceMethod string, args, reply interface{}) (err error) { + var dH *engine.DispatcherHost + if dH, err = dm.GetDispatcherHost(ctx, tnt, hostID, true, true, utils.NonTransactional); err != nil { + return + } + if err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply); err != nil { + return + } + if routeID != utils.EmptyString { // cache the discovered route + if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dR, + nil, true, utils.EmptyString); err != nil { + return + } + } + return +} + func callDH(ctx *context.Context, dh *engine.DispatcherHost, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector, method string, args, reply interface{}) (err error) { @@ -411,12 +427,13 @@ func callDH(ctx *context.Context, return conn.Call(ctx, method, args, reply) } -type lazzyDH struct { +// lazyDH is created for the broadcast strategy so we can make sure host exists during setup phase +type lazyDH struct { dh *engine.DispatcherHost cfg *config.CGRConfig iPRCCh chan birpc.ClientConnector } -func (l *lazzyDH) Call(ctx *context.Context, method string, args, reply interface{}) error { +func (l *lazyDH) Call(ctx *context.Context, method string, args, reply interface{}) error { return callDH(ctx, l.dh, l.cfg, l.iPRCCh, method, args, reply) } diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 884ab7465..246d43726 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -40,12 +40,13 @@ import ( "github.com/cgrates/rpcclient" ) -func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.WaitGroup, server *cores.Server, caps *engine.Caps) *CGREngine { +func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.WaitGroup, + server *cores.Server, caps *engine.Caps) *CGREngine { return &CGREngine{ - cfg: cfg, - cM: cM, - caps: caps, - shdWg: shdWg, + cfg: cfg, // Engine configuration + cM: cM, // connection manager + caps: caps, // caps is used to limit RPC CPS + shdWg: shdWg, // wait for shutdown srvManager: servmanager.NewServiceManager(shdWg, cM, cfg.GetReloadChan()), server: server, // Rpc/http server srvDep: map[string]*sync.WaitGroup{ diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 3fab821ba..7614e4642 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -40,12 +40,12 @@ func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, rldCh // ServiceManager handles service management ran by the engine type ServiceManager struct { - sync.RWMutex // lock access to any shared data - subsystems map[string]Service + sync.RWMutex // lock access to any shared data + subsystems map[string]Service // active subsystems managed by SM - shdWg *sync.WaitGroup + shdWg *sync.WaitGroup // list of shutdown items connMgr *engine.ConnManager - rldChan <-chan string + rldChan <-chan string // reload signals come over this channelc } // StartServices starts all enabled services