ActionS - schedInit for scheduler reload

This commit is contained in:
DanB
2020-12-20 17:09:09 +01:00
parent 42324852c2
commit 73e897916d
4 changed files with 89 additions and 27 deletions

View File

@@ -21,6 +21,8 @@ package actions
import (
"context"
"fmt"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -35,7 +37,6 @@ func NewActionS(cfg *config.CGRConfig, fltrS *engine.FilterS, dm *engine.DataMan
cfg: cfg,
fltrS: fltrS,
dm: dm,
crn: cron.New(),
}
}
@@ -45,13 +46,14 @@ type ActionS struct {
fltrS *engine.FilterS
dm *engine.DataManager
crn *cron.Cron
crnLk *sync.RWMutex
}
// ListenAndServe keeps the service alive
func (aS *ActionS) ListenAndServe(stopChan, cfgRld chan struct{}) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>",
utils.CoreS, utils.ActionS))
aS.crn.Start()
aS.schedInit() // initialize cron and schedule actions
for {
select {
case <-stopChan:
@@ -65,7 +67,9 @@ func (aS *ActionS) ListenAndServe(stopChan, cfgRld chan struct{}) {
// Shutdown is called to shutdown the service
func (aS *ActionS) Shutdown() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.ActionS))
aS.crnLk.RLock()
aS.crn.Stop()
aS.crnLk.RUnlock()
return
}
@@ -74,6 +78,66 @@ func (aS *ActionS) Call(serviceMethod string, args interface{}, reply interface{
return utils.RPCCall(aS, serviceMethod, args, reply)
}
// schedInit will set up cron and load the matching data
func (aS *ActionS) schedInit() (err error) {
aS.crnLk.Lock() // make sure we don't have parallel processes running setu
defer aS.crnLk.Unlock()
utils.Logger.Info(fmt.Sprintf("<%s> initializing scheduler.", utils.ActionS))
tnts := []string{aS.cfg.GeneralCfg().DefaultTenant}
if aS.cfg.ActionSCfg().Tenants != nil {
tnts = *aS.cfg.ActionSCfg().Tenants
}
if len(tnts) == 0 {
return utils.NewErrMandatoryIeMissing(utils.TenantCfg)
}
crn := cron.New()
var partExec bool
for _, tnt := range tnts {
cgrEv := &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: tnt,
ID: utils.GenUUID(),
Time: utils.TimePointer(time.Now()),
},
Opts: map[string]interface{}{
utils.EventType: utils.SchedulerInit,
utils.NodeID: aS.cfg.GeneralCfg().NodeID,
},
}
var schedActSet []*scheduledActs
if schedActSet, err = aS.scheduledActions(tnt, nil, cgrEv, false); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduler init, ignoring tenant: <%s>, error: <%s>",
utils.ActionS, tnt, err))
partExec = true
continue
}
for _, sActs := range schedActSet {
if sActs.schedule == utils.ASAP {
go aS.asapExecuteActions(sActs)
continue
}
if _, err = crn.AddFunc(sActs.schedule, sActs.ScheduledExecute); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduling ActionProfile with id: <%s:%s>, error: <%s>",
utils.ActionS, sActs.tenant, sActs.apID, err))
partExec = true
continue
}
}
}
if partExec {
err = utils.ErrPartiallyExecuted
}
if aS.crn != nil {
aS.crn.Stop()
}
aS.crn.Start()
return
}
// matchingActionProfilesForEvent returns the matched ActionProfiles for the given event
func (aS *ActionS) matchingActionProfilesForEvent(tnt string, aPrflIDs []string,
cgrEv *utils.CGREventWithOpts) (aPfs engine.ActionProfiles, err error) {
@@ -127,8 +191,9 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string, aPrflIDs []string,
return
}
// scheduleActions is responsible for scheduling the action profiles matching cgrEv
func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.CGREventWithOpts, forceASAP bool) (err error) {
// scheduledActions is responsible for scheduling the action profiles matching cgrEv
func (aS *ActionS) scheduledActions(tnt string, aPrflIDs []string, cgrEv *utils.CGREventWithOpts,
forceASAP bool) (schedActs []*scheduledActs, err error) {
var partExec bool
var aPfs engine.ActionProfiles
if aPfs, err = aS.matchingActionProfilesForEvent(tnt, aPrflIDs, cgrEv); err != nil {
@@ -158,11 +223,9 @@ func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.C
if partExec {
continue // skip this profile from processing further
}
// build schedActSet
var schedActSet []*scheduledActs
for trg, acts := range trgActs {
if trg == utils.META_NONE { // only one scheduledActs set
schedActSet = append(schedActSet, newScheduledActs(aPf.Tenant, aPf.ID, trg, utils.EmptyString,
schedActs = append(schedActs, newScheduledActs(aPf.Tenant, aPf.ID, trg, utils.EmptyString, aPf.Schedule,
ctx, &ActData{cgrEv.CGREvent.Event, cgrEv.Opts}, acts))
continue
}
@@ -170,26 +233,10 @@ func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.C
continue // no items selected
}
for trgID := range aPf.Targets[trg] {
schedActSet = append(schedActSet, newScheduledActs(aPf.Tenant, aPf.ID, trg, trgID,
schedActs = append(schedActs, newScheduledActs(aPf.Tenant, aPf.ID, trg, trgID, aPf.Schedule,
ctx, &ActData{cgrEv.CGREvent.Event, cgrEv.Opts}, acts))
}
}
// execute the actions
for _, sActs := range schedActSet {
if aPf.Schedule == utils.ASAP || forceASAP {
go aS.asapExecuteActions(sActs)
continue
}
if _, errExec := aS.crn.AddFunc(aPf.Schedule, sActs.ScheduledExecute); errExec != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduling ActionProfile with id: <%s:%s>, error: <%s>",
utils.ActionS, sActs.tenant, sActs.apID, errExec))
partExec = true
continue
}
}
}
if partExec {
err = utils.ErrPartiallyExecuted
@@ -231,10 +278,22 @@ type ArgActionSv1ExecuteActions struct {
// V1ExecuteActions will be called to execute ASAP action profiles, ignoring their Schedule field
func (aS *ActionS) V1ExecuteActions(args *ArgActionSv1ExecuteActions, rpl *string) (err error) {
if err = aS.scheduleActions(args.CGREventWithOpts.Tenant, args.ActionProfileIDs,
var schedActSet []*scheduledActs
if schedActSet, err = aS.scheduledActions(args.CGREventWithOpts.Tenant, args.ActionProfileIDs,
args.CGREventWithOpts, true); err != nil {
return
}
var partExec bool
// execute the actions
for _, sActs := range schedActSet {
if err = aS.asapExecuteActions(sActs); err != nil {
partExec = true
}
}
if partExec {
err = utils.ErrPartiallyExecuted
return
}
*rpl = utils.OK
return
}

View File

@@ -37,15 +37,16 @@ func actionTarget(act string) (trgt string) {
return
}
func newScheduledActs(tenant, apID, trgTyp, trgID string,
func newScheduledActs(tenant, apID, trgTyp, trgID, schedule string,
ctx context.Context, data *ActData, acts []actioner) (sActs *scheduledActs) {
return &scheduledActs{tenant, apID, trgTyp, trgID, ctx, data, acts,
return &scheduledActs{tenant, apID, trgTyp, trgID, schedule, ctx, data, acts,
ltcache.NewTransCache(map[string]*ltcache.CacheConfig{})}
}
// scheduled is a set of actions which will be executed directly or by the cron.schedule
type scheduledActs struct {
tenant, apID, trgTyp, trgID string
schedule string
ctx context.Context
data *ActData
acts []actioner

View File

@@ -23,6 +23,7 @@ import "github.com/cgrates/cgrates/utils"
// ActionSCfg is the configuration of ActionS
type ActionSCfg struct {
Enabled bool
Tenants *[]string
IndexedSelects bool
StringIndexedFields *[]string
PrefixIndexedFields *[]string

View File

@@ -2525,6 +2525,7 @@ const (
OptsDispatcherMethod = "*method"
MetaEventType = "*eventType"
EventType = "EventType"
SchedulerInit = "SchedulerInit"
)
// Event Flags