ActionS - V1ExecuteActions with targets

This commit is contained in:
DanB
2020-12-18 17:47:33 +01:00
parent 31ca4fcc36
commit 26c04a2415
3 changed files with 95 additions and 105 deletions

View File

@@ -19,10 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package actions
import (
"context"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cron"
)
@@ -125,7 +127,76 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string, aPrflIDs []string,
return
}
/*
// scheduleActions is responsible for scheduling the action profiles matching cgrEv
func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.CGREventWithOpts, forceASAP bool) (err error) {
var partExec bool
var aPfs engine.ActionProfiles
if aPfs, err = aS.matchingActionProfilesForEvent(tnt, aPrflIDs, cgrEv); err != nil {
return
}
for _, aPf := range aPfs {
ctx := context.Background()
var trgActs map[string][]actioner // build here the list of actioners based on the trgKey
var trgKey string
for _, aCfg := range aPf.Actions { // create actioners and attach them to the right target
if trgTyp := actionTarget(aCfg.Type); trgTyp != utils.META_NONE ||
trgKey == utils.EmptyString {
trgKey = trgTyp
}
if act, errAct := newActioner(aS.cfg, aS.fltrS, aS.dm, aCfg); errAct != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> ignoring ActionProfile with id: <%s:%s> creating action: <%s>, error: <%s>",
utils.ActionS, aPf.Tenant, aPf.ID, aCfg.ID, errAct))
partExec = true
break
} else {
trgActs[trgKey] = append(trgActs[trgKey], act)
}
}
if partExec {
continue // skip this profile from processing further
}
// build schedActSet
var schedActSet []*scheduledActs
for trg, acts := range trgActs {
if trg == utils.META_NONE { // only one scheduledActs set
schedActSet = append(schedActSet, newScheduledActs(aPf.Tenant, aPf.ID, trg, utils.EmptyString,
ctx, &ActData{cgrEv.CGREvent.Event, cgrEv.Opts}, acts))
continue
}
if len(aPf.Targets[trg]) == 0 {
continue // no items selected
}
for trgID := range aPf.Targets[trg] {
schedActSet = append(schedActSet, newScheduledActs(aPf.Tenant, aPf.ID, trg, trgID,
ctx, &ActData{cgrEv.CGREvent.Event, cgrEv.Opts}, acts))
}
}
// execute the actions
for _, sActs := range schedActSet {
if aPf.Schedule == utils.ASAP || forceASAP {
go aS.asapExecuteActions(sActs)
continue
}
if _, errExec := aS.crn.AddFunc(aPf.Schedule, sActs.ScheduledExecute); errExec != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduling ActionProfile with id: <%s:%s>, error: <%s>",
utils.ActionS, sActs.tenant, sActs.apID, errExec))
partExec = true
continue
}
}
}
if partExec {
err = utils.ErrPartiallyExecuted
}
return
}
// asapExecuteActions executes the scheduledActs and removes the executed from database
// uses locks to avoid concurrent access
func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) {
@@ -141,13 +212,8 @@ func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) {
if gErr = sActs.Execute(); gErr != nil { // cannot remove due to errors on execution
return
}
delete(ap.AccountIDs, sActs.apID)
if len(ap.AccountIDs) == 0 {
gErr = aS.dm.RemoveActionProfile(sActs.tenant, sActs.apID, utils.NonTransactional, true)
} else {
gErr = aS.dm.SetActionProfile(ap, true)
}
if gErr != nil {
delete(ap.Targets[sActs.trgTyp], sActs.trgID)
if gErr = aS.dm.SetActionProfile(ap, true); gErr != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> saving ActionProfile with id: <%s:%s>, error: <%s>",
@@ -158,52 +224,6 @@ func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) {
return
}
// scheduleActions is responsible for scheduling the actions needing execution
func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.CGREventWithOpts) (err error) {
var aPfs engine.ActionProfiles
if aPfs, err = aS.matchingActionProfilesForEvent(tnt, aPrflIDs, cgrEv); err != nil {
return
}
for _, aPf := range aPfs {
ctx := context.Background()
var acts []actioner
// actsExec will be used bellow as common code block
actsExec := func(acntID string) (errExec error) {
if len(acts) == 0 { // not yet initialized
if acts, errExec = newActionersFromActions(aS.cfg, aS.fltrS, aS.dm, aPf.Actions); errExec != nil {
return
}
}
sActs := newScheduledActs(aPf.Tenant, aPf.ID, acntID, ctx,
&ActData{cgrEv.CGREvent.Event, cgrEv.Opts}, acts)
if aPf.Schedule == utils.ASAP {
go aS.asapExecuteActions(sActs)
return
}
if _, errExec = aS.crn.AddFunc(aPf.Schedule, sActs.ScheduledExecute); errExec != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduling ActionProfile with id: <%s:%s>, error: <%s>",
utils.ActionS, sActs.tenant, sActs.apID, errExec))
errExec = nil
}
return
}
if len(aPf.AccountIDs) == 0 { // no accounts, other acts
if err = actsExec(utils.EmptyString); err != nil {
return err
}
continue
}
for acntID := range aPf.AccountIDs {
if err = actsExec(acntID); err != nil {
return err
}
}
}
return
}
type ArgActionSv1ExecuteActions struct {
*utils.CGREventWithOpts
ActionProfileIDs []string
@@ -211,51 +231,10 @@ type ArgActionSv1ExecuteActions struct {
// V1ExecuteActions will be called to execute ASAP action profiles, ignoring their Schedule field
func (aS *ActionS) V1ExecuteActions(args *ArgActionSv1ExecuteActions, rpl *string) (err error) {
var aPfs engine.ActionProfiles
if aPfs, err = aS.matchingActionProfilesForEvent(args.Tenant, args.ActionProfileIDs,
args.CGREventWithOpts); err != nil {
return utils.NewErrServerError(err)
}
var partExec bool
for _, aPf := range aPfs {
ctx := context.Background()
var acts []actioner
// actsExec will be used bellow as common code block
actsExec := func(acntID string) (errExec error) {
if len(acts) == 0 { // not yet initialized
if acts, errExec = newActionersFromActions(aS.cfg, aS.fltrS, aS.dm, aPf.Actions); errExec != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> creating actions for ActionProfile with id: <%s:%s>, error: <%s>",
utils.ActionS, args.Tenant, aPf.ID, errExec))
partExec = true
return
}
}
sActs := newScheduledActs(aPf.Tenant, aPf.ID, acntID, ctx,
&ActData{args.CGREvent.Event, args.Opts}, acts)
if errExec = aS.asapExecuteActions(sActs); errExec != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> executing ActionProfile with id: <%s:%s>, error: <%s>",
utils.ActionS, sActs.tenant, sActs.apID, errExec))
partExec = true
return
}
return
}
if len(aPf.AccountIDs) == 0 { // no accounts, other acts
actsExec(utils.EmptyString)
continue
}
for acntID := range aPf.AccountIDs {
actsExec(acntID)
}
}
if partExec {
return utils.ErrPartiallyExecuted
if err = aS.scheduleActions(args.CGREventWithOpts.Tenant, args.ActionProfileIDs,
args.CGREventWithOpts, true); err != nil {
return
}
*rpl = utils.OK
return
}
*/

View File

@@ -28,18 +28,27 @@ import (
"github.com/cgrates/ltcache"
)
func newScheduledActs(tenant, apID, acntID string, ctx context.Context,
data *ActData, acts []actioner) (sActs *scheduledActs) {
return &scheduledActs{tenant, apID, acntID, ctx, data, acts,
// actionTarget returns the target attached to an action
func actionTarget(act string) (trgt string) {
switch act {
default:
trgt = utils.META_NONE
}
return
}
func newScheduledActs(tenant, apID, trgTyp, trgID string,
ctx context.Context, data *ActData, acts []actioner) (sActs *scheduledActs) {
return &scheduledActs{tenant, apID, trgTyp, trgID, ctx, data, acts,
ltcache.NewTransCache(map[string]*ltcache.CacheConfig{})}
}
// scheduled is a set of actions which will be executed directly or by the cron.schedule
type scheduledActs struct {
tenant, apID, acntID string
ctx context.Context
data *ActData
acts []actioner
tenant, apID, trgTyp, trgID string
ctx context.Context
data *ActData
acts []actioner
cch *ltcache.TransCache // cache data between actions here
}

2
data/scripts/eval_dep.sh Executable file
View File

@@ -0,0 +1,2 @@
#!/bin/sh
eval `go build -work -a 2>&1` && find $WORK -type f -name "*.a" | xargs -I{} du -hxs "{}" | sort -rh | sed -e s:${WORK}/::g