Update ComputeFilterIndexer to consider transactionID

This commit is contained in:
TeoV
2018-03-01 06:49:07 -05:00
committed by Dan Christian Bogos
parent 6b8aaf1da4
commit 9c5b611ea9
3 changed files with 123 additions and 154 deletions

View File

@@ -377,18 +377,103 @@ func (self *ApierV1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, r
return nil
}
func (self *ApierV1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, reply *string) error {
transactionID := utils.GenUUID()
//ThresholdProfile Indexes
thdsIndexers, err := self.computeThresholdIndexes(args.Tenant, args.ThresholdIDs, transactionID)
if err != nil {
return utils.APIErrorHandler(err)
}
//StatQueueProfile Indexes
sqpIndexers, err := self.computeStatIndexes(args.Tenant, args.StatIDs, transactionID)
if err != nil {
return utils.APIErrorHandler(err)
}
//ResourceProfile Indexes
rsIndexes, err := self.computeResourceIndexes(args.Tenant, args.ResourceIDs, transactionID)
if err != nil {
return utils.APIErrorHandler(err)
}
//SupplierProfile Indexes
sppIndexes, err := self.computeSupplierIndexes(args.Tenant, args.SupplierIDs, transactionID)
if err != nil {
return utils.APIErrorHandler(err)
}
//AttributeProfile Indexes
attrIndexes, err := self.computeAttributeIndexes(args.Tenant, args.AttributeIDs, transactionID)
if err != nil {
return utils.APIErrorHandler(err)
}
//Now we move from tmpKey to the right key for each type
//ThresholdProfile Indexes
if thdsIndexers != nil {
if err := thdsIndexers.StoreIndexes(true, transactionID); err != nil {
for _, id := range *args.ThresholdIDs {
if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
return err
}
}
return err
}
}
//StatQueueProfile Indexes
if sqpIndexers != nil {
if err := sqpIndexers.StoreIndexes(true, transactionID); err != nil {
for _, id := range *args.StatIDs {
if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
return err
}
}
return err
}
}
//ResourceProfile Indexes
if rsIndexes != nil {
if err := rsIndexes.StoreIndexes(true, transactionID); err != nil {
for _, id := range *args.ResourceIDs {
if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
return err
}
}
return err
}
}
//SupplierProfile Indexes
if sppIndexes != nil {
if err := sppIndexes.StoreIndexes(true, transactionID); err != nil {
for _, id := range *args.SupplierIDs {
if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
return err
}
}
return err
}
}
//AttributeProfile Indexes
if attrIndexes != nil {
if err := attrIndexes.StoreIndexes(true, transactionID); err != nil {
for _, id := range *args.AttributeIDs {
if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
return err
}
}
return err
}
}
*reply = utils.OK
return nil
}
func (self *ApierV1) computeThresholdIndexes(tenant string, thIDs *[]string, transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var thresholdIDs []string
thdsIndexers := engine.NewFilterIndexer(self.DataManager,
utils.ThresholdProfilePrefix, tenant)
thdsIndexers := engine.NewFilterIndexer(self.DataManager, utils.ThresholdProfilePrefix, tenant)
if thIDs == nil {
ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix)
if err != nil {
return nil, err
}
for _, id := range ids {
thresholdIDs = append(thresholdIDs,
strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
thresholdIDs = append(thresholdIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
} else {
thresholdIDs = *thIDs
@@ -397,11 +482,7 @@ func (self *ApierV1) computeThresholdIndexes(tenant string, thIDs *[]string, tra
for _, id := range thresholdIDs {
th, err := self.DataManager.GetThresholdProfile(tenant, id, false, utils.NonTransactional)
if err != nil {
<<<<<<< HEAD
return nil, err
=======
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
fltrIDs := make([]string, len(th.FilterIDs))
for i, fltrID := range th.FilterIDs {
@@ -427,19 +508,11 @@ func (self *ApierV1) computeThresholdIndexes(tenant string, thIDs *[]string, tra
} else if strings.HasPrefix(fltrID, utils.Meta) {
inFltr, err := engine.NewInlineFilter(fltrID)
if err != nil {
<<<<<<< HEAD
return nil, err
}
fltr, err = inFltr.AsFilter(th.Tenant)
if err != nil {
return nil, err
=======
return err
}
fltr, err = inFltr.AsFilter(th.Tenant)
if err != nil {
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
} else if fltr, err = self.DataManager.GetFilter(th.Tenant, fltrID,
false, utils.NonTransactional); err != nil {
@@ -463,23 +536,12 @@ func (self *ApierV1) computeThresholdIndexes(tenant string, thIDs *[]string, tra
return nil, err
}
}
<<<<<<< HEAD
return thdsIndexers, nil
=======
if err := thdsIndexers.StoreIndexes(transactionID); err != nil {
return err
}
if err := thdsIndexers.StoreIndexes(utils.NonTransactional); err != nil {
return err
}
return nil
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
func (self *ApierV1) computeAttributeIndexes(tenant string, attrIDs *[]string, transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var attributeIDs []string
attrIndexers := engine.NewFilterIndexer(self.DataManager,
utils.AttributeProfilePrefix, tenant)
attrIndexers := engine.NewFilterIndexer(self.DataManager, utils.AttributeProfilePrefix, tenant)
if attrIDs == nil {
ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix)
if err != nil {
@@ -493,14 +555,9 @@ func (self *ApierV1) computeAttributeIndexes(tenant string, attrIDs *[]string, t
transactionID = utils.NonTransactional
}
for _, id := range attributeIDs {
ap, err := self.DataManager.GetAttributeProfile(tenant, id,
false, utils.NonTransactional)
ap, err := self.DataManager.GetAttributeProfile(tenant, id, false, utils.NonTransactional)
if err != nil {
<<<<<<< HEAD
return nil, err
=======
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
fltrIDs := make([]string, len(ap.FilterIDs))
for i, fltrID := range ap.FilterIDs {
@@ -526,19 +583,11 @@ func (self *ApierV1) computeAttributeIndexes(tenant string, attrIDs *[]string, t
} else if strings.HasPrefix(fltrID, utils.Meta) {
inFltr, err := engine.NewInlineFilter(fltrID)
if err != nil {
<<<<<<< HEAD
return nil, err
}
fltr, err = inFltr.AsFilter(ap.Tenant)
if err != nil {
return nil, err
=======
return err
}
fltr, err = inFltr.AsFilter(ap.Tenant)
if err != nil {
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
} else if fltr, err = self.DataManager.GetFilter(ap.Tenant, fltrID,
false, utils.NonTransactional); err != nil {
@@ -551,7 +600,6 @@ func (self *ApierV1) computeAttributeIndexes(tenant string, attrIDs *[]string, t
attrIndexers.IndexTPFilter(engine.FilterToTPFilter(fltr), ap.ID)
}
}
}
if transactionID == utils.NonTransactional {
if err := attrIndexers.StoreIndexes(true, transactionID); err != nil {
@@ -563,23 +611,12 @@ func (self *ApierV1) computeAttributeIndexes(tenant string, attrIDs *[]string, t
return nil, err
}
}
<<<<<<< HEAD
return attrIndexers, nil
=======
if err := attrIndexers.StoreIndexes(transactionID); err != nil {
return err
}
if err := attrIndexers.StoreIndexes(utils.NonTransactional); err != nil {
return err
}
return nil
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
func (self *ApierV1) computeResourceIndexes(tenant string, rsIDs *[]string, transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var resourceIDs []string
rpIndexers := engine.NewFilterIndexer(self.DataManager,
utils.ResourceProfilesPrefix, tenant)
rpIndexers := engine.NewFilterIndexer(self.DataManager, utils.ResourceProfilesPrefix, tenant)
if rsIDs == nil {
ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix)
if err != nil {
@@ -593,14 +630,9 @@ func (self *ApierV1) computeResourceIndexes(tenant string, rsIDs *[]string, tran
transactionID = utils.NonTransactional
}
for _, id := range resourceIDs {
rp, err := self.DataManager.GetResourceProfile(tenant, id,
false, utils.NonTransactional)
rp, err := self.DataManager.GetResourceProfile(tenant, id, false, utils.NonTransactional)
if err != nil {
<<<<<<< HEAD
return nil, err
=======
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
fltrIDs := make([]string, len(rp.FilterIDs))
for i, fltrID := range rp.FilterIDs {
@@ -626,19 +658,11 @@ func (self *ApierV1) computeResourceIndexes(tenant string, rsIDs *[]string, tran
} else if strings.HasPrefix(fltrID, utils.Meta) {
inFltr, err := engine.NewInlineFilter(fltrID)
if err != nil {
<<<<<<< HEAD
return nil, err
}
fltr, err = inFltr.AsFilter(rp.Tenant)
if err != nil {
return nil, err
=======
return err
}
fltr, err = inFltr.AsFilter(rp.Tenant)
if err != nil {
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
} else if fltr, err = self.DataManager.GetFilter(rp.Tenant, fltrID,
false, utils.NonTransactional); err != nil {
@@ -662,23 +686,12 @@ func (self *ApierV1) computeResourceIndexes(tenant string, rsIDs *[]string, tran
return nil, err
}
}
<<<<<<< HEAD
return rpIndexers, nil
=======
if err := rpIndexers.StoreIndexes(transactionID); err != nil {
return err
}
if err := rpIndexers.StoreIndexes(utils.NonTransactional); err != nil {
return err
}
return nil
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
func (self *ApierV1) computeStatIndexes(tenant string, stIDs *[]string, transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var statIDs []string
sqpIndexers := engine.NewFilterIndexer(self.DataManager,
utils.StatQueueProfilePrefix, tenant)
sqpIndexers := engine.NewFilterIndexer(self.DataManager, utils.StatQueueProfilePrefix, tenant)
if stIDs == nil {
ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix)
if err != nil {
@@ -692,14 +705,9 @@ func (self *ApierV1) computeStatIndexes(tenant string, stIDs *[]string, transact
transactionID = utils.NonTransactional
}
for _, id := range statIDs {
sqp, err := self.DataManager.GetStatQueueProfile(tenant, id,
false, utils.NonTransactional)
sqp, err := self.DataManager.GetStatQueueProfile(tenant, id, false, utils.NonTransactional)
if err != nil {
<<<<<<< HEAD
return nil, err
=======
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
fltrIDs := make([]string, len(sqp.FilterIDs))
for i, fltrID := range sqp.FilterIDs {
@@ -725,19 +733,11 @@ func (self *ApierV1) computeStatIndexes(tenant string, stIDs *[]string, transact
} else if strings.HasPrefix(fltrID, utils.Meta) {
inFltr, err := engine.NewInlineFilter(fltrID)
if err != nil {
<<<<<<< HEAD
return nil, err
}
fltr, err = inFltr.AsFilter(sqp.Tenant)
if err != nil {
return nil, err
=======
return err
}
fltr, err = inFltr.AsFilter(sqp.Tenant)
if err != nil {
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
} else if fltr, err = self.DataManager.GetFilter(sqp.Tenant, fltrID,
false, utils.NonTransactional); err != nil {
@@ -761,23 +761,12 @@ func (self *ApierV1) computeStatIndexes(tenant string, stIDs *[]string, transact
return nil, err
}
}
<<<<<<< HEAD
return sqpIndexers, nil
=======
if err := sqpIndexers.StoreIndexes(transactionID); err != nil {
return err
}
if err := sqpIndexers.StoreIndexes(utils.NonTransactional); err != nil {
return err
}
return nil
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
func (self *ApierV1) computeSupplierIndexes(tenant string, sppIDs *[]string, transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var supplierIDs []string
sppIndexers := engine.NewFilterIndexer(self.DataManager,
utils.SupplierProfilePrefix, tenant)
sppIndexers := engine.NewFilterIndexer(self.DataManager, utils.SupplierProfilePrefix, tenant)
if sppIDs == nil {
ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix)
if err != nil {
@@ -791,14 +780,9 @@ func (self *ApierV1) computeSupplierIndexes(tenant string, sppIDs *[]string, tra
transactionID = utils.NonTransactional
}
for _, id := range supplierIDs {
spp, err := self.DataManager.GetSupplierProfile(tenant, id,
false, utils.NonTransactional)
spp, err := self.DataManager.GetSupplierProfile(tenant, id, false, utils.NonTransactional)
if err != nil {
<<<<<<< HEAD
return nil, err
=======
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
fltrIDs := make([]string, len(spp.FilterIDs))
for i, fltrID := range spp.FilterIDs {
@@ -824,19 +808,11 @@ func (self *ApierV1) computeSupplierIndexes(tenant string, sppIDs *[]string, tra
} else if strings.HasPrefix(fltrID, utils.Meta) {
inFltr, err := engine.NewInlineFilter(fltrID)
if err != nil {
<<<<<<< HEAD
return nil, err
}
fltr, err = inFltr.AsFilter(spp.Tenant)
if err != nil {
return nil, err
=======
return err
}
fltr, err = inFltr.AsFilter(spp.Tenant)
if err != nil {
return err
>>>>>>> Indexes with tmp_ key will overwrite existing one
}
} else if fltr, err = self.DataManager.GetFilter(spp.Tenant, fltrID,
false, utils.NonTransactional); err != nil {
@@ -854,29 +830,11 @@ func (self *ApierV1) computeSupplierIndexes(tenant string, sppIDs *[]string, tra
if err := sppIndexers.StoreIndexes(true, transactionID); err != nil {
return nil, err
}
return nil, nil
return nil, err
} else {
if err := sppIndexers.StoreIndexes(false, transactionID); err != nil {
return nil, err
}
}
<<<<<<< HEAD
return sppIndexers, nil
=======
if err := sppIndexers.StoreIndexes(transactionID); err != nil {
if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.SupplierProfilePrefix,
tenant, false)); err != nil {
return err
}
if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.SupplierProfilePrefix,
tenant, true)); err != nil {
return err
}
} else {
if err := sppIndexers.StoreIndexes(utils.NonTransactional); err != nil {
return err
}
}
return nil
>>>>>>> Indexes with tmp_ key will overwrite existing one
}

View File

@@ -253,6 +253,7 @@ func testV1FIdxComputeThresholdsIndexes(t *testing.T) {
tenant := "cgrates.org"
var reply2 string
emptySlice := []string{}
<<<<<<< HEAD
if err := tFIdxRpc.Call(utils.ApierV1ComputeFilterIndexes,
utils.ArgsComputeFilterIndexes{
Tenant: tenant,
@@ -262,6 +263,16 @@ func testV1FIdxComputeThresholdsIndexes(t *testing.T) {
StatIDs: &emptySlice,
SupplierIDs: &emptySlice,
}, &reply2); err != nil {
=======
if err := tFIdxRpc.Call(utils.ApierV1ComputeFilterIndexes, utils.ArgsComputeFilterIndexes{
Tenant: tenant,
ThresholdIDs: nil,
AttributeIDs: &emptySlice,
ResourceIDs: &emptySlice,
StatIDs: &emptySlice,
SupplierIDs: &emptySlice,
}, &reply2); err != nil {
>>>>>>> Update ComputeFilterIndexer to consider transactionID
t.Error(err)
}
if reply2 != utils.OK {

View File

@@ -37,24 +37,24 @@ var (
// subtests to be executed for each confDIR
var sTests = []func(t *testing.T){
// testITFlush,
// testITIsDBEmpty,
// testITSetFilterIndexes,
// testITGetFilterIndexes,
// testITMatchFilterIndex,
// testITFlush,
// testITIsDBEmpty,
// testITTestThresholdFilterIndexes,
// testITTestAttributeProfileFilterIndexes,
// testITTestThresholdInlineFilterIndexing,
testITFlush,
testITIsDBEmpty,
testITSetFilterIndexes,
testITGetFilterIndexes,
testITMatchFilterIndex,
testITFlush,
testITIsDBEmpty,
testITTestThresholdFilterIndexes,
testITTestAttributeProfileFilterIndexes,
testITTestThresholdInlineFilterIndexing,
testITFlush,
testITIsDBEmpty,
testITTestStoreFilterIndexesWithTransID,
testITTestStoreFilterIndexesWithTransID2,
// testITFlush,
// testITIsDBEmpty,
// testITTestIndexingWithEmptyFltrID,
// testITTestIndexingWithEmptyFltrID2,
testITFlush,
testITIsDBEmpty,
testITTestIndexingWithEmptyFltrID,
testITTestIndexingWithEmptyFltrID2,
}
func TestFilterIndexerITRedis(t *testing.T) {