diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go
index 571e67cb7..148194a2b 100644
--- a/apier/v1/accounts.go
+++ b/apier/v1/accounts.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package v1
import (
+ "errors"
"fmt"
"math"
"strings"
@@ -130,8 +131,12 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e
*reply = err.Error()
return utils.NewErrServerError(err)
}
- if attrs.ReloadScheduler && self.Sched != nil {
- self.Sched.Reload(true)
+ if attrs.ReloadScheduler {
+ sched := self.ServManager.GetScheduler()
+ if sched == nil {
+ return errors.New(utils.SchedulerNotRunningCaps)
+ }
+ sched.Reload()
}
*reply = OK
return nil
@@ -235,10 +240,11 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error
return utils.NewErrServerError(err)
}
if attr.ReloadScheduler && schedulerReloadNeeded {
- // reload scheduler
- if self.Sched != nil {
- self.Sched.Reload(true)
+ sched := self.ServManager.GetScheduler()
+ if sched == nil {
+ return errors.New(utils.SchedulerNotRunningCaps)
}
+ sched.Reload()
}
*reply = OK // This will mark saving of the account, error still can show up in actionTimingsId
return nil
@@ -288,12 +294,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
if err != nil {
return utils.NewErrServerError(err)
}
- if attr.ReloadScheduler && len(dirtyActionPlans) > 0 {
- // reload scheduler
- if self.Sched != nil {
- self.Sched.Reload(true)
- }
- }
+
*reply = OK
return nil
}
diff --git a/apier/v1/apier.go b/apier/v1/apier.go
index 14232bcd1..5cd6735dc 100644
--- a/apier/v1/apier.go
+++ b/apier/v1/apier.go
@@ -29,7 +29,7 @@ import (
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/scheduler"
+ "github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -43,9 +43,9 @@ type ApierV1 struct {
RatingDb engine.RatingStorage
AccountDb engine.AccountingStorage
CdrDb engine.CdrStorage
- Sched *scheduler.Scheduler
Config *config.CGRConfig
Responder *engine.Responder
+ ServManager *servmanager.ServiceManager
CdrStatsSrv rpcclient.RpcClientConnection
Users rpcclient.RpcClientConnection
CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine
@@ -333,9 +333,11 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply
self.RatingDb.PreloadRatingCache()
self.AccountDb.PreloadAccountingCache()
- if len(aps) != 0 && self.Sched != nil {
- utils.Logger.Info("ApierV1.LoadTariffPlanFromStorDb, reloading scheduler.")
- self.Sched.Reload(true)
+ if len(aps) != 0 {
+ sched := self.ServManager.GetScheduler()
+ if sched != nil {
+ sched.Reload()
+ }
}
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
@@ -621,10 +623,11 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error
return utils.NewErrServerError(err)
}
if attrs.ReloadScheduler {
- if self.Sched == nil {
- return errors.New("SCHEDULER_NOT_ENABLED")
+ sched := self.ServManager.GetScheduler()
+ if sched == nil {
+ return errors.New(utils.SchedulerNotRunningCaps)
}
- self.Sched.Reload(true)
+ sched.Reload()
}
*reply = OK
return nil
@@ -672,19 +675,21 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
}
// ToDo: Get the action keys loaded by dbReader so we reload only these in cache
// Need to do it before scheduler otherwise actions to run will be unknown
- if self.Sched != nil {
- self.Sched.Reload(true)
+ sched := self.ServManager.GetScheduler()
+ if sched != nil {
+ sched.Reload()
}
*reply = OK
return nil
}
-func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
- if self.Sched == nil {
- return utils.ErrNotFound
+func (self *ApierV1) ReloadScheduler(ignore string, reply *string) error {
+ sched := self.ServManager.GetScheduler()
+ if sched == nil {
+ return errors.New(utils.SchedulerNotRunningCaps)
}
- self.Sched.Reload(true)
- *reply = OK
+ sched.Reload()
+ *reply = utils.OK
return nil
}
@@ -928,10 +933,11 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
cache.Flush()
self.RatingDb.PreloadRatingCache()
self.AccountDb.PreloadAccountingCache()
-
- if len(aps) != 0 && self.Sched != nil {
- utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading scheduler.")
- self.Sched.Reload(true)
+ if len(aps) != 0 {
+ sched := self.ServManager.GetScheduler()
+ if sched != nil {
+ sched.Reload()
+ }
}
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
var out int
diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go
index d48b8fe52..901bf9959 100644
--- a/apier/v1/apier_it_test.go
+++ b/apier/v1/apier_it_test.go
@@ -99,18 +99,9 @@ func TestApierInitStorDb(t *testing.T) {
// Finds cgr-engine executable and starts it with default configuration
func TestApierStartEngine(t *testing.T) {
- enginePath, err := exec.LookPath("cgr-engine")
- if err != nil {
- t.Fatal("Cannot find cgr-engine executable")
+ if _, err := engine.StopStartEngine(cfgPath, *waitRater); err != nil {
+ t.Fatal(err)
}
- exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
- time.Sleep(time.Duration(*waitRater) * time.Millisecond)
- engine := exec.Command(enginePath, "-config_dir", cfgPath)
- //engine.Stderr = os.Stderr
- if err := engine.Start(); err != nil {
- t.Fatal("Cannot start cgr-engine: ", err.Error())
- }
- time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up
}
// Connect rpc client to rater
diff --git a/apier/v1/scheduler.go b/apier/v1/scheduler.go
index 732ed38b7..d0ee56f66 100644
--- a/apier/v1/scheduler.go
+++ b/apier/v1/scheduler.go
@@ -112,11 +112,12 @@ type ScheduledActions struct {
}
func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error {
- if self.Sched == nil {
- return errors.New("SCHEDULER_NOT_ENABLED")
+ sched := self.ServManager.GetScheduler()
+ if sched == nil {
+ return errors.New(utils.SchedulerNotRunningCaps)
}
schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty
- scheduledActions := self.Sched.GetQueue()
+ scheduledActions := sched.GetQueue()
for _, qActions := range scheduledActions {
sas := &ScheduledActions{ActionsId: qActions.ActionsID, ActionPlanId: qActions.GetActionPlanID(), ActionTimingUuid: qActions.Uuid, Accounts: len(qActions.GetAccountIDs())}
if attrs.SearchTerm != "" &&
diff --git a/apier/v2/accounts.go b/apier/v2/accounts.go
index f0c29ded7..1fbbe0931 100644
--- a/apier/v2/accounts.go
+++ b/apier/v2/accounts.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package v2
import (
+ "errors"
"fmt"
"math"
@@ -207,10 +208,11 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error {
return utils.NewErrServerError(err)
}
if attr.ReloadScheduler && len(dirtyActionPlans) > 0 {
- // reload scheduler
- if self.Sched != nil {
- self.Sched.Reload(true)
+ sched := self.ServManager.GetScheduler()
+ if sched == nil {
+ return errors.New(utils.SchedulerNotRunningCaps)
}
+ sched.Reload()
}
*reply = utils.OK // This will mark saving of the account, error still can show up in actionTimingsId
return nil
diff --git a/apier/v2/apier.go b/apier/v2/apier.go
index edb067d90..100d924af 100644
--- a/apier/v2/apier.go
+++ b/apier/v2/apier.go
@@ -79,8 +79,9 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str
}, 0, attrs.AccountActionsId); err != nil {
return utils.NewErrServerError(err)
}
- if self.Sched != nil {
- self.Sched.Reload(true)
+ sched := self.ServManager.GetScheduler()
+ if sched != nil {
+ sched.Reload()
}
*reply = v1.OK
return nil
@@ -211,9 +212,12 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
self.RatingDb.PreloadRatingCache()
self.AccountDb.PreloadAccountingCache()
- if len(aps) != 0 && self.Sched != nil {
- utils.Logger.Info("ApierV2.LoadTariffPlanFromFolder, reloading scheduler.")
- self.Sched.Reload(true)
+ if len(aps) != 0 {
+ sched := self.ServManager.GetScheduler()
+ if sched != nil {
+ utils.Logger.Info("ApierV2.LoadTariffPlanFromFolder, reloading scheduler.")
+ sched.Reload()
+ }
}
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
var out int
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 2e1267be6..f020c69e8 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -39,6 +39,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/scheduler"
+ "github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -412,21 +413,20 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine
internalCdrSChan <- cdrServer // Signal that cdrS is operational
}
-func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneChan chan struct{}, ratingDb engine.RatingStorage, exitChan chan bool) {
+func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneChan chan struct{}, ratingDB engine.RatingStorage, exitChan chan bool) {
// Wait for cache to load data before starting
cacheDone := <-cacheDoneChan
cacheDoneChan <- cacheDone
utils.Logger.Info("Starting CGRateS Scheduler.")
- sched := scheduler.NewScheduler(ratingDb)
- time.Sleep(1)
+ sched := scheduler.NewScheduler(ratingDB)
internalSchedulerChan <- sched
- sched.Reload(true)
+
sched.Loop()
exitChan <- true // Should not get out of loop though
}
-func startCdrStats(internalCdrStatSChan chan rpcclient.RpcClientConnection, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, server *utils.Server) {
- cdrStats := engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval)
+func startCdrStats(internalCdrStatSChan chan rpcclient.RpcClientConnection, ratingDB engine.RatingStorage, accountDb engine.AccountingStorage, server *utils.Server) {
+ cdrStats := engine.NewStats(ratingDB, accountDb, cfg.CDRStatsSaveInterval)
server.RpcRegister(cdrStats)
server.RpcRegister(&v1.CDRStatsV1{CdrStats: cdrStats}) // Public APIs
internalCdrStatSChan <- cdrStats
@@ -615,19 +615,19 @@ func main() {
// Init cache
cache.NewCache(cfg.CacheConfig)
- var ratingDb engine.RatingStorage
+ var ratingDB engine.RatingStorage
var accountDb engine.AccountingStorage
var loadDb engine.LoadStorage
var cdrDb engine.CdrStorage
if cfg.RALsEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary
- ratingDb, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort,
+ ratingDB, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort,
cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding, cfg.CacheConfig, cfg.LoadHistorySize)
if err != nil { // Cannot configure getter database, show stopper
utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
return
}
- defer ratingDb.Close()
- engine.SetRatingStorage(ratingDb)
+ defer ratingDB.Close()
+ engine.SetRatingStorage(ratingDB)
}
if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled {
accountDb, err = engine.ConfigureAccountingStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort,
@@ -671,7 +671,6 @@ func main() {
internalBalancerChan := make(chan *balancer2go.Balancer, 1)
internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
cacheDoneChan := make(chan struct{}, 1)
- internalSchedulerChan := make(chan *scheduler.Scheduler, 1)
internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
internalCdrStatSChan := make(chan rpcclient.RpcClientConnection, 1)
internalHistorySChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -680,6 +679,10 @@ func main() {
internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSMGChan := make(chan *sessionmanager.SMGeneric, 1)
internalRLSChan := make(chan rpcclient.RpcClientConnection, 1)
+
+ // Start ServiceManager
+ srvManager := servmanager.NewServiceManager(cfg, ratingDB, exitChan, cacheDoneChan)
+
// Start balancer service
if cfg.BalancerEnabled {
go startBalancer(internalBalancerChan, &stopHandled, exitChan) // Not really needed async here but to cope with uniformity
@@ -687,13 +690,13 @@ func main() {
// Start rater service
if cfg.RALsEnabled {
- go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
- server, ratingDb, accountDb, loadDb, cdrDb, &stopHandled, exitChan)
+ go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
+ srvManager, server, ratingDB, accountDb, loadDb, cdrDb, &stopHandled, exitChan)
}
// Start Scheduler
if cfg.SchedulerEnabled {
- go startScheduler(internalSchedulerChan, cacheDoneChan, ratingDb, exitChan)
+ go srvManager.StartScheduler(true)
}
// Start CDR Server
@@ -704,7 +707,7 @@ func main() {
// Start CDR Stats server
if cfg.CDRStatsEnabled {
- go startCdrStats(internalCdrStatSChan, ratingDb, accountDb, server)
+ go startCdrStats(internalCdrStatSChan, ratingDB, accountDb, server)
}
// Start CDRC components if necessary
diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go
index cf157a3af..a85e68c07 100644
--- a/cmd/cgr-engine/rater.go
+++ b/cmd/cgr-engine/rater.go
@@ -26,7 +26,7 @@ import (
"github.com/cgrates/cgrates/balancer2go"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/history"
- "github.com/cgrates/cgrates/scheduler"
+ "github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -50,10 +50,10 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled
}
// Starts rater and reports on chan
-func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
+func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer,
internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection,
internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
- server *utils.Server,
+ serviceManager *servmanager.ServiceManager, server *utils.Server,
ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) {
var waitTasks []chan struct{}
@@ -79,24 +79,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
cacheDoneChan <- struct{}{}
}()
- // Retrieve scheduler for it's API methods
- var sched *scheduler.Scheduler // Need the scheduler in APIer
- if cfg.SchedulerEnabled {
- schedTaskChan := make(chan struct{})
- waitTasks = append(waitTasks, schedTaskChan)
- go func() {
- defer close(schedTaskChan)
- select {
- case sched = <-internalSchedulerChan:
- internalSchedulerChan <- sched
- case <-time.After(cfg.InternalTtl):
- utils.Logger.Crit(": Internal scheduler connection timeout.")
- exitChan <- true
- return
- }
-
- }()
- }
var bal *balancer2go.Balancer
if cfg.RALsBalancer != "" { // Connection to balancer
balTaskChan := make(chan struct{})
@@ -200,8 +182,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}
responder := &engine.Responder{Bal: bal, ExitChan: exitChan}
responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
- apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Sched: sched,
- Config: cfg, Responder: responder}
+ apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb,
+ Config: cfg, Responder: responder, ServManager: serviceManager}
if cdrStats != nil { // ToDo: Fix here properly the init of stats
responder.Stats = cdrStats
apierRpcV1.CdrStatsSrv = cdrStats
@@ -212,7 +194,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
apierRpcV2 := &v2.ApierV2{
ApierV1: *apierRpcV1}
- // internalSchedulerChan shared here
server.RpcRegister(responder)
server.RpcRegister(apierRpcV1)
server.RpcRegister(apierRpcV2)
diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go
index abe9d2be9..bc5350413 100644
--- a/general_tests/ddazmbl1_test.go
+++ b/general_tests/ddazmbl1_test.go
@@ -129,7 +129,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
}
func TestExecuteActions(t *testing.T) {
- scheduler.NewScheduler(ratingDb).Reload(false)
+ scheduler.NewScheduler(ratingDb).Reload()
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
if acnt, err := acntDb.GetAccount("cgrates.org:12344"); err != nil {
t.Error(err)
diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go
index 7ea5fe9b6..090768b0b 100644
--- a/general_tests/ddazmbl2_test.go
+++ b/general_tests/ddazmbl2_test.go
@@ -129,7 +129,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
}
func TestExecuteActions2(t *testing.T) {
- scheduler.NewScheduler(ratingDb2).Reload(false)
+ scheduler.NewScheduler(ratingDb2).Reload()
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
if acnt, err := acntDb2.GetAccount("cgrates.org:12345"); err != nil {
t.Error(err)
diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go
index c5928bb6a..2aedde166 100644
--- a/general_tests/ddazmbl3_test.go
+++ b/general_tests/ddazmbl3_test.go
@@ -127,7 +127,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
}
func TestExecuteActions3(t *testing.T) {
- scheduler.NewScheduler(ratingDb3).Reload(false)
+ scheduler.NewScheduler(ratingDb3).Reload()
time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
if acnt, err := acntDb3.GetAccount("cgrates.org:12346"); err != nil {
t.Error(err)
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index bc9d44b59..7b6059187 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -28,10 +28,10 @@ import (
)
type Scheduler struct {
- queue engine.ActionTimingPriorityList
- timer *time.Timer
- restartLoop chan bool
- sync.Mutex
+ sync.RWMutex
+ queue engine.ActionTimingPriorityList
+ timer *time.Timer
+ restartLoop chan bool
storage engine.RatingStorage
schedulerStarted bool
}
@@ -158,11 +158,14 @@ func (s *Scheduler) restart() {
}
}
-func (s *Scheduler) GetQueue() engine.ActionTimingPriorityList {
- return s.queue
+func (s *Scheduler) GetQueue() (queue engine.ActionTimingPriorityList) {
+ s.RLock()
+ utils.Clone(s.queue, &queue)
+ defer s.RUnlock()
+ return queue
}
-func (s *Scheduler) Shutdown() error {
+func (s *Scheduler) Shutdown() {
s.schedulerStarted = false // disable loop on next run
s.restartLoop <- true // cancel waiting tasks
if s.timer != nil {
diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go
new file mode 100644
index 000000000..4aa0cf5a5
--- /dev/null
+++ b/servmanager/servmanager.go
@@ -0,0 +1,181 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package servmanager
+
+import (
+ "errors"
+ "reflect"
+ "strings"
+ "sync"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/scheduler"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/rpcclient"
+)
+
+func NewServiceManager(cfg *config.CGRConfig, ratingDB engine.RatingStorage, engineShutdown chan bool, cacheDoneChan chan struct{}) *ServiceManager {
+ return &ServiceManager{cfg: cfg, ratingDB: ratingDB, engineShutdown: engineShutdown, cacheDoneChan: cacheDoneChan}
+}
+
+// ServiceManager handles starting/stopping of the services ran by the engine
+type ServiceManager struct {
+ sync.RWMutex // lock access to any shared data
+ cfg *config.CGRConfig
+ ratingDB engine.RatingStorage
+ engineShutdown chan bool
+ cacheDoneChan chan struct{} // Wait for cache to load
+ sched *scheduler.Scheduler
+}
+
+func (srvMngr *ServiceManager) StartScheduler(waitCache bool) error {
+ srvMngr.RLock()
+ schedRunning := srvMngr.sched != nil
+ srvMngr.RUnlock()
+ if schedRunning {
+ return utils.NewCGRError(utils.ServiceManager,
+ utils.CapitalizedMessage(utils.ServiceAlreadyRunning),
+ utils.ServiceAlreadyRunning,
+ "the scheduler is already running")
+ }
+ if waitCache { // Wait for cache to load data before starting
+ cacheDone := <-srvMngr.cacheDoneChan
+ srvMngr.cacheDoneChan <- cacheDone
+ }
+ utils.Logger.Info(" Starting CGRateS Scheduler.")
+ sched := scheduler.NewScheduler(srvMngr.ratingDB)
+ srvMngr.Lock()
+ srvMngr.sched = sched
+ srvMngr.Unlock()
+ go func() {
+ sched.Loop()
+ srvMngr.Lock()
+ srvMngr.sched = nil // if we are after loop, the service is down
+ srvMngr.Unlock()
+ if srvMngr.cfg.SchedulerEnabled {
+ srvMngr.engineShutdown <- true // shutdown engine since this service should be running
+ }
+ }()
+ return nil
+}
+
+func (srvMngr *ServiceManager) StopScheduler() error {
+ var sched *scheduler.Scheduler
+ srvMngr.Lock()
+ schedRunning := srvMngr.sched != nil
+ if schedRunning {
+ sched = srvMngr.sched
+ srvMngr.sched = nil // optimize the lock and release here
+ }
+ srvMngr.Unlock()
+ if !schedRunning {
+ return utils.NewCGRError(utils.ServiceManager,
+ utils.CapitalizedMessage(utils.ServiceAlreadyRunning),
+ utils.ServiceAlreadyRunning,
+ "the scheduler is not running")
+ }
+ sched.Shutdown()
+ return nil
+}
+
+func (srvMngr *ServiceManager) GetScheduler() *scheduler.Scheduler {
+ srvMngr.RLock()
+ defer srvMngr.RUnlock()
+ return srvMngr.sched
+}
+
+func (srvMngr *ServiceManager) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ parts := strings.Split(serviceMethod, ".")
+ if len(parts) != 2 {
+ return rpcclient.ErrUnsupporteServiceMethod
+ }
+ // get method
+ method := reflect.ValueOf(srvMngr).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
+ if !method.IsValid() {
+ return rpcclient.ErrUnsupporteServiceMethod
+ }
+ // construct the params
+ params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
+ ret := method.Call(params)
+ if len(ret) != 1 {
+ return utils.ErrServerError
+ }
+ if ret[0].Interface() == nil {
+ return nil
+ }
+ err, ok := ret[0].Interface().(error)
+ if !ok {
+ return utils.ErrServerError
+ }
+ return err
+}
+
+// ArgShutdownService are passed to ShutdownService RPC method
+type ArgStartService struct {
+ ServiceID string
+}
+
+// ShutdownService shuts-down a service with ID
+func (srvMngr *ServiceManager) V1StartService(args ArgStartService, reply *string) (err error) {
+ switch args.ServiceID {
+ case utils.MetaScheduler:
+ err = srvMngr.StartScheduler(false)
+ default:
+ err = errors.New(utils.UnsupportedServiceIDCaps)
+ }
+ if err != nil {
+ return err
+ }
+ *reply = utils.OK
+ return
+}
+
+// ShutdownService shuts-down a service with ID
+func (srvMngr *ServiceManager) V1StopService(args ArgStartService, reply *string) (err error) {
+ switch args.ServiceID {
+ case utils.MetaScheduler:
+ err = srvMngr.StopScheduler()
+ default:
+ err = errors.New(utils.UnsupportedServiceIDCaps)
+ }
+ if err != nil {
+ return err
+ }
+ *reply = utils.OK
+ return
+}
+
+// ShutdownService shuts-down a service with ID
+func (srvMngr *ServiceManager) V1ServiceStatus(args ArgStartService, reply *string) error {
+ srvMngr.RLock()
+ defer srvMngr.RUnlock()
+ var running bool
+ switch args.ServiceID {
+ case utils.MetaScheduler:
+ running = srvMngr.sched != nil
+ default:
+ return errors.New(utils.UnsupportedServiceIDCaps)
+ }
+ if running {
+ *reply = utils.RunningCaps
+ } else {
+ *reply = utils.StoppedCaps
+ }
+ return nil
+}
diff --git a/utils/consts.go b/utils/consts.go
index 1283bf7df..952f90069 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -313,4 +313,12 @@ const (
UnsupportedCachePrefix = "unsupported cache prefix"
CDRSCtx = "cdrs"
MandatoryInfoMissing = "mandatory information missing"
+ UnsupportedServiceIDCaps = "UNSUPPORTED_SERVICE_ID"
+ ServiceManager = "service_manager"
+ ServiceAlreadyRunning = "service already running"
+ ServiceNotRunning = "service not running"
+ RunningCaps = "RUNNING"
+ StoppedCaps = "STOPPED"
+ SchedulerNotRunningCaps = "SCHEDULLER_NOT_RUNNING"
+ MetaScheduler = "*scheduler"
)
diff --git a/utils/coreutils.go b/utils/coreutils.go
index 227c2af62..3c1dd12f2 100644
--- a/utils/coreutils.go
+++ b/utils/coreutils.go
@@ -623,3 +623,10 @@ func (slc Int64Slice) Swap(i, j int) {
func (slc Int64Slice) Less(i, j int) bool {
return slc[i] < slc[j]
}
+
+// CapitalizeErrorMessage returns the capitalized version of an error, useful in APIs
+func CapitalizedMessage(errMessage string) (capStr string) {
+ capStr = strings.ToUpper(errMessage)
+ capStr = strings.Replace(capStr, " ", "_", -1)
+ return
+}
diff --git a/utils/coreutils_test.go b/utils/coreutils_test.go
index b735c9b5f..bfd82db9f 100644
--- a/utils/coreutils_test.go
+++ b/utils/coreutils_test.go
@@ -732,3 +732,9 @@ func TestLess(t *testing.T) {
t.Error("Expected:", expected, ", received:", t1.Less(1, 2))
}
}
+
+func TestCapitalizedMessage(t *testing.T) {
+ if capMsg := CapitalizedMessage(ServiceAlreadyRunning); capMsg != "SERVICE_ALREADY_RUNNING" {
+ t.Errorf("Received: <%s>", capMsg)
+ }
+}