diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e2619fdeb..869775136 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -602,7 +602,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { filterS := <-filterSChan filterSChan <- filterS - tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().FilteredFields, + tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().IndexedFields, cfg.ThresholdSCfg().StoreInterval, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) diff --git a/config/config_defaults.go b/config/config_defaults.go index a2e053f1b..ce276e536 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -436,7 +436,7 @@ const CGRATES_CFG_JSON = ` "thresholds": { "enabled": false, // starts ThresholdS service: . "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> - "filtered_fields": [], // match filters based on these fields for dynamic filtering, empty to use all + "indexed_fields": [], // query indexes based on these fields for faster processing }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 4c8e03c54..be2770f07 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -715,9 +715,9 @@ func TestDfStatServiceJsonCfg(t *testing.T) { func TestDfThresholdSJsonCfg(t *testing.T) { eCfg := &ThresholdSJsonCfg{ - Enabled: utils.BoolPointer(false), - Store_interval: utils.StringPointer(""), - Filtered_fields: utils.StringSlicePointer([]string{}), + Enabled: utils.BoolPointer(false), + Store_interval: utils.StringPointer(""), + Indexed_fields: utils.StringSlicePointer([]string{}), } if cfg, err := dfCgrJsonCfg.ThresholdSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 0b4121803..ce1480c83 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -601,6 +601,7 @@ func TestCgrCfgJSONDefaultThresholdSCfg(t *testing.T) { eThresholdSCfg := &ThresholdSCfg{ Enabled: false, StoreInterval: 0, + IndexedFields: []string{}, } if !reflect.DeepEqual(eThresholdSCfg, cgrCfg.thresholdSCfg) { t.Errorf("received: %+v, expecting: %+v", eThresholdSCfg, cgrCfg.statsCfg) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 5be9a2bb3..fbfe82f14 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -402,9 +402,9 @@ type StatServJsonCfg struct { // Threshold service config section type ThresholdSJsonCfg struct { - Enabled *bool - Store_interval *string - Filtered_fields *[]string + Enabled *bool + Store_interval *string + Indexed_fields *[]string } // Mailer config section diff --git a/config/thresholdscfg.go b/config/thresholdscfg.go index 78b7a24d0..625dd2bcc 100644 --- a/config/thresholdscfg.go +++ b/config/thresholdscfg.go @@ -25,9 +25,9 @@ import ( ) type ThresholdSCfg struct { - Enabled bool - StoreInterval time.Duration // Dump regularly from cache into dataDB - FilteredFields []string + Enabled bool + StoreInterval time.Duration // Dump regularly from cache into dataDB + IndexedFields []string } func (t *ThresholdSCfg) loadFromJsonCfg(jsnCfg *ThresholdSJsonCfg) (err error) { @@ -42,5 +42,11 @@ func (t *ThresholdSCfg) loadFromJsonCfg(jsnCfg *ThresholdSJsonCfg) (err error) { return err } } + if jsnCfg.Indexed_fields != nil { + t.IndexedFields = make([]string, len(*jsnCfg.Indexed_fields)) + for i, fID := range *jsnCfg.Indexed_fields { + t.IndexedFields[i] = fID + } + } return nil } diff --git a/engine/reqfilterhelpers.go b/engine/reqfilterhelpers.go index f07eef411..ad0ad6710 100644 --- a/engine/reqfilterhelpers.go +++ b/engine/reqfilterhelpers.go @@ -25,11 +25,24 @@ import ( ) // matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event +// fieldIDs limits the fields which are checked against indexes // helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried // executes a number of $(len(fields) + 1) queries to dataDB so the size of event influences the speed of return -func matchingItemIDsForEvent(ev map[string]interface{}, dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) { +func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs []string, dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) { + if len(fieldIDs) == 0 { + fieldIDs = make([]string, len(ev)) + i := 0 + for fldID := range ev { + fieldIDs[i] = fldID + i += 1 + } + } itemIDs = make(utils.StringMap) - for fldName, fieldValIf := range ev { + for _, fldName := range fieldIDs { + fieldValIf, has := ev[fldName] + if !has { + continue + } fldVal, canCast := utils.CastFieldIfToString(fieldValIf) if !canCast { return nil, fmt.Errorf("Cannot cast field: %s into string", fldName) diff --git a/engine/resources.go b/engine/resources.go index 92cafe492..301f23f07 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -433,7 +433,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) // matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call func (rS *ResourceService) matchingResourcesForEvent(tenant string, ev map[string]interface{}) (rs Resources, err error) { matchingResources := make(map[string]*Resource) - rIDs, err := matchingItemIDsForEvent(ev, rS.dm, utils.ResourceProfilesStringIndex+tenant) + rIDs, err := matchingItemIDsForEvent(ev, nil, rS.dm, utils.ResourceProfilesStringIndex+tenant) if err != nil { return nil, err } diff --git a/engine/stats.go b/engine/stats.go index dbe25b9ea..67b9697e7 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -139,7 +139,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { // matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) { matchingSQs := make(map[string]*StatQueue) - sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.dm, utils.StatQueuesStringIndex+ev.Tenant) + sqIDs, err := matchingItemIDsForEvent(ev.Event, nil, sS.dm, utils.StatQueuesStringIndex+ev.Tenant) if err != nil { return nil, err } diff --git a/engine/thresholds.go b/engine/thresholds.go index b749632d2..6d9fdc057 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -151,25 +151,25 @@ func (ts Thresholds) Sort() { sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight }) } -func NewThresholdService(dm *DataManager, filteredFields []string, storeInterval time.Duration, +func NewThresholdService(dm *DataManager, indexedFields []string, storeInterval time.Duration, filterS *FilterS) (tS *ThresholdService, err error) { return &ThresholdService{dm: dm, - filteredFields: filteredFields, - storeInterval: storeInterval, - filterS: filterS, - stopBackup: make(chan struct{}), - storedTdIDs: make(utils.StringMap)}, nil + indexedFields: indexedFields, + storeInterval: storeInterval, + filterS: filterS, + stopBackup: make(chan struct{}), + storedTdIDs: make(utils.StringMap)}, nil } // ThresholdService manages Threshold execution and storing them to dataDB type ThresholdService struct { - dm *DataManager - filteredFields []string // fields considered when searching for matching thresholds - storeInterval time.Duration - filterS *FilterS - stopBackup chan struct{} - storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool - stMux sync.RWMutex // protects storedTdIDs + dm *DataManager + indexedFields []string // fields considered when searching for matching thresholds + storeInterval time.Duration + filterS *FilterS + stopBackup chan struct{} + storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool + stMux sync.RWMutex // protects storedTdIDs } // Called to start the service @@ -254,7 +254,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { // matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) { matchingTs := make(map[string]*Threshold) - tIDs, err := matchingItemIDsForEvent(ev.Event, tS.dm, utils.ThresholdStringIndex+ev.Tenant) + tIDs, err := matchingItemIDsForEvent(ev.Event, tS.indexedFields, tS.dm, utils.ThresholdStringIndex+ev.Tenant) if err != nil { return nil, err }