matchingItemIDsForEvent with support for *prefix DB queries

This commit is contained in:
DanB
2018-01-23 18:10:30 +01:00
parent 76a53102fd
commit 8df8f85afc
8 changed files with 68 additions and 56 deletions

View File

@@ -65,8 +65,8 @@ func (alS *AttributeService) matchingAttributeProfilesForEvent(ev *utils.CGREven
}
attrIdxKey = utils.ConcatenatedKey(ev.Tenant, contextVal)
matchingAPs := make(map[string]*AttributeProfile)
aPrflIDs, err := matchingItemIDsForEvent(ev.Event, alS.stringIndexedFields,
alS.dm, utils.AttributeFilterIndexes+attrIdxKey, MetaString)
aPrflIDs, err := matchingItemIDsForEvent(ev.Event, alS.stringIndexedFields, alS.prefixIndexedFields,
alS.dm, utils.AttributeFilterIndexes+attrIdxKey)
if err != nil {
return nil, err
}

View File

@@ -74,27 +74,27 @@ func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID
rfi.chngdIndxKeys[concatKey] = true
}
rfi.chngdRevIndxKeys[itemID] = true
// case MetaStringPrefix:
// for _, fldVal := range fltr.Values {
// concatKey := utils.ConcatenatedKey(fltr.FieldName, fldVal)
// if _, hasIt := rfi.indexes[concatKey]; !hasIt {
// rfi.indexes[concatKey] = make(utils.StringMap)
// }
// rfi.indexes[concatKey][itemID] = true
// rfi.reveseIndex[itemID][concatKey] = true
// rfi.chngdIndxKeys[concatKey] = true
// }
// rfi.chngdRevIndxKeys[itemID] = true
case MetaPrefix:
for _, fldVal := range fltr.Values {
concatKey := utils.ConcatenatedKey(fltr.FieldName, fldVal)
if _, hasIt := rfi.indexes[concatKey]; !hasIt {
rfi.indexes[concatKey] = make(utils.StringMap)
}
rfi.indexes[concatKey][itemID] = true
rfi.reveseIndex[itemID][concatKey] = true
rfi.chngdIndxKeys[concatKey] = true
}
rfi.chngdRevIndxKeys[itemID] = true
default:
naConcatKey := utils.ConcatenatedKey(utils.MetaDefault, utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)
if _, hasIt := rfi.indexes[naConcatKey]; !hasIt {
rfi.indexes[naConcatKey] = make(utils.StringMap)
concatKey := utils.ConcatenatedKey(utils.MetaDefault, utils.ANY, utils.ANY)
if _, hasIt := rfi.indexes[concatKey]; !hasIt {
rfi.indexes[concatKey] = make(utils.StringMap)
}
if _, hasIt := rfi.reveseIndex[itemID]; !hasIt {
rfi.reveseIndex[itemID] = make(utils.StringMap)
}
rfi.reveseIndex[itemID][naConcatKey] = true
rfi.indexes[naConcatKey][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE:NOT_AVAILABLE][rl.ID]
rfi.reveseIndex[itemID][concatKey] = true
rfi.indexes[concatKey][itemID] = true // Fields without real field index will be located in map[*any:*any][rl.ID]
}
}

View File

@@ -206,7 +206,7 @@ func testOnStorITSetFilterIndexes(t *testing.T) {
"RL2": true,
"RL3": true,
},
utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE): utils.StringMap{
utils.ConcatenatedKey(utils.MetaDefault, utils.ANY, utils.ANY): utils.StringMap{
"RL4": true,
"RL5": true,
},
@@ -234,7 +234,7 @@ func testOnStorITGetFilterIndexes(t *testing.T) {
"RL2": true,
"RL3": true,
},
utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE): utils.StringMap{
utils.ConcatenatedKey(utils.MetaDefault, utils.ANY, utils.ANY): utils.StringMap{
"RL4": true,
"RL5": true,
},

View File

@@ -26,45 +26,55 @@ import (
// matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event
// fieldIDs limits the fields which are checked against indexes
// helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried
// executes a number of $(len(fields) + 1) queries to dataDB so the size of event influences the speed of return
func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs *[]string,
dm *DataManager, dbIdxKey, filterType string) (itemIDs utils.StringMap, err error) {
if fieldIDs == nil {
allFieldIDs := make([]string, len(ev))
i := 0
for fldID := range ev {
allFieldIDs[i] = fldID
i += 1
}
fieldIDs = &allFieldIDs
}
// helper on top of dataDB.MatchReqFilterIndex, adding utils.ANY to list of fields queried
func matchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string,
dm *DataManager, dbIdxKey string) (itemIDs utils.StringMap, err error) {
itemIDs = make(utils.StringMap)
for _, fldName := range *fieldIDs {
fieldValIf, has := ev[fldName]
if !has {
continue
allFieldIDs := make([]string, len(ev))
i := 0
for fldID := range ev {
allFieldIDs[i] = fldID
i += 1
}
filterIndexTypes := []string{MetaString, MetaPrefix}
for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs} { // same routine for both string and prefix filter types
if fieldIDs == nil {
fieldIDs = &allFieldIDs
}
fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
if !canCast {
utils.Logger.Warning(
fmt.Sprintf("<%s> cannot cast field: %s into string", utils.FilterS, fldName))
continue
}
dbItemIDs, err := dm.MatchFilterIndex(dbIdxKey, filterType, fldName, fldVal)
if err != nil {
if err == utils.ErrNotFound {
for _, fldName := range *fieldIDs {
fieldValIf, has := ev[fldName]
if !has {
continue
}
return nil, err
}
for itemID := range dbItemIDs {
if _, hasIt := itemIDs[itemID]; !hasIt { // Add it to list if not already there
itemIDs[itemID] = dbItemIDs[itemID]
fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
if !canCast {
utils.Logger.Warning(
fmt.Sprintf("<%s> cannot cast field: %s into string", utils.FilterS, fldName))
continue
}
fldVals := []string{fldVal} // default is only one fieldValue checked
if filterIndexTypes[i] == MetaPrefix {
fldVals = utils.SplitPrefix(fldVal, 1) // all prefixes till last digit
}
var dbItemIDs utils.StringMap // list of items matched in DB
for _, val := range fldVals {
dbItemIDs, err = dm.MatchFilterIndex(dbIdxKey, filterIndexTypes[i], fldName, val)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
break // we got at least one answer back, longest prefix wins
}
for itemID := range dbItemIDs {
if _, hasIt := itemIDs[itemID]; !hasIt { // Add it to list if not already there
itemIDs[itemID] = dbItemIDs[itemID]
}
}
}
}
dbItemIDs, err := dm.MatchFilterIndex(dbIdxKey, utils.MetaDefault, utils.NOT_AVAILABLE, utils.NOT_AVAILABLE) // add unindexed itemIDs to be checked
dbItemIDs, err := dm.MatchFilterIndex(dbIdxKey, utils.MetaDefault, utils.ANY, utils.ANY) // add unindexed itemIDs to be checked
if err != nil {
if err != utils.ErrNotFound {
return nil, err

View File

@@ -438,7 +438,8 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
func (rS *ResourceService) matchingResourcesForEvent(tenant string, ev map[string]interface{}) (rs Resources, err error) {
matchingResources := make(map[string]*Resource)
rIDs, err := matchingItemIDsForEvent(ev, rS.stringIndexedFields, rS.dm, utils.ResourceFilterIndexes+tenant, MetaString)
rIDs, err := matchingItemIDsForEvent(ev, rS.stringIndexedFields, rS.prefixIndexedFields,
rS.dm, utils.ResourceFilterIndexes+tenant)
if err != nil {
return nil, err
}

View File

@@ -143,7 +143,8 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQueues, err error) {
matchingSQs := make(map[string]*StatQueue)
sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.stringIndexedFields, sS.dm, utils.StatFilterIndexes+ev.Tenant, MetaString)
sqIDs, err := matchingItemIDsForEvent(ev.Event, sS.stringIndexedFields, sS.prefixIndexedFields,
sS.dm, utils.StatFilterIndexes+ev.Tenant)
if err != nil {
return nil, err
}

View File

@@ -117,7 +117,7 @@ func (spS *SupplierService) Shutdown() error {
func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *utils.CGREvent) (sPrfls SupplierProfiles, err error) {
matchingLPs := make(map[string]*SupplierProfile)
sPrflIDs, err := matchingItemIDsForEvent(ev.Event, spS.stringIndexedFields,
spS.dm, utils.SupplierFilterIndexes+ev.Tenant, MetaString)
spS.prefixIndexedFields, spS.dm, utils.SupplierFilterIndexes+ev.Tenant)
if err != nil {
return nil, err
}

View File

@@ -222,7 +222,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ArgsProcessEvent) (
tIDs = args.ThresholdIDs
} else {
tIDsMap, err := matchingItemIDsForEvent(args.Event, tS.stringIndexedFields,
tS.dm, utils.ThresholdFilterIndexes+args.Tenant, MetaString)
tS.prefixIndexedFields, tS.dm, utils.ThresholdFilterIndexes+args.Tenant)
if err != nil {
return nil, err
}