mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Fix ApierV1.GetScheduledActions, unify SetAccount logic between v1 and v2 apis
This commit is contained in:
@@ -170,10 +170,10 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
|
||||
if missing := utils.MissingStructFields(&attr, []string{"Tenant", "Account"}); len(missing) != 0 {
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
var schedulerReloadNeeded = false
|
||||
accID := utils.AccountKey(attr.Tenant, attr.Account)
|
||||
var ub *engine.Account
|
||||
dirtyActionPlans := make(map[string]*engine.ActionPlan)
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
var ub *engine.Account
|
||||
if bal, _ := self.AccountDb.GetAccount(accID); bal != nil {
|
||||
ub = bal
|
||||
} else { // Not found in db, create it here
|
||||
@@ -181,19 +181,38 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
|
||||
ID: accID,
|
||||
}
|
||||
}
|
||||
if len(attr.ActionPlanId) != 0 {
|
||||
if attr.ActionPlanId != "" {
|
||||
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
var ap *engine.ActionPlan
|
||||
ap, err := self.RatingDb.GetActionPlan(attr.ActionPlanId, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
acntAPids, err := self.RatingDb.GetAccountActionPlans(accID, false, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return 0, err
|
||||
}
|
||||
if _, exists := ap.AccountIDs[accID]; !exists {
|
||||
// clean previous action plans
|
||||
for i := 0; i < len(acntAPids); {
|
||||
apID := acntAPids[i]
|
||||
if attr.ActionPlanId == apID {
|
||||
i++ // increase index since we don't remove from slice
|
||||
continue
|
||||
}
|
||||
ap, err := self.RatingDb.GetActionPlan(apID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
delete(ap.AccountIDs, accID)
|
||||
dirtyActionPlans[apID] = ap
|
||||
acntAPids = append(acntAPids[:i], acntAPids[i+1:]...) // remove the item from the list so we can overwrite the real list
|
||||
}
|
||||
if !utils.IsSliceMember(acntAPids, attr.ActionPlanId) { // Account not yet attached to action plan, do it here
|
||||
ap, err := self.RatingDb.GetActionPlan(attr.ActionPlanId, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if ap.AccountIDs == nil {
|
||||
ap.AccountIDs = make(utils.StringMap)
|
||||
}
|
||||
ap.AccountIDs[accID] = true
|
||||
schedulerReloadNeeded = true
|
||||
dirtyActionPlans[attr.ActionPlanId] = ap
|
||||
acntAPids = append(acntAPids, attr.ActionPlanId)
|
||||
// create tasks
|
||||
for _, at := range ap.ActionTimings {
|
||||
if at.IsASAP() {
|
||||
@@ -207,31 +226,20 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := self.RatingDb.SetActionPlan(attr.ActionPlanId, ap, true, utils.NonTransactional); err != nil {
|
||||
}
|
||||
apIDs := make([]string, len(dirtyActionPlans))
|
||||
i := 0
|
||||
for actionPlanID, ap := range dirtyActionPlans {
|
||||
if err := self.RatingDb.SetActionPlan(actionPlanID, ap, true, utils.NonTransactional); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
apIDs[i] = actionPlanID
|
||||
i++
|
||||
}
|
||||
// clean previous action plans
|
||||
acntAPids, err := self.RatingDb.GetAccountActionPlans(accID, false, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
if err := self.RatingDb.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, apIDs, true); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
for _, apID := range acntAPids {
|
||||
if apID != attr.ActionPlanId {
|
||||
ap, err := self.RatingDb.GetActionPlan(apID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
delete(ap.AccountIDs, accID)
|
||||
if err = self.RatingDb.SetActionPlan(apID, ap, true, utils.NonTransactional); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err = self.RatingDb.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, []string{ap.Id}, true); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
if err = self.RatingDb.SetAccountActionPlans(accID, []string{attr.ActionPlanId}, false); err != nil {
|
||||
if err := self.RatingDb.SetAccountActionPlans(accID, acntAPids, true); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err = self.RatingDb.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil {
|
||||
@@ -243,7 +251,8 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
if len(attr.ActionTriggersId) != 0 {
|
||||
|
||||
if attr.ActionTriggersId != "" {
|
||||
atrs, err := self.RatingDb.GetActionTriggers(attr.ActionTriggersId, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -251,6 +260,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
|
||||
ub.ActionTriggers = atrs
|
||||
ub.InitCounters()
|
||||
}
|
||||
|
||||
if attr.AllowNegative != nil {
|
||||
ub.AllowNegative = *attr.AllowNegative
|
||||
}
|
||||
@@ -266,23 +276,14 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if attr.ActionPlanId != "" {
|
||||
if err = self.RatingDb.SetAccountActionPlans(accID, []string{attr.ActionPlanId}, false); err != nil {
|
||||
return
|
||||
}
|
||||
if err = self.RatingDb.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
if attr.ReloadScheduler && schedulerReloadNeeded {
|
||||
if attr.ReloadScheduler && len(dirtyActionPlans) > 0 {
|
||||
sched := self.ServManager.GetScheduler()
|
||||
if sched == nil {
|
||||
return errors.New(utils.SchedulerNotRunningCaps)
|
||||
}
|
||||
sched.Reload()
|
||||
}
|
||||
*reply = OK // This will mark saving of the account, error still can show up in actionTimingsId
|
||||
*reply = utils.OK // This will mark saving of the account, error still can show up in actionTimingsId
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/streadway/amqp"
|
||||
@@ -1097,6 +1098,16 @@ func TestApierGetAccountActionPlan(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we have scheduled actions
|
||||
func TestApierITGetScheduledActionsForAccount(t *testing.T) {
|
||||
var rply []*scheduler.ScheduledAction
|
||||
if err := rater.Call("ApierV1.GetScheduledActions", scheduler.ArgsGetScheduledActions{Tenant: utils.StringPointer("cgrates.org"), Account: utils.StringPointer("dan7")}, &rply); err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
} else if len(rply) == 0 {
|
||||
t.Errorf("ScheduledActions: %+v", rply)
|
||||
}
|
||||
}
|
||||
|
||||
// Test here RemoveActionTiming
|
||||
func TestApierRemUniqueIDActionTiming(t *testing.T) {
|
||||
var rmReply string
|
||||
@@ -1549,8 +1560,8 @@ func TestApierITRemAccountAliases(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestApierITGetScheduledActions(t *testing.T) {
|
||||
var rply []*ScheduledActions
|
||||
if err := rater.Call("ApierV1.GetScheduledActions", AttrsGetScheduledActions{}, &rply); err != nil {
|
||||
var rply []*scheduler.ScheduledAction
|
||||
if err := rater.Call("ApierV1.GetScheduledActions", scheduler.ArgsGetScheduledActions{}, &rply); err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,10 +21,10 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -99,76 +99,12 @@ import (
|
||||
]
|
||||
*/
|
||||
|
||||
type AttrsGetScheduledActions struct {
|
||||
Tenant, Account string
|
||||
TimeStart, TimeEnd time.Time // Filter based on next runTime
|
||||
utils.Paginator
|
||||
}
|
||||
|
||||
type ScheduledActions struct {
|
||||
NextRunTime time.Time
|
||||
Accounts int
|
||||
ActionsId, ActionPlanId, ActionTimingUuid string
|
||||
}
|
||||
|
||||
func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error {
|
||||
func (self *ApierV1) GetScheduledActions(args scheduler.ArgsGetScheduledActions, reply *[]*scheduler.ScheduledAction) error {
|
||||
sched := self.ServManager.GetScheduler()
|
||||
if sched == nil {
|
||||
return errors.New(utils.SchedulerNotRunningCaps)
|
||||
}
|
||||
schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty
|
||||
scheduledActions := sched.GetQueue()
|
||||
for _, qActions := range scheduledActions {
|
||||
sas := &ScheduledActions{ActionsId: qActions.ActionsID, ActionPlanId: qActions.GetActionPlanID(), ActionTimingUuid: qActions.Uuid, Accounts: len(qActions.GetAccountIDs())}
|
||||
if attrs.SearchTerm != "" &&
|
||||
!(strings.Contains(sas.ActionPlanId, attrs.SearchTerm) ||
|
||||
strings.Contains(sas.ActionsId, attrs.SearchTerm)) {
|
||||
continue
|
||||
}
|
||||
sas.NextRunTime = qActions.GetNextStartTime(time.Now())
|
||||
if !attrs.TimeStart.IsZero() && sas.NextRunTime.Before(attrs.TimeStart) {
|
||||
continue // Filter here only requests in the filtered interval
|
||||
}
|
||||
if !attrs.TimeEnd.IsZero() && (sas.NextRunTime.After(attrs.TimeEnd) || sas.NextRunTime.Equal(attrs.TimeEnd)) {
|
||||
continue
|
||||
}
|
||||
// filter on account
|
||||
if attrs.Tenant != "" || attrs.Account != "" {
|
||||
found := false
|
||||
for accID := range qActions.GetAccountIDs() {
|
||||
split := strings.Split(accID, utils.CONCATENATED_KEY_SEP)
|
||||
if len(split) != 2 {
|
||||
continue // malformed account id
|
||||
}
|
||||
if attrs.Tenant != "" && attrs.Tenant != split[0] {
|
||||
continue
|
||||
}
|
||||
if attrs.Account != "" && attrs.Account != split[1] {
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
break
|
||||
}
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// we have a winner
|
||||
|
||||
schedActions = append(schedActions, sas)
|
||||
}
|
||||
if attrs.Paginator.Offset != nil {
|
||||
if *attrs.Paginator.Offset <= len(schedActions) {
|
||||
schedActions = schedActions[*attrs.Paginator.Offset:]
|
||||
}
|
||||
}
|
||||
if attrs.Paginator.Limit != nil {
|
||||
if *attrs.Paginator.Limit <= len(schedActions) {
|
||||
schedActions = schedActions[:*attrs.Paginator.Limit]
|
||||
}
|
||||
}
|
||||
*reply = schedActions
|
||||
*reply = sched.GetScheduledActions(args)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -17,13 +17,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/apier/v1"
|
||||
import (
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
)
|
||||
|
||||
func init() {
|
||||
c := &CmdGetScheduledActions{
|
||||
name: "scheduler_queue",
|
||||
rpcMethod: "ApierV1.GetScheduledActions",
|
||||
rpcParams: &v1.AttrsGetScheduledActions{},
|
||||
rpcParams: &scheduler.ArgsGetScheduledActions{},
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
@@ -33,7 +35,7 @@ func init() {
|
||||
type CmdGetScheduledActions struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *v1.AttrsGetScheduledActions
|
||||
rpcParams *scheduler.ArgsGetScheduledActions
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
@@ -47,7 +49,7 @@ func (self *CmdGetScheduledActions) RpcMethod() string {
|
||||
|
||||
func (self *CmdGetScheduledActions) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = &v1.AttrsGetScheduledActions{}
|
||||
self.rpcParams = &scheduler.ArgsGetScheduledActions{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
@@ -57,6 +59,6 @@ func (self *CmdGetScheduledActions) PostprocessRpcParams() error {
|
||||
}
|
||||
|
||||
func (self *CmdGetScheduledActions) RpcResult() interface{} {
|
||||
s := make([]*v1.ScheduledActions, 0)
|
||||
s := make([]*scheduler.ScheduledAction, 0)
|
||||
return &s
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package scheduler
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -90,6 +91,7 @@ func (s *Scheduler) Loop() {
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Scheduler queue length: %v", len(s.queue)))
|
||||
s.Lock()
|
||||
a0 := s.queue[0]
|
||||
utils.Logger.Debug(fmt.Sprintf("Have at scheduled: %+v", a0))
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.ActionsID))
|
||||
now := time.Now()
|
||||
start := a0.GetNextStartTime(now)
|
||||
@@ -174,6 +176,7 @@ func (s *Scheduler) loadActionPlans() {
|
||||
}
|
||||
at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts
|
||||
at.SetActionPlanID(actionPlan.Id)
|
||||
utils.Logger.Debug(fmt.Sprintf("Scheduling queue, add at: %+v", at))
|
||||
s.queue = append(s.queue, at)
|
||||
|
||||
}
|
||||
@@ -191,11 +194,64 @@ func (s *Scheduler) restart() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) GetQueue() (queue engine.ActionTimingPriorityList) {
|
||||
type ArgsGetScheduledActions struct {
|
||||
Tenant, Account *string
|
||||
TimeStart, TimeEnd *time.Time // Filter based on next runTime
|
||||
utils.Paginator
|
||||
}
|
||||
|
||||
type ScheduledAction struct {
|
||||
NextRunTime time.Time
|
||||
Accounts int // Number of acccounts this action will run on
|
||||
ActionPlanID, ActionTimingUUID, ActionsID string
|
||||
}
|
||||
|
||||
func (s *Scheduler) GetScheduledActions(fltr ArgsGetScheduledActions) (schedActions []*ScheduledAction) {
|
||||
s.RLock()
|
||||
utils.Clone(s.queue, &queue)
|
||||
defer s.RUnlock()
|
||||
return queue
|
||||
for _, at := range s.queue {
|
||||
sas := &ScheduledAction{NextRunTime: at.GetNextStartTime(time.Now()), Accounts: len(at.GetAccountIDs()),
|
||||
ActionPlanID: at.GetActionPlanID(), ActionTimingUUID: at.Uuid, ActionsID: at.ActionsID}
|
||||
if fltr.TimeStart != nil && !fltr.TimeStart.IsZero() && sas.NextRunTime.Before(*fltr.TimeStart) {
|
||||
continue // need to match the filter interval
|
||||
}
|
||||
if fltr.TimeEnd != nil && !fltr.TimeEnd.IsZero() && (sas.NextRunTime.After(*fltr.TimeEnd) || sas.NextRunTime.Equal(*fltr.TimeEnd)) {
|
||||
continue
|
||||
}
|
||||
// filter on account
|
||||
if fltr.Tenant != nil || fltr.Account != nil {
|
||||
found := false
|
||||
for accID := range at.GetAccountIDs() {
|
||||
split := strings.Split(accID, utils.CONCATENATED_KEY_SEP)
|
||||
if len(split) != 2 {
|
||||
continue // malformed account id
|
||||
}
|
||||
if fltr.Tenant != nil && *fltr.Tenant != split[0] {
|
||||
continue
|
||||
}
|
||||
if fltr.Account != nil && *fltr.Account != split[1] {
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
break
|
||||
}
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
}
|
||||
schedActions = append(schedActions, sas)
|
||||
}
|
||||
if fltr.Paginator.Offset != nil {
|
||||
if *fltr.Paginator.Offset <= len(schedActions) {
|
||||
schedActions = schedActions[*fltr.Paginator.Offset:]
|
||||
}
|
||||
}
|
||||
if fltr.Paginator.Limit != nil {
|
||||
if *fltr.Paginator.Limit <= len(schedActions) {
|
||||
schedActions = schedActions[:*fltr.Paginator.Limit]
|
||||
}
|
||||
}
|
||||
s.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Scheduler) Shutdown() {
|
||||
|
||||
Reference in New Issue
Block a user