mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
ActionS - schedule also non account related actions
This commit is contained in:
@@ -170,22 +170,36 @@ func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.C
|
||||
for _, aPf := range aPfs {
|
||||
ctx := context.Background()
|
||||
var acts []actioner
|
||||
for acntID := range aPf.AccountIDs {
|
||||
// actsExec will be used bellow as common code block
|
||||
actsExec := func(acntID string) (errExec error) {
|
||||
if len(acts) == 0 { // not yet initialized
|
||||
if acts, err = newActionersFromActions(aS.cfg, aS.fltrS, aS.dm, aPf.Actions); err != nil {
|
||||
if acts, errExec = newActionersFromActions(aS.cfg, aS.fltrS, aS.dm, aPf.Actions); errExec != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
sActs := newScheduledActs(aPf.Tenant, aPf.ID, acntID, ctx, evNm, acts)
|
||||
if aPf.Schedule == utils.ASAP {
|
||||
go aS.asapExecuteActions(sActs)
|
||||
continue
|
||||
return
|
||||
}
|
||||
if _, err = aS.crn.AddFunc(aPf.Schedule, sActs.ScheduledExecute); err != nil {
|
||||
if _, errExec = aS.crn.AddFunc(aPf.Schedule, sActs.ScheduledExecute); errExec != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> scheduling ActionProfile with id: <%s:%s>, error: <%s>",
|
||||
utils.ActionS, sActs.tenant, sActs.apID, err))
|
||||
utils.ActionS, sActs.tenant, sActs.apID, errExec))
|
||||
errExec = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(aPf.AccountIDs) == 0 { // no accounts, other acts
|
||||
if err = actsExec(utils.EmptyString); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
for acntID := range aPf.AccountIDs {
|
||||
if err = actsExec(acntID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +49,8 @@ func (s *scheduledActs) ScheduledExecute() {
|
||||
func (s *scheduledActs) Execute() (err error) {
|
||||
var partExec bool
|
||||
for _, act := range s.acts {
|
||||
if err := act.execute(nil, s.data); err != nil {
|
||||
//ctx, cancel := context.WithTimeout(s.ctx, act.cfg().TTL)
|
||||
if err := act.execute(s.ctx, s.data); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("executing action: <%s>, error: <%s>", act.id(), err))
|
||||
partExec = true
|
||||
}
|
||||
@@ -88,6 +89,7 @@ func newActioner(cfg *config.CGRConfig, fltrS *engine.FilterS, dm *engine.DataMa
|
||||
// actioner is implemented by each action type
|
||||
type actioner interface {
|
||||
id() string
|
||||
cfg() *engine.APAction
|
||||
execute(ctx context.Context, data utils.RWDataProvider) (err error)
|
||||
}
|
||||
|
||||
@@ -100,6 +102,10 @@ func (aL *actLog) id() string {
|
||||
return aL.aCfg.ID
|
||||
}
|
||||
|
||||
func (aL *actLog) cfg() *engine.APAction {
|
||||
return aL.aCfg
|
||||
}
|
||||
|
||||
// execute implements actioner interface
|
||||
func (aL *actLog) execute(ctx context.Context, data utils.RWDataProvider) (err error) {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user