IndexFilter Ready for mongo and redis

This commit is contained in:
TeoV
2017-12-18 16:25:12 +02:00
committed by Dan Christian Bogos
parent c14855e8eb
commit 55872d572a
13 changed files with 415 additions and 174 deletions

View File

@@ -378,8 +378,33 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, skipCache bool, tr
return
}
func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile) (err error) {
return dm.DataDB().SetThresholdProfileDrv(th)
//from t reader setThpRf false from api true
func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) (err error) {
if err = dm.DataDB().SetThresholdProfileDrv(th); err != nil {
return err
}
if withIndex {
var thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer
thdsIndexers = make(map[string]*ReqFilterIndexer)
// index thresholds for filters
if _, has := thdsIndexers[th.Tenant]; !has {
if thdsIndexers[th.Tenant] = NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant); err != nil {
return
}
}
for _, fltrID := range th.FilterIDs {
var fltr *Filter
if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th)
}
return
}
thdsIndexers[th.Tenant].IndexFilters(th.ID, fltr)
}
thdsIndexers[th.Tenant].StoreIndexes(false)
}
return
}
func (dm *DataManager) RemoveThresholdProfile(tenant, id, transactionID string) (err error) {
@@ -843,19 +868,31 @@ func (dm *DataManager) HasData(category, subject string) (has bool, err error) {
return dm.DataDB().HasDataDrv(category, subject)
}
func (dm *DataManager) GetReqFilterIndexes(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
return dm.DataDB().GetReqFilterIndexesDrv(dbKey, fldNameVal)
func (dm *DataManager) GetFilterIndexes(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
return dm.DataDB().GetFilterIndexesDrv(dbKey, fldNameVal)
}
func (dm *DataManager) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
return dm.DataDB().SetReqFilterIndexesDrv(dbKey, indexes, update)
func (dm *DataManager) SetFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
return dm.DataDB().SetFilterIndexesDrv(dbKey, indexes, update)
}
func (dm *DataManager) RemoveReqFilterIndexes(dbKey string) (err error) {
return dm.DataDB().RemoveReqFilterIndexesDrv(dbKey)
func (dm *DataManager) RemoveFilterIndexes(dbKey string) (err error) {
return dm.DataDB().RemoveFilterIndexesDrv(dbKey)
}
func (dm *DataManager) MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
func (dm *DataManager) GetFilterReverseIndexes(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
return dm.DataDB().GetFilterReverseIndexesDrv(dbKey, fldNameVal)
}
func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes, update)
}
func (dm *DataManager) RemoveFilterReverseIndexes(dbKey, itemID string) (err error) {
return dm.DataDB().RemoveFilterReverseIndexesDrv(dbKey, itemID)
}
func (dm *DataManager) MatchFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
fieldValKey := utils.ConcatenatedKey(fieldName, fieldVal)
cacheKey := dbKey + fieldValKey
if x, ok := cache.Get(cacheKey); ok { // Attempt to find in cache first
@@ -865,7 +902,7 @@ func (dm *DataManager) MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (i
return x.(utils.StringMap), nil
}
// Not found in cache, check in DB
itemIDs, err = dm.DataDB().MatchReqFilterIndexDrv(dbKey, fieldName, fieldVal)
itemIDs, err = dm.DataDB().MatchFilterIndexDrv(dbKey, fieldName, fieldVal)
if err != nil {
if err == utils.ErrNotFound {
cache.Set(cacheKey, nil, true, utils.NonTransactional)

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"github.com/cgrates/cgrates/utils"
)
@@ -53,10 +52,12 @@ func (rfi *ReqFilterIndexer) ChangedKeys(reverse bool) utils.StringMap {
}
// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys
func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilter) {
func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs *Filter) {
var hasMetaString bool
rfi.reveseIndex[itemID] = make(map[string]utils.StringMap)
for _, fltr := range reqFltrs {
if _, hasIt := rfi.reveseIndex[itemID]; !hasIt {
rfi.reveseIndex[itemID] = make(map[string]utils.StringMap)
}
for _, fltr := range reqFltrs.RequestFilters {
if fltr.Type != MetaString {
continue
}
@@ -93,10 +94,12 @@ func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilt
return
}
// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys
// IndexTPFilter parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys
func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID string) {
var hasMetaString bool
rfi.reveseIndex[itemID] = make(map[string]utils.StringMap)
if _, hasIt := rfi.reveseIndex[itemID]; !hasIt {
rfi.reveseIndex[itemID] = make(map[string]utils.StringMap)
}
for _, fltr := range tpFltr.Filters {
if fltr.Type != MetaString {
continue
@@ -136,15 +139,15 @@ func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID
// StoreIndexes handles storing the indexes to dataDB
func (rfi *ReqFilterIndexer) StoreIndexes(update bool) (err error) {
if err = rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), rfi.indexes, update); err != nil {
if err = rfi.dm.SetFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), rfi.indexes, update); err != nil {
return
}
return rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), rfi.reveseIndex, update)
return rfi.dm.SetFilterReverseIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), rfi.reveseIndex, update)
}
//Populate the ReqFilterIndexer.reveseIndex for specifil itemID
func (rfi *ReqFilterIndexer) loadItemReverseIndex(itemID string) (err error) {
rcvReveseIdx, err := rfi.dm.GetReqFilterIndexes(
rcvReveseIdx, err := rfi.dm.GetFilterReverseIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true),
map[string]string{itemID: ""})
if err != nil {
@@ -159,13 +162,12 @@ func (rfi *ReqFilterIndexer) loadItemReverseIndex(itemID string) (err error) {
}
rfi.reveseIndex[itemID][key2] = val2
}
utils.Logger.Debug(fmt.Sprintf("itemID %+v \n ReverseIndex %+v ", itemID, rfi.reveseIndex))
return err
}
//Populate ReqFilterIndexer.indexes with specific fieldName,fieldValue , nil
func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) error {
rcvIdx, err := rfi.dm.GetReqFilterIndexes(
rcvIdx, err := rfi.dm.GetFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),
map[string]string{fldName: fldVal})
if err != nil {
@@ -179,7 +181,6 @@ func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) erro
rfi.indexes[fldName][fldVal] = itmMap
}
}
utils.Logger.Debug(fmt.Sprintf(" \n Index %+v ", rfi.reveseIndex))
return nil
}
@@ -202,8 +203,8 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) {
}
}
}
utils.Logger.Debug(fmt.Sprintf("Indexes : %+v \n", rfi.indexes))
rfi.StoreIndexes(true)
rfi.dm.RemoveFilterReverseIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), itemID)
return
}

View File

@@ -21,7 +21,6 @@ package engine
import (
"fmt"
"log"
"path"
"reflect"
"strings"
@@ -45,9 +44,9 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITFlush,
testOnStorITIsDBEmpty,
testOnStorITSetGetDerivedCharges,
testOnStorITSetReqFilterIndexes,
testOnStorITGetReqFilterIndexes,
testOnStorITMatchReqFilterIndex,
testOnStorITSetFilterIndexes,
testOnStorITGetFilterIndexes,
testOnStorITMatchFilterIndex,
testOnStorITCacheDestinations,
testOnStorITCacheReverseDestinations,
testOnStorITCacheRatingPlan,
@@ -199,7 +198,7 @@ func testOnStorITSetGetDerivedCharges(t *testing.T) {
}
}
func testOnStorITSetReqFilterIndexes(t *testing.T) {
func testOnStorITSetFilterIndexes(t *testing.T) {
idxes := map[string]map[string]utils.StringMap{
"Account": map[string]utils.StringMap{
"1001": utils.StringMap{
@@ -226,14 +225,14 @@ func testOnStorITSetReqFilterIndexes(t *testing.T) {
},
},
}
if err := onStor.SetReqFilterIndexes(
if err := onStor.SetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
idxes); err != nil {
idxes, false); err != nil {
t.Error(err)
}
}
func testOnStorITGetReqFilterIndexes(t *testing.T) {
func testOnStorITGetFilterIndexes(t *testing.T) {
eIdxes := map[string]map[string]utils.StringMap{
"Account": map[string]utils.StringMap{
"1001": utils.StringMap{
@@ -271,122 +270,58 @@ func testOnStorITGetReqFilterIndexes(t *testing.T) {
},
},
}
if exsbjDan, err := onStor.GetReqFilterIndexes(
if exsbjDan, err := onStor.GetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
sbjDan); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedsbjDan, exsbjDan) {
t.Errorf("Expecting: %+v, received: %+v", expectedsbjDan, exsbjDan)
}
if rcv, err := onStor.GetReqFilterIndexes(
if rcv, err := onStor.GetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
nil); err != nil {
t.Error(err)
} else {
log.Printf(fmt.Sprintf("RCV ALL IDX %+v \n", rcv))
if !reflect.DeepEqual(eIdxes, rcv) {
t.Errorf("Expecting: %+v, received: %+v", eIdxes, rcv)
}
}
// expectedsbjDan["Subject"]["dan"] = nil
// if err := onStor.SetReqFilterIndexes(
// GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
// expectedsbjDan); err != nil {
// t.Error(err)
// }
// if rcvidx, err := onStor.GetReqFilterIndexes(
// GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
// nil); err != nil {
// t.Error(err)
// } else {
// log.Printf(fmt.Sprintf("RcvALLIdx %+v", rcvidx))
// }
// if rcvidx, err := onStor.GetReqFilterIndexes(
// GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
// sbjDan); err == nil || err.Error() != utils.ErrNotFound.Error() {
// log.Printf(fmt.Sprintf("RcvIdx %+v", rcvidx))
// t.Error(err)
// } else {
// log.Printf(fmt.Sprintf("RcvIdx %+v", rcvidx))
// }
idxes := map[string]map[string]utils.StringMap{
"Account": map[string]utils.StringMap{
"1001": utils.StringMap{
"RL1": true,
},
"1002": utils.StringMap{
"RL1": true,
"RL2": true,
},
"dan": utils.StringMap{
"RL2": true,
},
},
"Subject": map[string]utils.StringMap{
"dan": utils.StringMap{},
},
utils.NOT_AVAILABLE: map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
"RL4": true,
"RL5": true,
},
},
}
if err := onStor.SetReqFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
idxes); err != nil {
if _, err := onStor.GetFilterIndexes("unknown_key", nil); err == nil || err != utils.ErrNotFound {
t.Error(err)
}
if rcv, err := onStor.GetReqFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
nil); err != nil {
if err := onStor.RemoveFilterIndexes(GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false)); err != nil {
t.Error(err)
}
_, err := onStor.GetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), nil)
if err != utils.ErrNotFound {
//if err!=nil{
t.Error(err)
//}else if !reflect.DeepEqual(eIdxes, idxes) {
// t.Errorf("Expecting: %+v, received: %+v", eIdxes, idxes)
}
if err := onStor.SetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
eIdxes, false); err != nil {
t.Error(err)
} else {
log.Printf(fmt.Sprintf("RCV ALL IDX %+v \n", rcv))
// if !reflect.DeepEqual(eIdxes, rcv) {
// t.Errorf("Expecting: %+v, received: %+v", eIdxes, rcv)
// }
}
/*
if idxes, err := onStor.GetReqFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
nil); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, idxes) {
t.Errorf("Expecting: %+v, received: %+v", eIdxes, idxes)
}
if _, err := onStor.GetReqFilterIndexes("unknown_key", nil); err == nil || err != utils.ErrNotFound {
t.Error(err)
}
if err := onStor.RemoveReqFilterIndexes(utils.ResourceProfilesStringIndex); err != nil {
t.Error(err)
}
_, err := onStor.GetReqFilterIndexes(utils.ResourceProfilesStringIndex, nil)
if err != utils.ErrNotFound {
//if err!=nil{
t.Error(err)
//}else if !reflect.DeepEqual(eIdxes, idxes) {
// t.Errorf("Expecting: %+v, received: %+v", eIdxes, idxes)
}
if err := onStor.SetReqFilterIndexes(utils.ResourceProfilesStringIndex, eIdxes); err != nil {
t.Error(err)
}
*/
}
func testOnStorITMatchReqFilterIndex(t *testing.T) {
func testOnStorITMatchFilterIndex(t *testing.T) {
eMp := utils.StringMap{
"RL1": true,
"RL2": true,
}
if rcvMp, err := onStor.MatchReqFilterIndex(utils.ResourceProfilesStringIndex, "Account", "1002"); err != nil {
if rcvMp, err := onStor.MatchFilterIndex(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
"Account", "1002"); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMp, rcvMp) {
t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp)
}
if _, err := onStor.MatchReqFilterIndex(utils.ResourceProfilesStringIndex, "NonexistentField", "1002"); err == nil || err != utils.ErrNotFound {
if _, err := onStor.MatchFilterIndex(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
"NonexistentField", "1002"); err == nil || err != utils.ErrNotFound {
t.Error(err)
}
}
@@ -1116,10 +1051,25 @@ func testOnStorITCacheStatQueue(t *testing.T) {
}
func testOnStorITCacheThresholdProfile(t *testing.T) {
filter := &Filter{
Tenant: "cgrates.org",
ID: "TestFilter",
RequestFilters: []*RequestFilter{
&RequestFilter{
FieldName: "*string",
Type: "Account",
Values: []string{"1001", "1002"},
},
},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
},
}
tPrfl := &ThresholdProfile{
Tenant: "cgrates.org",
ID: "Test_Threshold_Cache",
FilterIDs: []string{"FilterID1", "FilterID2"},
FilterIDs: []string{"TestFilter"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
@@ -1131,7 +1081,10 @@ func testOnStorITCacheThresholdProfile(t *testing.T) {
ActionIDs: []string{"ACT_1", "ACT_2"},
Async: true,
}
if err := onStor.SetThresholdProfile(tPrfl); err != nil {
if err := onStor.SetFilter(filter); err != nil {
t.Error(err)
}
if err := onStor.SetThresholdProfile(tPrfl, true); err != nil {
t.Error(err)
}
expectedR := []string{"thp_cgrates.org:Test_Threshold_Cache"}
@@ -1202,7 +1155,7 @@ func testOnStorITCacheFilter(t *testing.T) {
if err := onStor.SetFilter(filter); err != nil {
t.Error(err)
}
expectedT := []string{"ftr_cgrates.org:Filter1"}
expectedT := []string{"ftr_cgrates.org:TestFilter", "ftr_cgrates.org:Filter1"}
if itm, err := onStor.DataDB().GetKeysForPrefix(utils.FilterPrefix); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedT, itm) {
@@ -1220,7 +1173,6 @@ func testOnStorITCacheFilter(t *testing.T) {
} else if rcv := itm.(*Filter); !reflect.DeepEqual(filter, rcv) {
t.Errorf("Expecting: %+v, received: %+v", filter, rcv)
}
}
func testOnStorITCacheSupplierProfile(t *testing.T) {
@@ -2482,23 +2434,41 @@ func testOnStorITCRUDStoredStatQueue(t *testing.T) {
}
func testOnStorITCRUDThresholdProfile(t *testing.T) {
fp := &Filter{
Tenant: "cgrates.org",
ID: "TestFilter2",
RequestFilters: []*RequestFilter{
&RequestFilter{
FieldName: "Account",
Type: "*string",
Values: []string{"1001", "1002"},
},
},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
},
}
timeMinSleep := time.Duration(0 * time.Second)
th := &ThresholdProfile{
Tenant: "cgrates.org",
ID: "test",
ActivationInterval: &utils.ActivationInterval{},
FilterIDs: []string{},
FilterIDs: []string{"TestFilter2"},
Recurrent: true,
MinSleep: timeMinSleep,
Blocker: true,
Weight: 1.4,
ActionIDs: []string{},
}
if err := onStor.SetFilter(fp); err != nil {
t.Error(err)
}
if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID,
false, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := onStor.SetThresholdProfile(th); err != nil {
if err := onStor.SetThresholdProfile(th, true); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); err != nil {

View File

@@ -50,7 +50,7 @@ func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs []string,
fmt.Sprintf("<%s> cannot cast field: %s into string", utils.FilterS, fldName))
continue
}
dbItemIDs, err := dm.MatchReqFilterIndex(dbIdxKey, fldName, fldVal)
dbItemIDs, err := dm.MatchFilterIndex(dbIdxKey, fldName, fldVal)
if err != nil {
if err == utils.ErrNotFound {
continue
@@ -63,7 +63,7 @@ func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs []string,
}
}
}
dbItemIDs, err := dm.MatchReqFilterIndex(dbIdxKey, utils.NOT_AVAILABLE, utils.NOT_AVAILABLE) // add unindexed itemIDs to be checked
dbItemIDs, err := dm.MatchFilterIndex(dbIdxKey, utils.NOT_AVAILABLE, utils.NOT_AVAILABLE) // add unindexed itemIDs to be checked
if err != nil {
if err != utils.ErrNotFound {
return nil, err

View File

@@ -117,10 +117,13 @@ type DataDB interface {
RemoveTimingDrv(string) error
GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
AddLoadHistory(*utils.LoadInstance, int, string) error
GetReqFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error)
SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error)
RemoveReqFilterIndexesDrv(id string) (err error)
MatchReqFilterIndexDrv(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error)
SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error)
RemoveFilterIndexesDrv(id string) (err error)
GetFilterReverseIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error)
SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error)
RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error)
MatchFilterIndexDrv(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)
SetStatQueueProfileDrv(sq *StatQueueProfile) (err error)
RemStatQueueProfileDrv(tenant, id string) (err error)

View File

@@ -1254,7 +1254,7 @@ func (ms *MapStorage) RemoveTimingDrv(id string) error {
return nil
}
func (ms *MapStorage) GetReqFilterIndexesDrv(dbKey string,
func (ms *MapStorage) GetFilterIndexesDrv(dbKey string,
fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
@@ -1287,7 +1287,8 @@ func (ms *MapStorage) GetReqFilterIndexesDrv(dbKey string,
}
return
}
func (ms *MapStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (ms *MapStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(indexes)
@@ -1298,14 +1299,66 @@ func (ms *MapStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]ma
return
}
func (ms *MapStorage) RemoveReqFilterIndexesDrv(id string) (err error) {
func (ms *MapStorage) RemoveFilterIndexesDrv(id string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, id)
return
}
func (ms *MapStorage) MatchReqFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
func (ms *MapStorage) GetFilterReverseIndexesDrv(dbKey string,
fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
values, ok := ms.dict[dbKey]
if !ok {
return nil, utils.ErrNotFound
}
if len(fldNameVal) != 0 {
rcvidx := make(map[string]map[string]utils.StringMap)
err = ms.ms.Unmarshal(values, &rcvidx)
if err != nil {
return nil, err
}
indexes = make(map[string]map[string]utils.StringMap)
for fldName, fldVal := range fldNameVal {
if _, has := indexes[fldName]; !has {
indexes[fldName] = make(map[string]utils.StringMap)
}
if _, has := indexes[fldName][fldVal]; !has {
indexes[fldName][fldVal] = make(utils.StringMap)
}
indexes[fldName][fldVal] = rcvidx[fldName][fldVal]
}
return
} else {
err = ms.ms.Unmarshal(values, &indexes)
if err != nil {
return nil, err
}
}
return
}
func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(indexes)
if err != nil {
return err
}
ms.dict[dbKey] = result
return
}
func (ms *MapStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.ConcatenatedKey(dbKey, itemID))
return
}
func (ms *MapStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
values, ok := ms.dict[dbKey]

View File

@@ -1928,7 +1928,7 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) {
return nil
}
func (ms *MongoStorage) GetReqFilterIndexesDrv(dbKey string,
func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string,
fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
session, col := ms.conn(colRFI)
defer session.Close()
@@ -1963,15 +1963,13 @@ func (ms *MongoStorage) GetReqFilterIndexesDrv(dbKey string,
return result.Value, nil
}
func (ms *MongoStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
if update {
utils.Logger.Debug("\n ENTER HERE !! \n")
for k, v := range indexes {
for k2, v2 := range v {
findParam2 := fmt.Sprintf("value.%s.%s", k, k2)
utils.Logger.Debug(fmt.Sprintf("\nFINDPARAM2 %+v and len(v2) %+v \n", findParam2, len(v2)))
if len(v2) != 0 {
for k3 := range v2 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$set": bson.M{findParam2: bson.M{k3: true}}})
@@ -1990,7 +1988,7 @@ func (ms *MongoStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]
return
}
func (ms *MongoStorage) RemoveReqFilterIndexesDrv(id string) (err error) {
func (ms *MongoStorage) RemoveFilterIndexesDrv(id string) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
if err = col.Remove(bson.M{"key": id}); err != nil {
@@ -1998,8 +1996,77 @@ func (ms *MongoStorage) RemoveReqFilterIndexesDrv(id string) (err error) {
}
return nil
}
func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string,
fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
session, col := ms.conn(colRFI)
defer session.Close()
var result struct {
Key string
Value map[string]map[string]utils.StringMap
}
findParam := bson.M{"key": dbKey}
if len(fldNameVal) != 0 {
for fldName, fldValue := range fldNameVal {
var qryFltr bson.M
if fldValue == "" {
qryFltr = bson.M{fmt.Sprintf("value.%s", fldName): 1}
} else {
qryFltr = bson.M{fmt.Sprintf("value.%s.%s", fldName, fldValue): 1}
}
if err = col.Find(findParam).Select(qryFltr).One(&result); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
return nil, err
}
return result.Value, nil
}
}
if err = col.Find(findParam).One(&result); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
return nil, err
}
return result.Value, nil
}
func (ms *MongoStorage) MatchReqFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
if update {
for k, v := range indexes {
for k2, v2 := range v {
findParam2 := fmt.Sprintf("value.%s.%s", k, k2)
if len(v2) != 0 {
for k3 := range v2 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$set": bson.M{findParam2: bson.M{k3: true}}})
}
} else {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam2: 1}})
}
}
}
} else {
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string]map[string]utils.StringMap
}{dbKey, indexes})
}
return
}
func (ms *MongoStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
findParam := fmt.Sprintf("value.%s", itemID)
if err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam: 1}}); err != nil {
return
}
return nil
}
func (ms *MongoStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
session, col := ms.conn(colRFI)
defer session.Close()
var result struct {

View File

@@ -1368,7 +1368,7 @@ func (rs *RedisStorage) RemoveTimingDrv(id string) (err error) {
return
}
func (rs *RedisStorage) GetReqFilterIndexesDrv(dbKey string,
func (rs *RedisStorage) GetFilterIndexesDrv(dbKey string,
fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
mp := make(map[string]string)
if len(fldNameVal) == 0 {
@@ -1412,35 +1412,102 @@ func (rs *RedisStorage) GetReqFilterIndexesDrv(dbKey string,
return
}
func (rs *RedisStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (rs *RedisStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
mp := make(map[string]string)
if update {
for fldName, fldValMp := range indexes {
for fldVal, strMp := range fldValMp {
if len(strMp) == 0 { // remove with no more elements inside
return rs.Cmd("HDEL", dbKey, utils.ConcatenatedKey(fldName, fldVal)).Err
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
for fldName, fldValMp := range indexes {
for fldVal, strMp := range fldValMp {
if len(strMp) == 0 { // remove with no more elements inside
if err = rs.Cmd("HDEL", dbKey, utils.ConcatenatedKey(fldName, fldVal)).Err; err != nil {
return err
} else {
mp[utils.ConcatenatedKey(fldName, fldVal)] = string(encodedMp)
}
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[utils.ConcatenatedKey(fldName, fldVal)] = string(encodedMp)
}
}
} else {
return rs.Cmd("HMSET", dbKey, mp).Err
}
return
return rs.Cmd("HMSET", dbKey, mp).Err
}
func (rs *RedisStorage) RemoveReqFilterIndexesDrv(id string) (err error) {
func (rs *RedisStorage) RemoveFilterIndexesDrv(id string) (err error) {
if err = rs.Cmd("DEL", id).Err; err != nil {
return err
}
return
}
func (rs *RedisStorage) MatchReqFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
//GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB
func (rs *RedisStorage) GetFilterReverseIndexesDrv(dbKey string,
fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) {
mp := make(map[string]string)
if len(fldNameVal) == 0 {
mp, err = rs.Cmd("HGETALL", dbKey).Map()
if err != nil {
return
} else if len(mp) == 0 {
return nil, utils.ErrNotFound
}
} else {
var itmMpStrLst []string
for fldName, _ := range fldNameVal {
itmMpStrLst, err = rs.Cmd("HMGET", dbKey, fldName).List()
if err != nil {
return
} else if itmMpStrLst[0] == "" {
return nil, utils.ErrNotFound
}
mp[fldName] = itmMpStrLst[0]
}
}
indexes = make(map[string]map[string]utils.StringMap)
for k, v := range mp {
var sm map[string]utils.StringMap
if err = rs.ms.Unmarshal([]byte(v), &sm); err != nil {
return
}
if _, hasKey := indexes[k]; !hasKey {
indexes[k] = make(map[string]utils.StringMap)
}
for key, val := range sm {
if _, hasKey := indexes[k][key]; !hasKey {
indexes[k][key] = make(utils.StringMap)
}
indexes[k][key] = val
}
}
return
}
func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
mp := make(map[string]string)
for fldName, fldValMp := range indexes {
for _, strMp := range fldValMp {
if len(strMp) == 0 { // remove with no more elements inside
if err = rs.Cmd("HDEL", dbKey, fldName).Err; err != nil {
return err
}
}
if encodedMp, err := rs.ms.Marshal(fldValMp); err != nil {
return err
} else {
mp[fldName] = string(encodedMp)
}
}
}
return rs.Cmd("HMSET", dbKey, mp).Err
}
func (rs *RedisStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) {
if err = rs.Cmd("HDEL", dbKey, itemID).Err; err != nil {
return err
}
return
}
func (rs *RedisStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
fieldValKey := utils.ConcatenatedKey(fldName, fldVal)
fldValBytes, err := rs.Cmd("HGET", dbKey, fieldValKey).Bytes()
if err != nil {

View File

@@ -180,7 +180,6 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
filter2 := &Filter{Tenant: config.CgrConfig().DefaultTenant, ID: "filter2", RequestFilters: filters2}
dmspl.SetFilter(filter1)
dmspl.SetFilter(filter2)
ssd := make(map[string]SuppliersSorter)
ssd[utils.MetaWeight] = NewWeightSorter()
splserv = SupplierService{
@@ -274,13 +273,13 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
dmspl.DataDB().SetSupplierProfileDrv(spr)
}
ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org")
ref.IndexFilters("supplierprofile1", filters1)
ref.IndexFilters("supplierprofile2", filters2)
ref.IndexFilters("supplierprofile1", filter1)
ref.IndexFilters("supplierprofile2", filter2)
err = ref.StoreIndexes(false)
if err != nil {
t.Errorf("Error: %+v", err)
}
//test here GetReqFilterIndexes with a specific map
//test here GetReqFilterIndexes for StorageMap with a specific map
expidx := map[string]map[string]utils.StringMap{
"supplierprofile1": {
"Supplier": {
@@ -290,7 +289,9 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
}
splPrf1 := make(map[string]string)
splPrf1["supplierprofile1"] = "Supplier"
if rcvidx, err := dmspl.GetReqFilterIndexes(GetDBIndexKey(utils.SupplierProfilePrefix, "cgrates.org", false), splPrf1); err != nil {
if rcvidx, err := dmspl.GetFilterIndexes(
GetDBIndexKey(utils.SupplierProfilePrefix, "cgrates.org", false),
splPrf1); err != nil {
t.Errorf("Error: %+v", err)
} else {
if !reflect.DeepEqual(expidx, rcvidx) {

View File

@@ -2243,7 +2243,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
if err != nil {
return err
}
if err = tpr.dm.SetThresholdProfile(th); err != nil {
if err = tpr.dm.SetThresholdProfile(th, false); err != nil {
return err
}
if verbose {
@@ -2955,7 +2955,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Indexing resource profiles")
}
for tenant, fltrIdxer := range tpr.resIndexers {
if err := tpr.dm.RemoveReqFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil {
if err := tpr.dm.RemoveFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil {
return err
}
if verbose {
@@ -2967,7 +2967,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("StatQueue filter indexes:")
}
for tenant, fltrIdxer := range tpr.sqpIndexers {
if err := tpr.dm.RemoveReqFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil {
if err := tpr.dm.RemoveFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil {
return err
}
if verbose {
@@ -2979,7 +2979,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Threshold filter indexes:")
}
for tenant, fltrIdxer := range tpr.thdsIndexers {
if err := tpr.dm.RemoveReqFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil {
if err := tpr.dm.RemoveFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil {
return err
}
if verbose {
@@ -2991,7 +2991,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("Indexing Supplier Profiles")
}
for tenant, fltrIdxer := range tpr.sppIndexers {
if err := tpr.dm.RemoveReqFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil {
if err := tpr.dm.RemoveFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil {
return err
}
if verbose {