diff --git a/apier/v1/thresholds.go b/apier/v1/thresholds.go index 0e8bf7423..8e116f1d7 100644 --- a/apier/v1/thresholds.go +++ b/apier/v1/thresholds.go @@ -77,7 +77,7 @@ func (apierV1 *ApierV1) SetThresholdProfile(thp *engine.ThresholdProfile, reply if missing := utils.MissingStructFields(thp, []string{"Tenant", "ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - if err := apierV1.DataManager.SetThresholdProfile(thp); err != nil { + if err := apierV1.DataManager.SetThresholdProfile(thp, true); err != nil { return utils.APIErrorHandler(err) } cache.RemKey(utils.ThresholdProfilePrefix+utils.ConcatenatedKey(thp.Tenant, thp.ID), true, "") // ToDo: Remove here with autoreload diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index ea909643e..c503763a4 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -346,6 +346,28 @@ func testV1TSGetThresholdsAfterRestart(t *testing.T) { func testV1TSSetThresholdProfile(t *testing.T) { var reply *engine.ThresholdProfile + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "TestFilter", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "*string", + Type: "Account", + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + + var result string + if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { @@ -354,7 +376,7 @@ func testV1TSSetThresholdProfile(t *testing.T) { tPrfl = &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "TEST_PROFILE1", - FilterIDs: []string{"FilterID1", "FilterID2"}, + FilterIDs: []string{"TestFilter"}, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), @@ -366,7 +388,6 @@ func testV1TSSetThresholdProfile(t *testing.T) { ActionIDs: []string{"ACT_1", "ACT_2"}, Async: true, } - var result string if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { t.Error(err) } else if result != utils.OK { @@ -382,7 +403,28 @@ func testV1TSSetThresholdProfile(t *testing.T) { func testV1TSUpdateThresholdProfile(t *testing.T) { var result string - tPrfl.FilterIDs = []string{"FilterID1", "FilterID2", "FilterID3"} + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "TestFilter2", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "*string", + Type: "Account", + Values: []string{"10", "20"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + + if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + tPrfl.FilterIDs = []string{"TestFilter", "TestFilter2"} if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { t.Error(err) } else if result != utils.OK { diff --git a/engine/datamanager.go b/engine/datamanager.go index 1992dbaf3..2b87ce8a0 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -378,8 +378,33 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, skipCache bool, tr return } -func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile) (err error) { - return dm.DataDB().SetThresholdProfileDrv(th) +//from t reader setThpRf false from api true +func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) (err error) { + if err = dm.DataDB().SetThresholdProfileDrv(th); err != nil { + return err + } + if withIndex { + var thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer + thdsIndexers = make(map[string]*ReqFilterIndexer) + // index thresholds for filters + if _, has := thdsIndexers[th.Tenant]; !has { + if thdsIndexers[th.Tenant] = NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant); err != nil { + return + } + } + for _, fltrID := range th.FilterIDs { + var fltr *Filter + if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th) + } + return + } + thdsIndexers[th.Tenant].IndexFilters(th.ID, fltr) + } + thdsIndexers[th.Tenant].StoreIndexes(false) + } + return } func (dm *DataManager) RemoveThresholdProfile(tenant, id, transactionID string) (err error) { @@ -843,19 +868,31 @@ func (dm *DataManager) HasData(category, subject string) (has bool, err error) { return dm.DataDB().HasDataDrv(category, subject) } -func (dm *DataManager) GetReqFilterIndexes(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { - return dm.DataDB().GetReqFilterIndexesDrv(dbKey, fldNameVal) +func (dm *DataManager) GetFilterIndexes(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { + return dm.DataDB().GetFilterIndexesDrv(dbKey, fldNameVal) } -func (dm *DataManager) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { - return dm.DataDB().SetReqFilterIndexesDrv(dbKey, indexes, update) +func (dm *DataManager) SetFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { + return dm.DataDB().SetFilterIndexesDrv(dbKey, indexes, update) } -func (dm *DataManager) RemoveReqFilterIndexes(dbKey string) (err error) { - return dm.DataDB().RemoveReqFilterIndexesDrv(dbKey) +func (dm *DataManager) RemoveFilterIndexes(dbKey string) (err error) { + return dm.DataDB().RemoveFilterIndexesDrv(dbKey) } -func (dm *DataManager) MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) { +func (dm *DataManager) GetFilterReverseIndexes(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { + return dm.DataDB().GetFilterReverseIndexesDrv(dbKey, fldNameVal) +} + +func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { + return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes, update) +} + +func (dm *DataManager) RemoveFilterReverseIndexes(dbKey, itemID string) (err error) { + return dm.DataDB().RemoveFilterReverseIndexesDrv(dbKey, itemID) +} + +func (dm *DataManager) MatchFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) { fieldValKey := utils.ConcatenatedKey(fieldName, fieldVal) cacheKey := dbKey + fieldValKey if x, ok := cache.Get(cacheKey); ok { // Attempt to find in cache first @@ -865,7 +902,7 @@ func (dm *DataManager) MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (i return x.(utils.StringMap), nil } // Not found in cache, check in DB - itemIDs, err = dm.DataDB().MatchReqFilterIndexDrv(dbKey, fieldName, fieldVal) + itemIDs, err = dm.DataDB().MatchFilterIndexDrv(dbKey, fieldName, fieldVal) if err != nil { if err == utils.ErrNotFound { cache.Set(cacheKey, nil, true, utils.NonTransactional) diff --git a/engine/filterindexer.go b/engine/filterindexer.go index d73163d6b..de05b28b7 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -19,7 +19,6 @@ along with this program. If not, see package engine import ( - "fmt" "github.com/cgrates/cgrates/utils" ) @@ -53,10 +52,12 @@ func (rfi *ReqFilterIndexer) ChangedKeys(reverse bool) utils.StringMap { } // IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys -func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilter) { +func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs *Filter) { var hasMetaString bool - rfi.reveseIndex[itemID] = make(map[string]utils.StringMap) - for _, fltr := range reqFltrs { + if _, hasIt := rfi.reveseIndex[itemID]; !hasIt { + rfi.reveseIndex[itemID] = make(map[string]utils.StringMap) + } + for _, fltr := range reqFltrs.RequestFilters { if fltr.Type != MetaString { continue } @@ -93,10 +94,12 @@ func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilt return } -// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys +// IndexTPFilter parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID string) { var hasMetaString bool - rfi.reveseIndex[itemID] = make(map[string]utils.StringMap) + if _, hasIt := rfi.reveseIndex[itemID]; !hasIt { + rfi.reveseIndex[itemID] = make(map[string]utils.StringMap) + } for _, fltr := range tpFltr.Filters { if fltr.Type != MetaString { continue @@ -136,15 +139,15 @@ func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID // StoreIndexes handles storing the indexes to dataDB func (rfi *ReqFilterIndexer) StoreIndexes(update bool) (err error) { - if err = rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), rfi.indexes, update); err != nil { + if err = rfi.dm.SetFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), rfi.indexes, update); err != nil { return } - return rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), rfi.reveseIndex, update) + return rfi.dm.SetFilterReverseIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), rfi.reveseIndex, update) } //Populate the ReqFilterIndexer.reveseIndex for specifil itemID func (rfi *ReqFilterIndexer) loadItemReverseIndex(itemID string) (err error) { - rcvReveseIdx, err := rfi.dm.GetReqFilterIndexes( + rcvReveseIdx, err := rfi.dm.GetFilterReverseIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), map[string]string{itemID: ""}) if err != nil { @@ -159,13 +162,12 @@ func (rfi *ReqFilterIndexer) loadItemReverseIndex(itemID string) (err error) { } rfi.reveseIndex[itemID][key2] = val2 } - utils.Logger.Debug(fmt.Sprintf("itemID %+v \n ReverseIndex %+v ", itemID, rfi.reveseIndex)) return err } //Populate ReqFilterIndexer.indexes with specific fieldName,fieldValue , nil func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) error { - rcvIdx, err := rfi.dm.GetReqFilterIndexes( + rcvIdx, err := rfi.dm.GetFilterIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), map[string]string{fldName: fldVal}) if err != nil { @@ -179,7 +181,6 @@ func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) erro rfi.indexes[fldName][fldVal] = itmMap } } - utils.Logger.Debug(fmt.Sprintf(" \n Index %+v ", rfi.reveseIndex)) return nil } @@ -202,8 +203,8 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) { } } } - utils.Logger.Debug(fmt.Sprintf("Indexes : %+v \n", rfi.indexes)) rfi.StoreIndexes(true) + rfi.dm.RemoveFilterReverseIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), itemID) return } diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 21eb57ded..e687fc815 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -21,7 +21,6 @@ package engine import ( "fmt" - "log" "path" "reflect" "strings" @@ -45,9 +44,9 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITFlush, testOnStorITIsDBEmpty, testOnStorITSetGetDerivedCharges, - testOnStorITSetReqFilterIndexes, - testOnStorITGetReqFilterIndexes, - testOnStorITMatchReqFilterIndex, + testOnStorITSetFilterIndexes, + testOnStorITGetFilterIndexes, + testOnStorITMatchFilterIndex, testOnStorITCacheDestinations, testOnStorITCacheReverseDestinations, testOnStorITCacheRatingPlan, @@ -199,7 +198,7 @@ func testOnStorITSetGetDerivedCharges(t *testing.T) { } } -func testOnStorITSetReqFilterIndexes(t *testing.T) { +func testOnStorITSetFilterIndexes(t *testing.T) { idxes := map[string]map[string]utils.StringMap{ "Account": map[string]utils.StringMap{ "1001": utils.StringMap{ @@ -226,14 +225,14 @@ func testOnStorITSetReqFilterIndexes(t *testing.T) { }, }, } - if err := onStor.SetReqFilterIndexes( + if err := onStor.SetFilterIndexes( GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - idxes); err != nil { + idxes, false); err != nil { t.Error(err) } } -func testOnStorITGetReqFilterIndexes(t *testing.T) { +func testOnStorITGetFilterIndexes(t *testing.T) { eIdxes := map[string]map[string]utils.StringMap{ "Account": map[string]utils.StringMap{ "1001": utils.StringMap{ @@ -271,122 +270,58 @@ func testOnStorITGetReqFilterIndexes(t *testing.T) { }, }, } - if exsbjDan, err := onStor.GetReqFilterIndexes( + if exsbjDan, err := onStor.GetFilterIndexes( GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), sbjDan); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedsbjDan, exsbjDan) { t.Errorf("Expecting: %+v, received: %+v", expectedsbjDan, exsbjDan) } - if rcv, err := onStor.GetReqFilterIndexes( + if rcv, err := onStor.GetFilterIndexes( GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), nil); err != nil { t.Error(err) } else { - log.Printf(fmt.Sprintf("RCV ALL IDX %+v \n", rcv)) if !reflect.DeepEqual(eIdxes, rcv) { t.Errorf("Expecting: %+v, received: %+v", eIdxes, rcv) } } - // expectedsbjDan["Subject"]["dan"] = nil - // if err := onStor.SetReqFilterIndexes( - // GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - // expectedsbjDan); err != nil { - // t.Error(err) - // } - // if rcvidx, err := onStor.GetReqFilterIndexes( - // GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - // nil); err != nil { - // t.Error(err) - // } else { - // log.Printf(fmt.Sprintf("RcvALLIdx %+v", rcvidx)) - // } - // if rcvidx, err := onStor.GetReqFilterIndexes( - // GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - // sbjDan); err == nil || err.Error() != utils.ErrNotFound.Error() { - // log.Printf(fmt.Sprintf("RcvIdx %+v", rcvidx)) - // t.Error(err) - // } else { - // log.Printf(fmt.Sprintf("RcvIdx %+v", rcvidx)) - // } - - idxes := map[string]map[string]utils.StringMap{ - "Account": map[string]utils.StringMap{ - "1001": utils.StringMap{ - "RL1": true, - }, - "1002": utils.StringMap{ - "RL1": true, - "RL2": true, - }, - "dan": utils.StringMap{ - "RL2": true, - }, - }, - "Subject": map[string]utils.StringMap{ - "dan": utils.StringMap{}, - }, - utils.NOT_AVAILABLE: map[string]utils.StringMap{ - utils.NOT_AVAILABLE: utils.StringMap{ - "RL4": true, - "RL5": true, - }, - }, - } - if err := onStor.SetReqFilterIndexes( - GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - idxes); err != nil { + if _, err := onStor.GetFilterIndexes("unknown_key", nil); err == nil || err != utils.ErrNotFound { t.Error(err) } - if rcv, err := onStor.GetReqFilterIndexes( - GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - nil); err != nil { + if err := onStor.RemoveFilterIndexes(GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false)); err != nil { + t.Error(err) + } + _, err := onStor.GetFilterIndexes( + GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), nil) + if err != utils.ErrNotFound { + //if err!=nil{ + t.Error(err) + //}else if !reflect.DeepEqual(eIdxes, idxes) { + // t.Errorf("Expecting: %+v, received: %+v", eIdxes, idxes) + } + if err := onStor.SetFilterIndexes( + GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), + eIdxes, false); err != nil { t.Error(err) - } else { - log.Printf(fmt.Sprintf("RCV ALL IDX %+v \n", rcv)) - // if !reflect.DeepEqual(eIdxes, rcv) { - // t.Errorf("Expecting: %+v, received: %+v", eIdxes, rcv) - // } } - /* - if idxes, err := onStor.GetReqFilterIndexes( - GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - nil); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eIdxes, idxes) { - t.Errorf("Expecting: %+v, received: %+v", eIdxes, idxes) - } - - if _, err := onStor.GetReqFilterIndexes("unknown_key", nil); err == nil || err != utils.ErrNotFound { - t.Error(err) - } - if err := onStor.RemoveReqFilterIndexes(utils.ResourceProfilesStringIndex); err != nil { - t.Error(err) - } - _, err := onStor.GetReqFilterIndexes(utils.ResourceProfilesStringIndex, nil) - if err != utils.ErrNotFound { - //if err!=nil{ - t.Error(err) - //}else if !reflect.DeepEqual(eIdxes, idxes) { - // t.Errorf("Expecting: %+v, received: %+v", eIdxes, idxes) - } - if err := onStor.SetReqFilterIndexes(utils.ResourceProfilesStringIndex, eIdxes); err != nil { - t.Error(err) - } - */ } -func testOnStorITMatchReqFilterIndex(t *testing.T) { +func testOnStorITMatchFilterIndex(t *testing.T) { eMp := utils.StringMap{ "RL1": true, "RL2": true, } - if rcvMp, err := onStor.MatchReqFilterIndex(utils.ResourceProfilesStringIndex, "Account", "1002"); err != nil { + if rcvMp, err := onStor.MatchFilterIndex( + GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), + "Account", "1002"); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) } - if _, err := onStor.MatchReqFilterIndex(utils.ResourceProfilesStringIndex, "NonexistentField", "1002"); err == nil || err != utils.ErrNotFound { + if _, err := onStor.MatchFilterIndex( + GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), + "NonexistentField", "1002"); err == nil || err != utils.ErrNotFound { t.Error(err) } } @@ -1116,10 +1051,25 @@ func testOnStorITCacheStatQueue(t *testing.T) { } func testOnStorITCacheThresholdProfile(t *testing.T) { + filter := &Filter{ + Tenant: "cgrates.org", + ID: "TestFilter", + RequestFilters: []*RequestFilter{ + &RequestFilter{ + FieldName: "*string", + Type: "Account", + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } tPrfl := &ThresholdProfile{ Tenant: "cgrates.org", ID: "Test_Threshold_Cache", - FilterIDs: []string{"FilterID1", "FilterID2"}, + FilterIDs: []string{"TestFilter"}, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), @@ -1131,7 +1081,10 @@ func testOnStorITCacheThresholdProfile(t *testing.T) { ActionIDs: []string{"ACT_1", "ACT_2"}, Async: true, } - if err := onStor.SetThresholdProfile(tPrfl); err != nil { + if err := onStor.SetFilter(filter); err != nil { + t.Error(err) + } + if err := onStor.SetThresholdProfile(tPrfl, true); err != nil { t.Error(err) } expectedR := []string{"thp_cgrates.org:Test_Threshold_Cache"} @@ -1202,7 +1155,7 @@ func testOnStorITCacheFilter(t *testing.T) { if err := onStor.SetFilter(filter); err != nil { t.Error(err) } - expectedT := []string{"ftr_cgrates.org:Filter1"} + expectedT := []string{"ftr_cgrates.org:TestFilter", "ftr_cgrates.org:Filter1"} if itm, err := onStor.DataDB().GetKeysForPrefix(utils.FilterPrefix); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedT, itm) { @@ -1220,7 +1173,6 @@ func testOnStorITCacheFilter(t *testing.T) { } else if rcv := itm.(*Filter); !reflect.DeepEqual(filter, rcv) { t.Errorf("Expecting: %+v, received: %+v", filter, rcv) } - } func testOnStorITCacheSupplierProfile(t *testing.T) { @@ -2482,23 +2434,41 @@ func testOnStorITCRUDStoredStatQueue(t *testing.T) { } func testOnStorITCRUDThresholdProfile(t *testing.T) { + fp := &Filter{ + Tenant: "cgrates.org", + ID: "TestFilter2", + RequestFilters: []*RequestFilter{ + &RequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } timeMinSleep := time.Duration(0 * time.Second) th := &ThresholdProfile{ Tenant: "cgrates.org", ID: "test", ActivationInterval: &utils.ActivationInterval{}, - FilterIDs: []string{}, + FilterIDs: []string{"TestFilter2"}, Recurrent: true, MinSleep: timeMinSleep, Blocker: true, Weight: 1.4, ActionIDs: []string{}, } + if err := onStor.SetFilter(fp); err != nil { + t.Error(err) + } if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.SetThresholdProfile(th); err != nil { + if err := onStor.SetThresholdProfile(th, true); err != nil { t.Error(err) } if rcv, err := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); err != nil { diff --git a/engine/reqfilterhelpers.go b/engine/reqfilterhelpers.go index 9145221fc..3bee85d90 100644 --- a/engine/reqfilterhelpers.go +++ b/engine/reqfilterhelpers.go @@ -50,7 +50,7 @@ func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs []string, fmt.Sprintf("<%s> cannot cast field: %s into string", utils.FilterS, fldName)) continue } - dbItemIDs, err := dm.MatchReqFilterIndex(dbIdxKey, fldName, fldVal) + dbItemIDs, err := dm.MatchFilterIndex(dbIdxKey, fldName, fldVal) if err != nil { if err == utils.ErrNotFound { continue @@ -63,7 +63,7 @@ func matchingItemIDsForEvent(ev map[string]interface{}, fieldIDs []string, } } } - dbItemIDs, err := dm.MatchReqFilterIndex(dbIdxKey, utils.NOT_AVAILABLE, utils.NOT_AVAILABLE) // add unindexed itemIDs to be checked + dbItemIDs, err := dm.MatchFilterIndex(dbIdxKey, utils.NOT_AVAILABLE, utils.NOT_AVAILABLE) // add unindexed itemIDs to be checked if err != nil { if err != utils.ErrNotFound { return nil, err diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 2c2e65dea..a023f00c4 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -117,10 +117,13 @@ type DataDB interface { RemoveTimingDrv(string) error GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error) AddLoadHistory(*utils.LoadInstance, int, string) error - GetReqFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) - SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) - RemoveReqFilterIndexesDrv(id string) (err error) - MatchReqFilterIndexDrv(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) + GetFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) + SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) + RemoveFilterIndexesDrv(id string) (err error) + GetFilterReverseIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) + SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) + RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) + MatchFilterIndexDrv(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) RemStatQueueProfileDrv(tenant, id string) (err error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 61b53e901..09452bb8d 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1254,7 +1254,7 @@ func (ms *MapStorage) RemoveTimingDrv(id string) error { return nil } -func (ms *MapStorage) GetReqFilterIndexesDrv(dbKey string, +func (ms *MapStorage) GetFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { ms.mu.RLock() defer ms.mu.RUnlock() @@ -1287,7 +1287,8 @@ func (ms *MapStorage) GetReqFilterIndexesDrv(dbKey string, } return } -func (ms *MapStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { + +func (ms *MapStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(indexes) @@ -1298,14 +1299,66 @@ func (ms *MapStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]ma return } -func (ms *MapStorage) RemoveReqFilterIndexesDrv(id string) (err error) { +func (ms *MapStorage) RemoveFilterIndexesDrv(id string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() delete(ms.dict, id) return } -func (ms *MapStorage) MatchReqFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) { +func (ms *MapStorage) GetFilterReverseIndexesDrv(dbKey string, + fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + values, ok := ms.dict[dbKey] + if !ok { + return nil, utils.ErrNotFound + } + if len(fldNameVal) != 0 { + rcvidx := make(map[string]map[string]utils.StringMap) + err = ms.ms.Unmarshal(values, &rcvidx) + if err != nil { + return nil, err + } + indexes = make(map[string]map[string]utils.StringMap) + for fldName, fldVal := range fldNameVal { + if _, has := indexes[fldName]; !has { + indexes[fldName] = make(map[string]utils.StringMap) + } + if _, has := indexes[fldName][fldVal]; !has { + indexes[fldName][fldVal] = make(utils.StringMap) + } + indexes[fldName][fldVal] = rcvidx[fldName][fldVal] + } + return + } else { + err = ms.ms.Unmarshal(values, &indexes) + if err != nil { + return nil, err + } + } + return +} + +func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + result, err := ms.ms.Marshal(indexes) + if err != nil { + return err + } + ms.dict[dbKey] = result + return +} + +func (ms *MapStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + delete(ms.dict, utils.ConcatenatedKey(dbKey, itemID)) + return +} + +func (ms *MapStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) { ms.mu.RLock() defer ms.mu.RUnlock() values, ok := ms.dict[dbKey] diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 0719575a5..d5232e500 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1928,7 +1928,7 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) { return nil } -func (ms *MongoStorage) GetReqFilterIndexesDrv(dbKey string, +func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { session, col := ms.conn(colRFI) defer session.Close() @@ -1963,15 +1963,13 @@ func (ms *MongoStorage) GetReqFilterIndexesDrv(dbKey string, return result.Value, nil } -func (ms *MongoStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { +func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { session, col := ms.conn(colRFI) defer session.Close() if update { - utils.Logger.Debug("\n ENTER HERE !! \n") for k, v := range indexes { for k2, v2 := range v { findParam2 := fmt.Sprintf("value.%s.%s", k, k2) - utils.Logger.Debug(fmt.Sprintf("\nFINDPARAM2 %+v and len(v2) %+v \n", findParam2, len(v2))) if len(v2) != 0 { for k3 := range v2 { err = col.Update(bson.M{"key": dbKey}, bson.M{"$set": bson.M{findParam2: bson.M{k3: true}}}) @@ -1990,7 +1988,7 @@ func (ms *MongoStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string] return } -func (ms *MongoStorage) RemoveReqFilterIndexesDrv(id string) (err error) { +func (ms *MongoStorage) RemoveFilterIndexesDrv(id string) (err error) { session, col := ms.conn(colRFI) defer session.Close() if err = col.Remove(bson.M{"key": id}); err != nil { @@ -1998,8 +1996,77 @@ func (ms *MongoStorage) RemoveReqFilterIndexesDrv(id string) (err error) { } return nil } +func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string, + fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { + session, col := ms.conn(colRFI) + defer session.Close() + var result struct { + Key string + Value map[string]map[string]utils.StringMap + } + findParam := bson.M{"key": dbKey} + if len(fldNameVal) != 0 { + for fldName, fldValue := range fldNameVal { + var qryFltr bson.M + if fldValue == "" { + qryFltr = bson.M{fmt.Sprintf("value.%s", fldName): 1} + } else { + qryFltr = bson.M{fmt.Sprintf("value.%s.%s", fldName, fldValue): 1} + } + if err = col.Find(findParam).Select(qryFltr).One(&result); err != nil { + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + } + return nil, err + } + return result.Value, nil + } + } + if err = col.Find(findParam).One(&result); err != nil { + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + } + return nil, err + } + return result.Value, nil +} -func (ms *MongoStorage) MatchReqFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) { +func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { + session, col := ms.conn(colRFI) + defer session.Close() + if update { + for k, v := range indexes { + for k2, v2 := range v { + findParam2 := fmt.Sprintf("value.%s.%s", k, k2) + if len(v2) != 0 { + for k3 := range v2 { + err = col.Update(bson.M{"key": dbKey}, bson.M{"$set": bson.M{findParam2: bson.M{k3: true}}}) + } + } else { + err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam2: 1}}) + } + } + } + } else { + _, err = col.Upsert(bson.M{"key": dbKey}, &struct { + Key string + Value map[string]map[string]utils.StringMap + }{dbKey, indexes}) + } + return +} + +func (ms *MongoStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) { + session, col := ms.conn(colRFI) + defer session.Close() + findParam := fmt.Sprintf("value.%s", itemID) + if err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam: 1}}); err != nil { + return + } + return nil +} + +func (ms *MongoStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) { session, col := ms.conn(colRFI) defer session.Close() var result struct { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 352c0843f..9545a20f4 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1368,7 +1368,7 @@ func (rs *RedisStorage) RemoveTimingDrv(id string) (err error) { return } -func (rs *RedisStorage) GetReqFilterIndexesDrv(dbKey string, +func (rs *RedisStorage) GetFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { mp := make(map[string]string) if len(fldNameVal) == 0 { @@ -1412,35 +1412,102 @@ func (rs *RedisStorage) GetReqFilterIndexesDrv(dbKey string, return } -func (rs *RedisStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { +func (rs *RedisStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { mp := make(map[string]string) - if update { - for fldName, fldValMp := range indexes { - for fldVal, strMp := range fldValMp { - if len(strMp) == 0 { // remove with no more elements inside - return rs.Cmd("HDEL", dbKey, utils.ConcatenatedKey(fldName, fldVal)).Err - } - if encodedMp, err := rs.ms.Marshal(strMp); err != nil { + for fldName, fldValMp := range indexes { + for fldVal, strMp := range fldValMp { + if len(strMp) == 0 { // remove with no more elements inside + if err = rs.Cmd("HDEL", dbKey, utils.ConcatenatedKey(fldName, fldVal)).Err; err != nil { return err - } else { - mp[utils.ConcatenatedKey(fldName, fldVal)] = string(encodedMp) } + continue + } + if encodedMp, err := rs.ms.Marshal(strMp); err != nil { + return err + } else { + mp[utils.ConcatenatedKey(fldName, fldVal)] = string(encodedMp) } } - } else { - return rs.Cmd("HMSET", dbKey, mp).Err } - return + return rs.Cmd("HMSET", dbKey, mp).Err } -func (rs *RedisStorage) RemoveReqFilterIndexesDrv(id string) (err error) { +func (rs *RedisStorage) RemoveFilterIndexesDrv(id string) (err error) { if err = rs.Cmd("DEL", id).Err; err != nil { return err } return } -func (rs *RedisStorage) MatchReqFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) { +//GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB +func (rs *RedisStorage) GetFilterReverseIndexesDrv(dbKey string, + fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error) { + mp := make(map[string]string) + if len(fldNameVal) == 0 { + mp, err = rs.Cmd("HGETALL", dbKey).Map() + if err != nil { + return + } else if len(mp) == 0 { + return nil, utils.ErrNotFound + } + } else { + var itmMpStrLst []string + for fldName, _ := range fldNameVal { + itmMpStrLst, err = rs.Cmd("HMGET", dbKey, fldName).List() + if err != nil { + return + } else if itmMpStrLst[0] == "" { + return nil, utils.ErrNotFound + } + mp[fldName] = itmMpStrLst[0] + } + } + indexes = make(map[string]map[string]utils.StringMap) + for k, v := range mp { + var sm map[string]utils.StringMap + if err = rs.ms.Unmarshal([]byte(v), &sm); err != nil { + return + } + if _, hasKey := indexes[k]; !hasKey { + indexes[k] = make(map[string]utils.StringMap) + } + for key, val := range sm { + if _, hasKey := indexes[k][key]; !hasKey { + indexes[k][key] = make(utils.StringMap) + } + indexes[k][key] = val + } + } + return +} + +func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) { + mp := make(map[string]string) + for fldName, fldValMp := range indexes { + for _, strMp := range fldValMp { + if len(strMp) == 0 { // remove with no more elements inside + if err = rs.Cmd("HDEL", dbKey, fldName).Err; err != nil { + return err + } + } + if encodedMp, err := rs.ms.Marshal(fldValMp); err != nil { + return err + } else { + mp[fldName] = string(encodedMp) + } + } + } + return rs.Cmd("HMSET", dbKey, mp).Err +} + +func (rs *RedisStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) { + if err = rs.Cmd("HDEL", dbKey, itemID).Err; err != nil { + return err + } + return +} + +func (rs *RedisStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) { fieldValKey := utils.ConcatenatedKey(fldName, fldVal) fldValBytes, err := rs.Cmd("HGET", dbKey, fieldValKey).Bytes() if err != nil { diff --git a/engine/suppliers_test.go b/engine/suppliers_test.go index 936a60ab2..7d4ca0fbc 100644 --- a/engine/suppliers_test.go +++ b/engine/suppliers_test.go @@ -180,7 +180,6 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { filter2 := &Filter{Tenant: config.CgrConfig().DefaultTenant, ID: "filter2", RequestFilters: filters2} dmspl.SetFilter(filter1) dmspl.SetFilter(filter2) - ssd := make(map[string]SuppliersSorter) ssd[utils.MetaWeight] = NewWeightSorter() splserv = SupplierService{ @@ -274,13 +273,13 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { dmspl.DataDB().SetSupplierProfileDrv(spr) } ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org") - ref.IndexFilters("supplierprofile1", filters1) - ref.IndexFilters("supplierprofile2", filters2) + ref.IndexFilters("supplierprofile1", filter1) + ref.IndexFilters("supplierprofile2", filter2) err = ref.StoreIndexes(false) if err != nil { t.Errorf("Error: %+v", err) } - //test here GetReqFilterIndexes with a specific map + //test here GetReqFilterIndexes for StorageMap with a specific map expidx := map[string]map[string]utils.StringMap{ "supplierprofile1": { "Supplier": { @@ -290,7 +289,9 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { } splPrf1 := make(map[string]string) splPrf1["supplierprofile1"] = "Supplier" - if rcvidx, err := dmspl.GetReqFilterIndexes(GetDBIndexKey(utils.SupplierProfilePrefix, "cgrates.org", false), splPrf1); err != nil { + if rcvidx, err := dmspl.GetFilterIndexes( + GetDBIndexKey(utils.SupplierProfilePrefix, "cgrates.org", false), + splPrf1); err != nil { t.Errorf("Error: %+v", err) } else { if !reflect.DeepEqual(expidx, rcvidx) { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index b92670ad9..d1236a141 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -2243,7 +2243,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dm.SetThresholdProfile(th); err != nil { + if err = tpr.dm.SetThresholdProfile(th, false); err != nil { return err } if verbose { @@ -2955,7 +2955,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro log.Print("Indexing resource profiles") } for tenant, fltrIdxer := range tpr.resIndexers { - if err := tpr.dm.RemoveReqFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil { + if err := tpr.dm.RemoveFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil { return err } if verbose { @@ -2967,7 +2967,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro log.Print("StatQueue filter indexes:") } for tenant, fltrIdxer := range tpr.sqpIndexers { - if err := tpr.dm.RemoveReqFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil { + if err := tpr.dm.RemoveFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil { return err } if verbose { @@ -2979,7 +2979,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro log.Print("Threshold filter indexes:") } for tenant, fltrIdxer := range tpr.thdsIndexers { - if err := tpr.dm.RemoveReqFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil { + if err := tpr.dm.RemoveFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil { return err } if verbose { @@ -2991,7 +2991,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro log.Print("Indexing Supplier Profiles") } for tenant, fltrIdxer := range tpr.sppIndexers { - if err := tpr.dm.RemoveReqFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil { + if err := tpr.dm.RemoveFilterIndexes(GetDBIndexKey(fltrIdxer.itemType, fltrIdxer.dbKeySuffix, false)); err != nil { return err } if verbose { diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 7f57981cf..74b64b6ea 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -83,7 +83,7 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { } if ths != nil { if m.dryRun != true { - if err := m.dmOut.SetThresholdProfile(ths); err != nil { + if err := m.dmOut.SetThresholdProfile(ths, true); err != nil { return err } } @@ -114,7 +114,7 @@ func (m *Migrator) migrateV2ActionTriggers() (err error) { if err := m.dmOut.SetThreshold(th); err != nil { return err } - if err := m.dmOut.SetThresholdProfile(thp); err != nil { + if err := m.dmOut.SetThresholdProfile(thp, true); err != nil { return err } m.stats[utils.Thresholds] += 1 @@ -268,7 +268,7 @@ func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) { if err := m.dmOut.SetThreshold(th); err != nil { return err } - if err := m.dmOut.SetThresholdProfile(thp); err != nil { + if err := m.dmOut.SetThresholdProfile(thp, true); err != nil { return err } m.stats[utils.Thresholds] += 1