DispatcherS improvements for caching based on routeID

This commit is contained in:
DanB
2022-09-05 20:23:51 +02:00
parent 8d37467b63
commit 56f408e628
2 changed files with 87 additions and 44 deletions

View File

@@ -165,19 +165,7 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent,
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
routeID); ok && x != nil {
dR = x.(*DispatcherRoute)
// query the profile out of cached route
var dPrfl *engine.DispatcherProfile
if dPrfl, err = dS.dm.GetDispatcherProfile(ctx, dR.Tenant, dR.ProfileID,
true, true, utils.NonTransactional); err != nil {
if err != utils.ErrNotFound {
return
}
// profile was not found
utils.Logger.Warning(fmt.Sprintf("<%s> could not find profile with tenant: <%s> and ID <%s> for routeID: <%s>",
utils.DispatcherS, dR.Tenant, dR.ProfileID, routeID))
} else {
dPrfls = engine.DispatcherProfiles{dPrfl} // will be used as the list of routes
}
dPrfls = engine.DispatcherProfiles{&engine.DispatcherProfile{Tenant: dR.Tenant, ID: dR.ProfileID}} // will be used bellow to retrieve the dispatcher
}
}
evNm := utils.MapStorage{
@@ -196,6 +184,7 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent,
if len(dPrfls) == 0 {
return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE"))
}
fmt.Printf("dispatching event: %+v to profiles: %+v\n", ev, dPrfls)
for _, dPrfl := range dPrfls {
tntID := dPrfl.TenantID()
// get or build the Dispatcher for the config
@@ -203,11 +192,34 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent,
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
tntID); ok && x != nil {
d = x.(Dispatcher)
} else if d, err = newDispatcher(dPrfl); err != nil {
return utils.NewErrDispatcherS(err)
} else if err = engine.Cache.Set(ctx, utils.CacheDispatchers, tntID, d, // cache the built Dispatcher
nil, true, utils.EmptyString); err != nil {
return utils.NewErrDispatcherS(err)
} else { // dispatcher is not cached, build it here
if dPrfl.Hosts == nil { // dispatcher profile was not retrieved but built artificially above, try retrieving
if dPrfl, err = dS.dm.GetDispatcherProfile(ctx, dPrfl.Tenant, dPrfl.ID,
true, true, utils.NonTransactional); err != nil {
if err != utils.ErrNotFound {
return
}
// profile was not found
utils.Logger.Warning(fmt.Sprintf("<%s> could not find profile with tenant: <%s> and ID <%s> for routeID: <%s>",
utils.DispatcherS, dR.Tenant, dR.ProfileID, routeID))
if len(dPrfls) == 1 { // the only profile set does not exist anymore
return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE"))
}
continue
}
}
if d, err = newDispatcher(dPrfl); err != nil {
return utils.NewErrDispatcherS(err)
} else if err = engine.Cache.Set(ctx, utils.CacheDispatchers, tntID, d, // cache the built Dispatcher
nil, true, utils.EmptyString); err != nil {
return utils.NewErrDispatcherS(err)
}
}
if routeID != utils.EmptyString && dR == nil { // first time we cache the route
dR = &DispatcherRoute{
Tenant: dPrfl.Tenant,
ProfileID: dPrfl.ID,
}
}
if err = d.Dispatch(dS.dm, dS.fltrS, dS.cfg,
ctx, dS.connMgr.GetDispInternalChan(), evNm, tnt, routeID, dR,

View File

@@ -56,6 +56,7 @@ type Dispatcher interface {
// newDispatcher constructs instances of Dispatcher
func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
hosts := pfl.Hosts.Clone()
fmt.Printf("hosts: %+v, cloned: %+v\n", pfl.Hosts, hosts)
hosts.Sort() // make sure the connections are sorted
switch pfl.Strategy {
case utils.MetaWeight:
@@ -185,14 +186,14 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.
ctx *context.Context, iPRCCh chan birpc.ClientConnector,
ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute,
serviceMethod string, args interface{}, reply interface{}) (err error) {
if routeID != utils.EmptyString { // route to previously discovered route
if dR != nil { // route to previously discovered route
if err = callDHwithID(ctx, tnt, dR.HostID, routeID, dR, dm,
cfg, iPRCCh, serviceMethod, args, reply); err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
// not found or network errors will continue
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>",
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with idnt <%q>",
utils.DispatcherS, err.Error(), dR.HostID))
}
var hostIDs []string
@@ -202,10 +203,15 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.
return utils.ErrHostNotFound
}
for _, hostID := range hostIDs {
dRh := dR
var dRh *DispatcherRoute
if routeID != utils.EmptyString {
dRh.HostID = hostID
dRh = &DispatcherRoute{
Tenant: dR.Tenant,
ProfileID: dR.ProfileID,
HostID: hostID,
}
}
fmt.Printf("Will dispatch to host with id: <%s>\n", hostID)
if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm,
cfg, iPRCCh, serviceMethod, args, reply); err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
@@ -213,7 +219,7 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.
}
if err != nil {
// not found or network errors will continue with standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>",
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with identity <%s>",
utils.DispatcherS, err.Error(), hostID))
}
return
@@ -250,10 +256,20 @@ func (b *broadcastDispatcher) Dispatch(dm *engine.DataManager, flts *engine.Filt
return utils.NewErrDispatcherS(err)
}
hasHosts = true
var dRh *DispatcherRoute
if routeID != utils.EmptyString {
dRh = &DispatcherRoute{
Tenant: dR.Tenant,
ProfileID: dR.ProfileID,
HostID: hostID,
}
}
pool.AddClient(&lazyDH{
dh: dH,
cfg: cfg,
iPRCCh: iPRCCh,
dh: dH,
cfg: cfg,
iPRCCh: iPRCCh,
routeID: routeID,
dR: dRh,
})
}
if !hasHosts { // in case we do not match any host
@@ -283,7 +299,7 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
} else if lM, err = newLoadMetrics(ld.hosts, ld.defaultRatio); err != nil {
return
}
if routeID != utils.EmptyString { // route to previously discovered route
if dR != nil { // route to previously discovered route
lM.incrementLoad(ctx, dR.HostID, ld.tntID)
err = callDHwithID(ctx, tnt, dR.HostID, routeID, dR, dm,
cfg, iPRCCh, serviceMethod, args, reply)
@@ -303,8 +319,16 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
return utils.ErrHostNotFound
}
for _, hostID := range hostIDs {
var dRh *DispatcherRoute
if routeID != utils.EmptyString {
dRh = &DispatcherRoute{
Tenant: dR.Tenant,
ProfileID: dR.ProfileID,
HostID: hostID,
}
}
lM.incrementLoad(ctx, hostID, ld.tntID)
err = callDHwithID(ctx, tnt, hostID, routeID, dR, dm,
err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm,
cfg, iPRCCh, serviceMethod, args, reply)
lM.decrementLoad(ctx, hostID, ld.tntID) // call ended
if err == nil ||
@@ -405,7 +429,22 @@ func callDHwithID(ctx *context.Context, tnt, hostID, routeID string, dR *Dispatc
if dH, err = dm.GetDispatcherHost(ctx, tnt, hostID, true, true, utils.NonTransactional); err != nil {
return
}
if err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply); err != nil {
if err = callDH(ctx, dH, routeID, dR, cfg, iPRCCh, serviceMethod, args, reply); err != nil {
return
}
return
}
func callDH(ctx *context.Context,
dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute,
cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector,
method string, args, reply interface{}) (err error) {
var conn birpc.ClientConnector
if conn, err = dh.GetConn(ctx, cfg, iPRCCh); err != nil {
return
}
if err = conn.Call(ctx, method, args, reply); err != nil {
return
}
if routeID != utils.EmptyString { // cache the discovered route
@@ -417,23 +456,15 @@ func callDHwithID(ctx *context.Context, tnt, hostID, routeID string, dR *Dispatc
return
}
func callDH(ctx *context.Context,
dh *engine.DispatcherHost, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector,
method string, args, reply interface{}) (err error) {
var conn birpc.ClientConnector
if conn, err = dh.GetConn(ctx, cfg, iPRCCh); err != nil {
return
}
return conn.Call(ctx, method, args, reply)
}
// lazyDH is created for the broadcast strategy so we can make sure host exists during setup phase
type lazyDH struct {
dh *engine.DispatcherHost
cfg *config.CGRConfig
iPRCCh chan birpc.ClientConnector
dh *engine.DispatcherHost
cfg *config.CGRConfig
iPRCCh chan birpc.ClientConnector
routeID string
dR *DispatcherRoute
}
func (l *lazyDH) Call(ctx *context.Context, method string, args, reply interface{}) error {
return callDH(ctx, l.dh, l.cfg, l.iPRCCh, method, args, reply)
func (l *lazyDH) Call(ctx *context.Context, method string, args, reply interface{}) (err error) {
return callDH(ctx, l.dh, l.routeID, l.dR, l.cfg, l.iPRCCh, method, args, reply)
}