mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-20 06:38:45 +05:00
Merge branch 'master' into hapool
This commit is contained in:
@@ -99,8 +99,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if attrs.ReloadScheduler && self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
*reply = OK
|
||||
return nil
|
||||
@@ -161,6 +160,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error
|
||||
if missing := utils.MissingStructFields(&attr, []string{"Tenant", "Account"}); len(missing) != 0 {
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
var schedulerReloadNeeded = false
|
||||
accId := utils.AccountKey(attr.Tenant, attr.Account)
|
||||
var ub *engine.Account
|
||||
_, err := engine.Guardian.Guard(func() (interface{}, error) {
|
||||
@@ -183,15 +183,12 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error
|
||||
at.AccountIds = append(at.AccountIds, accId)
|
||||
}
|
||||
if len(ats) != 0 {
|
||||
schedulerReloadNeeded = true
|
||||
if err := self.RatingDb.SetActionPlans(attr.ActionPlanId, ats); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// update cache
|
||||
self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + attr.ActionPlanId}})
|
||||
if self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
@@ -223,7 +220,12 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
if schedulerReloadNeeded {
|
||||
// reload scheduler
|
||||
if self.Sched != nil {
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
}
|
||||
*reply = OK // This will mark saving of the account, error still can show up in actionTimingsId
|
||||
return nil
|
||||
}
|
||||
@@ -233,6 +235,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
accountId := utils.AccountKey(attr.Tenant, attr.Account)
|
||||
var schedulerReloadNeeded bool
|
||||
_, err := engine.Guardian.Guard(func() (interface{}, error) {
|
||||
// remove it from all action plans
|
||||
allATs, err := self.RatingDb.GetAllActionPlans()
|
||||
@@ -247,16 +250,23 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
|
||||
// delete without preserving order
|
||||
at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1]
|
||||
at.AccountIds = at.AccountIds[:len(at.AccountIds)-1]
|
||||
i -= 1
|
||||
i--
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if changed {
|
||||
// save action plan
|
||||
self.RatingDb.SetActionPlans(key, ats)
|
||||
// cache
|
||||
self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
|
||||
schedulerReloadNeeded = true
|
||||
_, err := engine.Guardian.Guard(func() (interface{}, error) {
|
||||
// save action plan
|
||||
self.RatingDb.SetActionPlans(key, ats)
|
||||
// cache
|
||||
self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := self.AccountDb.RemoveAccount(accountId); err != nil {
|
||||
@@ -268,7 +278,12 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
if schedulerReloadNeeded {
|
||||
// reload scheduler
|
||||
if self.Sched != nil {
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
}
|
||||
*reply = OK
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -113,6 +113,7 @@ type AttrAddBalance struct {
|
||||
Value float64
|
||||
ExpiryTime string
|
||||
RatingSubject string
|
||||
Categories string
|
||||
DestinationIds string
|
||||
Weight float64
|
||||
SharedGroups string
|
||||
@@ -121,14 +122,14 @@ type AttrAddBalance struct {
|
||||
}
|
||||
|
||||
func (self *ApierV1) AddBalance(attr *AttrAddBalance, reply *string) error {
|
||||
expTime, err := utils.ParseDate(attr.ExpiryTime)
|
||||
expTime, err := utils.ParseTimeDetectLayout(attr.ExpiryTime, self.Config.DefaultTimezone)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
tag := utils.ConcatenatedKey(attr.Tenant, attr.Account)
|
||||
if _, err := self.AccountDb.GetAccount(tag); err != nil {
|
||||
// create user balance if not exists
|
||||
// create account if not exists
|
||||
account := &engine.Account{
|
||||
Id: tag,
|
||||
}
|
||||
@@ -159,6 +160,7 @@ func (self *ApierV1) AddBalance(attr *AttrAddBalance, reply *string) error {
|
||||
RatingSubject: attr.RatingSubject,
|
||||
Directions: utils.ParseStringMap(attr.Directions),
|
||||
DestinationIds: utils.ParseStringMap(attr.DestinationIds),
|
||||
Categories: utils.ParseStringMap(attr.Categories),
|
||||
Weight: attr.Weight,
|
||||
SharedGroups: utils.ParseStringMap(attr.SharedGroups),
|
||||
Disabled: attr.Disabled,
|
||||
@@ -520,8 +522,7 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply
|
||||
|
||||
if len(aps) != 0 && self.Sched != nil {
|
||||
utils.Logger.Info("ApierV1.LoadTariffPlanFromStorDb, reloading scheduler.")
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
|
||||
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
|
||||
@@ -590,16 +591,22 @@ func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string)
|
||||
}
|
||||
tpRpf := utils.TPRatingProfile{Tenant: attrs.Tenant, Category: attrs.Category, Direction: attrs.Direction, Subject: attrs.Subject}
|
||||
keyId := tpRpf.KeyId()
|
||||
var rpfl *engine.RatingProfile
|
||||
if !attrs.Overwrite {
|
||||
if exists, err := self.RatingDb.HasData(utils.RATING_PROFILE_PREFIX, keyId); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
} else if exists {
|
||||
return utils.ErrExists
|
||||
var err error
|
||||
if rpfl, err = self.RatingDb.GetRatingProfile(keyId, false); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
rpfl := &engine.RatingProfile{Id: keyId, RatingPlanActivations: make(engine.RatingPlanActivations, len(attrs.RatingPlanActivations))}
|
||||
for idx, ra := range attrs.RatingPlanActivations {
|
||||
at, err := utils.ParseDate(ra.ActivationTime)
|
||||
if rpfl == nil {
|
||||
rpfl = &engine.RatingProfile{Id: keyId, RatingPlanActivations: make(engine.RatingPlanActivations, 0)}
|
||||
}
|
||||
for _, ra := range attrs.RatingPlanActivations {
|
||||
at, err := utils.ParseTimeDetectLayout(ra.ActivationTime, self.Config.DefaultTimezone)
|
||||
if err != nil {
|
||||
return fmt.Errorf(fmt.Sprintf("%s:Cannot parse activation time from %v", utils.ErrServerError.Error(), ra.ActivationTime))
|
||||
}
|
||||
@@ -608,8 +615,8 @@ func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string)
|
||||
} else if !exists {
|
||||
return fmt.Errorf(fmt.Sprintf("%s:RatingPlanId:%s", utils.ErrNotFound.Error(), ra.RatingPlanId))
|
||||
}
|
||||
rpfl.RatingPlanActivations[idx] = &engine.RatingPlanActivation{ActivationTime: at, RatingPlanId: ra.RatingPlanId,
|
||||
FallbackKeys: utils.FallbackSubjKeys(tpRpf.Direction, tpRpf.Tenant, tpRpf.Category, ra.FallbackSubjects)}
|
||||
rpfl.RatingPlanActivations = append(rpfl.RatingPlanActivations, &engine.RatingPlanActivation{ActivationTime: at, RatingPlanId: ra.RatingPlanId,
|
||||
FallbackKeys: utils.FallbackSubjKeys(tpRpf.Direction, tpRpf.Tenant, tpRpf.Category, ra.FallbackSubjects)})
|
||||
}
|
||||
if err := self.RatingDb.SetRatingProfile(rpfl); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -769,13 +776,37 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error
|
||||
if self.Sched == nil {
|
||||
return errors.New("SCHEDULER_NOT_ENABLED")
|
||||
}
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
*reply = OK
|
||||
return nil
|
||||
}
|
||||
|
||||
type AttrGetActionPlan struct {
|
||||
Id string
|
||||
}
|
||||
|
||||
func (self *ApierV1) GetActionPlan(attr AttrGetActionPlan, reply *[]engine.ActionPlans) error {
|
||||
var result []engine.ActionPlans
|
||||
if attr.Id == "" || attr.Id == "*" {
|
||||
aplsMap, err := self.RatingDb.GetAllActionPlans()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, apls := range aplsMap {
|
||||
result = append(result, apls)
|
||||
}
|
||||
} else {
|
||||
apls, err := self.RatingDb.GetActionPlans(attr.Id, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result = append(result, apls)
|
||||
}
|
||||
*reply = result
|
||||
return nil
|
||||
}
|
||||
|
||||
type AttrAddActionTrigger struct {
|
||||
ActionTriggersId string
|
||||
Tenant string
|
||||
@@ -928,8 +959,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
|
||||
return err
|
||||
}
|
||||
if self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
*reply = OK
|
||||
return nil
|
||||
@@ -939,8 +969,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
|
||||
if self.Sched == nil {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
*reply = OK
|
||||
return nil
|
||||
}
|
||||
@@ -1184,8 +1213,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
|
||||
}
|
||||
if len(aps) != 0 && self.Sched != nil {
|
||||
utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading scheduler.")
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
|
||||
var out int
|
||||
|
||||
@@ -735,8 +735,8 @@ func TestApierSetRatingProfile(t *testing.T) {
|
||||
} else if reply != "OK" {
|
||||
t.Error("Calling ApierV1.SetRatingProfile got reply: ", reply)
|
||||
}
|
||||
// Calling the second time should raise EXISTS
|
||||
if err := rater.Call("ApierV1.SetRatingProfile", rpf, &reply); err == nil || err.Error() != "EXISTS" {
|
||||
// Calling the second time should not raise EXISTS
|
||||
if err := rater.Call("ApierV1.SetRatingProfile", rpf, &reply); err != nil {
|
||||
t.Error("Unexpected result on duplication: ", err.Error())
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond) // Give time for cache reload
|
||||
@@ -1133,6 +1133,7 @@ func TestApierGetAccount(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond) // give scheduler time to react
|
||||
var reply *engine.Account
|
||||
attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"}
|
||||
if err := rater.Call("ApierV2.GetAccount", attrs, &reply); err != nil {
|
||||
|
||||
@@ -105,7 +105,7 @@ type AttrsGetScheduledActions struct {
|
||||
|
||||
type ScheduledActions struct {
|
||||
NextRunTime time.Time
|
||||
Accounts []*utils.TenantAccount
|
||||
Accounts int
|
||||
ActionsId, ActionPlanId, ActionPlanUuid string
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *
|
||||
schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty
|
||||
scheduledActions := self.Sched.GetQueue()
|
||||
for _, qActions := range scheduledActions {
|
||||
sas := &ScheduledActions{ActionsId: qActions.ActionsId, ActionPlanId: qActions.Id, ActionPlanUuid: qActions.Uuid}
|
||||
sas := &ScheduledActions{ActionsId: qActions.ActionsId, ActionPlanId: qActions.Id, ActionPlanUuid: qActions.Uuid, Accounts: len(qActions.AccountIds)}
|
||||
if attrs.SearchTerm != "" &&
|
||||
!(strings.Contains(sas.ActionPlanId, attrs.SearchTerm) ||
|
||||
strings.Contains(sas.ActionsId, attrs.SearchTerm)) {
|
||||
@@ -150,7 +150,9 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// we have a winner
|
||||
|
||||
schedActions = append(schedActions, sas)
|
||||
}
|
||||
if attrs.Paginator.Offset != nil {
|
||||
|
||||
@@ -91,8 +91,7 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str
|
||||
return err
|
||||
}
|
||||
if self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
*reply = v1.OK
|
||||
return nil
|
||||
@@ -249,8 +248,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
|
||||
}
|
||||
if len(aps) != 0 && self.Sched != nil {
|
||||
utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading scheduler.")
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
|
||||
var out int
|
||||
|
||||
@@ -498,11 +498,11 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh
|
||||
cacheDone := <-cacheDoneChan
|
||||
cacheDoneChan <- cacheDone
|
||||
utils.Logger.Info("Starting CGRateS Scheduler.")
|
||||
sched := scheduler.NewScheduler()
|
||||
sched := scheduler.NewScheduler(ratingDb)
|
||||
go reloadSchedulerSingnalHandler(sched, ratingDb)
|
||||
time.Sleep(1)
|
||||
internalSchedulerChan <- sched
|
||||
sched.LoadActionPlans(ratingDb)
|
||||
sched.Reload(true)
|
||||
sched.Loop()
|
||||
exitChan <- true // Should not get out of loop though
|
||||
}
|
||||
|
||||
@@ -126,9 +126,7 @@ func reloadSchedulerSingnalHandler(sched *scheduler.Scheduler, getter engine.Rat
|
||||
sig := <-c
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("Caught signal %v, reloading action timings.\n", sig))
|
||||
sched.LoadActionPlans(getter)
|
||||
// check the tip of the queue for new actions
|
||||
sched.Restart()
|
||||
sched.Reload(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
63
console/account_actionplan_get.go
Normal file
63
console/account_actionplan_get.go
Normal file
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2012-2015 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
func init() {
|
||||
c := &CmdGetAccountActionPlan{
|
||||
name: "account_actionplan_get",
|
||||
rpcMethod: "ApierV1.GetAccountActionPlan",
|
||||
rpcParams: &v1.AttrAcntAction{},
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
// Commander implementation
|
||||
type CmdGetAccountActionPlan struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *v1.AttrAcntAction
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdGetAccountActionPlan) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdGetAccountActionPlan) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdGetAccountActionPlan) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = &v1.AttrAcntAction{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdGetAccountActionPlan) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdGetAccountActionPlan) RpcResult() interface{} {
|
||||
s := make([]*v1.AccountActionTiming, 0)
|
||||
return &s
|
||||
}
|
||||
@@ -22,7 +22,7 @@ import "github.com/cgrates/cgrates/utils"
|
||||
|
||||
func init() {
|
||||
c := &CmdAddAccount{
|
||||
name: "account_add",
|
||||
name: "account_set",
|
||||
rpcMethod: "ApierV1.SetAccount",
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
66
console/actionplan_get.go
Normal file
66
console/actionplan_get.go
Normal file
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2012-2015 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
)
|
||||
|
||||
func init() {
|
||||
c := &CmdGetActionPlan{
|
||||
name: "actionplan_get",
|
||||
rpcMethod: "ApierV1.GetActionPlan",
|
||||
rpcParams: &v1.AttrGetActionPlan{},
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
// Commander implementation
|
||||
type CmdGetActionPlan struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *v1.AttrGetActionPlan
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdGetActionPlan) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdGetActionPlan) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdGetActionPlan) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = &v1.AttrGetActionPlan{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdGetActionPlan) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdGetActionPlan) RpcResult() interface{} {
|
||||
s := make([]*engine.ActionPlans, 0)
|
||||
return &s
|
||||
}
|
||||
@@ -18,45 +18,46 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/utils"
|
||||
import "github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
func init() {
|
||||
c := &CmdSetAccountActions{
|
||||
name: "accountactions_set",
|
||||
rpcMethod: "ApierV1.SetAccountActions",
|
||||
c := &CmdSetActionPlan{
|
||||
name: "actionplan_set",
|
||||
rpcMethod: "ApierV1.SetActionPlan",
|
||||
rpcParams: &v1.AttrSetActionPlan{},
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
// Commander implementation
|
||||
type CmdSetAccountActions struct {
|
||||
type CmdSetActionPlan struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *utils.TPAccountActions
|
||||
rpcParams *v1.AttrSetActionPlan
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdSetAccountActions) Name() string {
|
||||
func (self *CmdSetActionPlan) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdSetAccountActions) RpcMethod() string {
|
||||
func (self *CmdSetActionPlan) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdSetAccountActions) RpcParams(reset bool) interface{} {
|
||||
func (self *CmdSetActionPlan) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = &utils.TPAccountActions{}
|
||||
self.rpcParams = &v1.AttrSetActionPlan{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdSetAccountActions) PostprocessRpcParams() error {
|
||||
func (self *CmdSetActionPlan) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdSetAccountActions) RpcResult() interface{} {
|
||||
func (self *CmdSetActionPlan) RpcResult() interface{} {
|
||||
var s string
|
||||
return &s
|
||||
}
|
||||
@@ -218,7 +218,7 @@ YEARS:
|
||||
return
|
||||
}
|
||||
|
||||
func (at *ActionPlan) resetStartTimeCache() {
|
||||
func (at *ActionPlan) ResetStartTimeCache() {
|
||||
at.stCache = time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
}
|
||||
|
||||
@@ -238,7 +238,7 @@ func (at *ActionPlan) Execute() (err error) {
|
||||
if len(at.AccountIds) == 0 { // nothing to do if no accounts set
|
||||
return
|
||||
}
|
||||
at.resetStartTimeCache()
|
||||
at.ResetStartTimeCache()
|
||||
aac, err := at.getActions()
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsId, err))
|
||||
@@ -283,7 +283,7 @@ func (at *ActionPlan) Execute() (err error) {
|
||||
// delete without preserving order
|
||||
at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1]
|
||||
at.AccountIds = at.AccountIds[:len(at.AccountIds)-1]
|
||||
i -= 1
|
||||
i--
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,17 @@ func TestActionPlanNothing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionTimingMidnight(t *testing.T) {
|
||||
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}}
|
||||
y, m, d := referenceDate.Date()
|
||||
now := time.Date(y, m, d, 0, 0, 1, 0, time.Local)
|
||||
st := at.GetNextStartTime(now)
|
||||
expected := time.Date(y, m, d, 0, 0, 0, 0, time.Local).AddDate(0, 0, 1)
|
||||
if !st.Equal(expected) {
|
||||
t.Errorf("Expected %v was %v", expected, st)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionPlanOnlyHour(t *testing.T) {
|
||||
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "10:01:00"}}}
|
||||
st := at.GetNextStartTime(referenceDate)
|
||||
|
||||
@@ -58,10 +58,11 @@ func (rit *RITiming) CronString() string {
|
||||
hour, min, sec = "*", "*", "*"
|
||||
} else {
|
||||
hms := strings.Split(rit.StartTime, ":")
|
||||
if len(hms) != 3 {
|
||||
if len(hms) == 3 {
|
||||
hour, min, sec = hms[0], hms[1], hms[2]
|
||||
} else {
|
||||
hour, min, sec = "*", "*", "*"
|
||||
}
|
||||
hour, min, sec = hms[0], hms[1], hms[2]
|
||||
if strings.HasPrefix(hour, "0") {
|
||||
hour = hour[1:]
|
||||
}
|
||||
|
||||
@@ -129,8 +129,8 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
|
||||
}
|
||||
|
||||
func TestExecuteActions(t *testing.T) {
|
||||
scheduler.NewScheduler().LoadActionPlans(ratingDb)
|
||||
time.Sleep(time.Millisecond) // Give time to scheduler to topup the account
|
||||
scheduler.NewScheduler(ratingDb).Reload(false)
|
||||
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
|
||||
if acnt, err := acntDb.GetAccount("cgrates.org:12344"); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(acnt.BalanceMap) != 2 {
|
||||
|
||||
@@ -128,8 +128,8 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
|
||||
}
|
||||
|
||||
func TestExecuteActions2(t *testing.T) {
|
||||
scheduler.NewScheduler().LoadActionPlans(ratingDb2)
|
||||
time.Sleep(time.Millisecond) // Give time to scheduler to topup the account
|
||||
scheduler.NewScheduler(ratingDb2).Reload(false)
|
||||
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
|
||||
if acnt, err := acntDb2.GetAccount("cgrates.org:12345"); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(acnt.BalanceMap) != 2 {
|
||||
|
||||
@@ -126,8 +126,8 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
|
||||
}
|
||||
|
||||
func TestExecuteActions3(t *testing.T) {
|
||||
scheduler.NewScheduler().LoadActionPlans(ratingDb3)
|
||||
time.Sleep(time.Millisecond) // Give time to scheduler to topup the account
|
||||
scheduler.NewScheduler(ratingDb3).Reload(false)
|
||||
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
|
||||
if acnt, err := acntDb3.GetAccount("cgrates.org:12346"); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(acnt.BalanceMap) != 1 {
|
||||
|
||||
@@ -104,7 +104,7 @@ func TestTutLocalLoadTariffPlanFromFolder(t *testing.T) {
|
||||
} else if loadInst.LoadId == "" {
|
||||
t.Error("Empty loadId received, loadInstance: ", loadInst)
|
||||
}
|
||||
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
|
||||
time.Sleep(100*time.Millisecond + time.Duration(*waitRater)*time.Millisecond) // Give time for scheduler to execute topups
|
||||
}
|
||||
|
||||
// Check loaded stats
|
||||
@@ -1100,6 +1100,7 @@ func TestTutLocalSetAccount(t *testing.T) {
|
||||
Offset int // Set the item offset
|
||||
Limit int // Limit number of items retrieved
|
||||
}
|
||||
time.Sleep(100*time.Millisecond + time.Duration(*waitRater)*time.Millisecond) // Give time for scheduler to execute topups
|
||||
var acnts []*engine.Account
|
||||
if err := tutLocalRpc.Call("ApierV2.GetAccounts", utils.AttrGetAccounts{Tenant: attrs.Tenant, AccountIds: []string{attrs.Account}}, &acnts); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -33,26 +33,37 @@ type Scheduler struct {
|
||||
timer *time.Timer
|
||||
restartLoop chan bool
|
||||
sync.Mutex
|
||||
storage engine.RatingStorage
|
||||
waitingReload bool
|
||||
loopChecker chan int
|
||||
schedulerStarted bool
|
||||
}
|
||||
|
||||
func NewScheduler() *Scheduler {
|
||||
return &Scheduler{restartLoop: make(chan bool)}
|
||||
func NewScheduler(storage engine.RatingStorage) *Scheduler {
|
||||
return &Scheduler{
|
||||
restartLoop: make(chan bool),
|
||||
storage: storage,
|
||||
loopChecker: make(chan int),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) Loop() {
|
||||
s.schedulerStarted = true
|
||||
for {
|
||||
for len(s.queue) == 0 { //hang here if empty
|
||||
<-s.restartLoop
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Scheduler queue length: %v", len(s.queue)))
|
||||
s.Lock()
|
||||
a0 := s.queue[0]
|
||||
//utils.Logger.Info(fmt.Sprintf("Scheduler qeue length: %v", len(s.qeue)))
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.Id))
|
||||
now := time.Now()
|
||||
start := a0.GetNextStartTime(now)
|
||||
if start.Equal(now) || start.Before(now) {
|
||||
go a0.Execute()
|
||||
// if after execute the next start time is in the past then
|
||||
// do not add it to the queue
|
||||
a0.ResetStartTimeCache()
|
||||
now = time.Now().Add(time.Second)
|
||||
start = a0.GetNextStartTime(now)
|
||||
if start.Before(now) {
|
||||
@@ -71,7 +82,7 @@ func (s *Scheduler) Loop() {
|
||||
select {
|
||||
case <-s.timer.C:
|
||||
// timer has expired
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0))
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0.Id))
|
||||
case <-s.restartLoop:
|
||||
// nothing to do, just continue the loop
|
||||
}
|
||||
@@ -79,14 +90,44 @@ func (s *Scheduler) Loop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
|
||||
actionPlans, err := storage.GetAllActionPlans()
|
||||
func (s *Scheduler) Reload(protect bool) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if protect {
|
||||
if s.waitingReload {
|
||||
s.loopChecker <- 1
|
||||
}
|
||||
s.waitingReload = true
|
||||
go func() {
|
||||
t := time.NewTicker(100 * time.Millisecond) // wait for loops before start
|
||||
select {
|
||||
case <-s.loopChecker:
|
||||
t.Stop() // cancel reload
|
||||
case <-t.C:
|
||||
s.loadActionPlans()
|
||||
s.restart()
|
||||
t.Stop()
|
||||
s.waitingReload = false
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
go func() {
|
||||
s.loadActionPlans()
|
||||
s.restart()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) loadActionPlans() {
|
||||
actionPlans, err := s.storage.GetAllActionPlans()
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Cannot get action plans: %v", err))
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> processing %d action plans", len(actionPlans)))
|
||||
// recreate the queue
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.queue = engine.ActionPlanPriotityList{}
|
||||
for key, aps := range actionPlans {
|
||||
toBeSaved := false
|
||||
@@ -120,19 +161,20 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
|
||||
}
|
||||
if toBeSaved {
|
||||
engine.Guardian.Guard(func() (interface{}, error) {
|
||||
storage.SetActionPlans(key, newApls)
|
||||
storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
|
||||
s.storage.SetActionPlans(key, newApls)
|
||||
s.storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
}
|
||||
}
|
||||
sort.Sort(s.queue)
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> queued %d action plans", len(s.queue)))
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *Scheduler) Restart() {
|
||||
s.restartLoop <- true
|
||||
func (s *Scheduler) restart() {
|
||||
if s.schedulerStarted {
|
||||
s.restartLoop <- true
|
||||
}
|
||||
if s.timer != nil {
|
||||
s.timer.Stop()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user