using tasks and action plans with no accounts

This commit is contained in:
Radu Ioan Fericean
2015-12-17 19:10:38 +02:00
parent 9d228d80bf
commit 65516e201e
13 changed files with 230 additions and 72 deletions

View File

@@ -34,11 +34,17 @@ const (
type ActionTiming struct {
Uuid string
Timing *RateInterval
ActionsId string
ActionsID string
Weight float64
actions Actions
accountIDs map[string]struct{}
stCache time.Time // cached time of the next start
accountIDs map[string]struct{} // copy of action plans accounts
stCache time.Time // cached time of the next start
}
type Task struct {
Uuid string
AccountID string
ActionsID string
}
type ActionPlan struct {
@@ -55,6 +61,14 @@ func (apl *ActionPlan) RemoveAccountID(accID string) (found bool) {
return
}
func (t *Task) Execute() error {
return (&ActionTiming{
Uuid: t.Uuid,
ActionsID: t.ActionsID,
accountIDs: map[string]struct{}{t.AccountID: struct{}{}},
}).Execute()
}
func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) {
if !at.stCache.IsZero() {
return at.stCache
@@ -236,37 +250,37 @@ func (at *ActionTiming) SetActions(as Actions) {
at.actions = as
}
func (at *ActionTiming) SetAccountIDs(accIDs map[string]struct{}) {
at.accountIDs = accIDs
}
func (at *ActionTiming) getActions() (as []*Action, err error) {
if at.actions == nil {
at.actions, err = ratingStorage.GetActions(at.ActionsId, false)
at.actions, err = ratingStorage.GetActions(at.ActionsID, false)
}
at.actions.Sort()
return at.actions, err
}
func (at *ActionTiming) Execute() (err error) {
if len(at.accountIDs) == 0 { // nothing to do if no accounts set
return
}
at.ResetStartTimeCache()
aac, err := at.getActions()
if err != nil {
utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsId, err))
utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsID, err))
return
}
for accId, _ := range at.accountIDs {
for accID, _ := range at.accountIDs {
_, err = Guardian.Guard(func() (interface{}, error) {
ub, err := accountingStorage.GetAccount(accId)
ub, err := accountingStorage.GetAccount(accID)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", accId))
utils.Logger.Warning(fmt.Sprintf("Could not get account id: %s. Skipping!", accID))
return 0, err
}
transactionFailed := false
toBeSaved := true
for _, a := range aac {
if ub.Disabled && a.ActionType != ENABLE_ACCOUNT {
continue // disabled acocunts are not removed from action plan
//return 0, fmt.Errorf("Account %s is disabled", accId)
//return 0, fmt.Errorf("Account %s is disabled", accID)
}
if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && parseErr == nil && !expDate.IsZero() {
a.Balance.ExpirationDate = expDate
@@ -285,13 +299,33 @@ func (at *ActionTiming) Execute() (err error) {
transactionFailed = true
break
}
toBeSaved = true
}
if !transactionFailed && toBeSaved {
if !transactionFailed {
accountingStorage.SetAccount(ub)
}
return 0, nil
}, 0, accId)
}, 0, accID)
}
if len(at.accountIDs) == 0 { // action timing executing without accounts
for _, a := range aac {
if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) &&
parseErr == nil && !expDate.IsZero() {
a.Balance.ExpirationDate = expDate
}
actionFunction, exists := getActionFunc(a.ActionType)
if !exists {
// do not allow the action plan to be rescheduled
at.Timing = nil
utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
break
}
if err := actionFunction(nil, nil, a, aac); err != nil {
utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err))
break
}
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err))
@@ -344,8 +378,8 @@ func (atpl ActionTimingPriorityList) Sort() {
ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1]
continue
}
for iAcc, accId := range at.AccountIds {
if accId == accountId {
for iAcc, accID := range at.AccountIds {
if accID == accountId {
if len(at.AccountIds) == 1 { // Only one balance, remove complete at
if len(ats) == 1 { // Removing last item, by init empty
return make([]*ActionPlan, 0)

View File

@@ -73,7 +73,6 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro
}
at.Executed = true
transactionFailed := false
toBeSaved := true
for _, a := range aac {
if a.Balance == nil {
a.Balance = &Balance{}
@@ -91,16 +90,13 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro
transactionFailed = false
break
}
toBeSaved = true
}
if transactionFailed || at.Recurrent {
at.Executed = false
}
if !transactionFailed && ub != nil {
storageLogger.LogActionTrigger(ub.Id, utils.RATER_SOURCE, at, aac)
if toBeSaved {
accountingStorage.SetAccount(ub)
}
accountingStorage.SetAccount(ub)
}
return
}

View File

@@ -1061,9 +1061,9 @@ func TestActionPlanLogging(t *testing.T) {
accountIDs: map[string]struct{}{"one": struct{}{}, "two": struct{}{}, "three": struct{}{}},
Timing: i,
Weight: 10.0,
ActionsId: "TEST_ACTIONS",
ActionsID: "TEST_ACTIONS",
}
as, err := ratingStorage.GetActions(at.ActionsId, false)
as, err := ratingStorage.GetActions(at.ActionsID, false)
if err != nil {
t.Error("Error getting actions for the action trigger: ", err)
}

View File

@@ -1002,7 +1002,7 @@ func TestLoadActionTimings(t *testing.T) {
},
},
Weight: 10,
ActionsId: "MINI",
ActionsID: "MINI",
},
&ActionTiming{
Timing: &RateInterval{
@@ -1015,7 +1015,7 @@ func TestLoadActionTimings(t *testing.T) {
},
},
Weight: 10,
ActionsId: "SHARED",
ActionsID: "SHARED",
},
},
}

View File

@@ -66,6 +66,8 @@ type RatingStorage interface {
GetActionPlan(string, bool) (*ActionPlan, error)
SetActionPlan(string, *ActionPlan) error
GetAllActionPlans() (map[string]*ActionPlan, error)
PushTask(*Task) error
PopTask() (*Task, error)
}
type AccountingStorage interface {

View File

@@ -33,8 +33,9 @@ import (
)
type MapStorage struct {
dict map[string][]byte
ms Marshaler
dict map[string][]byte
tasks [][]byte
ms Marshaler
}
func NewMapStorage() (*MapStorage, error) {
@@ -690,6 +691,27 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error
return
}
func (ms *MapStorage) PushTask(t *Task) error {
result, err := ms.ms.Marshal(t)
if err != nil {
return err
}
ms.tasks = append(ms.tasks, result)
return nil
}
func (ms *MapStorage) PopTask() (t *Task, err error) {
if len(ms.tasks) > 0 {
var values []byte
values, ms.tasks = ms.tasks[0], ms.tasks[1:]
t = &Task{}
err = ms.ms.Unmarshal(values, t)
} else {
err = utils.ErrNotFound
}
return
}
func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) {
key = utils.DERIVEDCHARGERS_PREFIX + key
if !skipCache {

View File

@@ -34,6 +34,7 @@ const (
colDst = "destinations"
colAct = "actions"
colApl = "actionplans"
colTsk = "tasks"
colAtr = "actiontriggers"
colRpl = "ratingplans"
colRpf = "ratingprofiles"
@@ -1043,10 +1044,13 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl
ats = kv.Value
cache2go.Cache(utils.ACTION_PLAN_PREFIX+key, ats)
}
ats.AccountIDs = utils.YesDots(ats.AccountIDs)
return
}
func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error {
// clean dots from account ids map
ats.AccountIDs = utils.NoDots(ats.AccountIDs)
if len(ats.ActionTimings) == 0 {
cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key)
err := ms.db.C(colApl).Remove(bson.M{"key": key})
@@ -1071,12 +1075,32 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err
ats = make(map[string]*ActionPlan, len(apls))
for key, value := range apls {
apl := value.(*ActionPlan)
apl.AccountIDs = utils.YesDots(apl.AccountIDs)
ats[key] = apl
}
return
}
func (ms *MongoStorage) PushTask(t *Task) error {
return ms.db.C(colTsk).Insert(bson.M{"_id": bson.NewObjectId(), "task": t})
}
func (ms *MongoStorage) PopTask() (t *Task, err error) {
v := struct {
ID bson.ObjectId `bson:"_id"`
Task *Task
}{}
if err = ms.db.C(colTsk).Find(nil).One(&v); err == nil {
if remErr := ms.db.C(colTsk).Remove(bson.M{"_id": v.ID}); remErr != nil {
return nil, remErr
}
t = v.Task
}
return
}
func (ms *MongoStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) {
if !skipCache {
if x, err := cache2go.Get(utils.DERIVEDCHARGERS_PREFIX + key); err == nil {

View File

@@ -886,7 +886,7 @@ func (rs *RedisStorage) SetActionTriggers(key string, atrs ActionTriggers) (err
// delete the key
return conn.Cmd("DEL", utils.ACTION_TRIGGER_PREFIX+key).Err
}
result, err := rs.ms.Marshal(&atrs)
result, err := rs.ms.Marshal(atrs)
if err != nil {
return err
}
@@ -940,6 +940,23 @@ func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err
return
}
func (rs *RedisStorage) PushTask(t *Task) error {
result, err := rs.ms.Marshal(t)
if err != nil {
return err
}
return rs.db.Cmd("RPUSH", utils.TASKS_KEY, result).Err
}
func (rs *RedisStorage) PopTask() (t *Task, err error) {
var values []byte
if values, err = rs.db.Cmd("LPOP", utils.TASKS_KEY).Bytes(); err == nil {
t = &Task{}
err = rs.ms.Unmarshal(values, t)
}
return
}
func (rs *RedisStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) {
key = utils.DERIVEDCHARGERS_PREFIX + key
if !skipCache {
@@ -1037,7 +1054,7 @@ func (rs *RedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger,
return rs.db.Cmd("SET", utils.LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v*%v", ubId, string(mat), string(mas)))).Err
}
func (rs *RedisStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) {
func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
mat, err := rs.ms.Marshal(at)
if err != nil {
return

View File

@@ -272,6 +272,41 @@ func TestDifferentUuid(t *testing.T) {
}
}
func TestStorageTask(t *testing.T) {
// clean previous unused tasks
for i := 0; i < 16; i++ {
ratingStorage.PopTask()
}
if err := ratingStorage.PushTask(&Task{Uuid: "1"}); err != nil {
t.Error("Error pushing task: ", err)
}
if err := ratingStorage.PushTask(&Task{Uuid: "2"}); err != nil {
t.Error("Error pushing task: ", err)
}
if err := ratingStorage.PushTask(&Task{Uuid: "3"}); err != nil {
t.Error("Error pushing task: ", err)
}
if err := ratingStorage.PushTask(&Task{Uuid: "4"}); err != nil {
t.Error("Error pushing task: ", err)
}
if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "1" {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "2" {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "3" {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "4" {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err == nil && task != nil {
t.Errorf("Error poping task %+v, %v: ", task, err)
}
}
/************************** Benchmarks *****************************/
func GetUB() *Account {

View File

@@ -593,7 +593,7 @@ func (tpr *TpReader) LoadActionPlans() (err error) {
StartTime: t.StartTime,
},
},
ActionsId: at.ActionsId,
ActionsID: at.ActionsId,
})
tpr.actionPlans[atId] = actPln
@@ -724,7 +724,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error
StartTime: t.StartTime,
},
},
ActionsId: at.ActionsId,
ActionsID: at.ActionsId,
})
// collect action ids from timings
actionsIds = append(actionsIds, at.ActionsId)
@@ -873,7 +873,6 @@ func (tpr *TpReader) LoadAccountActions() (err error) {
return fmt.Errorf("could not get action triggers for tag %s", aa.ActionTriggersId)
}
}
ub := &Account{
Id: aa.KeyId(),
ActionTriggers: aTriggers,
@@ -1318,8 +1317,22 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) {
if verbose {
log.Print("Action Plans:")
}
for k, ats := range tpr.actionPlans {
err = tpr.ratingStorage.SetActionPlan(k, ats)
for k, ap := range tpr.actionPlans {
for _, at := range ap.ActionTimings {
if at.IsASAP() {
for accID := range ap.AccountIDs {
t := &Task{
Uuid: utils.GenUUID(),
AccountID: accID,
ActionsID: at.ActionsID,
}
if err = tpr.ratingStorage.PushTask(t); err != nil {
return err
}
}
}
}
err = tpr.ratingStorage.SetActionPlan(k, ap)
if err != nil {
return err
}

View File

@@ -29,7 +29,7 @@ import (
)
type Scheduler struct {
queue engine.ActionPlanPriotityList
queue engine.ActionTimingPriorityList
timer *time.Timer
restartLoop chan bool
sync.Mutex
@@ -56,7 +56,7 @@ func (s *Scheduler) Loop() {
utils.Logger.Info(fmt.Sprintf("<Scheduler> Scheduler queue length: %v", len(s.queue)))
s.Lock()
a0 := s.queue[0]
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.Id))
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.ActionsID))
now := time.Now()
start := a0.GetNextStartTime(now)
if start.Equal(now) || start.Before(now) {
@@ -77,12 +77,12 @@ func (s *Scheduler) Loop() {
} else {
s.Unlock()
d := a0.GetNextStartTime(now).Sub(now)
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time to next action (%s): %v", a0.Id, d))
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time to next action (%s): %v", a0.ActionsID, d))
s.timer = time.NewTimer(d)
select {
case <-s.timer.C:
// timer has expired
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0.Id))
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0.ActionsID))
case <-s.restartLoop:
// nothing to do, just continue the loop
}
@@ -120,6 +120,21 @@ func (s *Scheduler) Reload(protect bool) {
}
func (s *Scheduler) loadActionPlans() {
// limit the number of concurrent tasks
var limit = make(chan bool, 10)
// execute existing tasks
for {
task, err := s.storage.PopTask()
if err != nil || task == nil {
break
}
limit <- true
go func() {
task.Execute()
<-limit
}()
}
actionPlans, err := s.storage.GetAllActionPlans()
if err != nil && err != utils.ErrNotFound {
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Cannot get action plans: %v", err))
@@ -128,43 +143,25 @@ func (s *Scheduler) loadActionPlans() {
// recreate the queue
s.Lock()
defer s.Unlock()
s.queue = engine.ActionPlanPriotityList{}
for key, aps := range actionPlans {
toBeSaved := false
isAsap := false
var newApls []*engine.ActionPlan // will remove the one time runs from the database
for _, ap := range aps {
if ap.Timing == nil {
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Nil timing on action plan: %+v, discarding!", ap))
s.queue = engine.ActionTimingPriorityList{}
for _, actionPlan := range actionPlans {
for _, at := range actionPlan.ActionTimings {
if at.Timing == nil {
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Nil timing on action plan: %+v, discarding!", at))
continue
}
if len(ap.AccountIds) == 0 { // no accounts just ignore
if at.IsASAP() {
continue
}
isAsap = ap.IsASAP()
toBeSaved = toBeSaved || isAsap
if isAsap {
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for one time action on %v", key))
ap.Execute()
ap.AccountIds = make([]string, 0)
} else {
now := time.Now()
if ap.GetNextStartTime(now).Before(now) {
// the task is obsolete, do not add it to the queue
continue
}
s.queue = append(s.queue, ap)
now := time.Now()
if at.GetNextStartTime(now).Before(now) {
// the task is obsolete, do not add it to the queue
continue
}
// save even asap action plans with empty account id list
newApls = append(newApls, ap)
}
if toBeSaved {
engine.Guardian.Guard(func() (interface{}, error) {
s.storage.SetActionPlans(key, newApls)
s.storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
return 0, nil
}, 0, utils.ACTION_PLAN_PREFIX)
at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts
s.queue = append(s.queue, at)
}
}
sort.Sort(s.queue)
@@ -180,6 +177,6 @@ func (s *Scheduler) restart() {
}
}
func (s *Scheduler) GetQueue() engine.ActionPlanPriotityList {
func (s *Scheduler) GetQueue() engine.ActionTimingPriorityList {
return s.queue
}

View File

@@ -169,6 +169,7 @@ const (
ASR = "ASR"
ACD = "ACD"
FILTER_REGEXP_TPL = "$1$2$3$4$5"
TASKS_KEY = "tasks"
ACTION_PLAN_PREFIX = "apl_"
ACTION_TRIGGER_PREFIX = "atr_"
RATING_PLAN_PREFIX = "rpl_"

View File

@@ -143,3 +143,20 @@ func (sm StringMap) GetOne() string {
}
return ""
}
func NoDots(m map[string]struct{}) map[string]struct{} {
return MapKeysReplace(m, ".", "")
}
func YesDots(m map[string]struct{}) map[string]struct{} {
return MapKeysReplace(m, "", ".")
}
func MapKeysReplace(m map[string]struct{}, old, new string) map[string]struct{} {
for key, val := range m {
delete(m, key)
key = strings.Replace(key, old, new, -1)
m[key] = val
}
return m
}