Threshold.ProcessEvent

This commit is contained in:
DanB
2017-09-29 18:41:58 +02:00
parent 7eb6d9600b
commit b54ac6e3b5
5 changed files with 174 additions and 39 deletions

View File

@@ -579,7 +579,9 @@ func (rS *ResourceService) V1AllocateResource(args utils.ArgRSv1ResourceUsage, r
rS.StoreResource(r)
} else if r.dirty != nil {
*r.dirty = true // mark it to be saved
rS.srMux.Lock()
rS.storedResources[r.TenantID()] = true
rS.srMux.Unlock()
}
}
rS.srMux.Unlock()

View File

@@ -21,8 +21,6 @@ package engine
import (
"fmt"
"math/rand"
"reflect"
"strings"
"sync"
"time"
@@ -30,7 +28,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewStatService initializes a StatService
@@ -193,35 +190,12 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues
// Call implements rpcclient.RpcClientConnection interface for internal RPC
// here for cases when passing StatsService as rpccclient.RpcClientConnection (ie. in ResourceS)
func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error {
methodSplit := strings.Split(serviceMethod, ".")
if len(methodSplit) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
}
method := reflect.ValueOf(ss).MethodByName(methodSplit[0][len(methodSplit[0])-2:] + methodSplit[1])
if !method.IsValid() {
return rpcclient.ErrUnsupporteServiceMethod
}
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
return utils.RPCCall(ss, serviceMethod, args, reply)
}
// processEvent processes a new event, dispatching to matching queues
// queues matching are also cached to speed up
func (sS *StatService) processEvent(ev *StatEvent) (err error) {
if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
matchSQs, err := sS.matchingStatQueuesForEvent(ev)
if err != nil {
return err
@@ -237,9 +211,6 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) {
sq.TenantID(), ev.TenantID(), err.Error()))
withErrors = true
}
if sq.sqPrfl.Blocker {
break
}
}
if withErrors {
err = utils.ErrPartiallyExecuted
@@ -249,6 +220,9 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) {
// V1ProcessEvent implements StatV1 method for processing an Event
func (sS *StatService) V1ProcessEvent(ev *StatEvent, reply *string) (err error) {
if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
if err = sS.processEvent(ev); err == nil {
*reply = utils.OK
}

View File

@@ -55,13 +55,27 @@ type ThresholdEvent struct {
Fields map[string]interface{}
}
func (te *ThresholdEvent) TenantID() string {
return utils.ConcatenatedKey(te.Tenant, te.ID)
}
func (te *ThresholdEvent) Account() (acnt string, err error) {
acntIf, has := te.Fields[utils.ACCOUNT]
if !has {
return "", utils.ErrNotFound
}
var canCast bool
if acnt, canCast = acntIf.(string); !canCast {
return "", fmt.Errorf("field %s is not string", utils.ACCOUNT)
}
return
}
// Threshold is the unit matched by filters
// It's WakeupTime is stored on demand
type Threshold struct {
Tenant string
ID string
LastExecuted time.Time
WakeupTime time.Time // prevent threshold to run too early
Tenant string
ID string
Snooze time.Time // prevent threshold to run too early
tPrfl *ThresholdProfile
dirty *bool // needs save
@@ -71,6 +85,33 @@ func (t *Threshold) TenantID() string {
return utils.ConcatenatedKey(t.Tenant, t.ID)
}
// ProcessEvent processes an ThresholdEvent
// concurrentActions limits the number of simultaneous action sets executed
func (t *Threshold) ProcessEvent(ev *ThresholdEvent, dm *DataManager) (err error) {
if t.Snooze.After(time.Now()) { // ignore the event
return
}
acnt, _ := ev.Account()
var acntID string
if acnt != "" {
acntID = utils.ConcatenatedKey(ev.Tenant, acnt)
}
for _, actionSetID := range t.tPrfl.ActionIDs {
at := &ActionTiming{
Uuid: utils.GenUUID(),
ActionsID: actionSetID,
}
if acntID != "" {
at.accountIDs = utils.NewStringMap(acntID)
}
if errExec := at.Execute(nil, nil); errExec != nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed executing actions: %s, error: %s", actionSetID, errExec.Error()))
err = utils.ErrPartiallyExecuted
}
}
return
}
// Thresholds is a sortable slice of Threshold
type Thresholds []*Threshold
@@ -79,16 +120,19 @@ func (ts Thresholds) Sort() {
sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight })
}
func NewThresholdService(dm *DataManager, storeInterval time.Duration,
func NewThresholdService(dm *DataManager, filterFields []string, storeInterval time.Duration,
statS rpcclient.RpcClientConnection) (tS *ThresholdService, err error) {
return &ThresholdService{dm: dm, storeInterval: storeInterval,
statS: statS,
stopBackup: make(chan struct{})}, nil
return &ThresholdService{dm: dm,
filterFields: filterFields,
storeInterval: storeInterval,
statS: statS,
stopBackup: make(chan struct{})}, nil
}
// ThresholdService manages Threshold execution and storing them to dataDB
type ThresholdService struct {
dm *DataManager
filterFields []string // fields considered when searching for matching thresholds
storeInterval time.Duration
statS rpcclient.RpcClientConnection // allows applying filters based on stats
stopBackup chan struct{}
@@ -232,3 +276,47 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T
}
return
}
// processEvent processes a new event, dispatching to matching thresholds
func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (err error) {
matchTs, err := tS.matchingThresholdsForEvent(ev)
if err != nil {
return err
} else if len(matchTs) == 0 {
return utils.ErrNotFound
}
var withErrors bool
for _, t := range matchTs {
err = t.ProcessEvent(ev, tS.dm)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> threshold: %s, ignoring event: %s, error: %s",
t.TenantID(), ev.TenantID(), err.Error()))
withErrors = true
continue
}
lockThreshold := utils.ThresholdPrefix + t.TenantID()
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockThreshold)
if t.dirty == nil { // one time threshold
if err = tS.dm.DataDB().RemoveThreshold(t.Tenant, t.ID, utils.NonTransactional); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> failed removing non-recurrent threshold: %s, error: %s",
t.TenantID(), err.Error()))
withErrors = true
guardian.Guardian.UnguardIDs(lockThreshold)
continue
}
}
// recurrent threshold
*t.dirty = true // mark it to be saved
t.Snooze = time.Now().Add(t.tPrfl.MinSleep)
tS.stMux.Lock()
tS.storedTdIDs[t.TenantID()] = true
tS.stMux.Unlock()
guardian.Guardian.UnguardIDs(lockThreshold)
}
if withErrors {
err = utils.ErrPartiallyExecuted
}
return
}

42
engine/thresholds_test.go Normal file
View File

@@ -0,0 +1,42 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"reflect"
"testing"
)
func TestThresholdsSort(t *testing.T) {
ts := Thresholds{
&Threshold{tPrfl: &ThresholdProfile{ID: "FIRST", Weight: 30.0}},
&Threshold{tPrfl: &ThresholdProfile{ID: "SECOND", Weight: 40.0}},
&Threshold{tPrfl: &ThresholdProfile{ID: "THIRD", Weight: 30.0}},
&Threshold{tPrfl: &ThresholdProfile{ID: "FOURTH", Weight: 35.0}},
}
ts.Sort()
eInst := Thresholds{
&Threshold{tPrfl: &ThresholdProfile{ID: "SECOND", Weight: 40.0}},
&Threshold{tPrfl: &ThresholdProfile{ID: "FOURTH", Weight: 35.0}},
&Threshold{tPrfl: &ThresholdProfile{ID: "FIRST", Weight: 30.0}},
&Threshold{tPrfl: &ThresholdProfile{ID: "THIRD", Weight: 30.0}},
}
if !reflect.DeepEqual(eInst, ts) {
t.Errorf("expecting: %+v, received: %+v", eInst, ts)
}
}

View File

@@ -41,6 +41,8 @@ import (
"strings"
"sync"
"time"
"github.com/cgrates/rpcclient"
)
func NewCounter(start, limit int64) *Counter {
@@ -779,3 +781,30 @@ type TenantID struct {
func (tID *TenantID) TenantID() string {
return ConcatenatedKey(tID.Tenant, tID.ID)
}
// RPCCall is a generic method calling RPC on a struct instance
// serviceMethod is assumed to be in the form InstanceV1.Method
// where V1Method will become RPC method called on instance
func RPCCall(inst interface{}, serviceMethod string, args interface{}, reply interface{}) error {
methodSplit := strings.Split(serviceMethod, ".")
if len(methodSplit) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
}
method := reflect.ValueOf(inst).MethodByName(methodSplit[0][len(methodSplit[0])-2:] + methodSplit[1])
if !method.IsValid() {
return rpcclient.ErrUnsupporteServiceMethod
}
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return ErrServerError
}
return err
}