Add opts field in config for thresholds

This commit is contained in:
ionutboangiu
2021-09-15 18:38:27 +03:00
committed by Dan Christian Bogos
parent afeedae898
commit 11aecbd22f
21 changed files with 204 additions and 83 deletions

View File

@@ -522,31 +522,29 @@ func (rS *ResourceService) processThresholds(ctx *context.Context, rs Resources,
var withErrs bool
for _, r := range rs {
var thIDs []string
if len(r.rPrf.ThresholdIDs) != 0 {
if len(r.rPrf.ThresholdIDs) == 1 &&
r.rPrf.ThresholdIDs[0] == utils.MetaNone {
continue
}
thIDs = r.rPrf.ThresholdIDs
opts[utils.OptsThresholdsThresholdIDs] = r.rPrf.ThresholdIDs
}
thEv := &ThresholdsArgsProcessEvent{ThresholdIDs: thIDs,
CGREvent: &utils.CGREvent{
Tenant: r.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.ResourceID: r.ID,
utils.Usage: r.TotalUsage(),
},
APIOpts: opts,
thEv := &ThresholdsArgsProcessEvent{CGREvent: &utils.CGREvent{
Tenant: r.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.ResourceID: r.ID,
utils.Usage: r.TotalUsage(),
},
APIOpts: opts,
},
}
var tIDs []string
if err := rS.connMgr.Call(ctx, rS.cgrcfg.ResourceSCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
(len(opts[utils.OptsThresholdsThresholdIDs].([]string)) != 0 || err.Error() != utils.ErrNotFound.Error()) {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with %s.",
utils.ResourceS, err.Error(), thEv, utils.ThresholdS))

View File

@@ -302,16 +302,14 @@ func (sS *StatService) processThresholds(ctx *context.Context, sQs StatQueues, o
opts[utils.MetaEventType] = utils.StatUpdate
var withErrs bool
for _, sq := range sQs {
var thIDs []string
if len(sq.sqPrfl.ThresholdIDs) != 0 {
if len(sq.sqPrfl.ThresholdIDs) == 1 &&
sq.sqPrfl.ThresholdIDs[0] == utils.MetaNone {
continue
}
thIDs = sq.sqPrfl.ThresholdIDs
opts[utils.OptsThresholdsThresholdIDs] = sq.sqPrfl.ThresholdIDs
}
thEv := &ThresholdsArgsProcessEvent{
ThresholdIDs: thIDs,
CGREvent: &utils.CGREvent{
Tenant: sq.Tenant,
ID: utils.GenUUID(),
@@ -328,7 +326,7 @@ func (sS *StatService) processThresholds(ctx *context.Context, sQs StatQueues, o
var tIDs []string
if err := sS.connMgr.Call(ctx, sS.cgrcfg.StatSCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
(len(opts[utils.OptsThresholdsThresholdIDs].([]string)) != 0 || err.Error() != utils.ErrNotFound.Error()) {
utils.Logger.Warning(
fmt.Sprintf("<StatS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
withErrs = true

View File

@@ -2962,7 +2962,6 @@ func TestStatQueueProcessThresholdsOK(t *testing.T) {
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.ThresholdSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
exp := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"TH1"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: args.(*ThresholdsArgsProcessEvent).CGREvent.ID,
@@ -2972,7 +2971,8 @@ func TestStatQueueProcessThresholdsOK(t *testing.T) {
"testMetricType": time.Duration(time.Hour),
},
APIOpts: map[string]interface{}{
utils.MetaEventType: utils.StatUpdate,
utils.MetaEventType: utils.StatUpdate,
utils.OptsThresholdsThresholdIDs: []string{"TH1"},
},
},
}

View File

@@ -328,7 +328,10 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ctx *context.Context, tnt
utils.MetaReq: args.Event,
utils.MetaOpts: args.APIOpts,
}
tIDs := utils.NewStringSet(args.ThresholdIDs)
var tIDs utils.StringSet
if args.APIOpts[utils.OptsThresholdsThresholdIDs] != nil {
tIDs = utils.NewStringSet(args.APIOpts[utils.OptsThresholdsThresholdIDs].([]string))
}
if len(tIDs) == 0 {
tIDs, err = MatchingItemIDsForEvent(ctx, evNm,
tS.cgrcfg.ThresholdSCfg().StringIndexedFields,
@@ -406,7 +409,6 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ctx *context.Context, tnt
// ThresholdsArgsProcessEvent are the arguments to proccess the event with thresholds
type ThresholdsArgsProcessEvent struct {
ThresholdIDs []string
*utils.CGREvent
clnb bool //rpcclonable
}
@@ -426,13 +428,8 @@ func (attr *ThresholdsArgsProcessEvent) RPCClone() (interface{}, error) {
// Clone creates a clone of the object
func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent {
var thIDs []string
if attr.ThresholdIDs != nil {
thIDs = utils.CloneStringSlice(attr.ThresholdIDs)
}
return &ThresholdsArgsProcessEvent{
ThresholdIDs: thIDs,
CGREvent: attr.CGREvent.Clone(),
CGREvent: attr.CGREvent.Clone(),
}
}
@@ -453,6 +450,7 @@ func (tS *ThresholdService) processEvent(ctx *context.Context, tnt string, args
fmt.Sprintf("<ThresholdService> threshold: %s, ignoring event: %s, error: %s",
t.TenantID(), utils.ConcatenatedKey(tnt, args.CGREvent.ID), err.Error()))
withErrors = true
fmt.Println("sal1")
continue
}
if t.dirty == nil || t.Hits == t.tPrfl.MaxHits { // one time threshold

View File

@@ -1463,21 +1463,25 @@ func TestThresholdsStoreThresholdNilDirtyField(t *testing.T) {
func TestThresholdsSetCloneable(t *testing.T) {
args := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"THD_ID"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "EventTest",
Event: map[string]interface{}{},
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: []string{"THD_ID"},
},
},
clnb: false,
}
exp := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"THD_ID"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "EventTest",
Event: map[string]interface{}{},
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: []string{"THD_ID"},
},
},
clnb: true,
}
@@ -1490,23 +1494,25 @@ func TestThresholdsSetCloneable(t *testing.T) {
func TestThresholdsRPCClone(t *testing.T) {
args := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"THD_ID"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "EventTest",
Event: make(map[string]interface{}),
APIOpts: make(map[string]interface{}),
Tenant: "cgrates.org",
ID: "EventTest",
Event: make(map[string]interface{}),
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: []string{"THD_ID"},
},
},
clnb: true,
}
exp := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"THD_ID"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "EventTest",
Event: make(map[string]interface{}),
APIOpts: make(map[string]interface{}),
Tenant: "cgrates.org",
ID: "EventTest",
Event: make(map[string]interface{}),
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: []string{"THD_ID"},
},
},
}
@@ -1541,13 +1547,15 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) {
}
args := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"TH2"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "ThdProcessEvent",
Event: map[string]interface{}{
utils.AccountField: "1001",
},
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: []string{"TH2"},
},
},
}
exp := &Threshold{
@@ -1627,13 +1635,15 @@ func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) {
}
args := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"TH3"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "ThdProcessEvent",
Event: map[string]interface{}{
utils.AccountField: "1001",
},
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: []string{"TH3"},
},
},
}
@@ -1686,13 +1696,15 @@ func TestThresholdsProcessEventNotFound(t *testing.T) {
}
args := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"TH6"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "ThdProcessEvent",
Event: map[string]interface{}{
utils.AccountField: "1001",
},
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: []string{"TH6"},
},
},
}
@@ -2027,9 +2039,13 @@ func TestThresholdMatchingThresholdForEventLocks(t *testing.T) {
ids.Add(rPrf.ID)
}
dm.RemoveThreshold(context.Background(), "cgrates.org", "TH1")
ev := &utils.CGREvent{
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: ids.AsSlice(),
},
}
mth, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
CGREvent: ev,
})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -2093,9 +2109,13 @@ func TestThresholdMatchingThresholdForEventLocks2(t *testing.T) {
}
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
ev := &utils.CGREvent{
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: ids.AsSlice(),
},
}
_, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
CGREvent: ev,
})
expErr := utils.ErrPrefixNotFound(rPrf.FilterIDs[0])
if err == nil || err.Error() != expErr.Error() {
@@ -2143,9 +2163,13 @@ func TestThresholdMatchingThresholdForEventLocksBlocker(t *testing.T) {
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
}
ev := &utils.CGREvent{
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: ids.AsSlice(),
},
}
mres, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
CGREvent: ev,
})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -2212,9 +2236,13 @@ func TestThresholdMatchingThresholdForEventLocks3(t *testing.T) {
for i := 0; i < 10; i++ {
ids.Add(fmt.Sprintf("TH%d", i))
}
ev := &utils.CGREvent{
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: ids.AsSlice(),
},
}
_, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
CGREvent: ev,
})
if err != utils.ErrNotImplemented {
t.Fatalf("Error: %+v", err)
@@ -2254,9 +2282,13 @@ func TestThresholdMatchingThresholdForEventLocks4(t *testing.T) {
ids.Add(rPrf.ID)
}
ids.Add("TH20")
ev := &utils.CGREvent{
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: ids.AsSlice(),
},
}
mres, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
CGREvent: ev,
})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -2309,10 +2341,14 @@ func TestThresholdMatchingThresholdForEventLocks5(t *testing.T) {
prfs = append(prfs, rPrf)
ids.Add(rPrf.ID)
}
ev := &utils.CGREvent{
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: ids.AsSlice(),
},
}
dm.RemoveThreshold(context.Background(), "cgrates.org", "TH1")
_, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
ThresholdIDs: ids.AsSlice(),
CGREvent: new(utils.CGREvent),
CGREvent: ev,
})
if err != utils.ErrDisconnected {
t.Errorf("Error: %+v", err)
@@ -2474,7 +2510,6 @@ func TestThresholdsV1GetThresholdsForEventOK(t *testing.T) {
t.Error(err)
}
args := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{},
CGREvent: &utils.CGREvent{
ID: "TestGetThresholdsForEvent",
Event: map[string]interface{}{
@@ -2528,8 +2563,7 @@ func TestThresholdsV1GetThresholdsForEventMissingArgs(t *testing.T) {
t.Error(err)
}
args := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{},
CGREvent: nil,
CGREvent: nil,
}
experr := `MANDATORY_IE_MISSING: [CGREvent]`
@@ -2540,7 +2574,6 @@ func TestThresholdsV1GetThresholdsForEventMissingArgs(t *testing.T) {
}
args = &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
@@ -2556,7 +2589,6 @@ func TestThresholdsV1GetThresholdsForEventMissingArgs(t *testing.T) {
}
args = &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestGetThresholdsForEvent",
@@ -3027,13 +3059,15 @@ func TestThreholdsMatchingThresholdsForEventDoesNotPass(t *testing.T) {
}
args := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"TH1"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestEvent",
Event: map[string]interface{}{
utils.AccountField: "1002",
},
APIOpts: map[string]interface{}{
utils.OptsThresholdsThresholdIDs: []string{"TH1"},
},
},
}
if _, err := tS.matchingThresholdsForEvent(context.Background(), "cgrates.org",

View File

@@ -3085,7 +3085,6 @@ func TestResourcesProcessThresholdsOK(t *testing.T) {
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.ThresholdSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
exp := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"THD_1"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: args.(*ThresholdsArgsProcessEvent).CGREvent.ID,
@@ -3095,7 +3094,8 @@ func TestResourcesProcessThresholdsOK(t *testing.T) {
utils.Usage: 0.,
},
APIOpts: map[string]interface{}{
utils.MetaEventType: utils.ResourceUpdate,
utils.MetaEventType: utils.ResourceUpdate,
utils.OptsThresholdsThresholdIDs: []string{"THD_1"},
},
},
}
@@ -3158,7 +3158,6 @@ func TestResourcesProcessThresholdsCallErr(t *testing.T) {
calls: map[string]func(ctx *context.Context, args interface{}, reply interface{}) error{
utils.ThresholdSv1ProcessEvent: func(ctx *context.Context, args, reply interface{}) error {
exp := &ThresholdsArgsProcessEvent{
ThresholdIDs: []string{"THD_1"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: args.(*ThresholdsArgsProcessEvent).CGREvent.ID,
@@ -3168,7 +3167,8 @@ func TestResourcesProcessThresholdsCallErr(t *testing.T) {
utils.Usage: 0.,
},
APIOpts: map[string]interface{}{
utils.MetaEventType: utils.ResourceUpdate,
utils.MetaEventType: utils.ResourceUpdate,
utils.OptsThresholdsThresholdIDs: []string{"THD_1"},
},
},
}