Update methods for FilterIndexer

This commit is contained in:
TeoV
2017-12-19 17:35:36 +02:00
committed by Dan Christian Bogos
parent d3fb783d11
commit 8a42c224dd
11 changed files with 104 additions and 121 deletions

View File

@@ -1,4 +1,3 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
@@ -23,14 +22,14 @@ import (
"testing"
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
var (
atrPs AttributeProfiles
srv AttributeService
dmAtr *DataManager
atrPs AttributeProfiles
srv AttributeService
dmAtr *DataManager
)
var sTestsAttributes = []func(t *testing.T){
@@ -38,13 +37,14 @@ var sTestsAttributes = []func(t *testing.T){
testAttributeMatchingAttributeProfilesForEvent,
testAttributeProfileForEvent,
testAttributeProcessEvent,
}
}
func TestAttributes(t *testing.T) {
for _, stest := range sTestsAttributes {
t.Run("Test Suppliers", stest)
}
}
//.matchingAttributeProfilesForEvent
//.attributeProfileForEvent
//.processEvent
@@ -55,7 +55,7 @@ func testPopulateAttrService(t *testing.T) {
second := 1 * time.Second
data, _ := NewMapStorage()
dmAtr = NewDataManager(data)
context :=utils.MetaRating
context := utils.MetaRating
attrMap := make(map[string]map[string]*Attribute)
attrMap["FL1"] = make(map[string]*Attribute)
attrMap["FL1"]["In1"] = &Attribute{
@@ -65,33 +65,33 @@ func testPopulateAttrService(t *testing.T) {
Append: true,
}
atrPs= AttributeProfiles{
&AttributeProfile{
Tenant: "cgrates.org",
ID: "attributeprofile1",
Context: context,
FilterIDs: []string{"filter1"},
ActivationInterval: &utils.ActivationInterval{
atrPs = AttributeProfiles{
&AttributeProfile{
Tenant: "cgrates.org",
ID: "attributeprofile1",
Context: context,
FilterIDs: []string{"filter1"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
ExpiryTime: time.Now().Add(time.Duration(20 * time.Minute)).Local(),
},
Attributes: attrMap,
Weight: 20,
Attributes: attrMap,
Weight: 20,
},
&AttributeProfile{
Tenant: "cgrates.org",
ID: "attributeprofile2",
Context: context,
FilterIDs: []string{"filter2"},
ActivationInterval: &utils.ActivationInterval{
&AttributeProfile{
Tenant: "cgrates.org",
ID: "attributeprofile2",
Context: context,
FilterIDs: []string{"filter2"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
ExpiryTime: time.Now().Add(time.Duration(20 * time.Minute)).Local(),
},
Attributes: attrMap,
Weight: 20,
Attributes: attrMap,
Weight: 20,
},
}
x, err := NewRequestFilter(MetaString, "attributeprofile1", []string{"Attribute"})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -117,8 +117,8 @@ func testPopulateAttrService(t *testing.T) {
filter2 := &Filter{Tenant: config.CgrConfig().DefaultTenant, ID: "filter2", RequestFilters: filters2}
dmAtr.SetFilter(filter1)
dmAtr.SetFilter(filter2)
srv= AttributeService{
dm:dmAtr,
srv = AttributeService{
dm: dmAtr,
filterS: &FilterS{dm: dmAtr},
indexedFields: []string{"attributeprofile1", "attributeprofile2"},
}
@@ -130,83 +130,83 @@ func testPopulateAttrService(t *testing.T) {
ev["PddInterval"] = "1s"
ev["Weight"] = "20.0"
sev = &utils.CGREvent{
Tenant: "cgrates.org",
ID: "attribute_event",
Context:&context,
Event: ev,
Tenant: "cgrates.org",
ID: "attribute_event",
Context: &context,
Event: ev,
}
for _, atr := range atrPs {
dmAtr.SetAttributeProfile(atr)
dmAtr.SetAttributeProfile(atr)
}
prefix := utils.ConcatenatedKey(sev.Tenant, *sev.Context)
ref := NewReqFilterIndexer(dmAtr, utils.AttributeProfilePrefix,prefix)
ref := NewReqFilterIndexer(dmAtr, utils.AttributeProfilePrefix, prefix)
ref.IndexFilters("attributeprofile1", filter1)
ref.IndexFilters("attributeprofile2", filter2)
err = ref.StoreIndexes(true)
err = ref.StoreIndexes()
if err != nil {
t.Errorf("Error: %+v", err)
}
}
func testAttributeMatchingAttributeProfilesForEvent(t *testing.T) {
atrpl,err:=srv.matchingAttributeProfilesForEvent(sev)
atrpl, err := srv.matchingAttributeProfilesForEvent(sev)
if err != nil {
t.Errorf("Error: %+v", err)
}
if !reflect.DeepEqual(atrPs[0], atrpl[0])&&!reflect.DeepEqual(atrPs[0], atrpl[1]) {
if !reflect.DeepEqual(atrPs[0], atrpl[0]) && !reflect.DeepEqual(atrPs[0], atrpl[1]) {
t.Errorf("Expecting: %+v, received: %+v", atrPs[0], atrpl[0])
} else if !reflect.DeepEqual(atrPs[1], atrpl[1])&&!reflect.DeepEqual(atrPs[1], atrpl[0]) {
} else if !reflect.DeepEqual(atrPs[1], atrpl[1]) && !reflect.DeepEqual(atrPs[1], atrpl[0]) {
t.Errorf("Expecting: %+v, received: %+v", atrPs[1], atrpl[1])
}
}
func testAttributeProfileForEvent(t *testing.T) {
context:=utils.MetaRating
context := utils.MetaRating
ev := make(map[string]interface{})
ev["attributeprofile1"] = "Attribute"
ev["UsageInterval"] = "1s"
ev["Weight"] = "9.0"
sev = &utils.CGREvent{
Tenant: "cgrates.org",
ID: "attribute_event",
Tenant: "cgrates.org",
ID: "attribute_event",
Context: &context,
Event: ev,
Event: ev,
}
atrpl,err:=srv.attributeProfileForEvent(sev)
atrpl, err := srv.attributeProfileForEvent(sev)
if err != nil {
t.Errorf("Error: %+v", err)
}
if !reflect.DeepEqual(atrPs[0], atrpl)&&!reflect.DeepEqual(atrPs[1], atrpl) {
if !reflect.DeepEqual(atrPs[0], atrpl) && !reflect.DeepEqual(atrPs[1], atrpl) {
t.Errorf("Expecting: %+v, received: %+v", atrPs[0], atrpl)
}
}
func testAttributeProcessEvent(t *testing.T) {
context:=utils.MetaRating
context := utils.MetaRating
ev := make(map[string]interface{})
ev["attributeprofile1"] = "Attribute"
ev["UsageInterval"] = "1s"
ev["Weight"] = "9.0"
sev = &utils.CGREvent{
Tenant: "cgrates.org",
ID: "attribute_event",
Tenant: "cgrates.org",
ID: "attribute_event",
Context: &context,
Event: ev,
Event: ev,
}
eRply := &AttrSProcessEventReply{
MatchedProfile: "attributeprofile1",
CGREvent: sev,
CGREvent: sev,
}
atrpl,err:=srv.processEvent(sev)
atrpl, err := srv.processEvent(sev)
if err != nil {
t.Errorf("Error: %+v", err)
}
if !reflect.DeepEqual(eRply.MatchedProfile, atrpl.MatchedProfile) {
t.Errorf("Expecting: %+v, received: %+v", eRply.MatchedProfile, atrpl.MatchedProfile)
}else if !reflect.DeepEqual(eRply.AlteredFields, atrpl.AlteredFields) {
} else if !reflect.DeepEqual(eRply.AlteredFields, atrpl.AlteredFields) {
t.Errorf("Expecting: %+v, received: %+v", eRply.AlteredFields, atrpl.AlteredFields)
}else if !reflect.DeepEqual(eRply.CGREvent, atrpl.CGREvent) {
} else if !reflect.DeepEqual(eRply.CGREvent, atrpl.CGREvent) {
t.Errorf("Expecting: %+v, received: %+v", eRply.CGREvent, atrpl.CGREvent)
}
}
}

View File

@@ -403,7 +403,7 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
GetDBIndexKey(thdsIndexers.itemType, thdsIndexers.dbKeySuffix, true),
fldNameVal); rcvErr != nil {
if rcvErr.Error() == utils.ErrNotFound.Error() {
if err = thdsIndexers.StoreIndexes(false); err != nil {
if err = thdsIndexers.StoreIndexes(); err != nil {
return
}
} else {
@@ -414,12 +414,12 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
th.Tenant).RemoveItemFromIndex(th.ID); err != nil {
return
}
if err = thdsIndexers.StoreIndexes(false); err != nil {
if err = thdsIndexers.StoreIndexes(); err != nil {
return
}
}
}
if err = thdsIndexers.StoreIndexes(false); err != nil {
if err = thdsIndexers.StoreIndexes(); err != nil {
return
}
}
@@ -891,8 +891,8 @@ func (dm *DataManager) GetFilterIndexes(dbKey string, fldNameVal map[string]stri
return dm.DataDB().GetFilterIndexesDrv(dbKey, fldNameVal)
}
func (dm *DataManager) SetFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
return dm.DataDB().SetFilterIndexesDrv(dbKey, indexes, update)
func (dm *DataManager) SetFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
return dm.DataDB().SetFilterIndexesDrv(dbKey, indexes)
}
func (dm *DataManager) RemoveFilterIndexes(dbKey string) (err error) {
@@ -903,8 +903,8 @@ func (dm *DataManager) GetFilterReverseIndexes(dbKey string, fldNameVal map[stri
return dm.DataDB().GetFilterReverseIndexesDrv(dbKey, fldNameVal)
}
func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes, update)
func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes)
}
func (dm *DataManager) RemoveFilterReverseIndexes(dbKey, itemID string) (err error) {

View File

@@ -138,15 +138,15 @@ func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID
}
// StoreIndexes handles storing the indexes to dataDB
func (rfi *ReqFilterIndexer) StoreIndexes(update bool) (err error) {
func (rfi *ReqFilterIndexer) StoreIndexes() (err error) {
if err = rfi.dm.SetFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),
rfi.indexes, update); err != nil {
rfi.indexes); err != nil {
return
}
return rfi.dm.SetFilterReverseIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true),
rfi.reveseIndex, update)
rfi.reveseIndex)
}
//Populate the ReqFilterIndexer.reveseIndex for specifil itemID
@@ -207,7 +207,7 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) {
}
}
}
if err = rfi.StoreIndexes(true); err != nil {
if err = rfi.StoreIndexes(); err != nil {
return
}
if err = rfi.dm.RemoveFilterReverseIndexes(

View File

@@ -229,8 +229,7 @@ func testOnStorITSetFilterIndexes(t *testing.T) {
},
}
if err := onStor.SetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
idxes, false); err != nil {
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), idxes); err != nil {
t.Error(err)
}
}
@@ -304,8 +303,7 @@ func testOnStorITGetFilterIndexes(t *testing.T) {
// t.Errorf("Expecting: %+v, received: %+v", eIdxes, idxes)
}
if err := onStor.SetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
eIdxes, false); err != nil {
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), eIdxes); err != nil {
t.Error(err)
}
}

View File

@@ -118,10 +118,10 @@ type DataDB interface {
GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
AddLoadHistory(*utils.LoadInstance, int, string) error
GetFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error)
SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error)
SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
RemoveFilterIndexesDrv(id string) (err error)
GetFilterReverseIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error)
SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error)
SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error)
MatchFilterIndexDrv(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)

View File

@@ -22,7 +22,6 @@ import (
"bytes"
"compress/zlib"
"errors"
"fmt"
"io/ioutil"
"strings"
"sync"
@@ -1289,7 +1288,7 @@ func (ms *MapStorage) GetFilterIndexesDrv(dbKey string,
return
}
func (ms *MapStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (ms *MapStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(indexes)
@@ -1341,7 +1340,7 @@ func (ms *MapStorage) GetFilterReverseIndexesDrv(dbKey string,
return
}
func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(indexes)
@@ -1546,7 +1545,6 @@ func (ms *MapStorage) SetFilterDrv(r *Filter) (err error) {
if err != nil {
return err
}
fmt.Printf("Setsfilter with Tenant:%+v ID:%+v \n", r.Tenant, r.ID)
ms.dict[utils.FilterPrefix+utils.ConcatenatedKey(r.Tenant, r.ID)] = result
return
}

View File

@@ -1963,28 +1963,21 @@ func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string,
return result.Value, nil
}
func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
if update {
for k, v := range indexes {
for k2, v2 := range v {
findParam2 := fmt.Sprintf("value.%s.%s", k, k2)
if len(v2) != 0 {
for k3 := range v2 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$set": bson.M{findParam2: bson.M{k3: true}}})
}
} else {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam2: 1}})
}
for k, v := range indexes {
for k2, v2 := range v {
findParam2 := fmt.Sprintf("value.%s.%s", k, k2)
if len(v2) == 0 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam2: 1}})
}
}
} else {
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string]map[string]utils.StringMap
}{dbKey, indexes})
}
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string]map[string]utils.StringMap
}{dbKey, indexes})
return
}
@@ -2031,28 +2024,21 @@ func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string,
return result.Value, nil
}
func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
if update {
for k, v := range indexes {
for k2, v2 := range v {
findParam2 := fmt.Sprintf("value.%s.%s", k, k2)
if len(v2) != 0 {
for k3 := range v2 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$set": bson.M{findParam2: bson.M{k3: true}}})
}
} else {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam2: 1}})
}
for k, v := range indexes {
for k2, v2 := range v {
findParam2 := fmt.Sprintf("value.%s.%s", k, k2)
if len(v2) == 0 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam2: 1}})
}
}
} else {
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string]map[string]utils.StringMap
}{dbKey, indexes})
}
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string]map[string]utils.StringMap
}{dbKey, indexes})
return
}

View File

@@ -1412,7 +1412,7 @@ func (rs *RedisStorage) GetFilterIndexesDrv(dbKey string,
return
}
func (rs *RedisStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (rs *RedisStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
mp := make(map[string]string)
for fldName, fldValMp := range indexes {
for fldVal, strMp := range fldValMp {
@@ -1484,7 +1484,7 @@ func (rs *RedisStorage) GetFilterReverseIndexesDrv(dbKey string,
return
}
func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
mp := make(map[string]string)
for fldName, fldValMp := range indexes {
for _, strMp := range fldValMp {

View File

@@ -275,7 +275,7 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org")
ref.IndexFilters("supplierprofile1", filter3)
ref.IndexFilters("supplierprofile2", filter4)
err = ref.StoreIndexes(false)
err = ref.StoreIndexes()
if err != nil {
t.Errorf("Error: %+v", err)
}

View File

@@ -2335,7 +2335,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("Indexing resource profiles")
}
for tenant, fltrIdxer := range tpr.resIndexers {
if err := fltrIdxer.StoreIndexes(false); err != nil {
if err := fltrIdxer.StoreIndexes(); err != nil {
return err
}
if verbose {
@@ -2350,7 +2350,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("StatQueue filter indexes:")
}
for tenant, fltrIdxer := range tpr.sqpIndexers {
if err := fltrIdxer.StoreIndexes(false); err != nil {
if err := fltrIdxer.StoreIndexes(); err != nil {
return err
}
if verbose {
@@ -2365,7 +2365,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("Threshold filter indexes:")
}
for tenant, fltrIdxer := range tpr.thdsIndexers {
if err := fltrIdxer.StoreIndexes(false); err != nil {
if err := fltrIdxer.StoreIndexes(); err != nil {
return err
}
if verbose {
@@ -2380,7 +2380,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("Indexing Supplier Profiles")
}
for tenant, fltrIdxer := range tpr.sppIndexers {
if err := fltrIdxer.StoreIndexes(false); err != nil {
if err := fltrIdxer.StoreIndexes(); err != nil {
return err
}
if verbose {
@@ -2395,7 +2395,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("Indexing Attribute Profiles")
}
for tntCntx, fltrIdxer := range tpr.attrIndexers {
if err := fltrIdxer.StoreIndexes(false); err != nil {
if err := fltrIdxer.StoreIndexes(); err != nil {
return err
}
if verbose {