For dispatcher cache RouteID with subsystem

This commit is contained in:
TeoV
2019-04-08 07:47:59 -04:00
committed by Dan Christian Bogos
parent 870637077f
commit 476b3b16a2
2 changed files with 18 additions and 20 deletions

View File

@@ -179,7 +179,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID
}
engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil,
true, utils.EmptyString)
return d.Dispatch(routeID, serviceMethod, args, reply)
return d.Dispatch(routeID, subsys, serviceMethod, args, reply)
}
func (dS *DispatcherService) V1GetProfileForEvent(ev *DispatcherEvent,

View File

@@ -35,13 +35,13 @@ type Dispatcher interface {
// HostIDs returns the ordered list of host IDs
HostIDs() (hostIDs []string)
// Dispatch is used to send the method over the connections given
Dispatch(routeID *string,
Dispatch(routeID *string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error)
}
type strategyDispatcher interface {
// dispatch is used to send the method over the connections given
dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string,
dispatch(dm *engine.DataManager, routeID *string, subsystem, tnt string, hostIDs []string,
serviceMethod string, args interface{}, reply interface{}) (err error)
}
@@ -107,9 +107,9 @@ func (wd *WeightDispatcher) HostIDs() (hostIDs []string) {
return
}
func (wd *WeightDispatcher) Dispatch(routeID *string,
func (wd *WeightDispatcher) Dispatch(routeID *string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return wd.strategy.dispatch(wd.dm, routeID, wd.tnt, wd.HostIDs(),
return wd.strategy.dispatch(wd.dm, routeID, subsystem, wd.tnt, wd.HostIDs(),
serviceMethod, args, reply)
}
@@ -138,9 +138,9 @@ func (d *RandomDispatcher) HostIDs() (hostIDs []string) {
return hosts.HostIDs()
}
func (d *RandomDispatcher) Dispatch(routeID *string,
func (d *RandomDispatcher) Dispatch(routeID *string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(),
return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
serviceMethod, args, reply)
}
@@ -173,9 +173,9 @@ func (d *RoundRobinDispatcher) HostIDs() (hostIDs []string) {
return hosts.HostIDs()
}
func (d *RoundRobinDispatcher) Dispatch(routeID *string,
func (d *RoundRobinDispatcher) Dispatch(routeID *string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(),
return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
serviceMethod, args, reply)
}
@@ -203,22 +203,21 @@ func (d *BroadcastDispatcher) HostIDs() (hostIDs []string) {
return
}
func (d *BroadcastDispatcher) Dispatch(routeID *string,
func (d *BroadcastDispatcher) Dispatch(routeID *string, subsystem,
serviceMethod string, args interface{}, reply interface{}) (lastErr error) { // no cache needed for this strategy because we need to call all connections
return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(),
return d.strategy.dispatch(d.dm, routeID, subsystem, d.tnt, d.HostIDs(),
serviceMethod, args, reply)
}
type singleResultstrategyDispatcher struct{}
func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, subsystem, tnt string,
hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var dH *engine.DispatcherHost
if routeID != nil &&
*routeID != "" {
if routeID != nil && *routeID != "" {
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
utils.ConcatenatedKey(subsystem, *routeID)); ok && x != nil {
dH = x.(*engine.DispatcherHost)
if err = dH.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
return
@@ -233,9 +232,8 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI
if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
continue
}
if routeID != nil &&
*routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
if routeID != nil && *routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, utils.ConcatenatedKey(subsystem, *routeID), dH,
nil, true, utils.EmptyString)
}
break
@@ -245,7 +243,7 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI
type brodcastStrategyDispatcher struct{}
func (_ *brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string,
func (_ *brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, subsystem, tnt string, hostIDs []string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var hasErrors bool
for _, hostID := range hostIDs {