mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 05:09:54 +05:00
Replaced ThresholdEvent with CGREvent
This commit is contained in:
committed by
Dan Christian Bogos
parent
03a718bd35
commit
0118c7f1d4
@@ -45,7 +45,7 @@ func (tSv1 *ThresholdSv1) GetThresholdIDs(tenant string, tIDs *[]string) error {
|
||||
}
|
||||
|
||||
// GetThresholdsForEvent returns a list of thresholds matching an event
|
||||
func (tSv1 *ThresholdSv1) GetThresholdsForEvent(ev *engine.ThresholdEvent, reply *engine.Thresholds) error {
|
||||
func (tSv1 *ThresholdSv1) GetThresholdsForEvent(ev *utils.CGREvent, reply *engine.Thresholds) error {
|
||||
return tSv1.tS.V1GetThresholdsForEvent(ev, reply)
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ func (tSv1 *ThresholdSv1) GetThreshold(tntID *utils.TenantID, t *engine.Threshol
|
||||
}
|
||||
|
||||
// ProcessEvent will process an Event
|
||||
func (tSv1 *ThresholdSv1) ProcessEvent(ev *engine.ThresholdEvent, hits *int) error {
|
||||
func (tSv1 *ThresholdSv1) ProcessEvent(ev *utils.CGREvent, hits *int) error {
|
||||
return tSv1.tS.V1ProcessEvent(ev, hits)
|
||||
}
|
||||
|
||||
|
||||
@@ -41,8 +41,8 @@ var (
|
||||
thdsDelay int
|
||||
)
|
||||
|
||||
var tEvs = []*engine.ThresholdEvent{
|
||||
&engine.ThresholdEvent{ // hitting THD_ACNT_BALANCE_1
|
||||
var tEvs = []*utils.CGREvent{
|
||||
&utils.CGREvent{ // hitting THD_ACNT_BALANCE_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event1",
|
||||
Event: map[string]interface{}{
|
||||
@@ -50,7 +50,7 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
utils.ACCOUNT: "1002",
|
||||
utils.AllowNegative: true,
|
||||
utils.Disabled: false}},
|
||||
&engine.ThresholdEvent{ // hitting THD_ACNT_BALANCE_1
|
||||
&utils.CGREvent{ // hitting THD_ACNT_BALANCE_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event2",
|
||||
Event: map[string]interface{}{
|
||||
@@ -60,7 +60,7 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
utils.Units: 12.3,
|
||||
utils.ExpiryTime: time.Date(2009, 11, 10, 23, 00, 0, 0, time.UTC).Local(),
|
||||
}},
|
||||
&engine.ThresholdEvent{ // hitting THD_STATS_1
|
||||
&utils.CGREvent{ // hitting THD_STATS_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event3",
|
||||
Event: map[string]interface{}{
|
||||
@@ -74,7 +74,7 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
"ACC": 0.75,
|
||||
"PDD": "2s",
|
||||
}},
|
||||
&engine.ThresholdEvent{ // hitting THD_STATS_1 and THD_STATS_2
|
||||
&utils.CGREvent{ // hitting THD_STATS_1 and THD_STATS_2
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event4",
|
||||
Event: map[string]interface{}{
|
||||
@@ -85,7 +85,7 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
"ACD": "2m45s",
|
||||
"TCD": "1h",
|
||||
}},
|
||||
&engine.ThresholdEvent{ // hitting THD_STATS_3
|
||||
&utils.CGREvent{ // hitting THD_STATS_3
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event5",
|
||||
Event: map[string]interface{}{
|
||||
@@ -95,7 +95,7 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
"ACD": "2m45s",
|
||||
"TCD": "3h1s",
|
||||
}},
|
||||
&engine.ThresholdEvent{ // hitting THD_RES_1
|
||||
&utils.CGREvent{ // hitting THD_RES_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event6",
|
||||
Event: map[string]interface{}{
|
||||
@@ -103,7 +103,7 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
utils.ACCOUNT: "1002",
|
||||
utils.ResourceID: "RES_GRP_1",
|
||||
utils.USAGE: 10.0}},
|
||||
&engine.ThresholdEvent{ // hitting THD_RES_1
|
||||
&utils.CGREvent{ // hitting THD_RES_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event6",
|
||||
Event: map[string]interface{}{
|
||||
@@ -111,7 +111,7 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
utils.ACCOUNT: "1002",
|
||||
utils.ResourceID: "RES_GRP_1",
|
||||
utils.USAGE: 10.0}},
|
||||
&engine.ThresholdEvent{ // hitting THD_RES_1
|
||||
&utils.CGREvent{ // hitting THD_RES_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event6",
|
||||
Event: map[string]interface{}{
|
||||
@@ -119,7 +119,7 @@ var tEvs = []*engine.ThresholdEvent{
|
||||
utils.ACCOUNT: "1002",
|
||||
utils.ResourceID: "RES_GRP_1",
|
||||
utils.USAGE: 10.0}},
|
||||
&engine.ThresholdEvent{ // hitting THD_CDRS_1
|
||||
&utils.CGREvent{ // hitting THD_CDRS_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "cdrev1",
|
||||
Event: map[string]interface{}{
|
||||
|
||||
@@ -779,7 +779,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
|
||||
accountId = b.account.ID
|
||||
acntTnt := utils.NewTenantID(accountId)
|
||||
if thresholdS != nil {
|
||||
ev := &ThresholdEvent{
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: acntTnt.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
Event: map[string]interface{}{
|
||||
@@ -827,7 +827,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
|
||||
if len(savedAccounts) != 0 && thresholdS != nil {
|
||||
for _, acnt := range savedAccounts {
|
||||
acntTnt := utils.NewTenantID(acnt.ID)
|
||||
ev := &ThresholdEvent{
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: acntTnt.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
Event: map[string]interface{}{
|
||||
|
||||
@@ -196,7 +196,7 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
|
||||
}
|
||||
if self.thdS != nil {
|
||||
cdrIf, _ := cdr.AsMapStringIface()
|
||||
ev := &ThresholdEvent{
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: cdr.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
Event: cdrIf}
|
||||
|
||||
@@ -493,7 +493,7 @@ func (rS *ResourceService) processThresholds(r *Resource) (err error) {
|
||||
if rS.thdS == nil {
|
||||
return
|
||||
}
|
||||
ev := &ThresholdEvent{
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: r.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
Event: map[string]interface{}{
|
||||
|
||||
@@ -231,7 +231,7 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) {
|
||||
sS.ssqMux.Unlock()
|
||||
}
|
||||
if sS.thdS != nil {
|
||||
ev := &ThresholdEvent{
|
||||
ev := &utils.CGREvent{
|
||||
Tenant: sq.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
Event: map[string]interface{}{
|
||||
|
||||
@@ -21,7 +21,6 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -50,54 +49,6 @@ func (tp *ThresholdProfile) TenantID() string {
|
||||
return utils.ConcatenatedKey(tp.Tenant, tp.ID)
|
||||
}
|
||||
|
||||
// ThresholdEvent is an event processed by ThresholdService
|
||||
type ThresholdEvent struct {
|
||||
Tenant string
|
||||
ID string
|
||||
Event map[string]interface{}
|
||||
}
|
||||
|
||||
func (te *ThresholdEvent) TenantID() string {
|
||||
return utils.ConcatenatedKey(te.Tenant, te.ID)
|
||||
}
|
||||
|
||||
func (te *ThresholdEvent) Account() (acnt string, err error) {
|
||||
acntIf, has := te.Event[utils.ACCOUNT]
|
||||
if !has {
|
||||
return "", utils.ErrNotFound
|
||||
}
|
||||
var canCast bool
|
||||
if acnt, canCast = acntIf.(string); !canCast {
|
||||
return "", fmt.Errorf("field %s is not string", utils.ACCOUNT)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (te *ThresholdEvent) FilterableEvent(fltredFields []string) (fEv map[string]interface{}) {
|
||||
fEv = make(map[string]interface{})
|
||||
if len(fltredFields) == 0 {
|
||||
i := 0
|
||||
fltredFields = make([]string, len(te.Event))
|
||||
for k := range te.Event {
|
||||
fltredFields[i] = k
|
||||
i++
|
||||
}
|
||||
}
|
||||
for _, fltrFld := range fltredFields {
|
||||
fldVal, has := te.Event[fltrFld]
|
||||
if !has {
|
||||
continue // the field does not exist in map, ignore it
|
||||
}
|
||||
valOf := reflect.ValueOf(fldVal)
|
||||
if valOf.Kind() == reflect.String {
|
||||
fEv[fltrFld] = utils.StringToInterface(valOf.String()) // attempt converting from string to comparable interface
|
||||
} else {
|
||||
fEv[fltrFld] = fldVal
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Threshold is the unit matched by filters
|
||||
type Threshold struct {
|
||||
Tenant string
|
||||
@@ -115,14 +66,14 @@ func (t *Threshold) TenantID() string {
|
||||
|
||||
// ProcessEvent processes an ThresholdEvent
|
||||
// concurrentActions limits the number of simultaneous action sets executed
|
||||
func (t *Threshold) ProcessEvent(ev *ThresholdEvent, dm *DataManager) (err error) {
|
||||
func (t *Threshold) ProcessEvent(ev *utils.CGREvent, dm *DataManager) (err error) {
|
||||
if t.Snooze.After(time.Now()) { // snoozed, not executing actions
|
||||
return
|
||||
}
|
||||
if t.Hits < t.tPrfl.MinHits { // number of hits was not met, will not execute actions
|
||||
return
|
||||
}
|
||||
acnt, _ := ev.Account()
|
||||
acnt, _ := ev.utils.FieldAsString(utils.ACCOUNT)
|
||||
var acntID string
|
||||
if acnt != "" {
|
||||
acntID = utils.ConcatenatedKey(ev.Tenant, acnt)
|
||||
@@ -262,7 +213,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) {
|
||||
}
|
||||
|
||||
// matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event
|
||||
func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) {
|
||||
func (tS *ThresholdService) matchingThresholdsForEvent(ev *utils.CGREvent) (ts Thresholds, err error) {
|
||||
matchingTs := make(map[string]*Threshold)
|
||||
tIDs, err := matchingItemIDsForEvent(ev.Event, tS.indexedFields, tS.dm, utils.ThresholdStringIndex+ev.Tenant)
|
||||
if err != nil {
|
||||
@@ -316,7 +267,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T
|
||||
}
|
||||
|
||||
// processEvent processes a new event, dispatching to matching thresholds
|
||||
func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err error) {
|
||||
func (tS *ThresholdService) processEvent(ev *utils.CGREvent) (hits int, err error) {
|
||||
matchTs, err := tS.matchingThresholdsForEvent(ev)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -361,7 +312,7 @@ func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err erro
|
||||
}
|
||||
|
||||
// V1ProcessEvent implements ThresholdService method for processing an Event
|
||||
func (tS *ThresholdService) V1ProcessEvent(ev *ThresholdEvent, reply *int) (err error) {
|
||||
func (tS *ThresholdService) V1ProcessEvent(ev *utils.CGREvent, reply *int) (err error) {
|
||||
if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
@@ -374,7 +325,7 @@ func (tS *ThresholdService) V1ProcessEvent(ev *ThresholdEvent, reply *int) (err
|
||||
}
|
||||
|
||||
// V1GetThresholdsForEvent queries thresholds matching an Event
|
||||
func (tS *ThresholdService) V1GetThresholdsForEvent(ev *ThresholdEvent, reply *Thresholds) (err error) {
|
||||
func (tS *ThresholdService) V1GetThresholdsForEvent(ev *utils.CGREvent, reply *Thresholds) (err error) {
|
||||
if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -89,3 +90,32 @@ func (ev *CGREvent) FieldAsDuration(fldName string) (d time.Duration, err error)
|
||||
}
|
||||
return ParseDurationWithNanosecs(s)
|
||||
}
|
||||
|
||||
func (te *CGREvent) TenantID() string {
|
||||
return ConcatenatedKey(te.Tenant, te.ID)
|
||||
}
|
||||
|
||||
func (te *CGREvent) FilterableEvent(fltredFields []string) (fEv map[string]interface{}) {
|
||||
fEv = make(map[string]interface{})
|
||||
if len(fltredFields) == 0 {
|
||||
i := 0
|
||||
fltredFields = make([]string, len(te.Event))
|
||||
for k := range te.Event {
|
||||
fltredFields[i] = k
|
||||
i++
|
||||
}
|
||||
}
|
||||
for _, fltrFld := range fltredFields {
|
||||
fldVal, has := te.Event[fltrFld]
|
||||
if !has {
|
||||
continue // the field does not exist in map, ignore it
|
||||
}
|
||||
valOf := reflect.ValueOf(fldVal)
|
||||
if valOf.Kind() == reflect.String {
|
||||
fEv[fltrFld] = StringToInterface(valOf.String()) // attempt converting from string to comparable interface
|
||||
} else {
|
||||
fEv[fltrFld] = fldVal
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user