diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go
index 1361dbb07..f956c624c 100755
--- a/apier/v1/dispatcher.go
+++ b/apier/v1/dispatcher.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package v1
+/*
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
@@ -281,3 +282,4 @@ func (dC *DispatcherChargerSv1) ProcessEvent(args *dispatchers.CGREvWithApiKey,
reply *[]*engine.AttrSProcessEventReply) (err error) {
return dC.dC.ChargerSv1ProcessEvent(args, reply)
}
+*/
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 607d996dc..da2a7dee4 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -28,7 +28,6 @@ import (
"runtime"
"runtime/pprof"
"strconv"
- "strings"
"syscall"
"time"
@@ -971,124 +970,26 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS Dispatcher service.")
var err error
- var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool
+ //var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool
- cfg.DispatcherSCfg().DispatchingStrategy = strings.TrimPrefix(cfg.DispatcherSCfg().DispatchingStrategy,
- utils.Meta) // remote * from DispatchingStrategy
- if len(cfg.DispatcherSCfg().RALsConns) != 0 {
- ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
- cfg.TlsCfg().ClientKey,
- cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
- cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
- cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
- cfg.DispatcherSCfg().RALsConns, internalRaterChan,
- cfg.GeneralCfg().InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error()))
- exitChan <- true
- return
+ /*
+ if len(cfg.DispatcherSCfg().RALsConns) != 0 {
+ ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
+ cfg.TlsCfg().ClientKey,
+ cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
+ cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
+ cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
+ cfg.DispatcherSCfg().RALsConns, internalRaterChan,
+ cfg.GeneralCfg().InternalTtl)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error()))
+ exitChan <- true
+ return
+ }
}
- }
- if len(cfg.DispatcherSCfg().ResSConns) != 0 {
- resSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
- cfg.TlsCfg().ClientKey,
- cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
- cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
- cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
- cfg.DispatcherSCfg().ResSConns, nil,
- cfg.GeneralCfg().InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResoruceS: %s", utils.DispatcherS, err.Error()))
- exitChan <- true
- return
- }
- }
- if len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
- threshSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
- cfg.TlsCfg().ClientKey,
- cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
- cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
- cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
- cfg.DispatcherSCfg().ThreshSConns, nil,
- cfg.GeneralCfg().InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.DispatcherS, err.Error()))
- exitChan <- true
- return
- }
- }
- if len(cfg.DispatcherSCfg().StatSConns) != 0 {
- statSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
- cfg.TlsCfg().ClientKey,
- cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
- cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
- cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
- cfg.DispatcherSCfg().StatSConns, nil,
- cfg.GeneralCfg().InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatQueueS: %s", utils.DispatcherS, err.Error()))
- exitChan <- true
- return
- }
- }
- if len(cfg.DispatcherSCfg().SupplSConns) != 0 {
- suplSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
- cfg.TlsCfg().ClientKey,
- cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
- cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
- cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
- cfg.DispatcherSCfg().SupplSConns, nil,
- cfg.GeneralCfg().InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.DispatcherS, err.Error()))
- exitChan <- true
- return
- }
- }
- if len(cfg.DispatcherSCfg().AttrSConns) != 0 {
- attrSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
- cfg.TlsCfg().ClientKey,
- cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
- cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
- cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
- cfg.DispatcherSCfg().AttrSConns, nil,
- cfg.GeneralCfg().InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.DispatcherS, err.Error()))
- exitChan <- true
- return
- }
- }
- if len(cfg.DispatcherSCfg().SessionSConns) != 0 {
- sessionsSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
- cfg.TlsCfg().ClientKey,
- cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
- cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
- cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
- cfg.DispatcherSCfg().SessionSConns, nil,
- cfg.GeneralCfg().InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SessionS: %s", utils.DispatcherS, err.Error()))
- exitChan <- true
- return
- }
- }
- if len(cfg.DispatcherSCfg().ChargerSConns) != 0 {
- chargerSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
- cfg.TlsCfg().ClientKey,
- cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
- cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
- cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
- cfg.DispatcherSCfg().ChargerSConns, nil,
- cfg.GeneralCfg().InternalTtl)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ChargerS: %s", utils.DispatcherS, err.Error()))
- exitChan <- true
- return
- }
- }
- dspS, err := dispatchers.NewDispatcherService(dm, ralsConns, resSConns,
- threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns)
+ */
+
+ dspS, err := dispatchers.NewDispatcherService(dm, cfg)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
exitChan <- true
@@ -1102,34 +1003,36 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
exitChan <- true
return
}()
- if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
- server.RpcRegisterName(utils.ThresholdSv1,
- v1.NewDispatcherThresholdSv1(dspS))
- }
- if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 {
- server.RpcRegisterName(utils.StatSv1,
- v1.NewDispatcherStatSv1(dspS))
- }
- if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherSCfg().ResSConns) != 0 {
- server.RpcRegisterName(utils.ResourceSv1,
- v1.NewDispatcherResourceSv1(dspS))
- }
- if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherSCfg().SupplSConns) != 0 {
- server.RpcRegisterName(utils.SupplierSv1,
- v1.NewDispatcherSupplierSv1(dspS))
- }
- if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 {
- server.RpcRegisterName(utils.AttributeSv1,
- v1.NewDispatcherAttributeSv1(dspS))
- }
- if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 {
- server.RpcRegisterName(utils.SessionSv1,
- v1.NewDispatcherSessionSv1(dspS))
- }
- if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherSCfg().ChargerSConns) != 0 {
- server.RpcRegisterName(utils.ChargerSv1,
- v1.NewDispatcherChargerSv1(dspS))
- }
+ /*
+ if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
+ server.RpcRegisterName(utils.ThresholdSv1,
+ v1.NewDispatcherThresholdSv1(dspS))
+ }
+ if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 {
+ server.RpcRegisterName(utils.StatSv1,
+ v1.NewDispatcherStatSv1(dspS))
+ }
+ if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherSCfg().ResSConns) != 0 {
+ server.RpcRegisterName(utils.ResourceSv1,
+ v1.NewDispatcherResourceSv1(dspS))
+ }
+ if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherSCfg().SupplSConns) != 0 {
+ server.RpcRegisterName(utils.SupplierSv1,
+ v1.NewDispatcherSupplierSv1(dspS))
+ }
+ if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 {
+ server.RpcRegisterName(utils.AttributeSv1,
+ v1.NewDispatcherAttributeSv1(dspS))
+ }
+ if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 {
+ server.RpcRegisterName(utils.SessionSv1,
+ v1.NewDispatcherSessionSv1(dspS))
+ }
+ if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherSCfg().ChargerSConns) != 0 {
+ server.RpcRegisterName(utils.ChargerSv1,
+ v1.NewDispatcherChargerSv1(dspS))
+ }
+ */
}
// startAnalyzerService fires up the AnalyzerS
diff --git a/dispatchers/attributes.go b/dispatchers/attributes.go
index 8c18f48f1..55ef76395 100755
--- a/dispatchers/attributes.go
+++ b/dispatchers/attributes.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package dispatchers
+/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -55,3 +56,4 @@ func (dS *DispatcherService) AttributeSv1ProcessEvent(args *ArgsAttrProcessEvent
return dS.attrS.Call(utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent, reply)
}
+*/
diff --git a/dispatchers/chargers.go b/dispatchers/chargers.go
index e413ea02b..a0feeff17 100755
--- a/dispatchers/chargers.go
+++ b/dispatchers/chargers.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package dispatchers
+/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -53,3 +54,4 @@ func (dS *DispatcherService) ChargerSv1ProcessEvent(args *CGREvWithApiKey,
}
return dS.chargerS.Call(utils.ChargerSv1ProcessEvent, args.CGREvent, reply)
}
+*/
diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go
index 3a902f492..1ffd3821a 100755
--- a/dispatchers/dispatchers.go
+++ b/dispatchers/dispatchers.go
@@ -20,65 +20,26 @@ package dispatchers
import (
"fmt"
- "reflect"
- "time"
+ "github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
- "github.com/cgrates/rpcclient"
)
// NewDispatcherService initializes a DispatcherService
-func NewDispatcherService(dm *engine.DataManager, rals, resS, thdS,
- statS, splS, attrS, sessionS, chargerS rpcclient.RpcClientConnection) (*DispatcherService, error) {
- if rals != nil && reflect.ValueOf(rals).IsNil() {
- rals = nil
- }
- if resS != nil && reflect.ValueOf(resS).IsNil() {
- resS = nil
- }
- if thdS != nil && reflect.ValueOf(thdS).IsNil() {
- thdS = nil
- }
- if statS != nil && reflect.ValueOf(statS).IsNil() {
- statS = nil
- }
- if splS != nil && reflect.ValueOf(splS).IsNil() {
- splS = nil
- }
- if attrS != nil && reflect.ValueOf(attrS).IsNil() {
- attrS = nil
- }
- if sessionS != nil && reflect.ValueOf(sessionS).IsNil() {
- sessionS = nil
- }
- if chargerS != nil && reflect.ValueOf(chargerS).IsNil() {
- chargerS = nil
- }
- return &DispatcherService{
- dm: dm,
- rals: rals,
- resS: resS,
- thdS: thdS,
- statS: statS,
- splS: splS,
- attrS: attrS,
- sessionS: sessionS,
- chargerS: chargerS}, nil
+func NewDispatcherService(dm *engine.DataManager,
+ cfg *config.CGRConfig) (*DispatcherService, error) {
+ return &DispatcherService{dm: dm, cfg: cfg}, nil
}
// DispatcherService is the service handling dispatching towards internal components
// designed to handle automatic partitioning and failover
type DispatcherService struct {
- dm *engine.DataManager
- rals rpcclient.RpcClientConnection // RALs connections
- resS rpcclient.RpcClientConnection // ResourceS connections
- thdS rpcclient.RpcClientConnection // ThresholdS connections
- statS rpcclient.RpcClientConnection // StatS connections
- splS rpcclient.RpcClientConnection // SupplierS connections
- attrS rpcclient.RpcClientConnection // AttributeS connections
- sessionS rpcclient.RpcClientConnection // SessionS connections
- chargerS rpcclient.RpcClientConnection // ChargerS connections
+ dm *engine.DataManager
+ cfg *config.CGRConfig
+ filterS *engine.FilterS
+ stringIndexedFields *[]string
+ prefixIndexedFields *[]string
}
// ListenAndServe will initialize the service
@@ -96,45 +57,65 @@ func (dS *DispatcherService) Shutdown() error {
return nil
}
-func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent,
- reply *engine.AttrSProcessEventReply) (err error) {
- if dS.attrS == nil {
- return utils.NewErrNotConnected(utils.AttributeS)
+// dispatcherForEvent returns a dispatcher instance configured for specific event
+// or utils.ErrNotFound if none present
+func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
+ subsys string) (d Dispatcher, err error) {
+ // find out the matching profiles
+ idxKeyPrfx := utils.ConcatenatedKey(ev.Tenant, utils.META_ANY)
+ if subsys != "" {
+ idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys)
}
- if err = dS.attrS.Call(utils.AttributeSv1ProcessEvent,
- &engine.AttrArgsProcessEvent{
- CGREvent: *ev}, reply); err != nil {
- if err.Error() == utils.ErrNotFound.Error() {
- err = utils.ErrUnknownApiKey
+ matchingPrfls := make(map[string]*engine.DispatcherProfile)
+ prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, dS.stringIndexedFields, dS.prefixIndexedFields,
+ dS.dm, utils.CacheDispatcherFilterIndexes, idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects)
+ if err != nil {
+ return nil, err
+ }
+ for prflID := range prflIDs {
+ prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional)
+ if err != nil {
+ if err == utils.ErrNotFound {
+ continue
+ }
+ return nil, err
}
+ if prfl.ActivationInterval != nil && ev.Time != nil &&
+ !prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
+ continue
+ }
+ if pass, err := dS.filterS.Pass(ev.Tenant, prfl.FilterIDs,
+ config.NewNavigableMap(ev.Event)); err != nil {
+ return nil, err
+ } else if !pass {
+ continue
+ }
+ matchingPrfls[prflID] = prfl
+ }
+ if len(matchingPrfls) == 0 {
+ return nil, utils.ErrNotFound
+ }
+ // All good, convert from Map to Slice so we can sort
+ prfls := make(engine.DispatcherProfiles, len(matchingPrfls))
+ i := 0
+ for _, prfl := range matchingPrfls {
+ prfls[i] = prfl
+ i++
+ }
+ prfls.Sort()
+ matchedPrlf := prfls[0] // only use the first profile
+ tntID := matchedPrlf.TenantID()
+ // get or build the Dispatcher for the config
+ if x, ok := engine.Cache.Get(utils.CacheDispatchers,
+ tntID); ok && x != nil {
+ d = x.(Dispatcher)
+ d.SetProfile(matchedPrlf)
return
}
- return
-}
-
-func (dS *DispatcherService) authorize(method, tenant, apiKey string, evTime *time.Time) (err error) {
- if apiKey == "" {
- return utils.NewErrMandatoryIeMissing(utils.APIKey)
- }
- ev := &utils.CGREvent{
- Tenant: tenant,
- ID: utils.UUIDSha1Prefix(),
- Context: utils.StringPointer(utils.MetaAuth),
- Time: evTime,
- Event: map[string]interface{}{
- utils.APIKey: apiKey,
- },
- }
- var rplyEv engine.AttrSProcessEventReply
- if err = dS.authorizeEvent(ev, &rplyEv); err != nil {
- return
- }
- var apiMethods string
- if apiMethods, err = rplyEv.CGREvent.FieldAsString(utils.APIMethods); err != nil {
- return
- }
- if !ParseStringMap(apiMethods).HasKey(method) {
- return utils.ErrUnauthorizedApi
- }
+ if d, err = newDispatcher(matchedPrlf); err != nil {
+ return
+ }
+ engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil,
+ true, utils.EmptyString)
return
}
diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go
index b508764c4..69a99564b 100644
--- a/dispatchers/libdispatcher.go
+++ b/dispatchers/libdispatcher.go
@@ -19,8 +19,10 @@ along with this program. If not, see
package dispatchers
import (
+ "fmt"
"sort"
+ "github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -56,3 +58,37 @@ type DispatcherProfiles []*DispatcherProfile
func (dps DispatcherProfiles) Sort() {
sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight })
}
+
+// Dispatcher is responsible for routing requests to pool of connections
+// there will be different implementations based on strategy
+type Dispatcher interface {
+ // SetConfig is used to update the configuration information within dispatcher
+ // to make sure we take decisions based on latest config
+ SetProfile(pfl *engine.DispatcherProfile)
+ // GetConnID returns an ordered list of connection IDs for the event
+ NextConnID() (connID string)
+}
+
+// newDispatcher constructs instances of Dispatcher
+func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
+ switch pfl.Strategy {
+ case utils.MetaWeight:
+ d = &WeightDispatcher{pfl: pfl}
+ default:
+ err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
+ }
+ return
+}
+
+type WeightDispatcher struct {
+ pfl *engine.DispatcherProfile
+}
+
+func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
+ wd.pfl = pfl
+ return
+}
+
+func (wd *WeightDispatcher) NextConnID() (connID string) {
+ return
+}
diff --git a/dispatchers/resources.go b/dispatchers/resources.go
index 79cf58bee..a7cc39657 100755
--- a/dispatchers/resources.go
+++ b/dispatchers/resources.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package dispatchers
+/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -42,3 +43,4 @@ func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args *ArgsV1ResUsag
return dS.resS.Call(utils.ResourceSv1GetResourcesForEvent, args.ArgRSv1ResourceUsage, reply)
}
+*/
diff --git a/dispatchers/sessions.go b/dispatchers/sessions.go
index 2b1e92676..d53d519c7 100755
--- a/dispatchers/sessions.go
+++ b/dispatchers/sessions.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package dispatchers
+/*
import (
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
@@ -101,3 +102,4 @@ func (dS *DispatcherService) SessionSv1UpdateSession(args *UpdateSessionWithApiK
}
return dS.sessionS.Call(utils.SessionSv1UpdateSession, args.V1UpdateSessionArgs, reply)
}
+*/
diff --git a/dispatchers/stats.go b/dispatchers/stats.go
index c5dd4e89a..3188e017a 100755
--- a/dispatchers/stats.go
+++ b/dispatchers/stats.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package dispatchers
+/*
import (
"time"
@@ -67,3 +68,4 @@ func (dS *DispatcherService) StatSv1ProcessEvent(args *ArgsStatProcessEventWithA
}
return dS.statS.Call(utils.StatSv1ProcessEvent, args, reply)
}
+*/
diff --git a/dispatchers/suppliers.go b/dispatchers/suppliers.go
index 27b9e4fa0..061b2fbe1 100755
--- a/dispatchers/suppliers.go
+++ b/dispatchers/suppliers.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package dispatchers
+/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -42,3 +43,4 @@ func (dS *DispatcherService) SupplierSv1GetSuppliers(args *ArgsGetSuppliersWithA
return dS.splS.Call(utils.SupplierSv1GetSuppliers, args.ArgsGetSuppliers, reply)
}
+*/
diff --git a/dispatchers/thresholds.go b/dispatchers/thresholds.go
index b0e653ad9..6556d904f 100755
--- a/dispatchers/thresholds.go
+++ b/dispatchers/thresholds.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package dispatchers
+/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -53,3 +54,4 @@ func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *ArgsProcessEventWith
}
return dS.thdS.Call(utils.ThresholdSv1ProcessEvent, args.ArgsProcessEvent, tIDs)
}
+*/
diff --git a/engine/attributes.go b/engine/attributes.go
index 92549340f..ff07e3837 100644
--- a/engine/attributes.go
+++ b/engine/attributes.go
@@ -70,13 +70,13 @@ func (alS *AttributeService) matchingAttributeProfilesForEvent(args *AttrArgsPro
if len(args.AttributeIDs) != 0 {
attrIDs = args.AttributeIDs
} else {
- aPrflIDs, err := matchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
+ aPrflIDs, err := MatchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
alS.dm, utils.CacheAttributeFilterIndexes, attrIdxKey, alS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
if err != utils.ErrNotFound {
return nil, err
}
- if aPrflIDs, err = matchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
+ if aPrflIDs, err = MatchingItemIDsForEvent(args.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
alS.dm, utils.CacheAttributeFilterIndexes, utils.ConcatenatedKey(args.Tenant, utils.META_ANY),
alS.filterS.cfg.FilterSCfg().IndexedSelects); err != nil {
return nil, err
diff --git a/engine/caches.go b/engine/caches.go
index 5f3b8c5a3..d78bfc8c0 100644
--- a/engine/caches.go
+++ b/engine/caches.go
@@ -58,6 +58,7 @@ var precachedPartitions = []string{
utils.CacheSupplierProfiles,
utils.CacheAttributeProfiles,
utils.CacheChargerProfiles,
+ utils.CacheDispatcherProfiles,
utils.CacheDiameterMessages,
}
diff --git a/engine/chargers.go b/engine/chargers.go
index 84f1d88d1..25f600986 100644
--- a/engine/chargers.go
+++ b/engine/chargers.go
@@ -63,7 +63,7 @@ func (cS *ChargerService) Shutdown() (err error) {
// matchingChargingProfilesForEvent returns ordered list of matching chargers which are active by the time of the function call
func (cS *ChargerService) matchingChargerProfilesForEvent(cgrEv *utils.CGREvent) (cPs ChargerProfiles, err error) {
- cpIDs, err := matchingItemIDsForEvent(cgrEv.Event,
+ cpIDs, err := MatchingItemIDsForEvent(cgrEv.Event,
cS.cfg.ChargerSCfg().StringIndexedFields, cS.cfg.ChargerSCfg().PrefixIndexedFields,
cS.dm, utils.CacheChargerFilterIndexes, cgrEv.Tenant, cS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
diff --git a/engine/filterhelpers.go b/engine/filterhelpers.go
index 39a8270d7..0c0939f81 100644
--- a/engine/filterhelpers.go
+++ b/engine/filterhelpers.go
@@ -27,10 +27,10 @@ import (
"github.com/cgrates/cgrates/utils"
)
-// matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event
+// MatchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event
// fieldIDs limits the fields which are checked against indexes
// helper on top of dataDB.MatchFilterIndex, adding utils.ANY to list of fields queried
-func matchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string,
+func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string,
dm *DataManager, cacheID, itemIDPrefix string, indexedSelects bool) (itemIDs utils.StringMap, err error) {
lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockID)
diff --git a/engine/filterhelpers_test.go b/engine/filterhelpers_test.go
index 59f2ec8f7..b933b3a5c 100644
--- a/engine/filterhelpers_test.go
+++ b/engine/filterhelpers_test.go
@@ -81,7 +81,7 @@ func TestFilterMatchingItemIDsForEvent(t *testing.T) {
utils.AnswerTime: time.Date(2014, 7, 14, 14, 30, 0, 0, time.UTC),
"Field": "profile",
}
- aPrflIDs, err := matchingItemIDsForEvent(matchEV, nil, nil,
+ aPrflIDs, err := MatchingItemIDsForEvent(matchEV, nil, nil,
dmMatch, utils.CacheAttributeFilterIndexes, prefix, true)
if err != nil {
t.Errorf("Error: %+v", err)
@@ -93,7 +93,7 @@ func TestFilterMatchingItemIDsForEvent(t *testing.T) {
matchEV = map[string]interface{}{
"Field": "profilePrefix",
}
- aPrflIDs, err = matchingItemIDsForEvent(matchEV, nil, nil,
+ aPrflIDs, err = MatchingItemIDsForEvent(matchEV, nil, nil,
dmMatch, utils.CacheAttributeFilterIndexes, prefix, true)
if err != nil {
t.Errorf("Error: %+v", err)
diff --git a/engine/resources.go b/engine/resources.go
index 2a1be30e8..1a1ce8194 100644
--- a/engine/resources.go
+++ b/engine/resources.go
@@ -443,7 +443,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, usageTTL *time.Duration) (rs Resources, err error) {
matchingResources := make(map[string]*Resource)
- rIDs, err := matchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields,
+ rIDs, err := MatchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields,
rS.dm, utils.CacheResourceFilterIndexes, ev.Tenant, rS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
diff --git a/engine/stats.go b/engine/stats.go
index d05da2c67..388f49917 100644
--- a/engine/stats.go
+++ b/engine/stats.go
@@ -151,7 +151,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (
if len(args.StatIDs) != 0 {
sqIDs = args.StatIDs
} else {
- mapIDs, err := matchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields,
+ mapIDs, err := MatchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields,
sS.dm, utils.CacheStatFilterIndexes, args.Tenant, sS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
diff --git a/engine/suppliers.go b/engine/suppliers.go
index 5be156fc4..c4782d246 100644
--- a/engine/suppliers.go
+++ b/engine/suppliers.go
@@ -128,7 +128,7 @@ func (spS *SupplierService) Shutdown() error {
// matchingSupplierProfilesForEvent returns ordered list of matching resources which are active by the time of the call
func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *utils.CGREvent) (sPrfls SupplierProfiles, err error) {
matchingLPs := make(map[string]*SupplierProfile)
- sPrflIDs, err := matchingItemIDsForEvent(ev.Event, spS.stringIndexedFields, spS.prefixIndexedFields,
+ sPrflIDs, err := MatchingItemIDsForEvent(ev.Event, spS.stringIndexedFields, spS.prefixIndexedFields,
spS.dm, utils.CacheSupplierFilterIndexes, ev.Tenant, spS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
diff --git a/engine/thresholds.go b/engine/thresholds.go
index ef09bc824..368a742d2 100644
--- a/engine/thresholds.go
+++ b/engine/thresholds.go
@@ -223,7 +223,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ArgsProcessEvent) (
if len(args.ThresholdIDs) != 0 {
tIDs = args.ThresholdIDs
} else {
- tIDsMap, err := matchingItemIDsForEvent(args.Event, tS.stringIndexedFields,
+ tIDsMap, err := MatchingItemIDsForEvent(args.Event, tS.stringIndexedFields,
tS.prefixIndexedFields, tS.dm, utils.CacheThresholdFilterIndexes,
args.Tenant, tS.filterS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
diff --git a/utils/consts.go b/utils/consts.go
index 534c6a00f..d9ea45a9a 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -932,6 +932,7 @@ const (
CacheAttributeProfiles = "attribute_profiles"
CacheChargerProfiles = "charger_profiles"
CacheDispatcherProfiles = "dispatcher_profiles"
+ CacheDispatchers = "dispatchers"
CacheResourceFilterIndexes = "resource_filter_indexes"
CacheStatFilterIndexes = "stat_filter_indexes"
CacheThresholdFilterIndexes = "threshold_filter_indexes"