mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add locking for matchItemIDForEvent fixes #1128
This commit is contained in:
committed by
Dan Christian Bogos
parent
e634d968df
commit
3a70ba1d46
@@ -153,7 +153,7 @@ var sTestsThresholdSV1 = []func(t *testing.T){
|
||||
testV1TSLoadConfig,
|
||||
testV1TSInitDataDb,
|
||||
testV1TSResetStorDb,
|
||||
testV1TSStartEngine,
|
||||
// testV1TSStartEngine,
|
||||
testV1TSRpcConn,
|
||||
testV1TSFromFolder,
|
||||
testV1TSGetThresholds,
|
||||
|
||||
@@ -21,8 +21,6 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -76,9 +74,6 @@ func (alS *AttributeService) matchingAttributeProfilesForEvent(ev *utils.CGREven
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
lockIDs := utils.PrefixSliceItems(aPrflIDs.Slice(), utils.AttributeFilterIndexes)
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
|
||||
defer guardian.Guardian.UnguardIDs(lockIDs...)
|
||||
for apID := range aPrflIDs {
|
||||
aPrfl, err := alS.dm.GetAttributeProfile(ev.Tenant, apID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
|
||||
@@ -21,6 +21,8 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -29,6 +31,9 @@ import (
|
||||
// helper on top of dataDB.MatchFilterIndex, adding utils.ANY to list of fields queried
|
||||
func matchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string,
|
||||
dm *DataManager, cacheID, itemIDPrefix string, indexedSelects bool) (itemIDs utils.StringMap, err error) {
|
||||
lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockID)
|
||||
defer guardian.Guardian.UnguardIDs(lockID)
|
||||
itemIDs = make(utils.StringMap)
|
||||
if !indexedSelects {
|
||||
sliceIDs, err := dm.DataDB().GetKeysForPrefix(itemIDPrefix)
|
||||
|
||||
@@ -22,6 +22,8 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -124,6 +126,9 @@ func (rfi *FilterIndexer) cacheRemItemType() { // ToDo: tune here by removing pe
|
||||
|
||||
// StoreIndexes handles storing the indexes to dataDB
|
||||
func (rfi *FilterIndexer) StoreIndexes(commit bool, transactionID string) (err error) {
|
||||
lockID := utils.CacheInstanceToPrefix[utils.PrefixToIndexCache[rfi.itemType]] + rfi.dbKeySuffix
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockID)
|
||||
defer guardian.Guardian.UnguardIDs(lockID)
|
||||
if err = rfi.dm.SetFilterIndexes(
|
||||
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
|
||||
rfi.indexes, commit, transactionID); err != nil {
|
||||
|
||||
@@ -448,9 +448,6 @@ func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, usageTT
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockIDs := utils.PrefixSliceItems(rIDs.Slice(), utils.ResourceFilterIndexes)
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
|
||||
defer guardian.Guardian.UnguardIDs(lockIDs...)
|
||||
for resName := range rIDs {
|
||||
rPrf, err := rS.dm.GetResourceProfile(ev.Tenant, resName, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
|
||||
@@ -152,10 +152,6 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQ
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockIDs := utils.PrefixSliceItems(sqIDs.Slice(), utils.StatFilterIndexes)
|
||||
lockIDs = append(lockIDs, utils.PrefixSliceItems(sqIDs.Slice(), utils.StatQueuePrefix)...) // add also lock for statQueue instances
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
|
||||
defer guardian.Guardian.UnguardIDs(lockIDs...)
|
||||
for sqID := range sqIDs {
|
||||
sqPrfl, err := sS.dm.GetStatQueueProfile(ev.Tenant, sqID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
@@ -121,9 +120,6 @@ func (spS *SupplierService) matchingSupplierProfilesForEvent(ev *utils.CGREvent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockIDs := utils.PrefixSliceItems(sPrflIDs.Slice(), utils.SupplierFilterIndexes)
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
|
||||
defer guardian.Guardian.UnguardIDs(lockIDs...)
|
||||
for lpID := range sPrflIDs {
|
||||
splPrfl, err := spS.dm.GetSupplierProfile(ev.Tenant, lpID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
|
||||
@@ -25,8 +25,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -231,9 +229,6 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ArgsProcessEvent) (
|
||||
}
|
||||
tIDs = tIDsMap.Slice()
|
||||
}
|
||||
lockIDs := utils.PrefixSliceItems(tIDs, utils.ThresholdFilterIndexes)
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
|
||||
defer guardian.Guardian.UnguardIDs(lockIDs...)
|
||||
for _, tID := range tIDs {
|
||||
tPrfl, err := tS.dm.GetThresholdProfile(args.Tenant, tID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user