Replaced FilterIndexes functions

This commit is contained in:
Trial97
2020-06-11 16:57:02 +03:00
committed by Dan Christian Bogos
parent ac814d98a2
commit 8d94a2d8dc
24 changed files with 310 additions and 897 deletions

View File

@@ -2218,132 +2218,6 @@ func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err
return dm.DataDB().HasDataDrv(category, subject, tenant)
}
func (dm *DataManager) GetFilterIndexes(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if indexes, err = dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal); err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; err == utils.ErrNotFound && itm.Remote {
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil,
utils.ReplicatorSv1GetFilterIndexes,
&utils.GetFilterIndexesArgWithArgDispatcher{
GetFilterIndexesArg: &utils.GetFilterIndexesArg{
CacheID: cacheID,
ItemIDPrefix: itemIDPrefix,
FilterType: filterType,
FldNameVal: fldNameVal},
TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer(itm.APIKey),
RouteID: utils.StringPointer(itm.RouteID)},
}, &indexes); err == nil {
err = dm.dataDB.SetFilterIndexesDrv(cacheID, itemIDPrefix, indexes, true, utils.NonTransactional)
}
}
if err != nil {
err = utils.CastRPCErr(err)
return nil, err
}
}
return
}
func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if err = dm.DataDB().SetFilterIndexesDrv(cacheID, itemIDPrefix,
indexes, commit, transactionID); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetFilterIndexes,
&utils.SetFilterIndexesArgWithArgDispatcher{
SetFilterIndexesArg: &utils.SetFilterIndexesArg{
CacheID: cacheID,
ItemIDPrefix: itemIDPrefix,
Indexes: indexes},
TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer(itm.APIKey),
RouteID: utils.StringPointer(itm.RouteID)},
}, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
}
return
}
func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if err = dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix); err != nil {
return
}
return
}
func (dm *DataManager) MatchFilterIndex(cacheID, itemIDPrefix,
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
fieldValKey := utils.ConcatenatedKey(itemIDPrefix, filterType, fieldName, fieldVal)
if x, ok := Cache.Get(cacheID, fieldValKey); ok { // Attempt to find in cache first
if x == nil {
return nil, utils.ErrNotFound
}
return x.(utils.StringMap), nil
}
// Not found in cache, check in DB
itemIDs, err = dm.DataDB().MatchFilterIndexDrv(cacheID, itemIDPrefix, filterType, fieldName, fieldVal)
if err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; err == utils.ErrNotFound && itm.Remote {
err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil,
utils.ReplicatorSv1MatchFilterIndex,
&utils.MatchFilterIndexArgWithArgDispatcher{MatchFilterIndexArg: &utils.MatchFilterIndexArg{
CacheID: cacheID,
ItemIDPrefix: itemIDPrefix,
FilterType: filterType,
FieldName: fieldName,
FieldVal: fieldVal,
},
TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer(itm.APIKey),
RouteID: utils.StringPointer(itm.RouteID),
},
}, &itemIDs)
}
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound {
if errCh := Cache.Set(cacheID, fieldValKey, nil, nil,
true, utils.NonTransactional); errCh != nil {
return nil, errCh
}
}
return nil, err
}
}
if errCh := Cache.Set(cacheID, fieldValKey, itemIDs, nil,
true, utils.NonTransactional); errCh != nil {
return nil, errCh
}
return
}
func (dm *DataManager) GetRouteProfile(tenant, id string, cacheRead, cacheWrite bool,
transactionID string) (rpp *RouteProfile, err error) {
tntID := utils.ConcatenatedKey(tenant, id)

View File

@@ -28,7 +28,7 @@ import (
// MatchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event
// fieldIDs limits the fields which are checked against indexes
// helper on top of dataDB.MatchFilterIndex, adding utils.ANY to list of fields queried
// helper on top of dataDB.GetIndexes, adding utils.ANY to list of fields queried
func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string,
dm *DataManager, cacheID, itemIDPrefix string, indexedSelects, nestedFields bool) (itemIDs utils.StringSet, err error) {
itemIDs = make(utils.StringSet)

View File

@@ -123,99 +123,102 @@ func testITIsDBEmpty(t *testing.T) {
}
func testITSetFilterIndexes(t *testing.T) {
idxes := map[string]utils.StringMap{
idxes := map[string]utils.StringSet{
"*string:Account:1001": {
"RL1": true,
"RL1": struct{}{},
},
"*string:Account:1002": {
"RL1": true,
"RL2": true,
"RL1": struct{}{},
"RL2": struct{}{},
},
"*string:Account:dan": {
"RL2": true,
"RL2": struct{}{},
},
"*string:Subject:dan": {
"RL2": true,
"RL3": true,
"RL2": struct{}{},
"RL3": struct{}{},
},
utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY): {
"RL4": true,
"RL5": true,
"RL4": struct{}{},
"RL5": struct{}{},
},
}
if err := dataManager.SetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org",
idxes, false, utils.NonTransactional); err != nil {
if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes,
"cgrates.org", idxes, false, utils.NonTransactional); err != nil {
t.Error(err)
}
}
func testITGetFilterIndexes(t *testing.T) {
eIdxes := map[string]utils.StringMap{
eIdxes := map[string]utils.StringSet{
"*string:Account:1001": {
"RL1": true,
"RL1": struct{}{},
},
"*string:Account:1002": {
"RL1": true,
"RL2": true,
"RL1": struct{}{},
"RL2": struct{}{},
},
"*string:Account:dan": {
"RL2": true,
"RL2": struct{}{},
},
"*string:Subject:dan": {
"RL2": true,
"RL3": true,
"RL2": struct{}{},
"RL3": struct{}{},
},
utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY): {
"RL4": true,
"RL5": true,
"RL4": struct{}{},
"RL5": struct{}{},
},
}
sbjDan := map[string]string{
"Subject": "dan",
}
expectedsbjDan := map[string]utils.StringMap{
expectedsbjDan := map[string]utils.StringSet{
"*string:Subject:dan": {
"RL2": true,
"RL3": true,
"RL2": struct{}{},
"RL3": struct{}{},
},
}
if exsbjDan, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix],
"cgrates.org", utils.MetaString, sbjDan); err != nil {
if exsbjDan, err := dataManager.GetIndexes(
utils.CacheResourceFilterIndexes, "cgrates.org",
utils.ConcatenatedKey(utils.MetaString, "Subject", "dan"),
false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedsbjDan, exsbjDan) {
t.Errorf("Expecting: %+v, received: %+v", expectedsbjDan, exsbjDan)
}
if rcv, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix],
"cgrates.org", utils.EmptyString, nil); err != nil {
if rcv, err := dataManager.GetIndexes(
utils.CacheResourceFilterIndexes,
"cgrates.org", utils.EmptyString,
false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, rcv) {
t.Errorf("Expecting: %+v, received: %+v", eIdxes, rcv)
}
if _, err := dataManager.GetFilterIndexes("unknown_key", "unkonwn_tenant",
utils.EmptyString, nil); err == nil || err != utils.ErrNotFound {
if _, err := dataManager.GetIndexes(
"unknown_key", "unkonwn_tenant",
utils.EmptyString, false, false); err == nil || err != utils.ErrNotFound {
t.Error(err)
}
}
func testITMatchFilterIndex(t *testing.T) {
eMp := utils.StringMap{
"RL1": true,
"RL2": true,
eMp := map[string]utils.StringSet{
"*string:Account:1002": {
"RL1": struct{}{},
"RL2": struct{}{},
},
}
if rcvMp, err := dataManager.MatchFilterIndex(
if rcvMp, err := dataManager.GetIndexes(
utils.CacheResourceFilterIndexes, "cgrates.org",
utils.MetaString, "Account", "1002"); err != nil {
utils.ConcatenatedKey(utils.MetaString, "Account", "1002"),
false, true); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMp, rcvMp) {
t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp)
}
if _, err := dataManager.MatchFilterIndex(
if _, err := dataManager.GetIndexes(
utils.CacheResourceFilterIndexes, "cgrates.org",
utils.MetaString, "NonexistentField", "1002"); err == nil ||
utils.ConcatenatedKey(utils.MetaString, "NonexistentField", "1002"),
true, true); err == nil ||
err != utils.ErrNotFound {
t.Error(err)
}
@@ -269,20 +272,19 @@ func testITTestThresholdFilterIndexes(t *testing.T) {
if err := dataManager.SetThresholdProfile(th2, true); err != nil {
t.Error(err)
}
eIdxes := map[string]utils.StringMap{
eIdxes := map[string]utils.StringSet{
"*string:EventType:Event1": {
"THD_Test": true,
"THD_Test2": true,
"THD_Test": struct{}{},
"THD_Test2": struct{}{},
},
"*string:EventType:Event2": {
"THD_Test": true,
"THD_Test2": true,
"THD_Test": struct{}{},
"THD_Test2": struct{}{},
},
}
rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant)
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
@@ -314,23 +316,23 @@ func testITTestThresholdFilterIndexes(t *testing.T) {
if err := dataManager.SetThresholdProfile(cloneTh1, true); err != nil {
t.Error(err)
}
eIdxes = map[string]utils.StringMap{
eIdxes = map[string]utils.StringSet{
"*string:Account:1001": {
"THD_Test": true,
"THD_Test": struct{}{},
},
"*string:Account:1002": {
"THD_Test": true,
"THD_Test": struct{}{},
},
"*string:EventType:Event1": {
"THD_Test2": true,
"THD_Test2": struct{}{},
},
"*string:EventType:Event2": {
"THD_Test2": true,
"THD_Test2": struct{}{},
},
}
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
@@ -362,25 +364,25 @@ func testITTestThresholdFilterIndexes(t *testing.T) {
if err := dataManager.SetThresholdProfile(clone2Th1, true); err != nil {
t.Error(err)
}
eIdxes = map[string]utils.StringMap{
eIdxes = map[string]utils.StringSet{
"*string:Destination:10": {
"THD_Test": true,
"THD_Test": struct{}{},
},
"*string:Destination:20": {
"THD_Test": true,
"THD_Test": struct{}{},
},
"*string:EventType:Event1": {
"THD_Test": true,
"THD_Test2": true,
"THD_Test": struct{}{},
"THD_Test2": struct{}{},
},
"*string:EventType:Event2": {
"THD_Test": true,
"THD_Test2": true,
"THD_Test": struct{}{},
"THD_Test2": struct{}{},
},
}
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
@@ -394,9 +396,9 @@ func testITTestThresholdFilterIndexes(t *testing.T) {
th2.ID, utils.NonTransactional, true); err != nil {
t.Error(err)
}
if _, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != utils.ErrNotFound {
if _, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != utils.ErrNotFound {
t.Error(err)
}
}
@@ -440,20 +442,19 @@ func testITTestAttributeProfileFilterIndexes(t *testing.T) {
if err := dataManager.SetAttributeProfile(attrProfile, true); err != nil {
t.Error(err)
}
eIdxes := map[string]utils.StringMap{
eIdxes := map[string]utils.StringSet{
"*string:EventType:Event1": {
"AttrPrf": true,
"AttrPrf": struct{}{},
},
"*string:EventType:Event2": {
"AttrPrf": true,
"AttrPrf": struct{}{},
},
}
for _, ctx := range attrProfile.Contexts {
rfi := NewFilterIndexer(onStor, utils.AttributeProfilePrefix,
utils.ConcatenatedKey(attrProfile.Tenant, ctx))
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheAttributeFilterIndexes,
utils.ConcatenatedKey(attrProfile.Tenant, ctx),
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
@@ -466,22 +467,20 @@ func testITTestAttributeProfileFilterIndexes(t *testing.T) {
t.Error(err)
}
//check indexes with the new context (con3)
rfi := NewFilterIndexer(onStor, utils.AttributeProfilePrefix,
utils.ConcatenatedKey(attrProfile.Tenant, "con3"))
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheAttributeFilterIndexes,
utils.ConcatenatedKey(attrProfile.Tenant, "con3"),
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
}
//check if old contexts was delete
for _, ctx := range []string{"con1", "con2"} {
rfi := NewFilterIndexer(onStor, utils.AttributeProfilePrefix,
utils.ConcatenatedKey(attrProfile.Tenant, ctx))
if _, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil && err != utils.ErrNotFound {
if _, err := dataManager.GetIndexes(
utils.CacheAttributeFilterIndexes,
utils.ConcatenatedKey(attrProfile.Tenant, ctx),
utils.EmptyString, false, false); err != nil && err != utils.ErrNotFound {
t.Error(err)
}
}
@@ -491,11 +490,10 @@ func testITTestAttributeProfileFilterIndexes(t *testing.T) {
t.Error(err)
}
//check if index is removed
rfi = NewFilterIndexer(onStor, utils.AttributeProfilePrefix,
utils.ConcatenatedKey("cgrates.org", "con3"))
if _, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.MetaString, nil); err != nil && err != utils.ErrNotFound {
if _, err := dataManager.GetIndexes(
utils.CacheAttributeFilterIndexes,
utils.ConcatenatedKey("cgrates.org", "con3"),
utils.MetaString, false, false); err != nil && err != utils.ErrNotFound {
t.Error(err)
}
@@ -535,18 +533,17 @@ func testITTestThresholdInlineFilterIndexing(t *testing.T) {
if err := dataManager.SetThresholdProfile(th, true); err != nil {
t.Error(err)
}
eIdxes := map[string]utils.StringMap{
eIdxes := map[string]utils.StringSet{
"*string:EventType:Event1": {
"THD_Test": true,
"THD_Test": struct{}{},
},
"*string:EventType:Event2": {
"THD_Test": true,
"THD_Test": struct{}{},
},
}
rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant)
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
@@ -557,20 +554,20 @@ func testITTestThresholdInlineFilterIndexing(t *testing.T) {
if err := dataManager.SetThresholdProfile(th, true); err != nil {
t.Error(err)
}
eIdxes = map[string]utils.StringMap{
eIdxes = map[string]utils.StringSet{
"*string:Account:1001": {
"THD_Test": true,
"THD_Test": struct{}{},
},
"*string:EventType:Event1": {
"THD_Test": true,
"THD_Test": struct{}{},
},
"*string:EventType:Event2": {
"THD_Test": true,
"THD_Test": struct{}{},
},
}
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
@@ -580,73 +577,71 @@ func testITTestThresholdInlineFilterIndexing(t *testing.T) {
th.ID, utils.NonTransactional, true); err != nil {
t.Error(err)
}
if _, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != utils.ErrNotFound {
if _, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != utils.ErrNotFound {
t.Error(err)
}
}
func testITTestStoreFilterIndexesWithTransID(t *testing.T) {
idxes := map[string]utils.StringMap{
idxes := map[string]utils.StringSet{
"*string:Account:1001": {
"RL1": true,
"RL1": struct{}{},
},
"*string:Account:1002": {
"RL1": true,
"RL2": true,
"RL1": struct{}{},
"RL2": struct{}{},
},
"*string:Account:dan": {
"RL2": true,
"RL2": struct{}{},
},
"*string:Subject:dan": {
"RL2": true,
"RL3": true,
"RL2": struct{}{},
"RL3": struct{}{},
},
utils.ConcatenatedKey(utils.META_NONE,
utils.ANY, utils.ANY): {
"RL4": true,
"RL5": true,
"RL4": struct{}{},
"RL5": struct{}{},
},
}
if err := dataManager.SetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org",
idxes, false, "transaction1"); err != nil {
if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes,
"cgrates.org", idxes, false, "transaction1"); err != nil {
t.Error(err)
}
//commit transaction
if err := dataManager.SetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix],
if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes,
"cgrates.org", idxes, true, "transaction1"); err != nil {
t.Error(err)
}
eIdx := map[string]utils.StringMap{
eIdx := map[string]utils.StringSet{
"*string:Account:1001": {
"RL1": true,
"RL1": struct{}{},
},
"*string:Account:1002": {
"RL1": true,
"RL2": true,
"RL1": struct{}{},
"RL2": struct{}{},
},
"*string:Account:dan": {
"RL2": true,
"RL2": struct{}{},
},
"*string:Subject:dan": {
"RL2": true,
"RL3": true,
"RL2": struct{}{},
"RL3": struct{}{},
},
utils.ConcatenatedKey(utils.META_NONE,
utils.ANY, utils.ANY): {
"RL4": true,
"RL5": true,
"RL4": struct{}{},
"RL5": struct{}{},
},
}
//verify new key and check if data was moved
if rcv, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org",
utils.EmptyString, nil); err != nil {
if rcv, err := dataManager.GetIndexes(
utils.CacheResourceFilterIndexes, "cgrates.org",
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eIdx, rcv) {
t.Errorf("Expecting: %+v, received: %+v", eIdx, rcv)
@@ -654,38 +649,36 @@ func testITTestStoreFilterIndexesWithTransID(t *testing.T) {
}
func testITTestStoreFilterIndexesWithTransID2(t *testing.T) {
idxes := map[string]utils.StringMap{
idxes := map[string]utils.StringSet{
"*string:Event:Event1": {
"RL1": true,
"RL1": struct{}{},
},
"*string:Event:Event2": {
"RL1": true,
"RL2": true,
"RL1": struct{}{},
"RL2": struct{}{},
},
}
transID := "transaction1"
if err := dataManager.SetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org",
idxes, false, transID); err != nil {
if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes,
"cgrates.org", idxes, false, transID); err != nil {
t.Error(err)
}
//commit transaction
if err := dataManager.SetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org",
idxes, true, transID); err != nil {
if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes,
"cgrates.org", idxes, true, transID); err != nil {
t.Error(err)
}
//verify if old key was deleted
if _, err := dataManager.GetFilterIndexes(
"tmp_"+utils.PrefixToIndexCache[utils.ResourceProfilesPrefix],
if _, err := dataManager.GetIndexes(
"tmp_"+utils.CacheResourceFilterIndexes,
utils.ConcatenatedKey("cgrates.org", transID),
utils.EmptyString, nil); err != utils.ErrNotFound {
utils.EmptyString, false, false); err != utils.ErrNotFound {
t.Error(err)
}
//verify new key and check if data was moved
if rcv, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[utils.ResourceProfilesPrefix],
"cgrates.org", utils.EmptyString, nil); err != nil {
if rcv, err := dataManager.GetIndexes(
utils.CacheResourceFilterIndexes,
"cgrates.org", utils.EmptyString, false, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(idxes, rcv) {
t.Errorf("Expecting: %+v, received: %+v", idxes, rcv)
@@ -722,28 +715,31 @@ func testITTestIndexingWithEmptyFltrID(t *testing.T) {
if err := dataManager.SetThresholdProfile(th2, true); err != nil {
t.Error(err)
}
eIdxes := map[string]utils.StringMap{
eIdxes := map[string]utils.StringSet{
"*none:*any:*any": {
"THD_Test": true,
"THD_Test2": true,
"THD_Test": struct{}{},
"THD_Test2": struct{}{},
},
}
rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant)
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.META_NONE, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
}
}
eMp := utils.StringMap{
"THD_Test": true,
"THD_Test2": true,
eMp := map[string]utils.StringSet{
"*none:*any:*any": {
"THD_Test": struct{}{},
"THD_Test2": struct{}{},
},
}
if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheThresholdFilterIndexes, th.Tenant,
utils.META_NONE, utils.META_ANY, utils.META_ANY); err != nil {
if rcvMp, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.ConcatenatedKey(utils.META_NONE, utils.META_ANY, utils.META_ANY),
true, true); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMp, rcvMp) {
t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp)
@@ -804,28 +800,31 @@ func testITTestIndexingWithEmptyFltrID2(t *testing.T) {
if err := dataManager.SetRouteProfile(splProfile2, true); err != nil {
t.Error(err)
}
eIdxes := map[string]utils.StringMap{
eIdxes := map[string]utils.StringSet{
"*none:*any:*any": {
"SPL_Weight": true,
"SPL_Weight2": true,
"SPL_Weight": struct{}{},
"SPL_Weight2": struct{}{},
},
}
rfi := NewFilterIndexer(onStor, utils.RouteProfilePrefix, splProfile.Tenant)
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheRouteFilterIndexes, splProfile.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
}
}
eMp := utils.StringMap{
"SPL_Weight": true,
"SPL_Weight2": true,
eMp := map[string]utils.StringSet{
"*none:*any:*any": {
"SPL_Weight": struct{}{},
"SPL_Weight2": struct{}{},
},
}
if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheRouteFilterIndexes,
splProfile.Tenant, utils.META_NONE, utils.META_ANY, utils.META_ANY); err != nil {
if rcvMp, err := dataManager.GetIndexes(
utils.CacheRouteFilterIndexes, splProfile.Tenant,
utils.ConcatenatedKey(utils.META_NONE, utils.META_ANY, utils.META_ANY),
true, true); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMp, rcvMp) {
t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp)
@@ -851,7 +850,6 @@ func testITTestIndexingThresholds(t *testing.T) {
FilterIDs: []string{"*string:Account:1002", "*lt:Balance:1000"},
ActionIDs: []string{},
}
rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant)
if err := dataManager.SetThresholdProfile(th, true); err != nil {
t.Error(err)
}
@@ -861,30 +859,34 @@ func testITTestIndexingThresholds(t *testing.T) {
if err := dataManager.SetThresholdProfile(th3, true); err != nil {
t.Error(err)
}
eIdxes := map[string]utils.StringMap{
eIdxes := map[string]utils.StringSet{
"*string:Account:1001": {
"TH1": true,
"TH2": true,
"TH1": struct{}{},
"TH2": struct{}{},
},
"*string:Account:1002": {
"TH3": true,
"TH3": struct{}{},
},
}
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
}
}
eMp := utils.StringMap{
"TH1": true,
"TH2": true,
eMp := map[string]utils.StringSet{
"*string:Account:1001": {
"TH1": struct{}{},
"TH2": struct{}{},
},
}
if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheThresholdFilterIndexes, th.Tenant,
utils.MetaString, utils.Account, "1001"); err != nil {
if rcvMp, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.ConcatenatedKey(utils.MetaString, utils.Account, "1001"),
true, true); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMp, rcvMp) {
t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp)
@@ -910,7 +912,6 @@ func testITTestIndexingMetaNot(t *testing.T) {
FilterIDs: []string{"*notstring:Account:1002", "*notstring:Balance:1000"},
ActionIDs: []string{},
}
rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant)
if err := dataManager.SetThresholdProfile(th, true); err != nil {
t.Error(err)
}
@@ -920,28 +921,32 @@ func testITTestIndexingMetaNot(t *testing.T) {
if err := dataManager.SetThresholdProfile(th3, true); err != nil {
t.Error(err)
}
eIdxes := map[string]utils.StringMap{
eIdxes := map[string]utils.StringSet{
"*string:Account:1001": {
"TH1": true,
"TH1": struct{}{},
},
"*prefix:EventName:Name": {
"TH2": true,
"TH2": struct{}{},
},
}
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
if rcvIdx, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.EmptyString, false, false); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
}
}
eMp := utils.StringMap{
"TH1": true,
eMp := map[string]utils.StringSet{
"*string:Account:1001": {
"TH1": struct{}{},
},
}
if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheThresholdFilterIndexes, th.Tenant,
utils.MetaString, utils.Account, "1001"); err != nil {
if rcvMp, err := dataManager.GetIndexes(
utils.CacheThresholdFilterIndexes, th.Tenant,
utils.ConcatenatedKey(utils.MetaString, utils.Account, "1001"),
true, true); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eMp, rcvMp) {
t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp)

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"reflect"
"testing"
"time"
@@ -119,7 +118,6 @@ func TestFileName(t *testing.T) {
if rcv[:7] != "module|" {
t.Errorf("Expecting: 'module|', received: %+v", rcv[:7])
} else if rcv[14:] != ".gob" {
fmt.Println(rcv)
t.Errorf("Expecting: '.gob', received: %+v", rcv[14:])
}

View File

@@ -312,7 +312,7 @@ func splitFilterIndex(tntCtxIdxKey string) (tntCtx, idxKey string, err error) {
// ComputeIndexes gets the indexes from tha DB and ensure that the items are indexed
// getFilters returns a list of filters IDs for the given profile id
func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error)) (err error) {
transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error)) (processed bool, err error) {
var profilesIDs []string
if IDs == nil { // get all items
var ids []string
@@ -348,6 +348,7 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
if err = dm.SetIndexes(idxItmType, tntCtx, index, cacheCommit(transactionID), transactionID); err != nil {
return
}
processed = true
}
return
}

View File

@@ -93,17 +93,10 @@ type DataDB interface {
RemoveTimingDrv(string) error
GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
AddLoadHistory(*utils.LoadInstance, int, string) error
GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error)
SetFilterIndexesDrv(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error)
RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error)
GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error)
SetIndexesDrv(idxItmType, tntCtx string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error)
RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error)
MatchFilterIndexDrv(cacheID, itemIDPrefix,
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)
SetStatQueueProfileDrv(sq *StatQueueProfile) (err error)
RemStatQueueProfileDrv(tenant, id string) (err error)

View File

@@ -1097,109 +1097,6 @@ func (iDB *InternalDB) AddLoadHistory(*utils.LoadInstance, int, string) error {
return nil
}
func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
if len(fldNameVal) != 0 {
rcvidx := x.(map[string]utils.StringMap)
indexes = make(map[string]utils.StringMap)
for fldName, fldVal := range fldNameVal {
if _, has := indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)]; !has {
indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)] = make(utils.StringMap)
}
if len(rcvidx[utils.ConcatenatedKey(filterType, fldName, fldVal)]) != 0 {
for key := range rcvidx[utils.ConcatenatedKey(filterType, fldName, fldVal)] {
indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)][key] = true
}
}
}
return
} else {
indexes = x.(map[string]utils.StringMap)
if len(indexes) == 0 {
return nil, utils.ErrNotFound
}
}
return
}
func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID)
}
if commit && transactionID != "" {
x, _ := iDB.db.Get(cacheID, dbKey)
iDB.db.Remove(cacheID, dbKey,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
iDB.db.Set(cacheID, originKey, x, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
var toBeDeleted []string
toBeAdded := make(map[string]utils.StringMap)
for key, strMp := range indexes {
if len(strMp) == 0 { // remove with no more elements inside
toBeDeleted = append(toBeDeleted, key)
delete(indexes, key)
continue
}
toBeAdded[key] = make(utils.StringMap)
toBeAdded[key] = strMp
}
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
iDB.db.Set(cacheID, dbKey, toBeAdded, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return err
}
mp := x.(map[string]utils.StringMap)
for _, key := range toBeDeleted {
delete(mp, key)
}
for key, strMp := range toBeAdded {
if _, has := mp[key]; !has {
mp[key] = make(utils.StringMap)
}
mp[key] = strMp
}
iDB.db.Set(cacheID, dbKey, mp, nil,
cacheCommit(transactionID), transactionID)
return nil
}
func (iDB *InternalDB) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) {
iDB.db.Remove(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (iDB *InternalDB) MatchFilterIndexDrv(cacheID, itemIDPrefix,
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
x, ok := iDB.db.Get(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
indexes := x.(map[string]utils.StringMap)
if _, hasIt := indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]; hasIt {
itemIDs = indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]
}
if len(itemIDs) == 0 {
return nil, utils.ErrNotFound
}
return
}
func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) {
x, ok := iDB.db.Get(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id))
if !ok || x == nil {
@@ -1486,7 +1383,7 @@ func (iDB *InternalDB) SetIndexesDrv(idxItmType, tntCtx string,
}
iDB.db.Remove(idxItmType, dbKey,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
key := strings.TrimSuffix(strings.TrimPrefix(dbKey, "tmp_"), transactionID)
key := strings.TrimSuffix(strings.TrimPrefix(dbKey, "tmp_"), utils.CONCATENATED_KEY_SEP+transactionID)
iDB.db.Set(idxItmType, key, x, []string{tntCtx},
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}

View File

@@ -60,7 +60,6 @@ const (
ColLht = "load_history"
ColVer = "versions"
ColRsP = "resource_profiles"
ColRFI = "request_filter_indexes"
ColIndx = "indexes"
ColTmg = "timings"
ColRes = "resources"
@@ -284,7 +283,7 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exporte
}
err = nil
switch col {
case ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColRFI:
case ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx:
if err = ms.enusureIndex(col, true, "key"); err != nil {
return
}
@@ -351,7 +350,7 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) {
}
if ms.storageType == utils.DataDB {
for _, col := range []string{ColAct, ColApl, ColAAp, ColAtr,
ColRpl, ColDst, ColRds, ColLht, ColRFI, ColRsP, ColRes, ColSqs, ColSqp,
ColRpl, ColDst, ColRds, ColLht, ColIndx, ColRsP, ColRes, ColSqs, ColSqp,
ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, ColRpp,
ColRpf, ColShg, ColAcc} {
if err = ms.ensureIndexesForCol(col); err != nil {
@@ -688,19 +687,19 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
case utils.DispatcherHostPrefix:
result, err = ms.getField2(sctx, ColDph, utils.DispatcherHostPrefix, subject, tntID)
case utils.AttributeFilterIndexes:
result, err = ms.getField3(sctx, ColRFI, utils.AttributeFilterIndexes, "key")
result, err = ms.getField3(sctx, ColIndx, utils.AttributeFilterIndexes, "key")
case utils.ResourceFilterIndexes:
result, err = ms.getField3(sctx, ColRFI, utils.ResourceFilterIndexes, "key")
result, err = ms.getField3(sctx, ColIndx, utils.ResourceFilterIndexes, "key")
case utils.StatFilterIndexes:
result, err = ms.getField3(sctx, ColRFI, utils.StatFilterIndexes, "key")
result, err = ms.getField3(sctx, ColIndx, utils.StatFilterIndexes, "key")
case utils.ThresholdFilterIndexes:
result, err = ms.getField3(sctx, ColRFI, utils.ThresholdFilterIndexes, "key")
result, err = ms.getField3(sctx, ColIndx, utils.ThresholdFilterIndexes, "key")
case utils.RouteFilterIndexes:
result, err = ms.getField3(sctx, ColRFI, utils.RouteFilterIndexes, "key")
result, err = ms.getField3(sctx, ColIndx, utils.RouteFilterIndexes, "key")
case utils.ChargerFilterIndexes:
result, err = ms.getField3(sctx, ColRFI, utils.ChargerFilterIndexes, "key")
result, err = ms.getField3(sctx, ColIndx, utils.ChargerFilterIndexes, "key")
case utils.DispatcherFilterIndexes:
result, err = ms.getField3(sctx, ColRFI, utils.DispatcherFilterIndexes, "key")
result, err = ms.getField3(sctx, ColIndx, utils.DispatcherFilterIndexes, "key")
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix)
}
@@ -1742,182 +1741,6 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) {
})
}
// GetFilterIndexesDrv retrieves Indexes from dataDB
//filterType is used togheter with fieldName:Val
func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
type result struct {
Key string
Value []string
}
var results []result
dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
if len(fldNameVal) != 0 {
for fldName, fldValue := range fldNameVal {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(ColRFI).Find(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, filterType, fldName, fldValue)})
if err != nil {
return err
}
for cur.Next(sctx) {
var elem result
if err := cur.Decode(&elem); err != nil {
return err
}
results = append(results, elem)
}
return cur.Close(sctx)
}); err != nil {
return nil, err
}
if len(results) == 0 {
return nil, utils.ErrNotFound
}
}
} else {
for _, character := range []string{".", "*"} {
dbKey = strings.Replace(dbKey, character, `\`+character, strings.Count(dbKey, character))
}
//inside bson.RegEx add carrot to match the prefix (optimization)
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(ColRFI).Find(sctx, bson.M{"key": bsonx.Regex("^"+dbKey, "")})
if err != nil {
return err
}
for cur.Next(sctx) {
var elem result
if err := cur.Decode(&elem); err != nil {
return err
}
results = append(results, elem)
}
return cur.Close(sctx)
}); err != nil {
return nil, err
}
if len(results) == 0 {
return nil, utils.ErrNotFound
}
}
indexes = make(map[string]utils.StringMap)
for _, res := range results {
if len(res.Value) == 0 {
continue
}
keys := strings.Split(res.Key, ":") // "cgrates.org:*sesions:*string:Subject:dan"
indexKey := utils.ConcatenatedKey(keys[1], keys[2], keys[3])
//check here if itemIDPrefix has context
if len(strings.Split(itemIDPrefix, ":")) == 2 {
indexKey = utils.ConcatenatedKey(keys[2], keys[3], keys[4])
}
indexes[indexKey] = utils.StringMapFromSlice(res.Value)
}
if len(indexes) == 0 {
return nil, utils.ErrNotFound
}
return indexes, nil
}
// SetFilterIndexesDrv stores Indexes into DataDB
func (ms *MongoStorage) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
}
if commit && transactionID != "" {
regexKey := originKey
for _, character := range []string{".", "*"} {
regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character))
}
//inside bson.RegEx add carrot to match the prefix (optimization)
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColRFI).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, "")})
return err
}); err != nil {
return err
}
var lastErr error
for key, itmMp := range indexes {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColRFI).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(originKey, key)},
bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(originKey, key), "value": itmMp.Slice()}},
options.Update().SetUpsert(true),
)
return err
}); err != nil {
lastErr = err
}
}
if lastErr != nil {
return lastErr
}
oldKey := "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
for _, character := range []string{".", "*"} {
oldKey = strings.Replace(oldKey, character, `\`+character, strings.Count(oldKey, character))
}
//inside bson.RegEx add carrot to match the prefix (optimization)
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColRFI).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+oldKey, "")})
return err
})
} else {
var lastErr error
for key, itmMp := range indexes {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
var action bson.M
if len(itmMp) == 0 {
action = bson.M{"$unset": bson.M{"value": 1}}
} else {
action = bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(dbKey, key), "value": itmMp.Slice()}}
}
_, err = ms.getCol(ColRFI).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, key)},
action, options.Update().SetUpsert(true),
)
return err
}); err != nil {
lastErr = err
}
}
return lastErr
}
}
func (ms *MongoStorage) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) {
regexKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
for _, character := range []string{".", "*"} {
regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character))
}
//inside bson.RegEx add carrot to match the prefix (optimization)
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColRFI).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, "")})
return err
})
}
func (ms *MongoStorage) MatchFilterIndexDrv(cacheID, itemIDPrefix,
filterType, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
var result struct {
Key string
Value []string
}
dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
cur := ms.getCol(ColRFI).FindOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, filterType, fldName, fldVal)})
if err := cur.Decode(&result); err != nil {
if err == mongo.ErrNoDocuments {
return utils.ErrNotFound
}
return err
}
return nil
}); err != nil {
return nil, err
}
return utils.StringMapFromSlice(result.Value), nil
}
// GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB
func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) {
sq = new(StatQueueProfile)
@@ -2387,7 +2210,6 @@ func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe
//inside bson.RegEx add carrot to match the prefix (optimization)
q = bson.M{"key": bsonx.Regex("^"+dbKey, utils.EmptyString)}
}
indexes = make(map[string]utils.StringSet)
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(ColIndx).Find(sctx, q)
@@ -2439,9 +2261,12 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string,
result, err = ms.getField3(sctx, ColIndx, regexKey, "key")
for _, key := range result {
idxKey := strings.TrimPrefix(key, dbKey)
if _, err = ms.getCol(ColIndx).DeleteOne(sctx,
bson.M{"key": originKey + idxKey}); err != nil { //ensure we do not have dup
return err
}
if _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": key},
bson.M{"$set": bson.M{"key": originKey + idxKey}}, // only update the key
options.Update().SetUpsert(true),
); err != nil {
return err
}
@@ -2454,15 +2279,16 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string,
var lastErr error
for idxKey, itmMp := range indexes {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
var action bson.M
if len(itmMp) == 0 {
action = bson.M{"$unset": bson.M{"value": 1}}
idxDbkey := utils.ConcatenatedKey(dbKey, idxKey)
if len(itmMp) == 0 { // remove from DB if we set it with empty indexes
_, err = ms.getCol(ColIndx).DeleteOne(sctx,
bson.M{"key": idxDbkey})
} else {
action = bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey), "value": itmMp.AsSlice()}}
_, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": idxDbkey},
bson.M{"$set": bson.M{"key": idxDbkey, "value": itmMp.AsSlice()}},
options.Update().SetUpsert(true),
)
}
_, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey)},
action, options.Update().SetUpsert(true),
)
return err
}); err != nil {
lastErr = err

View File

@@ -1212,99 +1212,6 @@ func (rs *RedisStorage) RemoveTimingDrv(id string) (err error) {
return
}
//GetFilterIndexesDrv retrieves Indexes from dataDB
//filterType is used togheter with fieldName:Val
func (rs *RedisStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
mp := make(map[string]string)
dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
if len(fldNameVal) == 0 {
mp, err = rs.Cmd(redis_HGETALL, dbKey).Map()
if err != nil {
return
} else if len(mp) == 0 {
return nil, utils.ErrNotFound
}
} else {
var itmMpStrLst []string
for fldName, fldVal := range fldNameVal {
concatTypeNameVal := utils.ConcatenatedKey(filterType, fldName, fldVal)
itmMpStrLst, err = rs.Cmd(redis_HMGET, dbKey, concatTypeNameVal).List()
if err != nil {
return
} else if itmMpStrLst[0] == "" {
return nil, utils.ErrNotFound
}
mp[concatTypeNameVal] = itmMpStrLst[0]
}
}
indexes = make(map[string]utils.StringMap)
for k, v := range mp {
var sm utils.StringMap
if err = rs.ms.Unmarshal([]byte(v), &sm); err != nil {
return
}
indexes[k] = sm
}
return
}
//SetFilterIndexesDrv stores Indexes into DataDB
func (rs *RedisStorage) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID)
}
if commit && transactionID != "" {
return rs.Cmd(redis_RENAME, dbKey, originKey).Err
} else {
mp := make(map[string]string)
nameValSls := []interface{}{dbKey}
for key, strMp := range indexes {
if len(strMp) == 0 { // remove with no more elements inside
nameValSls = append(nameValSls, key)
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[key] = string(encodedMp)
}
}
if len(nameValSls) != 1 {
if err = rs.Cmd(redis_HDEL, nameValSls...).Err; err != nil {
return err
}
}
if len(mp) != 0 {
return rs.Cmd(redis_HMSET, dbKey, mp).Err
}
return
}
}
func (rs *RedisStorage) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) {
return rs.Cmd(redis_DEL, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix).Err
}
func (rs *RedisStorage) MatchFilterIndexDrv(cacheID, itemIDPrefix,
filterType, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
fieldValKey := utils.ConcatenatedKey(filterType, fldName, fldVal)
fldValBytes, err := rs.Cmd(redis_HGET,
utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix, fieldValKey).Bytes()
if err != nil {
if err == redis.ErrRespNil { // did not find the destination
err = utils.ErrNotFound
}
return nil, err
} else if err = rs.ms.Unmarshal(fldValBytes, &itemIDs); err != nil {
return
}
return
}
func (rs *RedisStorage) GetVersions(itm string) (vrs Versions, err error) {
if itm != "" {
fldVal, err := rs.Cmd(redis_HGET, utils.TBLVersions, itm).Str()