Added Filter indexes support

This commit is contained in:
Trial97
2020-06-12 17:05:47 +03:00
committed by Dan Christian Bogos
parent 39d49dcd2f
commit 3799ba5c4f
5 changed files with 149 additions and 44 deletions

View File

@@ -764,9 +764,17 @@ func (dm *DataManager) SetFilter(fltr *Filter) (err error) {
err = utils.ErrNoDatabaseConn
return
}
var oldFlt *Filter
if oldFlt, err = dm.GetFilter(fltr.Tenant, fltr.ID, true, false,
utils.NonTransactional); err != nil && err != utils.ErrNotFound {
return err
}
if err = dm.DataDB().SetFilterDrv(fltr); err != nil {
return
}
if withIndex {
// remove index?
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
@@ -785,14 +793,25 @@ func (dm *DataManager) SetFilter(fltr *Filter) (err error) {
}
func (dm *DataManager) RemoveFilter(tenant, id, transactionID string) (err error) {
func (dm *DataManager) RemoveFilter(tenant, id, transactionID string, withIndex bool) (err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
var oldFlt *Filter
if oldFlt, err = dm.GetFilter(tenant, id, true, false,
utils.NonTransactional); err != nil && err != utils.ErrNotFound {
return err
}
if err = dm.DataDB().RemoveFilterDrv(tenant, id); err != nil {
return
}
if oldFlt == nil {
return utils.ErrNotFound
}
if withIndex {
// remove index?
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
@@ -1007,6 +1026,9 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id,
return utils.ErrNotFound
}
if withIndex {
if err = removeIndexFiltersItem(dm, utils.CacheThresholdFilterIndexes, tenant, id, oldTh.FilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(dm, utils.CacheThresholdFilterIndexes,
tenant, utils.EmptyString, id, oldTh.FilterIDs); err != nil {
return
@@ -1131,6 +1153,9 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id,
return utils.ErrNotFound
}
if withIndex {
if err = removeIndexFiltersItem(dm, utils.CacheStatFilterIndexes, tenant, id, oldSts.FilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(dm, utils.CacheStatFilterIndexes,
tenant, utils.EmptyString, id, oldSts.FilterIDs); err != nil {
return
@@ -1447,6 +1472,9 @@ func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string, w
return utils.ErrNotFound
}
if withIndex {
if err = removeIndexFiltersItem(dm, utils.CacheResourceFilterIndexes, tenant, id, oldRes.FilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(dm, utils.CacheResourceFilterIndexes,
tenant, utils.EmptyString, id, oldRes.FilterIDs); err != nil {
return
@@ -2326,6 +2354,9 @@ func (dm *DataManager) RemoveRouteProfile(tenant, id, transactionID string, with
return utils.ErrNotFound
}
if withIndex {
if err = removeIndexFiltersItem(dm, utils.CacheRouteFilterIndexes, tenant, id, oldRpp.FilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(dm, utils.CacheRouteFilterIndexes,
tenant, utils.EmptyString, id, oldRpp.FilterIDs); err != nil {
return
@@ -2466,6 +2497,9 @@ func (dm *DataManager) RemoveAttributeProfile(tenant, id string, transactionID s
return utils.ErrNotFound
}
if withIndex {
if err = removeIndexFiltersItem(dm, utils.CacheAttributeFilterIndexes, tenant, id, oldAttr.FilterIDs); err != nil {
return
}
for _, context := range oldAttr.Contexts {
if err = removeItemFromFilterIndex(dm, utils.CacheAttributeFilterIndexes,
tenant, context, id, oldAttr.FilterIDs); err != nil {
@@ -2593,6 +2627,9 @@ func (dm *DataManager) RemoveChargerProfile(tenant, id string,
return utils.ErrNotFound
}
if withIndex {
if err = removeIndexFiltersItem(dm, utils.CacheChargerFilterIndexes, tenant, id, oldCpp.FilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(dm, utils.CacheChargerFilterIndexes,
tenant, utils.EmptyString, id, oldCpp.FilterIDs); err != nil {
return
@@ -2720,6 +2757,9 @@ func (dm *DataManager) RemoveDispatcherProfile(tenant, id string,
return utils.ErrNotFound
}
if withIndex {
if err = removeIndexFiltersItem(dm, utils.CacheDispatcherFilterIndexes, tenant, id, oldDpp.FilterIDs); err != nil {
return
}
for _, ctx := range oldDpp.Subsystems {
if err = removeItemFromFilterIndex(dm, utils.CacheDispatcherFilterIndexes,
tenant, ctx, id, oldDpp.FilterIDs); err != nil {

View File

@@ -20,6 +20,7 @@ package engine
import (
"fmt"
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
@@ -158,6 +159,9 @@ func removeItemFromFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID str
// updatedIndexes will compare the old filtersIDs with the new ones and only uptdate the filters indexes that are added/removed
func updatedIndexes(dm *DataManager, idxItmType, tnt, ctx, itemID string, oldFilterIds *[]string, newFilterIDs []string) (err error) {
if oldFilterIds == nil { // nothing to remove so just create the new indexes
if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil {
return
}
return addItemToFilterIndex(dm, idxItmType, tnt, ctx, itemID, newFilterIDs)
}
if len(*oldFilterIds) == 0 && len(newFilterIDs) == 0 { // nothing to update
@@ -186,6 +190,9 @@ func updatedIndexes(dm *DataManager, idxItmType, tnt, ctx, itemID string, oldFil
if len(oldFilterIDs) != 0 || oldFltrs.Size() == 0 {
// has some indexes to remove or
// the old profile doesn't have filters but the new one has so remove the *none index
if err = removeIndexFiltersItem(dm, idxItmType, tnt, itemID, oldFilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(dm, idxItmType, tnt, ctx, itemID, oldFilterIDs); err != nil {
return
}
@@ -194,6 +201,9 @@ func updatedIndexes(dm *DataManager, idxItmType, tnt, ctx, itemID string, oldFil
if len(newFilterIDs) != 0 || newFltrs.Size() == 0 {
// has some indexes to add or
// the old profile has filters but the new one does not so add the *none index
if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil {
return
}
if err = addItemToFilterIndex(dm, idxItmType, tnt, ctx, itemID, newFilterIDs); err != nil {
return
}
@@ -204,6 +214,9 @@ func updatedIndexes(dm *DataManager, idxItmType, tnt, ctx, itemID string, oldFil
func updatedIndexesWithContexts(dm *DataManager, idxItmType, tnt, itemID string,
oldContexts, oldFilterIDs *[]string, newContexts, newFilterIDs []string) (err error) {
if oldContexts == nil { // new profile add all indexes
if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil {
return
}
for _, ctx := range newContexts {
if err = addItemToFilterIndex(dm, idxItmType, tnt, ctx, itemID, newFilterIDs); err != nil {
return
@@ -234,6 +247,11 @@ func updatedIndexesWithContexts(dm *DataManager, idxItmType, tnt, itemID string,
}
if oldFilterIDs != nil {
if len(updateContexts) == 0 {
if err = removeIndexFiltersItem(dm, idxItmType, tnt, itemID, *oldFilterIDs); err != nil {
return
}
}
for _, ctx := range removeContexts {
if err = removeItemFromFilterIndex(dm, idxItmType, tnt, ctx, itemID, *oldFilterIDs); err != nil {
return
@@ -242,6 +260,9 @@ func updatedIndexesWithContexts(dm *DataManager, idxItmType, tnt, itemID string,
}
if len(updateContexts) != 0 {
if oldFilterIDs == nil { // nothing to remove so just create the new indexes
if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil {
return
}
for _, ctx := range updateContexts {
if err = addItemToFilterIndex(dm, idxItmType, tnt, ctx, itemID, newFilterIDs); err != nil {
return
@@ -270,6 +291,9 @@ func updatedIndexesWithContexts(dm *DataManager, idxItmType, tnt, itemID string,
if len(removeFilterIDs) != 0 || oldFltrs.Size() == 0 {
// has some indexes to remove or
// the old profile doesn't have filters but the new one has so remove the *none index
if err = removeIndexFiltersItem(dm, idxItmType, tnt, itemID, removeFilterIDs); err != nil {
return
}
for _, ctx := range updateContexts {
if err = removeItemFromFilterIndex(dm, idxItmType, tnt, ctx, itemID, removeFilterIDs); err != nil {
return
@@ -280,6 +304,9 @@ func updatedIndexesWithContexts(dm *DataManager, idxItmType, tnt, itemID string,
if len(addFilterIDs) != 0 || newFltrs.Size() == 0 {
// has some indexes to add or
// the old profile has filters but the new one does not so add the *none index
if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, addFilterIDs); err != nil {
return
}
for _, ctx := range updateContexts {
if err = addItemToFilterIndex(dm, idxItmType, tnt, ctx, itemID, addFilterIDs); err != nil {
return
@@ -287,6 +314,10 @@ func updatedIndexesWithContexts(dm *DataManager, idxItmType, tnt, itemID string,
}
}
}
} else {
if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil {
return
}
}
for _, ctx := range addContexts {
@@ -352,3 +383,61 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
}
return
}
func addIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filterIDs []string) (err error) {
for _, ID := range filterIDs {
if strings.HasPrefix(ID, utils.Meta) { // skip inline
continue
}
tntCtx := utils.ConcatenatedKey(tnt, ID)
var indexes map[string]utils.StringSet
if indexes, err = dm.GetIndexes(utils.CacheFilterIndexes, tntCtx,
idxItmType, true, false); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
indexes[idxItmType] = make(utils.StringSet) // create an empty index if is not found in DB in case we add them later
}
indexes[idxItmType].Add(itemID)
if err = dm.SetIndexes(idxItmType, tntCtx, indexes, true, utils.NonTransactional); err != nil {
return
}
for indxKey := range indexes {
if err = Cache.Remove(utils.CacheFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
return
}
}
}
return
}
func removeIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filterIDs []string) (err error) {
for _, ID := range filterIDs {
if strings.HasPrefix(ID, utils.Meta) { // skip inline
continue
}
tntCtx := utils.ConcatenatedKey(tnt, ID)
var indexes map[string]utils.StringSet
if indexes, err = dm.GetIndexes(utils.CacheFilterIndexes, tntCtx,
idxItmType, true, false); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
continue // it is already removed
}
indexes[idxItmType].Remove(itemID)
if err = dm.SetIndexes(idxItmType, tntCtx, indexes, true, utils.NonTransactional); err != nil {
return
}
for indxKey := range indexes {
if err = Cache.Remove(utils.CacheFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
return
}
}
}
return
}

View File

@@ -233,7 +233,7 @@ func testDCITMigrateAndMove(t *testing.T) {
},
}
if dcidx, err := dcMigrator.dmOut.DataManager().GetIndexes(
utils.PrefixToIndexCache[utils.AttributeProfilePrefix],
utils.CacheAttributeFilterIndexes,
utils.ConcatenatedKey("cgrates.org", utils.MetaChargers),
"", true, true); err != nil {
t.Error(err)
@@ -246,7 +246,7 @@ func testDCITMigrateAndMove(t *testing.T) {
},
}
if dcidx, err := dcMigrator.dmOut.DataManager().GetIndexes(
utils.PrefixToIndexCache[utils.ChargerProfilePrefix],
utils.CacheChargerFilterIndexes,
utils.ConcatenatedKey("cgrates.org", utils.MetaChargers),
"", true, true); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Expected error %v, recived: %v with reply: %v", utils.ErrNotFound, err, utils.ToJSON(dcidx))

View File

@@ -199,7 +199,7 @@ func testUsrITMigrateAndMove(t *testing.T) {
},
}
if usridx, err := usrMigrator.dmOut.DataManager().GetIndexes(
utils.PrefixToIndexCache[utils.AttributeProfilePrefix],
utils.CacheAttributeFilterIndexes,
utils.ConcatenatedKey("cgrates.org", utils.META_ANY),
"", true, true); err != nil {
t.Error(err)

View File

@@ -26,10 +26,7 @@ var (
Account, Subject, Destination, SetupTime, AnswerTime, Usage, COST, RATED, Partial, RunID,
PreRated, CostSource, CostDetails, ExtraInfo, OrderID})
PostPaidRatedSlice = []string{META_POSTPAID, META_RATED}
ItemList = NewStringSet([]string{MetaAccounts, MetaAttributes, MetaChargers, MetaDispatchers, MetaDispatcherHosts,
MetaFilters, MetaResources, MetaStats, MetaThresholds, MetaRoutes,
})
AttrInlineTypes = NewStringSet([]string{META_CONSTANT, MetaVariable, META_COMPOSED, META_USAGE_DIFFERENCE,
AttrInlineTypes = NewStringSet([]string{META_CONSTANT, MetaVariable, META_COMPOSED, META_USAGE_DIFFERENCE,
MetaSum, MetaValueExponent})
GitLastLog string // If set, it will be processed as part of versioning
@@ -45,19 +42,7 @@ var (
MetaKafkajsonMap: CONTENT_JSON,
MetaS3jsonMap: CONTENT_JSON,
}
CDREFileSuffixes = map[string]string{
MetaHTTPjsonCDR: JSNSuffix,
MetaHTTPjsonMap: JSNSuffix,
MetaAMQPjsonCDR: JSNSuffix,
MetaAMQPjsonMap: JSNSuffix,
MetaAMQPV1jsonMap: JSNSuffix,
MetaSQSjsonMap: JSNSuffix,
MetaKafkajsonMap: JSNSuffix,
MetaS3jsonMap: JSNSuffix,
MetaHTTPPost: FormSuffix,
MetaFileCSV: CSVSuffix,
MetaFileFWV: FWVSuffix,
}
// CachePartitions enables creation of cache partitions
CachePartitions = NewStringSet([]string{CacheDestinations, CacheReverseDestinations,
CacheRatingPlans, CacheRatingProfiles, CacheActions, CacheActionPlans,
@@ -107,20 +92,20 @@ var (
CacheLoadIDs: LoadIDPrefix,
CacheAccounts: ACCOUNT_PREFIX,
CacheRateFilterIndexes: RateFilterIndexPrfx,
CacheFilterIndexes: FilterIndexPrfx,
}
CachePrefixToInstance map[string]string // will be built on init
PrefixToIndexCache = map[string]string{
ThresholdProfilePrefix: CacheThresholdFilterIndexes,
ResourceProfilesPrefix: CacheResourceFilterIndexes,
StatQueueProfilePrefix: CacheStatFilterIndexes,
RouteProfilePrefix: CacheRouteFilterIndexes,
AttributeProfilePrefix: CacheAttributeFilterIndexes,
ChargerProfilePrefix: CacheChargerFilterIndexes,
DispatcherProfilePrefix: CacheDispatcherFilterIndexes,
RateProfilePrefix: CacheRateProfilesFilterIndexes,
RatePrefix: CacheRateFilterIndexes,
CachePrefixToInstance map[string]string // will be built on init
CacheIndexesToPrefix = map[string]string{ // used by match index to get all the ids when index selects is disabled and for compute indexes
CacheThresholdFilterIndexes: ThresholdProfilePrefix,
CacheResourceFilterIndexes: ResourceProfilesPrefix,
CacheStatFilterIndexes: StatQueueProfilePrefix,
CacheRouteFilterIndexes: RouteProfilePrefix,
CacheAttributeFilterIndexes: AttributeProfilePrefix,
CacheChargerFilterIndexes: ChargerProfilePrefix,
CacheDispatcherFilterIndexes: DispatcherProfilePrefix,
CacheRateProfilesFilterIndexes: RateProfilePrefix,
CacheRateFilterIndexes: RatePrefix,
}
CacheIndexesToPrefix map[string]string // will be built on init
// NonMonetaryBalances are types of balances which are not handled as monetary
NonMonetaryBalances = NewStringSet([]string{VOICE, SMS, DATA, GENERIC})
@@ -614,7 +599,6 @@ const (
MetaDivide = "*divide"
MetaUrl = "*url"
MetaXml = "*xml"
ApiKey = "apikey"
MetaReq = "*req"
MetaVars = "*vars"
MetaRep = "*rep"
@@ -679,8 +663,6 @@ const (
LoadIDs = "load_ids"
DNSAgent = "DNSAgent"
TLSNoCaps = "tls"
MetaRouteID = "*route_id"
MetaApiKey = "*api_key"
UsageID = "UsageID"
Rcode = "Rcode"
Replacement = "Replacement"
@@ -1657,6 +1639,7 @@ const (
CacheUCH = "*uch"
CacheSTIR = "*stir"
CacheEventCharges = "*event_charges"
CacheFilterIndexes = "*filter_indexes"
)
// Prefix for indexing
@@ -1671,6 +1654,7 @@ const (
RouteFilterIndexes = "rti_"
RateProfilesFilterIndexPrfx = "rpi_"
RateFilterIndexPrfx = "rri_"
FilterIndexPrfx = "fii_"
)
// Agents
@@ -2218,14 +2202,6 @@ func buildCacheInstRevPrefixes() {
}
}
func buildCacheIndexesToPrefix() {
CacheIndexesToPrefix = make(map[string]string)
for k, v := range PrefixToIndexCache {
CacheIndexesToPrefix[v] = k
}
}
func init() {
buildCacheInstRevPrefixes()
buildCacheIndexesToPrefix()
}