Almost finish Indexing

This commit is contained in:
TeoV
2017-12-15 19:22:12 +02:00
committed by Dan Christian Bogos
parent 644e9637ba
commit 2d888b1717
8 changed files with 293 additions and 108 deletions

View File

@@ -388,7 +388,8 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id, transactionID string)
}
cache.RemKey(utils.ThresholdProfilePrefix+utils.ConcatenatedKey(tenant, id),
cacheCommit(transactionID), transactionID)
return
return NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix,
tenant).RemoveItemFromIndex(id)
}
func (dm *DataManager) GetStatQueueProfile(tenant, id string, skipCache bool, transactionID string) (sqp *StatQueueProfile, err error) {
@@ -863,6 +864,7 @@ func (dm *DataManager) MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (i
}
return x.(utils.StringMap), nil
}
// Not found in cache, check in DB
itemIDs, err = dm.DataDB().MatchReqFilterIndexDrv(dbKey, fieldName, fieldVal)
if err != nil {
if err == utils.ErrNotFound {

View File

@@ -19,15 +19,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"github.com/cgrates/cgrates/utils"
)
func NewReqFilterIndexer(dm *DataManager, itemType, dbKeySuffix string) (*ReqFilterIndexer, error) {
func NewReqFilterIndexer(dm *DataManager, itemType, dbKeySuffix string) *ReqFilterIndexer {
return &ReqFilterIndexer{dm: dm, itemType: itemType, dbKeySuffix: dbKeySuffix,
indexes: make(map[string]map[string]utils.StringMap),
reveseIndex: make(map[string]map[string]utils.StringMap),
chngdIndxKeys: make(utils.StringMap),
chngdRevIndxKeys: make(utils.StringMap)}, nil
chngdRevIndxKeys: make(utils.StringMap)}
}
// ReqFilterIndexer is a centralized indexer for all data sources using RequestFilter
@@ -141,6 +142,72 @@ func (rfi *ReqFilterIndexer) StoreIndexes() (err error) {
return rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), rfi.reveseIndex)
}
//Populate the ReqFilterIndexer.reveseIndex for specifil itemID
func (rfi *ReqFilterIndexer) loadItemReverseIndex(itemID string) (err error) {
rcvReveseIdx, err := rfi.dm.GetReqFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true),
map[string]string{itemID: ""})
if err != nil {
return err
}
for key2, val2 := range rcvReveseIdx[itemID] {
if _, has := rfi.reveseIndex[itemID]; !has {
rfi.reveseIndex[itemID] = make(map[string]utils.StringMap)
}
if _, has := rfi.reveseIndex[itemID][key2]; !has {
rfi.reveseIndex[itemID][key2] = make(utils.StringMap)
}
rfi.reveseIndex[itemID][key2] = val2
}
utils.Logger.Debug(fmt.Sprintf("itemID %+v \n ReverseIndex %+v ", itemID, rfi.reveseIndex))
return err
}
//Populate ReqFilterIndexer.indexes with specific fieldName,fieldValue , nil
func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) error {
rcvIdx, err := rfi.dm.GetReqFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),
map[string]string{fldName: fldVal})
if err != nil {
return err
}
for fldName, fldValMp := range rcvIdx {
if _, has := rfi.indexes[fldName]; !has {
rfi.indexes[fldName] = make(map[string]utils.StringMap)
}
for fldVal, itmMap := range fldValMp {
rfi.indexes[fldName][fldVal] = itmMap
}
}
utils.Logger.Debug(fmt.Sprintf(" \n Index %+v ", rfi.reveseIndex))
return nil
}
//RemoveItemFromIndex remove
func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) {
if err = rfi.loadItemReverseIndex(itemID); err != nil {
return err
}
for fldName, fldValMp := range rfi.reveseIndex[itemID] {
for fldVal := range fldValMp {
if err = rfi.loadFldNameFldValIndex(fldName, fldVal); err != nil {
return err
}
}
}
for _, fldValMp := range rfi.indexes {
for _, itmIDMp := range fldValMp {
if _, has := itmIDMp[itemID]; has {
delete(itmIDMp, itemID)
}
}
}
rfi.StoreIndexes()
return
}
//GetDBIndexKey return the dbKey for an specific item
func GetDBIndexKey(itemType, dbKeySuffix string, reverse bool) (dbKey string) {
var idxPrefix, rIdxPrefix string
switch itemType {

View File

@@ -21,6 +21,7 @@ package engine
import (
"fmt"
"log"
"path"
"reflect"
"strings"
@@ -29,7 +30,6 @@ import (
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -44,66 +44,66 @@ var (
var sTestsOnStorIT = []func(t *testing.T){
testOnStorITFlush,
testOnStorITIsDBEmpty,
// testOnStorITSetGetDerivedCharges,
testOnStorITSetGetDerivedCharges,
testOnStorITSetReqFilterIndexes,
testOnStorITGetReqFilterIndexes,
//testOnStorITMatchReqFilterIndex,
// testOnStorITCacheDestinations,
// testOnStorITCacheReverseDestinations,
// testOnStorITCacheRatingPlan,
// testOnStorITCacheRatingProfile,
// testOnStorITCacheActions,
// testOnStorITCacheActionPlan,
// testOnStorITCacheAccountActionPlans,
// testOnStorITCacheActionTriggers,
// testOnStorITCacheSharedGroup,
// testOnStorITCacheDerivedChargers,
// testOnStorITCacheLCR,
// testOnStorITCacheAlias,
// testOnStorITCacheReverseAlias,
// testOnStorITCacheResource,
// testOnStorITCacheResourceProfile,
// testOnStorITCacheStatQueueProfile,
// testOnStorITCacheStatQueue,
// testOnStorITCacheThresholdProfile,
// testOnStorITCacheThreshold,
// testOnStorITCacheTiming,
// testOnStorITCacheFilter,
// testOnStorITCacheSupplierProfile,
// testOnStorITCacheAttributeProfile,
testOnStorITMatchReqFilterIndex,
testOnStorITCacheDestinations,
testOnStorITCacheReverseDestinations,
testOnStorITCacheRatingPlan,
testOnStorITCacheRatingProfile,
testOnStorITCacheActions,
testOnStorITCacheActionPlan,
testOnStorITCacheAccountActionPlans,
testOnStorITCacheActionTriggers,
testOnStorITCacheSharedGroup,
testOnStorITCacheDerivedChargers,
testOnStorITCacheLCR,
testOnStorITCacheAlias,
testOnStorITCacheReverseAlias,
testOnStorITCacheResource,
testOnStorITCacheResourceProfile,
testOnStorITCacheStatQueueProfile,
testOnStorITCacheStatQueue,
testOnStorITCacheThresholdProfile,
testOnStorITCacheThreshold,
testOnStorITCacheTiming,
testOnStorITCacheFilter,
testOnStorITCacheSupplierProfile,
testOnStorITCacheAttributeProfile,
// ToDo: test cache flush for a prefix
// ToDo: testOnStorITLoadAccountingCache
// testOnStorITHasData,
// testOnStorITPushPop,
// testOnStorITCRUDRatingPlan,
// testOnStorITCRUDRatingProfile,
// testOnStorITCRUDDestinations,
// testOnStorITCRUDReverseDestinations,
// testOnStorITCRUDLCR,
// testOnStorITCRUDCdrStats,
// testOnStorITCRUDActions,
// testOnStorITCRUDSharedGroup,
// testOnStorITCRUDActionTriggers,
// testOnStorITCRUDActionPlan,
// testOnStorITCRUDAccountActionPlans,
// testOnStorITCRUDAccount,
// testOnStorITCRUDCdrStatsQueue,
// testOnStorITCRUDSubscribers,
// testOnStorITCRUDUser,
// testOnStorITCRUDAlias,
// testOnStorITCRUDReverseAlias,
// testOnStorITCRUDResource,
// testOnStorITCRUDResourceProfile,
// testOnStorITCRUDTiming,
// testOnStorITCRUDHistory,
// testOnStorITCRUDStructVersion,
// testOnStorITCRUDStatQueueProfile,
// testOnStorITCRUDStoredStatQueue,
// testOnStorITCRUDThresholdProfile,
// testOnStorITCRUDThreshold,
// testOnStorITCRUDFilter,
// testOnStorITCRUDSupplierProfile,
// testOnStorITCRUDAttributeProfile,
testOnStorITHasData,
testOnStorITPushPop,
testOnStorITCRUDRatingPlan,
testOnStorITCRUDRatingProfile,
testOnStorITCRUDDestinations,
testOnStorITCRUDReverseDestinations,
testOnStorITCRUDLCR,
testOnStorITCRUDCdrStats,
testOnStorITCRUDActions,
testOnStorITCRUDSharedGroup,
testOnStorITCRUDActionTriggers,
testOnStorITCRUDActionPlan,
testOnStorITCRUDAccountActionPlans,
testOnStorITCRUDAccount,
testOnStorITCRUDCdrStatsQueue,
testOnStorITCRUDSubscribers,
testOnStorITCRUDUser,
testOnStorITCRUDAlias,
testOnStorITCRUDReverseAlias,
testOnStorITCRUDResource,
testOnStorITCRUDResourceProfile,
testOnStorITCRUDTiming,
testOnStorITCRUDHistory,
testOnStorITCRUDStructVersion,
testOnStorITCRUDStatQueueProfile,
testOnStorITCRUDStoredStatQueue,
testOnStorITCRUDThresholdProfile,
testOnStorITCRUDThreshold,
testOnStorITCRUDFilter,
testOnStorITCRUDSupplierProfile,
testOnStorITCRUDAttributeProfile,
}
func TestOnStorITRedisConnect(t *testing.T) {
@@ -234,32 +234,32 @@ func testOnStorITSetReqFilterIndexes(t *testing.T) {
}
func testOnStorITGetReqFilterIndexes(t *testing.T) {
// eIdxes := map[string]map[string]utils.StringMap{
// "Account": map[string]utils.StringMap{
// "1001": utils.StringMap{
// "RL1": true,
// },
// "1002": utils.StringMap{
// "RL1": true,
// "RL2": true,
// },
// "dan": utils.StringMap{
// "RL2": true,
// },
// },
// "Subject": map[string]utils.StringMap{
// "dan": utils.StringMap{
// "RL2": true,
// "RL3": true,
// },
// },
// utils.NOT_AVAILABLE: map[string]utils.StringMap{
// utils.NOT_AVAILABLE: utils.StringMap{
// "RL4": true,
// "RL5": true,
// },
// },
// }
eIdxes := map[string]map[string]utils.StringMap{
"Account": map[string]utils.StringMap{
"1001": utils.StringMap{
"RL1": true,
},
"1002": utils.StringMap{
"RL1": true,
"RL2": true,
},
"dan": utils.StringMap{
"RL2": true,
},
},
"Subject": map[string]utils.StringMap{
"dan": utils.StringMap{
"RL2": true,
"RL3": true,
},
},
utils.NOT_AVAILABLE: map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
"RL4": true,
"RL5": true,
},
},
}
sbjDan := map[string]string{
"Subject": "dan",
}
@@ -278,6 +278,76 @@ func testOnStorITGetReqFilterIndexes(t *testing.T) {
} else if !reflect.DeepEqual(expectedsbjDan, exsbjDan) {
t.Errorf("Expecting: %+v, received: %+v", expectedsbjDan, exsbjDan)
}
if rcv, err := onStor.GetReqFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
nil); err != nil {
t.Error(err)
} else {
log.Printf(fmt.Sprintf("RCV ALL IDX %+v \n", rcv))
if !reflect.DeepEqual(eIdxes, rcv) {
t.Errorf("Expecting: %+v, received: %+v", eIdxes, rcv)
}
}
// expectedsbjDan["Subject"]["dan"] = nil
// if err := onStor.SetReqFilterIndexes(
// GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
// expectedsbjDan); err != nil {
// t.Error(err)
// }
// if rcvidx, err := onStor.GetReqFilterIndexes(
// GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
// nil); err != nil {
// t.Error(err)
// } else {
// log.Printf(fmt.Sprintf("RcvALLIdx %+v", rcvidx))
// }
// if rcvidx, err := onStor.GetReqFilterIndexes(
// GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
// sbjDan); err == nil || err.Error() != utils.ErrNotFound.Error() {
// log.Printf(fmt.Sprintf("RcvIdx %+v", rcvidx))
// t.Error(err)
// } else {
// log.Printf(fmt.Sprintf("RcvIdx %+v", rcvidx))
// }
idxes := map[string]map[string]utils.StringMap{
"Account": map[string]utils.StringMap{
"1001": utils.StringMap{
"RL1": true,
},
"1002": utils.StringMap{
"RL1": true,
"RL2": true,
},
"dan": utils.StringMap{
"RL2": true,
},
},
"Subject": map[string]utils.StringMap{
"dan": utils.StringMap{},
},
utils.NOT_AVAILABLE: map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
"RL4": true,
"RL5": true,
},
},
}
if err := onStor.SetReqFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
idxes); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetReqFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
nil); err != nil {
t.Error(err)
} else {
log.Printf(fmt.Sprintf("RCV ALL IDX %+v \n", rcv))
// if !reflect.DeepEqual(eIdxes, rcv) {
// t.Errorf("Expecting: %+v, received: %+v", eIdxes, rcv)
// }
}
/*
if idxes, err := onStor.GetReqFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),

View File

@@ -23,7 +23,6 @@ import (
"compress/zlib"
"errors"
"io/ioutil"
"log"
"strings"
"sync"
@@ -1257,19 +1256,35 @@ func (ms *MapStorage) RemoveTimingDrv(id string) error {
func (ms *MapStorage) GetReqFilterIndexesDrv(dbKey string,
fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
log.Print("\nMAPSTORAGE \n")
ms.mu.RLock()
defer ms.mu.RUnlock()
values, ok := ms.dict[dbKey]
if !ok {
return nil, utils.ErrNotFound
}
log.Printf(" Values before unmarshal %+v \n", values)
err = ms.ms.Unmarshal(values, &indexes)
if err != nil {
return nil, err
if len(fldNameVal) != 0 {
rcvidx := make(map[string]map[string]utils.StringMap)
err = ms.ms.Unmarshal(values, &rcvidx)
if err != nil {
return nil, err
}
indexes = make(map[string]map[string]utils.StringMap)
for fldName, fldVal := range fldNameVal {
if _, has := indexes[fldName]; !has {
indexes[fldName] = make(map[string]utils.StringMap)
}
if _, has := indexes[fldName][fldVal]; !has {
indexes[fldName][fldVal] = make(utils.StringMap)
}
indexes[fldName][fldVal] = rcvidx[fldName][fldVal]
}
return
} else {
err = ms.ms.Unmarshal(values, &indexes)
if err != nil {
return nil, err
}
}
log.Printf(" Values after unmarshal %+v \n", indexes)
return
}
func (ms *MapStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {

View File

@@ -25,6 +25,7 @@ import (
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"io/ioutil"
//"log"
"strings"
"time"
@@ -1938,8 +1939,13 @@ func (ms *MongoStorage) GetReqFilterIndexesDrv(dbKey string,
findParam := bson.M{"key": dbKey}
if len(fldNameVal) != 0 {
for fldName, fldValue := range fldNameVal {
findParam2 := bson.M{fmt.Sprintf("value.%s.%s", fldName, fldValue): 1}
if err = col.Find(findParam).Select(findParam2).One(&result); err != nil {
var qryFltr bson.M
if fldValue == "" {
qryFltr = bson.M{fmt.Sprintf("value.%s", fldName): 1}
} else {
qryFltr = bson.M{fmt.Sprintf("value.%s.%s", fldName, fldValue): 1}
}
if err = col.Find(findParam).Select(qryFltr).One(&result); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
@@ -1960,6 +1966,16 @@ func (ms *MongoStorage) GetReqFilterIndexesDrv(dbKey string,
func (ms *MongoStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
for k, v := range indexes {
for k2, v2 := range v {
for k3, _ := range v2 {
findParam2 := bson.M{fmt.Sprintf("value.%s.%s", k, k2, k3): nil}
if len(v2) == 0 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": findParam2})
}
}
}
}
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string]map[string]utils.StringMap

View File

@@ -1379,11 +1379,14 @@ func (rs *RedisStorage) GetReqFilterIndexesDrv(dbKey string,
return nil, utils.ErrNotFound
}
} else {
var itmMpStrLst []string
for fldName, fldVal := range fldNameVal {
concatNameVal := utils.ConcatenatedKey(fldName, fldVal)
itmMpStrLst, err := rs.Cmd("HMGET", dbKey, concatNameVal).List()
itmMpStrLst, err = rs.Cmd("HMGET", dbKey, concatNameVal).List()
if err != nil {
return nil, err
return
} else if itmMpStrLst[0] == "" {
return nil, utils.ErrNotFound
}
mp[concatNameVal] = itmMpStrLst[0]
}
@@ -1435,7 +1438,6 @@ func (rs *RedisStorage) RemoveReqFilterIndexesDrv(id string) (err error) {
func (rs *RedisStorage) MatchReqFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
fieldValKey := utils.ConcatenatedKey(fldName, fldVal)
// Not found in cache, check in DB
fldValBytes, err := rs.Cmd("HGET", dbKey, fieldValKey).Bytes()
if err != nil {
if err == redis.ErrRespNil { // did not find the destination

View File

@@ -273,17 +273,30 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
for _, spr := range sprsmatch {
dmspl.DataDB().SetSupplierProfileDrv(spr)
}
ref, err := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org")
if err != nil {
t.Errorf("Error: %+v", err)
}
ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org")
ref.IndexFilters("supplierprofile1", filters1)
ref.IndexFilters("supplierprofile2", filters2)
err = ref.StoreIndexes()
if err != nil {
t.Errorf("Error: %+v", err)
}
//test here GetReqFilterIndexes with a specific map
expidx := map[string]map[string]utils.StringMap{
"supplierprofile1": {
"Supplier": {
"supplierprofile1": true,
},
},
}
splPrf1 := make(map[string]string)
splPrf1["supplierprofile1"] = "Supplier"
if rcvidx, err := dmspl.GetReqFilterIndexes(GetDBIndexKey(utils.SupplierProfilePrefix, "cgrates.org", false), splPrf1); err != nil {
t.Errorf("Error: %+v", err)
} else {
if !reflect.DeepEqual(expidx, rcvidx) {
t.Errorf("Expected: %+v received: %+v", expidx, rcvidx)
}
}
}
func TestSuppliersmatchingSupplierProfilesForEvent(t *testing.T) {

View File

@@ -1630,7 +1630,7 @@ func (tpr *TpReader) LoadResourceProfilesFiltered(tag string) (err error) {
}
// index resource for filters
if _, has := tpr.resIndexers[tntID.Tenant]; !has {
if tpr.resIndexers[tntID.Tenant], err = NewReqFilterIndexer(tpr.dm, utils.ResourceProfilesPrefix, tntID.Tenant); err != nil {
if tpr.resIndexers[tntID.Tenant] = NewReqFilterIndexer(tpr.dm, utils.ResourceProfilesPrefix, tntID.Tenant); err != nil {
return
}
}
@@ -1676,7 +1676,7 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) (err error) {
}
// index statQueues for filters
if _, has := tpr.sqpIndexers[tntID.Tenant]; !has {
if tpr.sqpIndexers[tntID.Tenant], err = NewReqFilterIndexer(tpr.dm, utils.StatQueueProfilePrefix, tntID.Tenant); err != nil {
if tpr.sqpIndexers[tntID.Tenant] = NewReqFilterIndexer(tpr.dm, utils.StatQueueProfilePrefix, tntID.Tenant); err != nil {
return
}
}
@@ -1722,7 +1722,7 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) (err error) {
}
// index thresholds for filters
if _, has := tpr.thdsIndexers[tntID.Tenant]; !has {
if tpr.thdsIndexers[tntID.Tenant], err = NewReqFilterIndexer(tpr.dm, utils.ThresholdProfilePrefix, tntID.Tenant); err != nil {
if tpr.thdsIndexers[tntID.Tenant] = NewReqFilterIndexer(tpr.dm, utils.ThresholdProfilePrefix, tntID.Tenant); err != nil {
return
}
}
@@ -1785,7 +1785,7 @@ func (tpr *TpReader) LoadSupplierProfilesFiltered(tag string) (err error) {
}
// index supplier profile for filters
if _, has := tpr.sppIndexers[tntID.Tenant]; !has {
if tpr.sppIndexers[tntID.Tenant], err = NewReqFilterIndexer(tpr.dm, utils.SupplierProfilePrefix, tntID.Tenant); err != nil {
if tpr.sppIndexers[tntID.Tenant] = NewReqFilterIndexer(tpr.dm, utils.SupplierProfilePrefix, tntID.Tenant); err != nil {
return
}
}
@@ -1832,7 +1832,7 @@ func (tpr *TpReader) LoadAttributeProfilesFiltered(tag string) (err error) {
// index attribute profile for filters
attrKey := utils.ConcatenatedKey(tntID.Tenant, res.Context)
if _, has := tpr.attrIndexers[attrKey]; !has {
if tpr.attrIndexers[attrKey], err = NewReqFilterIndexer(tpr.dm, utils.AttributeProfilePrefix, attrKey); err != nil {
if tpr.attrIndexers[attrKey] = NewReqFilterIndexer(tpr.dm, utils.AttributeProfilePrefix, attrKey); err != nil {
return
}
}