From 56fb801b4a11c23a527826b6cd1b57b77a88f780 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 10 Feb 2017 20:18:23 +0100 Subject: [PATCH] Fix ApierV1.GetScheduledActions, unify SetAccount logic between v1 and v2 apis --- apier/v1/accounts.go | 81 +++++++++++++++++++------------------- apier/v1/apier_it_test.go | 15 ++++++- apier/v1/scheduler.go | 70 ++------------------------------ console/scheduler_queue.go | 12 +++--- scheduler/scheduler.go | 64 ++++++++++++++++++++++++++++-- 5 files changed, 124 insertions(+), 118 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 739460db4..f886c4299 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -170,10 +170,10 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e if missing := utils.MissingStructFields(&attr, []string{"Tenant", "Account"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - var schedulerReloadNeeded = false accID := utils.AccountKey(attr.Tenant, attr.Account) - var ub *engine.Account + dirtyActionPlans := make(map[string]*engine.ActionPlan) _, err = guardian.Guardian.Guard(func() (interface{}, error) { + var ub *engine.Account if bal, _ := self.AccountDb.GetAccount(accID); bal != nil { ub = bal } else { // Not found in db, create it here @@ -181,19 +181,38 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e ID: accID, } } - if len(attr.ActionPlanId) != 0 { + if attr.ActionPlanId != "" { _, err := guardian.Guardian.Guard(func() (interface{}, error) { - var ap *engine.ActionPlan - ap, err := self.RatingDb.GetActionPlan(attr.ActionPlanId, false, utils.NonTransactional) - if err != nil { + acntAPids, err := self.RatingDb.GetAccountActionPlans(accID, false, utils.NonTransactional) + if err != nil && err != utils.ErrNotFound { return 0, err } - if _, exists := ap.AccountIDs[accID]; !exists { + // clean previous action plans + for i := 0; i < len(acntAPids); { + apID := acntAPids[i] + if attr.ActionPlanId == apID { + i++ // increase index since we don't remove from slice + continue + } + ap, err := self.RatingDb.GetActionPlan(apID, false, utils.NonTransactional) + if err != nil { + return 0, err + } + delete(ap.AccountIDs, accID) + dirtyActionPlans[apID] = ap + acntAPids = append(acntAPids[:i], acntAPids[i+1:]...) // remove the item from the list so we can overwrite the real list + } + if !utils.IsSliceMember(acntAPids, attr.ActionPlanId) { // Account not yet attached to action plan, do it here + ap, err := self.RatingDb.GetActionPlan(attr.ActionPlanId, false, utils.NonTransactional) + if err != nil { + return 0, err + } if ap.AccountIDs == nil { ap.AccountIDs = make(utils.StringMap) } ap.AccountIDs[accID] = true - schedulerReloadNeeded = true + dirtyActionPlans[attr.ActionPlanId] = ap + acntAPids = append(acntAPids, attr.ActionPlanId) // create tasks for _, at := range ap.ActionTimings { if at.IsASAP() { @@ -207,31 +226,20 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e } } } - if err := self.RatingDb.SetActionPlan(attr.ActionPlanId, ap, true, utils.NonTransactional); err != nil { + } + apIDs := make([]string, len(dirtyActionPlans)) + i := 0 + for actionPlanID, ap := range dirtyActionPlans { + if err := self.RatingDb.SetActionPlan(actionPlanID, ap, true, utils.NonTransactional); err != nil { return 0, err } + apIDs[i] = actionPlanID + i++ } - // clean previous action plans - acntAPids, err := self.RatingDb.GetAccountActionPlans(accID, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { + if err := self.RatingDb.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, apIDs, true); err != nil { return 0, err } - for _, apID := range acntAPids { - if apID != attr.ActionPlanId { - ap, err := self.RatingDb.GetActionPlan(apID, false, utils.NonTransactional) - if err != nil { - return 0, err - } - delete(ap.AccountIDs, accID) - if err = self.RatingDb.SetActionPlan(apID, ap, true, utils.NonTransactional); err != nil { - return 0, err - } - if err = self.RatingDb.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, []string{ap.Id}, true); err != nil { - return 0, err - } - } - } - if err = self.RatingDb.SetAccountActionPlans(accID, []string{attr.ActionPlanId}, false); err != nil { + if err := self.RatingDb.SetAccountActionPlans(accID, acntAPids, true); err != nil { return 0, err } if err = self.RatingDb.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil { @@ -243,7 +251,8 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e return 0, err } } - if len(attr.ActionTriggersId) != 0 { + + if attr.ActionTriggersId != "" { atrs, err := self.RatingDb.GetActionTriggers(attr.ActionTriggersId, false, utils.NonTransactional) if err != nil { return 0, err @@ -251,6 +260,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e ub.ActionTriggers = atrs ub.InitCounters() } + if attr.AllowNegative != nil { ub.AllowNegative = *attr.AllowNegative } @@ -266,23 +276,14 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e if err != nil { return utils.NewErrServerError(err) } - if attr.ActionPlanId != "" { - if err = self.RatingDb.SetAccountActionPlans(accID, []string{attr.ActionPlanId}, false); err != nil { - return - } - if err = self.RatingDb.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil { - return - } - - } - if attr.ReloadScheduler && schedulerReloadNeeded { + if attr.ReloadScheduler && len(dirtyActionPlans) > 0 { sched := self.ServManager.GetScheduler() if sched == nil { return errors.New(utils.SchedulerNotRunningCaps) } sched.Reload() } - *reply = OK // This will mark saving of the account, error still can show up in actionTimingsId + *reply = utils.OK // This will mark saving of the account, error still can show up in actionTimingsId return nil } diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index 40ebeca3c..c92db0707 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -38,6 +38,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" "github.com/streadway/amqp" @@ -1097,6 +1098,16 @@ func TestApierGetAccountActionPlan(t *testing.T) { } } +// Make sure we have scheduled actions +func TestApierITGetScheduledActionsForAccount(t *testing.T) { + var rply []*scheduler.ScheduledAction + if err := rater.Call("ApierV1.GetScheduledActions", scheduler.ArgsGetScheduledActions{Tenant: utils.StringPointer("cgrates.org"), Account: utils.StringPointer("dan7")}, &rply); err != nil { + t.Error("Unexpected error: ", err) + } else if len(rply) == 0 { + t.Errorf("ScheduledActions: %+v", rply) + } +} + // Test here RemoveActionTiming func TestApierRemUniqueIDActionTiming(t *testing.T) { var rmReply string @@ -1549,8 +1560,8 @@ func TestApierITRemAccountAliases(t *testing.T) { } func TestApierITGetScheduledActions(t *testing.T) { - var rply []*ScheduledActions - if err := rater.Call("ApierV1.GetScheduledActions", AttrsGetScheduledActions{}, &rply); err != nil { + var rply []*scheduler.ScheduledAction + if err := rater.Call("ApierV1.GetScheduledActions", scheduler.ArgsGetScheduledActions{}, &rply); err != nil { t.Error("Unexpected error: ", err) } } diff --git a/apier/v1/scheduler.go b/apier/v1/scheduler.go index 079672ddd..9357c68ea 100644 --- a/apier/v1/scheduler.go +++ b/apier/v1/scheduler.go @@ -21,10 +21,10 @@ import ( "errors" "fmt" "sort" - "strings" "time" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/utils" ) @@ -99,76 +99,12 @@ import ( ] */ -type AttrsGetScheduledActions struct { - Tenant, Account string - TimeStart, TimeEnd time.Time // Filter based on next runTime - utils.Paginator -} - -type ScheduledActions struct { - NextRunTime time.Time - Accounts int - ActionsId, ActionPlanId, ActionTimingUuid string -} - -func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error { +func (self *ApierV1) GetScheduledActions(args scheduler.ArgsGetScheduledActions, reply *[]*scheduler.ScheduledAction) error { sched := self.ServManager.GetScheduler() if sched == nil { return errors.New(utils.SchedulerNotRunningCaps) } - schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty - scheduledActions := sched.GetQueue() - for _, qActions := range scheduledActions { - sas := &ScheduledActions{ActionsId: qActions.ActionsID, ActionPlanId: qActions.GetActionPlanID(), ActionTimingUuid: qActions.Uuid, Accounts: len(qActions.GetAccountIDs())} - if attrs.SearchTerm != "" && - !(strings.Contains(sas.ActionPlanId, attrs.SearchTerm) || - strings.Contains(sas.ActionsId, attrs.SearchTerm)) { - continue - } - sas.NextRunTime = qActions.GetNextStartTime(time.Now()) - if !attrs.TimeStart.IsZero() && sas.NextRunTime.Before(attrs.TimeStart) { - continue // Filter here only requests in the filtered interval - } - if !attrs.TimeEnd.IsZero() && (sas.NextRunTime.After(attrs.TimeEnd) || sas.NextRunTime.Equal(attrs.TimeEnd)) { - continue - } - // filter on account - if attrs.Tenant != "" || attrs.Account != "" { - found := false - for accID := range qActions.GetAccountIDs() { - split := strings.Split(accID, utils.CONCATENATED_KEY_SEP) - if len(split) != 2 { - continue // malformed account id - } - if attrs.Tenant != "" && attrs.Tenant != split[0] { - continue - } - if attrs.Account != "" && attrs.Account != split[1] { - continue - } - found = true - break - } - if !found { - continue - } - } - - // we have a winner - - schedActions = append(schedActions, sas) - } - if attrs.Paginator.Offset != nil { - if *attrs.Paginator.Offset <= len(schedActions) { - schedActions = schedActions[*attrs.Paginator.Offset:] - } - } - if attrs.Paginator.Limit != nil { - if *attrs.Paginator.Limit <= len(schedActions) { - schedActions = schedActions[:*attrs.Paginator.Limit] - } - } - *reply = schedActions + *reply = sched.GetScheduledActions(args) return nil } diff --git a/console/scheduler_queue.go b/console/scheduler_queue.go index 4f7c7adc2..e003f300b 100644 --- a/console/scheduler_queue.go +++ b/console/scheduler_queue.go @@ -17,13 +17,15 @@ along with this program. If not, see */ package console -import "github.com/cgrates/cgrates/apier/v1" +import ( + "github.com/cgrates/cgrates/scheduler" +) func init() { c := &CmdGetScheduledActions{ name: "scheduler_queue", rpcMethod: "ApierV1.GetScheduledActions", - rpcParams: &v1.AttrsGetScheduledActions{}, + rpcParams: &scheduler.ArgsGetScheduledActions{}, } commands[c.Name()] = c c.CommandExecuter = &CommandExecuter{c} @@ -33,7 +35,7 @@ func init() { type CmdGetScheduledActions struct { name string rpcMethod string - rpcParams *v1.AttrsGetScheduledActions + rpcParams *scheduler.ArgsGetScheduledActions *CommandExecuter } @@ -47,7 +49,7 @@ func (self *CmdGetScheduledActions) RpcMethod() string { func (self *CmdGetScheduledActions) RpcParams(reset bool) interface{} { if reset || self.rpcParams == nil { - self.rpcParams = &v1.AttrsGetScheduledActions{} + self.rpcParams = &scheduler.ArgsGetScheduledActions{} } return self.rpcParams } @@ -57,6 +59,6 @@ func (self *CmdGetScheduledActions) PostprocessRpcParams() error { } func (self *CmdGetScheduledActions) RpcResult() interface{} { - s := make([]*v1.ScheduledActions, 0) + s := make([]*scheduler.ScheduledAction, 0) return &s } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 374915230..163e1da60 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -20,6 +20,7 @@ package scheduler import ( "fmt" "sort" + "strings" "sync" "time" @@ -90,6 +91,7 @@ func (s *Scheduler) Loop() { utils.Logger.Info(fmt.Sprintf(" Scheduler queue length: %v", len(s.queue))) s.Lock() a0 := s.queue[0] + utils.Logger.Debug(fmt.Sprintf("Have at scheduled: %+v", a0)) utils.Logger.Info(fmt.Sprintf(" Action: %s", a0.ActionsID)) now := time.Now() start := a0.GetNextStartTime(now) @@ -174,6 +176,7 @@ func (s *Scheduler) loadActionPlans() { } at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts at.SetActionPlanID(actionPlan.Id) + utils.Logger.Debug(fmt.Sprintf("Scheduling queue, add at: %+v", at)) s.queue = append(s.queue, at) } @@ -191,11 +194,64 @@ func (s *Scheduler) restart() { } } -func (s *Scheduler) GetQueue() (queue engine.ActionTimingPriorityList) { +type ArgsGetScheduledActions struct { + Tenant, Account *string + TimeStart, TimeEnd *time.Time // Filter based on next runTime + utils.Paginator +} + +type ScheduledAction struct { + NextRunTime time.Time + Accounts int // Number of acccounts this action will run on + ActionPlanID, ActionTimingUUID, ActionsID string +} + +func (s *Scheduler) GetScheduledActions(fltr ArgsGetScheduledActions) (schedActions []*ScheduledAction) { s.RLock() - utils.Clone(s.queue, &queue) - defer s.RUnlock() - return queue + for _, at := range s.queue { + sas := &ScheduledAction{NextRunTime: at.GetNextStartTime(time.Now()), Accounts: len(at.GetAccountIDs()), + ActionPlanID: at.GetActionPlanID(), ActionTimingUUID: at.Uuid, ActionsID: at.ActionsID} + if fltr.TimeStart != nil && !fltr.TimeStart.IsZero() && sas.NextRunTime.Before(*fltr.TimeStart) { + continue // need to match the filter interval + } + if fltr.TimeEnd != nil && !fltr.TimeEnd.IsZero() && (sas.NextRunTime.After(*fltr.TimeEnd) || sas.NextRunTime.Equal(*fltr.TimeEnd)) { + continue + } + // filter on account + if fltr.Tenant != nil || fltr.Account != nil { + found := false + for accID := range at.GetAccountIDs() { + split := strings.Split(accID, utils.CONCATENATED_KEY_SEP) + if len(split) != 2 { + continue // malformed account id + } + if fltr.Tenant != nil && *fltr.Tenant != split[0] { + continue + } + if fltr.Account != nil && *fltr.Account != split[1] { + continue + } + found = true + break + } + if !found { + continue + } + } + schedActions = append(schedActions, sas) + } + if fltr.Paginator.Offset != nil { + if *fltr.Paginator.Offset <= len(schedActions) { + schedActions = schedActions[*fltr.Paginator.Offset:] + } + } + if fltr.Paginator.Limit != nil { + if *fltr.Paginator.Limit <= len(schedActions) { + schedActions = schedActions[:*fltr.Paginator.Limit] + } + } + s.RUnlock() + return } func (s *Scheduler) Shutdown() {