mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 13:19:53 +05:00
Threshold FilterableEvent, integration tests for thresholds
This commit is contained in:
@@ -58,11 +58,11 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
utils.EventSource: utils.StatService,
|
||||
utils.StatID: "Stats1",
|
||||
"ASR": 35.0,
|
||||
"ACD": time.Duration(2*time.Minute + 45*time.Second),
|
||||
"ACD": "2m45s",
|
||||
"TCC": 12.7,
|
||||
"TCD": time.Duration(12*time.Minute + 15*time.Second),
|
||||
"TCD": "12m15s",
|
||||
"ACC": 0.75,
|
||||
"PDD": time.Duration(2 * time.Second),
|
||||
"PDD": "2s",
|
||||
}},
|
||||
&engine.ThresholdEvent{ // hitting THD_STATS_1 and THD_STATS_2
|
||||
Tenant: "cgrates.org",
|
||||
@@ -71,8 +71,8 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
utils.EventSource: utils.StatService,
|
||||
utils.StatID: "STATS_HOURLY_DE",
|
||||
"ASR": 35.0,
|
||||
"ACD": time.Duration(2*time.Minute + 45*time.Second),
|
||||
"TCD": time.Duration(1 * time.Hour),
|
||||
"ACD": "2m45s",
|
||||
"TCD": "1h",
|
||||
}},
|
||||
&engine.ThresholdEvent{ // hitting THD_STATS_3
|
||||
Tenant: "cgrates.org",
|
||||
@@ -80,8 +80,8 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
Fields: map[string]interface{}{
|
||||
utils.EventSource: utils.StatService,
|
||||
utils.StatID: "STATS_DAILY_DE",
|
||||
"ACD": time.Duration(2*time.Minute + 45*time.Second),
|
||||
"TCD": time.Duration(3*time.Hour + 1*time.Second),
|
||||
"ACD": "2m45s",
|
||||
"TCD": "3h1s",
|
||||
}},
|
||||
&engine.ThresholdEvent{ // hitting THD_RES_1
|
||||
Tenant: "cgrates.org",
|
||||
@@ -167,7 +167,7 @@ func testV1TSFromFolder(t *testing.T) {
|
||||
|
||||
func testV1TSGetThresholds(t *testing.T) {
|
||||
var tIDs []string
|
||||
expectedIDs := []string{"Threshold1"}
|
||||
expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_STATS_3"}
|
||||
if err := tSv1Rpc.Call("ThresholdSV1.GetThresholdIDs", "cgrates.org", &tIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedIDs, tIDs) {
|
||||
@@ -185,30 +185,35 @@ func testV1TSGetThresholds(t *testing.T) {
|
||||
|
||||
func testV1TSProcessEvent(t *testing.T) {
|
||||
var hits int
|
||||
eHits := 1
|
||||
if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[0], &hits); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != 0 {
|
||||
t.Error("Expecting hits: %d, received: %d", 0, hits)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
}
|
||||
eHits = 1
|
||||
if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[1], &hits); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != 0 {
|
||||
t.Error("Expecting hits: %d, received: %d", 0, hits)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
}
|
||||
eHits = 2
|
||||
if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[2], &hits); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != 0 {
|
||||
t.Error("Expecting hits: %d, received: %d", 0, hits)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
}
|
||||
eHits = 1
|
||||
if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[3], &hits); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != 0 {
|
||||
t.Error("Expecting hits: %d, received: %d", 0, hits)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
}
|
||||
eHits = 1
|
||||
if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[4], &hits); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != 0 {
|
||||
t.Error("Expecting hits: %d, received: %d", 0, hits)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,8 @@ const CGRATES_CFG_JSON = `
|
||||
"event_resources": {"limit": -1, "ttl": "1m", "static_ttl": false}, // matching resources to events
|
||||
"statqueue_profiles": {"limit": -1, "ttl": "1m", "static_ttl": false, "precache": false}, // statqueue profiles
|
||||
"statqueues": {"limit": -1, "ttl": "1m", "static_ttl": false, "precache": false}, // statqueues with metrics
|
||||
"threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control threshold profiles caching
|
||||
"thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control thresholds caching
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],Recurrent[6],MinSleep[7],Blocker[8],Weight[9],ActionIDs[10],Async[11]
|
||||
cgrates.org,THD_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,true,10,LOG_WARNING
|
||||
cgrates.org,THD_ACNT_BALANCE_1,*string,EventSource,AccountS,,,,,,
|
||||
cgrates.org,THD_ACNT_BALANCE_1,*gte,BalanceValue,10.0,,,,,,
|
||||
cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,true,10,LOG_WARNING
|
||||
cgrates.org,THD_STATS_1,*lt,ASR,40,,,,,,
|
||||
cgrates.org,THD_STATS_1,*lt,ACD,3m,,,,,,
|
||||
cgrates.org,THD_STATS_2,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,true,10,DISABLE_AND_LOG
|
||||
cgrates.org,THD_STATS_2,*string,StatID,STATS_HOURLY_DE,,,,,,
|
||||
cgrates.org,THD_STATS_2,*gt,TCD,30m,,,,,,
|
||||
cgrates.org,THD_STATS_3,*string,EventSource,StatS,2014-07-29T15:00:00Z,false,1s,true,10,TOPUP_100SMS_DE_MOBILE
|
||||
cgrates.org,THD_STATS_3,*string,StatID,STATS_DAILY_DE,,,,,,
|
||||
cgrates.org,THD_STATS_3,*gt,TCD,3h,,,,,,
|
||||
cgrates.org,THD_RES_1,*string,EventSource,ResourceS,2014-07-29T15:00:00Z,true,1s,true,10,LOG_WARNING
|
||||
cgrates.org,THD_RES_1,*string,ResourceID,RES_GRP_1,,,,,,
|
||||
cgrates.org,THD_RES_1,*gte,Usage,10,,,,,,
|
||||
cgrates.org,THD_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
|
||||
cgrates.org,THD_ACNT_BALANCE_1,*string,EventSource,AccountS,,,,,,,
|
||||
cgrates.org,THD_ACNT_BALANCE_1,*gte,BalanceValue,10.0,,,,,,,
|
||||
cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
|
||||
cgrates.org,THD_STATS_1,*lt,ASR,40.0,,,,,,,
|
||||
cgrates.org,THD_STATS_1,*lt,ACD,3m,,,,,,,
|
||||
cgrates.org,THD_STATS_2,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,DISABLE_AND_LOG,
|
||||
cgrates.org,THD_STATS_2,*string,StatID,STATS_HOURLY_DE,,,,,,,
|
||||
cgrates.org,THD_STATS_2,*gt,TCD,30m,,,,,,,
|
||||
cgrates.org,THD_STATS_3,*string,EventSource,StatS,2014-07-29T15:00:00Z,false,1s,false,10,TOPUP_100SMS_DE_MOBILE,
|
||||
cgrates.org,THD_STATS_3,*string,StatID,STATS_DAILY_DE,,,,,,,
|
||||
cgrates.org,THD_STATS_3,*gt,TCD,3h,,,,,,,
|
||||
cgrates.org,THD_RES_1,*string,EventSource,ResourceS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
|
||||
cgrates.org,THD_RES_1,*string,ResourceID,RES_GRP_1,,,,,,,
|
||||
cgrates.org,THD_RES_1,*gte,Usage,10.0,,,,,,,
|
||||
|
||||
|
@@ -21,6 +21,7 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -72,6 +73,31 @@ func (te *ThresholdEvent) Account() (acnt string, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (te *ThresholdEvent) FilterableEvent(fltredFields []string) (fEv map[string]interface{}) {
|
||||
fEv = make(map[string]interface{})
|
||||
if len(fltredFields) == 0 {
|
||||
i := 0
|
||||
fltredFields = make([]string, len(te.Fields))
|
||||
for k := range te.Fields {
|
||||
fltredFields[i] = k
|
||||
i++
|
||||
}
|
||||
}
|
||||
for _, fltrFld := range fltredFields {
|
||||
fldVal, has := te.Fields[fltrFld]
|
||||
if !has {
|
||||
continue // the field does not exist in map, ignore it
|
||||
}
|
||||
valOf := reflect.ValueOf(fldVal)
|
||||
if valOf.Kind() == reflect.String {
|
||||
fEv[fltrFld] = utils.StringToInterface(valOf.String()) // attempt converting from string to comparable interface
|
||||
} else {
|
||||
fEv[fltrFld] = fldVal
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Threshold is the unit matched by filters
|
||||
type Threshold struct {
|
||||
Tenant string
|
||||
@@ -127,7 +153,8 @@ func NewThresholdService(dm *DataManager, filteredFields []string, storeInterval
|
||||
filteredFields: filteredFields,
|
||||
storeInterval: storeInterval,
|
||||
statS: statS,
|
||||
stopBackup: make(chan struct{})}, nil
|
||||
stopBackup: make(chan struct{}),
|
||||
storedTdIDs: make(utils.StringMap)}, nil
|
||||
}
|
||||
|
||||
// ThresholdService manages Threshold execution and storing them to dataDB
|
||||
@@ -246,7 +273,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T
|
||||
}
|
||||
passAllFilters := true
|
||||
for _, fltr := range tPrfl.Filters {
|
||||
if pass, err := fltr.Pass(ev.Fields, "", tS.statS); err != nil {
|
||||
if pass, err := fltr.Pass(ev.FilterableEvent(nil), "", tS.statS); err != nil {
|
||||
return nil, err
|
||||
} else if !pass {
|
||||
passAllFilters = false
|
||||
|
||||
@@ -61,6 +61,8 @@ var (
|
||||
CacheTimings: TimingsPrefix,
|
||||
CacheStatQueueProfiles: StatQueueProfilePrefix,
|
||||
CacheStatQueues: StatQueuePrefix,
|
||||
CacheThresholdProfiles: ThresholdProfilePrefix,
|
||||
CacheThresholds: ThresholdPrefix,
|
||||
}
|
||||
CachePrefixToInstance map[string]string // will be built on init
|
||||
)
|
||||
@@ -470,6 +472,8 @@ const (
|
||||
BalanceID = "BalanceID"
|
||||
BalanceValue = "BalanceValue"
|
||||
ResourceS = "ResourceS"
|
||||
CacheThresholdProfiles = "threshold_profiles"
|
||||
CacheThresholds = "thresholds"
|
||||
)
|
||||
|
||||
func buildCacheInstRevPrefixes() {
|
||||
|
||||
Reference in New Issue
Block a user