Added transaction for filter indexes(case of computeIDs) + little update on mongo/redis get/set index

This commit is contained in:
porosnicuadrian
2021-08-24 17:36:20 +03:00
committed by Dan Christian Bogos
parent 8c762e8e7e
commit 717d372fac
8 changed files with 111 additions and 29 deletions

View File

@@ -16,6 +16,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package apis
import (
"fmt"
"strings"
"time"
@@ -136,7 +137,7 @@ func (adms *AdminSv1) GetFilterIndexes(ctx *context.Context, arg *AttrGetFilterI
arg.ItemType = utils.CacheAttributeFilterIndexes
}
if indexes, err = adms.dm.GetIndexes(ctx,
arg.ItemType, tntCtx, utils.EmptyString, true, true); err != nil {
arg.ItemType, tntCtx, utils.EmptyString, utils.EmptyString, true, true); err != nil {
return
}
if arg.FilterType != utils.EmptyString {
@@ -353,6 +354,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(ctx *context.Context, args *utils.Arg
}
args.ActionS = indexes.Size() != 0
}
// RateFilter Indexes
var ratePrf []string
if args.RateS {
cacheIDs[utils.CacheRateProfilesFilterIndexes] = []string{utils.MetaAny}
@@ -484,7 +486,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(ctx *context.Context, args *utils.Arg
// ComputeFilterIndexIDs computes specific filter indexes
func (adms *AdminSv1) ComputeFilterIndexIDs(ctx *context.Context, args *utils.ArgsComputeFilterIndexIDs, reply *string) (err error) {
transactionID := utils.NonTransactional
transactionID := utils.GenUUID()
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = adms.cfg.GeneralCfg().DefaultTenant
@@ -607,12 +609,14 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(ctx *context.Context, args *utils.Ar
cacheIDs[utils.CacheActionProfilesFilterIndexes] = indexes.AsSlice()
}
//RateProfile Indexes
var ratePrf []string
if _, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheRateProfilesFilterIndexes,
&args.RateProfileIDs, transactionID, func(tnt, id, grp string) (*[]string, error) {
rpr, e := adms.dm.GetRateProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
ratePrf = append(ratePrf, utils.ConcatenatedKey(tnt, id))
rtIds := make([]string, 0, len(rpr.Rates))
for key := range rpr.Rates {
rtIds = append(rtIds, key)
@@ -648,6 +652,75 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(ctx *context.Context, args *utils.Ar
if indexes.Size() != 0 {
cacheIDs[utils.CacheDispatcherFilterIndexes] = indexes.AsSlice()
}
//Now we move from tmpKey to the right key for each type
//ThresholdProfile Indexes
if len(args.ThresholdIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheThresholdFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//AccountProfile Indexes
if len(args.AccountIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheAccountsFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//ActionProfile Indexes
if len(args.ActionProfileIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheActionProfilesFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//RateProfile Indexes
if len(args.RateProfileIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheRateProfilesFilterIndexes, tnt, nil, true, transactionID); err != nil {
return err
}
for _, tntId := range ratePrf {
if err = adms.dm.SetIndexes(ctx, utils.CacheRateFilterIndexes, tntId, nil, true, transactionID); err != nil {
return err
}
}
}
//StatQueueProfile Indexes
if len(args.StatIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheStatFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//ResourceProfile Indexes
if len(args.ResourceIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheResourceFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//RouteProfile Indexes
if len(args.RouteIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheRouteFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//AttributeProfile Indexes
if len(args.AttributeIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheAttributeFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//ChargerProfile Indexes
if len(args.ChargerIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheChargerFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//DispatcherProfile Indexes
if len(args.DispatcherIDs) != 0 {
if err = adms.dm.SetIndexes(ctx, utils.CacheDispatcherFilterIndexes, tnt, nil, true, transactionID); err != nil {
return
}
}
//generate a load
//ID for CacheFilterIndexes and store it in database
loadIDs := make(map[string]int64)
timeNow := time.Now().UnixNano()

View File

@@ -28,7 +28,7 @@ type DataDBMock struct {
SetRateProfileDrvF func(*context.Context, *utils.RateProfile) error
GetRateProfileDrvF func(*context.Context, string, string) (*utils.RateProfile, error)
GetKeysForPrefixF func(*context.Context, string) ([]string, error)
GetIndexesDrvF func(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error)
GetIndexesDrvF func(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error)
SetIndexesDrvF func(ctx *context.Context, idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error)
GetAttributeProfileDrvF func(ctx *context.Context, str1 string, str2 string) (*AttributeProfile, error)
SetAttributeProfileDrvF func(ctx *context.Context, attr *AttributeProfile) error
@@ -151,9 +151,9 @@ func (dbM *DataDBMock) AddLoadHistory(*utils.LoadInstance, int, string) error {
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) {
func (dbM *DataDBMock) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) {
if dbM.GetIndexesDrvF != nil {
return dbM.GetIndexesDrvF(ctx, idxItmType, tntCtx, idxKey)
return dbM.GetIndexesDrvF(ctx, idxItmType, tntCtx, idxKey, transactionID)
}
return nil, utils.ErrNotImplemented
}

View File

@@ -202,74 +202,74 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids []
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheAttributeFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheAttributeFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.ResourceFilterIndexes:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheResourceFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheResourceFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.StatFilterIndexes:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheStatFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheStatFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.ThresholdFilterIndexes:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheThresholdFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheThresholdFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.RouteFilterIndexes:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheRouteFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheRouteFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.ChargerFilterIndexes:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheChargerFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheChargerFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.DispatcherFilterIndexes:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheDispatcherFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheDispatcherFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.RateProfilesFilterIndexPrfx:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheRateProfilesFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheRateProfilesFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.RateFilterIndexPrfx:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheRateFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheRateFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.ActionProfilesFilterIndexPrfx:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheActionProfilesFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheActionProfilesFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.AccountFilterIndexPrfx:
var tntCtx, idxKey string
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
return
}
_, err = dm.GetIndexes(ctx, utils.CacheAccountsFilterIndexes, tntCtx, idxKey, false, true)
_, err = dm.GetIndexes(ctx, utils.CacheAccountsFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true)
case utils.FilterIndexPrfx:
idx := strings.LastIndexByte(dataID, utils.InInFieldSep[0])
if idx < 0 {
err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID)
return
}
_, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, dataID[:idx], dataID[idx+1:], false, true)
_, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, dataID[:idx], dataID[idx+1:], utils.NonTransactional, false, true)
case utils.LoadIDPrefix:
_, err = dm.GetItemLoadIDs(ctx, utils.EmptyString, true)
case utils.MetaAPIBan:
@@ -393,7 +393,7 @@ func (dm *DataManager) RemoveFilter(ctx *context.Context, tenant, id string, wit
tntCtx = utils.ConcatenatedKey(tenant, id)
var rcvIndx map[string]utils.StringSet
if rcvIndx, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx,
utils.EmptyString, true, true); err != nil {
utils.EmptyString, utils.NonTransactional, true, true); err != nil {
if err != utils.ErrNotFound {
return
}
@@ -2288,7 +2288,7 @@ func (dm *DataManager) Reconnect(d DataDB) {
dm.dataDB = d
}
func (dm *DataManager) GetIndexes(ctx *context.Context, idxItmType, tntCtx, idxKey string,
func (dm *DataManager) GetIndexes(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string,
cacheRead, cacheWrite bool) (indexes map[string]utils.StringSet, err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
@@ -2305,7 +2305,7 @@ func (dm *DataManager) GetIndexes(ctx *context.Context, idxItmType, tntCtx, idxK
}, nil
}
}
if indexes, err = dm.DataDB().GetIndexesDrv(ctx, idxItmType, tntCtx, idxKey); err != nil {
if indexes, err = dm.DataDB().GetIndexesDrv(ctx, idxItmType, tntCtx, idxKey, transactionID); err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaIndexes]; err == utils.ErrNotFound && itm.Remote {
if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetIndexes,
@@ -2333,7 +2333,6 @@ func (dm *DataManager) GetIndexes(ctx *context.Context, idxItmType, tntCtx, idxK
return nil, err
}
}
if cacheWrite {
for k, v := range indexes {
if err = Cache.Set(ctx, idxItmType, utils.ConcatenatedKey(tntCtx, k), v, []string{tntCtx},

View File

@@ -77,7 +77,7 @@ func MatchingItemIDsForEvent(ctx *context.Context, ev utils.MapStorage, stringFl
for _, val := range fldVals {
var dbIndexes map[string]utils.StringSet // list of items matched in DB
key := utils.ConcatenatedKey(filterIndexTypes[i], fldName, val)
if dbIndexes, err = dm.GetIndexes(ctx, cacheID, itemIDPrefix, key, true, true); err != nil {
if dbIndexes, err = dm.GetIndexes(ctx, cacheID, itemIDPrefix, key, utils.NonTransactional, true, true); err != nil {
if err == utils.ErrNotFound {
err = nil
continue

View File

@@ -183,7 +183,7 @@ func getIHFltrIdxFromCache(ctx *context.Context, dm *DataManager, fltrIdxCache *
return fltrVal.(utils.StringSet), nil
}
var indexes map[string]utils.StringSet
if indexes, err = dm.GetIndexes(ctx, idxItmType, tntGrp, idxKey, true, false); err != nil {
if indexes, err = dm.GetIndexes(ctx, idxItmType, tntGrp, idxKey, utils.NonTransactional, true, false); err != nil {
if err == utils.ErrNotFound {
fltrIdxCache.Set(cacheKey, nil, nil)
}

View File

@@ -56,7 +56,7 @@ type DataDB interface {
RemoveResourceDrv(*context.Context, string, string) error
GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
AddLoadHistory(*utils.LoadInstance, int, string) error
GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error)
GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error)
SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error)
RemoveIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (err error)

View File

@@ -1292,12 +1292,16 @@ func (ms *MongoStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id
// GetIndexesDrv retrieves Indexes from dataDB
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
// id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal
func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) {
func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) {
type result struct {
Key string
Value []string
}
dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
if transactionID != utils.NonTransactional {
originKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
}
dbKey := originKey
var q bson.M
if len(idxKey) != 0 {
q = bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey)}
@@ -1322,7 +1326,7 @@ func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx,
if len(elem.Value) == 0 {
continue
}
indexKey := strings.TrimPrefix(elem.Key, utils.CacheInstanceToPrefix[idxItmType]+tntCtx+utils.ConcatenatedKeySep)
indexKey := strings.TrimPrefix(elem.Key, originKey+utils.ConcatenatedKeySep)
indexes[indexKey] = utils.NewStringSet(elem.Value)
}
return cur.Close(sctx)

View File

@@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
"strconv"
"time"
@@ -761,9 +762,14 @@ func (rs *RedisStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id
}
// GetIndexesDrv retrieves Indexes from dataDB
func (rs *RedisStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) {
func (rs *RedisStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) {
mp := make(map[string]string)
dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
// dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
if transactionID != utils.NonTransactional {
originKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
}
dbKey := originKey
if len(idxKey) == 0 {
if err = rs.Cmd(&mp, redisHGETALL, dbKey); err != nil {
return