From 717d372facdc30df119057f4726af0f749d69728 Mon Sep 17 00:00:00 2001 From: porosnicuadrian Date: Tue, 24 Aug 2021 17:36:20 +0300 Subject: [PATCH] Added transaction for filter indexes(case of computeIDs) + little update on mongo/redis get/set index --- apis/filter_indexes.go | 77 +++++++++++++++++++++++++++++++++- engine/datadbmock.go | 6 +-- engine/datamanager.go | 31 +++++++------- engine/filterhelpers.go | 2 +- engine/libindex_health.go | 2 +- engine/storage_interface.go | 2 +- engine/storage_mongo_datadb.go | 10 +++-- engine/storage_redis.go | 10 ++++- 8 files changed, 111 insertions(+), 29 deletions(-) diff --git a/apis/filter_indexes.go b/apis/filter_indexes.go index be5878b12..4fc779656 100644 --- a/apis/filter_indexes.go +++ b/apis/filter_indexes.go @@ -16,6 +16,7 @@ along with this program. If not, see package apis import ( + "fmt" "strings" "time" @@ -136,7 +137,7 @@ func (adms *AdminSv1) GetFilterIndexes(ctx *context.Context, arg *AttrGetFilterI arg.ItemType = utils.CacheAttributeFilterIndexes } if indexes, err = adms.dm.GetIndexes(ctx, - arg.ItemType, tntCtx, utils.EmptyString, true, true); err != nil { + arg.ItemType, tntCtx, utils.EmptyString, utils.EmptyString, true, true); err != nil { return } if arg.FilterType != utils.EmptyString { @@ -353,6 +354,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(ctx *context.Context, args *utils.Arg } args.ActionS = indexes.Size() != 0 } + // RateFilter Indexes var ratePrf []string if args.RateS { cacheIDs[utils.CacheRateProfilesFilterIndexes] = []string{utils.MetaAny} @@ -484,7 +486,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(ctx *context.Context, args *utils.Arg // ComputeFilterIndexIDs computes specific filter indexes func (adms *AdminSv1) ComputeFilterIndexIDs(ctx *context.Context, args *utils.ArgsComputeFilterIndexIDs, reply *string) (err error) { - transactionID := utils.NonTransactional + transactionID := utils.GenUUID() tnt := args.Tenant if tnt == utils.EmptyString { tnt = adms.cfg.GeneralCfg().DefaultTenant @@ -607,12 +609,14 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(ctx *context.Context, args *utils.Ar cacheIDs[utils.CacheActionProfilesFilterIndexes] = indexes.AsSlice() } //RateProfile Indexes + var ratePrf []string if _, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheRateProfilesFilterIndexes, &args.RateProfileIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { rpr, e := adms.dm.GetRateProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } + ratePrf = append(ratePrf, utils.ConcatenatedKey(tnt, id)) rtIds := make([]string, 0, len(rpr.Rates)) for key := range rpr.Rates { rtIds = append(rtIds, key) @@ -648,6 +652,75 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(ctx *context.Context, args *utils.Ar if indexes.Size() != 0 { cacheIDs[utils.CacheDispatcherFilterIndexes] = indexes.AsSlice() } + //Now we move from tmpKey to the right key for each type + //ThresholdProfile Indexes + if len(args.ThresholdIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheThresholdFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //AccountProfile Indexes + if len(args.AccountIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheAccountsFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //ActionProfile Indexes + if len(args.ActionProfileIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheActionProfilesFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //RateProfile Indexes + if len(args.RateProfileIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheRateProfilesFilterIndexes, tnt, nil, true, transactionID); err != nil { + return err + } + for _, tntId := range ratePrf { + if err = adms.dm.SetIndexes(ctx, utils.CacheRateFilterIndexes, tntId, nil, true, transactionID); err != nil { + return err + } + + } + } + //StatQueueProfile Indexes + if len(args.StatIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheStatFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //ResourceProfile Indexes + if len(args.ResourceIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheResourceFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //RouteProfile Indexes + if len(args.RouteIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheRouteFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //AttributeProfile Indexes + if len(args.AttributeIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheAttributeFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //ChargerProfile Indexes + if len(args.ChargerIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheChargerFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //DispatcherProfile Indexes + if len(args.DispatcherIDs) != 0 { + if err = adms.dm.SetIndexes(ctx, utils.CacheDispatcherFilterIndexes, tnt, nil, true, transactionID); err != nil { + return + } + } + //generate a load + //ID for CacheFilterIndexes and store it in database loadIDs := make(map[string]int64) timeNow := time.Now().UnixNano() diff --git a/engine/datadbmock.go b/engine/datadbmock.go index c3d10bbc6..3f3a28612 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -28,7 +28,7 @@ type DataDBMock struct { SetRateProfileDrvF func(*context.Context, *utils.RateProfile) error GetRateProfileDrvF func(*context.Context, string, string) (*utils.RateProfile, error) GetKeysForPrefixF func(*context.Context, string) ([]string, error) - GetIndexesDrvF func(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) + GetIndexesDrvF func(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) SetIndexesDrvF func(ctx *context.Context, idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) GetAttributeProfileDrvF func(ctx *context.Context, str1 string, str2 string) (*AttributeProfile, error) SetAttributeProfileDrvF func(ctx *context.Context, attr *AttributeProfile) error @@ -151,9 +151,9 @@ func (dbM *DataDBMock) AddLoadHistory(*utils.LoadInstance, int, string) error { return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { +func (dbM *DataDBMock) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) { if dbM.GetIndexesDrvF != nil { - return dbM.GetIndexesDrvF(ctx, idxItmType, tntCtx, idxKey) + return dbM.GetIndexesDrvF(ctx, idxItmType, tntCtx, idxKey, transactionID) } return nil, utils.ErrNotImplemented } diff --git a/engine/datamanager.go b/engine/datamanager.go index 8d9e5b931..8cab41525 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -202,74 +202,74 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids [] if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheAttributeFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheAttributeFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.ResourceFilterIndexes: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheResourceFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheResourceFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.StatFilterIndexes: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheStatFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheStatFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.ThresholdFilterIndexes: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheThresholdFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheThresholdFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.RouteFilterIndexes: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheRouteFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheRouteFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.ChargerFilterIndexes: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheChargerFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheChargerFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.DispatcherFilterIndexes: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheDispatcherFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheDispatcherFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.RateProfilesFilterIndexPrfx: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheRateProfilesFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheRateProfilesFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.RateFilterIndexPrfx: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheRateFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheRateFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.ActionProfilesFilterIndexPrfx: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheActionProfilesFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheActionProfilesFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.AccountFilterIndexPrfx: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { return } - _, err = dm.GetIndexes(ctx, utils.CacheAccountsFilterIndexes, tntCtx, idxKey, false, true) + _, err = dm.GetIndexes(ctx, utils.CacheAccountsFilterIndexes, tntCtx, idxKey, utils.NonTransactional, false, true) case utils.FilterIndexPrfx: idx := strings.LastIndexByte(dataID, utils.InInFieldSep[0]) if idx < 0 { err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID) return } - _, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, dataID[:idx], dataID[idx+1:], false, true) + _, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, dataID[:idx], dataID[idx+1:], utils.NonTransactional, false, true) case utils.LoadIDPrefix: _, err = dm.GetItemLoadIDs(ctx, utils.EmptyString, true) case utils.MetaAPIBan: @@ -393,7 +393,7 @@ func (dm *DataManager) RemoveFilter(ctx *context.Context, tenant, id string, wit tntCtx = utils.ConcatenatedKey(tenant, id) var rcvIndx map[string]utils.StringSet if rcvIndx, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx, - utils.EmptyString, true, true); err != nil { + utils.EmptyString, utils.NonTransactional, true, true); err != nil { if err != utils.ErrNotFound { return } @@ -2288,7 +2288,7 @@ func (dm *DataManager) Reconnect(d DataDB) { dm.dataDB = d } -func (dm *DataManager) GetIndexes(ctx *context.Context, idxItmType, tntCtx, idxKey string, +func (dm *DataManager) GetIndexes(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string, cacheRead, cacheWrite bool) (indexes map[string]utils.StringSet, err error) { if dm == nil { err = utils.ErrNoDatabaseConn @@ -2305,7 +2305,7 @@ func (dm *DataManager) GetIndexes(ctx *context.Context, idxItmType, tntCtx, idxK }, nil } } - if indexes, err = dm.DataDB().GetIndexesDrv(ctx, idxItmType, tntCtx, idxKey); err != nil { + if indexes, err = dm.DataDB().GetIndexesDrv(ctx, idxItmType, tntCtx, idxKey, transactionID); err != nil { if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaIndexes]; err == utils.ErrNotFound && itm.Remote { if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetIndexes, @@ -2333,7 +2333,6 @@ func (dm *DataManager) GetIndexes(ctx *context.Context, idxItmType, tntCtx, idxK return nil, err } } - if cacheWrite { for k, v := range indexes { if err = Cache.Set(ctx, idxItmType, utils.ConcatenatedKey(tntCtx, k), v, []string{tntCtx}, diff --git a/engine/filterhelpers.go b/engine/filterhelpers.go index 5636bf45d..085e94a48 100644 --- a/engine/filterhelpers.go +++ b/engine/filterhelpers.go @@ -77,7 +77,7 @@ func MatchingItemIDsForEvent(ctx *context.Context, ev utils.MapStorage, stringFl for _, val := range fldVals { var dbIndexes map[string]utils.StringSet // list of items matched in DB key := utils.ConcatenatedKey(filterIndexTypes[i], fldName, val) - if dbIndexes, err = dm.GetIndexes(ctx, cacheID, itemIDPrefix, key, true, true); err != nil { + if dbIndexes, err = dm.GetIndexes(ctx, cacheID, itemIDPrefix, key, utils.NonTransactional, true, true); err != nil { if err == utils.ErrNotFound { err = nil continue diff --git a/engine/libindex_health.go b/engine/libindex_health.go index 86e758423..f8c84288b 100644 --- a/engine/libindex_health.go +++ b/engine/libindex_health.go @@ -183,7 +183,7 @@ func getIHFltrIdxFromCache(ctx *context.Context, dm *DataManager, fltrIdxCache * return fltrVal.(utils.StringSet), nil } var indexes map[string]utils.StringSet - if indexes, err = dm.GetIndexes(ctx, idxItmType, tntGrp, idxKey, true, false); err != nil { + if indexes, err = dm.GetIndexes(ctx, idxItmType, tntGrp, idxKey, utils.NonTransactional, true, false); err != nil { if err == utils.ErrNotFound { fltrIdxCache.Set(cacheKey, nil, nil) } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index dfa11c735..d9e95755e 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -56,7 +56,7 @@ type DataDB interface { RemoveResourceDrv(*context.Context, string, string) error GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error) AddLoadHistory(*utils.LoadInstance, int, string) error - GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) + GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) RemoveIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (err error) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index a2c057332..2393d6a2d 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1292,12 +1292,16 @@ func (ms *MongoStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id // GetIndexesDrv retrieves Indexes from dataDB // the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context // id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal -func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { +func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) { type result struct { Key string Value []string } - dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + if transactionID != utils.NonTransactional { + originKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID) + } + dbKey := originKey var q bson.M if len(idxKey) != 0 { q = bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey)} @@ -1322,7 +1326,7 @@ func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, if len(elem.Value) == 0 { continue } - indexKey := strings.TrimPrefix(elem.Key, utils.CacheInstanceToPrefix[idxItmType]+tntCtx+utils.ConcatenatedKeySep) + indexKey := strings.TrimPrefix(elem.Key, originKey+utils.ConcatenatedKeySep) indexes[indexKey] = utils.NewStringSet(elem.Value) } return cur.Close(sctx) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 56904a076..c04bb177b 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "crypto/x509" "errors" + "fmt" "os" "strconv" "time" @@ -761,9 +762,14 @@ func (rs *RedisStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id } // GetIndexesDrv retrieves Indexes from dataDB -func (rs *RedisStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { +func (rs *RedisStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) { mp := make(map[string]string) - dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + // dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + if transactionID != utils.NonTransactional { + originKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID) + } + dbKey := originKey if len(idxKey) == 0 { if err = rs.Cmd(&mp, redisHGETALL, dbKey); err != nil { return