DispatcherS.dispatcherForEvent method, exporting MatchingItemIDsForEvent from engine

This commit is contained in:
DanB
2019-01-31 11:57:25 +01:00
parent 8ea8739e10
commit c6fad981a6
21 changed files with 178 additions and 240 deletions

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
/*
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
@@ -281,3 +282,4 @@ func (dC *DispatcherChargerSv1) ProcessEvent(args *dispatchers.CGREvWithApiKey,
reply *[]*engine.AttrSProcessEventReply) (err error) {
return dC.dC.ChargerSv1ProcessEvent(args, reply)
}
*/

View File

@@ -28,7 +28,6 @@ import (
"runtime"
"runtime/pprof"
"strconv"
"strings"
"syscall"
"time"
@@ -971,124 +970,26 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS Dispatcher service.")
var err error
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool
//var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool
cfg.DispatcherSCfg().DispatchingStrategy = strings.TrimPrefix(cfg.DispatcherSCfg().DispatchingStrategy,
utils.Meta) // remote * from DispatchingStrategy
if len(cfg.DispatcherSCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().RALsConns, internalRaterChan,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
/*
if len(cfg.DispatcherSCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().RALsConns, internalRaterChan,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
}
if len(cfg.DispatcherSCfg().ResSConns) != 0 {
resSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().ResSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResoruceS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
threshSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().ThreshSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().StatSConns) != 0 {
statSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().StatSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatQueueS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().SupplSConns) != 0 {
suplSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().SupplSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().AttrSConns) != 0 {
attrSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().AttrSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().SessionSConns) != 0 {
sessionsSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().SessionSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SessionS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().ChargerSConns) != 0 {
chargerSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().ChargerSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ChargerS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
dspS, err := dispatchers.NewDispatcherService(dm, ralsConns, resSConns,
threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns)
*/
dspS, err := dispatchers.NewDispatcherService(dm, cfg)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
exitChan <- true
@@ -1102,34 +1003,36 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
exitChan <- true
return
}()
if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
server.RpcRegisterName(utils.ThresholdSv1,
v1.NewDispatcherThresholdSv1(dspS))
}
if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 {
server.RpcRegisterName(utils.StatSv1,
v1.NewDispatcherStatSv1(dspS))
}
if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherSCfg().ResSConns) != 0 {
server.RpcRegisterName(utils.ResourceSv1,
v1.NewDispatcherResourceSv1(dspS))
}
if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherSCfg().SupplSConns) != 0 {
server.RpcRegisterName(utils.SupplierSv1,
v1.NewDispatcherSupplierSv1(dspS))
}
if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 {
server.RpcRegisterName(utils.AttributeSv1,
v1.NewDispatcherAttributeSv1(dspS))
}
if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 {
server.RpcRegisterName(utils.SessionSv1,
v1.NewDispatcherSessionSv1(dspS))
}
if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherSCfg().ChargerSConns) != 0 {
server.RpcRegisterName(utils.ChargerSv1,
v1.NewDispatcherChargerSv1(dspS))
}
/*
if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
server.RpcRegisterName(utils.ThresholdSv1,
v1.NewDispatcherThresholdSv1(dspS))
}
if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 {
server.RpcRegisterName(utils.StatSv1,
v1.NewDispatcherStatSv1(dspS))
}
if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherSCfg().ResSConns) != 0 {
server.RpcRegisterName(utils.ResourceSv1,
v1.NewDispatcherResourceSv1(dspS))
}
if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherSCfg().SupplSConns) != 0 {
server.RpcRegisterName(utils.SupplierSv1,
v1.NewDispatcherSupplierSv1(dspS))
}
if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 {
server.RpcRegisterName(utils.AttributeSv1,
v1.NewDispatcherAttributeSv1(dspS))
}
if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 {
server.RpcRegisterName(utils.SessionSv1,
v1.NewDispatcherSessionSv1(dspS))
}
if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherSCfg().ChargerSConns) != 0 {
server.RpcRegisterName(utils.ChargerSv1,
v1.NewDispatcherChargerSv1(dspS))
}
*/
}
// startAnalyzerService fires up the AnalyzerS

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -55,3 +56,4 @@ func (dS *DispatcherService) AttributeSv1ProcessEvent(args *ArgsAttrProcessEvent
return dS.attrS.Call(utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent, reply)
}
*/

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -53,3 +54,4 @@ func (dS *DispatcherService) ChargerSv1ProcessEvent(args *CGREvWithApiKey,
}
return dS.chargerS.Call(utils.ChargerSv1ProcessEvent, args.CGREvent, reply)
}
*/

View File

@@ -20,65 +20,26 @@ package dispatchers
import (
"fmt"
"reflect"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewDispatcherService initializes a DispatcherService
func NewDispatcherService(dm *engine.DataManager, rals, resS, thdS,
statS, splS, attrS, sessionS, chargerS rpcclient.RpcClientConnection) (*DispatcherService, error) {
if rals != nil && reflect.ValueOf(rals).IsNil() {
rals = nil
}
if resS != nil && reflect.ValueOf(resS).IsNil() {
resS = nil
}
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
thdS = nil
}
if statS != nil && reflect.ValueOf(statS).IsNil() {
statS = nil
}
if splS != nil && reflect.ValueOf(splS).IsNil() {
splS = nil
}
if attrS != nil && reflect.ValueOf(attrS).IsNil() {
attrS = nil
}
if sessionS != nil && reflect.ValueOf(sessionS).IsNil() {
sessionS = nil
}
if chargerS != nil && reflect.ValueOf(chargerS).IsNil() {
chargerS = nil
}
return &DispatcherService{
dm: dm,
rals: rals,
resS: resS,
thdS: thdS,
statS: statS,
splS: splS,
attrS: attrS,
sessionS: sessionS,
chargerS: chargerS}, nil
func NewDispatcherService(dm *engine.DataManager,
cfg *config.CGRConfig) (*DispatcherService, error) {
return &DispatcherService{dm: dm, cfg: cfg}, nil
}
// DispatcherService is the service handling dispatching towards internal components
// designed to handle automatic partitioning and failover
type DispatcherService struct {
dm *engine.DataManager
rals rpcclient.RpcClientConnection // RALs connections
resS rpcclient.RpcClientConnection // ResourceS connections
thdS rpcclient.RpcClientConnection // ThresholdS connections
statS rpcclient.RpcClientConnection // StatS connections
splS rpcclient.RpcClientConnection // SupplierS connections
attrS rpcclient.RpcClientConnection // AttributeS connections
sessionS rpcclient.RpcClientConnection // SessionS connections
chargerS rpcclient.RpcClientConnection // ChargerS connections
dm *engine.DataManager
cfg *config.CGRConfig
filterS *engine.FilterS
stringIndexedFields *[]string
prefixIndexedFields *[]string
}
// ListenAndServe will initialize the service
@@ -96,45 +57,65 @@ func (dS *DispatcherService) Shutdown() error {
return nil
}
func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent,
reply *engine.AttrSProcessEventReply) (err error) {
if dS.attrS == nil {
return utils.NewErrNotConnected(utils.AttributeS)
// dispatcherForEvent returns a dispatcher instance configured for specific event
// or utils.ErrNotFound if none present
func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
subsys string) (d Dispatcher, err error) {
// find out the matching profiles
idxKeyPrfx := utils.ConcatenatedKey(ev.Tenant, utils.META_ANY)
if subsys != "" {
idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys)
}
if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent,
&engine.AttrArgsProcessEvent{
CGREvent: *ev}, reply); err != nil {
if err.Error() == utils.ErrNotFound.Error() {
err = utils.ErrUnknownApiKey
matchingPrfls := make(map[string]*engine.DispatcherProfile)
prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, dS.stringIndexedFields, dS.prefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes, idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
}
for prflID := range prflIDs {
prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
if prfl.ActivationInterval != nil && ev.Time != nil &&
!prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
continue
}
if pass, err := dS.filterS.Pass(ev.Tenant, prfl.FilterIDs,
config.NewNavigableMap(ev.Event)); err != nil {
return nil, err
} else if !pass {
continue
}
matchingPrfls[prflID] = prfl
}
if len(matchingPrfls) == 0 {
return nil, utils.ErrNotFound
}
// All good, convert from Map to Slice so we can sort
prfls := make(engine.DispatcherProfiles, len(matchingPrfls))
i := 0
for _, prfl := range matchingPrfls {
prfls[i] = prfl
i++
}
prfls.Sort()
matchedPrlf := prfls[0] // only use the first profile
tntID := matchedPrlf.TenantID()
// get or build the Dispatcher for the config
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
tntID); ok && x != nil {
d = x.(Dispatcher)
d.SetProfile(matchedPrlf)
return
}
return
}
func (dS *DispatcherService) authorize(method, tenant, apiKey string, evTime *time.Time) (err error) {
if apiKey == "" {
return utils.NewErrMandatoryIeMissing(utils.APIKey)
}
ev := &utils.CGREvent{
Tenant: tenant,
ID: utils.UUIDSha1Prefix(),
Context: utils.StringPointer(utils.MetaAuth),
Time: evTime,
Event: map[string]interface{}{
utils.APIKey: apiKey,
},
}
var rplyEv engine.AttrSProcessEventReply
if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
return
}
var apiMethods string
if apiMethods, err = rplyEv.CGREvent.FieldAsString(utils.APIMethods); err != nil {
return
}
if !ParseStringMap(apiMethods).HasKey(method) {
return utils.ErrUnauthorizedApi
}
if d, err = newDispatcher(matchedPrlf); err != nil {
return
}
engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil,
true, utils.EmptyString)
return
}

View File

@@ -19,8 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
import (
"fmt"
"sort"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -56,3 +58,37 @@ type DispatcherProfiles []*DispatcherProfile
func (dps DispatcherProfiles) Sort() {
sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight })
}
// Dispatcher is responsible for routing requests to pool of connections
// there will be different implementations based on strategy
type Dispatcher interface {
// SetConfig is used to update the configuration information within dispatcher
// to make sure we take decisions based on latest config
SetProfile(pfl *engine.DispatcherProfile)
// GetConnID returns an ordered list of connection IDs for the event
NextConnID() (connID string)
}
// newDispatcher constructs instances of Dispatcher
func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
switch pfl.Strategy {
case utils.MetaWeight:
d = &WeightDispatcher{pfl: pfl}
default:
err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
}
return
}
type WeightDispatcher struct {
pfl *engine.DispatcherProfile
}
func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
wd.pfl = pfl
return
}
func (wd *WeightDispatcher) NextConnID() (connID string) {
return
}

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -42,3 +43,4 @@ func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args *ArgsV1ResUsag
return dS.resS.Call(utils.ResourceSv1GetResourcesForEvent, args.ArgRSv1ResourceUsage, reply)
}
*/

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
/*
import (
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
@@ -101,3 +102,4 @@ func (dS *DispatcherService) SessionSv1UpdateSession(args *UpdateSessionWithApiK
}
return dS.sessionS.Call(utils.SessionSv1UpdateSession, args.V1UpdateSessionArgs, reply)
}
*/

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
/*
import (
"time"
@@ -67,3 +68,4 @@ func (dS *DispatcherService) StatSv1ProcessEvent(args *ArgsStatProcessEventWithA
}
return dS.statS.Call(utils.StatSv1ProcessEvent, args, reply)
}
*/

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -42,3 +43,4 @@ func (dS *DispatcherService) SupplierSv1GetSuppliers(args *ArgsGetSuppliersWithA
return dS.splS.Call(utils.SupplierSv1GetSuppliers, args.ArgsGetSuppliers, reply)
}
*/

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -53,3 +54,4 @@ func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *ArgsProcessEventWith
}
return dS.thdS.Call(utils.ThresholdSv1ProcessEvent, args.ArgsProcessEvent, tIDs)
}
*/

View File

@@ -70,13 +70,13 @@ func (alS *AttributeService) matchingAttributeProfilesForEvent(args *AttrArgsPro
if len(args.AttributeIDs) != 0 {
attrIDs = args.AttributeIDs
} else {
aPrflIDs, err := matchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
aPrflIDs, err := MatchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
alS.dm, utils.CacheAttributeFilterIndexes, attrIdxKey, alS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
if err != utils.ErrNotFound {
return nil, err
}
if aPrflIDs, err = matchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
if aPrflIDs, err = MatchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
alS.dm, utils.CacheAttributeFilterIndexes, utils.ConcatenatedKey(args.Tenant, utils.META_ANY),
alS.filterS.cfg.FilterSCfg().IndexedSelects); err != nil {
return nil, err

View File

@@ -58,6 +58,7 @@ var precachedPartitions = []string{
utils.CacheSupplierProfiles,
utils.CacheAttributeProfiles,
utils.CacheChargerProfiles,
utils.CacheDispatcherProfiles,
utils.CacheDiameterMessages,
}

View File

@@ -63,7 +63,7 @@ func (cS *ChargerService) Shutdown() (err error) {
// matchingChargingProfilesForEvent returns ordered list of matching chargers which are active by the time of the function call
func (cS *ChargerService) matchingChargerProfilesForEvent(cgrEv *utils.CGREvent) (cPs ChargerProfiles, err error) {
cpIDs, err := matchingItemIDsForEvent(cgrEv.Event,
cpIDs, err := MatchingItemIDsForEvent(cgrEv.Event,
cS.cfg.ChargerSCfg().StringIndexedFields, cS.cfg.ChargerSCfg().PrefixIndexedFields,
cS.dm, utils.CacheChargerFilterIndexes, cgrEv.Tenant, cS.cfg.FilterSCfg().IndexedSelects)
if err != nil {

View File

@@ -27,10 +27,10 @@ import (
"github.com/cgrates/cgrates/utils"
)
// matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event
// MatchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event
// fieldIDs limits the fields which are checked against indexes
// helper on top of dataDB.MatchFilterIndex, adding utils.ANY to list of fields queried
func matchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string,
func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string,
dm *DataManager, cacheID, itemIDPrefix string, indexedSelects bool) (itemIDs utils.StringMap, err error) {
lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockID)

View File

@@ -81,7 +81,7 @@ func TestFilterMatchingItemIDsForEvent(t *testing.T) {
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"Field": "profile",
}
aPrflIDs, err := matchingItemIDsForEvent(matchEV, nil, nil,
aPrflIDs, err := MatchingItemIDsForEvent(matchEV, nil, nil,
dmMatch, utils.CacheAttributeFilterIndexes, prefix, true)
if err != nil {
t.Errorf("Error: %+v", err)
@@ -93,7 +93,7 @@ func TestFilterMatchingItemIDsForEvent(t *testing.T) {
matchEV = map[string]interface{}{
"Field": "profilePrefix",
}
aPrflIDs, err = matchingItemIDsForEvent(matchEV, nil, nil,
aPrflIDs, err = MatchingItemIDsForEvent(matchEV, nil, nil,
dmMatch, utils.CacheAttributeFilterIndexes, prefix, true)
if err != nil {
t.Errorf("Error: %+v", err)

View File

@@ -443,7 +443,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, usageTTL *time.Duration) (rs Resources, err error) {
matchingResources := make(map[string]*Resource)
rIDs, err := matchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields,
rIDs, err := MatchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields,
rS.dm, utils.CacheResourceFilterIndexes, ev.Tenant, rS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err

View File

@@ -151,7 +151,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (
if len(args.StatIDs) != 0 {
sqIDs = args.StatIDs
} else {
mapIDs, err := matchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields,
mapIDs, err := MatchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields,
sS.dm, utils.CacheStatFilterIndexes, args.Tenant, sS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err

View File

@@ -128,7 +128,7 @@ func (spS *SupplierService) Shutdown() error {
// matchingSupplierProfilesForEvent returns ordered list of matching resources which are active by the time of the call
func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *utils.CGREvent) (sPrfls SupplierProfiles, err error) {
matchingLPs := make(map[string]*SupplierProfile)
sPrflIDs, err := matchingItemIDsForEvent(ev.Event, spS.stringIndexedFields, spS.prefixIndexedFields,
sPrflIDs, err := MatchingItemIDsForEvent(ev.Event, spS.stringIndexedFields, spS.prefixIndexedFields,
spS.dm, utils.CacheSupplierFilterIndexes, ev.Tenant, spS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err

View File

@@ -223,7 +223,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ArgsProcessEvent) (
if len(args.ThresholdIDs) != 0 {
tIDs = args.ThresholdIDs
} else {
tIDsMap, err := matchingItemIDsForEvent(args.Event, tS.stringIndexedFields,
tIDsMap, err := MatchingItemIDsForEvent(args.Event, tS.stringIndexedFields,
tS.prefixIndexedFields, tS.dm, utils.CacheThresholdFilterIndexes,
args.Tenant, tS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {

View File

@@ -932,6 +932,7 @@ const (
CacheAttributeProfiles = "attribute_profiles"
CacheChargerProfiles = "charger_profiles"
CacheDispatcherProfiles = "dispatcher_profiles"
CacheDispatchers = "dispatchers"
CacheResourceFilterIndexes = "resource_filter_indexes"
CacheStatFilterIndexes = "stat_filter_indexes"
CacheThresholdFilterIndexes = "threshold_filter_indexes"