update newFilterIndex to use batching

This commit is contained in:
ionutboangiu
2025-09-22 23:01:03 +03:00
committed by Dan Christian Bogos
parent 2078c93758
commit e9b1538450
2 changed files with 77 additions and 73 deletions

View File

@@ -19,7 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"errors"
"fmt"
"maps"
"strings"
"github.com/cgrates/cgrates/config"
@@ -39,103 +41,105 @@ var (
}
)
// newFilterIndex will get the index from DataManager if is not found it will create it
// is used to update the mentioned index
func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string, newFlt *Filter) (indexes map[string]utils.StringSet, err error) {
// newFilterIndex retrieves filter indexes from DataManager.
// Creates empty indexes for any missing keys.
func newFilterIndex(dm *DataManager, itemType, tnt, ctx, itemID string,
filterIDs []string, newFlt *Filter) (map[string]utils.StringSet, error) {
tntCtx := tnt
if ctx != utils.EmptyString {
if ctx != "" {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
}
indexes = make(map[string]utils.StringSet)
if len(filterIDs) == 0 { // in case of None
result := make(map[string]utils.StringSet)
if len(filterIDs) == 0 {
idxKey := utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny)
var rcvIndx map[string]utils.StringSet
if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx,
true, false, idxKey); err != nil {
if err != utils.ErrNotFound {
return
indexes, err := dm.GetIndexes(itemType, tntCtx, true, false, idxKey)
if err != nil {
if !errors.Is(err, utils.ErrNotFound) {
return nil, err
}
err = nil
indexes[idxKey] = make(utils.StringSet) // create an empty index if is not found in DB in case we add them later
return
result[idxKey] = make(utils.StringSet) // Create empty index
return result, nil
}
for idxKey, idx := range rcvIndx { // parse the received indexes
indexes[idxKey] = idx
}
return
maps.Copy(result, indexes)
return result, nil
}
// in case of more filters we parse each filter rule and only for supported index types
// we try to get them from Cache/DataDB or if not found in this location we create them here
// Collect all index keys from all filter rules.
var allKeys []string
for _, fltrID := range filterIDs {
var fltr *Filter
// Get filter from either new provided filter or DataManager.
if newFlt != nil && newFlt.Tenant == tnt && newFlt.ID == fltrID {
fltr = newFlt
} else if fltr, err = dm.GetFilter(tnt, fltrID,
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v",
fltrID, idxItmType, itemID)
} else {
var err error
fltr, err = dm.GetFilter(tnt, fltrID, true, false, utils.NonTransactional)
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
return nil, fmt.Errorf("broken filter reference %q for item %q of type %q",
fltrID, itemID, itemType)
}
return nil, err
}
return
}
for _, flt := range fltr.Rules {
if !FilterIndexTypes.Has(flt.Type) ||
IsDynamicDPPath(flt.Element) {
for _, rule := range fltr.Rules {
if !FilterIndexTypes.Has(rule.Type) || IsDynamicDPPath(rule.Element) {
continue
}
isDyn := strings.HasPrefix(flt.Element, utils.DynamicDataPrefix)
if isDyn && flt.Type == utils.MetaExists {
idxKey := utils.ConcatenatedKey(flt.Type, flt.Element[1:])
var rcvIndx map[string]utils.StringSet
// only read from cache in case if we do not find the index to not cache the negative response
if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx,
true, false, idxKey); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
indexes[idxKey] = make(utils.StringSet) // create an empty index if is not found in DB in case we add them later
continue
}
for idxKey, idx := range rcvIndx { // parse the received indexes
indexes[idxKey] = idx
}
elemIsDyn := strings.HasPrefix(rule.Element, utils.DynamicDataPrefix)
// Handle *exists separately.
if elemIsDyn && rule.Type == utils.MetaExists {
idxKey := utils.ConcatenatedKey(rule.Type, rule.Element[1:])
allKeys = append(allKeys, idxKey)
continue // skip values loop for *exists
}
for _, fldVal := range flt.Values {
if IsDynamicDPPath(fldVal) {
for _, val := range rule.Values {
if IsDynamicDPPath(val) {
continue
}
valIsDyn := strings.HasPrefix(val, utils.DynamicDataPrefix)
// Skip if both element and value are dynamic or both are static.
if elemIsDyn == valIsDyn {
continue
}
// Build index key based on which one is dynamic.
var idxKey string
if isDyn {
if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) { // do not index if both the element and the value is dynamic
continue
}
idxKey = utils.ConcatenatedKey(flt.Type, flt.Element[1:], fldVal)
} else if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) {
idxKey = utils.ConcatenatedKey(flt.Type, fldVal[1:], flt.Element)
if elemIsDyn {
idxKey = utils.ConcatenatedKey(rule.Type, rule.Element[1:], val)
} else {
// do not index not dynamic filters
continue
}
var rcvIndx map[string]utils.StringSet
// only read from cache in case if we do not find the index to not cache the negative response
if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx,
true, false, idxKey); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
indexes[idxKey] = make(utils.StringSet) // create an empty index if is not found in DB in case we add them later
continue
}
for idxKey, idx := range rcvIndx { // parse the received indexes
indexes[idxKey] = idx
idxKey = utils.ConcatenatedKey(rule.Type, val[1:], rule.Element)
}
allKeys = append(allKeys, idxKey)
}
}
}
return
if len(allKeys) > 0 {
indexes, err := dm.GetIndexes(itemType, tntCtx, true, false, allKeys...)
if err != nil && !errors.Is(err, utils.ErrNotFound) {
return nil, err
}
maps.Copy(result, indexes) // merge fetched indexes into result map
// Create empty sets for any requested keys that weren't returned.
for _, key := range allKeys {
if _, exists := result[key]; !exists {
result[key] = make(utils.StringSet)
}
}
}
return result, nil
}
// addItemToFilterIndex will add the itemID to the existing/created index and set it in the DataDB

View File

@@ -204,7 +204,7 @@ func TestLibIndexNewFilterIndexGetFilterErrNotFound(t *testing.T) {
Rules: []*FilterRule{},
}
_, err := newFilterIndex(dm, idxItmType, tnt, ctx, itemID, filterIDs, newFlt)
expectedErr := "broken reference to filter: nonexistent_filter for itemType: indexItemType and ID: item1"
expectedErr := `broken filter reference "nonexistent_filter" for item "item1" of type "indexItemType"`
if err == nil || err.Error() != expectedErr {
t.Fatalf("Expected error %v, got %v", expectedErr, err)
}