diff --git a/apier/v1/thresholds.go b/apier/v1/thresholds.go index b3c9b386a..0e8bf7423 100644 --- a/apier/v1/thresholds.go +++ b/apier/v1/thresholds.go @@ -45,7 +45,7 @@ func (tSv1 *ThresholdSv1) GetThresholdIDs(tenant string, tIDs *[]string) error { } // GetThresholdsForEvent returns a list of thresholds matching an event -func (tSv1 *ThresholdSv1) GetThresholdsForEvent(ev *engine.ThresholdEvent, reply *engine.Thresholds) error { +func (tSv1 *ThresholdSv1) GetThresholdsForEvent(ev *utils.CGREvent, reply *engine.Thresholds) error { return tSv1.tS.V1GetThresholdsForEvent(ev, reply) } @@ -55,7 +55,7 @@ func (tSv1 *ThresholdSv1) GetThreshold(tntID *utils.TenantID, t *engine.Threshol } // ProcessEvent will process an Event -func (tSv1 *ThresholdSv1) ProcessEvent(ev *engine.ThresholdEvent, hits *int) error { +func (tSv1 *ThresholdSv1) ProcessEvent(ev *utils.CGREvent, hits *int) error { return tSv1.tS.V1ProcessEvent(ev, hits) } diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index ee6be3762..26cefe23f 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -41,8 +41,8 @@ var ( thdsDelay int ) -var tEvs = []*engine.ThresholdEvent{ - &engine.ThresholdEvent{ // hitting THD_ACNT_BALANCE_1 +var tEvs = []*utils.CGREvent{ + &utils.CGREvent{ // hitting THD_ACNT_BALANCE_1 Tenant: "cgrates.org", ID: "event1", Event: map[string]interface{}{ @@ -50,7 +50,7 @@ var tEvs = []*engine.ThresholdEvent{ utils.ACCOUNT: "1002", utils.AllowNegative: true, utils.Disabled: false}}, - &engine.ThresholdEvent{ // hitting THD_ACNT_BALANCE_1 + &utils.CGREvent{ // hitting THD_ACNT_BALANCE_1 Tenant: "cgrates.org", ID: "event2", Event: map[string]interface{}{ @@ -60,7 +60,7 @@ var tEvs = []*engine.ThresholdEvent{ utils.Units: 12.3, utils.ExpiryTime: time.Date(2009, 11, 10, 23, 00, 0, 0, time.UTC).Local(), }}, - &engine.ThresholdEvent{ // hitting THD_STATS_1 + &utils.CGREvent{ // hitting THD_STATS_1 Tenant: "cgrates.org", ID: "event3", Event: map[string]interface{}{ @@ -74,7 +74,7 @@ var tEvs = []*engine.ThresholdEvent{ "ACC": 0.75, "PDD": "2s", }}, - &engine.ThresholdEvent{ // hitting THD_STATS_1 and THD_STATS_2 + &utils.CGREvent{ // hitting THD_STATS_1 and THD_STATS_2 Tenant: "cgrates.org", ID: "event4", Event: map[string]interface{}{ @@ -85,7 +85,7 @@ var tEvs = []*engine.ThresholdEvent{ "ACD": "2m45s", "TCD": "1h", }}, - &engine.ThresholdEvent{ // hitting THD_STATS_3 + &utils.CGREvent{ // hitting THD_STATS_3 Tenant: "cgrates.org", ID: "event5", Event: map[string]interface{}{ @@ -95,7 +95,7 @@ var tEvs = []*engine.ThresholdEvent{ "ACD": "2m45s", "TCD": "3h1s", }}, - &engine.ThresholdEvent{ // hitting THD_RES_1 + &utils.CGREvent{ // hitting THD_RES_1 Tenant: "cgrates.org", ID: "event6", Event: map[string]interface{}{ @@ -103,7 +103,7 @@ var tEvs = []*engine.ThresholdEvent{ utils.ACCOUNT: "1002", utils.ResourceID: "RES_GRP_1", utils.USAGE: 10.0}}, - &engine.ThresholdEvent{ // hitting THD_RES_1 + &utils.CGREvent{ // hitting THD_RES_1 Tenant: "cgrates.org", ID: "event6", Event: map[string]interface{}{ @@ -111,7 +111,7 @@ var tEvs = []*engine.ThresholdEvent{ utils.ACCOUNT: "1002", utils.ResourceID: "RES_GRP_1", utils.USAGE: 10.0}}, - &engine.ThresholdEvent{ // hitting THD_RES_1 + &utils.CGREvent{ // hitting THD_RES_1 Tenant: "cgrates.org", ID: "event6", Event: map[string]interface{}{ @@ -119,7 +119,7 @@ var tEvs = []*engine.ThresholdEvent{ utils.ACCOUNT: "1002", utils.ResourceID: "RES_GRP_1", utils.USAGE: 10.0}}, - &engine.ThresholdEvent{ // hitting THD_CDRS_1 + &utils.CGREvent{ // hitting THD_CDRS_1 Tenant: "cgrates.org", ID: "cdrev1", Event: map[string]interface{}{ diff --git a/engine/balances.go b/engine/balances.go index 68e631936..496688e33 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -779,7 +779,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { accountId = b.account.ID acntTnt := utils.NewTenantID(accountId) if thresholdS != nil { - ev := &ThresholdEvent{ + ev := &utils.CGREvent{ Tenant: acntTnt.Tenant, ID: utils.GenUUID(), Event: map[string]interface{}{ @@ -827,7 +827,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { if len(savedAccounts) != 0 && thresholdS != nil { for _, acnt := range savedAccounts { acntTnt := utils.NewTenantID(acnt.ID) - ev := &ThresholdEvent{ + ev := &utils.CGREvent{ Tenant: acntTnt.Tenant, ID: utils.GenUUID(), Event: map[string]interface{}{ diff --git a/engine/cdrs.go b/engine/cdrs.go index fe618a530..07be8cb5d 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -196,7 +196,7 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { } if self.thdS != nil { cdrIf, _ := cdr.AsMapStringIface() - ev := &ThresholdEvent{ + ev := &utils.CGREvent{ Tenant: cdr.Tenant, ID: utils.GenUUID(), Event: cdrIf} diff --git a/engine/resources.go b/engine/resources.go index b4450d26c..52ff3a822 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -493,7 +493,7 @@ func (rS *ResourceService) processThresholds(r *Resource) (err error) { if rS.thdS == nil { return } - ev := &ThresholdEvent{ + ev := &utils.CGREvent{ Tenant: r.Tenant, ID: utils.GenUUID(), Event: map[string]interface{}{ diff --git a/engine/stats.go b/engine/stats.go index e7fd8e3ae..8a53064e6 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -231,7 +231,7 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) { sS.ssqMux.Unlock() } if sS.thdS != nil { - ev := &ThresholdEvent{ + ev := &utils.CGREvent{ Tenant: sq.Tenant, ID: utils.GenUUID(), Event: map[string]interface{}{ diff --git a/engine/thresholds.go b/engine/thresholds.go index 71a1f91ce..90605713e 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -21,7 +21,6 @@ package engine import ( "fmt" "math/rand" - "reflect" "sort" "sync" "time" @@ -50,54 +49,6 @@ func (tp *ThresholdProfile) TenantID() string { return utils.ConcatenatedKey(tp.Tenant, tp.ID) } -// ThresholdEvent is an event processed by ThresholdService -type ThresholdEvent struct { - Tenant string - ID string - Event map[string]interface{} -} - -func (te *ThresholdEvent) TenantID() string { - return utils.ConcatenatedKey(te.Tenant, te.ID) -} - -func (te *ThresholdEvent) Account() (acnt string, err error) { - acntIf, has := te.Event[utils.ACCOUNT] - if !has { - return "", utils.ErrNotFound - } - var canCast bool - if acnt, canCast = acntIf.(string); !canCast { - return "", fmt.Errorf("field %s is not string", utils.ACCOUNT) - } - return -} - -func (te *ThresholdEvent) FilterableEvent(fltredFields []string) (fEv map[string]interface{}) { - fEv = make(map[string]interface{}) - if len(fltredFields) == 0 { - i := 0 - fltredFields = make([]string, len(te.Event)) - for k := range te.Event { - fltredFields[i] = k - i++ - } - } - for _, fltrFld := range fltredFields { - fldVal, has := te.Event[fltrFld] - if !has { - continue // the field does not exist in map, ignore it - } - valOf := reflect.ValueOf(fldVal) - if valOf.Kind() == reflect.String { - fEv[fltrFld] = utils.StringToInterface(valOf.String()) // attempt converting from string to comparable interface - } else { - fEv[fltrFld] = fldVal - } - } - return -} - // Threshold is the unit matched by filters type Threshold struct { Tenant string @@ -115,14 +66,14 @@ func (t *Threshold) TenantID() string { // ProcessEvent processes an ThresholdEvent // concurrentActions limits the number of simultaneous action sets executed -func (t *Threshold) ProcessEvent(ev *ThresholdEvent, dm *DataManager) (err error) { +func (t *Threshold) ProcessEvent(ev *utils.CGREvent, dm *DataManager) (err error) { if t.Snooze.After(time.Now()) { // snoozed, not executing actions return } if t.Hits < t.tPrfl.MinHits { // number of hits was not met, will not execute actions return } - acnt, _ := ev.Account() + acnt, _ := ev.utils.FieldAsString(utils.ACCOUNT) var acntID string if acnt != "" { acntID = utils.ConcatenatedKey(ev.Tenant, acnt) @@ -262,7 +213,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { } // matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event -func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) { +func (tS *ThresholdService) matchingThresholdsForEvent(ev *utils.CGREvent) (ts Thresholds, err error) { matchingTs := make(map[string]*Threshold) tIDs, err := matchingItemIDsForEvent(ev.Event, tS.indexedFields, tS.dm, utils.ThresholdStringIndex+ev.Tenant) if err != nil { @@ -316,7 +267,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T } // processEvent processes a new event, dispatching to matching thresholds -func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err error) { +func (tS *ThresholdService) processEvent(ev *utils.CGREvent) (hits int, err error) { matchTs, err := tS.matchingThresholdsForEvent(ev) if err != nil { return 0, err @@ -361,7 +312,7 @@ func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err erro } // V1ProcessEvent implements ThresholdService method for processing an Event -func (tS *ThresholdService) V1ProcessEvent(ev *ThresholdEvent, reply *int) (err error) { +func (tS *ThresholdService) V1ProcessEvent(ev *utils.CGREvent, reply *int) (err error) { if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } @@ -374,7 +325,7 @@ func (tS *ThresholdService) V1ProcessEvent(ev *ThresholdEvent, reply *int) (err } // V1GetThresholdsForEvent queries thresholds matching an Event -func (tS *ThresholdService) V1GetThresholdsForEvent(ev *ThresholdEvent, reply *Thresholds) (err error) { +func (tS *ThresholdService) V1GetThresholdsForEvent(ev *utils.CGREvent, reply *Thresholds) (err error) { if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } diff --git a/utils/cgrevent.go b/utils/cgrevent.go index 2a7207130..cb00ef17c 100644 --- a/utils/cgrevent.go +++ b/utils/cgrevent.go @@ -20,6 +20,7 @@ package utils import ( "fmt" + "reflect" "time" ) @@ -89,3 +90,32 @@ func (ev *CGREvent) FieldAsDuration(fldName string) (d time.Duration, err error) } return ParseDurationWithNanosecs(s) } + +func (te *CGREvent) TenantID() string { + return ConcatenatedKey(te.Tenant, te.ID) +} + +func (te *CGREvent) FilterableEvent(fltredFields []string) (fEv map[string]interface{}) { + fEv = make(map[string]interface{}) + if len(fltredFields) == 0 { + i := 0 + fltredFields = make([]string, len(te.Event)) + for k := range te.Event { + fltredFields[i] = k + i++ + } + } + for _, fltrFld := range fltredFields { + fldVal, has := te.Event[fltrFld] + if !has { + continue // the field does not exist in map, ignore it + } + valOf := reflect.ValueOf(fldVal) + if valOf.Kind() == reflect.String { + fEv[fltrFld] = StringToInterface(valOf.String()) // attempt converting from string to comparable interface + } else { + fEv[fltrFld] = fldVal + } + } + return +}