uniformized code across redis and mongo drivers ,fixed integration test for filter indexes

This commit is contained in:
edwardro22
2018-01-05 09:17:54 +02:00
committed by Dan Christian Bogos
parent 04d81b5334
commit f703c2bd29
13 changed files with 123 additions and 123 deletions

View File

@@ -409,14 +409,16 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
return
}
func (dm *DataManager) RemoveThresholdProfile(tenant, id, transactionID string) (err error) {
func (dm *DataManager) RemoveThresholdProfile(tenant, id, transactionID string, withIndex bool) (err error) {
if err = dm.DataDB().RemThresholdProfileDrv(tenant, id); err != nil {
return
}
cache.RemKey(utils.ThresholdProfilePrefix+utils.ConcatenatedKey(tenant, id),
cacheCommit(transactionID), transactionID)
return NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix,
tenant).RemoveItemFromIndex(id)
if withIndex {
return NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, tenant).RemoveItemFromIndex(id)
}
return
}
func (dm *DataManager) GetStatQueueProfile(tenant, id string, skipCache bool, transactionID string) (sqp *StatQueueProfile, err error) {
@@ -444,37 +446,40 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool
if err = dm.DataDB().SetStatQueueProfileDrv(sqp); err != nil {
return err
}
// if withIndex {
// indexer := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, sqp.Tenant)
// //remove old StatQueueProfile indexes
// if err = indexer.RemoveItemFromIndex(sqp.ID); err != nil &&
// err.Error() != utils.ErrNotFound.Error() {
// return
// }
// //Verify matching Filters for every FilterID from StatQueueProfile
// for _, fltrID := range sqp.FilterIDs {
// var fltr *Filter
// if fltr, err = dm.GetFilter(sqp.Tenant, fltrID, false, utils.NonTransactional); err != nil {
// if err == utils.ErrNotFound {
// err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, sqp)
// }
// return
// }
// indexer.IndexTPFilter(FilterToTPFilter(fltr), sqp.ID)
// }
// if err = indexer.StoreIndexes(); err != nil {
// return
// }
// }
if withIndex {
indexer := NewReqFilterIndexer(dm, utils.StatQueueProfilePrefix, sqp.Tenant)
//remove old StatQueueProfile indexes
if err = indexer.RemoveItemFromIndex(sqp.ID); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
return
}
//Verify matching Filters for every FilterID from StatQueueProfile
for _, fltrID := range sqp.FilterIDs {
var fltr *Filter
if fltr, err = dm.GetFilter(sqp.Tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, sqp)
}
return
}
indexer.IndexTPFilter(FilterToTPFilter(fltr), sqp.ID)
}
if err = indexer.StoreIndexes(); err != nil {
return
}
}
return
}
func (dm *DataManager) RemoveStatQueueProfile(tenant, id, transactionID string) (err error) {
func (dm *DataManager) RemoveStatQueueProfile(tenant, id, transactionID string, withIndex bool) (err error) {
if err = dm.DataDB().RemStatQueueProfileDrv(tenant, id); err != nil {
return
}
cache.RemKey(utils.StatQueueProfilePrefix+utils.ConcatenatedKey(tenant, id),
cacheCommit(transactionID), transactionID)
if withIndex {
return NewReqFilterIndexer(dm, utils.StatQueueProfilePrefix, tenant).RemoveItemFromIndex(id)
}
return
}
@@ -596,12 +601,15 @@ func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) (
return
}
func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string) (err error) {
func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string, withIndex bool) (err error) {
if err = dm.DataDB().RemoveResourceProfileDrv(tenant, id); err != nil {
return
}
cache.RemKey(utils.ResourceProfilesPrefix+utils.ConcatenatedKey(tenant, id),
cacheCommit(transactionID), transactionID)
if withIndex {
return NewReqFilterIndexer(dm, utils.ResourceProfilesPrefix, tenant).RemoveItemFromIndex(id)
}
return
}
@@ -941,8 +949,8 @@ func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]
return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes)
}
func (dm *DataManager) RemoveFilterReverseIndexes(dbKey, itemID string) (err error) {
return dm.DataDB().RemoveFilterReverseIndexesDrv(dbKey, itemID)
func (dm *DataManager) RemoveFilterReverseIndexes(dbKey string) (err error) {
return dm.DataDB().RemoveFilterReverseIndexesDrv(dbKey)
}
func (dm *DataManager) MatchFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
@@ -1016,37 +1024,40 @@ func (dm *DataManager) SetSupplierProfile(supp *SupplierProfile, withIndex bool)
return err
}
//to be implemented in tests
// if withIndex {
// indexer := NewReqFilterIndexer(dm, utils.SupplierProfilePrefix, supp.Tenant)
// //remove old SupplierProfile indexes
// if err = indexer.RemoveItemFromIndex(supp.ID); err != nil &&
// err.Error() != utils.ErrNotFound.Error() {
// return
// }
// //Verify matching Filters for every FilterID from SupplierProfile
// for _, fltrID := range supp.FilterIDs {
// var fltr *Filter
// if fltr, err = dm.GetFilter(supp.Tenant, fltrID, false, utils.NonTransactional); err != nil {
// if err == utils.ErrNotFound {
// err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, supp)
// }
// return
// }
// indexer.IndexTPFilter(FilterToTPFilter(fltr), supp.ID)
// }
// if err = indexer.StoreIndexes(); err != nil {
// return
// }
// }
if withIndex {
indexer := NewReqFilterIndexer(dm, utils.SupplierProfilePrefix, supp.Tenant)
//remove old SupplierProfile indexes
if err = indexer.RemoveItemFromIndex(supp.ID); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
return
}
//Verify matching Filters for every FilterID from SupplierProfile
for _, fltrID := range supp.FilterIDs {
var fltr *Filter
if fltr, err = dm.GetFilter(supp.Tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, supp)
}
return
}
indexer.IndexTPFilter(FilterToTPFilter(fltr), supp.ID)
}
if err = indexer.StoreIndexes(); err != nil {
return
}
}
return
}
func (dm *DataManager) RemoveSupplierProfile(tenant, id, transactionID string) (err error) {
func (dm *DataManager) RemoveSupplierProfile(tenant, id, transactionID string, withIndex bool) (err error) {
if err = dm.DataDB().RemoveSupplierProfileDrv(tenant, id); err != nil {
return
}
cache.RemKey(utils.SupplierProfilePrefix+utils.ConcatenatedKey(tenant, id),
cacheCommit(transactionID), transactionID)
if withIndex {
return NewReqFilterIndexer(dm, utils.SupplierProfilePrefix, tenant).RemoveItemFromIndex(id)
}
return
}
@@ -1076,36 +1087,39 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool)
return err
}
//to be implemented in tests
// if withIndex {
// indexer := NewReqFilterIndexer(dm, utils.AttributeProfilePrefix, ap.Tenant)
// //remove old AttributeProfile indexes
// if err = indexer.RemoveItemFromIndex(ap.ID); err != nil &&
// err.Error() != utils.ErrNotFound.Error() {
// return
// }
// //Verify matching Filters for every FilterID from AttributeProfile
// for _, fltrID := range ap.FilterIDs {
// var fltr *Filter
// if fltr, err = dm.GetFilter(ap.Tenant, fltrID, false, utils.NonTransactional); err != nil {
// if err == utils.ErrNotFound {
// err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, ap)
// }
// return
// }
// indexer.IndexTPFilter(FilterToTPFilter(fltr), ap.ID)
// }
// if err = indexer.StoreIndexes(); err != nil {
// return
// }
// }
if withIndex {
indexer := NewReqFilterIndexer(dm, utils.AttributeProfilePrefix, ap.Tenant)
//remove old AttributeProfile indexes
if err = indexer.RemoveItemFromIndex(ap.ID); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
return
}
//Verify matching Filters for every FilterID from AttributeProfile
for _, fltrID := range ap.FilterIDs {
var fltr *Filter
if fltr, err = dm.GetFilter(ap.Tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, ap)
}
return
}
indexer.IndexTPFilter(FilterToTPFilter(fltr), ap.ID)
}
if err = indexer.StoreIndexes(); err != nil {
return
}
}
return
}
func (dm *DataManager) RemoveAttributeProfile(tenant, id, transactionID string) (err error) {
func (dm *DataManager) RemoveAttributeProfile(tenant, id, transactionID string, withIndex bool) (err error) {
if err = dm.DataDB().RemoveAttributeProfileDrv(tenant, id); err != nil {
return
}
cache.RemKey(utils.AttributeProfilePrefix+utils.ConcatenatedKey(tenant, id),
cacheCommit(transactionID), transactionID)
if withIndex {
return NewReqFilterIndexer(dm, utils.AttributeProfilePrefix, tenant).RemoveItemFromIndex(id)
}
return
}

View File

@@ -122,7 +122,7 @@ type DataDB interface {
RemoveFilterIndexesDrv(id string) (err error)
GetFilterReverseIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error)
SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error)
RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error)
RemoveFilterReverseIndexesDrv(dbKey string) (err error)
MatchFilterIndexDrv(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)
SetStatQueueProfileDrv(sq *StatQueueProfile) (err error)

View File

@@ -1328,10 +1328,10 @@ func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[strin
}
//RemoveFilterReverseIndexesDrv removes ReverseIndexes for a specific itemID
func (ms *MapStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) {
func (ms *MapStorage) RemoveFilterReverseIndexesDrv(dbKey string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.ConcatenatedKey(dbKey, itemID))
delete(ms.dict, dbKey)
return
}

View File

@@ -1974,7 +1974,7 @@ func (ms *MongoStorage) RemoveFilterIndexesDrv(id string) (err error) {
defer session.Close()
err = col.Remove(bson.M{"key": id})
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
err = nil
}
return
}
@@ -2042,16 +2042,12 @@ func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[stri
}
//RemoveFilterReverseIndexesDrv removes ReverseIndexes for a specific itemID
func (ms *MongoStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) {
func (ms *MongoStorage) RemoveFilterReverseIndexesDrv(dbKey string) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
if itemID != "" {
findParam := fmt.Sprintf("value.%s", itemID)
return col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam: 1}})
}
err = col.Remove(bson.M{"key": dbKey})
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
err = nil
}
return
}

View File

@@ -1411,10 +1411,7 @@ func (rs *RedisStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]uti
}
func (rs *RedisStorage) RemoveFilterIndexesDrv(id string) (err error) {
if err = rs.Cmd("DEL", id).Err; err != nil {
return err
}
return
return rs.Cmd("DEL", id).Err
}
//GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB
@@ -1482,10 +1479,7 @@ func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[stri
}
//RemoveFilterReverseIndexesDrv removes ReverseIndexes for a specific itemID
func (rs *RedisStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) {
if itemID != "" {
return rs.Cmd("HDEL", dbKey, itemID).Err
}
func (rs *RedisStorage) RemoveFilterReverseIndexesDrv(dbKey string) (err error) {
return rs.Cmd("DEL", dbKey).Err
}

View File

@@ -2843,7 +2843,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("ResourceProfiles:")
}
for _, tpRsp := range tpr.resProfiles {
if err = tpr.dm.RemoveResourceProfile(tpRsp.Tenant, tpRsp.ID, utils.NonTransactional); err != nil {
if err = tpr.dm.RemoveResourceProfile(tpRsp.Tenant, tpRsp.ID, utils.NonTransactional, false); err != nil {
return err
}
if verbose {
@@ -2865,7 +2865,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("StatQueueProfiles:")
}
for _, tpST := range tpr.sqProfiles {
if err = tpr.dm.RemoveStatQueueProfile(tpST.Tenant, tpST.ID, utils.NonTransactional); err != nil {
if err = tpr.dm.RemoveStatQueueProfile(tpST.Tenant, tpST.ID, utils.NonTransactional, false); err != nil {
return err
}
if verbose {
@@ -2887,7 +2887,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("ThresholdProfiles:")
}
for _, tpTH := range tpr.thProfiles {
if err = tpr.dm.RemoveThresholdProfile(tpTH.Tenant, tpTH.ID, utils.NonTransactional); err != nil {
if err = tpr.dm.RemoveThresholdProfile(tpTH.Tenant, tpTH.ID, utils.NonTransactional, false); err != nil {
return err
}
if verbose {
@@ -2910,7 +2910,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
log.Print("SupplierProfiles:")
}
for _, tpTH := range tpr.sppProfiles {
if err = tpr.dm.RemoveSupplierProfile(tpTH.Tenant, tpTH.ID, utils.NonTransactional); err != nil {
if err = tpr.dm.RemoveSupplierProfile(tpTH.Tenant, tpTH.ID, utils.NonTransactional, false); err != nil {
return err
}
if verbose {