From 2d36afe34f9a4e7d9648517d9b15ab035a439dbf Mon Sep 17 00:00:00 2001
From: porosnicuadrian
Date: Tue, 14 Sep 2021 16:56:01 +0300
Subject: [PATCH] New constructor for FilterIndexer with old indexes to avoid
 overwrite in ComputeFilterIndexIDs

---
 apier/v1/filter_indexes.go     | 92 ++++++++++++++++++++++++++++++----
 engine/filterindexer.go        |  9 ++++
 engine/storage_mongo_datadb.go |  1 -
 3 files changed, 92 insertions(+), 10 deletions(-)

diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go
index 4ca39f6a4..2385e32e4 100644
--- a/apier/v1/filter_indexes.go
+++ b/apier/v1/filter_indexes.go
@@ -316,6 +316,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args utils.ArgsComputeFilterIndexIDs,
 	if err != nil && err != utils.ErrNotFound {
 		return utils.APIErrorHandler(err)
 	}
+
 	//StatQueueProfile Indexes
 	sqpIndexers, err := api.computeStatIndexes(args.Tenant, &args.StatIDs, transactionID)
 	if err != nil && err != utils.ErrNotFound {
 		return utils.APIErrorHandler(err)
 	}
@@ -460,7 +461,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args utils.ArgsComputeFilterIndexIDs,
 func (api *APIerSv1) computeThresholdIndexes(tenant string, thIDs *[]string,
 	transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
 	var thresholdIDs []string
-	thdsIndexers := engine.NewFilterIndexer(api.DataManager, utils.ThresholdProfilePrefix, tenant)
+	var thdsIndexers *engine.FilterIndexer
 	if thIDs == nil {
 		ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix)
 		if err != nil {
@@ -469,7 +470,17 @@ func (api *APIerSv1) computeThresholdIndexes(tenant string, thIDs *[]string,
 		for _, id := range ids {
 			thresholdIDs = append(thresholdIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
 		}
+		// this is the ComputeFilterIndexes case, which starts from empty indexes
+		thdsIndexers = engine.NewFilterIndexer(api.DataManager, utils.ThresholdProfilePrefix, tenant)
 	} else {
+		// this is the ComputeFilterIndexIDs case, which reuses the old indexes retrieved below
+		var oldIDx map[string]utils.StringMap
+		if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.ThresholdProfilePrefix],
+			tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
+			thdsIndexers = engine.NewFilterIndexer(api.DataManager, utils.ThresholdProfilePrefix, tenant)
+		} else {
+			thdsIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.ThresholdProfilePrefix, tenant, oldIDx)
+		}
 		thresholdIDs = *thIDs
 		transactionID = utils.NonTransactional
 	}
@@ -510,6 +521,7 @@ func (api *APIerSv1) computeThresholdIndexes(tenant string, thIDs *[]string,
 			thdsIndexers.IndexTPFilter(engine.FilterToTPFilter(fltr), th.ID)
 		}
 	}
+
 	if transactionID == utils.NonTransactional {
 		if err := thdsIndexers.StoreIndexes(true, transactionID); err != nil {
 			return nil, err
 		}
 	}
@@ -526,8 +538,7 @@ func (api *APIerSv1) computeThresholdIndexes(tenant string, thIDs *[]string,
 func (api *APIerSv1) computeAttributeIndexes(tenant, context string, attrIDs *[]string,
 	transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
 	var attributeIDs []string
-	attrIndexers := engine.NewFilterIndexer(api.DataManager, utils.AttributeProfilePrefix,
-		utils.ConcatenatedKey(tenant, context))
+	var attrIndexers *engine.FilterIndexer
 	if attrIDs == nil {
 		ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix)
 		if err != nil {
@@ -536,7 +547,18 @@ func (api *APIerSv1) computeAttributeIndexes(tenant, context string, attrIDs *[]
 		for _, id := range ids {
 			attributeIDs = append(attributeIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
 		}
+		// this is the ComputeFilterIndexes case, which starts from empty indexes
+		attrIndexers = engine.NewFilterIndexer(api.DataManager, utils.AttributeProfilePrefix,
+			utils.ConcatenatedKey(tenant, context))
 	} else {
+		// this is the ComputeFilterIndexIDs case, which reuses the old indexes retrieved below
+		var oldIDx map[string]utils.StringMap
+		if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.AttributeProfilePrefix],
+			tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
+			attrIndexers = engine.NewFilterIndexer(api.DataManager, utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, context))
+		} else {
+			attrIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, context), oldIDx)
+		}
 		attributeIDs = *attrIDs
 		transactionID = utils.NonTransactional
 	}
@@ -596,7 +618,7 @@ func (api *APIerSv1) computeAttributeIndexes(tenant, context string, attrIDs *[]
 func (api *APIerSv1) computeResourceIndexes(tenant string, rsIDs *[]string,
 	transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
 	var resourceIDs []string
-	rpIndexers := engine.NewFilterIndexer(api.DataManager, utils.ResourceProfilesPrefix, tenant)
+	var rpIndexers *engine.FilterIndexer
 	if rsIDs == nil {
 		ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix)
 		if err != nil {
@@ -605,7 +627,17 @@ func (api *APIerSv1) computeResourceIndexes(tenant string, rsIDs *[]string,
 		for _, id := range ids {
 			resourceIDs = append(resourceIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
 		}
+		// this is the ComputeFilterIndexes case, which starts from empty indexes
+		rpIndexers = engine.NewFilterIndexer(api.DataManager, utils.ResourceProfilesPrefix, tenant)
 	} else {
+		// this is the ComputeFilterIndexIDs case, which reuses the old indexes retrieved below
+		var oldIDx map[string]utils.StringMap
+		if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.ResourceProfilesPrefix],
+			tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
+			rpIndexers = engine.NewFilterIndexer(api.DataManager, utils.ResourceProfilesPrefix, tenant)
+		} else {
+			rpIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.ResourceProfilesPrefix, tenant, oldIDx)
+		}
 		resourceIDs = *rsIDs
 		transactionID = utils.NonTransactional
 	}
@@ -662,7 +694,7 @@ func (api *APIerSv1) computeResourceIndexes(tenant string, rsIDs *[]string,
 func (api *APIerSv1) computeStatIndexes(tenant string, stIDs *[]string,
 	transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
 	var statIDs []string
-	sqpIndexers := engine.NewFilterIndexer(api.DataManager, utils.StatQueueProfilePrefix, tenant)
+	var sqpIndexers *engine.FilterIndexer
 	if stIDs == nil {
 		ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix)
 		if err != nil {
@@ -671,7 +703,17 @@ func (api *APIerSv1) computeStatIndexes(tenant string, stIDs *[]string,
 		for _, id := range ids {
 			statIDs = append(statIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
 		}
+		// this is the ComputeFilterIndexes case, which starts from empty indexes
+		sqpIndexers = engine.NewFilterIndexer(api.DataManager, utils.StatQueueProfilePrefix, tenant)
 	} else {
+		// this is the ComputeFilterIndexIDs case, which reuses the old indexes retrieved below
+		var oldIDx map[string]utils.StringMap
+		if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.StatQueueProfilePrefix],
+			tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
+			sqpIndexers = engine.NewFilterIndexer(api.DataManager, utils.StatQueueProfilePrefix, tenant)
+		} else {
+			sqpIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.StatQueueProfilePrefix, tenant, oldIDx)
+		}
 		statIDs = *stIDs
 		transactionID = utils.NonTransactional
 	}
@@ -728,7 +770,7 @@ func (api *APIerSv1) computeStatIndexes(tenant string, stIDs *[]string,
 func (api *APIerSv1) computeSupplierIndexes(tenant string, sppIDs *[]string,
 	transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
 	var supplierIDs []string
-	sppIndexers := engine.NewFilterIndexer(api.DataManager, utils.SupplierProfilePrefix, tenant)
+	var sppIndexers *engine.FilterIndexer
 	if sppIDs == nil {
 		ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix)
 		if err != nil {
@@ -737,7 +779,17 @@ func (api *APIerSv1) computeSupplierIndexes(tenant string, sppIDs *[]string,
 		for _, id := range ids {
 			supplierIDs = append(supplierIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
 		}
+		// this is the ComputeFilterIndexes case, which starts from empty indexes
+		sppIndexers = engine.NewFilterIndexer(api.DataManager, utils.SupplierProfilePrefix, tenant)
 	} else {
+		// this is the ComputeFilterIndexIDs case, which reuses the old indexes retrieved below
+		var oldIDx map[string]utils.StringMap
+		if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.SupplierProfilePrefix],
+			tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
+			sppIndexers = engine.NewFilterIndexer(api.DataManager, utils.SupplierProfilePrefix, tenant)
+		} else {
+			sppIndexers = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.SupplierProfilePrefix, tenant, oldIDx)
+		}
 		supplierIDs = *sppIDs
 		transactionID = utils.NonTransactional
 	}
@@ -794,7 +846,7 @@ func (api *APIerSv1) computeSupplierIndexes(tenant string, sppIDs *[]string,
 func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string,
 	transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
 	var chargerIDs []string
-	cppIndexes := engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant)
+	var cppIndexes *engine.FilterIndexer
 	if cppIDs == nil {
 		ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.ChargerProfilePrefix)
 		if err != nil {
@@ -803,7 +855,17 @@ func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string,
 		for _, id := range ids {
 			chargerIDs = append(chargerIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
 		}
+		// this is the ComputeFilterIndexes case, which starts from empty indexes
+		cppIndexes = engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant)
 	} else {
+		// this is the ComputeFilterIndexIDs case, which reuses the old indexes retrieved below
+		var oldIDx map[string]utils.StringMap
+		if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.ChargerProfilePrefix],
+			tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
+			cppIndexes = engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant)
+		} else {
+			cppIndexes = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.ChargerProfilePrefix, tenant, oldIDx)
+		}
 		chargerIDs = *cppIDs
 		transactionID = utils.NonTransactional
 	}
@@ -860,8 +922,7 @@ func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string,
 func (api *APIerSv1) computeDispatcherIndexes(tenant, context string, dspIDs *[]string,
 	transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
 	var dispatcherIDs []string
-	dspIndexes := engine.NewFilterIndexer(api.DataManager, utils.DispatcherProfilePrefix,
-		utils.ConcatenatedKey(tenant, context))
+	var dspIndexes *engine.FilterIndexer
 	if dspIDs == nil {
 		ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.DispatcherProfilePrefix)
 		if err != nil {
@@ -870,7 +931,20 @@ func (api *APIerSv1) computeDispatcherIndexes(tenant, context string, dspIDs *[]
 		for _, id := range ids {
 			dispatcherIDs = append(dispatcherIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
 		}
+		// this is the ComputeFilterIndexes case, which starts from empty indexes
+		dspIndexes = engine.NewFilterIndexer(api.DataManager, utils.DispatcherProfilePrefix,
+			utils.ConcatenatedKey(tenant, context))
 	} else {
+		// this is the ComputeFilterIndexIDs case, which reuses the old indexes retrieved below
+		var oldIDx map[string]utils.StringMap
+		if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.DispatcherProfilePrefix],
+			tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
+			dspIndexes = engine.NewFilterIndexer(api.DataManager, utils.DispatcherProfilePrefix,
+				utils.ConcatenatedKey(tenant, context))
+		} else {
+			dspIndexes = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.DispatcherProfilePrefix,
+				utils.ConcatenatedKey(tenant, context), oldIDx)
+		}
 		dispatcherIDs = *dspIDs
 		transactionID = utils.NonTransactional
 	}
diff --git a/engine/filterindexer.go b/engine/filterindexer.go
index c5d7a960f..3bc9eea37 100644
--- a/engine/filterindexer.go
+++ b/engine/filterindexer.go
@@ -26,12 +26,21 @@ import (
 	"github.com/cgrates/cgrates/utils"
 )
 
+// NewFilterIndexer creates a FilterIndexer without any indexes in it
 func NewFilterIndexer(dm *DataManager, itemType, dbKeySuffix string) *FilterIndexer {
 	return &FilterIndexer{dm: dm, itemType: itemType, dbKeySuffix: dbKeySuffix,
 		indexes:       make(map[string]utils.StringMap),
 		chngdIndxKeys: make(utils.StringMap)}
 }
 
+// NewFilterIndexerWithIndexes creates a FilterIndexer that already contains the old indexes
+func NewFilterIndexerWithIndexes(dm *DataManager, itemType, dbKeySuffix string,
+	oldIdx map[string]utils.StringMap) *FilterIndexer {
+	return &FilterIndexer{dm: dm, itemType: itemType, dbKeySuffix: dbKeySuffix,
+		indexes:       oldIdx,
+		chngdIndxKeys: make(utils.StringMap)}
+}
+
 // FilterIndexer is a centralized indexer for all data sources using RequestFilter
 // retrieves and stores it's data from/to dataDB
 // not thread safe, meant to be used as logic within other code blocks
diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go
index 32fb6c17a..c78e64503 100644
--- a/engine/storage_mongo_datadb.go
+++ b/engine/storage_mongo_datadb.go
@@ -1692,7 +1692,6 @@ func (ms *MongoStorage) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
 			_, err = ms.getCol(ColRFI).DeleteOne(sctx, bson.M{"key": idxDbkey})
 		} else {
-
 			_, err = ms.getCol(ColRFI).UpdateOne(sctx, bson.M{"key": idxDbkey},
 				bson.M{"$set": bson.M{"key": idxDbkey, "value": itmMp.Slice()}},
 				options.Update().SetUpsert(true),
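
The point of NewFilterIndexerWithIndexes is that the recompute-by-IDs path merges the
newly computed entries into the indexes already stored in dataDB instead of replacing
them, while the compute-all path keeps the empty-start constructor. The Go program
below is a minimal, self-contained sketch of that difference only; the filter keys,
profile IDs and the addEntry helper are made up for illustration and are not the
engine's real types or API.

package main

import "fmt"

// indexes maps a filter key (e.g. "*string:Account:1001") to the set of
// profile IDs matched by it, loosely mirroring map[string]utils.StringMap.
type indexes map[string]map[string]struct{}

// addEntry registers profileID under filterKey, the way a recompute adds an
// entry for every indexed filter rule of a profile.
func (idx indexes) addEntry(filterKey, profileID string) {
	if idx[filterKey] == nil {
		idx[filterKey] = make(map[string]struct{})
	}
	idx[filterKey][profileID] = struct{}{}
}

func main() {
	// Indexes already stored in dataDB for two threshold profiles.
	stored := indexes{
		"*string:Account:1001": {"THD_1": {}},
		"*string:Account:1002": {"THD_2": {}},
	}

	// Recompute only THD_1 starting from an empty map (what NewFilterIndexer
	// gives you): writing this back would drop the entry for THD_2.
	fresh := indexes{}
	fresh.addEntry("*string:Account:1001", "THD_1")
	fmt.Println("empty start: ", fresh)

	// Recompute only THD_1 starting from the stored indexes (what
	// NewFilterIndexerWithIndexes enables): the entry for THD_2 survives.
	seeded := stored
	seeded.addEntry("*string:Account:1001", "THD_1")
	fmt.Println("seeded start:", seeded)
}

This is why each compute*Indexes function above seeds the indexer with the result of
GetFilterIndexes when an explicit ID list is passed, and falls back to the plain
constructor when the getter errors or returns nothing.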