refactor matched thresholds sorting

This commit is contained in:
ionutboangiu
2025-03-24 17:45:57 +02:00
committed by Dan Christian Bogos
parent 4083eb35df
commit 52177197f2
7 changed files with 66 additions and 87 deletions

View File

@@ -193,7 +193,7 @@ func (tSv1 *ThresholdSv1) GetThresholdIDs(ctx *context.Context, args *utils.Tena
}
// GetThresholdsForEvent returns a list of thresholds matching an event
func (tSv1 *ThresholdSv1) GetThresholdsForEvent(ctx *context.Context, args *utils.CGREvent, reply *engine.Thresholds) error {
func (tSv1 *ThresholdSv1) GetThresholdsForEvent(ctx *context.Context, args *utils.CGREvent, reply *[]*engine.Threshold) error {
return tSv1.tS.V1GetThresholdsForEvent(ctx, args, reply)
}

View File

@@ -439,20 +439,20 @@ func testThresholdsGetThresholdsForEvent(t *testing.T) {
utils.OptsThresholdsProfileIDs: []string{"THD_1", "THD_2"},
},
}
expThs := engine.Thresholds{
&engine.Threshold{
expThs := []*engine.Threshold{
{
Tenant: "cgrates.org",
ID: "THD_2",
Hits: 0,
},
&engine.Threshold{
{
Tenant: "cgrates.org",
ID: "THD_1",
Hits: 0,
},
}
var rplyThs engine.Thresholds
var rplyThs []*engine.Threshold
if err := thRPC.Call(context.Background(), utils.ThresholdSv1GetThresholdsForEvent,
args, &rplyThs); err != nil {
t.Error(err)
@@ -500,7 +500,7 @@ func testThresholdsGetThresholdsAfterRemove(t *testing.T) {
},
}
var rplyThs engine.Thresholds
var rplyThs []*engine.Threshold
if err := thRPC.Call(context.Background(), utils.ThresholdSv1GetThresholdsForEvent,
args, &rplyThs); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err)
@@ -621,20 +621,20 @@ func testThresholdsGetThresholdsAfterFirstEvent(t *testing.T) {
utils.OptsThresholdsProfileIDs: []string{"THD_1", "THD_2"},
},
}
expThs := engine.Thresholds{
&engine.Threshold{
expThs := []*engine.Threshold{
{
Tenant: "cgrates.org",
ID: "THD_2",
Hits: 1,
},
&engine.Threshold{
{
Tenant: "cgrates.org",
ID: "THD_1",
Hits: 1,
},
}
var rplyThs engine.Thresholds
var rplyThs []*engine.Threshold
if err := thRPC.Call(context.Background(), utils.ThresholdSv1GetThresholdsForEvent,
args, &rplyThs); err != nil {
t.Error(err)
@@ -660,20 +660,20 @@ func testThresholdsGetThresholdsAfterSecondEvent(t *testing.T) {
utils.OptsThresholdsProfileIDs: []string{"THD_1", "THD_2"},
},
}
expThs := engine.Thresholds{
&engine.Threshold{
expThs := []*engine.Threshold{
{
Tenant: "cgrates.org",
ID: "THD_2",
Hits: 0,
},
&engine.Threshold{
{
Tenant: "cgrates.org",
ID: "THD_1",
Hits: 0,
},
}
var rplyThs engine.Thresholds
var rplyThs []*engine.Threshold
if err := thRPC.Call(context.Background(), utils.ThresholdSv1GetThresholdsForEvent,
args, &rplyThs); err != nil {
t.Error(err)

View File

@@ -691,7 +691,7 @@ func TestThresholdsAPIs(t *testing.T) {
},
}
expThresholds := engine.Thresholds{
expThresholds := []*engine.Threshold{
{
Tenant: "cgrates.org",
ID: "thd1",
@@ -702,7 +702,7 @@ func TestThresholdsAPIs(t *testing.T) {
},
}
var rplyThresholds engine.Thresholds
var rplyThresholds []*engine.Threshold
if err := tSv1.GetThresholdsForEvent(context.Background(), args, &rplyThresholds); err != nil {
t.Error(err)
} else {

View File

@@ -62,6 +62,6 @@ func (self *CmdThresholdsForEvent) PostprocessRpcParams() error {
}
func (self *CmdThresholdsForEvent) RpcResult() any {
var s engine.Thresholds
var s []*engine.Threshold
return &s
}

View File

@@ -19,9 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"cmp"
"fmt"
"runtime"
"sort"
"slices"
"sync"
"time"
@@ -101,10 +102,9 @@ type Threshold struct {
Hits int // number of hits for this threshold
Snooze time.Time // prevent threshold to run too early
lkID string // ID of the lock used when matching the threshold
tPrfl *ThresholdProfile
dirty *bool // needs save
weight float64
lkID string // ID of the lock used when matching the threshold
tPrfl *ThresholdProfile
dirty *bool // needs save
}
// TenantID returns the concatenated key beteen tenant and ID
@@ -187,16 +187,8 @@ func processEventWithThreshold(ctx *context.Context, connMgr *ConnManager, actio
return
}
// Thresholds is a sortable slice of Threshold
type Thresholds []*Threshold
// Sort sorts based on Weight
func (ts Thresholds) Sort() {
sort.Slice(ts, func(i, j int) bool { return ts[i].weight > ts[j].weight })
}
// unlock will unlock thresholds part of this slice
func (ts Thresholds) unlock() {
// unlockThresholds unlocks all locked thresholds in the given slice.
func unlockThresholds(ts []*Threshold) {
for _, t := range ts {
t.unlock()
if t.tPrfl != nil {
@@ -327,7 +319,7 @@ func (tS *ThresholdS) StoreThreshold(ctx *context.Context, t *Threshold) (err er
}
// matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event
func (tS *ThresholdS) matchingThresholdsForEvent(ctx *context.Context, tnt string, args *utils.CGREvent) (ts Thresholds, err error) {
func (tS *ThresholdS) matchingThresholdsForEvent(ctx *context.Context, tnt string, args *utils.CGREvent) (ts []*Threshold, err error) {
evNm := utils.MapStorage{
utils.MetaReq: args.Event,
utils.MetaOpts: args.APIOpts,
@@ -360,7 +352,8 @@ func (tS *ThresholdS) matchingThresholdsForEvent(ctx *context.Context, tnt strin
return nil, err
}
}
ts = make(Thresholds, 0, len(tIDs))
ts = make([]*Threshold, 0, len(tIDs))
weights := make(map[string]float64) // stores sorting weights by tID
for tID := range tIDs {
lkPrflID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout,
@@ -372,7 +365,7 @@ func (tS *ThresholdS) matchingThresholdsForEvent(ctx *context.Context, tnt strin
err = nil
continue
}
ts.unlock()
unlockThresholds(ts)
return nil, err
}
tPrfl.lock(lkPrflID)
@@ -381,7 +374,7 @@ func (tS *ThresholdS) matchingThresholdsForEvent(ctx *context.Context, tnt strin
if pass, err = tS.fltrS.Pass(ctx, tnt, tPrfl.FilterIDs,
evNm); err != nil {
tPrfl.unlock()
ts.unlock()
unlockThresholds(ts)
return nil, err
} else if !pass {
tPrfl.unlock()
@@ -399,7 +392,7 @@ func (tS *ThresholdS) matchingThresholdsForEvent(ctx *context.Context, tnt strin
err = nil
continue
}
ts.unlock()
unlockThresholds(ts)
return nil, err
}
t.lock(lkID)
@@ -408,20 +401,27 @@ func (tS *ThresholdS) matchingThresholdsForEvent(ctx *context.Context, tnt strin
}
t.tPrfl = tPrfl
if t.weight, err = WeightFromDynamics(ctx, tPrfl.Weights,
tS.fltrS, tnt, evNm); err != nil {
return
weight, err := WeightFromDynamics(ctx, tPrfl.Weights,
tS.fltrS, tnt, evNm)
if err != nil {
return nil, err
}
weights[t.ID] = weight
ts = append(ts, t)
}
// All good, convert from Map to Slice so we can sort
if len(ts) == 0 {
return nil, utils.ErrNotFound
}
ts.Sort()
// Sort by weight (higher values first).
slices.SortFunc(ts, func(a, b *Threshold) int {
return cmp.Compare(weights[b.ID], weights[a.ID])
})
for i, t := range ts {
if t.tPrfl.Blocker && i != len(ts)-1 { // blocker will stop processing and we are not at last index
Thresholds(ts[i+1:]).unlock()
unlockThresholds(ts[i+1:])
ts = ts[:i+1]
break
}
@@ -431,8 +431,8 @@ func (tS *ThresholdS) matchingThresholdsForEvent(ctx *context.Context, tnt strin
// processEvent processes a new event, dispatching to matching thresholds
func (tS *ThresholdS) processEvent(ctx *context.Context, tnt string, args *utils.CGREvent) (thresholdsIDs []string, err error) {
var matchTs Thresholds
if matchTs, err = tS.matchingThresholdsForEvent(ctx, tnt, args); err != nil {
matchTs, err := tS.matchingThresholdsForEvent(ctx, tnt, args)
if err != nil {
return nil, err
}
var withErrors bool
@@ -478,7 +478,7 @@ func (tS *ThresholdS) processEvent(ctx *context.Context, tnt string, args *utils
tS.stMux.Unlock()
}
}
matchTs.unlock()
unlockThresholds(matchTs)
if withErrors {
err = utils.ErrPartiallyExecuted
}
@@ -508,7 +508,7 @@ func (tS *ThresholdS) V1ProcessEvent(ctx *context.Context, args *utils.CGREvent,
}
// V1GetThresholdsForEvent queries thresholds matching an Event
func (tS *ThresholdS) V1GetThresholdsForEvent(ctx *context.Context, args *utils.CGREvent, reply *Thresholds) (err error) {
func (tS *ThresholdS) V1GetThresholdsForEvent(ctx *context.Context, args *utils.CGREvent, reply *[]*Threshold) (err error) {
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
}
@@ -521,10 +521,10 @@ func (tS *ThresholdS) V1GetThresholdsForEvent(ctx *context.Context, args *utils.
if tnt == utils.EmptyString {
tnt = tS.cfg.GeneralCfg().DefaultTenant
}
var ts Thresholds
var ts []*Threshold
if ts, err = tS.matchingThresholdsForEvent(ctx, tnt, args); err == nil {
*reply = ts
ts.unlock()
unlockThresholds(ts)
}
return
}

View File

@@ -31,25 +31,6 @@ import (
"github.com/cgrates/cgrates/utils"
)
func TestThresholdsSort(t *testing.T) {
ts := Thresholds{
&Threshold{weight: 30.0, tPrfl: &ThresholdProfile{ID: "FIRST"}},
&Threshold{weight: 40.0, tPrfl: &ThresholdProfile{ID: "SECOND"}},
&Threshold{weight: 30.0, tPrfl: &ThresholdProfile{ID: "THIRD"}},
&Threshold{weight: 35.0, tPrfl: &ThresholdProfile{ID: "FOURTH"}},
}
ts.Sort()
eInst := Thresholds{
&Threshold{weight: 40.0, tPrfl: &ThresholdProfile{ID: "SECOND"}},
&Threshold{weight: 35.0, tPrfl: &ThresholdProfile{ID: "FOURTH"}},
&Threshold{weight: 30.0, tPrfl: &ThresholdProfile{ID: "FIRST"}},
&Threshold{weight: 30.0, tPrfl: &ThresholdProfile{ID: "THIRD"}},
}
if !reflect.DeepEqual(eInst, ts) {
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eInst), utils.ToJSON(ts))
}
}
func TestThresholdsCache(t *testing.T) {
var dmTH *DataManager
tPrfls := []*ThresholdProfile{
@@ -98,18 +79,18 @@ func TestThresholdsCache(t *testing.T) {
Async: false,
},
}
ths := Thresholds{
&Threshold{
ths := []*Threshold{
{
Tenant: "cgrates.org",
ID: "TH_1",
Hits: 0,
},
&Threshold{
{
Tenant: "cgrates.org",
ID: "TH_2",
Hits: 0,
},
&Threshold{
{
Tenant: "cgrates.org",
ID: "TH_3",
Hits: 0,
@@ -249,18 +230,18 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) {
Async: false,
},
}
ths := Thresholds{
&Threshold{
ths := []*Threshold{
{
Tenant: "cgrates.org",
ID: "TH_1",
Hits: 0,
},
&Threshold{
{
Tenant: "cgrates.org",
ID: "TH_2",
Hits: 0,
},
&Threshold{
{
Tenant: "cgrates.org",
ID: "TH_3",
Hits: 0,
@@ -378,7 +359,7 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
thMatched.unlock()
unlockThresholds(thMatched)
if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) {
@@ -390,7 +371,7 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
thMatched.unlock()
unlockThresholds(thMatched)
if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) {
@@ -402,7 +383,7 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
thMatched.unlock()
unlockThresholds(thMatched)
if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant)
} else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) {
@@ -1447,7 +1428,6 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) {
t.Error(err)
} else {
rcv.tPrfl = nil
rcv.weight = 0
rcv.dirty = nil
rcv.Snooze = time.Time{}
if !reflect.DeepEqual(rcv, exp) {
@@ -1947,7 +1927,7 @@ func TestThresholdMatchingThresholdForEventLocks(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
defer mth.unlock()
defer unlockThresholds(mth)
for _, rPrf := range prfs {
if rPrf.ID == "TH1" {
if rPrf.isLocked() {
@@ -2079,7 +2059,7 @@ func TestThresholdMatchingThresholdForEventLocksBlocker(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
defer mres.unlock()
defer unlockThresholds(mres)
if len(mres) != 5 {
t.Fatal("Expected 6 Thresholds")
}
@@ -2202,7 +2182,7 @@ func TestThresholdMatchingThresholdForEventLocks4(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
defer mres.unlock()
defer unlockThresholds(mres)
for _, rPrf := range prfs {
if !rPrf.isLocked() {
t.Errorf("Expected profile to be locked %q", rPrf.ID)
@@ -2412,17 +2392,16 @@ func TestThresholdsV1GetThresholdsForEventOK(t *testing.T) {
},
}
exp := Thresholds{
exp := []*Threshold{
{
Tenant: "cgrates.org",
Hits: 0,
ID: "TH1",
tPrfl: thPrf,
dirty: utils.BoolPointer(false),
weight: 10,
},
}
var reply Thresholds
var reply []*Threshold
if err := tS.V1GetThresholdsForEvent(context.Background(), args, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, exp) {
@@ -2464,7 +2443,7 @@ func TestThresholdsV1GetThresholdsForEventMissingArgs(t *testing.T) {
var args *utils.CGREvent
experr := `MANDATORY_IE_MISSING: [CGREvent]`
var reply Thresholds
var reply []*Threshold
if err := tS.V1GetThresholdsForEvent(context.Background(), args, &reply); err == nil ||
err.Error() != experr {
t.Error(err)

View File

@@ -195,7 +195,7 @@ func checkThresholdsHits(t *testing.T, client *birpc.Client, expectedThHits map[
}
method := utils.ThresholdSv1GetThresholdsForEvent
var reply engine.Thresholds
var reply []*engine.Threshold
if err := client.Call(context.Background(), utils.ThresholdSv1GetThresholdsForEvent,
&utils.CGREvent{
Tenant: "cgrates.org",