From 65516e201e4d41bb545d442ea45229948ef3cf7c Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 17 Dec 2015 19:10:38 +0200 Subject: [PATCH] using tasks and action plans with no accounts --- engine/action_plan.go | 70 +++++++++++++++++++++++++++---------- engine/action_trigger.go | 6 +--- engine/actions_test.go | 4 +-- engine/loader_csv_test.go | 4 +-- engine/storage_interface.go | 2 ++ engine/storage_map.go | 26 ++++++++++++-- engine/storage_mongo.go | 24 +++++++++++++ engine/storage_redis.go | 21 +++++++++-- engine/storage_test.go | 35 +++++++++++++++++++ engine/tp_reader.go | 23 +++++++++--- scheduler/scheduler.go | 69 +++++++++++++++++------------------- utils/consts.go | 1 + utils/map.go | 17 +++++++++ 13 files changed, 230 insertions(+), 72 deletions(-) diff --git a/engine/action_plan.go b/engine/action_plan.go index 231c9568e..e5914c79c 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -34,11 +34,17 @@ const ( type ActionTiming struct { Uuid string Timing *RateInterval - ActionsId string + ActionsID string Weight float64 actions Actions - accountIDs map[string]struct{} - stCache time.Time // cached time of the next start + accountIDs map[string]struct{} // copy of action plans accounts + stCache time.Time // cached time of the next start +} + +type Task struct { + Uuid string + AccountID string + ActionsID string } type ActionPlan struct { @@ -55,6 +61,14 @@ func (apl *ActionPlan) RemoveAccountID(accID string) (found bool) { return } +func (t *Task) Execute() error { + return (&ActionTiming{ + Uuid: t.Uuid, + ActionsID: t.ActionsID, + accountIDs: map[string]struct{}{t.AccountID: struct{}{}}, + }).Execute() +} + func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) { if !at.stCache.IsZero() { return at.stCache @@ -236,37 +250,37 @@ func (at *ActionTiming) SetActions(as Actions) { at.actions = as } +func (at *ActionTiming) SetAccountIDs(accIDs map[string]struct{}) { + at.accountIDs = accIDs +} + func (at *ActionTiming) getActions() (as []*Action, err error) { if at.actions == nil { - at.actions, err = ratingStorage.GetActions(at.ActionsId, false) + at.actions, err = ratingStorage.GetActions(at.ActionsID, false) } at.actions.Sort() return at.actions, err } func (at *ActionTiming) Execute() (err error) { - if len(at.accountIDs) == 0 { // nothing to do if no accounts set - return - } at.ResetStartTimeCache() aac, err := at.getActions() if err != nil { - utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsId, err)) + utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsID, err)) return } - for accId, _ := range at.accountIDs { + for accID, _ := range at.accountIDs { _, err = Guardian.Guard(func() (interface{}, error) { - ub, err := accountingStorage.GetAccount(accId) + ub, err := accountingStorage.GetAccount(accID) if err != nil { - utils.Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", accId)) + utils.Logger.Warning(fmt.Sprintf("Could not get account id: %s. Skipping!", accID)) return 0, err } transactionFailed := false - toBeSaved := true for _, a := range aac { if ub.Disabled && a.ActionType != ENABLE_ACCOUNT { continue // disabled acocunts are not removed from action plan - //return 0, fmt.Errorf("Account %s is disabled", accId) + //return 0, fmt.Errorf("Account %s is disabled", accID) } if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && parseErr == nil && !expDate.IsZero() { a.Balance.ExpirationDate = expDate @@ -285,13 +299,33 @@ func (at *ActionTiming) Execute() (err error) { transactionFailed = true break } - toBeSaved = true } - if !transactionFailed && toBeSaved { + if !transactionFailed { accountingStorage.SetAccount(ub) } return 0, nil - }, 0, accId) + }, 0, accID) + } + if len(at.accountIDs) == 0 { // action timing executing without accounts + for _, a := range aac { + + if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && + parseErr == nil && !expDate.IsZero() { + a.Balance.ExpirationDate = expDate + } + + actionFunction, exists := getActionFunc(a.ActionType) + if !exists { + // do not allow the action plan to be rescheduled + at.Timing = nil + utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType)) + break + } + if err := actionFunction(nil, nil, a, aac); err != nil { + utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err)) + break + } + } } if err != nil { utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err)) @@ -344,8 +378,8 @@ func (atpl ActionTimingPriorityList) Sort() { ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] continue } - for iAcc, accId := range at.AccountIds { - if accId == accountId { + for iAcc, accID := range at.AccountIds { + if accID == accountId { if len(at.AccountIds) == 1 { // Only one balance, remove complete at if len(ats) == 1 { // Removing last item, by init empty return make([]*ActionPlan, 0) diff --git a/engine/action_trigger.go b/engine/action_trigger.go index c7bec094b..9c3d16178 100644 --- a/engine/action_trigger.go +++ b/engine/action_trigger.go @@ -73,7 +73,6 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro } at.Executed = true transactionFailed := false - toBeSaved := true for _, a := range aac { if a.Balance == nil { a.Balance = &Balance{} @@ -91,16 +90,13 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro transactionFailed = false break } - toBeSaved = true } if transactionFailed || at.Recurrent { at.Executed = false } if !transactionFailed && ub != nil { storageLogger.LogActionTrigger(ub.Id, utils.RATER_SOURCE, at, aac) - if toBeSaved { - accountingStorage.SetAccount(ub) - } + accountingStorage.SetAccount(ub) } return } diff --git a/engine/actions_test.go b/engine/actions_test.go index b96003200..75b6ef9b9 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -1061,9 +1061,9 @@ func TestActionPlanLogging(t *testing.T) { accountIDs: map[string]struct{}{"one": struct{}{}, "two": struct{}{}, "three": struct{}{}}, Timing: i, Weight: 10.0, - ActionsId: "TEST_ACTIONS", + ActionsID: "TEST_ACTIONS", } - as, err := ratingStorage.GetActions(at.ActionsId, false) + as, err := ratingStorage.GetActions(at.ActionsID, false) if err != nil { t.Error("Error getting actions for the action trigger: ", err) } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index e01df4be7..e0e24f13f 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -1002,7 +1002,7 @@ func TestLoadActionTimings(t *testing.T) { }, }, Weight: 10, - ActionsId: "MINI", + ActionsID: "MINI", }, &ActionTiming{ Timing: &RateInterval{ @@ -1015,7 +1015,7 @@ func TestLoadActionTimings(t *testing.T) { }, }, Weight: 10, - ActionsId: "SHARED", + ActionsID: "SHARED", }, }, } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 916cd511c..ee67ad681 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -66,6 +66,8 @@ type RatingStorage interface { GetActionPlan(string, bool) (*ActionPlan, error) SetActionPlan(string, *ActionPlan) error GetAllActionPlans() (map[string]*ActionPlan, error) + PushTask(*Task) error + PopTask() (*Task, error) } type AccountingStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index 776ffcaff..a362e1e01 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -33,8 +33,9 @@ import ( ) type MapStorage struct { - dict map[string][]byte - ms Marshaler + dict map[string][]byte + tasks [][]byte + ms Marshaler } func NewMapStorage() (*MapStorage, error) { @@ -690,6 +691,27 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error return } +func (ms *MapStorage) PushTask(t *Task) error { + result, err := ms.ms.Marshal(t) + if err != nil { + return err + } + ms.tasks = append(ms.tasks, result) + return nil +} + +func (ms *MapStorage) PopTask() (t *Task, err error) { + if len(ms.tasks) > 0 { + var values []byte + values, ms.tasks = ms.tasks[0], ms.tasks[1:] + t = &Task{} + err = ms.ms.Unmarshal(values, t) + } else { + err = utils.ErrNotFound + } + return +} + func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) { key = utils.DERIVEDCHARGERS_PREFIX + key if !skipCache { diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 510442356..3f33bdf61 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -34,6 +34,7 @@ const ( colDst = "destinations" colAct = "actions" colApl = "actionplans" + colTsk = "tasks" colAtr = "actiontriggers" colRpl = "ratingplans" colRpf = "ratingprofiles" @@ -1043,10 +1044,13 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl ats = kv.Value cache2go.Cache(utils.ACTION_PLAN_PREFIX+key, ats) } + ats.AccountIDs = utils.YesDots(ats.AccountIDs) return } func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { + // clean dots from account ids map + ats.AccountIDs = utils.NoDots(ats.AccountIDs) if len(ats.ActionTimings) == 0 { cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) err := ms.db.C(colApl).Remove(bson.M{"key": key}) @@ -1071,12 +1075,32 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err ats = make(map[string]*ActionPlan, len(apls)) for key, value := range apls { apl := value.(*ActionPlan) + apl.AccountIDs = utils.YesDots(apl.AccountIDs) ats[key] = apl } return } +func (ms *MongoStorage) PushTask(t *Task) error { + return ms.db.C(colTsk).Insert(bson.M{"_id": bson.NewObjectId(), "task": t}) +} + +func (ms *MongoStorage) PopTask() (t *Task, err error) { + v := struct { + ID bson.ObjectId `bson:"_id"` + Task *Task + }{} + if err = ms.db.C(colTsk).Find(nil).One(&v); err == nil { + if remErr := ms.db.C(colTsk).Remove(bson.M{"_id": v.ID}); remErr != nil { + return nil, remErr + } + t = v.Task + } + + return +} + func (ms *MongoStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) { if !skipCache { if x, err := cache2go.Get(utils.DERIVEDCHARGERS_PREFIX + key); err == nil { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index e8bb6d493..4f252548b 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -886,7 +886,7 @@ func (rs *RedisStorage) SetActionTriggers(key string, atrs ActionTriggers) (err // delete the key return conn.Cmd("DEL", utils.ACTION_TRIGGER_PREFIX+key).Err } - result, err := rs.ms.Marshal(&atrs) + result, err := rs.ms.Marshal(atrs) if err != nil { return err } @@ -940,6 +940,23 @@ func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } +func (rs *RedisStorage) PushTask(t *Task) error { + result, err := rs.ms.Marshal(t) + if err != nil { + return err + } + return rs.db.Cmd("RPUSH", utils.TASKS_KEY, result).Err +} + +func (rs *RedisStorage) PopTask() (t *Task, err error) { + var values []byte + if values, err = rs.db.Cmd("LPOP", utils.TASKS_KEY).Bytes(); err == nil { + t = &Task{} + err = rs.ms.Unmarshal(values, t) + } + return +} + func (rs *RedisStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) { key = utils.DERIVEDCHARGERS_PREFIX + key if !skipCache { @@ -1037,7 +1054,7 @@ func (rs *RedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, return rs.db.Cmd("SET", utils.LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v*%v", ubId, string(mat), string(mas)))).Err } -func (rs *RedisStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) { +func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { mat, err := rs.ms.Marshal(at) if err != nil { return diff --git a/engine/storage_test.go b/engine/storage_test.go index cd5cf9144..375979fa9 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -272,6 +272,41 @@ func TestDifferentUuid(t *testing.T) { } } +func TestStorageTask(t *testing.T) { + // clean previous unused tasks + for i := 0; i < 16; i++ { + ratingStorage.PopTask() + } + + if err := ratingStorage.PushTask(&Task{Uuid: "1"}); err != nil { + t.Error("Error pushing task: ", err) + } + if err := ratingStorage.PushTask(&Task{Uuid: "2"}); err != nil { + t.Error("Error pushing task: ", err) + } + if err := ratingStorage.PushTask(&Task{Uuid: "3"}); err != nil { + t.Error("Error pushing task: ", err) + } + if err := ratingStorage.PushTask(&Task{Uuid: "4"}); err != nil { + t.Error("Error pushing task: ", err) + } + if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "1" { + t.Error("Error poping task: ", task, err) + } + if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "2" { + t.Error("Error poping task: ", task, err) + } + if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "3" { + t.Error("Error poping task: ", task, err) + } + if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "4" { + t.Error("Error poping task: ", task, err) + } + if task, err := ratingStorage.PopTask(); err == nil && task != nil { + t.Errorf("Error poping task %+v, %v: ", task, err) + } +} + /************************** Benchmarks *****************************/ func GetUB() *Account { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 912d1a629..abafa2811 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -593,7 +593,7 @@ func (tpr *TpReader) LoadActionPlans() (err error) { StartTime: t.StartTime, }, }, - ActionsId: at.ActionsId, + ActionsID: at.ActionsId, }) tpr.actionPlans[atId] = actPln @@ -724,7 +724,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error StartTime: t.StartTime, }, }, - ActionsId: at.ActionsId, + ActionsID: at.ActionsId, }) // collect action ids from timings actionsIds = append(actionsIds, at.ActionsId) @@ -873,7 +873,6 @@ func (tpr *TpReader) LoadAccountActions() (err error) { return fmt.Errorf("could not get action triggers for tag %s", aa.ActionTriggersId) } } - ub := &Account{ Id: aa.KeyId(), ActionTriggers: aTriggers, @@ -1318,8 +1317,22 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { if verbose { log.Print("Action Plans:") } - for k, ats := range tpr.actionPlans { - err = tpr.ratingStorage.SetActionPlan(k, ats) + for k, ap := range tpr.actionPlans { + for _, at := range ap.ActionTimings { + if at.IsASAP() { + for accID := range ap.AccountIDs { + t := &Task{ + Uuid: utils.GenUUID(), + AccountID: accID, + ActionsID: at.ActionsID, + } + if err = tpr.ratingStorage.PushTask(t); err != nil { + return err + } + } + } + } + err = tpr.ratingStorage.SetActionPlan(k, ap) if err != nil { return err } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5b29216ec..fecc15608 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -29,7 +29,7 @@ import ( ) type Scheduler struct { - queue engine.ActionPlanPriotityList + queue engine.ActionTimingPriorityList timer *time.Timer restartLoop chan bool sync.Mutex @@ -56,7 +56,7 @@ func (s *Scheduler) Loop() { utils.Logger.Info(fmt.Sprintf(" Scheduler queue length: %v", len(s.queue))) s.Lock() a0 := s.queue[0] - utils.Logger.Info(fmt.Sprintf(" Action: %s", a0.Id)) + utils.Logger.Info(fmt.Sprintf(" Action: %s", a0.ActionsID)) now := time.Now() start := a0.GetNextStartTime(now) if start.Equal(now) || start.Before(now) { @@ -77,12 +77,12 @@ func (s *Scheduler) Loop() { } else { s.Unlock() d := a0.GetNextStartTime(now).Sub(now) - utils.Logger.Info(fmt.Sprintf(" Time to next action (%s): %v", a0.Id, d)) + utils.Logger.Info(fmt.Sprintf(" Time to next action (%s): %v", a0.ActionsID, d)) s.timer = time.NewTimer(d) select { case <-s.timer.C: // timer has expired - utils.Logger.Info(fmt.Sprintf(" Time for action on %v", a0.Id)) + utils.Logger.Info(fmt.Sprintf(" Time for action on %v", a0.ActionsID)) case <-s.restartLoop: // nothing to do, just continue the loop } @@ -120,6 +120,21 @@ func (s *Scheduler) Reload(protect bool) { } func (s *Scheduler) loadActionPlans() { + // limit the number of concurrent tasks + var limit = make(chan bool, 10) + // execute existing tasks + for { + task, err := s.storage.PopTask() + if err != nil || task == nil { + break + } + limit <- true + go func() { + task.Execute() + <-limit + }() + } + actionPlans, err := s.storage.GetAllActionPlans() if err != nil && err != utils.ErrNotFound { utils.Logger.Warning(fmt.Sprintf(" Cannot get action plans: %v", err)) @@ -128,43 +143,25 @@ func (s *Scheduler) loadActionPlans() { // recreate the queue s.Lock() defer s.Unlock() - s.queue = engine.ActionPlanPriotityList{} - for key, aps := range actionPlans { - toBeSaved := false - isAsap := false - var newApls []*engine.ActionPlan // will remove the one time runs from the database - for _, ap := range aps { - if ap.Timing == nil { - utils.Logger.Warning(fmt.Sprintf(" Nil timing on action plan: %+v, discarding!", ap)) + s.queue = engine.ActionTimingPriorityList{} + for _, actionPlan := range actionPlans { + for _, at := range actionPlan.ActionTimings { + if at.Timing == nil { + utils.Logger.Warning(fmt.Sprintf(" Nil timing on action plan: %+v, discarding!", at)) continue } - if len(ap.AccountIds) == 0 { // no accounts just ignore + if at.IsASAP() { continue } - isAsap = ap.IsASAP() - toBeSaved = toBeSaved || isAsap - if isAsap { - utils.Logger.Info(fmt.Sprintf(" Time for one time action on %v", key)) - ap.Execute() - ap.AccountIds = make([]string, 0) - } else { - now := time.Now() - if ap.GetNextStartTime(now).Before(now) { - // the task is obsolete, do not add it to the queue - continue - } - s.queue = append(s.queue, ap) + now := time.Now() + if at.GetNextStartTime(now).Before(now) { + // the task is obsolete, do not add it to the queue + continue } - // save even asap action plans with empty account id list - newApls = append(newApls, ap) - } - if toBeSaved { - engine.Guardian.Guard(func() (interface{}, error) { - s.storage.SetActionPlans(key, newApls) - s.storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) - return 0, nil - }, 0, utils.ACTION_PLAN_PREFIX) + at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts + s.queue = append(s.queue, at) + } } sort.Sort(s.queue) @@ -180,6 +177,6 @@ func (s *Scheduler) restart() { } } -func (s *Scheduler) GetQueue() engine.ActionPlanPriotityList { +func (s *Scheduler) GetQueue() engine.ActionTimingPriorityList { return s.queue } diff --git a/utils/consts.go b/utils/consts.go index 38e56d530..492ced815 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -169,6 +169,7 @@ const ( ASR = "ASR" ACD = "ACD" FILTER_REGEXP_TPL = "$1$2$3$4$5" + TASKS_KEY = "tasks" ACTION_PLAN_PREFIX = "apl_" ACTION_TRIGGER_PREFIX = "atr_" RATING_PLAN_PREFIX = "rpl_" diff --git a/utils/map.go b/utils/map.go index 5a71ca506..90bce4cd4 100644 --- a/utils/map.go +++ b/utils/map.go @@ -143,3 +143,20 @@ func (sm StringMap) GetOne() string { } return "" } + +func NoDots(m map[string]struct{}) map[string]struct{} { + return MapKeysReplace(m, ".", ".") +} + +func YesDots(m map[string]struct{}) map[string]struct{} { + return MapKeysReplace(m, ".", ".") +} + +func MapKeysReplace(m map[string]struct{}, old, new string) map[string]struct{} { + for key, val := range m { + delete(m, key) + key = strings.Replace(key, old, new, -1) + m[key] = val + } + return m +}