DispatcherSv1.GetProfileForEvent API call

This commit is contained in:
DanB
2019-04-04 13:02:24 +02:00
parent e65aa274e7
commit 01857c4d8a
4 changed files with 110 additions and 80 deletions

View File

@@ -657,3 +657,17 @@ func (dS *DispatcherSchedulerSv1) Reload(attr *dispatchers.StringkWithApiKey, re
func (dS *DispatcherSchedulerSv1) Ping(args *dispatchers.CGREvWithApiKey, reply *string) error {
return dS.dS.SchedulerSv1Ping(args, reply)
}
func NewDispatcherSv1(dS *dispatchers.DispatcherService) *DispatcherSv1 {
return &DispatcherSv1{dS: dS}
}
type DispatcherSv1 struct {
dS *dispatchers.DispatcherService
}
// GetProfileForEvent returns the matching dispatcher profile for the provided event
func (dSv1 DispatcherSv1) GetProfileForEvent(ev *dispatchers.DispatcherEvent,
dPrfl *engine.DispatcherProfile) error {
return dSv1.dS.V1GetProfileForEvent(ev, dPrfl)
}

View File

@@ -969,6 +969,8 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher
return
}()
server.RpcRegister(v1.NewDispatcherSv1(dspS))
server.RpcRegisterName(utils.ThresholdSv1,
v1.NewDispatcherThresholdSv1(dspS))

View File

@@ -64,86 +64,6 @@ func (dS *DispatcherService) Shutdown() error {
return nil
}
// dispatcherForEvent returns a dispatcher instance configured for specific event
// or utils.ErrNotFound if none present
func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
subsys string) (d Dispatcher, err error) {
// find out the matching profiles
anyIdxPrfx := utils.ConcatenatedKey(ev.Tenant, utils.META_ANY)
idxKeyPrfx := anyIdxPrfx
if subsys != "" {
idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys)
}
var matchedPrlf *engine.DispatcherProfile
prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event,
dS.cfg.DispatcherSCfg().StringIndexedFields,
dS.cfg.DispatcherSCfg().PrefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes,
idxKeyPrfx, dS.cfg.DispatcherSCfg().IndexedSelects)
if err != nil {
// return nil, err
if err != utils.ErrNotFound {
return nil, err
}
prflIDs, err = engine.MatchingItemIDsForEvent(ev.Event,
dS.cfg.DispatcherSCfg().StringIndexedFields,
dS.cfg.DispatcherSCfg().PrefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes,
anyIdxPrfx, dS.cfg.DispatcherSCfg().IndexedSelects)
if err != nil {
return nil, err
}
}
for prflID := range prflIDs {
prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional)
if err != nil {
if err != utils.ErrNotFound {
return nil, err
}
continue
}
if prfl.ActivationInterval != nil && ev.Time != nil &&
!prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
continue
}
if pass, err := dS.fltrS.Pass(ev.Tenant, prfl.FilterIDs,
config.NewNavigableMap(ev.Event)); err != nil {
return nil, err
} else if !pass {
continue
}
if matchedPrlf == nil || prfl.Weight > matchedPrlf.Weight {
matchedPrlf = prfl
}
}
if matchedPrlf == nil {
return nil, utils.ErrNotFound
}
tntID := matchedPrlf.TenantID()
// get or build the Dispatcher for the config
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
tntID); ok && x != nil {
d = x.(Dispatcher)
return
}
if d, err = newDispatcher(dS.dm, matchedPrlf); err != nil {
return
}
engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil,
true, utils.EmptyString)
return
}
// Dispatch is the method forwarding the request towards the right connection
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
d, errDsp := dS.dispatcherForEvent(ev, subsys)
if errDsp != nil {
return utils.NewErrDispatcherS(errDsp)
}
return d.Dispatch(routeID, serviceMethod, args, reply)
}
func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent,
reply *engine.AttrSProcessEventReply) (err error) {
if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent,
@@ -183,3 +103,91 @@ func (dS *DispatcherService) authorize(method, tenant, apiKey string, evTime *ti
}
return
}
// dispatcherForEvent returns a dispatcher instance configured for specific event
// or utils.ErrNotFound if none present
func (dS *DispatcherService) dispatcherProfileForEvent(ev *utils.CGREvent,
subsys string) (dPrlf *engine.DispatcherProfile, err error) {
// find out the matching profiles
anyIdxPrfx := utils.ConcatenatedKey(ev.Tenant, utils.META_ANY)
idxKeyPrfx := anyIdxPrfx
if subsys != "" {
idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys)
}
prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event,
dS.cfg.DispatcherSCfg().StringIndexedFields,
dS.cfg.DispatcherSCfg().PrefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes,
idxKeyPrfx, dS.cfg.DispatcherSCfg().IndexedSelects)
if err != nil {
// return nil, err
if err != utils.ErrNotFound {
return nil, err
}
prflIDs, err = engine.MatchingItemIDsForEvent(ev.Event,
dS.cfg.DispatcherSCfg().StringIndexedFields,
dS.cfg.DispatcherSCfg().PrefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes,
anyIdxPrfx, dS.cfg.DispatcherSCfg().IndexedSelects)
if err != nil {
return nil, err
}
}
for prflID := range prflIDs {
prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional)
if err != nil {
if err != utils.ErrNotFound {
return nil, err
}
continue
}
if prfl.ActivationInterval != nil && ev.Time != nil &&
!prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
continue
}
if pass, err := dS.fltrS.Pass(ev.Tenant, prfl.FilterIDs,
config.NewNavigableMap(ev.Event)); err != nil {
return nil, err
} else if !pass {
continue
}
if dPrlf == nil || prfl.Weight > dPrlf.Weight {
dPrlf = prfl
}
}
if dPrlf == nil {
return nil, utils.ErrNotFound
}
return
}
// Dispatch is the method forwarding the request towards the right connection
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
dPrfl, errDsp := dS.dispatcherProfileForEvent(ev, subsys)
if errDsp != nil {
return utils.NewErrDispatcherS(errDsp)
}
tntID := dPrfl.TenantID()
// get or build the Dispatcher for the config
var d Dispatcher
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
tntID); ok && x != nil {
d = x.(Dispatcher)
} else if d, err = newDispatcher(dS.dm, dPrfl); err != nil {
return utils.NewErrDispatcherS(err)
}
engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil,
true, utils.EmptyString)
return d.Dispatch(routeID, serviceMethod, args, reply)
}
func (dS *DispatcherService) V1GetProfileForEvent(ev *DispatcherEvent,
dPfl *engine.DispatcherProfile) (err error) {
retDPfl, errDpfl := dS.dispatcherProfileForEvent(&ev.CGREvent, ev.Subsystem)
if errDpfl != nil {
return utils.NewErrDispatcherS(errDpfl)
}
*dPfl = *retDPfl
return
}

View File

@@ -43,6 +43,12 @@ type CGREvWithApiKey struct {
utils.CGREvent
}
type DispatcherEvent struct {
utils.CGREvent
DispatcherResource
Subsystem string
}
type TntIDWithApiKey struct {
utils.TenantID
DispatcherResource