diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go
index 739460db4..f886c4299 100644
--- a/apier/v1/accounts.go
+++ b/apier/v1/accounts.go
@@ -170,10 +170,10 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
if missing := utils.MissingStructFields(&attr, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
- var schedulerReloadNeeded = false
accID := utils.AccountKey(attr.Tenant, attr.Account)
- var ub *engine.Account
+ dirtyActionPlans := make(map[string]*engine.ActionPlan)
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
+ var ub *engine.Account
if bal, _ := self.AccountDb.GetAccount(accID); bal != nil {
ub = bal
} else { // Not found in db, create it here
@@ -181,19 +181,38 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
ID: accID,
}
}
- if len(attr.ActionPlanId) != 0 {
+ if attr.ActionPlanId != "" {
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
- var ap *engine.ActionPlan
- ap, err := self.RatingDb.GetActionPlan(attr.ActionPlanId, false, utils.NonTransactional)
- if err != nil {
+ acntAPids, err := self.RatingDb.GetAccountActionPlans(accID, false, utils.NonTransactional)
+ if err != nil && err != utils.ErrNotFound {
return 0, err
}
- if _, exists := ap.AccountIDs[accID]; !exists {
+ // clean previous action plans
+ for i := 0; i < len(acntAPids); {
+ apID := acntAPids[i]
+ if attr.ActionPlanId == apID {
+ i++ // increase index since we don't remove from slice
+ continue
+ }
+ ap, err := self.RatingDb.GetActionPlan(apID, false, utils.NonTransactional)
+ if err != nil {
+ return 0, err
+ }
+ delete(ap.AccountIDs, accID)
+ dirtyActionPlans[apID] = ap
+ acntAPids = append(acntAPids[:i], acntAPids[i+1:]...) // remove the item from the list so we can overwrite the real list
+ }
+ if !utils.IsSliceMember(acntAPids, attr.ActionPlanId) { // Account not yet attached to action plan, do it here
+ ap, err := self.RatingDb.GetActionPlan(attr.ActionPlanId, false, utils.NonTransactional)
+ if err != nil {
+ return 0, err
+ }
if ap.AccountIDs == nil {
ap.AccountIDs = make(utils.StringMap)
}
ap.AccountIDs[accID] = true
- schedulerReloadNeeded = true
+ dirtyActionPlans[attr.ActionPlanId] = ap
+ acntAPids = append(acntAPids, attr.ActionPlanId)
// create tasks
for _, at := range ap.ActionTimings {
if at.IsASAP() {
@@ -207,31 +226,20 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
}
}
}
- if err := self.RatingDb.SetActionPlan(attr.ActionPlanId, ap, true, utils.NonTransactional); err != nil {
+ }
+ apIDs := make([]string, len(dirtyActionPlans))
+ i := 0
+ for actionPlanID, ap := range dirtyActionPlans {
+ if err := self.RatingDb.SetActionPlan(actionPlanID, ap, true, utils.NonTransactional); err != nil {
return 0, err
}
+ apIDs[i] = actionPlanID
+ i++
}
- // clean previous action plans
- acntAPids, err := self.RatingDb.GetAccountActionPlans(accID, false, utils.NonTransactional)
- if err != nil && err != utils.ErrNotFound {
+ if err := self.RatingDb.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, apIDs, true); err != nil {
return 0, err
}
- for _, apID := range acntAPids {
- if apID != attr.ActionPlanId {
- ap, err := self.RatingDb.GetActionPlan(apID, false, utils.NonTransactional)
- if err != nil {
- return 0, err
- }
- delete(ap.AccountIDs, accID)
- if err = self.RatingDb.SetActionPlan(apID, ap, true, utils.NonTransactional); err != nil {
- return 0, err
- }
- if err = self.RatingDb.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, []string{ap.Id}, true); err != nil {
- return 0, err
- }
- }
- }
- if err = self.RatingDb.SetAccountActionPlans(accID, []string{attr.ActionPlanId}, false); err != nil {
+ if err := self.RatingDb.SetAccountActionPlans(accID, acntAPids, true); err != nil {
return 0, err
}
if err = self.RatingDb.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil {
@@ -243,7 +251,8 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
return 0, err
}
}
- if len(attr.ActionTriggersId) != 0 {
+
+ if attr.ActionTriggersId != "" {
atrs, err := self.RatingDb.GetActionTriggers(attr.ActionTriggersId, false, utils.NonTransactional)
if err != nil {
return 0, err
@@ -251,6 +260,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
ub.ActionTriggers = atrs
ub.InitCounters()
}
+
if attr.AllowNegative != nil {
ub.AllowNegative = *attr.AllowNegative
}
@@ -266,23 +276,14 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
if err != nil {
return utils.NewErrServerError(err)
}
- if attr.ActionPlanId != "" {
- if err = self.RatingDb.SetAccountActionPlans(accID, []string{attr.ActionPlanId}, false); err != nil {
- return
- }
- if err = self.RatingDb.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil {
- return
- }
-
- }
- if attr.ReloadScheduler && schedulerReloadNeeded {
+ if attr.ReloadScheduler && len(dirtyActionPlans) > 0 {
sched := self.ServManager.GetScheduler()
if sched == nil {
return errors.New(utils.SchedulerNotRunningCaps)
}
sched.Reload()
}
- *reply = OK // This will mark saving of the account, error still can show up in actionTimingsId
+ *reply = utils.OK // This will mark saving of the account, error still can show up in actionTimingsId
return nil
}
diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go
index 40ebeca3c..c92db0707 100644
--- a/apier/v1/apier_it_test.go
+++ b/apier/v1/apier_it_test.go
@@ -38,6 +38,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/streadway/amqp"
@@ -1097,6 +1098,16 @@ func TestApierGetAccountActionPlan(t *testing.T) {
}
}
+// Make sure we have scheduled actions
+func TestApierITGetScheduledActionsForAccount(t *testing.T) {
+ var rply []*scheduler.ScheduledAction
+ if err := rater.Call("ApierV1.GetScheduledActions", scheduler.ArgsGetScheduledActions{Tenant: utils.StringPointer("cgrates.org"), Account: utils.StringPointer("dan7")}, &rply); err != nil {
+ t.Error("Unexpected error: ", err)
+ } else if len(rply) == 0 {
+ t.Errorf("ScheduledActions: %+v", rply)
+ }
+}
+
// Test here RemoveActionTiming
func TestApierRemUniqueIDActionTiming(t *testing.T) {
var rmReply string
@@ -1549,8 +1560,8 @@ func TestApierITRemAccountAliases(t *testing.T) {
}
func TestApierITGetScheduledActions(t *testing.T) {
- var rply []*ScheduledActions
- if err := rater.Call("ApierV1.GetScheduledActions", AttrsGetScheduledActions{}, &rply); err != nil {
+ var rply []*scheduler.ScheduledAction
+ if err := rater.Call("ApierV1.GetScheduledActions", scheduler.ArgsGetScheduledActions{}, &rply); err != nil {
t.Error("Unexpected error: ", err)
}
}
diff --git a/apier/v1/scheduler.go b/apier/v1/scheduler.go
index 079672ddd..9357c68ea 100644
--- a/apier/v1/scheduler.go
+++ b/apier/v1/scheduler.go
@@ -21,10 +21,10 @@ import (
"errors"
"fmt"
"sort"
- "strings"
"time"
"github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/utils"
)
@@ -99,76 +99,12 @@ import (
]
*/
-type AttrsGetScheduledActions struct {
- Tenant, Account string
- TimeStart, TimeEnd time.Time // Filter based on next runTime
- utils.Paginator
-}
-
-type ScheduledActions struct {
- NextRunTime time.Time
- Accounts int
- ActionsId, ActionPlanId, ActionTimingUuid string
-}
-
-func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error {
+func (self *ApierV1) GetScheduledActions(args scheduler.ArgsGetScheduledActions, reply *[]*scheduler.ScheduledAction) error {
sched := self.ServManager.GetScheduler()
if sched == nil {
return errors.New(utils.SchedulerNotRunningCaps)
}
- schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty
- scheduledActions := sched.GetQueue()
- for _, qActions := range scheduledActions {
- sas := &ScheduledActions{ActionsId: qActions.ActionsID, ActionPlanId: qActions.GetActionPlanID(), ActionTimingUuid: qActions.Uuid, Accounts: len(qActions.GetAccountIDs())}
- if attrs.SearchTerm != "" &&
- !(strings.Contains(sas.ActionPlanId, attrs.SearchTerm) ||
- strings.Contains(sas.ActionsId, attrs.SearchTerm)) {
- continue
- }
- sas.NextRunTime = qActions.GetNextStartTime(time.Now())
- if !attrs.TimeStart.IsZero() && sas.NextRunTime.Before(attrs.TimeStart) {
- continue // Filter here only requests in the filtered interval
- }
- if !attrs.TimeEnd.IsZero() && (sas.NextRunTime.After(attrs.TimeEnd) || sas.NextRunTime.Equal(attrs.TimeEnd)) {
- continue
- }
- // filter on account
- if attrs.Tenant != "" || attrs.Account != "" {
- found := false
- for accID := range qActions.GetAccountIDs() {
- split := strings.Split(accID, utils.CONCATENATED_KEY_SEP)
- if len(split) != 2 {
- continue // malformed account id
- }
- if attrs.Tenant != "" && attrs.Tenant != split[0] {
- continue
- }
- if attrs.Account != "" && attrs.Account != split[1] {
- continue
- }
- found = true
- break
- }
- if !found {
- continue
- }
- }
-
- // we have a winner
-
- schedActions = append(schedActions, sas)
- }
- if attrs.Paginator.Offset != nil {
- if *attrs.Paginator.Offset <= len(schedActions) {
- schedActions = schedActions[*attrs.Paginator.Offset:]
- }
- }
- if attrs.Paginator.Limit != nil {
- if *attrs.Paginator.Limit <= len(schedActions) {
- schedActions = schedActions[:*attrs.Paginator.Limit]
- }
- }
- *reply = schedActions
+ *reply = sched.GetScheduledActions(args)
return nil
}
diff --git a/console/scheduler_queue.go b/console/scheduler_queue.go
index 4f7c7adc2..e003f300b 100644
--- a/console/scheduler_queue.go
+++ b/console/scheduler_queue.go
@@ -17,13 +17,15 @@ along with this program. If not, see
*/
package console
-import "github.com/cgrates/cgrates/apier/v1"
+import (
+ "github.com/cgrates/cgrates/scheduler"
+)
func init() {
c := &CmdGetScheduledActions{
name: "scheduler_queue",
rpcMethod: "ApierV1.GetScheduledActions",
- rpcParams: &v1.AttrsGetScheduledActions{},
+ rpcParams: &scheduler.ArgsGetScheduledActions{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -33,7 +35,7 @@ func init() {
type CmdGetScheduledActions struct {
name string
rpcMethod string
- rpcParams *v1.AttrsGetScheduledActions
+ rpcParams *scheduler.ArgsGetScheduledActions
*CommandExecuter
}
@@ -47,7 +49,7 @@ func (self *CmdGetScheduledActions) RpcMethod() string {
func (self *CmdGetScheduledActions) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
- self.rpcParams = &v1.AttrsGetScheduledActions{}
+ self.rpcParams = &scheduler.ArgsGetScheduledActions{}
}
return self.rpcParams
}
@@ -57,6 +59,6 @@ func (self *CmdGetScheduledActions) PostprocessRpcParams() error {
}
func (self *CmdGetScheduledActions) RpcResult() interface{} {
- s := make([]*v1.ScheduledActions, 0)
+ s := make([]*scheduler.ScheduledAction, 0)
return &s
}
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 374915230..163e1da60 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -20,6 +20,7 @@ package scheduler
import (
"fmt"
"sort"
+ "strings"
"sync"
"time"
@@ -90,6 +91,7 @@ func (s *Scheduler) Loop() {
utils.Logger.Info(fmt.Sprintf(" Scheduler queue length: %v", len(s.queue)))
s.Lock()
a0 := s.queue[0]
+ utils.Logger.Debug(fmt.Sprintf("Have at scheduled: %+v", a0))
utils.Logger.Info(fmt.Sprintf(" Action: %s", a0.ActionsID))
now := time.Now()
start := a0.GetNextStartTime(now)
@@ -174,6 +176,7 @@ func (s *Scheduler) loadActionPlans() {
}
at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts
at.SetActionPlanID(actionPlan.Id)
+ utils.Logger.Debug(fmt.Sprintf("Scheduling queue, add at: %+v", at))
s.queue = append(s.queue, at)
}
@@ -191,11 +194,64 @@ func (s *Scheduler) restart() {
}
}
-func (s *Scheduler) GetQueue() (queue engine.ActionTimingPriorityList) {
+type ArgsGetScheduledActions struct {
+ Tenant, Account *string
+ TimeStart, TimeEnd *time.Time // Filter based on next runTime
+ utils.Paginator
+}
+
+type ScheduledAction struct {
+ NextRunTime time.Time
+ Accounts int // Number of acccounts this action will run on
+ ActionPlanID, ActionTimingUUID, ActionsID string
+}
+
+func (s *Scheduler) GetScheduledActions(fltr ArgsGetScheduledActions) (schedActions []*ScheduledAction) {
s.RLock()
- utils.Clone(s.queue, &queue)
- defer s.RUnlock()
- return queue
+ for _, at := range s.queue {
+ sas := &ScheduledAction{NextRunTime: at.GetNextStartTime(time.Now()), Accounts: len(at.GetAccountIDs()),
+ ActionPlanID: at.GetActionPlanID(), ActionTimingUUID: at.Uuid, ActionsID: at.ActionsID}
+ if fltr.TimeStart != nil && !fltr.TimeStart.IsZero() && sas.NextRunTime.Before(*fltr.TimeStart) {
+ continue // need to match the filter interval
+ }
+ if fltr.TimeEnd != nil && !fltr.TimeEnd.IsZero() && (sas.NextRunTime.After(*fltr.TimeEnd) || sas.NextRunTime.Equal(*fltr.TimeEnd)) {
+ continue
+ }
+ // filter on account
+ if fltr.Tenant != nil || fltr.Account != nil {
+ found := false
+ for accID := range at.GetAccountIDs() {
+ split := strings.Split(accID, utils.CONCATENATED_KEY_SEP)
+ if len(split) != 2 {
+ continue // malformed account id
+ }
+ if fltr.Tenant != nil && *fltr.Tenant != split[0] {
+ continue
+ }
+ if fltr.Account != nil && *fltr.Account != split[1] {
+ continue
+ }
+ found = true
+ break
+ }
+ if !found {
+ continue
+ }
+ }
+ schedActions = append(schedActions, sas)
+ }
+ if fltr.Paginator.Offset != nil {
+ if *fltr.Paginator.Offset <= len(schedActions) {
+ schedActions = schedActions[*fltr.Paginator.Offset:]
+ }
+ }
+ if fltr.Paginator.Limit != nil {
+ if *fltr.Paginator.Limit <= len(schedActions) {
+ schedActions = schedActions[:*fltr.Paginator.Limit]
+ }
+ }
+ s.RUnlock()
+ return
}
func (s *Scheduler) Shutdown() {