New constructor for FilterIndexes with old idx to avoid overwrite at ComputeID

This commit is contained in:
porosnicuadrian
2021-09-14 16:56:01 +03:00
committed by Dan Christian Bogos
parent 30a4b9e2e8
commit 2d36afe34f
3 changed files with 92 additions and 10 deletions

View File

@@ -316,6 +316,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args utils.ArgsComputeFilterIndexIDs,
if err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
//StatQueueProfile Indexes
sqpIndexers, err := api.computeStatIndexes(args.Tenant, &args.StatIDs, transactionID)
if err != nil && err != utils.ErrNotFound {
@@ -460,7 +461,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args utils.ArgsComputeFilterIndexIDs,
func (api *APIerSv1) computeThresholdIndexes(tenant string, thIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var thresholdIDs []string
thdsIndexers := engine.NewFilterIndexer(api.DataManager, utils.ThresholdProfilePrefix, tenant)
var thdsIndexers *engine.FilterIndexer
if thIDs == nil {
ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix)
if err != nil {
@@ -469,7 +470,17 @@ func (api *APIerSv1) computeThresholdIndexes(tenant string, thIDs *[]string,
for _, id := range ids {
thresholdIDs = append(thresholdIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this will be on ComputeIndexes that contains empty indexes
thdsIndexers = engine.NewFilterIndexer(api.DataManager, utils.ThresholdProfilePrefix, tenant)
} else {
// this will be on ComputeIndexesIDs that contains the old indexes from the next getter
var oldIDx map[string]utils.StringMap
if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.ThresholdProfilePrefix],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
thdsIndexers = engine.NewFilterIndexer(api.DataManager, utils.ThresholdProfilePrefix, tenant)
} else {
thdsIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.ThresholdProfilePrefix, tenant, oldIDx)
}
thresholdIDs = *thIDs
transactionID = utils.NonTransactional
}
@@ -510,6 +521,7 @@ func (api *APIerSv1) computeThresholdIndexes(tenant string, thIDs *[]string,
thdsIndexers.IndexTPFilter(engine.FilterToTPFilter(fltr), th.ID)
}
}
if transactionID == utils.NonTransactional {
if err := thdsIndexers.StoreIndexes(true, transactionID); err != nil {
return nil, err
@@ -526,8 +538,7 @@ func (api *APIerSv1) computeThresholdIndexes(tenant string, thIDs *[]string,
func (api *APIerSv1) computeAttributeIndexes(tenant, context string, attrIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var attributeIDs []string
attrIndexers := engine.NewFilterIndexer(api.DataManager, utils.AttributeProfilePrefix,
utils.ConcatenatedKey(tenant, context))
var attrIndexers *engine.FilterIndexer
if attrIDs == nil {
ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix)
if err != nil {
@@ -536,7 +547,18 @@ func (api *APIerSv1) computeAttributeIndexes(tenant, context string, attrIDs *[]
for _, id := range ids {
attributeIDs = append(attributeIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this will be on ComputeIndexes that contains empty indexes
attrIndexers = engine.NewFilterIndexer(api.DataManager, utils.AttributeProfilePrefix,
utils.ConcatenatedKey(tenant, context))
} else {
// this will be on ComputeIndexesIDs that contains the old indexes from the next getter
var oldIDx map[string]utils.StringMap
if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.AttributeProfilePrefix],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
attrIndexers = engine.NewFilterIndexer(api.DataManager, utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, context))
} else {
attrIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, context), oldIDx)
}
attributeIDs = *attrIDs
transactionID = utils.NonTransactional
}
@@ -596,7 +618,7 @@ func (api *APIerSv1) computeAttributeIndexes(tenant, context string, attrIDs *[]
func (api *APIerSv1) computeResourceIndexes(tenant string, rsIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var resourceIDs []string
rpIndexers := engine.NewFilterIndexer(api.DataManager, utils.ResourceProfilesPrefix, tenant)
var rpIndexers *engine.FilterIndexer
if rsIDs == nil {
ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix)
if err != nil {
@@ -605,7 +627,17 @@ func (api *APIerSv1) computeResourceIndexes(tenant string, rsIDs *[]string,
for _, id := range ids {
resourceIDs = append(resourceIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this will be on ComputeIndexes that contains empty indexes
rpIndexers = engine.NewFilterIndexer(api.DataManager, utils.ResourceProfilesPrefix, tenant)
} else {
// this will be on ComputeIndexesIDs that contains the old indexes from the next getter
var oldIDx map[string]utils.StringMap
if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.ResourceFilterIndexes],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
rpIndexers = engine.NewFilterIndexer(api.DataManager, utils.ResourceProfilesPrefix, tenant)
} else {
rpIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.ResourceProfilesPrefix, tenant, oldIDx)
}
resourceIDs = *rsIDs
transactionID = utils.NonTransactional
}
@@ -662,7 +694,7 @@ func (api *APIerSv1) computeResourceIndexes(tenant string, rsIDs *[]string,
func (api *APIerSv1) computeStatIndexes(tenant string, stIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var statIDs []string
sqpIndexers := engine.NewFilterIndexer(api.DataManager, utils.StatQueueProfilePrefix, tenant)
var sqpIndexers *engine.FilterIndexer
if stIDs == nil {
ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix)
if err != nil {
@@ -671,7 +703,17 @@ func (api *APIerSv1) computeStatIndexes(tenant string, stIDs *[]string,
for _, id := range ids {
statIDs = append(statIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this will be on ComputeIndexes that contains empty indexes
sqpIndexers = engine.NewFilterIndexer(api.DataManager, utils.StatQueueProfilePrefix, tenant)
} else {
// this will be on ComputeIndexesIDs that contains the old indexes from the next getter
var oldIDx map[string]utils.StringMap
if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.StatQueueProfilePrefix],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
sqpIndexers = engine.NewFilterIndexer(api.DataManager, utils.StatQueueProfilePrefix, tenant)
} else {
sqpIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.StatQueueProfilePrefix, tenant, oldIDx)
}
statIDs = *stIDs
transactionID = utils.NonTransactional
}
@@ -728,7 +770,7 @@ func (api *APIerSv1) computeStatIndexes(tenant string, stIDs *[]string,
func (api *APIerSv1) computeSupplierIndexes(tenant string, sppIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var supplierIDs []string
sppIndexers := engine.NewFilterIndexer(api.DataManager, utils.SupplierProfilePrefix, tenant)
var sppIndexers *engine.FilterIndexer
if sppIDs == nil {
ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix)
if err != nil {
@@ -737,7 +779,17 @@ func (api *APIerSv1) computeSupplierIndexes(tenant string, sppIDs *[]string,
for _, id := range ids {
supplierIDs = append(supplierIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this will be on ComputeIndexes that contains empty indexes
sppIndexers = engine.NewFilterIndexer(api.DataManager, utils.SupplierProfilePrefix, tenant)
} else {
// this will be on ComputeIndexesIDs that contains the old indexes from the next getter
var oldIDx map[string]utils.StringMap
if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.SupplierProfilePrefix],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
sppIndexers = engine.NewFilterIndexer(api.DataManager, utils.SupplierProfilePrefix, tenant)
} else {
sppIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.SupplierProfilePrefix, tenant, oldIDx)
}
supplierIDs = *sppIDs
transactionID = utils.NonTransactional
}
@@ -794,7 +846,7 @@ func (api *APIerSv1) computeSupplierIndexes(tenant string, sppIDs *[]string,
func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var chargerIDs []string
cppIndexes := engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant)
var cppIndexes *engine.FilterIndexer
if cppIDs == nil {
ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.ChargerProfilePrefix)
if err != nil {
@@ -803,7 +855,17 @@ func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string,
for _, id := range ids {
chargerIDs = append(chargerIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this will be on ComputeIndexes that contains empty indexes
cppIndexes = engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant)
} else {
// this will be on ComputeIndexesIDs that contains the old indexes from the next getter
var oldIDx map[string]utils.StringMap
if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.ChargerProfilePrefix],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
cppIndexes = engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant)
} else {
cppIndexes = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.ChargerProfilePrefix, tenant, oldIDx)
}
chargerIDs = *cppIDs
transactionID = utils.NonTransactional
}
@@ -860,8 +922,7 @@ func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string,
func (api *APIerSv1) computeDispatcherIndexes(tenant, context string, dspIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var dispatcherIDs []string
dspIndexes := engine.NewFilterIndexer(api.DataManager, utils.DispatcherProfilePrefix,
utils.ConcatenatedKey(tenant, context))
var dspIndexes *engine.FilterIndexer
if dspIDs == nil {
ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.DispatcherProfilePrefix)
if err != nil {
@@ -870,7 +931,20 @@ func (api *APIerSv1) computeDispatcherIndexes(tenant, context string, dspIDs *[]
for _, id := range ids {
dispatcherIDs = append(dispatcherIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this will be on ComputeIndexes that contains empty indexes
dspIndexes = engine.NewFilterIndexer(api.DataManager, utils.DispatcherProfilePrefix,
utils.ConcatenatedKey(tenant, context))
} else {
// this will be on ComputeIndexesIDs that contains the old indexes from the next getter
var oldIDx map[string]utils.StringMap
if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.DispatcherProfilePrefix],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
dspIndexes = engine.NewFilterIndexer(api.DataManager, utils.DispatcherProfilePrefix,
utils.ConcatenatedKey(tenant, context))
} else {
dspIndexes = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.DispatcherProfilePrefix,
utils.ConcatenatedKey(tenant, context), oldIDx)
}
dispatcherIDs = *dspIDs
transactionID = utils.NonTransactional
}

View File

@@ -26,12 +26,21 @@ import (
"github.com/cgrates/cgrates/utils"
)
// NewFilterIndexer creates the FilterIndexes without any indexes in it
func NewFilterIndexer(dm *DataManager, itemType, dbKeySuffix string) *FilterIndexer {
return &FilterIndexer{dm: dm, itemType: itemType, dbKeySuffix: dbKeySuffix,
indexes: make(map[string]utils.StringMap),
chngdIndxKeys: make(utils.StringMap)}
}
// NewFilterIndexerWithIndexes creates the FilterIndexes that contains already the old indexes
func NewFilterIndexerWithIndexes(dm *DataManager, itemType, dbKeySuffix string,
oldIdx map[string]utils.StringMap) *FilterIndexer {
return &FilterIndexer{dm: dm, itemType: itemType, dbKeySuffix: dbKeySuffix,
indexes: oldIdx,
chngdIndxKeys: make(utils.StringMap)}
}
// FilterIndexer is a centralized indexer for all data sources using RequestFilter
// retrieves and stores it's data from/to dataDB
// not thread safe, meant to be used as logic within other code blocks

View File

@@ -1692,7 +1692,6 @@ func (ms *MongoStorage) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
_, err = ms.getCol(ColRFI).DeleteOne(sctx,
bson.M{"key": idxDbkey})
} else {
_, err = ms.getCol(ColRFI).UpdateOne(sctx, bson.M{"key": idxDbkey},
bson.M{"$set": bson.M{"key": idxDbkey, "value": itmMp.Slice()}},
options.Update().SetUpsert(true),