Scheduler with support for filtering tasks

This commit is contained in:
DanB
2019-11-06 13:57:01 +01:00
parent db5d84f818
commit ba533bd706
10 changed files with 122 additions and 25 deletions

View File

@@ -531,7 +531,7 @@ func main() {
supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), stS.GetIntenternalChan(),
reS.GetIntenternalChan(), dspS.GetIntenternalChan())
schS := services.NewSchedulerService(cfg, dmService, cacheS, server, internalCDRServerChan, dspS.GetIntenternalChan())
schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan, server, internalCDRServerChan, dspS.GetIntenternalChan())
rals := services.NewRalService(cfg, dmService, cdrDb, loadDb, cacheS, filterSChan, server,
tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan,
schS.GetIntenternalChan(), attrS.GetIntenternalChan(), dspS.GetIntenternalChan(),

View File

@@ -270,6 +270,7 @@ func TestCGRConfigReloadSchedulerS(t *testing.T) {
Transport: utils.MetaJSONrpc,
},
},
Filters: []string{},
}
if !reflect.DeepEqual(expAttr, cfg.SchedulerCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SchedulerCfg()))

View File

@@ -45,12 +45,6 @@ type ActionTiming struct {
stCache time.Time // cached time of the next start
}
type Task struct {
Uuid string
AccountID string
ActionsID string
}
type ActionPlan struct {
Id string // informative purpose only
AccountIDs utils.StringMap
@@ -72,14 +66,6 @@ func (apl *ActionPlan) Clone() (interface{}, error) {
return cln, nil
}
func (t *Task) Execute() error {
return (&ActionTiming{
Uuid: t.Uuid,
ActionsID: t.ActionsID,
accountIDs: utils.StringMap{t.AccountID: true},
}).Execute(nil, nil)
}
func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) {
if !at.stCache.IsZero() {
return at.stCache

85
engine/task.go Normal file
View File

@@ -0,0 +1,85 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"net"
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
// Task is a one time action executed by the scheduler
type Task struct {
Uuid string
AccountID string
ActionsID string
}
func (t *Task) Execute() error {
return (&ActionTiming{
Uuid: t.Uuid,
ActionsID: t.ActionsID,
accountIDs: utils.StringMap{t.AccountID: true},
}).Execute(nil, nil)
}
// String implements config.DataProvider
func (t *Task) String() string {
return utils.ToJSON(t)
}
// AsNavigableMap implements config.DataProvider
func (t *Task) AsNavigableMap(_ []*config.FCTemplate) (nm *config.NavigableMap, err error) {
nm = new(config.NavigableMap)
nm.Set([]string{utils.UUID}, t.Uuid, false, false)
nm.Set([]string{utils.AccountID}, t.AccountID, false, false)
nm.Set([]string{utils.ActionsID}, t.ActionsID, false, false)
return
}
// FieldAsInterface implements config.DataProvider
// ToDo: support Action fields
func (t *Task) FieldAsInterface(fldPath []string) (iface interface{}, err error) {
return t.FieldAsString(fldPath)
}
// FieldAsInterface implements config.DataProvider
// ToDo: support Action fields
func (t *Task) FieldAsString(fldPath []string) (s string, err error) {
if len(fldPath) == 0 {
return
}
switch fldPath[0] {
case utils.UUID:
return t.Uuid, nil
case utils.AccountID:
return t.AccountID, nil
case utils.ActionsID:
return t.ActionsID, nil
default:
return "", utils.ErrPrefixNotFound(strings.Join(fldPath, utils.NestingSep))
}
}
// RemoteHost implements config.DataProvider
func (t *Task) RemoteHost() (rh net.Addr) {
return
}

View File

@@ -130,7 +130,8 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
}
func TestDZ1ExecuteActions(t *testing.T) {
scheduler.NewScheduler(dataDB).Reload()
scheduler.NewScheduler(dataDB, config.CgrConfig(),
engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dataDB)).Reload()
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
if acnt, err := dataDB.DataDB().GetAccount("cgrates.org:12344"); err != nil {
t.Error(err)

View File

@@ -127,7 +127,8 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
}
func TestExecuteActions2(t *testing.T) {
scheduler.NewScheduler(dataDB2).Reload()
scheduler.NewScheduler(dataDB2, config.CgrConfig(),
engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dataDB)).Reload()
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
if acnt, err := dataDB2.DataDB().GetAccount("cgrates.org:12345"); err != nil {
t.Error(err)

View File

@@ -126,7 +126,8 @@ cgrates.org,call,discounted_minutes,2013-01-06T00:00:00Z,RP_UK_Mobile_BIG5_PKG,`
}
func TestExecuteActions3(t *testing.T) {
scheduler.NewScheduler(dataDB3).Reload()
scheduler.NewScheduler(dataDB3, config.CgrConfig(),
engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dataDB)).Reload()
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
if acnt, err := dataDB3.DataDB().GetAccount("cgrates.org:12346"); err != nil {
t.Error(err)

View File

@@ -25,6 +25,7 @@ import (
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -35,6 +36,8 @@ type Scheduler struct {
timer *time.Timer
restartLoop chan bool
dm *engine.DataManager
cfg *config.CGRConfig
fltrS *engine.FilterS
schedulerStarted bool
actStatsInterval time.Duration // How long time to keep the stats in memory
actSucessChan, actFailedChan chan *engine.Action // ActionPlan will pass actions via these channels
@@ -42,13 +45,16 @@ type Scheduler struct {
actSuccessStats, actFailedStats map[string]map[time.Time]bool // keep here stats regarding executed actions, map[actionType]map[execTime]bool
}
func NewScheduler(dm *engine.DataManager) *Scheduler {
s := &Scheduler{
func NewScheduler(dm *engine.DataManager, cfg *config.CGRConfig,
fltrS *engine.FilterS) (s *Scheduler) {
s = &Scheduler{
restartLoop: make(chan bool),
dm: dm,
cfg: cfg,
fltrS: fltrS,
}
s.Reload()
return s
return
}
func (s *Scheduler) updateActStats(act *engine.Action, isFailed bool) {
@@ -142,9 +148,18 @@ func (s *Scheduler) loadActionPlans() {
if err != nil || task == nil {
break
}
if pass, err := s.fltrS.Pass(s.cfg.GeneralCfg().DefaultTenant,
s.cfg.SchedulerCfg().Filters, task); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>",
utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters[1:], task.ActionsID, task.AccountID))
continue
} else if !pass {
continue
}
limit <- true
go func() {
utils.Logger.Info(fmt.Sprintf("<Scheduler> executing task %s on account %s", task.ActionsID, task.AccountID))
utils.Logger.Info(fmt.Sprintf("<%s> executing task %s on account %s", utils.SchedulerS, task.ActionsID, task.AccountID))
task.Execute()
<-limit
}()

View File

@@ -32,13 +32,15 @@ import (
// NewSchedulerService returns the Scheduler Service
func NewSchedulerService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, server *utils.Server, internalCDRServerChan,
cacheS *engine.CacheS, fltrSChan chan *engine.FilterS,
server *utils.Server, internalCDRServerChan,
dispatcherChan chan rpcclient.RpcClientConnection) *SchedulerService {
return &SchedulerService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
fltrSChan: fltrSChan,
server: server,
cdrSChan: internalCDRServerChan,
dispatcherChan: dispatcherChan,
@@ -51,6 +53,7 @@ type SchedulerService struct {
cfg *config.CGRConfig
dm *DataDBService
cacheS *engine.CacheS
fltrSChan chan *engine.FilterS
server *utils.Server
cdrSChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
@@ -68,11 +71,13 @@ func (schS *SchedulerService) Start() (err error) {
<-schS.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
fltrS := <-schS.fltrSChan
schS.fltrSChan <- fltrS
schS.Lock()
defer schS.Unlock()
utils.Logger.Info("<ServiceManager> Starting CGRateS Scheduler.")
schS.schS = scheduler.NewScheduler(schS.dm.GetDM())
schS.schS = scheduler.NewScheduler(schS.dm.GetDM(), schS.cfg, fltrS)
go schS.schS.Loop()
schS.rpc = v1.NewSchedulerSv1(schS.cfg)

View File

@@ -570,6 +570,8 @@ const (
PassLow = "pass"
SentinelLow = "sentinel"
QueryLow = "query"
UUID = "UUID"
ActionsID = "ActionsID"
)
// Migrator Action