From ad3eb76102f12ff8030d7801df6760512e4376e6 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 13 Oct 2017 21:02:25 +0200 Subject: [PATCH] Threshold FilterableEvent, integration tests for thresholds --- apier/v1/thresholds_it_test.go | 41 +++++++++++++----------- config/config_defaults.go | 2 ++ data/tariffplans/tutorial/Thresholds.csv | 30 ++++++++--------- engine/thresholds.go | 31 ++++++++++++++++-- utils/consts.go | 4 +++ 5 files changed, 73 insertions(+), 35 deletions(-) diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index d2211163a..44f5426fc 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -58,11 +58,11 @@ var tEvs = []*engine.ThresholdEvent{ utils.EventSource: utils.StatService, utils.StatID: "Stats1", "ASR": 35.0, - "ACD": time.Duration(2*time.Minute + 45*time.Second), + "ACD": "2m45s", "TCC": 12.7, - "TCD": time.Duration(12*time.Minute + 15*time.Second), + "TCD": "12m15s", "ACC": 0.75, - "PDD": time.Duration(2 * time.Second), + "PDD": "2s", }}, &engine.ThresholdEvent{ // hitting THD_STATS_1 and THD_STATS_2 Tenant: "cgrates.org", @@ -71,8 +71,8 @@ var tEvs = []*engine.ThresholdEvent{ utils.EventSource: utils.StatService, utils.StatID: "STATS_HOURLY_DE", "ASR": 35.0, - "ACD": time.Duration(2*time.Minute + 45*time.Second), - "TCD": time.Duration(1 * time.Hour), + "ACD": "2m45s", + "TCD": "1h", }}, &engine.ThresholdEvent{ // hitting THD_STATS_3 Tenant: "cgrates.org", @@ -80,8 +80,8 @@ var tEvs = []*engine.ThresholdEvent{ Fields: map[string]interface{}{ utils.EventSource: utils.StatService, utils.StatID: "STATS_DAILY_DE", - "ACD": time.Duration(2*time.Minute + 45*time.Second), - "TCD": time.Duration(3*time.Hour + 1*time.Second), + "ACD": "2m45s", + "TCD": "3h1s", }}, &engine.ThresholdEvent{ // hitting THD_RES_1 Tenant: "cgrates.org", @@ -167,7 +167,7 @@ func testV1TSFromFolder(t *testing.T) { func testV1TSGetThresholds(t *testing.T) { var tIDs []string - expectedIDs := []string{"Threshold1"} + expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_STATS_3"} if err := tSv1Rpc.Call("ThresholdSV1.GetThresholdIDs", "cgrates.org", &tIDs); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedIDs, tIDs) { @@ -185,30 +185,35 @@ func testV1TSGetThresholds(t *testing.T) { func testV1TSProcessEvent(t *testing.T) { var hits int + eHits := 1 if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[0], &hits); err != nil { t.Error(err) - } else if hits != 0 { - t.Error("Expecting hits: %d, received: %d", 0, hits) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } + eHits = 1 if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[1], &hits); err != nil { t.Error(err) - } else if hits != 0 { - t.Error("Expecting hits: %d, received: %d", 0, hits) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } + eHits = 2 if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[2], &hits); err != nil { t.Error(err) - } else if hits != 0 { - t.Error("Expecting hits: %d, received: %d", 0, hits) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } + eHits = 1 if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[3], &hits); err != nil { t.Error(err) - } else if hits != 0 { - t.Error("Expecting hits: %d, received: %d", 0, hits) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } + eHits = 1 if err := tSv1Rpc.Call("ThresholdSV1.ProcessEvent", tEvs[4], &hits); err != nil { t.Error(err) - } else if hits != 0 { - t.Error("Expecting hits: %d, received: %d", 0, hits) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } } diff --git a/config/config_defaults.go b/config/config_defaults.go index abf5c8542..62bcf3e7b 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -72,6 +72,8 @@ const CGRATES_CFG_JSON = ` "event_resources": {"limit": -1, "ttl": "1m", "static_ttl": false}, // matching resources to events "statqueue_profiles": {"limit": -1, "ttl": "1m", "static_ttl": false, "precache": false}, // statqueue profiles "statqueues": {"limit": -1, "ttl": "1m", "static_ttl": false, "precache": false}, // statqueues with metrics + "threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control threshold profiles caching + "thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control thresholds caching }, diff --git a/data/tariffplans/tutorial/Thresholds.csv b/data/tariffplans/tutorial/Thresholds.csv index fa8ed6123..54b36a0f3 100644 --- a/data/tariffplans/tutorial/Thresholds.csv +++ b/data/tariffplans/tutorial/Thresholds.csv @@ -1,16 +1,16 @@ #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],Recurrent[6],MinSleep[7],Blocker[8],Weight[9],ActionIDs[10],Async[11] -cgrates.org,THD_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,true,10,LOG_WARNING -cgrates.org,THD_ACNT_BALANCE_1,*string,EventSource,AccountS,,,,,, -cgrates.org,THD_ACNT_BALANCE_1,*gte,BalanceValue,10.0,,,,,, -cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,true,10,LOG_WARNING -cgrates.org,THD_STATS_1,*lt,ASR,40,,,,,, -cgrates.org,THD_STATS_1,*lt,ACD,3m,,,,,, -cgrates.org,THD_STATS_2,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,true,10,DISABLE_AND_LOG -cgrates.org,THD_STATS_2,*string,StatID,STATS_HOURLY_DE,,,,,, -cgrates.org,THD_STATS_2,*gt,TCD,30m,,,,,, -cgrates.org,THD_STATS_3,*string,EventSource,StatS,2014-07-29T15:00:00Z,false,1s,true,10,TOPUP_100SMS_DE_MOBILE -cgrates.org,THD_STATS_3,*string,StatID,STATS_DAILY_DE,,,,,, -cgrates.org,THD_STATS_3,*gt,TCD,3h,,,,,, -cgrates.org,THD_RES_1,*string,EventSource,ResourceS,2014-07-29T15:00:00Z,true,1s,true,10,LOG_WARNING -cgrates.org,THD_RES_1,*string,ResourceID,RES_GRP_1,,,,,, -cgrates.org,THD_RES_1,*gte,Usage,10,,,,,, +cgrates.org,THD_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, +cgrates.org,THD_ACNT_BALANCE_1,*string,EventSource,AccountS,,,,,,, +cgrates.org,THD_ACNT_BALANCE_1,*gte,BalanceValue,10.0,,,,,,, +cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, +cgrates.org,THD_STATS_1,*lt,ASR,40.0,,,,,,, +cgrates.org,THD_STATS_1,*lt,ACD,3m,,,,,,, +cgrates.org,THD_STATS_2,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,DISABLE_AND_LOG, +cgrates.org,THD_STATS_2,*string,StatID,STATS_HOURLY_DE,,,,,,, +cgrates.org,THD_STATS_2,*gt,TCD,30m,,,,,,, +cgrates.org,THD_STATS_3,*string,EventSource,StatS,2014-07-29T15:00:00Z,false,1s,false,10,TOPUP_100SMS_DE_MOBILE, +cgrates.org,THD_STATS_3,*string,StatID,STATS_DAILY_DE,,,,,,, +cgrates.org,THD_STATS_3,*gt,TCD,3h,,,,,,, +cgrates.org,THD_RES_1,*string,EventSource,ResourceS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, +cgrates.org,THD_RES_1,*string,ResourceID,RES_GRP_1,,,,,,, +cgrates.org,THD_RES_1,*gte,Usage,10.0,,,,,,, diff --git a/engine/thresholds.go b/engine/thresholds.go index 42654bd45..6d05a2f2d 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -21,6 +21,7 @@ package engine import ( "fmt" "math/rand" + "reflect" "sort" "sync" "time" @@ -72,6 +73,31 @@ func (te *ThresholdEvent) Account() (acnt string, err error) { return } +func (te *ThresholdEvent) FilterableEvent(fltredFields []string) (fEv map[string]interface{}) { + fEv = make(map[string]interface{}) + if len(fltredFields) == 0 { + i := 0 + fltredFields = make([]string, len(te.Fields)) + for k := range te.Fields { + fltredFields[i] = k + i++ + } + } + for _, fltrFld := range fltredFields { + fldVal, has := te.Fields[fltrFld] + if !has { + continue // the field does not exist in map, ignore it + } + valOf := reflect.ValueOf(fldVal) + if valOf.Kind() == reflect.String { + fEv[fltrFld] = utils.StringToInterface(valOf.String()) // attempt converting from string to comparable interface + } else { + fEv[fltrFld] = fldVal + } + } + return +} + // Threshold is the unit matched by filters type Threshold struct { Tenant string @@ -127,7 +153,8 @@ func NewThresholdService(dm *DataManager, filteredFields []string, storeInterval filteredFields: filteredFields, storeInterval: storeInterval, statS: statS, - stopBackup: make(chan struct{})}, nil + stopBackup: make(chan struct{}), + storedTdIDs: make(utils.StringMap)}, nil } // ThresholdService manages Threshold execution and storing them to dataDB @@ -246,7 +273,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T } passAllFilters := true for _, fltr := range tPrfl.Filters { - if pass, err := fltr.Pass(ev.Fields, "", tS.statS); err != nil { + if pass, err := fltr.Pass(ev.FilterableEvent(nil), "", tS.statS); err != nil { return nil, err } else if !pass { passAllFilters = false diff --git a/utils/consts.go b/utils/consts.go index b470b2de3..ccd9355a3 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -61,6 +61,8 @@ var ( CacheTimings: TimingsPrefix, CacheStatQueueProfiles: StatQueueProfilePrefix, CacheStatQueues: StatQueuePrefix, + CacheThresholdProfiles: ThresholdProfilePrefix, + CacheThresholds: ThresholdPrefix, } CachePrefixToInstance map[string]string // will be built on init ) @@ -470,6 +472,8 @@ const ( BalanceID = "BalanceID" BalanceValue = "BalanceValue" ResourceS = "ResourceS" + CacheThresholdProfiles = "threshold_profiles" + CacheThresholds = "thresholds" ) func buildCacheInstRevPrefixes() {