mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-25 09:08:45 +05:00
ThresholdS running with common filters, Thresholds.MinHits implementation, integration tests
This commit is contained in:
@@ -58,7 +58,8 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
utils.ACCOUNT: "1002",
|
||||
utils.BalanceID: utils.META_DEFAULT,
|
||||
utils.Units: 12.3,
|
||||
utils.ExpiryTime: "2009-11-10T23:00:00Z"}},
|
||||
utils.ExpiryTime: time.Date(2009, 11, 10, 23, 00, 0, 0, time.UTC).Local(),
|
||||
}},
|
||||
&engine.ThresholdEvent{ // hitting THD_STATS_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event3",
|
||||
@@ -111,10 +112,10 @@ var sTestsThresholdSV1 = []func(t *testing.T){
|
||||
testV1TSStartEngine,
|
||||
testV1TSRpcConn,
|
||||
testV1TSFromFolder,
|
||||
//testV1TSGetThresholds,
|
||||
//testV1TSProcessEvent,
|
||||
//testV1TSGetThresholdsAfterProcess,
|
||||
//testV1TSGetThresholdsAfterRestart,
|
||||
testV1TSGetThresholds,
|
||||
testV1TSProcessEvent,
|
||||
testV1TSGetThresholdsAfterProcess,
|
||||
testV1TSGetThresholdsAfterRestart,
|
||||
testV1TSSetThresholdProfile,
|
||||
testV1TSUpdateThresholdProfile,
|
||||
testV1TSRemoveThresholdProfile,
|
||||
|
||||
@@ -105,9 +105,9 @@ func (fS *FilterS) PassFiltersForEvent(tenant string, ev map[string]interface{},
|
||||
default:
|
||||
return false, fmt.Errorf("tenant: %s filter: %s unsupported filter type: <%s>", tenant, fltrID, fltr.Type)
|
||||
}
|
||||
}
|
||||
if err != nil || !pass {
|
||||
return pass, err
|
||||
if !pass || err != nil {
|
||||
return pass, err
|
||||
}
|
||||
}
|
||||
atLeastOneFilterPassing = true
|
||||
}
|
||||
@@ -164,12 +164,11 @@ type RFStatSThreshold struct {
|
||||
// RequestFilter filters requests coming into various places
|
||||
// Pass rule: default negative, one mathing rule should pass the filter
|
||||
type RequestFilter struct {
|
||||
Type string // Filter type (*string, *timing, *rsr_filters, *stats, *lt, *lte, *gt, *gte)
|
||||
FieldName string // Name of the field providing us the Values to check (used in case of some )
|
||||
Values []string // Filter definition
|
||||
ActivationInterval *utils.ActivationInterval
|
||||
rsrFields utils.RSRFields // Cache here the RSRFilter Values
|
||||
statSThresholds []*RFStatSThreshold // Cached compiled RFStatsThreshold out of Values
|
||||
Type string // Filter type (*string, *timing, *rsr_filters, *stats, *lt, *lte, *gt, *gte)
|
||||
FieldName string // Name of the field providing us the Values to check (used in case of some )
|
||||
Values []string // Filter definition
|
||||
rsrFields utils.RSRFields // Cache here the RSRFilter Values
|
||||
statSThresholds []*RFStatSThreshold // Cached compiled RFStatsThreshold out of Values
|
||||
}
|
||||
|
||||
// Separate method to compile RSR fields
|
||||
@@ -329,14 +328,16 @@ func (fltr *RequestFilter) passGreaterThan(req interface{}, extraFieldsLabel str
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
if fldStr, castStr := fldIf.(string); castStr { // attempt converting string since deserialization fails here (ie: time.Time fields)
|
||||
fldIf = utils.StringToInterface(fldStr)
|
||||
}
|
||||
for _, val := range fltr.Values {
|
||||
ifVal := utils.StringToInterface(val)
|
||||
orEqual := false
|
||||
if fltr.Type == MetaGreaterOrEqual ||
|
||||
fltr.Type == MetaLessThan {
|
||||
orEqual = true
|
||||
}
|
||||
if gte, err := utils.GreaterThan(fldIf, ifVal, orEqual); err != nil {
|
||||
if gte, err := utils.GreaterThan(fldIf, utils.StringToInterface(val), orEqual); err != nil {
|
||||
return false, err
|
||||
} else if utils.IsSliceMember([]string{MetaGreaterThan, MetaGreaterOrEqual}, fltr.Type) && gte {
|
||||
return true, nil
|
||||
|
||||
@@ -102,6 +102,7 @@ func (te *ThresholdEvent) FilterableEvent(fltredFields []string) (fEv map[string
|
||||
type Threshold struct {
|
||||
Tenant string
|
||||
ID string
|
||||
Hits int // number of hits for this threshold
|
||||
Snooze time.Time // prevent threshold to run too early
|
||||
|
||||
tPrfl *ThresholdProfile
|
||||
@@ -115,7 +116,10 @@ func (t *Threshold) TenantID() string {
|
||||
// ProcessEvent processes an ThresholdEvent
|
||||
// concurrentActions limits the number of simultaneous action sets executed
|
||||
func (t *Threshold) ProcessEvent(ev *ThresholdEvent, dm *DataManager) (err error) {
|
||||
if t.Snooze.After(time.Now()) { // ignore the event
|
||||
if t.Snooze.After(time.Now()) { // snoozed, not executing actions
|
||||
return
|
||||
}
|
||||
if t.Hits < t.tPrfl.MinHits { // number of hits was not met, will not execute actions
|
||||
return
|
||||
}
|
||||
acnt, _ := ev.Account()
|
||||
@@ -269,20 +273,11 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T
|
||||
!tPrfl.ActivationInterval.IsActiveAtTime(time.Now()) { // not active
|
||||
continue
|
||||
}
|
||||
/*
|
||||
passAllFilters := true
|
||||
for _, fltr := range tPrfl.Filters {
|
||||
if pass, err := fltr.Pass(ev.FilterableEvent(nil), "", tS.statS); err != nil {
|
||||
return nil, err
|
||||
} else if !pass {
|
||||
passAllFilters = false
|
||||
continue
|
||||
}
|
||||
}
|
||||
if !passAllFilters {
|
||||
continue
|
||||
}
|
||||
*/
|
||||
if pass, err := tS.filterS.PassFiltersForEvent(ev.Tenant, ev.Event, tPrfl.FilterIDs); err != nil {
|
||||
return nil, err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -319,6 +314,7 @@ func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err erro
|
||||
hits = len(matchTs)
|
||||
var withErrors bool
|
||||
for _, t := range matchTs {
|
||||
t.Hits += 1
|
||||
err = t.ProcessEvent(ev, tS.dm)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
|
||||
@@ -485,6 +485,7 @@ const (
|
||||
Disabled = "Disabled"
|
||||
Action = "Action"
|
||||
ThresholdSv1ProcessEvent = "ThresholdSv1.ProcessEvent"
|
||||
MetaNow = "*now"
|
||||
)
|
||||
|
||||
func buildCacheInstRevPrefixes() {
|
||||
|
||||
@@ -195,31 +195,38 @@ func GreaterThan(item, oItem interface{}, orEqual bool) (gte bool, err error) {
|
||||
typItem != typOItem {
|
||||
return false, errors.New("incomparable")
|
||||
}
|
||||
switch typItem.Kind() {
|
||||
case reflect.Float32, reflect.Float64:
|
||||
switch item.(type) {
|
||||
case float64:
|
||||
if orEqual {
|
||||
gte = valItm.Float() >= valOtItm.Float()
|
||||
} else {
|
||||
gte = valItm.Float() > valOtItm.Float()
|
||||
}
|
||||
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
||||
case int64:
|
||||
if orEqual {
|
||||
gte = valItm.Int() >= valOtItm.Int()
|
||||
} else {
|
||||
gte = valItm.Int() > valOtItm.Int()
|
||||
}
|
||||
case reflect.Struct:
|
||||
tVal, ok := valItm.Interface().(time.Time)
|
||||
tOVal, oOK := valOtItm.Interface().(time.Time)
|
||||
if !ok || !oOK {
|
||||
return false, fmt.Errorf("cannot cast struct to time: %v, %v", ok, oOK)
|
||||
}
|
||||
case time.Time:
|
||||
tVal := item.(time.Time)
|
||||
tOVal := oItem.(time.Time)
|
||||
if orEqual {
|
||||
gte = tVal == tOVal
|
||||
}
|
||||
if !gte {
|
||||
gte = tVal.After(tOVal)
|
||||
}
|
||||
case time.Duration:
|
||||
tVal := item.(time.Duration)
|
||||
tOVal := oItem.(time.Duration)
|
||||
if orEqual {
|
||||
gte = tVal == tOVal
|
||||
}
|
||||
if !gte {
|
||||
gte = tVal > tOVal
|
||||
}
|
||||
|
||||
default: // unsupported comparison
|
||||
err = fmt.Errorf("unsupported comparison type: %v, kind: %v", typItem, typItem.Kind())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user