diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e91fe3c6c..af741e9c7 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -531,7 +531,7 @@ func main() { supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), stS.GetIntenternalChan(), reS.GetIntenternalChan(), dspS.GetIntenternalChan()) - schS := services.NewSchedulerService(cfg, dmService, cacheS, server, internalCDRServerChan, dspS.GetIntenternalChan()) + schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan, server, internalCDRServerChan, dspS.GetIntenternalChan()) rals := services.NewRalService(cfg, dmService, cdrDb, loadDb, cacheS, filterSChan, server, tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan, schS.GetIntenternalChan(), attrS.GetIntenternalChan(), dspS.GetIntenternalChan(), diff --git a/config/config_it_test.go b/config/config_it_test.go index c86cbad7a..188edab16 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -270,6 +270,7 @@ func TestCGRConfigReloadSchedulerS(t *testing.T) { Transport: utils.MetaJSONrpc, }, }, + Filters: []string{}, } if !reflect.DeepEqual(expAttr, cfg.SchedulerCfg()) { t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SchedulerCfg())) diff --git a/engine/action_plan.go b/engine/action_plan.go index de3a1a095..bf58de039 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -45,12 +45,6 @@ type ActionTiming struct { stCache time.Time // cached time of the next start } -type Task struct { - Uuid string - AccountID string - ActionsID string -} - type ActionPlan struct { Id string // informative purpose only AccountIDs utils.StringMap @@ -72,14 +66,6 @@ func (apl *ActionPlan) Clone() (interface{}, error) { return cln, nil } -func (t *Task) Execute() error { - return (&ActionTiming{ - Uuid: t.Uuid, - ActionsID: t.ActionsID, - accountIDs: utils.StringMap{t.AccountID: true}, - }).Execute(nil, nil) -} - func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) { if !at.stCache.IsZero() { return at.stCache diff --git a/engine/task.go b/engine/task.go new file mode 100644 index 000000000..4776e92b6 --- /dev/null +++ b/engine/task.go @@ -0,0 +1,85 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "net" + "strings" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +// Task is a one time action executed by the scheduler +type Task struct { + Uuid string + AccountID string + ActionsID string +} + +func (t *Task) Execute() error { + return (&ActionTiming{ + Uuid: t.Uuid, + ActionsID: t.ActionsID, + accountIDs: utils.StringMap{t.AccountID: true}, + }).Execute(nil, nil) +} + +// String implements config.DataProvider +func (t *Task) String() string { + return utils.ToJSON(t) +} + +// AsNavigableMap implements config.DataProvider +func (t *Task) AsNavigableMap(_ []*config.FCTemplate) (nm *config.NavigableMap, err error) { + nm = new(config.NavigableMap) + nm.Set([]string{utils.UUID}, t.Uuid, false, false) + nm.Set([]string{utils.AccountID}, t.AccountID, false, false) + nm.Set([]string{utils.ActionsID}, t.ActionsID, false, false) + return +} + +// FieldAsInterface implements config.DataProvider +// ToDo: support Action fields +func (t *Task) FieldAsInterface(fldPath []string) (iface interface{}, err error) { + return t.FieldAsString(fldPath) +} + +// FieldAsInterface implements config.DataProvider +// ToDo: support Action fields +func (t *Task) FieldAsString(fldPath []string) (s string, err error) { + if len(fldPath) == 0 { + return + } + switch fldPath[0] { + case utils.UUID: + return t.Uuid, nil + case utils.AccountID: + return t.AccountID, nil + case utils.ActionsID: + return t.ActionsID, nil + default: + return "", utils.ErrPrefixNotFound(strings.Join(fldPath, utils.NestingSep)) + } +} + +// RemoteHost implements config.DataProvider +func (t *Task) RemoteHost() (rh net.Addr) { + return +} diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go index f46181767..4d937c6bc 100644 --- a/general_tests/ddazmbl1_test.go +++ b/general_tests/ddazmbl1_test.go @@ -130,7 +130,8 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` } func TestDZ1ExecuteActions(t *testing.T) { - scheduler.NewScheduler(dataDB).Reload() + scheduler.NewScheduler(dataDB, config.CgrConfig(), + engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dataDB)).Reload() time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account if acnt, err := dataDB.DataDB().GetAccount("cgrates.org:12344"); err != nil { t.Error(err) diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go index 3045dfa0c..51c8adb69 100644 --- a/general_tests/ddazmbl2_test.go +++ b/general_tests/ddazmbl2_test.go @@ -127,7 +127,8 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` } func TestExecuteActions2(t *testing.T) { - scheduler.NewScheduler(dataDB2).Reload() + scheduler.NewScheduler(dataDB2, config.CgrConfig(), + engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dataDB)).Reload() time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account if acnt, err := dataDB2.DataDB().GetAccount("cgrates.org:12345"); err != nil { t.Error(err) diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go index 406071f9c..0586c2f1c 100644 --- a/general_tests/ddazmbl3_test.go +++ b/general_tests/ddazmbl3_test.go @@ -126,7 +126,8 @@ cgrates.org,call,discounted_minutes,2013-01-06T00:00:00Z,RP_UK_Mobile_BIG5_PKG,` } func TestExecuteActions3(t *testing.T) { - scheduler.NewScheduler(dataDB3).Reload() + scheduler.NewScheduler(dataDB3, config.CgrConfig(), + engine.NewFilterS(config.CgrConfig(), nil, nil, nil, dataDB)).Reload() time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account if acnt, err := dataDB3.DataDB().GetAccount("cgrates.org:12346"); err != nil { t.Error(err) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 4e40486c8..fedb5f4bd 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -35,6 +36,8 @@ type Scheduler struct { timer *time.Timer restartLoop chan bool dm *engine.DataManager + cfg *config.CGRConfig + fltrS *engine.FilterS schedulerStarted bool actStatsInterval time.Duration // How long time to keep the stats in memory actSucessChan, actFailedChan chan *engine.Action // ActionPlan will pass actions via these channels @@ -42,13 +45,16 @@ type Scheduler struct { actSuccessStats, actFailedStats map[string]map[time.Time]bool // keep here stats regarding executed actions, map[actionType]map[execTime]bool } -func NewScheduler(dm *engine.DataManager) *Scheduler { - s := &Scheduler{ +func NewScheduler(dm *engine.DataManager, cfg *config.CGRConfig, + fltrS *engine.FilterS) (s *Scheduler) { + s = &Scheduler{ restartLoop: make(chan bool), dm: dm, + cfg: cfg, + fltrS: fltrS, } s.Reload() - return s + return } func (s *Scheduler) updateActStats(act *engine.Action, isFailed bool) { @@ -142,9 +148,18 @@ func (s *Scheduler) loadActionPlans() { if err != nil || task == nil { break } + if pass, err := s.fltrS.Pass(s.cfg.GeneralCfg().DefaultTenant, + s.cfg.SchedulerCfg().Filters, task); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>", + utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters[1:], task.ActionsID, task.AccountID)) + continue + } else if !pass { + continue + } limit <- true go func() { - utils.Logger.Info(fmt.Sprintf(" executing task %s on account %s", task.ActionsID, task.AccountID)) + utils.Logger.Info(fmt.Sprintf("<%s> executing task %s on account %s", utils.SchedulerS, task.ActionsID, task.AccountID)) task.Execute() <-limit }() diff --git a/services/schedulers.go b/services/schedulers.go index 43312debf..923ea5ee0 100644 --- a/services/schedulers.go +++ b/services/schedulers.go @@ -32,13 +32,15 @@ import ( // NewSchedulerService returns the Scheduler Service func NewSchedulerService(cfg *config.CGRConfig, dm *DataDBService, - cacheS *engine.CacheS, server *utils.Server, internalCDRServerChan, + cacheS *engine.CacheS, fltrSChan chan *engine.FilterS, + server *utils.Server, internalCDRServerChan, dispatcherChan chan rpcclient.RpcClientConnection) *SchedulerService { return &SchedulerService{ connChan: make(chan rpcclient.RpcClientConnection, 1), cfg: cfg, dm: dm, cacheS: cacheS, + fltrSChan: fltrSChan, server: server, cdrSChan: internalCDRServerChan, dispatcherChan: dispatcherChan, @@ -51,6 +53,7 @@ type SchedulerService struct { cfg *config.CGRConfig dm *DataDBService cacheS *engine.CacheS + fltrSChan chan *engine.FilterS server *utils.Server cdrSChan chan rpcclient.RpcClientConnection dispatcherChan chan rpcclient.RpcClientConnection @@ -68,11 +71,13 @@ func (schS *SchedulerService) Start() (err error) { <-schS.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached + fltrS := <-schS.fltrSChan + schS.fltrSChan <- fltrS + schS.Lock() defer schS.Unlock() - utils.Logger.Info(" Starting CGRateS Scheduler.") - schS.schS = scheduler.NewScheduler(schS.dm.GetDM()) + schS.schS = scheduler.NewScheduler(schS.dm.GetDM(), schS.cfg, fltrS) go schS.schS.Loop() schS.rpc = v1.NewSchedulerSv1(schS.cfg) diff --git a/utils/consts.go b/utils/consts.go index b35404e8c..e664e80a2 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -570,6 +570,8 @@ const ( PassLow = "pass" SentinelLow = "sentinel" QueryLow = "query" + UUID = "UUID" + ActionsID = "ActionsID" ) // Migrator Action