mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
flood protection for scheduler
This commit is contained in:
@@ -99,8 +99,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if attrs.ReloadScheduler && self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
*reply = OK
|
||||
return nil
|
||||
@@ -224,8 +223,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error
|
||||
if schedulerReloadNeeded {
|
||||
// reload scheduler
|
||||
if self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
}
|
||||
*reply = OK // This will mark saving of the account, error still can show up in actionTimingsId
|
||||
@@ -283,8 +281,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
|
||||
if schedulerReloadNeeded {
|
||||
// reload scheduler
|
||||
if self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
}
|
||||
*reply = OK
|
||||
|
||||
@@ -519,8 +519,7 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply
|
||||
|
||||
if len(aps) != 0 && self.Sched != nil {
|
||||
utils.Logger.Info("ApierV1.LoadTariffPlanFromStorDb, reloading scheduler.")
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
|
||||
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
|
||||
@@ -768,8 +767,7 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error
|
||||
if self.Sched == nil {
|
||||
return errors.New("SCHEDULER_NOT_ENABLED")
|
||||
}
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
*reply = OK
|
||||
return nil
|
||||
@@ -952,8 +950,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
|
||||
return err
|
||||
}
|
||||
if self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
*reply = OK
|
||||
return nil
|
||||
@@ -963,8 +960,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
|
||||
if self.Sched == nil {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
*reply = OK
|
||||
return nil
|
||||
}
|
||||
@@ -1208,8 +1204,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
|
||||
}
|
||||
if len(aps) != 0 && self.Sched != nil {
|
||||
utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading scheduler.")
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
|
||||
if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil {
|
||||
|
||||
@@ -91,8 +91,7 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str
|
||||
return err
|
||||
}
|
||||
if self.Sched != nil {
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
*reply = v1.OK
|
||||
return nil
|
||||
@@ -249,8 +248,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
|
||||
}
|
||||
if len(aps) != 0 && self.Sched != nil {
|
||||
utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading scheduler.")
|
||||
self.Sched.LoadActionPlans(self.RatingDb)
|
||||
self.Sched.Restart()
|
||||
self.Sched.Reload(true)
|
||||
}
|
||||
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
|
||||
if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil {
|
||||
|
||||
@@ -489,11 +489,11 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh
|
||||
cacheDone := <-cacheDoneChan
|
||||
cacheDoneChan <- cacheDone
|
||||
utils.Logger.Info("Starting CGRateS Scheduler.")
|
||||
sched := scheduler.NewScheduler()
|
||||
sched := scheduler.NewScheduler(ratingDb)
|
||||
go reloadSchedulerSingnalHandler(sched, ratingDb)
|
||||
time.Sleep(1)
|
||||
internalSchedulerChan <- sched
|
||||
sched.LoadActionPlans(ratingDb)
|
||||
sched.Reload(true)
|
||||
sched.Loop()
|
||||
exitChan <- true // Should not get out of loop though
|
||||
}
|
||||
|
||||
@@ -125,9 +125,7 @@ func reloadSchedulerSingnalHandler(sched *scheduler.Scheduler, getter engine.Rat
|
||||
sig := <-c
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("Caught signal %v, reloading action timings.\n", sig))
|
||||
sched.LoadActionPlans(getter)
|
||||
// check the tip of the queue for new actions
|
||||
sched.Restart()
|
||||
sched.Reload(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ import "github.com/cgrates/cgrates/utils"
|
||||
|
||||
func init() {
|
||||
c := &CmdAddAccount{
|
||||
name: "account_add",
|
||||
name: "account_set",
|
||||
rpcMethod: "ApierV1.SetAccount",
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
@@ -58,10 +58,11 @@ func (rit *RITiming) CronString() string {
|
||||
hour, min, sec = "*", "*", "*"
|
||||
} else {
|
||||
hms := strings.Split(rit.StartTime, ":")
|
||||
if len(hms) != 3 {
|
||||
if len(hms) == 3 {
|
||||
hour, min, sec = hms[0], hms[1], hms[2]
|
||||
} else {
|
||||
hour, min, sec = "*", "*", "*"
|
||||
}
|
||||
hour, min, sec = hms[0], hms[1], hms[2]
|
||||
if strings.HasPrefix(hour, "0") {
|
||||
hour = hour[1:]
|
||||
}
|
||||
|
||||
@@ -129,8 +129,8 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
|
||||
}
|
||||
|
||||
func TestExecuteActions(t *testing.T) {
|
||||
scheduler.NewScheduler().LoadActionPlans(ratingDb)
|
||||
time.Sleep(time.Millisecond) // Give time to scheduler to topup the account
|
||||
scheduler.NewScheduler(ratingDb).Reload(false)
|
||||
time.Sleep(time.Second) // Give time to scheduler to topup the account
|
||||
if acnt, err := acntDb.GetAccount("cgrates.org:12344"); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(acnt.BalanceMap) != 2 {
|
||||
|
||||
@@ -128,7 +128,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
|
||||
}
|
||||
|
||||
func TestExecuteActions2(t *testing.T) {
|
||||
scheduler.NewScheduler().LoadActionPlans(ratingDb2)
|
||||
scheduler.NewScheduler(ratingDb2).Reload(false)
|
||||
time.Sleep(time.Millisecond) // Give time to scheduler to topup the account
|
||||
if acnt, err := acntDb2.GetAccount("cgrates.org:12345"); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -126,7 +126,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
|
||||
}
|
||||
|
||||
func TestExecuteActions3(t *testing.T) {
|
||||
scheduler.NewScheduler().LoadActionPlans(ratingDb3)
|
||||
scheduler.NewScheduler(ratingDb3).Reload(false)
|
||||
time.Sleep(time.Millisecond) // Give time to scheduler to topup the account
|
||||
if acnt, err := acntDb3.GetAccount("cgrates.org:12346"); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -33,21 +33,30 @@ type Scheduler struct {
|
||||
timer *time.Timer
|
||||
restartLoop chan bool
|
||||
sync.Mutex
|
||||
storage engine.RatingStorage
|
||||
waitingReload bool
|
||||
loopChecker chan int
|
||||
schedulerStarted bool
|
||||
}
|
||||
|
||||
func NewScheduler() *Scheduler {
|
||||
return &Scheduler{restartLoop: make(chan bool)}
|
||||
func NewScheduler(storage engine.RatingStorage) *Scheduler {
|
||||
return &Scheduler{
|
||||
restartLoop: make(chan bool),
|
||||
storage: storage,
|
||||
loopChecker: make(chan int),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) Loop() {
|
||||
s.schedulerStarted = true
|
||||
for {
|
||||
for len(s.queue) == 0 { //hang here if empty
|
||||
<-s.restartLoop
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Scheduler queue length: %v", len(s.queue)))
|
||||
s.Lock()
|
||||
a0 := s.queue[0]
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.Id))
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Scheduler queue length: %v", len(s.queue)))
|
||||
now := time.Now()
|
||||
start := a0.GetNextStartTime(now)
|
||||
if start.Equal(now) || start.Before(now) {
|
||||
@@ -81,15 +90,40 @@ func (s *Scheduler) Loop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
|
||||
actionPlans, err := storage.GetAllActionPlans()
|
||||
func (s *Scheduler) Reload(protect bool) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if protect {
|
||||
if s.waitingReload {
|
||||
s.loopChecker <- 1
|
||||
}
|
||||
s.waitingReload = true
|
||||
go func() {
|
||||
t := time.NewTicker(time.Second) // wait 1 second before start
|
||||
select {
|
||||
case <-s.loopChecker:
|
||||
t.Stop() // cancel reload
|
||||
case <-t.C:
|
||||
s.LoadActionPlans()
|
||||
s.Restart()
|
||||
t.Stop()
|
||||
s.waitingReload = false
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
s.LoadActionPlans()
|
||||
s.Restart()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) LoadActionPlans() {
|
||||
actionPlans, err := s.storage.GetAllActionPlans()
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Cannot get action plans: %v", err))
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> processing %d action plans", len(actionPlans)))
|
||||
// recreate the queue
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.queue = engine.ActionPlanPriotityList{}
|
||||
for key, aps := range actionPlans {
|
||||
toBeSaved := false
|
||||
@@ -123,8 +157,8 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
|
||||
}
|
||||
if toBeSaved {
|
||||
engine.Guardian.Guard(func() (interface{}, error) {
|
||||
storage.SetActionPlans(key, newApls)
|
||||
storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
|
||||
s.storage.SetActionPlans(key, newApls)
|
||||
s.storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
|
||||
return 0, nil
|
||||
}, 0, utils.ACTION_PLAN_PREFIX)
|
||||
}
|
||||
@@ -134,7 +168,9 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
|
||||
}
|
||||
|
||||
func (s *Scheduler) Restart() {
|
||||
s.restartLoop <- true
|
||||
if s.schedulerStarted {
|
||||
s.restartLoop <- true
|
||||
}
|
||||
if s.timer != nil {
|
||||
s.timer.Stop()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user