Updated threshold caching

This commit is contained in:
Trial97
2021-07-27 16:41:09 +03:00
committed by Dan Christian Bogos
parent 28fbb85882
commit d024f97f31
5 changed files with 641 additions and 109 deletions

View File

@@ -184,10 +184,14 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
_, err = dm.GetTiming(dataID, true, utils.NonTransactional)
case utils.ThresholdProfilePrefix:
tntID := utils.NewTenantID(dataID)
lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdProfileLockKey(tntID.Tenant, tntID.ID))
_, err = dm.GetThresholdProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
guardian.Guardian.UnguardIDs(lkID)
case utils.ThresholdPrefix:
tntID := utils.NewTenantID(dataID)
lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdLockKey(tntID.Tenant, tntID.ID))
_, err = dm.GetThreshold(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
guardian.Guardian.UnguardIDs(lkID)
case utils.FilterPrefix:
tntID := utils.NewTenantID(dataID)
_, err = dm.GetFilter(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)

View File

@@ -389,6 +389,19 @@ type ResourceService struct {
connMgr *ConnManager
}
// Reload stops the backupLoop and restarts it
func (rS *ResourceService) Reload() {
close(rS.stopBackup)
<-rS.loopStoped // wait until the loop is done
rS.stopBackup = make(chan struct{})
go rS.runBackup()
}
// StartLoop starts the gorutine with the backup loop
func (rS *ResourceService) StartLoop() {
go rS.runBackup()
}
// Shutdown is called to shutdown the service
func (rS *ResourceService) Shutdown() {
utils.Logger.Info("<ResourceS> service shutdown initialized")
@@ -947,16 +960,3 @@ func (rS *ResourceService) V1GetResourceWithConfig(arg *utils.TenantIDWithAPIOpt
return
}
// Reload stops the backupLoop and restarts it
func (rS *ResourceService) Reload() {
close(rS.stopBackup)
<-rS.loopStoped // wait until the loop is done
rS.stopBackup = make(chan struct{})
go rS.runBackup()
}
// StartLoop starts the gorutine with the backup loop
func (rS *ResourceService) StartLoop() {
go rS.runBackup()
}

View File

@@ -76,7 +76,7 @@ func (sS *StatService) Shutdown() {
utils.Logger.Info("<StatS> service shutdown complete")
}
// runBackup will regularly store resources changed to dataDB
// runBackup will regularly store statQueues changed to dataDB
func (sS *StatService) runBackup() {
storeInterval := sS.cgrcfg.StatSCfg().StoreInterval
if storeInterval <= 0 {
@@ -94,7 +94,7 @@ func (sS *StatService) runBackup() {
}
}
// storeResources represents one task of complete backup
// storestatQueues represents one task of complete backup
func (sS *StatService) storeStats() {
var failedSqIDs []string
for { // don't stop untill we store all dirty statQueues
@@ -155,7 +155,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
return
}
// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
// matchingStatQueuesForEvent returns ordered list of matching statQueues which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, actTime *time.Time, evNm utils.MapStorage) (sqs StatQueues, err error) {
sqIDs := utils.NewStringSet(statsIDs)
if len(sqIDs) == 0 {
@@ -440,7 +440,7 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithAPIOpts, reply *St
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
// make sure resource is locked at process level
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
statQueueLockKey(tnt, args.ID))
@@ -462,7 +462,7 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
// make sure resource is locked at process level
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
statQueueLockKey(tnt, args.ID))
@@ -491,7 +491,7 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
// make sure resource is locked at process level
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
statQueueLockKey(tnt, args.ID))
@@ -538,7 +538,7 @@ func (sS *StatService) V1ResetStatQueue(tntID *utils.TenantID, rply *string) (er
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
// make sure resource is locked at process level
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
statQueueLockKey(tnt, tntID.ID))

View File

@@ -26,6 +26,7 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
@@ -48,6 +49,8 @@ type ThresholdProfile struct {
Weight float64 // Weight to sort the thresholds
ActionIDs []string
Async bool
lkID string // holds the reference towards guardian lock key
}
// TenantID returns the concatenated key beteen tenant and ID
@@ -55,6 +58,36 @@ func (tp *ThresholdProfile) TenantID() string {
return utils.ConcatenatedKey(tp.Tenant, tp.ID)
}
// thresholdProfileLockKey returns the ID used to lock a ThresholdProfile with guardian
func thresholdProfileLockKey(tnt, id string) string {
return utils.ConcatenatedKey(utils.CacheThresholdProfiles, tnt, id)
}
// lock will lock the ThresholdProfile using guardian and store the lock within r.lkID
// if lkID is passed as argument, the lock is considered as executed
func (tp *ThresholdProfile) lock(lkID string) {
if lkID == utils.EmptyString {
lkID = guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout,
thresholdProfileLockKey(tp.Tenant, tp.ID))
}
tp.lkID = lkID
}
// unlock will unlock the ThresholdProfile and clear rp.lkID
func (tp *ThresholdProfile) unlock() {
if tp.lkID == utils.EmptyString {
return
}
guardian.Guardian.UnguardIDs(tp.lkID)
tp.lkID = utils.EmptyString
}
// isLocked returns the locks status of this ThresholdProfile
func (tp *ThresholdProfile) isLocked() bool {
return tp.lkID != utils.EmptyString
}
// ThresholdWithAPIOpts is used in replicatorV1 for dispatcher
type ThresholdWithAPIOpts struct {
*Threshold
@@ -68,6 +101,7 @@ type Threshold struct {
Hits int // number of hits for this threshold
Snooze time.Time // prevent threshold to run too early
lkID string // ID of the lock used when matching the threshold
tPrfl *ThresholdProfile
dirty *bool // needs save
}
@@ -77,6 +111,36 @@ func (t *Threshold) TenantID() string {
return utils.ConcatenatedKey(t.Tenant, t.ID)
}
// thresholdLockKey returns the ID used to lock a threshold with guardian
func thresholdLockKey(tnt, id string) string {
return utils.ConcatenatedKey(utils.CacheThresholds, tnt, id)
}
// lock will lock the threshold using guardian and store the lock within r.lkID
// if lkID is passed as argument, the lock is considered as executed
func (t *Threshold) lock(lkID string) {
if lkID == utils.EmptyString {
lkID = guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout,
thresholdLockKey(t.Tenant, t.ID))
}
t.lkID = lkID
}
// unlock will unlock the threshold and clear r.lkID
func (t *Threshold) unlock() {
if t.lkID == utils.EmptyString {
return
}
guardian.Guardian.UnguardIDs(t.lkID)
t.lkID = utils.EmptyString
}
// isLocked returns the locks status of this threshold
func (t *Threshold) isLocked() bool {
return t.lkID != utils.EmptyString
}
// ProcessEvent processes an ThresholdEvent
// concurrentActions limits the number of simultaneous action sets executed
func (t *Threshold) ProcessEvent(args *ThresholdsArgsProcessEvent, dm *DataManager) (err error) {
@@ -128,9 +192,20 @@ func (ts Thresholds) Sort() {
sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight })
}
// unlock will unlock thresholds part of this slice
func (ts Thresholds) unlock() {
for _, t := range ts {
t.unlock()
if t.tPrfl != nil {
t.tPrfl.unlock()
}
}
}
// NewThresholdService the constructor for ThresoldS service
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) (tS *ThresholdService) {
return &ThresholdService{dm: dm,
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) *ThresholdService {
return &ThresholdService{
dm: dm,
cgrcfg: cgrcfg,
filterS: filterS,
stopBackup: make(chan struct{}),
@@ -150,6 +225,19 @@ type ThresholdService struct {
stMux sync.RWMutex // protects storedTdIDs
}
// Reload stops the backupLoop and restarts it
func (tS *ThresholdService) Reload() {
close(tS.stopBackup)
<-tS.loopStoped // wait until the loop is done
tS.stopBackup = make(chan struct{})
go tS.runBackup()
}
// StartLoop starts the gorutine with the backup loop
func (tS *ThresholdService) StartLoop() {
go tS.runBackup()
}
// Shutdown is called to shutdown the service
func (tS *ThresholdService) Shutdown() {
utils.Logger.Info("<ThresholdS> shutdown initialized")
@@ -158,7 +246,7 @@ func (tS *ThresholdService) Shutdown() {
utils.Logger.Info("<ThresholdS> shutdown complete")
}
// backup will regularly store resources changed to dataDB
// backup will regularly store thresholds changed to dataDB
func (tS *ThresholdService) runBackup() {
storeInterval := tS.cgrcfg.ThresholdSCfg().StoreInterval
if storeInterval <= 0 {
@@ -179,7 +267,7 @@ func (tS *ThresholdService) runBackup() {
// storeThresholds represents one task of complete backup
func (tS *ThresholdService) storeThresholds() {
var failedTdIDs []string
for { // don't stop until we store all dirty resources
for { // don't stop until we store all dirty thresholds
tS.stMux.Lock()
tID := tS.storedTdIDs.GetOne()
if tID != "" {
@@ -189,11 +277,17 @@ func (tS *ThresholdService) storeThresholds() {
if tID == "" {
break // no more keys, backup completed
}
if tIf, ok := Cache.Get(utils.CacheThresholds, tID); !ok || tIf == nil {
tIf, ok := Cache.Get(utils.CacheThresholds, tID)
if !ok || tIf == nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed retrieving from cache treshold with ID: %s", tID))
} else if err := tS.StoreThreshold(tIf.(*Threshold)); err != nil {
continue
}
t := tIf.(*Threshold)
t.lock(utils.EmptyString)
if err := tS.StoreThreshold(t); err != nil {
failedTdIDs = append(failedTdIDs, tID) // record failure so we can schedule it for next backup
}
t.unlock()
// randomize the CPU load and give up thread control
runtime.Gosched()
}
@@ -215,6 +309,16 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) {
t.Tenant, t.ID, err.Error()))
return
}
//since we no longer handle cache in DataManager do here a manual caching
if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there
if err = Cache.Set(utils.CacheThresholds, tntID, t, nil,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> failed caching Threshold with ID: %s, error: %s",
t.TenantID(), err.Error()))
return
}
}
*t.dirty = false
return
}
@@ -241,30 +345,50 @@ func (tS *ThresholdService) matchingThresholdsForEvent(tnt string, args *Thresho
}
ts = make(Thresholds, 0, len(tIDs))
for tID := range tIDs {
tPrfl, err := tS.dm.GetThresholdProfile(tnt, tID, true, true, utils.NonTransactional)
if err != nil {
lkPrflID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout,
thresholdProfileLockKey(tnt, tID))
var tPrfl *ThresholdProfile
if tPrfl, err = tS.dm.GetThresholdProfile(tnt, tID, true, true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(lkPrflID)
if err == utils.ErrNotFound {
err = nil
continue
}
ts.unlock()
return nil, err
}
tPrfl.lock(lkPrflID)
if tPrfl.ActivationInterval != nil && args.Time != nil &&
!tPrfl.ActivationInterval.IsActiveAtTime(*args.Time) { // not active
tPrfl.unlock()
continue
}
if pass, err := tS.filterS.Pass(tnt, tPrfl.FilterIDs,
var pass bool
if pass, err = tS.filterS.Pass(tnt, tPrfl.FilterIDs,
evNm); err != nil {
tPrfl.unlock()
ts.unlock()
return nil, err
} else if !pass {
tPrfl.unlock()
continue
}
t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, true, true, "")
if err != nil {
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
thresholdLockKey(tPrfl.Tenant, tPrfl.ID))
var t *Threshold
if t, err = tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, true, true, ""); err != nil {
guardian.Guardian.UnguardIDs(lkID)
tPrfl.unlock()
if err == utils.ErrNotFound { // corner case where the threshold was removed due to MaxHits
err = nil
continue
}
ts.unlock()
return nil, err
}
t.lock(lkID)
if t.dirty == nil || tPrfl.MaxHits == -1 || t.Hits < tPrfl.MaxHits {
t.dirty = utils.BoolPointer(false)
}
@@ -277,7 +401,8 @@ func (tS *ThresholdService) matchingThresholdsForEvent(tnt string, args *Thresho
}
ts.Sort()
for i, t := range ts {
if t.tPrfl.Blocker { // blocker will stop processing
if t.tPrfl.Blocker && i != len(ts)-1 { // blocker will stop processing and we are not at last index
Thresholds(ts[i+1:]).unlock()
ts = ts[:i+1]
break
}
@@ -309,10 +434,7 @@ func (attr *ThresholdsArgsProcessEvent) RPCClone() (interface{}, error) {
func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent {
var thIDs []string
if attr.ThresholdIDs != nil {
thIDs = make([]string, len(attr.ThresholdIDs))
for i, id := range attr.ThresholdIDs {
thIDs[i] = id
}
thIDs = utils.CloneStringSlice(attr.ThresholdIDs)
}
return &ThresholdsArgsProcessEvent{
ThresholdIDs: thIDs,
@@ -322,8 +444,8 @@ func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent {
// processEvent processes a new event, dispatching to matching thresholds
func (tS *ThresholdService) processEvent(tnt string, args *ThresholdsArgsProcessEvent) (thresholdsIDs []string, err error) {
matchTs, err := tS.matchingThresholdsForEvent(tnt, args)
if err != nil {
var matchTs Thresholds
if matchTs, err = tS.matchingThresholdsForEvent(tnt, args); err != nil {
return nil, err
}
var withErrors bool
@@ -331,8 +453,7 @@ func (tS *ThresholdService) processEvent(tnt string, args *ThresholdsArgsProcess
for _, t := range matchTs {
thresholdsIDs = append(thresholdsIDs, t.ID)
t.Hits++
err = t.ProcessEvent(args, tS.dm)
if err != nil {
if err = t.ProcessEvent(args, tS.dm); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> threshold: %s, ignoring event: %s, error: %s",
t.TenantID(), utils.ConcatenatedKey(tnt, args.CGREvent.ID), err.Error()))
@@ -347,11 +468,14 @@ func (tS *ThresholdService) processEvent(tnt string, args *ThresholdsArgsProcess
withErrors = true
}
//since we don't handle in DataManager caching we do a manual remove here
if err = tS.dm.CacheDataFromDB(utils.ThresholdPrefix, []string{t.TenantID()}, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> failed removing from cache non-recurrent threshold: %s, error: %s",
t.TenantID(), err.Error()))
withErrors = true
if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there
if err = Cache.Set(utils.CacheThresholds, tntID, nil, nil,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> failed removing from cache non-recurrent threshold: %s, error: %s",
t.TenantID(), err.Error()))
withErrors = true
}
}
continue
}
@@ -366,6 +490,7 @@ func (tS *ThresholdService) processEvent(tnt string, args *ThresholdsArgsProcess
tS.stMux.Unlock()
}
}
matchTs.unlock()
if withErrors {
err = utils.ErrPartiallyExecuted
}
@@ -411,6 +536,7 @@ func (tS *ThresholdService) V1GetThresholdsForEvent(args *ThresholdsArgsProcessE
var ts Thresholds
if ts, err = tS.matchingThresholdsForEvent(tnt, args); err == nil {
*reply = ts
ts.unlock()
}
return
}
@@ -440,6 +566,11 @@ func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold)
if tnt == utils.EmptyString {
tnt = tS.cgrcfg.GeneralCfg().DefaultTenant
}
// make sure threshold is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
thresholdLockKey(tnt, tntID.ID))
defer guardian.Guardian.UnguardIDs(lkID)
if thd, err = tS.dm.GetThreshold(tnt, tntID.ID, true, true, ""); err != nil {
return
}
@@ -447,19 +578,6 @@ func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold)
return
}
// Reload stops the backupLoop and restarts it
func (tS *ThresholdService) Reload() {
close(tS.stopBackup)
<-tS.loopStoped // wait until the loop is done
tS.stopBackup = make(chan struct{})
go tS.runBackup()
}
// StartLoop starts the gorutine with the backup loop
func (tS *ThresholdService) StartLoop() {
go tS.runBackup()
}
// V1ResetThreshold resets the threshold hits
func (tS *ThresholdService) V1ResetThreshold(tntID *utils.TenantID, rply *string) (err error) {
var thd *Threshold
@@ -467,6 +585,11 @@ func (tS *ThresholdService) V1ResetThreshold(tntID *utils.TenantID, rply *string
if tnt == utils.EmptyString {
tnt = tS.cgrcfg.GeneralCfg().DefaultTenant
}
// make sure threshold is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
thresholdLockKey(tnt, tntID.ID))
defer guardian.Guardian.UnguardIDs(lkID)
if thd, err = tS.dm.GetThreshold(tnt, tntID.ID, true, true, ""); err != nil {
return
}

View File

@@ -19,6 +19,7 @@ package engine
import (
"bytes"
"fmt"
"log"
"os"
"reflect"
@@ -29,6 +30,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func TestThresholdsSort(t *testing.T) {
@@ -376,29 +378,36 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", th, temptTh)
}
}
if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil {
thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0])
if err != nil {
t.Errorf("Error: %+v", err)
} else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
}
thMatched.unlock()
if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) {
t.Errorf("Expecting: %+v, received: %+v", ths[0].ID, thMatched[0].ID)
} else if !reflect.DeepEqual(ths[0].Hits, thMatched[0].Hits) {
t.Errorf("Expecting: %+v, received: %+v", ths[0].Hits, thMatched[0].Hits)
}
if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil {
thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1])
if err != nil {
t.Errorf("Error: %+v", err)
} else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
}
thMatched.unlock()
if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) {
t.Errorf("Expecting: %+v, received: %+v", ths[1].ID, thMatched[0].ID)
} else if !reflect.DeepEqual(ths[1].Hits, thMatched[0].Hits) {
t.Errorf("Expecting: %+v, received: %+v", ths[1].Hits, thMatched[0].Hits)
}
if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil {
thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2])
if err != nil {
t.Errorf("Error: %+v", err)
} else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
}
thMatched.unlock()
if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) {
t.Errorf("Expecting: %+v, received: %+v", ths[2].ID, thMatched[0].ID)
@@ -798,9 +807,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) {
} else if !reflect.DeepEqual(thIDs, thMatched) {
t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
}
if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil {
thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0])
if err != nil {
t.Errorf("Error: %+v", err)
} else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
}
thMatched.unlock()
if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) {
t.Errorf("Expecting: %+v, received: %+v", ths[0].ID, thMatched[0].ID)
@@ -808,9 +820,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) {
t.Errorf("Expecting: 1, received: %+v", thMatched[0].Hits)
}
if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil {
thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1])
if err != nil {
t.Errorf("Error: %+v", err)
} else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
}
thMatched.unlock()
if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) {
t.Errorf("Expecting: %+v, received: %+v", ths[1].ID, thMatched[0].ID)
@@ -818,9 +833,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) {
t.Errorf("Expecting: 1, received: %+v", thMatched[0].Hits)
}
if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil {
thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2])
if err != nil {
t.Errorf("Error: %+v", err)
} else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
}
thMatched.unlock()
if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) {
t.Errorf("Expecting: %+v, received: %+v", ths[2].ID, thMatched[0].ID)
@@ -1058,18 +1076,18 @@ func TestThresholdsProcessEvent2(t *testing.T) {
} else if !reflect.DeepEqual(thIDs, thMatched) && !reflect.DeepEqual(thIDsRev, thMatched) {
t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
}
if thMatched, err := thServ.matchingThresholdsForEvent(ev.Tenant, ev); err != nil {
t.Errorf("Error: %+v", err)
} else {
for _, thM := range thMatched {
if !reflect.DeepEqual(thPrf.Tenant, thM.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", thPrf.Tenant, thM.Tenant)
} else if reflect.DeepEqual(thIDs[0], thM.ID) && thM.Hits != 1 {
t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits)
} else if reflect.DeepEqual(thIDs[1], thM.ID) && thM.Hits != 1 {
t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits)
}
thMatched, err := thServ.matchingThresholdsForEvent(ev.Tenant, ev)
if err != nil {
t.Fatalf("Error: %+v", err)
}
thMatched.unlock()
for _, thM := range thMatched {
if !reflect.DeepEqual(thPrf.Tenant, thM.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", thPrf.Tenant, thM.Tenant)
} else if reflect.DeepEqual(thIDs[0], thM.ID) && thM.Hits != 1 {
t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits)
} else if reflect.DeepEqual(thIDs[1], thM.ID) && thM.Hits != 1 {
t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits)
}
}
}
@@ -1355,22 +1373,15 @@ func TestThresholdsStoreThresholdsOK(t *testing.T) {
dm := NewDataManager(data, cfg.CacheCfg(), nil)
tS := NewThresholdService(dm, cfg, nil)
value := &Threshold{
dirty: utils.BoolPointer(true),
Tenant: "cgrates.org",
ID: "TH1",
Hits: 2,
}
Cache.SetWithoutReplicate(utils.CacheThresholds, "TH1", value, nil, true,
utils.NonTransactional)
tS.storedTdIDs.Add("TH1")
exp := &Threshold{
dirty: utils.BoolPointer(false),
Tenant: "cgrates.org",
ID: "TH1",
Hits: 2,
}
Cache.SetWithoutReplicate(utils.CacheThresholds, "cgrates.org:TH1", exp, nil, true,
utils.NonTransactional)
tS.storedTdIDs.Add("cgrates.org:TH1")
tS.storeThresholds()
if rcv, err := tS.dm.GetThreshold("cgrates.org", "TH1", true, false,
@@ -1381,7 +1392,7 @@ func TestThresholdsStoreThresholdsOK(t *testing.T) {
utils.ToJSON(exp), utils.ToJSON(rcv))
}
Cache.Remove(utils.CacheThresholds, "TH1", true, utils.NonTransactional)
Cache.Remove(utils.CacheThresholds, "cgrates.org:TH1", true, utils.NonTransactional)
}
func TestThresholdsStoreThresholdsStoreThErr(t *testing.T) {
@@ -1614,13 +1625,10 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) {
Tenant: "cgrates.org",
ID: "TH2",
FilterIDs: []string{"*string:~*req.Account:1001"},
ActivationInterval: &utils.ActivationInterval{
ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC),
},
MinHits: 2,
MaxHits: 5,
Weight: 10,
Blocker: true,
MinHits: 2,
MaxHits: 5,
Weight: 10,
Blocker: true,
}
if err := dm.SetThresholdProfile(thPrf, true); err != nil {
@@ -1653,8 +1661,7 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) {
} else {
rcv.tPrfl = nil
rcv.dirty = nil
var snooze time.Time
rcv.Snooze = snooze
rcv.Snooze = time.Time{}
if !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
@@ -1668,16 +1675,28 @@ func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()
tmp := config.CgrConfig()
tmpC := Cache
tmpCMgr := connMgr
cfg := config.NewDefaultCGRConfig()
cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}}
cfg.CacheCfg().ReplicationConns = []string{"test"}
cfg.CacheCfg().Partitions[utils.CacheThresholds].Replicate = true
config.SetCgrConfig(cfg)
data := NewInternalDB(nil, nil, true)
dm := NewDataManager(data, cfg.CacheCfg(), nil)
connMgr = NewConnManager(cfg, make(map[string]chan rpcclient.ClientConnector))
dm := NewDataManager(data, cfg.CacheCfg(), connMgr)
filterS := NewFilterS(cfg, nil, dm)
tS := NewThresholdService(nil, cfg, filterS)
Cache = NewCacheS(cfg, dm, nil)
defer func() {
connMgr = tmpCMgr
Cache = tmpC
config.SetCgrConfig(tmp)
log.SetOutput(os.Stderr)
}()
thPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: "TH3",
@@ -1716,7 +1735,7 @@ func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) {
}
expLog1 := `[WARNING] <ThresholdService> failed removing from database non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION`
expLog2 := `[WARNING] <ThresholdService> failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION`
expLog2 := `[WARNING] <ThresholdService> failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: DISCONNECTED`
if _, err := tS.processEvent(args.Tenant, args); err == nil ||
err != utils.ErrPartiallyExecuted {
@@ -2081,3 +2100,389 @@ func TestThresholdsV1GetThresholdNotFoundErr(t *testing.T) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err)
}
}
func TestThresholdMatchingThresholdForEventLocks(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
tmp := Cache
defer func() { Cache = tmp }()
Cache = NewCacheS(cfg, nil, nil)
db := NewInternalDB(nil, nil, true)
dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
cfg.ThresholdSCfg().StoreInterval = 1
cfg.ThresholdSCfg().StringIndexedFields = nil
cfg.ThresholdSCfg().PrefixIndexedFields = nil
rS := NewThresholdService(dm, cfg,
&FilterS{dm: dm, cfg: cfg})
prfs := make([]*ThresholdProfile, 0)
ids := utils.StringSet{}
for i := 0; i < 10; i++ {
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TH%d", i),
Weight: 20.00,
MaxHits: 5,
}
dm.SetThresholdProfile(rPrf, true)
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
}
dm.RemoveThreshold("cgrates.org", "TH1")
mth, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
})
if err != nil {
t.Errorf("Error: %+v", err)
}
defer mth.unlock()
for _, rPrf := range prfs {
if rPrf.ID == "TH1" {
if rPrf.isLocked() {
t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
}
continue
}
if !rPrf.isLocked() {
t.Fatalf("Expected profile to be locked %q", rPrf.ID)
}
if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
t.Errorf("error %s for <%s>", err, rPrf.ID)
} else if !r.isLocked() {
t.Fatalf("Expected Threshold to be locked %q", rPrf.ID)
}
}
}
func TestThresholdMatchingThresholdForEventLocks2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
tmp := Cache
defer func() { Cache = tmp }()
Cache = NewCacheS(cfg, nil, nil)
db := NewInternalDB(nil, nil, true)
dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
cfg.ThresholdSCfg().StoreInterval = 1
cfg.ThresholdSCfg().StringIndexedFields = nil
cfg.ThresholdSCfg().PrefixIndexedFields = nil
rS := NewThresholdService(dm, cfg,
&FilterS{dm: dm, cfg: cfg})
prfs := make([]*ThresholdProfile, 0)
ids := utils.StringSet{}
for i := 0; i < 10; i++ {
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TH%d", i),
Weight: 20.00,
MaxHits: 5,
}
dm.SetThresholdProfile(rPrf, true)
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
}
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: "TH20",
FilterIDs: []string{"FLTR_RES_201"},
Weight: 20.00,
MaxHits: 5,
}
err = db.SetThresholdProfileDrv(rPrf)
if err != nil {
t.Fatal(err)
}
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
_, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
})
expErr := utils.ErrPrefixNotFound(rPrf.FilterIDs[0])
if err == nil || err.Error() != expErr.Error() {
t.Errorf("Expected error: %s ,received: %+v", expErr, err)
}
for _, rPrf := range prfs {
if rPrf.isLocked() {
t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
}
if rPrf.ID == "TH20" {
continue
}
if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
t.Errorf("error %s for <%s>", err, rPrf.ID)
} else if r.isLocked() {
t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID)
}
}
}
func TestThresholdMatchingThresholdForEventLocksBlocker(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
tmp := Cache
defer func() { Cache = tmp }()
Cache = NewCacheS(cfg, nil, nil)
db := NewInternalDB(nil, nil, true)
dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
cfg.ThresholdSCfg().StoreInterval = 1
cfg.ThresholdSCfg().StringIndexedFields = nil
cfg.ThresholdSCfg().PrefixIndexedFields = nil
rS := NewThresholdService(dm, cfg,
&FilterS{dm: dm, cfg: cfg})
prfs := make([]*ThresholdProfile, 0)
ids := utils.StringSet{}
for i := 0; i < 10; i++ {
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TH%d", i),
Weight: float64(10 - i),
Blocker: i == 4,
MaxHits: 5,
}
dm.SetThresholdProfile(rPrf, true)
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
}
mres, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
})
if err != nil {
t.Errorf("Error: %+v", err)
}
defer mres.unlock()
if len(mres) != 5 {
t.Fatal("Expected 6 Thresholds")
}
for _, rPrf := range prfs[5:] {
if rPrf.isLocked() {
t.Errorf("Expected profile to not be locked %q", rPrf.ID)
}
if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
t.Errorf("error %s for <%s>", err, rPrf.ID)
} else if r.isLocked() {
t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID)
}
}
for _, rPrf := range prfs[:5] {
if !rPrf.isLocked() {
t.Errorf("Expected profile to be locked %q", rPrf.ID)
}
if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
t.Errorf("error %s for <%s>", err, rPrf.ID)
} else if !r.isLocked() {
t.Fatalf("Expected Threshold to be locked %q", rPrf.ID)
}
}
}
func TestThresholdMatchingThresholdForEventLocksActivationInterval(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
tmp := Cache
defer func() { Cache = tmp }()
Cache = NewCacheS(cfg, nil, nil)
db := NewInternalDB(nil, nil, true)
dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
cfg.ThresholdSCfg().StoreInterval = 1
cfg.ThresholdSCfg().StringIndexedFields = nil
cfg.ThresholdSCfg().PrefixIndexedFields = nil
rS := NewThresholdService(dm, cfg,
&FilterS{dm: dm, cfg: cfg})
ids := utils.StringSet{}
for i := 0; i < 10; i++ {
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TH%d", i),
MaxHits: 5,
Weight: 20.00,
}
dm.SetThresholdProfile(rPrf, true)
ids.Add(rPrf.ID)
}
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: "TH21",
MaxHits: 5,
Weight: 20.00,
ActivationInterval: &utils.ActivationInterval{
ExpiryTime: time.Now().Add(-5 * time.Second),
},
}
dm.SetThresholdProfile(rPrf, true)
ids.Add(rPrf.ID)
mres, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: &utils.CGREvent{Time: utils.TimePointer(time.Now())},
})
if err != nil {
t.Errorf("Error: %+v", err)
}
defer mres.unlock()
if rPrf.isLocked() {
t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
}
if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
t.Errorf("error %s for <%s>", err, rPrf.ID)
} else if r.isLocked() {
t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID)
}
}
func TestThresholdMatchingThresholdForEventLocks3(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
prfs := make([]*ThresholdProfile, 0)
tmp := Cache
defer func() { Cache = tmp }()
Cache = NewCacheS(cfg, nil, nil)
db := &DataDBMock{
GetThresholdProfileDrvF: func(tnt, id string) (*ThresholdProfile, error) {
if id == "TH1" {
return nil, utils.ErrNotImplemented
}
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: id,
MaxHits: 5,
Weight: 20.00,
}
Cache.Set(utils.CacheThresholds, rPrf.TenantID(), &Threshold{
Tenant: rPrf.Tenant,
ID: rPrf.ID,
}, nil, true, utils.NonTransactional)
prfs = append(prfs, rPrf)
return rPrf, nil
},
}
dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
cfg.ThresholdSCfg().StoreInterval = 1
cfg.ThresholdSCfg().StringIndexedFields = nil
cfg.ThresholdSCfg().PrefixIndexedFields = nil
rS := NewThresholdService(dm, cfg,
&FilterS{dm: dm, cfg: cfg})
ids := utils.StringSet{}
for i := 0; i < 10; i++ {
ids.Add(fmt.Sprintf("TH%d", i))
}
_, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
})
if err != utils.ErrNotImplemented {
t.Fatalf("Error: %+v", err)
}
for _, rPrf := range prfs {
if rPrf.isLocked() {
t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
}
}
}
func TestThresholdMatchingThresholdForEventLocks4(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
tmp := Cache
defer func() { Cache = tmp }()
Cache = NewCacheS(cfg, nil, nil)
db := NewInternalDB(nil, nil, true)
dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
cfg.ThresholdSCfg().StoreInterval = 1
cfg.ThresholdSCfg().StringIndexedFields = nil
cfg.ThresholdSCfg().PrefixIndexedFields = nil
rS := NewThresholdService(dm, cfg,
&FilterS{dm: dm, cfg: cfg})
prfs := make([]*ThresholdProfile, 0)
ids := utils.StringSet{}
for i := 0; i < 10; i++ {
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TH%d", i),
Weight: 20.00,
MaxHits: 5,
}
dm.SetThresholdProfile(rPrf, true)
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
}
ids.Add("TH20")
mres, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
})
if err != nil {
t.Errorf("Error: %+v", err)
}
defer mres.unlock()
for _, rPrf := range prfs {
if !rPrf.isLocked() {
t.Errorf("Expected profile to be locked %q", rPrf.ID)
}
if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
t.Errorf("error %s for <%s>", err, rPrf.ID)
} else if !r.isLocked() {
t.Fatalf("Expected Threshold to be locked %q", rPrf.ID)
}
}
}
func TestThresholdMatchingThresholdForEventLocks5(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
tmp := Cache
tmpC := config.CgrConfig()
defer func() {
Cache = tmp
config.SetCgrConfig(tmpC)
}()
Cache = NewCacheS(cfg, nil, nil)
db := NewInternalDB(nil, nil, true)
dm := NewDataManager(db, config.CgrConfig().CacheCfg(), NewConnManager(cfg, make(map[string]chan rpcclient.ClientConnector)))
cfg.ThresholdSCfg().StoreInterval = 1
cfg.ThresholdSCfg().StringIndexedFields = nil
cfg.ThresholdSCfg().PrefixIndexedFields = nil
cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}}
cfg.DataDbCfg().RmtConns = []string{"test"}
cfg.DataDbCfg().Items[utils.CacheThresholds].Remote = true
config.SetCgrConfig(cfg)
rS := NewThresholdService(dm, cfg,
&FilterS{dm: dm, cfg: cfg})
prfs := make([]*ThresholdProfile, 0)
ids := utils.StringSet{}
for i := 0; i < 10; i++ {
rPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TH%d", i),
Weight: 20.00,
MaxHits: 5,
}
dm.SetThresholdProfile(rPrf, true)
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
}
dm.RemoveThreshold("cgrates.org", "TH1")
_, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
})
if err != utils.ErrDisconnected {
t.Errorf("Error: %+v", err)
}
for _, rPrf := range prfs {
if rPrf.isLocked() {
t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
}
if rPrf.ID == "TH1" {
continue
}
if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
t.Errorf("error %s for <%s>", err, rPrf.ID)
} else if r.isLocked() {
t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID)
}
}
}