diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 087cb614e..f0213d955 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -99,8 +99,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e return utils.NewErrServerError(err) } if attrs.ReloadScheduler && self.Sched != nil { - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } *reply = OK return nil @@ -224,8 +223,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error if schedulerReloadNeeded { // reload scheduler if self.Sched != nil { - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } } *reply = OK // This will mark saving of the account, error still can show up in actionTimingsId @@ -283,8 +281,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string) if schedulerReloadNeeded { // reload scheduler if self.Sched != nil { - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } } *reply = OK diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 15ce611c9..2bf808db5 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -519,8 +519,7 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply if len(aps) != 0 && self.Sched != nil { utils.Logger.Info("ApierV1.LoadTariffPlanFromStorDb, reloading scheduler.") - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } if len(cstKeys) != 0 && self.CdrStatsSrv != nil { @@ -768,8 +767,7 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error if self.Sched == nil { return errors.New("SCHEDULER_NOT_ENABLED") } - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } *reply = OK return nil @@ -952,8 +950,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str return err } if self.Sched != nil { - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } *reply = OK return nil @@ -963,8 +960,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error { if self.Sched == nil { return utils.ErrNotFound } - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) *reply = OK return nil } @@ -1208,8 +1204,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, } if len(aps) != 0 && self.Sched != nil { utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading scheduler.") - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } if len(cstKeys) != 0 && self.CdrStatsSrv != nil { if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil { diff --git a/apier/v2/apier.go b/apier/v2/apier.go index e35c48a7d..90b146480 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -91,8 +91,7 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str return err } if self.Sched != nil { - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } *reply = v1.OK return nil @@ -249,8 +248,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, } if len(aps) != 0 && self.Sched != nil { utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading scheduler.") - self.Sched.LoadActionPlans(self.RatingDb) - self.Sched.Restart() + self.Sched.Reload(true) } if len(cstKeys) != 0 && self.CdrStatsSrv != nil { if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 3e819fa82..452b8cda5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -489,11 +489,11 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh cacheDone := <-cacheDoneChan cacheDoneChan <- cacheDone utils.Logger.Info("Starting CGRateS Scheduler.") - sched := scheduler.NewScheduler() + sched := scheduler.NewScheduler(ratingDb) go reloadSchedulerSingnalHandler(sched, ratingDb) time.Sleep(1) internalSchedulerChan <- sched - sched.LoadActionPlans(ratingDb) + sched.Reload(true) sched.Loop() exitChan <- true // Should not get out of loop though } diff --git a/cmd/cgr-engine/registration.go b/cmd/cgr-engine/registration.go index ec856d6f5..985d469bb 100644 --- a/cmd/cgr-engine/registration.go +++ b/cmd/cgr-engine/registration.go @@ -125,9 +125,7 @@ func reloadSchedulerSingnalHandler(sched *scheduler.Scheduler, getter engine.Rat sig := <-c utils.Logger.Info(fmt.Sprintf("Caught signal %v, reloading action timings.\n", sig)) - sched.LoadActionPlans(getter) - // check the tip of the queue for new actions - sched.Restart() + sched.Reload(true) } } diff --git a/console/account_add.go b/console/account_set.go similarity index 98% rename from console/account_add.go rename to console/account_set.go index 5875a4fce..d81b0c38e 100644 --- a/console/account_add.go +++ b/console/account_set.go @@ -22,7 +22,7 @@ import "github.com/cgrates/cgrates/utils" func init() { c := &CmdAddAccount{ - name: "account_add", + name: "account_set", rpcMethod: "ApierV1.SetAccount", } commands[c.Name()] = c diff --git a/engine/rateinterval.go b/engine/rateinterval.go index e11f731e3..9a330d029 100644 --- a/engine/rateinterval.go +++ b/engine/rateinterval.go @@ -58,10 +58,11 @@ func (rit *RITiming) CronString() string { hour, min, sec = "*", "*", "*" } else { hms := strings.Split(rit.StartTime, ":") - if len(hms) != 3 { + if len(hms) == 3 { + hour, min, sec = hms[0], hms[1], hms[2] + } else { hour, min, sec = "*", "*", "*" } - hour, min, sec = hms[0], hms[1], hms[2] if strings.HasPrefix(hour, "0") { hour = hour[1:] } diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go index 34c8a2988..7155f41da 100644 --- a/general_tests/ddazmbl1_test.go +++ b/general_tests/ddazmbl1_test.go @@ -129,8 +129,8 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` } func TestExecuteActions(t *testing.T) { - scheduler.NewScheduler().LoadActionPlans(ratingDb) - time.Sleep(time.Millisecond) // Give time to scheduler to topup the account + scheduler.NewScheduler(ratingDb).Reload(false) + time.Sleep(time.Second) // Give time to scheduler to topup the account if acnt, err := acntDb.GetAccount("cgrates.org:12344"); err != nil { t.Error(err) } else if len(acnt.BalanceMap) != 2 { diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go index 1a0004d1a..eab079787 100644 --- a/general_tests/ddazmbl2_test.go +++ b/general_tests/ddazmbl2_test.go @@ -128,7 +128,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` } func TestExecuteActions2(t *testing.T) { - scheduler.NewScheduler().LoadActionPlans(ratingDb2) + scheduler.NewScheduler(ratingDb2).Reload(false) time.Sleep(time.Millisecond) // Give time to scheduler to topup the account if acnt, err := acntDb2.GetAccount("cgrates.org:12345"); err != nil { t.Error(err) diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go index 0b49f8ea7..0ba5d7a84 100644 --- a/general_tests/ddazmbl3_test.go +++ b/general_tests/ddazmbl3_test.go @@ -126,7 +126,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10` } func TestExecuteActions3(t *testing.T) { - scheduler.NewScheduler().LoadActionPlans(ratingDb3) + scheduler.NewScheduler(ratingDb3).Reload(false) time.Sleep(time.Millisecond) // Give time to scheduler to topup the account if acnt, err := acntDb3.GetAccount("cgrates.org:12346"); err != nil { t.Error(err) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 95a72c1fd..7a891def1 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -33,21 +33,30 @@ type Scheduler struct { timer *time.Timer restartLoop chan bool sync.Mutex + storage engine.RatingStorage + waitingReload bool + loopChecker chan int + schedulerStarted bool } -func NewScheduler() *Scheduler { - return &Scheduler{restartLoop: make(chan bool)} +func NewScheduler(storage engine.RatingStorage) *Scheduler { + return &Scheduler{ + restartLoop: make(chan bool), + storage: storage, + loopChecker: make(chan int), + } } func (s *Scheduler) Loop() { + s.schedulerStarted = true for { for len(s.queue) == 0 { //hang here if empty <-s.restartLoop } + utils.Logger.Info(fmt.Sprintf(" Scheduler queue length: %v", len(s.queue))) s.Lock() a0 := s.queue[0] utils.Logger.Info(fmt.Sprintf(" Action: %s", a0.Id)) - utils.Logger.Info(fmt.Sprintf(" Scheduler queue length: %v", len(s.queue))) now := time.Now() start := a0.GetNextStartTime(now) if start.Equal(now) || start.Before(now) { @@ -81,15 +90,40 @@ func (s *Scheduler) Loop() { } } -func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) { - actionPlans, err := storage.GetAllActionPlans() +func (s *Scheduler) Reload(protect bool) { + s.Lock() + defer s.Unlock() + + if protect { + if s.waitingReload { + s.loopChecker <- 1 + } + s.waitingReload = true + go func() { + t := time.NewTicker(time.Second) // wait 1 second before start + select { + case <-s.loopChecker: + t.Stop() // cancel reload + case <-t.C: + s.LoadActionPlans() + s.Restart() + t.Stop() + s.waitingReload = false + } + }() + } else { + s.LoadActionPlans() + s.Restart() + } +} + +func (s *Scheduler) LoadActionPlans() { + actionPlans, err := s.storage.GetAllActionPlans() if err != nil && err != utils.ErrNotFound { utils.Logger.Warning(fmt.Sprintf(" Cannot get action plans: %v", err)) } utils.Logger.Info(fmt.Sprintf(" processing %d action plans", len(actionPlans))) // recreate the queue - s.Lock() - defer s.Unlock() s.queue = engine.ActionPlanPriotityList{} for key, aps := range actionPlans { toBeSaved := false @@ -123,8 +157,8 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) { } if toBeSaved { engine.Guardian.Guard(func() (interface{}, error) { - storage.SetActionPlans(key, newApls) - storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) + s.storage.SetActionPlans(key, newApls) + s.storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) return 0, nil }, 0, utils.ACTION_PLAN_PREFIX) } @@ -134,7 +168,9 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) { } func (s *Scheduler) Restart() { - s.restartLoop <- true + if s.schedulerStarted { + s.restartLoop <- true + } if s.timer != nil { s.timer.Stop() }