replicator: allow batched get/remove for indexes

This commit is contained in:
ionutboangiu
2025-09-18 07:23:45 +03:00
committed by Dan Christian Bogos
parent 8546c4c64e
commit e7b30a139f
4 changed files with 27 additions and 31 deletions

View File

@@ -3785,17 +3785,12 @@ func (dm *DataManager) GetIndexes(idxItmType, tntCtx string, cacheRead, cacheWri
if indexes, err = dm.DataDB().GetIndexesDrv(idxItmType, tntCtx, idxKeys...); err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[idxItmType]; err == utils.ErrNotFound && itm.Remote {
// For remote fallback, use single key for backward compatibility
var singleKey string
if len(idxKeys) > 0 {
singleKey = idxKeys[0]
}
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetIndexes,
&utils.GetIndexesArg{
IdxItmType: idxItmType,
TntCtx: tntCtx,
IdxKey: singleKey,
IdxKeys: idxKeys,
Tenant: config.CgrConfig().GeneralCfg().DefaultTenant,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
@@ -3806,10 +3801,12 @@ func (dm *DataManager) GetIndexes(idxItmType, tntCtx string, cacheRead, cacheWri
}
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound && cacheWrite && len(idxKeys) == 1 && idxKeys[0] != utils.EmptyString {
if errCh := Cache.Set(idxItmType, utils.ConcatenatedKey(tntCtx, idxKeys[0]), nil, []string{tntCtx},
true, utils.NonTransactional); errCh != nil {
return nil, errCh
if err == utils.ErrNotFound && cacheWrite {
for _, key := range idxKeys {
if errCh := Cache.Set(idxItmType, utils.ConcatenatedKey(tntCtx, key), nil, []string{tntCtx},
true, utils.NonTransactional); errCh != nil {
return nil, errCh
}
}
}
return nil, err
@@ -3858,20 +3855,17 @@ func (dm *DataManager) RemoveIndexes(idxItmType, tntCtx string, idxKeys ...strin
return
}
itm := config.CgrConfig().DataDbCfg().Items[idxItmType]
// Handle replication for each key since replication API supports single key only
for _, idxKey := range idxKeys {
_ = dm.replicator.replicate(
utils.CacheInstanceToPrefix[idxItmType], tntCtx, // these are used to get the host IDs from cache
utils.ReplicatorSv1RemoveIndexes,
&utils.GetIndexesArg{
IdxItmType: idxItmType,
TntCtx: tntCtx,
IdxKey: idxKey,
Tenant: config.CgrConfig().GeneralCfg().DefaultTenant,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString),
}, itm)
}
_ = dm.replicator.replicate(
utils.CacheInstanceToPrefix[idxItmType], tntCtx, // these are used to get the host IDs from cache
utils.ReplicatorSv1RemoveIndexes,
&utils.GetIndexesArg{
IdxItmType: idxItmType,
TntCtx: tntCtx,
IdxKeys: idxKeys,
Tenant: config.CgrConfig().GeneralCfg().DefaultTenant,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString),
}, itm)
return
}

View File

@@ -4800,7 +4800,7 @@ func TestDmIndexes(t *testing.T) {
if !cancast {
return utils.ErrNotConvertible
}
dm.DataDB().RemoveIndexesDrv(gIdxArg.IdxItmType, gIdxArg.Tenant, utils.EmptyString)
dm.DataDB().RemoveIndexesDrv(gIdxArg.IdxItmType, gIdxArg.Tenant, gIdxArg.IdxKeys...)
return nil
},
},