From 881310f01c0a1814235f0c7455bb3589e13833e4 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 26 Oct 2017 17:51:15 +0200 Subject: [PATCH] ThresholdS running with common filters, Thresholds.MinHits implementation, integration tests --- apier/v1/thresholds_it_test.go | 11 ++++++----- engine/filters.go | 23 ++++++++++++----------- engine/thresholds.go | 26 +++++++++++--------------- utils/consts.go | 1 + utils/reflect.go | 25 ++++++++++++++++--------- 5 files changed, 46 insertions(+), 40 deletions(-) diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index cdfbd4774..5704966cf 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -58,7 +58,8 @@ var tEvs = []*engine.ThresholdEvent{ utils.ACCOUNT: "1002", utils.BalanceID: utils.META_DEFAULT, utils.Units: 12.3, - utils.ExpiryTime: "2009-11-10T23:00:00Z"}}, + utils.ExpiryTime: time.Date(2009, 11, 10, 23, 00, 0, 0, time.UTC).Local(), + }}, &engine.ThresholdEvent{ // hitting THD_STATS_1 Tenant: "cgrates.org", ID: "event3", @@ -111,10 +112,10 @@ var sTestsThresholdSV1 = []func(t *testing.T){ testV1TSStartEngine, testV1TSRpcConn, testV1TSFromFolder, - //testV1TSGetThresholds, - //testV1TSProcessEvent, - //testV1TSGetThresholdsAfterProcess, - //testV1TSGetThresholdsAfterRestart, + testV1TSGetThresholds, + testV1TSProcessEvent, + testV1TSGetThresholdsAfterProcess, + testV1TSGetThresholdsAfterRestart, testV1TSSetThresholdProfile, testV1TSUpdateThresholdProfile, testV1TSRemoveThresholdProfile, diff --git a/engine/filters.go b/engine/filters.go index 5190be47c..e445f3d50 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -105,9 +105,9 @@ func (fS *FilterS) PassFiltersForEvent(tenant string, ev map[string]interface{}, default: return false, fmt.Errorf("tenant: %s filter: %s unsupported filter type: <%s>", tenant, fltrID, fltr.Type) } - } - if err != nil || !pass { - return pass, err + if !pass || err != nil { + return pass, err + } } atLeastOneFilterPassing = true } @@ -164,12 +164,11 @@ type RFStatSThreshold struct { // RequestFilter filters requests coming into various places // Pass rule: default negative, one mathing rule should pass the filter type RequestFilter struct { - Type string // Filter type (*string, *timing, *rsr_filters, *stats, *lt, *lte, *gt, *gte) - FieldName string // Name of the field providing us the Values to check (used in case of some ) - Values []string // Filter definition - ActivationInterval *utils.ActivationInterval - rsrFields utils.RSRFields // Cache here the RSRFilter Values - statSThresholds []*RFStatSThreshold // Cached compiled RFStatsThreshold out of Values + Type string // Filter type (*string, *timing, *rsr_filters, *stats, *lt, *lte, *gt, *gte) + FieldName string // Name of the field providing us the Values to check (used in case of some ) + Values []string // Filter definition + rsrFields utils.RSRFields // Cache here the RSRFilter Values + statSThresholds []*RFStatSThreshold // Cached compiled RFStatsThreshold out of Values } // Separate method to compile RSR fields @@ -329,14 +328,16 @@ func (fltr *RequestFilter) passGreaterThan(req interface{}, extraFieldsLabel str } return false, err } + if fldStr, castStr := fldIf.(string); castStr { // attempt converting string since deserialization fails here (ie: time.Time fields) + fldIf = utils.StringToInterface(fldStr) + } for _, val := range fltr.Values { - ifVal := utils.StringToInterface(val) orEqual := false if fltr.Type == MetaGreaterOrEqual || fltr.Type == MetaLessThan { orEqual = true } - if gte, err := utils.GreaterThan(fldIf, ifVal, orEqual); err != nil { + if gte, err := utils.GreaterThan(fldIf, utils.StringToInterface(val), orEqual); err != nil { return false, err } else if utils.IsSliceMember([]string{MetaGreaterThan, MetaGreaterOrEqual}, fltr.Type) && gte { return true, nil diff --git a/engine/thresholds.go b/engine/thresholds.go index a06223618..b749632d2 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -102,6 +102,7 @@ func (te *ThresholdEvent) FilterableEvent(fltredFields []string) (fEv map[string type Threshold struct { Tenant string ID string + Hits int // number of hits for this threshold Snooze time.Time // prevent threshold to run too early tPrfl *ThresholdProfile @@ -115,7 +116,10 @@ func (t *Threshold) TenantID() string { // ProcessEvent processes an ThresholdEvent // concurrentActions limits the number of simultaneous action sets executed func (t *Threshold) ProcessEvent(ev *ThresholdEvent, dm *DataManager) (err error) { - if t.Snooze.After(time.Now()) { // ignore the event + if t.Snooze.After(time.Now()) { // snoozed, not executing actions + return + } + if t.Hits < t.tPrfl.MinHits { // number of hits was not met, will not execute actions return } acnt, _ := ev.Account() @@ -269,20 +273,11 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T !tPrfl.ActivationInterval.IsActiveAtTime(time.Now()) { // not active continue } - /* - passAllFilters := true - for _, fltr := range tPrfl.Filters { - if pass, err := fltr.Pass(ev.FilterableEvent(nil), "", tS.statS); err != nil { - return nil, err - } else if !pass { - passAllFilters = false - continue - } - } - if !passAllFilters { - continue - } - */ + if pass, err := tS.filterS.PassFiltersForEvent(ev.Tenant, ev.Event, tPrfl.FilterIDs); err != nil { + return nil, err + } else if !pass { + continue + } t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "") if err != nil { return nil, err @@ -319,6 +314,7 @@ func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err erro hits = len(matchTs) var withErrors bool for _, t := range matchTs { + t.Hits += 1 err = t.ProcessEvent(ev, tS.dm) if err != nil { utils.Logger.Warning( diff --git a/utils/consts.go b/utils/consts.go index e4849ca42..35aaa2586 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -485,6 +485,7 @@ const ( Disabled = "Disabled" Action = "Action" ThresholdSv1ProcessEvent = "ThresholdSv1.ProcessEvent" + MetaNow = "*now" ) func buildCacheInstRevPrefixes() { diff --git a/utils/reflect.go b/utils/reflect.go index 0ae691815..8b14b3de3 100644 --- a/utils/reflect.go +++ b/utils/reflect.go @@ -195,31 +195,38 @@ func GreaterThan(item, oItem interface{}, orEqual bool) (gte bool, err error) { typItem != typOItem { return false, errors.New("incomparable") } - switch typItem.Kind() { - case reflect.Float32, reflect.Float64: + switch item.(type) { + case float64: if orEqual { gte = valItm.Float() >= valOtItm.Float() } else { gte = valItm.Float() > valOtItm.Float() } - case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + case int64: if orEqual { gte = valItm.Int() >= valOtItm.Int() } else { gte = valItm.Int() > valOtItm.Int() } - case reflect.Struct: - tVal, ok := valItm.Interface().(time.Time) - tOVal, oOK := valOtItm.Interface().(time.Time) - if !ok || !oOK { - return false, fmt.Errorf("cannot cast struct to time: %v, %v", ok, oOK) - } + case time.Time: + tVal := item.(time.Time) + tOVal := oItem.(time.Time) if orEqual { gte = tVal == tOVal } if !gte { gte = tVal.After(tOVal) } + case time.Duration: + tVal := item.(time.Duration) + tOVal := oItem.(time.Duration) + if orEqual { + gte = tVal == tOVal + } + if !gte { + gte = tVal > tOVal + } + default: // unsupported comparison err = fmt.Errorf("unsupported comparison type: %v, kind: %v", typItem, typItem.Kind()) }