mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
ThresholdS indexed_fields option in config
This commit is contained in:
@@ -602,7 +602,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
|
||||
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().FilteredFields,
|
||||
tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().IndexedFields,
|
||||
cfg.ThresholdSCfg().StoreInterval, filterS)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<ThresholdS> Could not init, error: %s", err.Error()))
|
||||
|
||||
@@ -436,7 +436,7 @@ const CGRATES_CFG_JSON = `
|
||||
"thresholds": {
|
||||
"enabled": false, // starts ThresholdS service: <true|false>.
|
||||
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
|
||||
"filtered_fields": [], // match filters based on these fields for dynamic filtering, empty to use all
|
||||
"indexed_fields": [], // query indexes based on these fields for faster processing
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -715,9 +715,9 @@ func TestDfStatServiceJsonCfg(t *testing.T) {
|
||||
|
||||
func TestDfThresholdSJsonCfg(t *testing.T) {
|
||||
eCfg := &ThresholdSJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Store_interval: utils.StringPointer(""),
|
||||
Filtered_fields: utils.StringSlicePointer([]string{}),
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Store_interval: utils.StringPointer(""),
|
||||
Indexed_fields: utils.StringSlicePointer([]string{}),
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.ThresholdSJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -601,6 +601,7 @@ func TestCgrCfgJSONDefaultThresholdSCfg(t *testing.T) {
|
||||
eThresholdSCfg := &ThresholdSCfg{
|
||||
Enabled: false,
|
||||
StoreInterval: 0,
|
||||
IndexedFields: []string{},
|
||||
}
|
||||
if !reflect.DeepEqual(eThresholdSCfg, cgrCfg.thresholdSCfg) {
|
||||
t.Errorf("received: %+v, expecting: %+v", eThresholdSCfg, cgrCfg.statsCfg)
|
||||
|
||||
@@ -402,9 +402,9 @@ type StatServJsonCfg struct {
|
||||
|
||||
// Threshold service config section
|
||||
type ThresholdSJsonCfg struct {
|
||||
Enabled *bool
|
||||
Store_interval *string
|
||||
Filtered_fields *[]string
|
||||
Enabled *bool
|
||||
Store_interval *string
|
||||
Indexed_fields *[]string
|
||||
}
|
||||
|
||||
// Mailer config section
|
||||
|
||||
@@ -25,9 +25,9 @@ import (
|
||||
)
|
||||
|
||||
type ThresholdSCfg struct {
|
||||
Enabled bool
|
||||
StoreInterval time.Duration // Dump regularly from cache into dataDB
|
||||
FilteredFields []string
|
||||
Enabled bool
|
||||
StoreInterval time.Duration // Dump regularly from cache into dataDB
|
||||
IndexedFields []string
|
||||
}
|
||||
|
||||
func (t *ThresholdSCfg) loadFromJsonCfg(jsnCfg *ThresholdSJsonCfg) (err error) {
|
||||
@@ -42,5 +42,11 @@ func (t *ThresholdSCfg) loadFromJsonCfg(jsnCfg *ThresholdSJsonCfg) (err error) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Indexed_fields != nil {
|
||||
t.IndexedFields = make([]string, len(*jsnCfg.Indexed_fields))
|
||||
for i, fID := range *jsnCfg.Indexed_fields {
|
||||
t.IndexedFields[i] = fID
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -25,11 +25,24 @@ import (
|
||||
)
|
||||
|
||||
// matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event
|
||||
// fieldIDs limits the fields which are checked against indexes
|
||||
// helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried
|
||||
// executes a number of $(len(fields) + 1) queries to dataDB so the size of event influences the speed of return
|
||||
func matchingItemIDsForEvent(ev map[string]interface{}, dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) {
|
||||
func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs []string, dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) {
|
||||
if len(fieldIDs) == 0 {
|
||||
fieldIDs = make([]string, len(ev))
|
||||
i := 0
|
||||
for fldID := range ev {
|
||||
fieldIDs[i] = fldID
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
itemIDs = make(utils.StringMap)
|
||||
for fldName, fieldValIf := range ev {
|
||||
for _, fldName := range fieldIDs {
|
||||
fieldValIf, has := ev[fldName]
|
||||
if !has {
|
||||
continue
|
||||
}
|
||||
fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
|
||||
if !canCast {
|
||||
return nil, fmt.Errorf("Cannot cast field: %s into string", fldName)
|
||||
|
||||
@@ -433,7 +433,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
|
||||
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
|
||||
func (rS *ResourceService) matchingResourcesForEvent(tenant string, ev map[string]interface{}) (rs Resources, err error) {
|
||||
matchingResources := make(map[string]*Resource)
|
||||
rIDs, err := matchingItemIDsForEvent(ev, rS.dm, utils.ResourceProfilesStringIndex+tenant)
|
||||
rIDs, err := matchingItemIDsForEvent(ev, nil, rS.dm, utils.ResourceProfilesStringIndex+tenant)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -139,7 +139,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
|
||||
// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
|
||||
func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) {
|
||||
matchingSQs := make(map[string]*StatQueue)
|
||||
sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.dm, utils.StatQueuesStringIndex+ev.Tenant)
|
||||
sqIDs, err := matchingItemIDsForEvent(ev.Event, nil, sS.dm, utils.StatQueuesStringIndex+ev.Tenant)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -151,25 +151,25 @@ func (ts Thresholds) Sort() {
|
||||
sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight })
|
||||
}
|
||||
|
||||
func NewThresholdService(dm *DataManager, filteredFields []string, storeInterval time.Duration,
|
||||
func NewThresholdService(dm *DataManager, indexedFields []string, storeInterval time.Duration,
|
||||
filterS *FilterS) (tS *ThresholdService, err error) {
|
||||
return &ThresholdService{dm: dm,
|
||||
filteredFields: filteredFields,
|
||||
storeInterval: storeInterval,
|
||||
filterS: filterS,
|
||||
stopBackup: make(chan struct{}),
|
||||
storedTdIDs: make(utils.StringMap)}, nil
|
||||
indexedFields: indexedFields,
|
||||
storeInterval: storeInterval,
|
||||
filterS: filterS,
|
||||
stopBackup: make(chan struct{}),
|
||||
storedTdIDs: make(utils.StringMap)}, nil
|
||||
}
|
||||
|
||||
// ThresholdService manages Threshold execution and storing them to dataDB
|
||||
type ThresholdService struct {
|
||||
dm *DataManager
|
||||
filteredFields []string // fields considered when searching for matching thresholds
|
||||
storeInterval time.Duration
|
||||
filterS *FilterS
|
||||
stopBackup chan struct{}
|
||||
storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
|
||||
stMux sync.RWMutex // protects storedTdIDs
|
||||
dm *DataManager
|
||||
indexedFields []string // fields considered when searching for matching thresholds
|
||||
storeInterval time.Duration
|
||||
filterS *FilterS
|
||||
stopBackup chan struct{}
|
||||
storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
|
||||
stMux sync.RWMutex // protects storedTdIDs
|
||||
}
|
||||
|
||||
// Called to start the service
|
||||
@@ -254,7 +254,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) {
|
||||
// matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event
|
||||
func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) {
|
||||
matchingTs := make(map[string]*Threshold)
|
||||
tIDs, err := matchingItemIDsForEvent(ev.Event, tS.dm, utils.ThresholdStringIndex+ev.Tenant)
|
||||
tIDs, err := matchingItemIDsForEvent(ev.Event, tS.indexedFields, tS.dm, utils.ThresholdStringIndex+ev.Tenant)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user