From b54ac6e3b5da7d862100cd80c2dfd3fde4b62862 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 29 Sep 2017 18:41:58 +0200 Subject: [PATCH] Threshold.ProcessEvent --- engine/resources.go | 2 + engine/stats.go | 34 ++---------- engine/thresholds.go | 106 ++++++++++++++++++++++++++++++++++---- engine/thresholds_test.go | 42 +++++++++++++++ utils/coreutils.go | 29 +++++++++++ 5 files changed, 174 insertions(+), 39 deletions(-) create mode 100644 engine/thresholds_test.go diff --git a/engine/resources.go b/engine/resources.go index 428f04ec1..db721d636 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -579,7 +579,9 @@ func (rS *ResourceService) V1AllocateResource(args utils.ArgRSv1ResourceUsage, r rS.StoreResource(r) } else if r.dirty != nil { *r.dirty = true // mark it to be saved + rS.srMux.Lock() rS.storedResources[r.TenantID()] = true + rS.srMux.Unlock() } } rS.srMux.Unlock() diff --git a/engine/stats.go b/engine/stats.go index abd8a4011..5206bc885 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -21,8 +21,6 @@ package engine import ( "fmt" "math/rand" - "reflect" - "strings" "sync" "time" @@ -30,7 +28,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // NewStatService initializes a StatService @@ -193,35 +190,12 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues // Call implements rpcclient.RpcClientConnection interface for internal RPC // here for cases when passing StatsService as rpccclient.RpcClientConnection (ie. in ResourceS) func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error { - methodSplit := strings.Split(serviceMethod, ".") - if len(methodSplit) != 2 { - return rpcclient.ErrUnsupporteServiceMethod - } - method := reflect.ValueOf(ss).MethodByName(methodSplit[0][len(methodSplit[0])-2:] + methodSplit[1]) - if !method.IsValid() { - return rpcclient.ErrUnsupporteServiceMethod - } - params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} - ret := method.Call(params) - if len(ret) != 1 { - return utils.ErrServerError - } - if ret[0].Interface() == nil { - return nil - } - err, ok := ret[0].Interface().(error) - if !ok { - return utils.ErrServerError - } - return err + return utils.RPCCall(ss, serviceMethod, args, reply) } // processEvent processes a new event, dispatching to matching queues // queues matching are also cached to speed up func (sS *StatService) processEvent(ev *StatEvent) (err error) { - if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing - return utils.NewErrMandatoryIeMissing(missing...) - } matchSQs, err := sS.matchingStatQueuesForEvent(ev) if err != nil { return err @@ -237,9 +211,6 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) { sq.TenantID(), ev.TenantID(), err.Error())) withErrors = true } - if sq.sqPrfl.Blocker { - break - } } if withErrors { err = utils.ErrPartiallyExecuted @@ -249,6 +220,9 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) { // V1ProcessEvent implements StatV1 method for processing an Event func (sS *StatService) V1ProcessEvent(ev *StatEvent, reply *string) (err error) { + if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } if err = sS.processEvent(ev); err == nil { *reply = utils.OK } diff --git a/engine/thresholds.go b/engine/thresholds.go index 6b3ab328f..fb0a056a2 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -55,13 +55,27 @@ type ThresholdEvent struct { Fields map[string]interface{} } +func (te *ThresholdEvent) TenantID() string { + return utils.ConcatenatedKey(te.Tenant, te.ID) +} + +func (te *ThresholdEvent) Account() (acnt string, err error) { + acntIf, has := te.Fields[utils.ACCOUNT] + if !has { + return "", utils.ErrNotFound + } + var canCast bool + if acnt, canCast = acntIf.(string); !canCast { + return "", fmt.Errorf("field %s is not string", utils.ACCOUNT) + } + return +} + // Threshold is the unit matched by filters -// It's WakeupTime is stored on demand type Threshold struct { - Tenant string - ID string - LastExecuted time.Time - WakeupTime time.Time // prevent threshold to run too early + Tenant string + ID string + Snooze time.Time // prevent threshold to run too early tPrfl *ThresholdProfile dirty *bool // needs save @@ -71,6 +85,33 @@ func (t *Threshold) TenantID() string { return utils.ConcatenatedKey(t.Tenant, t.ID) } +// ProcessEvent processes an ThresholdEvent +// concurrentActions limits the number of simultaneous action sets executed +func (t *Threshold) ProcessEvent(ev *ThresholdEvent, dm *DataManager) (err error) { + if t.Snooze.After(time.Now()) { // ignore the event + return + } + acnt, _ := ev.Account() + var acntID string + if acnt != "" { + acntID = utils.ConcatenatedKey(ev.Tenant, acnt) + } + for _, actionSetID := range t.tPrfl.ActionIDs { + at := &ActionTiming{ + Uuid: utils.GenUUID(), + ActionsID: actionSetID, + } + if acntID != "" { + at.accountIDs = utils.NewStringMap(acntID) + } + if errExec := at.Execute(nil, nil); errExec != nil { + utils.Logger.Warning(fmt.Sprintf(" failed executing actions: %s, error: %s", actionSetID, errExec.Error())) + err = utils.ErrPartiallyExecuted + } + } + return +} + // Thresholds is a sortable slice of Threshold type Thresholds []*Threshold @@ -79,16 +120,19 @@ func (ts Thresholds) Sort() { sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight }) } -func NewThresholdService(dm *DataManager, storeInterval time.Duration, +func NewThresholdService(dm *DataManager, filterFields []string, storeInterval time.Duration, statS rpcclient.RpcClientConnection) (tS *ThresholdService, err error) { - return &ThresholdService{dm: dm, storeInterval: storeInterval, - statS: statS, - stopBackup: make(chan struct{})}, nil + return &ThresholdService{dm: dm, + filterFields: filterFields, + storeInterval: storeInterval, + statS: statS, + stopBackup: make(chan struct{})}, nil } // ThresholdService manages Threshold execution and storing them to dataDB type ThresholdService struct { dm *DataManager + filterFields []string // fields considered when searching for matching thresholds storeInterval time.Duration statS rpcclient.RpcClientConnection // allows applying filters based on stats stopBackup chan struct{} @@ -232,3 +276,47 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T } return } + +// processEvent processes a new event, dispatching to matching thresholds +func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (err error) { + matchTs, err := tS.matchingThresholdsForEvent(ev) + if err != nil { + return err + } else if len(matchTs) == 0 { + return utils.ErrNotFound + } + var withErrors bool + for _, t := range matchTs { + err = t.ProcessEvent(ev, tS.dm) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf(" threshold: %s, ignoring event: %s, error: %s", + t.TenantID(), ev.TenantID(), err.Error())) + withErrors = true + continue + } + lockThreshold := utils.ThresholdPrefix + t.TenantID() + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockThreshold) + if t.dirty == nil { // one time threshold + if err = tS.dm.DataDB().RemoveThreshold(t.Tenant, t.ID, utils.NonTransactional); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed removing non-recurrent threshold: %s, error: %s", + t.TenantID(), err.Error())) + withErrors = true + guardian.Guardian.UnguardIDs(lockThreshold) + continue + } + } + // recurrent threshold + *t.dirty = true // mark it to be saved + t.Snooze = time.Now().Add(t.tPrfl.MinSleep) + tS.stMux.Lock() + tS.storedTdIDs[t.TenantID()] = true + tS.stMux.Unlock() + guardian.Guardian.UnguardIDs(lockThreshold) + } + if withErrors { + err = utils.ErrPartiallyExecuted + } + return +} diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go new file mode 100644 index 000000000..7414b019e --- /dev/null +++ b/engine/thresholds_test.go @@ -0,0 +1,42 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "reflect" + "testing" +) + +func TestThresholdsSort(t *testing.T) { + ts := Thresholds{ + &Threshold{tPrfl: &ThresholdProfile{ID: "FIRST", Weight: 30.0}}, + &Threshold{tPrfl: &ThresholdProfile{ID: "SECOND", Weight: 40.0}}, + &Threshold{tPrfl: &ThresholdProfile{ID: "THIRD", Weight: 30.0}}, + &Threshold{tPrfl: &ThresholdProfile{ID: "FOURTH", Weight: 35.0}}, + } + ts.Sort() + eInst := Thresholds{ + &Threshold{tPrfl: &ThresholdProfile{ID: "SECOND", Weight: 40.0}}, + &Threshold{tPrfl: &ThresholdProfile{ID: "FOURTH", Weight: 35.0}}, + &Threshold{tPrfl: &ThresholdProfile{ID: "FIRST", Weight: 30.0}}, + &Threshold{tPrfl: &ThresholdProfile{ID: "THIRD", Weight: 30.0}}, + } + if !reflect.DeepEqual(eInst, ts) { + t.Errorf("expecting: %+v, received: %+v", eInst, ts) + } +} diff --git a/utils/coreutils.go b/utils/coreutils.go index df8db9ec3..94d95a43b 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -41,6 +41,8 @@ import ( "strings" "sync" "time" + + "github.com/cgrates/rpcclient" ) func NewCounter(start, limit int64) *Counter { @@ -779,3 +781,30 @@ type TenantID struct { func (tID *TenantID) TenantID() string { return ConcatenatedKey(tID.Tenant, tID.ID) } + +// RPCCall is a generic method calling RPC on a struct instance +// serviceMethod is assumed to be in the form InstanceV1.Method +// where V1Method will become RPC method called on instance +func RPCCall(inst interface{}, serviceMethod string, args interface{}, reply interface{}) error { + methodSplit := strings.Split(serviceMethod, ".") + if len(methodSplit) != 2 { + return rpcclient.ErrUnsupporteServiceMethod + } + method := reflect.ValueOf(inst).MethodByName(methodSplit[0][len(methodSplit[0])-2:] + methodSplit[1]) + if !method.IsValid() { + return rpcclient.ErrUnsupporteServiceMethod + } + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} + ret := method.Call(params) + if len(ret) != 1 { + return ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return ErrServerError + } + return err +}