diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index fde3857a1..2c5cf0e6d 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -657,3 +657,17 @@ func (dS *DispatcherSchedulerSv1) Reload(attr *dispatchers.StringkWithApiKey, re func (dS *DispatcherSchedulerSv1) Ping(args *dispatchers.CGREvWithApiKey, reply *string) error { return dS.dS.SchedulerSv1Ping(args, reply) } + +func NewDispatcherSv1(dS *dispatchers.DispatcherService) *DispatcherSv1 { + return &DispatcherSv1{dS: dS} +} + +type DispatcherSv1 struct { + dS *dispatchers.DispatcherService +} + +// GetProfileForEvent returns the matching dispatcher profile for the provided event +func (dSv1 DispatcherSv1) GetProfileForEvent(ev *dispatchers.DispatcherEvent, + dPrfl *engine.DispatcherProfile) error { + return dSv1.dS.V1GetProfileForEvent(ev, dPrfl) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index df46faa7b..a74ff060d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -969,6 +969,8 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher return }() + server.RpcRegister(v1.NewDispatcherSv1(dspS)) + server.RpcRegisterName(utils.ThresholdSv1, v1.NewDispatcherThresholdSv1(dspS)) diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 4640341ef..068202588 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -64,86 +64,6 @@ func (dS *DispatcherService) Shutdown() error { return nil } -// dispatcherForEvent returns a dispatcher instance configured for specific event -// or utils.ErrNotFound if none present -func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, - subsys string) (d Dispatcher, err error) { - // find out the matching profiles - anyIdxPrfx := utils.ConcatenatedKey(ev.Tenant, utils.META_ANY) - idxKeyPrfx := anyIdxPrfx - if subsys != "" { - idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys) - } - var matchedPrlf *engine.DispatcherProfile - prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, - dS.cfg.DispatcherSCfg().StringIndexedFields, - dS.cfg.DispatcherSCfg().PrefixIndexedFields, - dS.dm, utils.CacheDispatcherFilterIndexes, - idxKeyPrfx, dS.cfg.DispatcherSCfg().IndexedSelects) - if err != nil { - // return nil, err - if err != utils.ErrNotFound { - return nil, err - } - prflIDs, err = engine.MatchingItemIDsForEvent(ev.Event, - dS.cfg.DispatcherSCfg().StringIndexedFields, - dS.cfg.DispatcherSCfg().PrefixIndexedFields, - dS.dm, utils.CacheDispatcherFilterIndexes, - anyIdxPrfx, dS.cfg.DispatcherSCfg().IndexedSelects) - if err != nil { - return nil, err - } - } - for prflID := range prflIDs { - prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional) - if err != nil { - if err != utils.ErrNotFound { - return nil, err - } - continue - } - if prfl.ActivationInterval != nil && ev.Time != nil && - !prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active - continue - } - if pass, err := dS.fltrS.Pass(ev.Tenant, prfl.FilterIDs, - config.NewNavigableMap(ev.Event)); err != nil { - return nil, err - } else if !pass { - continue - } - if matchedPrlf == nil || prfl.Weight > matchedPrlf.Weight { - matchedPrlf = prfl - } - } - if matchedPrlf == nil { - return nil, utils.ErrNotFound - } - tntID := matchedPrlf.TenantID() - // get or build the Dispatcher for the config - if x, ok := engine.Cache.Get(utils.CacheDispatchers, - tntID); ok && x != nil { - d = x.(Dispatcher) - return - } - if d, err = newDispatcher(dS.dm, matchedPrlf); err != nil { - return - } - engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil, - true, utils.EmptyString) - return -} - -// Dispatch is the method forwarding the request towards the right connection -func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID *string, - serviceMethod string, args interface{}, reply interface{}) (err error) { - d, errDsp := dS.dispatcherForEvent(ev, subsys) - if errDsp != nil { - return utils.NewErrDispatcherS(errDsp) - } - return d.Dispatch(routeID, serviceMethod, args, reply) -} - func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent, reply *engine.AttrSProcessEventReply) (err error) { if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent, @@ -183,3 +103,91 @@ func (dS *DispatcherService) authorize(method, tenant, apiKey string, evTime *ti } return } + +// dispatcherForEvent returns a dispatcher instance configured for specific event +// or utils.ErrNotFound if none present +func (dS *DispatcherService) dispatcherProfileForEvent(ev *utils.CGREvent, + subsys string) (dPrlf *engine.DispatcherProfile, err error) { + // find out the matching profiles + anyIdxPrfx := utils.ConcatenatedKey(ev.Tenant, utils.META_ANY) + idxKeyPrfx := anyIdxPrfx + if subsys != "" { + idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys) + } + prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, + dS.cfg.DispatcherSCfg().StringIndexedFields, + dS.cfg.DispatcherSCfg().PrefixIndexedFields, + dS.dm, utils.CacheDispatcherFilterIndexes, + idxKeyPrfx, dS.cfg.DispatcherSCfg().IndexedSelects) + if err != nil { + // return nil, err + if err != utils.ErrNotFound { + return nil, err + } + prflIDs, err = engine.MatchingItemIDsForEvent(ev.Event, + dS.cfg.DispatcherSCfg().StringIndexedFields, + dS.cfg.DispatcherSCfg().PrefixIndexedFields, + dS.dm, utils.CacheDispatcherFilterIndexes, + anyIdxPrfx, dS.cfg.DispatcherSCfg().IndexedSelects) + if err != nil { + return nil, err + } + } + for prflID := range prflIDs { + prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional) + if err != nil { + if err != utils.ErrNotFound { + return nil, err + } + continue + } + if prfl.ActivationInterval != nil && ev.Time != nil && + !prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active + continue + } + if pass, err := dS.fltrS.Pass(ev.Tenant, prfl.FilterIDs, + config.NewNavigableMap(ev.Event)); err != nil { + return nil, err + } else if !pass { + continue + } + if dPrlf == nil || prfl.Weight > dPrlf.Weight { + dPrlf = prfl + } + } + if dPrlf == nil { + return nil, utils.ErrNotFound + } + return +} + +// Dispatch is the method forwarding the request towards the right connection +func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID *string, + serviceMethod string, args interface{}, reply interface{}) (err error) { + dPrfl, errDsp := dS.dispatcherProfileForEvent(ev, subsys) + if errDsp != nil { + return utils.NewErrDispatcherS(errDsp) + } + tntID := dPrfl.TenantID() + // get or build the Dispatcher for the config + var d Dispatcher + if x, ok := engine.Cache.Get(utils.CacheDispatchers, + tntID); ok && x != nil { + d = x.(Dispatcher) + } else if d, err = newDispatcher(dS.dm, dPrfl); err != nil { + return utils.NewErrDispatcherS(err) + } + engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil, + true, utils.EmptyString) + return d.Dispatch(routeID, serviceMethod, args, reply) +} + +func (dS *DispatcherService) V1GetProfileForEvent(ev *DispatcherEvent, + dPfl *engine.DispatcherProfile) (err error) { + retDPfl, errDpfl := dS.dispatcherProfileForEvent(&ev.CGREvent, ev.Subsystem) + if errDpfl != nil { + return utils.NewErrDispatcherS(errDpfl) + } + *dPfl = *retDPfl + return +} diff --git a/dispatchers/utils.go b/dispatchers/utils.go index afb0f3d0e..247b19a36 100755 --- a/dispatchers/utils.go +++ b/dispatchers/utils.go @@ -43,6 +43,12 @@ type CGREvWithApiKey struct { utils.CGREvent } +type DispatcherEvent struct { + utils.CGREvent + DispatcherResource + Subsystem string +} + type TntIDWithApiKey struct { utils.TenantID DispatcherResource