From c6fad981a64bb81d90570977ac2382128849d2aa Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 31 Jan 2019 11:57:25 +0100 Subject: [PATCH] DispatcherS.dispatcherForEvent method, exporting MatchingItemIDsForEvent from engine --- apier/v1/dispatcher.go | 2 + cmd/cgr-engine/cgr-engine.go | 193 +++++++++-------------------------- dispatchers/attributes.go | 2 + dispatchers/chargers.go | 2 + dispatchers/dispatchers.go | 149 ++++++++++++--------------- dispatchers/libdispatcher.go | 36 +++++++ dispatchers/resources.go | 2 + dispatchers/sessions.go | 2 + dispatchers/stats.go | 2 + dispatchers/suppliers.go | 2 + dispatchers/thresholds.go | 2 + engine/attributes.go | 4 +- engine/caches.go | 1 + engine/chargers.go | 2 +- engine/filterhelpers.go | 4 +- engine/filterhelpers_test.go | 4 +- engine/resources.go | 2 +- engine/stats.go | 2 +- engine/suppliers.go | 2 +- engine/thresholds.go | 2 +- utils/consts.go | 1 + 21 files changed, 178 insertions(+), 240 deletions(-) diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 1361dbb07..f956c624c 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -18,6 +18,7 @@ along with this program. If not, see package v1 +/* import ( "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" @@ -281,3 +282,4 @@ func (dC *DispatcherChargerSv1) ProcessEvent(args *dispatchers.CGREvWithApiKey, reply *[]*engine.AttrSProcessEventReply) (err error) { return dC.dC.ChargerSv1ProcessEvent(args, reply) } +*/ diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 607d996dc..da2a7dee4 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -28,7 +28,6 @@ import ( "runtime" "runtime/pprof" "strconv" - "strings" "syscall" "time" @@ -971,124 +970,26 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS Dispatcher service.") var err error - var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool + //var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool - cfg.DispatcherSCfg().DispatchingStrategy = strings.TrimPrefix(cfg.DispatcherSCfg().DispatchingStrategy, - utils.Meta) // remote * from DispatchingStrategy - if len(cfg.DispatcherSCfg().RALsConns) != 0 { - ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().RALsConns, internalRaterChan, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return + /* + if len(cfg.DispatcherSCfg().RALsConns) != 0 { + ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.DispatcherSCfg().RALsConns, internalRaterChan, + cfg.GeneralCfg().InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } } - } - if len(cfg.DispatcherSCfg().ResSConns) != 0 { - resSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().ResSConns, nil, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResoruceS: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.DispatcherSCfg().ThreshSConns) != 0 { - threshSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().ThreshSConns, nil, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.DispatcherSCfg().StatSConns) != 0 { - statSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().StatSConns, nil, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatQueueS: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.DispatcherSCfg().SupplSConns) != 0 { - suplSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().SupplSConns, nil, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.DispatcherSCfg().AttrSConns) != 0 { - attrSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().AttrSConns, nil, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.DispatcherSCfg().SessionSConns) != 0 { - sessionsSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().SessionSConns, nil, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SessionS: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.DispatcherSCfg().ChargerSConns) != 0 { - chargerSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().ChargerSConns, nil, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ChargerS: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return - } - } - dspS, err := dispatchers.NewDispatcherService(dm, ralsConns, resSConns, - threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns) + */ + + dspS, err := dispatchers.NewDispatcherService(dm, cfg) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) exitChan <- true @@ -1102,34 +1003,36 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc exitChan <- true return }() - if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 { - server.RpcRegisterName(utils.ThresholdSv1, - v1.NewDispatcherThresholdSv1(dspS)) - } - if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 { - server.RpcRegisterName(utils.StatSv1, - v1.NewDispatcherStatSv1(dspS)) - } - if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherSCfg().ResSConns) != 0 { - server.RpcRegisterName(utils.ResourceSv1, - v1.NewDispatcherResourceSv1(dspS)) - } - if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherSCfg().SupplSConns) != 0 { - server.RpcRegisterName(utils.SupplierSv1, - v1.NewDispatcherSupplierSv1(dspS)) - } - if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 { - server.RpcRegisterName(utils.AttributeSv1, - v1.NewDispatcherAttributeSv1(dspS)) - } - if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 { - server.RpcRegisterName(utils.SessionSv1, - v1.NewDispatcherSessionSv1(dspS)) - } - if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherSCfg().ChargerSConns) != 0 { - server.RpcRegisterName(utils.ChargerSv1, - v1.NewDispatcherChargerSv1(dspS)) - } + /* + if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 { + server.RpcRegisterName(utils.ThresholdSv1, + v1.NewDispatcherThresholdSv1(dspS)) + } + if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 { + server.RpcRegisterName(utils.StatSv1, + v1.NewDispatcherStatSv1(dspS)) + } + if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherSCfg().ResSConns) != 0 { + server.RpcRegisterName(utils.ResourceSv1, + v1.NewDispatcherResourceSv1(dspS)) + } + if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherSCfg().SupplSConns) != 0 { + server.RpcRegisterName(utils.SupplierSv1, + v1.NewDispatcherSupplierSv1(dspS)) + } + if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 { + server.RpcRegisterName(utils.AttributeSv1, + v1.NewDispatcherAttributeSv1(dspS)) + } + if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 { + server.RpcRegisterName(utils.SessionSv1, + v1.NewDispatcherSessionSv1(dspS)) + } + if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherSCfg().ChargerSConns) != 0 { + server.RpcRegisterName(utils.ChargerSv1, + v1.NewDispatcherChargerSv1(dspS)) + } + */ } // startAnalyzerService fires up the AnalyzerS diff --git a/dispatchers/attributes.go b/dispatchers/attributes.go index 8c18f48f1..55ef76395 100755 --- a/dispatchers/attributes.go +++ b/dispatchers/attributes.go @@ -18,6 +18,7 @@ along with this program. If not, see package dispatchers +/* import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -55,3 +56,4 @@ func (dS *DispatcherService) AttributeSv1ProcessEvent(args *ArgsAttrProcessEvent return dS.attrS.Call(utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent, reply) } +*/ diff --git a/dispatchers/chargers.go b/dispatchers/chargers.go index e413ea02b..a0feeff17 100755 --- a/dispatchers/chargers.go +++ b/dispatchers/chargers.go @@ -18,6 +18,7 @@ along with this program. If not, see package dispatchers +/* import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -53,3 +54,4 @@ func (dS *DispatcherService) ChargerSv1ProcessEvent(args *CGREvWithApiKey, } return dS.chargerS.Call(utils.ChargerSv1ProcessEvent, args.CGREvent, reply) } +*/ diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 3a902f492..1ffd3821a 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -20,65 +20,26 @@ package dispatchers import ( "fmt" - "reflect" - "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // NewDispatcherService initializes a DispatcherService -func NewDispatcherService(dm *engine.DataManager, rals, resS, thdS, - statS, splS, attrS, sessionS, chargerS rpcclient.RpcClientConnection) (*DispatcherService, error) { - if rals != nil && reflect.ValueOf(rals).IsNil() { - rals = nil - } - if resS != nil && reflect.ValueOf(resS).IsNil() { - resS = nil - } - if thdS != nil && reflect.ValueOf(thdS).IsNil() { - thdS = nil - } - if statS != nil && reflect.ValueOf(statS).IsNil() { - statS = nil - } - if splS != nil && reflect.ValueOf(splS).IsNil() { - splS = nil - } - if attrS != nil && reflect.ValueOf(attrS).IsNil() { - attrS = nil - } - if sessionS != nil && reflect.ValueOf(sessionS).IsNil() { - sessionS = nil - } - if chargerS != nil && reflect.ValueOf(chargerS).IsNil() { - chargerS = nil - } - return &DispatcherService{ - dm: dm, - rals: rals, - resS: resS, - thdS: thdS, - statS: statS, - splS: splS, - attrS: attrS, - sessionS: sessionS, - chargerS: chargerS}, nil +func NewDispatcherService(dm *engine.DataManager, + cfg *config.CGRConfig) (*DispatcherService, error) { + return &DispatcherService{dm: dm, cfg: cfg}, nil } // DispatcherService is the service handling dispatching towards internal components // designed to handle automatic partitioning and failover type DispatcherService struct { - dm *engine.DataManager - rals rpcclient.RpcClientConnection // RALs connections - resS rpcclient.RpcClientConnection // ResourceS connections - thdS rpcclient.RpcClientConnection // ThresholdS connections - statS rpcclient.RpcClientConnection // StatS connections - splS rpcclient.RpcClientConnection // SupplierS connections - attrS rpcclient.RpcClientConnection // AttributeS connections - sessionS rpcclient.RpcClientConnection // SessionS connections - chargerS rpcclient.RpcClientConnection // ChargerS connections + dm *engine.DataManager + cfg *config.CGRConfig + filterS *engine.FilterS + stringIndexedFields *[]string + prefixIndexedFields *[]string } // ListenAndServe will initialize the service @@ -96,45 +57,65 @@ func (dS *DispatcherService) Shutdown() error { return nil } -func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent, - reply *engine.AttrSProcessEventReply) (err error) { - if dS.attrS == nil { - return utils.NewErrNotConnected(utils.AttributeS) +// dispatcherForEvent returns a dispatcher instance configured for specific event +// or utils.ErrNotFound if none present +func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, + subsys string) (d Dispatcher, err error) { + // find out the matching profiles + idxKeyPrfx := utils.ConcatenatedKey(ev.Tenant, utils.META_ANY) + if subsys != "" { + idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys) } - if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent, - &engine.AttrArgsProcessEvent{ - CGREvent: *ev}, reply); err != nil { - if err.Error() == utils.ErrNotFound.Error() { - err = utils.ErrUnknownApiKey + matchingPrfls := make(map[string]*engine.DispatcherProfile) + prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, dS.stringIndexedFields, dS.prefixIndexedFields, + dS.dm, utils.CacheDispatcherFilterIndexes, idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects) + if err != nil { + return nil, err + } + for prflID := range prflIDs { + prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional) + if err != nil { + if err == utils.ErrNotFound { + continue + } + return nil, err } + if prfl.ActivationInterval != nil && ev.Time != nil && + !prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active + continue + } + if pass, err := dS.filterS.Pass(ev.Tenant, prfl.FilterIDs, + config.NewNavigableMap(ev.Event)); err != nil { + return nil, err + } else if !pass { + continue + } + matchingPrfls[prflID] = prfl + } + if len(matchingPrfls) == 0 { + return nil, utils.ErrNotFound + } + // All good, convert from Map to Slice so we can sort + prfls := make(engine.DispatcherProfiles, len(matchingPrfls)) + i := 0 + for _, prfl := range matchingPrfls { + prfls[i] = prfl + i++ + } + prfls.Sort() + matchedPrlf := prfls[0] // only use the first profile + tntID := matchedPrlf.TenantID() + // get or build the Dispatcher for the config + if x, ok := engine.Cache.Get(utils.CacheDispatchers, + tntID); ok && x != nil { + d = x.(Dispatcher) + d.SetProfile(matchedPrlf) return } - return -} - -func (dS *DispatcherService) authorize(method, tenant, apiKey string, evTime *time.Time) (err error) { - if apiKey == "" { - return utils.NewErrMandatoryIeMissing(utils.APIKey) - } - ev := &utils.CGREvent{ - Tenant: tenant, - ID: utils.UUIDSha1Prefix(), - Context: utils.StringPointer(utils.MetaAuth), - Time: evTime, - Event: map[string]interface{}{ - utils.APIKey: apiKey, - }, - } - var rplyEv engine.AttrSProcessEventReply - if err = dS.authorizeEvent(ev, &rplyEv); err != nil { - return - } - var apiMethods string - if apiMethods, err = rplyEv.CGREvent.FieldAsString(utils.APIMethods); err != nil { - return - } - if !ParseStringMap(apiMethods).HasKey(method) { - return utils.ErrUnauthorizedApi - } + if d, err = newDispatcher(matchedPrlf); err != nil { + return + } + engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil, + true, utils.EmptyString) return } diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index b508764c4..69a99564b 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -19,8 +19,10 @@ along with this program. If not, see package dispatchers import ( + "fmt" "sort" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -56,3 +58,37 @@ type DispatcherProfiles []*DispatcherProfile func (dps DispatcherProfiles) Sort() { sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight }) } + +// Dispatcher is responsible for routing requests to pool of connections +// there will be different implementations based on strategy +type Dispatcher interface { + // SetConfig is used to update the configuration information within dispatcher + // to make sure we take decisions based on latest config + SetProfile(pfl *engine.DispatcherProfile) + // GetConnID returns an ordered list of connection IDs for the event + NextConnID() (connID string) +} + +// newDispatcher constructs instances of Dispatcher +func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { + switch pfl.Strategy { + case utils.MetaWeight: + d = &WeightDispatcher{pfl: pfl} + default: + err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy) + } + return +} + +type WeightDispatcher struct { + pfl *engine.DispatcherProfile +} + +func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) { + wd.pfl = pfl + return +} + +func (wd *WeightDispatcher) NextConnID() (connID string) { + return +} diff --git a/dispatchers/resources.go b/dispatchers/resources.go index 79cf58bee..a7cc39657 100755 --- a/dispatchers/resources.go +++ b/dispatchers/resources.go @@ -18,6 +18,7 @@ along with this program. If not, see package dispatchers +/* import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -42,3 +43,4 @@ func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args *ArgsV1ResUsag return dS.resS.Call(utils.ResourceSv1GetResourcesForEvent, args.ArgRSv1ResourceUsage, reply) } +*/ diff --git a/dispatchers/sessions.go b/dispatchers/sessions.go index 2b1e92676..d53d519c7 100755 --- a/dispatchers/sessions.go +++ b/dispatchers/sessions.go @@ -18,6 +18,7 @@ along with this program. If not, see package dispatchers +/* import ( "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" @@ -101,3 +102,4 @@ func (dS *DispatcherService) SessionSv1UpdateSession(args *UpdateSessionWithApiK } return dS.sessionS.Call(utils.SessionSv1UpdateSession, args.V1UpdateSessionArgs, reply) } +*/ diff --git a/dispatchers/stats.go b/dispatchers/stats.go index c5dd4e89a..3188e017a 100755 --- a/dispatchers/stats.go +++ b/dispatchers/stats.go @@ -18,6 +18,7 @@ along with this program. If not, see package dispatchers +/* import ( "time" @@ -67,3 +68,4 @@ func (dS *DispatcherService) StatSv1ProcessEvent(args *ArgsStatProcessEventWithA } return dS.statS.Call(utils.StatSv1ProcessEvent, args, reply) } +*/ diff --git a/dispatchers/suppliers.go b/dispatchers/suppliers.go index 27b9e4fa0..061b2fbe1 100755 --- a/dispatchers/suppliers.go +++ b/dispatchers/suppliers.go @@ -18,6 +18,7 @@ along with this program. If not, see package dispatchers +/* import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -42,3 +43,4 @@ func (dS *DispatcherService) SupplierSv1GetSuppliers(args *ArgsGetSuppliersWithA return dS.splS.Call(utils.SupplierSv1GetSuppliers, args.ArgsGetSuppliers, reply) } +*/ diff --git a/dispatchers/thresholds.go b/dispatchers/thresholds.go index b0e653ad9..6556d904f 100755 --- a/dispatchers/thresholds.go +++ b/dispatchers/thresholds.go @@ -18,6 +18,7 @@ along with this program. If not, see package dispatchers +/* import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -53,3 +54,4 @@ func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *ArgsProcessEventWith } return dS.thdS.Call(utils.ThresholdSv1ProcessEvent, args.ArgsProcessEvent, tIDs) } +*/ diff --git a/engine/attributes.go b/engine/attributes.go index 92549340f..ff07e3837 100644 --- a/engine/attributes.go +++ b/engine/attributes.go @@ -70,13 +70,13 @@ func (alS *AttributeService) matchingAttributeProfilesForEvent(args *AttrArgsPro if len(args.AttributeIDs) != 0 { attrIDs = args.AttributeIDs } else { - aPrflIDs, err := matchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields, + aPrflIDs, err := MatchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields, alS.dm, utils.CacheAttributeFilterIndexes, attrIdxKey, alS.filterS.cfg.FilterSCfg().IndexedSelects) if err != nil { if err != utils.ErrNotFound { return nil, err } - if aPrflIDs, err = matchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields, + if aPrflIDs, err = MatchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields, alS.dm, utils.CacheAttributeFilterIndexes, utils.ConcatenatedKey(args.Tenant, utils.META_ANY), alS.filterS.cfg.FilterSCfg().IndexedSelects); err != nil { return nil, err diff --git a/engine/caches.go b/engine/caches.go index 5f3b8c5a3..d78bfc8c0 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -58,6 +58,7 @@ var precachedPartitions = []string{ utils.CacheSupplierProfiles, utils.CacheAttributeProfiles, utils.CacheChargerProfiles, + utils.CacheDispatcherProfiles, utils.CacheDiameterMessages, } diff --git a/engine/chargers.go b/engine/chargers.go index 84f1d88d1..25f600986 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -63,7 +63,7 @@ func (cS *ChargerService) Shutdown() (err error) { // matchingChargingProfilesForEvent returns ordered list of matching chargers which are active by the time of the function call func (cS *ChargerService) matchingChargerProfilesForEvent(cgrEv *utils.CGREvent) (cPs ChargerProfiles, err error) { - cpIDs, err := matchingItemIDsForEvent(cgrEv.Event, + cpIDs, err := MatchingItemIDsForEvent(cgrEv.Event, cS.cfg.ChargerSCfg().StringIndexedFields, cS.cfg.ChargerSCfg().PrefixIndexedFields, cS.dm, utils.CacheChargerFilterIndexes, cgrEv.Tenant, cS.cfg.FilterSCfg().IndexedSelects) if err != nil { diff --git a/engine/filterhelpers.go b/engine/filterhelpers.go index 39a8270d7..0c0939f81 100644 --- a/engine/filterhelpers.go +++ b/engine/filterhelpers.go @@ -27,10 +27,10 @@ import ( "github.com/cgrates/cgrates/utils" ) -// matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event +// MatchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event // fieldIDs limits the fields which are checked against indexes // helper on top of dataDB.MatchFilterIndex, adding utils.ANY to list of fields queried -func matchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string, +func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string, dm *DataManager, cacheID, itemIDPrefix string, indexedSelects bool) (itemIDs utils.StringMap, err error) { lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockID) diff --git a/engine/filterhelpers_test.go b/engine/filterhelpers_test.go index 59f2ec8f7..b933b3a5c 100644 --- a/engine/filterhelpers_test.go +++ b/engine/filterhelpers_test.go @@ -81,7 +81,7 @@ func TestFilterMatchingItemIDsForEvent(t *testing.T) { utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC), "Field": "profile", } - aPrflIDs, err := matchingItemIDsForEvent(matchEV, nil, nil, + aPrflIDs, err := MatchingItemIDsForEvent(matchEV, nil, nil, dmMatch, utils.CacheAttributeFilterIndexes, prefix, true) if err != nil { t.Errorf("Error: %+v", err) @@ -93,7 +93,7 @@ func TestFilterMatchingItemIDsForEvent(t *testing.T) { matchEV = map[string]interface{}{ "Field": "profilePrefix", } - aPrflIDs, err = matchingItemIDsForEvent(matchEV, nil, nil, + aPrflIDs, err = MatchingItemIDsForEvent(matchEV, nil, nil, dmMatch, utils.CacheAttributeFilterIndexes, prefix, true) if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/resources.go b/engine/resources.go index 2a1be30e8..1a1ce8194 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -443,7 +443,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) // matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, usageTTL *time.Duration) (rs Resources, err error) { matchingResources := make(map[string]*Resource) - rIDs, err := matchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields, + rIDs, err := MatchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields, rS.dm, utils.CacheResourceFilterIndexes, ev.Tenant, rS.filterS.cfg.FilterSCfg().IndexedSelects) if err != nil { return nil, err diff --git a/engine/stats.go b/engine/stats.go index d05da2c67..388f49917 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -151,7 +151,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) ( if len(args.StatIDs) != 0 { sqIDs = args.StatIDs } else { - mapIDs, err := matchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields, + mapIDs, err := MatchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields, sS.dm, utils.CacheStatFilterIndexes, args.Tenant, sS.filterS.cfg.FilterSCfg().IndexedSelects) if err != nil { return nil, err diff --git a/engine/suppliers.go b/engine/suppliers.go index 5be156fc4..c4782d246 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -128,7 +128,7 @@ func (spS *SupplierService) Shutdown() error { // matchingSupplierProfilesForEvent returns ordered list of matching resources which are active by the time of the call func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *utils.CGREvent) (sPrfls SupplierProfiles, err error) { matchingLPs := make(map[string]*SupplierProfile) - sPrflIDs, err := matchingItemIDsForEvent(ev.Event, spS.stringIndexedFields, spS.prefixIndexedFields, + sPrflIDs, err := MatchingItemIDsForEvent(ev.Event, spS.stringIndexedFields, spS.prefixIndexedFields, spS.dm, utils.CacheSupplierFilterIndexes, ev.Tenant, spS.filterS.cfg.FilterSCfg().IndexedSelects) if err != nil { return nil, err diff --git a/engine/thresholds.go b/engine/thresholds.go index ef09bc824..368a742d2 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -223,7 +223,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ArgsProcessEvent) ( if len(args.ThresholdIDs) != 0 { tIDs = args.ThresholdIDs } else { - tIDsMap, err := matchingItemIDsForEvent(args.Event, tS.stringIndexedFields, + tIDsMap, err := MatchingItemIDsForEvent(args.Event, tS.stringIndexedFields, tS.prefixIndexedFields, tS.dm, utils.CacheThresholdFilterIndexes, args.Tenant, tS.filterS.cfg.FilterSCfg().IndexedSelects) if err != nil { diff --git a/utils/consts.go b/utils/consts.go index 534c6a00f..d9ea45a9a 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -932,6 +932,7 @@ const ( CacheAttributeProfiles = "attribute_profiles" CacheChargerProfiles = "charger_profiles" CacheDispatcherProfiles = "dispatcher_profiles" + CacheDispatchers = "dispatchers" CacheResourceFilterIndexes = "resource_filter_indexes" CacheStatFilterIndexes = "stat_filter_indexes" CacheThresholdFilterIndexes = "threshold_filter_indexes"