Starting adding the indexes implementation

This commit is contained in:
Trial97
2020-06-04 08:59:09 +03:00
committed by Dan Christian Bogos
parent 926d4d2cd9
commit b10f779ac8
8 changed files with 510 additions and 12 deletions

View File

@@ -3280,3 +3280,124 @@ func (dm *DataManager) Reconnect(marshaller string, newcfg *config.DataDbCfg) (e
dm.dataDB = d
return
}
func (dm *DataManager) GetIndexes(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if x, ok := Cache.Get(idxItmType, tntCtx); ok { // Attempt to find in cache first
if x == nil {
return nil, utils.ErrNotFound
}
indexes = x.(map[string]utils.StringSet)
if idxKey == utils.EmptyString { // in case of empty key we expect all indexes for tenant:context
return
}
indx, has := indexes[idxKey]
if has {
if indx == nil {
return nil, utils.ErrNotFound
}
return map[string]utils.StringSet{idxKey: indx}, nil
}
}
if indexes, err = dm.DataDB().GetIndexesDrv(idxItmType, tntCtx, idxKey); err != nil {
// if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; err == utils.ErrNotFound && itm.Remote {
// if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil,
// utils.ReplicatorSv1GetFilterIndexes,
// &utils.GetFilterIndexesArgWithArgDispatcher{
// GetFilterIndexesArg: &utils.GetFilterIndexesArg{
// CacheID: cacheID,
// ItemIDPrefix: key,
// FilterType: filterType,
// FldNameVal: fldNameVal},
// TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
// ArgDispatcher: &utils.ArgDispatcher{
// APIKey: utils.StringPointer(itm.APIKey),
// RouteID: utils.StringPointer(itm.RouteID)},
// }, &indexes); err == nil {
// err = dm.dataDB.SetFilterIndexesDrv(cacheID, key, indexes, true, utils.NonTransactional)
// }
// }
// if err != nil {
// err = utils.CastRPCErr(err)
if err == utils.ErrNotFound {
if idxKey == utils.EmptyString {
if errCh := Cache.Set(idxItmType, tntCtx, nil, nil,
true, utils.NonTransactional); errCh != nil {
return nil, errCh
}
} else {
idx := make(map[string]utils.StringSet)
if x, ok := Cache.Get(idxItmType, tntCtx); ok && x != nil {
idx = x.(map[string]utils.StringSet)
}
idx[idxKey] = nil
if errCh := Cache.Set(idxItmType, tntCtx, idx, nil,
true, utils.NonTransactional); errCh != nil {
return nil, errCh
}
}
}
return nil, err
// }
}
idx := make(map[string]utils.StringSet)
if x, ok := Cache.Get(idxItmType, tntCtx); ok && x != nil {
idx = x.(map[string]utils.StringSet)
}
for k, v := range indexes {
idx[k] = v
}
if err = Cache.Set(idxItmType, tntCtx, idx, nil,
true, utils.NonTransactional); err != nil {
return nil, err
}
return
}
/*
func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if err = dm.DataDB().SetFilterIndexesDrv(cacheID, itemIDPrefix,
indexes, commit, transactionID); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetFilterIndexes,
&utils.SetFilterIndexesArgWithArgDispatcher{
SetFilterIndexesArg: &utils.SetFilterIndexesArg{
CacheID: cacheID,
ItemIDPrefix: itemIDPrefix,
Indexes: indexes},
TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer(itm.APIKey),
RouteID: utils.StringPointer(itm.RouteID)},
}, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
}
return
}
func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if err = dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix); err != nil {
return
}
return
}
*/

View File

@@ -294,22 +294,20 @@ func (rfi *FilterIndexer) RemoveItemFromIndex(tenant, itemID string, oldFilters
}
}
for _, itmMp := range rfi.indexes {
for range itmMp {
if _, has := itmMp[itemID]; has {
delete(itmMp, itemID) //Force deleting in driver
}
if _, has := itmMp[itemID]; has {
delete(itmMp, itemID) //Force deleting in driver
}
}
return rfi.StoreIndexes(false, utils.NonTransactional)
}
//createAndIndex create indexes for an item
func createAndIndex(itemPrefix, tenant, context, itemID string, filterIDs []string, dm *DataManager) (err error) {
func createAndIndex(itmPrfx, tenant, context, itemID string, filterIDs []string, dm *DataManager) (err error) {
indexerKey := tenant
if context != "" {
indexerKey = utils.ConcatenatedKey(tenant, context)
}
indexer := NewFilterIndexer(dm, itemPrefix, indexerKey)
indexer := NewFilterIndexer(dm, itmPrfx, indexerKey)
fltrIDs := make([]string, len(filterIDs))
for i, fltrID := range filterIDs {
fltrIDs[i] = fltrID
@@ -335,7 +333,7 @@ func createAndIndex(itemPrefix, tenant, context, itemID string, filterIDs []stri
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v",
fltrID, itemPrefix, itemID)
fltrID, itmPrfx, itemID)
}
return
}

82
engine/libindex.go Normal file
View File

@@ -0,0 +1,82 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
/*
import (
"fmt"
"github.com/cgrates/cgrates/utils"
)
//createAndIndex create indexes for an item
func createAndIndex2(tnt, ctx, itmType, itmID string, filterIDs []string) (indx map[string]utils.StringSet, err error) {
indexerKey := tnt
if ctx != "" {
indexerKey = utils.ConcatenatedKey(tnt, ctx)
}
indexer := NewFilterIndexer(dm, itmType, indexerKey)
fltrIDs := make([]string, len(filterIDs))
for i, fltrID := range filterIDs {
fltrIDs[i] = fltrID
}
if len(fltrIDs) == 0 {
fltrIDs = []string{utils.META_NONE}
}
for _, fltrID := range fltrIDs {
var fltr *Filter
if fltrID == utils.META_NONE {
fltr = &Filter{
Tenant: tnt,
ID: itmID,
Rules: []*FilterRule{
{
Type: utils.META_NONE,
Element: utils.META_ANY,
Values: []string{utils.META_ANY},
},
},
}
} else if fltr, err = dm.GetFilter(tnt, fltrID,
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v",
fltrID, itmType, itmID)
}
return
}
for _, flt := range fltr.Rules {
var fldType, fldName string
var fldVals []string
if utils.SliceHasMember([]string{utils.META_NONE, utils.MetaPrefix, utils.MetaString}, flt.Type) {
fldType, fldName = flt.Type, flt.Element
fldVals = flt.Values
}
for _, fldVal := range fldVals {
if err = indexer.loadFldNameFldValIndex(fldType,
fldName, fldVal); err != nil && err != utils.ErrNotFound {
return err
}
}
}
indexer.IndexTPFilter(FilterToTPFilter(fltr), itmID)
}
return indexer.StoreIndexes(true, utils.NonTransactional)
}
*/

View File

@@ -98,6 +98,10 @@ type DataDB interface {
SetFilterIndexesDrv(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error)
RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error)
GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error)
SetIndexesDrv(idxItmType, tntCtx string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error)
RemoveIndexesDrv(idxItmType, tntCtx string) (err error)
MatchFilterIndexDrv(cacheID, itemIDPrefix,
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)
@@ -321,8 +325,5 @@ func (gm *GOBMarshaler) Unmarshal(data []byte, v interface{}) error {
// Decide the value of cacheCommit parameter based on transactionID
func cacheCommit(transactionID string) bool {
if transactionID == utils.NonTransactional {
return true
}
return false
return transactionID == utils.NonTransactional
}

View File

@@ -1445,3 +1445,75 @@ func (iDB *InternalDB) RemoveRateProfileDrv(tenant, id string) (err error) {
func (iDB *InternalDB) RemoveLoadIDsDrv() (err error) {
return utils.ErrNotImplemented
}
func (iDB *InternalDB) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) {
dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
x, ok := iDB.db.Get(idxItmType, dbKey)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
indexes = x.(map[string]utils.StringSet)
if len(idxKey) != 0 {
return map[string]utils.StringSet{
idxKey: indexes[idxKey].Clone(),
}, nil
}
if len(indexes) == 0 {
return nil, utils.ErrNotFound
}
return
}
func (iDB *InternalDB) SetIndexesDrv(idxItmType, tntCtx string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID)
}
if commit && transactionID != "" {
x, _ := iDB.db.Get(idxItmType, dbKey)
iDB.db.Remove(idxItmType, dbKey,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
iDB.db.Set(idxItmType, originKey, x, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
var toBeDeleted []string
toBeAdded := make(map[string]utils.StringSet)
for key, strMp := range indexes {
if len(strMp) == 0 { // remove with no more elements inside
toBeDeleted = append(toBeDeleted, key)
delete(indexes, key)
continue
}
toBeAdded[key] = make(utils.StringSet)
toBeAdded[key] = strMp
}
x, ok := iDB.db.Get(idxItmType, dbKey)
if !ok || x == nil {
iDB.db.Set(idxItmType, dbKey, toBeAdded, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return err
}
mp := x.(map[string]utils.StringSet)
for _, key := range toBeDeleted {
delete(mp, key)
}
for key, strMp := range toBeAdded {
if _, has := mp[key]; !has {
mp[key] = make(utils.StringSet)
}
mp[key] = strMp
}
iDB.db.Set(idxItmType, dbKey, mp, nil,
cacheCommit(transactionID), transactionID)
return nil
}
func (iDB *InternalDB) RemoveIndexesDrv(idxItmType, tntCtx string) (err error) {
iDB.db.Remove(idxItmType, utils.CacheInstanceToPrefix[idxItmType]+tntCtx,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}

View File

@@ -61,6 +61,7 @@ const (
ColVer = "versions"
ColRsP = "resource_profiles"
ColRFI = "request_filter_indexes"
ColIndx = "indexes"
ColTmg = "timings"
ColRes = "resources"
ColSqs = "statqueues"
@@ -1743,6 +1744,15 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) {
// GetFilterIndexesDrv retrieves Indexes from dataDB
//filterType is used togheter with fieldName:Val
/*
dataManager.GetFilterIndexesDrv(
utils.CacheAttributeFilterIndexes,
"cgrates.org:*sessions", utils.MetaString, map[string]string{
"Subject": "dan",
})
*/
func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
type result struct {
@@ -1803,7 +1813,7 @@ func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType st
if len(res.Value) == 0 {
continue
}
keys := strings.Split(res.Key, ":")
keys := strings.Split(res.Key, ":") // "cgrates.org:*sesions:*string:Subject:dan"
indexKey := utils.ConcatenatedKey(keys[1], keys[2], keys[3])
//check here if itemIDPrefix has context
if len(strings.Split(itemIDPrefix, ":")) == 2 {
@@ -2366,3 +2376,134 @@ func (ms *MongoStorage) RemoveRateProfileDrv(tenant, id string) (err error) {
return err
})
}
// GetIndexesDrv retrieves Indexes from dataDB
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
// id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal
func (ms *MongoStorage) GetIndexesDrv(cacheID, key, id string) (indexes map[string]utils.StringSet, err error) {
type result struct {
Key string
Value []string
}
dbKey := utils.CacheInstanceToPrefix[cacheID] + key
var q bson.M
if len(id) != 0 {
q = bson.M{"key": dbKey + id}
} else {
for _, character := range []string{".", "*"} {
dbKey = strings.Replace(dbKey, character, `\`+character, strings.Count(dbKey, character))
}
//inside bson.RegEx add carrot to match the prefix (optimization)
q = bson.M{"key": bsonx.Regex("^"+dbKey, "")}
}
indexes = make(map[string]utils.StringSet)
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(ColIndx).Find(sctx, q)
if err != nil {
return err
}
for cur.Next(sctx) {
var elem result
if err := cur.Decode(&elem); err != nil {
return err
}
if len(elem.Value) == 0 {
continue
}
keys := strings.Split(elem.Key, ":")
indexKey := utils.ConcatenatedKey(keys[1], keys[2], keys[3])
//check here if key has context
if len(strings.Split(key, ":")) == 2 {
indexKey = utils.ConcatenatedKey(keys[2], keys[3], keys[4])
}
indexes[indexKey] = utils.NewStringSet(elem.Value)
}
return cur.Close(sctx)
}); err != nil {
return nil, err
}
if len(indexes) == 0 {
return nil, utils.ErrNotFound
}
return indexes, nil
}
// SetIndexesDrv stores Indexes into DataDB
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
func (ms *MongoStorage) SetIndexesDrv(cacheID, key string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[cacheID] + key
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
}
if commit && transactionID != "" {
oldKey := "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
regexKey := originKey
for _, character := range []string{".", "*"} {
regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character))
oldKey = strings.Replace(oldKey, character, `\`+character, strings.Count(oldKey, character))
}
//inside bson.RegEx add carrot to match the prefix (optimization)
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColIndx).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, "")})
return err
}); err != nil {
return err
}
var lastErr error
for key, itmMp := range indexes {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(originKey, key)},
bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(originKey, key), "value": itmMp.AsSlice()}},
options.Update().SetUpsert(true),
)
return err
}); err != nil {
lastErr = err
}
}
if lastErr != nil {
return lastErr
}
//inside bson.RegEx add carrot to match the prefix (optimization)
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColIndx).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+oldKey, "")})
return err
})
}
var lastErr error
for key, itmMp := range indexes {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
var action bson.M
if len(itmMp) == 0 {
action = bson.M{"$unset": bson.M{"value": 1}}
} else {
action = bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(dbKey, key), "value": itmMp.AsSlice()}}
}
_, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, key)},
action, options.Update().SetUpsert(true),
)
return err
}); err != nil {
lastErr = err
}
}
return lastErr
}
// RemoveIndexesDrv removes the indexes
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
func (ms *MongoStorage) RemoveIndexesDrv(cacheID, key string) (err error) {
regexKey := utils.CacheInstanceToPrefix[cacheID] + key
for _, character := range []string{".", "*"} {
regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character))
}
//inside bson.RegEx add carrot to match the prefix (optimization)
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColIndx).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, "")})
return err
})
}

View File

@@ -1748,3 +1748,74 @@ func (rs *RedisStorage) RemoveRateProfileDrv(tenant, id string) (err error) {
}
return
}
// GetIndexesDrv retrieves Indexes from dataDB
func (rs *RedisStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) {
mp := make(map[string]string)
dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
if len(idxKey) == 0 {
mp, err = rs.Cmd(redis_HGETALL, dbKey).Map()
if err != nil {
return
} else if len(mp) == 0 {
return nil, utils.ErrNotFound
}
} else {
var itmMpStrLst []string
itmMpStrLst, err = rs.Cmd(redis_HMGET, dbKey, idxKey).List()
if err != nil {
return
} else if itmMpStrLst[0] == "" {
return nil, utils.ErrNotFound
}
mp[idxKey] = itmMpStrLst[0]
}
indexes = make(map[string]utils.StringSet)
for k, v := range mp {
var sm utils.StringSet
if err = rs.ms.Unmarshal([]byte(v), &sm); err != nil {
return
}
indexes[k] = sm
}
return
}
// SetFilterIndexesDrv stores Indexes into DataDB
func (rs *RedisStorage) SetIndexesDrv(idxItmType, tntCtx string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID)
}
if commit && transactionID != "" {
return rs.Cmd(redis_RENAME, dbKey, originKey).Err
}
mp := make(map[string]string)
nameValSls := []interface{}{dbKey}
for key, strMp := range indexes {
if len(strMp) == 0 { // remove with no more elements inside
nameValSls = append(nameValSls, key)
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[key] = string(encodedMp)
}
}
if len(nameValSls) != 1 {
if err = rs.Cmd(redis_HDEL, nameValSls...).Err; err != nil {
return err
}
}
if len(mp) != 0 {
return rs.Cmd(redis_HMSET, dbKey, mp).Err
}
return
}
func (rs *RedisStorage) RemoveIndexesDrv(idxItmType, tntCtx string) (err error) {
return rs.Cmd(redis_DEL, utils.CacheInstanceToPrefix[idxItmType]+tntCtx).Err
}

View File

@@ -89,3 +89,15 @@ func (s StringSet) Intersect(s2 StringSet) {
}
}
}
// Clone creates a clone of the set
func (s StringSet) Clone() (cln StringSet) {
if s == nil {
return
}
cln = make(StringSet)
for k := range s {
cln.Add(k)
}
return
}