From 73e897916d34bc1ba4af554167571afa25596fbf Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 20 Dec 2020 17:09:09 +0100 Subject: [PATCH] ActionS - schedInit for scheduler reload --- actions/actions.go | 109 ++++++++++++++++++++++++++++++++---------- actions/libactions.go | 5 +- config/actionscfg.go | 1 + utils/consts.go | 1 + 4 files changed, 89 insertions(+), 27 deletions(-) diff --git a/actions/actions.go b/actions/actions.go index 5d380771a..d549503d1 100644 --- a/actions/actions.go +++ b/actions/actions.go @@ -21,6 +21,8 @@ package actions import ( "context" "fmt" + "sync" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -35,7 +37,6 @@ func NewActionS(cfg *config.CGRConfig, fltrS *engine.FilterS, dm *engine.DataMan cfg: cfg, fltrS: fltrS, dm: dm, - crn: cron.New(), } } @@ -45,13 +46,14 @@ type ActionS struct { fltrS *engine.FilterS dm *engine.DataManager crn *cron.Cron + crnLk *sync.RWMutex } // ListenAndServe keeps the service alive func (aS *ActionS) ListenAndServe(stopChan, cfgRld chan struct{}) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>", utils.CoreS, utils.ActionS)) - aS.crn.Start() + aS.schedInit() // initialize cron and schedule actions for { select { case <-stopChan: @@ -65,7 +67,9 @@ func (aS *ActionS) ListenAndServe(stopChan, cfgRld chan struct{}) { // Shutdown is called to shutdown the service func (aS *ActionS) Shutdown() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.ActionS)) + aS.crnLk.RLock() aS.crn.Stop() + aS.crnLk.RUnlock() return } @@ -74,6 +78,66 @@ func (aS *ActionS) Call(serviceMethod string, args interface{}, reply interface{ return utils.RPCCall(aS, serviceMethod, args, reply) } +// schedInit will set up cron and load the matching data +func (aS *ActionS) schedInit() (err error) { + aS.crnLk.Lock() // make sure we don't have parallel processes running setu + defer aS.crnLk.Unlock() + utils.Logger.Info(fmt.Sprintf("<%s> initializing scheduler.", utils.ActionS)) + tnts := []string{aS.cfg.GeneralCfg().DefaultTenant} + if aS.cfg.ActionSCfg().Tenants != nil { + tnts = *aS.cfg.ActionSCfg().Tenants + } + if len(tnts) == 0 { + return utils.NewErrMandatoryIeMissing(utils.TenantCfg) + } + crn := cron.New() + var partExec bool + for _, tnt := range tnts { + cgrEv := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: tnt, + ID: utils.GenUUID(), + Time: utils.TimePointer(time.Now()), + }, + Opts: map[string]interface{}{ + utils.EventType: utils.SchedulerInit, + utils.NodeID: aS.cfg.GeneralCfg().NodeID, + }, + } + var schedActSet []*scheduledActs + if schedActSet, err = aS.scheduledActions(tnt, nil, cgrEv, false); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> scheduler init, ignoring tenant: <%s>, error: <%s>", + utils.ActionS, tnt, err)) + partExec = true + continue + } + for _, sActs := range schedActSet { + if sActs.schedule == utils.ASAP { + go aS.asapExecuteActions(sActs) + continue + } + if _, err = crn.AddFunc(sActs.schedule, sActs.ScheduledExecute); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> scheduling ActionProfile with id: <%s:%s>, error: <%s>", + utils.ActionS, sActs.tenant, sActs.apID, err)) + partExec = true + continue + } + } + } + if partExec { + err = utils.ErrPartiallyExecuted + } + if aS.crn != nil { + aS.crn.Stop() + } + aS.crn.Start() + return +} + // matchingActionProfilesForEvent returns the matched ActionProfiles for the given event func (aS *ActionS) matchingActionProfilesForEvent(tnt string, aPrflIDs []string, cgrEv *utils.CGREventWithOpts) (aPfs engine.ActionProfiles, err error) { @@ -127,8 +191,9 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string, aPrflIDs []string, return } -// scheduleActions is responsible for scheduling the action profiles matching cgrEv -func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.CGREventWithOpts, forceASAP bool) (err error) { +// scheduledActions is responsible for scheduling the action profiles matching cgrEv +func (aS *ActionS) scheduledActions(tnt string, aPrflIDs []string, cgrEv *utils.CGREventWithOpts, + forceASAP bool) (schedActs []*scheduledActs, err error) { var partExec bool var aPfs engine.ActionProfiles if aPfs, err = aS.matchingActionProfilesForEvent(tnt, aPrflIDs, cgrEv); err != nil { @@ -158,11 +223,9 @@ func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.C if partExec { continue // skip this profile from processing further } - // build schedActSet - var schedActSet []*scheduledActs for trg, acts := range trgActs { if trg == utils.META_NONE { // only one scheduledActs set - schedActSet = append(schedActSet, newScheduledActs(aPf.Tenant, aPf.ID, trg, utils.EmptyString, + schedActs = append(schedActs, newScheduledActs(aPf.Tenant, aPf.ID, trg, utils.EmptyString, aPf.Schedule, ctx, &ActData{cgrEv.CGREvent.Event, cgrEv.Opts}, acts)) continue } @@ -170,26 +233,10 @@ func (aS *ActionS) scheduleActions(tnt string, aPrflIDs []string, cgrEv *utils.C continue // no items selected } for trgID := range aPf.Targets[trg] { - schedActSet = append(schedActSet, newScheduledActs(aPf.Tenant, aPf.ID, trg, trgID, + schedActs = append(schedActs, newScheduledActs(aPf.Tenant, aPf.ID, trg, trgID, aPf.Schedule, ctx, &ActData{cgrEv.CGREvent.Event, cgrEv.Opts}, acts)) } } - // execute the actions - for _, sActs := range schedActSet { - if aPf.Schedule == utils.ASAP || forceASAP { - go aS.asapExecuteActions(sActs) - continue - } - if _, errExec := aS.crn.AddFunc(aPf.Schedule, sActs.ScheduledExecute); errExec != nil { - utils.Logger.Warning( - fmt.Sprintf( - "<%s> scheduling ActionProfile with id: <%s:%s>, error: <%s>", - utils.ActionS, sActs.tenant, sActs.apID, errExec)) - partExec = true - continue - } - - } } if partExec { err = utils.ErrPartiallyExecuted @@ -231,10 +278,22 @@ type ArgActionSv1ExecuteActions struct { // V1ExecuteActions will be called to execute ASAP action profiles, ignoring their Schedule field func (aS *ActionS) V1ExecuteActions(args *ArgActionSv1ExecuteActions, rpl *string) (err error) { - if err = aS.scheduleActions(args.CGREventWithOpts.Tenant, args.ActionProfileIDs, + var schedActSet []*scheduledActs + if schedActSet, err = aS.scheduledActions(args.CGREventWithOpts.Tenant, args.ActionProfileIDs, args.CGREventWithOpts, true); err != nil { return } + var partExec bool + // execute the actions + for _, sActs := range schedActSet { + if err = aS.asapExecuteActions(sActs); err != nil { + partExec = true + } + } + if partExec { + err = utils.ErrPartiallyExecuted + return + } *rpl = utils.OK return } diff --git a/actions/libactions.go b/actions/libactions.go index c68add259..9169153b2 100644 --- a/actions/libactions.go +++ b/actions/libactions.go @@ -37,15 +37,16 @@ func actionTarget(act string) (trgt string) { return } -func newScheduledActs(tenant, apID, trgTyp, trgID string, +func newScheduledActs(tenant, apID, trgTyp, trgID, schedule string, ctx context.Context, data *ActData, acts []actioner) (sActs *scheduledActs) { - return &scheduledActs{tenant, apID, trgTyp, trgID, ctx, data, acts, + return &scheduledActs{tenant, apID, trgTyp, trgID, schedule, ctx, data, acts, ltcache.NewTransCache(map[string]*ltcache.CacheConfig{})} } // scheduled is a set of actions which will be executed directly or by the cron.schedule type scheduledActs struct { tenant, apID, trgTyp, trgID string + schedule string ctx context.Context data *ActData acts []actioner diff --git a/config/actionscfg.go b/config/actionscfg.go index 88447f95b..b0f4169b9 100644 --- a/config/actionscfg.go +++ b/config/actionscfg.go @@ -23,6 +23,7 @@ import "github.com/cgrates/cgrates/utils" // ActionSCfg is the configuration of ActionS type ActionSCfg struct { Enabled bool + Tenants *[]string IndexedSelects bool StringIndexedFields *[]string PrefixIndexedFields *[]string diff --git a/utils/consts.go b/utils/consts.go index f28f03e66..eb7d45310 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -2525,6 +2525,7 @@ const ( OptsDispatcherMethod = "*method" MetaEventType = "*eventType" EventType = "EventType" + SchedulerInit = "SchedulerInit" ) // Event Flags