diff --git a/engine/datamanager.go b/engine/datamanager.go index 8d713169a..ba4291af7 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -3280,3 +3280,124 @@ func (dm *DataManager) Reconnect(marshaller string, newcfg *config.DataDbCfg) (e dm.dataDB = d return } + +func (dm *DataManager) GetIndexes(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { + if dm == nil { + err = utils.ErrNoDatabaseConn + return + } + if x, ok := Cache.Get(idxItmType, tntCtx); ok { // Attempt to find in cache first + if x == nil { + return nil, utils.ErrNotFound + } + indexes = x.(map[string]utils.StringSet) + if idxKey == utils.EmptyString { // in case of empty key we expect all indexes for tenant:context + return + } + indx, has := indexes[idxKey] + if has { + if indx == nil { + return nil, utils.ErrNotFound + } + return map[string]utils.StringSet{idxKey: indx}, nil + } + } + if indexes, err = dm.DataDB().GetIndexesDrv(idxItmType, tntCtx, idxKey); err != nil { + // if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; err == utils.ErrNotFound && itm.Remote { + // if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, + // utils.ReplicatorSv1GetFilterIndexes, + // &utils.GetFilterIndexesArgWithArgDispatcher{ + // GetFilterIndexesArg: &utils.GetFilterIndexesArg{ + // CacheID: cacheID, + // ItemIDPrefix: key, + // FilterType: filterType, + // FldNameVal: fldNameVal}, + // TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, + // ArgDispatcher: &utils.ArgDispatcher{ + // APIKey: utils.StringPointer(itm.APIKey), + // RouteID: utils.StringPointer(itm.RouteID)}, + // }, &indexes); err == nil { + // err = dm.dataDB.SetFilterIndexesDrv(cacheID, key, indexes, true, utils.NonTransactional) + // } + // } + // if err != nil { + // err = utils.CastRPCErr(err) + if err == utils.ErrNotFound { + if idxKey == utils.EmptyString { + if errCh := Cache.Set(idxItmType, tntCtx, nil, nil, + true, utils.NonTransactional); errCh != nil { + return nil, errCh + } + } else { + idx := make(map[string]utils.StringSet) + if x, ok := Cache.Get(idxItmType, tntCtx); ok && x != nil { + idx = x.(map[string]utils.StringSet) + } + idx[idxKey] = nil + if errCh := Cache.Set(idxItmType, tntCtx, idx, nil, + true, utils.NonTransactional); errCh != nil { + return nil, errCh + } + } + } + return nil, err + // } + } + idx := make(map[string]utils.StringSet) + if x, ok := Cache.Get(idxItmType, tntCtx); ok && x != nil { + idx = x.(map[string]utils.StringSet) + } + for k, v := range indexes { + idx[k] = v + } + if err = Cache.Set(idxItmType, tntCtx, idx, nil, + true, utils.NonTransactional); err != nil { + return nil, err + } + return +} + +/* + +func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string, + indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { + if dm == nil { + err = utils.ErrNoDatabaseConn + return + } + if err = dm.DataDB().SetFilterIndexesDrv(cacheID, itemIDPrefix, + indexes, commit, transactionID); err != nil { + return + } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate { + var reply string + if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, + utils.ReplicatorSv1SetFilterIndexes, + &utils.SetFilterIndexesArgWithArgDispatcher{ + SetFilterIndexesArg: &utils.SetFilterIndexesArg{ + CacheID: cacheID, + ItemIDPrefix: itemIDPrefix, + Indexes: indexes}, + TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer(itm.APIKey), + RouteID: utils.StringPointer(itm.RouteID)}, + }, &reply); err != nil { + err = utils.CastRPCErr(err) + return + } + } + return +} + +func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err error) { + if dm == nil { + err = utils.ErrNoDatabaseConn + return + } + if err = dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix); err != nil { + return + } + return +} +*/ diff --git a/engine/filterindexer.go b/engine/filterindexer.go index d9d7621f7..70aa16316 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -294,22 +294,20 @@ func (rfi *FilterIndexer) RemoveItemFromIndex(tenant, itemID string, oldFilters } } for _, itmMp := range rfi.indexes { - for range itmMp { - if _, has := itmMp[itemID]; has { - delete(itmMp, itemID) //Force deleting in driver - } + if _, has := itmMp[itemID]; has { + delete(itmMp, itemID) //Force deleting in driver } } return rfi.StoreIndexes(false, utils.NonTransactional) } //createAndIndex create indexes for an item -func createAndIndex(itemPrefix, tenant, context, itemID string, filterIDs []string, dm *DataManager) (err error) { +func createAndIndex(itmPrfx, tenant, context, itemID string, filterIDs []string, dm *DataManager) (err error) { indexerKey := tenant if context != "" { indexerKey = utils.ConcatenatedKey(tenant, context) } - indexer := NewFilterIndexer(dm, itemPrefix, indexerKey) + indexer := NewFilterIndexer(dm, itmPrfx, indexerKey) fltrIDs := make([]string, len(filterIDs)) for i, fltrID := range filterIDs { fltrIDs[i] = fltrID @@ -335,7 +333,7 @@ func createAndIndex(itemPrefix, tenant, context, itemID string, filterIDs []stri true, false, utils.NonTransactional); err != nil { if err == utils.ErrNotFound { err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v", - fltrID, itemPrefix, itemID) + fltrID, itmPrfx, itemID) } return } diff --git a/engine/libindex.go b/engine/libindex.go new file mode 100644 index 000000000..56d5e3080 --- /dev/null +++ b/engine/libindex.go @@ -0,0 +1,82 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +/* +import ( + "fmt" + + "github.com/cgrates/cgrates/utils" +) + +//createAndIndex create indexes for an item +func createAndIndex2(tnt, ctx, itmType, itmID string, filterIDs []string) (indx map[string]utils.StringSet, err error) { + indexerKey := tnt + if ctx != "" { + indexerKey = utils.ConcatenatedKey(tnt, ctx) + } + indexer := NewFilterIndexer(dm, itmType, indexerKey) + fltrIDs := make([]string, len(filterIDs)) + for i, fltrID := range filterIDs { + fltrIDs[i] = fltrID + } + if len(fltrIDs) == 0 { + fltrIDs = []string{utils.META_NONE} + } + for _, fltrID := range fltrIDs { + var fltr *Filter + if fltrID == utils.META_NONE { + fltr = &Filter{ + Tenant: tnt, + ID: itmID, + Rules: []*FilterRule{ + { + Type: utils.META_NONE, + Element: utils.META_ANY, + Values: []string{utils.META_ANY}, + }, + }, + } + } else if fltr, err = dm.GetFilter(tnt, fltrID, + true, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v", + fltrID, itmType, itmID) + } + return + } + for _, flt := range fltr.Rules { + var fldType, fldName string + var fldVals []string + if utils.SliceHasMember([]string{utils.META_NONE, utils.MetaPrefix, utils.MetaString}, flt.Type) { + fldType, fldName = flt.Type, flt.Element + fldVals = flt.Values + } + for _, fldVal := range fldVals { + if err = indexer.loadFldNameFldValIndex(fldType, + fldName, fldVal); err != nil && err != utils.ErrNotFound { + return err + } + } + } + indexer.IndexTPFilter(FilterToTPFilter(fltr), itmID) + } + return indexer.StoreIndexes(true, utils.NonTransactional) +} +*/ diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 1522be61e..d64944302 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -98,6 +98,10 @@ type DataDB interface { SetFilterIndexesDrv(cacheID, itemIDPrefix string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) + GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) + SetIndexesDrv(idxItmType, tntCtx string, + indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) + RemoveIndexesDrv(idxItmType, tntCtx string) (err error) MatchFilterIndexDrv(cacheID, itemIDPrefix, filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) @@ -321,8 +325,5 @@ func (gm *GOBMarshaler) Unmarshal(data []byte, v interface{}) error { // Decide the value of cacheCommit parameter based on transactionID func cacheCommit(transactionID string) bool { - if transactionID == utils.NonTransactional { - return true - } - return false + return transactionID == utils.NonTransactional } diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 01e5a016b..5b4ad08cd 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -1445,3 +1445,75 @@ func (iDB *InternalDB) RemoveRateProfileDrv(tenant, id string) (err error) { func (iDB *InternalDB) RemoveLoadIDsDrv() (err error) { return utils.ErrNotImplemented } + +func (iDB *InternalDB) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { + dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + x, ok := iDB.db.Get(idxItmType, dbKey) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + indexes = x.(map[string]utils.StringSet) + if len(idxKey) != 0 { + return map[string]utils.StringSet{ + idxKey: indexes[idxKey].Clone(), + }, nil + } + if len(indexes) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) SetIndexesDrv(idxItmType, tntCtx string, + indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { + originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + dbKey := originKey + if transactionID != "" { + dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID) + } + if commit && transactionID != "" { + x, _ := iDB.db.Get(idxItmType, dbKey) + iDB.db.Remove(idxItmType, dbKey, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + iDB.db.Set(idxItmType, originKey, x, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return + } + var toBeDeleted []string + toBeAdded := make(map[string]utils.StringSet) + for key, strMp := range indexes { + if len(strMp) == 0 { // remove with no more elements inside + toBeDeleted = append(toBeDeleted, key) + delete(indexes, key) + continue + } + toBeAdded[key] = make(utils.StringSet) + toBeAdded[key] = strMp + } + + x, ok := iDB.db.Get(idxItmType, dbKey) + if !ok || x == nil { + iDB.db.Set(idxItmType, dbKey, toBeAdded, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return err + } + + mp := x.(map[string]utils.StringSet) + for _, key := range toBeDeleted { + delete(mp, key) + } + for key, strMp := range toBeAdded { + if _, has := mp[key]; !has { + mp[key] = make(utils.StringSet) + } + mp[key] = strMp + } + iDB.db.Set(idxItmType, dbKey, mp, nil, + cacheCommit(transactionID), transactionID) + return nil +} +func (iDB *InternalDB) RemoveIndexesDrv(idxItmType, tntCtx string) (err error) { + iDB.db.Remove(idxItmType, utils.CacheInstanceToPrefix[idxItmType]+tntCtx, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index e112bc0a0..e6e2b4a18 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -61,6 +61,7 @@ const ( ColVer = "versions" ColRsP = "resource_profiles" ColRFI = "request_filter_indexes" + ColIndx = "indexes" ColTmg = "timings" ColRes = "resources" ColSqs = "statqueues" @@ -1743,6 +1744,15 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) { // GetFilterIndexesDrv retrieves Indexes from dataDB //filterType is used togheter with fieldName:Val + +/* +dataManager.GetFilterIndexesDrv( + utils.CacheAttributeFilterIndexes, + "cgrates.org:*sessions", utils.MetaString, map[string]string{ + "Subject": "dan", + }) + +*/ func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { type result struct { @@ -1803,7 +1813,7 @@ func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType st if len(res.Value) == 0 { continue } - keys := strings.Split(res.Key, ":") + keys := strings.Split(res.Key, ":") // "cgrates.org:*sesions:*string:Subject:dan" indexKey := utils.ConcatenatedKey(keys[1], keys[2], keys[3]) //check here if itemIDPrefix has context if len(strings.Split(itemIDPrefix, ":")) == 2 { @@ -2366,3 +2376,134 @@ func (ms *MongoStorage) RemoveRateProfileDrv(tenant, id string) (err error) { return err }) } + +// GetIndexesDrv retrieves Indexes from dataDB +// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context +// id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal +func (ms *MongoStorage) GetIndexesDrv(cacheID, key, id string) (indexes map[string]utils.StringSet, err error) { + type result struct { + Key string + Value []string + } + dbKey := utils.CacheInstanceToPrefix[cacheID] + key + var q bson.M + if len(id) != 0 { + q = bson.M{"key": dbKey + id} + + } else { + for _, character := range []string{".", "*"} { + dbKey = strings.Replace(dbKey, character, `\`+character, strings.Count(dbKey, character)) + } + //inside bson.RegEx add carrot to match the prefix (optimization) + q = bson.M{"key": bsonx.Regex("^"+dbKey, "")} + } + + indexes = make(map[string]utils.StringSet) + if err = ms.query(func(sctx mongo.SessionContext) (err error) { + cur, err := ms.getCol(ColIndx).Find(sctx, q) + if err != nil { + return err + } + for cur.Next(sctx) { + var elem result + if err := cur.Decode(&elem); err != nil { + return err + } + if len(elem.Value) == 0 { + continue + } + keys := strings.Split(elem.Key, ":") + indexKey := utils.ConcatenatedKey(keys[1], keys[2], keys[3]) + //check here if key has context + if len(strings.Split(key, ":")) == 2 { + indexKey = utils.ConcatenatedKey(keys[2], keys[3], keys[4]) + } + indexes[indexKey] = utils.NewStringSet(elem.Value) + } + return cur.Close(sctx) + }); err != nil { + return nil, err + } + if len(indexes) == 0 { + return nil, utils.ErrNotFound + } + return indexes, nil +} + +// SetIndexesDrv stores Indexes into DataDB +// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context +func (ms *MongoStorage) SetIndexesDrv(cacheID, key string, + indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { + originKey := utils.CacheInstanceToPrefix[cacheID] + key + dbKey := originKey + if transactionID != "" { + dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID) + } + if commit && transactionID != "" { + oldKey := "tmp_" + utils.ConcatenatedKey(originKey, transactionID) + regexKey := originKey + for _, character := range []string{".", "*"} { + regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character)) + oldKey = strings.Replace(oldKey, character, `\`+character, strings.Count(oldKey, character)) + } + //inside bson.RegEx add carrot to match the prefix (optimization) + if err = ms.query(func(sctx mongo.SessionContext) (err error) { + _, err = ms.getCol(ColIndx).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, "")}) + return err + }); err != nil { + return err + } + var lastErr error + for key, itmMp := range indexes { + if err = ms.query(func(sctx mongo.SessionContext) (err error) { + _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(originKey, key)}, + bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(originKey, key), "value": itmMp.AsSlice()}}, + options.Update().SetUpsert(true), + ) + return err + }); err != nil { + lastErr = err + } + } + if lastErr != nil { + return lastErr + } + //inside bson.RegEx add carrot to match the prefix (optimization) + return ms.query(func(sctx mongo.SessionContext) (err error) { + _, err = ms.getCol(ColIndx).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+oldKey, "")}) + return err + }) + } + var lastErr error + for key, itmMp := range indexes { + if err = ms.query(func(sctx mongo.SessionContext) (err error) { + var action bson.M + if len(itmMp) == 0 { + action = bson.M{"$unset": bson.M{"value": 1}} + } else { + action = bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(dbKey, key), "value": itmMp.AsSlice()}} + } + _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, key)}, + action, options.Update().SetUpsert(true), + ) + return err + }); err != nil { + lastErr = err + } + } + return lastErr +} + +// RemoveIndexesDrv removes the indexes +// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context +func (ms *MongoStorage) RemoveIndexesDrv(cacheID, key string) (err error) { + regexKey := utils.CacheInstanceToPrefix[cacheID] + key + for _, character := range []string{".", "*"} { + regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character)) + } + //inside bson.RegEx add carrot to match the prefix (optimization) + return ms.query(func(sctx mongo.SessionContext) (err error) { + _, err = ms.getCol(ColIndx).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, "")}) + return err + }) +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 24e82ee63..9d1565290 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1748,3 +1748,74 @@ func (rs *RedisStorage) RemoveRateProfileDrv(tenant, id string) (err error) { } return } + +// GetIndexesDrv retrieves Indexes from dataDB +func (rs *RedisStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { + mp := make(map[string]string) + dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + if len(idxKey) == 0 { + mp, err = rs.Cmd(redis_HGETALL, dbKey).Map() + if err != nil { + return + } else if len(mp) == 0 { + return nil, utils.ErrNotFound + } + } else { + var itmMpStrLst []string + itmMpStrLst, err = rs.Cmd(redis_HMGET, dbKey, idxKey).List() + if err != nil { + return + } else if itmMpStrLst[0] == "" { + return nil, utils.ErrNotFound + } + mp[idxKey] = itmMpStrLst[0] + } + indexes = make(map[string]utils.StringSet) + for k, v := range mp { + var sm utils.StringSet + if err = rs.ms.Unmarshal([]byte(v), &sm); err != nil { + return + } + indexes[k] = sm + } + return +} + +// SetFilterIndexesDrv stores Indexes into DataDB +func (rs *RedisStorage) SetIndexesDrv(idxItmType, tntCtx string, + indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { + originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx + dbKey := originKey + if transactionID != "" { + dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID) + } + if commit && transactionID != "" { + return rs.Cmd(redis_RENAME, dbKey, originKey).Err + } + mp := make(map[string]string) + nameValSls := []interface{}{dbKey} + for key, strMp := range indexes { + if len(strMp) == 0 { // remove with no more elements inside + nameValSls = append(nameValSls, key) + continue + } + if encodedMp, err := rs.ms.Marshal(strMp); err != nil { + return err + } else { + mp[key] = string(encodedMp) + } + } + if len(nameValSls) != 1 { + if err = rs.Cmd(redis_HDEL, nameValSls...).Err; err != nil { + return err + } + } + if len(mp) != 0 { + return rs.Cmd(redis_HMSET, dbKey, mp).Err + } + return +} + +func (rs *RedisStorage) RemoveIndexesDrv(idxItmType, tntCtx string) (err error) { + return rs.Cmd(redis_DEL, utils.CacheInstanceToPrefix[idxItmType]+tntCtx).Err +} diff --git a/utils/set.go b/utils/set.go index 46ba39ffa..bd3eff74b 100644 --- a/utils/set.go +++ b/utils/set.go @@ -89,3 +89,15 @@ func (s StringSet) Intersect(s2 StringSet) { } } } + +// Clone creates a clone of the set +func (s StringSet) Clone() (cln StringSet) { + if s == nil { + return + } + cln = make(StringSet) + for k := range s { + cln.Add(k) + } + return +}