From 8d05c14aeca51549855e610ff658367385127898 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 17 Sep 2025 19:38:06 +0300 Subject: [PATCH] add support for batched index keys removal --- apier/v1/filter_indexes.go | 2 +- engine/datadbmock.go | 2 +- engine/datamanager.go | 29 +++++++++++++++------------- engine/datamanager_test.go | 2 +- engine/storage_interface.go | 2 +- engine/storage_internal_datadb.go | 8 +++++--- engine/storage_mongo_datadb.go | 32 +++++++++++++++++-------------- engine/storage_redis.go | 11 ++++++++--- engine/storage_test.go | 2 +- engine/z_filterindexer_it_test.go | 2 +- 10 files changed, 53 insertions(+), 39 deletions(-) diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go index bed00e241..d620ff7d7 100644 --- a/apier/v1/filter_indexes.go +++ b/apier/v1/filter_indexes.go @@ -79,7 +79,7 @@ func (apierSv1 *APIerSv1) RemoveFilterIndexes(ctx *context.Context, arg *AttrRem arg.ItemType = utils.CacheAttributeFilterIndexes tntCtx = utils.ConcatenatedKey(tnt, arg.Context) } - if err = apierSv1.DataManager.RemoveIndexes(arg.ItemType, tntCtx, utils.EmptyString); err != nil { + if err = apierSv1.DataManager.RemoveIndexes(arg.ItemType, tntCtx); err != nil { return } //generate a loadID for CacheFilterIndexes and store it in database diff --git a/engine/datadbmock.go b/engine/datadbmock.go index 3d092dec4..99522885e 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -351,7 +351,7 @@ func (dbM *DataDBMock) SetIndexesDrv(idxItmType, tntCtx string, return utils.ErrNotImplemented } -func (dbM *DataDBMock) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) { +func (dbM *DataDBMock) RemoveIndexesDrv(idxItmType, tntCtx string, idxKeys ...string) (err error) { return utils.ErrNotImplemented } diff --git a/engine/datamanager.go b/engine/datamanager.go index 359d29363..3aaa3b5eb 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -3855,25 +3855,28 @@ func (dm *DataManager) SetIndexes(idxItmType, tntCtx string, }, itm) } -func (dm *DataManager) RemoveIndexes(idxItmType, tntCtx, idxKey string) (err error) { +func (dm *DataManager) RemoveIndexes(idxItmType, tntCtx string, idxKeys ...string) (err error) { if dm == nil { return utils.ErrNoDatabaseConn } - if err = dm.DataDB().RemoveIndexesDrv(idxItmType, tntCtx, idxKey); err != nil { + if err = dm.DataDB().RemoveIndexesDrv(idxItmType, tntCtx, idxKeys...); err != nil { return } itm := config.CgrConfig().DataDbCfg().Items[idxItmType] - _ = dm.replicator.replicate( - utils.CacheInstanceToPrefix[idxItmType], tntCtx, // these are used to get the host IDs from cache - utils.ReplicatorSv1RemoveIndexes, - &utils.GetIndexesArg{ - IdxItmType: idxItmType, - TntCtx: tntCtx, - IdxKey: idxKey, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), - }, itm) + // Handle replication for each key since replication API supports single key only + for _, idxKey := range idxKeys { + _ = dm.replicator.replicate( + utils.CacheInstanceToPrefix[idxItmType], tntCtx, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveIndexes, + &utils.GetIndexesArg{ + IdxItmType: idxItmType, + TntCtx: tntCtx, + IdxKey: idxKey, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) + } return } diff --git a/engine/datamanager_test.go b/engine/datamanager_test.go index 80703f999..3395af1da 100644 --- a/engine/datamanager_test.go +++ b/engine/datamanager_test.go @@ -4831,7 +4831,7 @@ func TestDmIndexes(t *testing.T) { "cgrates.org", idxes, false, utils.NonTransactional); err != nil { t.Error(err) } - if err := dm.RemoveIndexes(utils.CacheResourceFilterIndexes, "cgrates.org", utils.EmptyString); err != nil { + if err := dm.RemoveIndexes(utils.CacheResourceFilterIndexes, "cgrates.org"); err != nil { t.Error(err) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 4d5fb5dde..c39a1057d 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -100,7 +100,7 @@ type DataDB interface { GetIndexesDrv(idxItmType, tntCtx string, idxKeys ...string) (indexes map[string]utils.StringSet, err error) SetIndexesDrv(idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) - RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) + RemoveIndexesDrv(idxItmType, tntCtx string, idxKeys ...string) (err error) GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) RemStatQueueProfileDrv(tenant, id string) (err error) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 7a9658789..e5a569d5b 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -976,12 +976,14 @@ func (iDB *InternalDB) SetIndexesDrv(idxItmType, tntCtx string, return } -func (iDB *InternalDB) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) { - if idxKey == utils.EmptyString { +func (iDB *InternalDB) RemoveIndexesDrv(idxItmType, tntCtx string, idxKeys ...string) (err error) { + if len(idxKeys) == 0 || (len(idxKeys) == 1 && idxKeys[0] == utils.EmptyString) { // remove all iDB.db.RemoveGroup(idxItmType, tntCtx, true, utils.EmptyString) return } - iDB.db.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, idxKey), true, utils.NonTransactional) + for _, idxKey := range idxKeys { + iDB.db.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, idxKey), true, utils.NonTransactional) + } return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 2fa713c98..108004be1 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2177,27 +2177,31 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, } // RemoveIndexesDrv removes the indexes -func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) error { - if len(idxKey) != 0 { +func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx string, idxKeys ...string) error { + if len(idxKeys) == 0 || (len(idxKeys) == 1 && idxKeys[0] == utils.EmptyString) { // remove all + regexKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + for _, character := range []string{".", "*"} { + regexKey = strings.ReplaceAll(regexKey, character, `\`+character) + } + // For optimization, use a caret (^) in the regex pattern. return ms.query(func(sctx mongo.SessionContext) error { - dr, err := ms.getCol(ColIndx).DeleteOne(sctx, - bson.M{"key": utils.ConcatenatedKey(utils.CacheInstanceToPrefix[idxItmType]+tntCtx, idxKey)}) - if dr.DeletedCount == 0 { - return utils.ErrNotFound - } + _, err := ms.getCol(ColIndx).DeleteMany(sctx, bson.M{ + "key": primitive.Regex{ + Pattern: "^" + regexKey, + }, + }) return err }) } - regexKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx - for _, character := range []string{".", "*"} { - regexKey = strings.ReplaceAll(regexKey, character, `\`+character) + // Remove specific keys (single or multiple) using $in query. + dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + inKeys := make([]string, len(idxKeys)) + for i, key := range idxKeys { + inKeys[i] = utils.ConcatenatedKey(dbKey, key) } - // For optimization, use a caret (^) in the regex pattern. return ms.query(func(sctx mongo.SessionContext) error { _, err := ms.getCol(ColIndx).DeleteMany(sctx, bson.M{ - "key": primitive.Regex{ - Pattern: "^" + regexKey, - }, + "key": bson.M{"$in": inKeys}, }) return err }) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 8c5166032..f16993c6b 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1436,11 +1436,16 @@ func (rs *RedisStorage) SetIndexesDrv(idxItmType, tntCtx string, return } -func (rs *RedisStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) { - if idxKey == utils.EmptyString { +func (rs *RedisStorage) RemoveIndexesDrv(idxItmType, tntCtx string, idxKeys ...string) (err error) { + if len(idxKeys) == 0 || (len(idxKeys) == 1 && idxKeys[0] == utils.EmptyString) { return rs.Cmd(nil, redis_DEL, utils.CacheInstanceToPrefix[idxItmType]+tntCtx) } - return rs.Cmd(nil, redis_HDEL, utils.CacheInstanceToPrefix[idxItmType]+tntCtx, idxKey) + args := make([]string, len(idxKeys)+1) + args[0] = utils.CacheInstanceToPrefix[idxItmType] + tntCtx + for i, key := range idxKeys { + args[i+1] = key + } + return rs.Cmd(nil, redis_HDEL, args...) } // Will backup active sessions in DataDB diff --git a/engine/storage_test.go b/engine/storage_test.go index 8e56314eb..3dca4f7ab 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -525,7 +525,7 @@ func TestIDBRemoveIndexesDrv(t *testing.T) { idb.db.Set("chID", "itmID", true, []string{utils.EmptyString}, true, "trID") idb.db.Set("chID2", "itmIDv", true, []string{"grpID"}, true, "trID") - if err := idb.RemoveIndexesDrv("chID", utils.EmptyString, utils.EmptyString); err != nil { + if err := idb.RemoveIndexesDrv("chID", utils.EmptyString); err != nil { t.Error(err) } if err := idb.RemoveIndexesDrv("chID2", "itmID", "v"); err != nil { diff --git a/engine/z_filterindexer_it_test.go b/engine/z_filterindexer_it_test.go index 114f1968f..ba2fe6c5b 100644 --- a/engine/z_filterindexer_it_test.go +++ b/engine/z_filterindexer_it_test.go @@ -1698,7 +1698,7 @@ func testITTestIndexingThresholds(t *testing.T) { } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) } - if err := dataManager.RemoveIndexes(utils.CacheThresholdFilterIndexes, th.Tenant, utils.EmptyString); err != nil { + if err := dataManager.RemoveIndexes(utils.CacheThresholdFilterIndexes, th.Tenant); err != nil { t.Error(err) } else if _, err := dataManager.GetIndexes( utils.CacheThresholdFilterIndexes, th.Tenant,