Mirror of https://github.com/cgrates/cgrates.git (synced 2026-02-11 18:16:24 +05:00)
New ServiceManager handling dynamic service start/stop, *scheduler management through it, and a new CapitalizedMessage function in utils
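For orientation, a minimal, hypothetical sketch (not part of the commit) of how the pieces below fit together: cgr-engine builds a ServiceManager, starts the scheduler through it, and API code fetches the scheduler on demand instead of holding a *scheduler.Scheduler field. The helper runSchedulerViaManager and its arguments are assumptions for illustration; the constructor and method signatures are the ones introduced in this diff.

```go
package example

import (
	"fmt"

	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/servmanager"
)

// runSchedulerViaManager is a hypothetical helper; cfg, ratingDB, exitChan and
// cacheDoneChan are assumed to be prepared the way cgr-engine's main() prepares them.
func runSchedulerViaManager(cfg *config.CGRConfig, ratingDB engine.RatingStorage,
	exitChan chan bool, cacheDoneChan chan struct{}) {
	srvManager := servmanager.NewServiceManager(cfg, ratingDB, exitChan, cacheDoneChan)
	go srvManager.StartScheduler(true) // true: wait for the cache before entering the loop

	// API handlers no longer keep their own *scheduler.Scheduler; they ask the manager,
	// which may legitimately return nil if the scheduler is not (yet) running.
	if sched := srvManager.GetScheduler(); sched != nil {
		sched.Reload() // Reload() loses its bool argument in this commit
	} else {
		fmt.Println("scheduler not running")
	}

	// Stopping also goes through the manager; it calls Shutdown() on the scheduler.
	if err := srvManager.StopScheduler(); err != nil {
		fmt.Println(err)
	}
}
```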
@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
 package v1
 
 import (
+    "errors"
     "fmt"
     "math"
     "strings"
@@ -130,8 +131,12 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e
        *reply = err.Error()
        return utils.NewErrServerError(err)
    }
-   if attrs.ReloadScheduler && self.Sched != nil {
-       self.Sched.Reload(true)
+   if attrs.ReloadScheduler {
+       sched := self.ServManager.GetScheduler()
+       if sched == nil {
+           return errors.New(utils.SchedulerNotRunningCaps)
+       }
+       sched.Reload()
    }
    *reply = OK
    return nil
@@ -235,10 +240,11 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error
        return utils.NewErrServerError(err)
    }
    if attr.ReloadScheduler && schedulerReloadNeeded {
        // reload scheduler
-       if self.Sched != nil {
-           self.Sched.Reload(true)
+       sched := self.ServManager.GetScheduler()
+       if sched == nil {
+           return errors.New(utils.SchedulerNotRunningCaps)
        }
+       sched.Reload()
    }
    *reply = OK // This will mark saving of the account, error still can show up in actionTimingsId
    return nil
@@ -288,12 +294,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
    if err != nil {
        return utils.NewErrServerError(err)
    }
-   if attr.ReloadScheduler && len(dirtyActionPlans) > 0 {
-       // reload scheduler
-       if self.Sched != nil {
-           self.Sched.Reload(true)
-       }
-   }
 
    *reply = OK
    return nil
 }
 
@@ -29,7 +29,7 @@ import (
    "github.com/cgrates/cgrates/cache"
    "github.com/cgrates/cgrates/config"
    "github.com/cgrates/cgrates/engine"
-   "github.com/cgrates/cgrates/scheduler"
+   "github.com/cgrates/cgrates/servmanager"
    "github.com/cgrates/cgrates/utils"
    "github.com/cgrates/rpcclient"
 )
@@ -43,9 +43,9 @@ type ApierV1 struct {
    RatingDb    engine.RatingStorage
    AccountDb   engine.AccountingStorage
    CdrDb       engine.CdrStorage
-   Sched       *scheduler.Scheduler
    Config      *config.CGRConfig
    Responder   *engine.Responder
+   ServManager *servmanager.ServiceManager
    CdrStatsSrv rpcclient.RpcClientConnection
    Users       rpcclient.RpcClientConnection
    CDRs        rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine
@@ -333,9 +333,11 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply
    self.RatingDb.PreloadRatingCache()
    self.AccountDb.PreloadAccountingCache()
 
-   if len(aps) != 0 && self.Sched != nil {
-       utils.Logger.Info("ApierV1.LoadTariffPlanFromStorDb, reloading scheduler.")
-       self.Sched.Reload(true)
+   if len(aps) != 0 {
+       sched := self.ServManager.GetScheduler()
+       if sched != nil {
+           sched.Reload()
+       }
    }
 
    if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
@@ -621,10 +623,11 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error
        return utils.NewErrServerError(err)
    }
    if attrs.ReloadScheduler {
-       if self.Sched == nil {
-           return errors.New("SCHEDULER_NOT_ENABLED")
+       sched := self.ServManager.GetScheduler()
+       if sched == nil {
+           return errors.New(utils.SchedulerNotRunningCaps)
        }
-       self.Sched.Reload(true)
+       sched.Reload()
    }
    *reply = OK
    return nil
@@ -672,19 +675,21 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
    }
    // ToDo: Get the action keys loaded by dbReader so we reload only these in cache
    // Need to do it before scheduler otherwise actions to run will be unknown
-   if self.Sched != nil {
-       self.Sched.Reload(true)
+   sched := self.ServManager.GetScheduler()
+   if sched != nil {
+       sched.Reload()
    }
    *reply = OK
    return nil
 }
 
-func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
-   if self.Sched == nil {
-       return utils.ErrNotFound
+func (self *ApierV1) ReloadScheduler(ignore string, reply *string) error {
+   sched := self.ServManager.GetScheduler()
+   if sched == nil {
+       return errors.New(utils.SchedulerNotRunningCaps)
    }
-   self.Sched.Reload(true)
-   *reply = OK
+   sched.Reload()
+   *reply = utils.OK
    return nil
 }
 
@@ -928,10 +933,11 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
    cache.Flush()
    self.RatingDb.PreloadRatingCache()
    self.AccountDb.PreloadAccountingCache()
 
-   if len(aps) != 0 && self.Sched != nil {
-       utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading scheduler.")
-       self.Sched.Reload(true)
+   if len(aps) != 0 {
+       sched := self.ServManager.GetScheduler()
+       if sched != nil {
+           sched.Reload()
+       }
    }
    if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
        var out int
 
@@ -99,18 +99,9 @@ func TestApierInitStorDb(t *testing.T) {
 
 // Finds cgr-engine executable and starts it with default configuration
 func TestApierStartEngine(t *testing.T) {
-   enginePath, err := exec.LookPath("cgr-engine")
-   if err != nil {
-       t.Fatal("Cannot find cgr-engine executable")
+   if _, err := engine.StopStartEngine(cfgPath, *waitRater); err != nil {
+       t.Fatal(err)
    }
-   exec.Command("pkill", "cgr-engine").Run() // Just to make sure another one is not running, bit brutal maybe we can fine tune it
-   time.Sleep(time.Duration(*waitRater) * time.Millisecond)
-   engine := exec.Command(enginePath, "-config_dir", cfgPath)
-   //engine.Stderr = os.Stderr
-   if err := engine.Start(); err != nil {
-       t.Fatal("Cannot start cgr-engine: ", err.Error())
-   }
-   time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time to rater to fire up
 }
 
 // Connect rpc client to rater
 
@@ -112,11 +112,12 @@ type ScheduledActions struct {
 }
 
 func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error {
-   if self.Sched == nil {
-       return errors.New("SCHEDULER_NOT_ENABLED")
+   sched := self.ServManager.GetScheduler()
+   if sched == nil {
+       return errors.New(utils.SchedulerNotRunningCaps)
    }
    schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty
-   scheduledActions := self.Sched.GetQueue()
+   scheduledActions := sched.GetQueue()
    for _, qActions := range scheduledActions {
        sas := &ScheduledActions{ActionsId: qActions.ActionsID, ActionPlanId: qActions.GetActionPlanID(), ActionTimingUuid: qActions.Uuid, Accounts: len(qActions.GetAccountIDs())}
        if attrs.SearchTerm != "" &&
 
@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
 package v2
 
 import (
+    "errors"
     "fmt"
     "math"
 
@@ -207,10 +208,11 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error {
        return utils.NewErrServerError(err)
    }
    if attr.ReloadScheduler && len(dirtyActionPlans) > 0 {
        // reload scheduler
-       if self.Sched != nil {
-           self.Sched.Reload(true)
+       sched := self.ServManager.GetScheduler()
+       if sched == nil {
+           return errors.New(utils.SchedulerNotRunningCaps)
        }
+       sched.Reload()
    }
    *reply = utils.OK // This will mark saving of the account, error still can show up in actionTimingsId
    return nil
 
@@ -79,8 +79,9 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str
    }, 0, attrs.AccountActionsId); err != nil {
        return utils.NewErrServerError(err)
    }
-   if self.Sched != nil {
-       self.Sched.Reload(true)
+   sched := self.ServManager.GetScheduler()
+   if sched != nil {
+       sched.Reload()
    }
    *reply = v1.OK
    return nil
@@ -211,9 +212,12 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
    self.RatingDb.PreloadRatingCache()
    self.AccountDb.PreloadAccountingCache()
 
-   if len(aps) != 0 && self.Sched != nil {
-       utils.Logger.Info("ApierV2.LoadTariffPlanFromFolder, reloading scheduler.")
-       self.Sched.Reload(true)
+   if len(aps) != 0 {
+       sched := self.ServManager.GetScheduler()
+       if sched != nil {
+           utils.Logger.Info("ApierV2.LoadTariffPlanFromFolder, reloading scheduler.")
+           sched.Reload()
+       }
    }
    if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
        var out int
 
@@ -39,6 +39,7 @@ import (
    "github.com/cgrates/cgrates/engine"
    "github.com/cgrates/cgrates/history"
    "github.com/cgrates/cgrates/scheduler"
+   "github.com/cgrates/cgrates/servmanager"
    "github.com/cgrates/cgrates/sessionmanager"
    "github.com/cgrates/cgrates/utils"
    "github.com/cgrates/rpcclient"
@@ -412,21 +413,20 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine
    internalCdrSChan <- cdrServer // Signal that cdrS is operational
 }
 
-func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneChan chan struct{}, ratingDb engine.RatingStorage, exitChan chan bool) {
+func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneChan chan struct{}, ratingDB engine.RatingStorage, exitChan chan bool) {
    // Wait for cache to load data before starting
    cacheDone := <-cacheDoneChan
    cacheDoneChan <- cacheDone
    utils.Logger.Info("Starting CGRateS Scheduler.")
-   sched := scheduler.NewScheduler(ratingDb)
-   time.Sleep(1)
+   sched := scheduler.NewScheduler(ratingDB)
    internalSchedulerChan <- sched
-   sched.Reload(true)
 
    sched.Loop()
    exitChan <- true // Should not get out of loop though
 }
 
-func startCdrStats(internalCdrStatSChan chan rpcclient.RpcClientConnection, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, server *utils.Server) {
-   cdrStats := engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval)
+func startCdrStats(internalCdrStatSChan chan rpcclient.RpcClientConnection, ratingDB engine.RatingStorage, accountDb engine.AccountingStorage, server *utils.Server) {
+   cdrStats := engine.NewStats(ratingDB, accountDb, cfg.CDRStatsSaveInterval)
    server.RpcRegister(cdrStats)
    server.RpcRegister(&v1.CDRStatsV1{CdrStats: cdrStats}) // Public APIs
    internalCdrStatSChan <- cdrStats
@@ -615,19 +615,19 @@ func main() {
    // Init cache
    cache.NewCache(cfg.CacheConfig)
 
-   var ratingDb engine.RatingStorage
+   var ratingDB engine.RatingStorage
    var accountDb engine.AccountingStorage
    var loadDb engine.LoadStorage
    var cdrDb engine.CdrStorage
    if cfg.RALsEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary
-       ratingDb, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort,
+       ratingDB, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort,
            cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding, cfg.CacheConfig, cfg.LoadHistorySize)
        if err != nil { // Cannot configure getter database, show stopper
            utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
            return
        }
-       defer ratingDb.Close()
-       engine.SetRatingStorage(ratingDb)
+       defer ratingDB.Close()
+       engine.SetRatingStorage(ratingDB)
    }
    if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled {
        accountDb, err = engine.ConfigureAccountingStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort,
@@ -671,7 +671,6 @@ func main() {
    internalBalancerChan := make(chan *balancer2go.Balancer, 1)
    internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
    cacheDoneChan := make(chan struct{}, 1)
-   internalSchedulerChan := make(chan *scheduler.Scheduler, 1)
    internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
    internalCdrStatSChan := make(chan rpcclient.RpcClientConnection, 1)
    internalHistorySChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -680,6 +679,10 @@ func main() {
    internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1)
    internalSMGChan := make(chan *sessionmanager.SMGeneric, 1)
    internalRLSChan := make(chan rpcclient.RpcClientConnection, 1)
 
+   // Start ServiceManager
+   srvManager := servmanager.NewServiceManager(cfg, ratingDB, exitChan, cacheDoneChan)
+
    // Start balancer service
    if cfg.BalancerEnabled {
        go startBalancer(internalBalancerChan, &stopHandled, exitChan) // Not really needed async here but to cope with uniformity
@@ -687,13 +690,13 @@ func main() {
 
    // Start rater service
    if cfg.RALsEnabled {
-       go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
-           server, ratingDb, accountDb, loadDb, cdrDb, &stopHandled, exitChan)
+       go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
+           srvManager, server, ratingDB, accountDb, loadDb, cdrDb, &stopHandled, exitChan)
    }
 
    // Start Scheduler
    if cfg.SchedulerEnabled {
-       go startScheduler(internalSchedulerChan, cacheDoneChan, ratingDb, exitChan)
+       go srvManager.StartScheduler(true)
    }
 
    // Start CDR Server
@@ -704,7 +707,7 @@ func main() {
 
    // Start CDR Stats server
    if cfg.CDRStatsEnabled {
-       go startCdrStats(internalCdrStatSChan, ratingDb, accountDb, server)
+       go startCdrStats(internalCdrStatSChan, ratingDB, accountDb, server)
    }
 
    // Start CDRC components if necessary
 
@@ -26,7 +26,7 @@ import (
    "github.com/cgrates/cgrates/balancer2go"
    "github.com/cgrates/cgrates/engine"
    "github.com/cgrates/cgrates/history"
-   "github.com/cgrates/cgrates/scheduler"
+   "github.com/cgrates/cgrates/servmanager"
    "github.com/cgrates/cgrates/utils"
    "github.com/cgrates/rpcclient"
 )
@@ -50,10 +50,10 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled
 }
 
 // Starts rater and reports on chan
-func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
+func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer,
    internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection,
    internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
-   server *utils.Server,
+   serviceManager *servmanager.ServiceManager, server *utils.Server,
    ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) {
    var waitTasks []chan struct{}
 
@@ -79,24 +79,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
        cacheDoneChan <- struct{}{}
    }()
 
-   // Retrieve scheduler for it's API methods
-   var sched *scheduler.Scheduler // Need the scheduler in APIer
-   if cfg.SchedulerEnabled {
-       schedTaskChan := make(chan struct{})
-       waitTasks = append(waitTasks, schedTaskChan)
-       go func() {
-           defer close(schedTaskChan)
-           select {
-           case sched = <-internalSchedulerChan:
-               internalSchedulerChan <- sched
-           case <-time.After(cfg.InternalTtl):
-               utils.Logger.Crit("<Rater>: Internal scheduler connection timeout.")
-               exitChan <- true
-               return
-           }
-
-       }()
-   }
    var bal *balancer2go.Balancer
    if cfg.RALsBalancer != "" { // Connection to balancer
        balTaskChan := make(chan struct{})
@@ -200,8 +182,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
    }
    responder := &engine.Responder{Bal: bal, ExitChan: exitChan}
    responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
-   apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Sched: sched,
-       Config: cfg, Responder: responder}
+   apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb,
+       Config: cfg, Responder: responder, ServManager: serviceManager}
    if cdrStats != nil { // ToDo: Fix here properly the init of stats
        responder.Stats = cdrStats
        apierRpcV1.CdrStatsSrv = cdrStats
@@ -212,7 +194,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
    apierRpcV2 := &v2.ApierV2{
        ApierV1: *apierRpcV1}
 
-   // internalSchedulerChan shared here
    server.RpcRegister(responder)
    server.RpcRegister(apierRpcV1)
    server.RpcRegister(apierRpcV2)
 
@@ -129,7 +129,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
 }
 
 func TestExecuteActions(t *testing.T) {
-   scheduler.NewScheduler(ratingDb).Reload(false)
+   scheduler.NewScheduler(ratingDb).Reload()
    time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
    if acnt, err := acntDb.GetAccount("cgrates.org:12344"); err != nil {
        t.Error(err)
 
@@ -129,7 +129,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
 }
 
 func TestExecuteActions2(t *testing.T) {
-   scheduler.NewScheduler(ratingDb2).Reload(false)
+   scheduler.NewScheduler(ratingDb2).Reload()
    time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
    if acnt, err := acntDb2.GetAccount("cgrates.org:12345"); err != nil {
        t.Error(err)
 
@@ -127,7 +127,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10`
 }
 
 func TestExecuteActions3(t *testing.T) {
-   scheduler.NewScheduler(ratingDb3).Reload(false)
+   scheduler.NewScheduler(ratingDb3).Reload()
    time.Sleep(10 * time.Millisecond) // Give time to scheduler to topup the account
    if acnt, err := acntDb3.GetAccount("cgrates.org:12346"); err != nil {
        t.Error(err)
 
@@ -28,10 +28,10 @@ import (
 )
 
 type Scheduler struct {
-   queue       engine.ActionTimingPriorityList
-   timer       *time.Timer
-   restartLoop chan bool
-   sync.Mutex
+   sync.RWMutex
+   queue            engine.ActionTimingPriorityList
+   timer            *time.Timer
+   restartLoop      chan bool
    storage          engine.RatingStorage
    schedulerStarted bool
 }
@@ -158,11 +158,14 @@ func (s *Scheduler) restart() {
    }
 }
 
-func (s *Scheduler) GetQueue() engine.ActionTimingPriorityList {
-   return s.queue
+func (s *Scheduler) GetQueue() (queue engine.ActionTimingPriorityList) {
+   s.RLock()
+   utils.Clone(s.queue, &queue)
+   defer s.RUnlock()
+   return queue
 }
 
-func (s *Scheduler) Shutdown() error {
+func (s *Scheduler) Shutdown() {
    s.schedulerStarted = false // disable loop on next run
    s.restartLoop <- true      // cancel waiting tasks
    if s.timer != nil {
 
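The GetQueue change above is what lets ApierV1.GetScheduledActions (earlier in this diff) walk the queue while the scheduler keeps running: the method now takes the read lock and hands back a clone instead of the live slice. A small sketch of a consumer follows; PrintQueue is a hypothetical helper, and the fields and methods on the queue entries are the ones GetScheduledActions uses in this commit.

```go
package example

import (
	"fmt"

	"github.com/cgrates/cgrates/scheduler"
)

// PrintQueue walks the snapshot returned by GetQueue. Because GetQueue now
// clones the queue under an RLock, ranging over the copy cannot race the
// scheduler's Loop(), which mutates the live queue under the write lock.
func PrintQueue(sched *scheduler.Scheduler) {
	for _, at := range sched.GetQueue() {
		fmt.Println(at.ActionsID, at.GetActionPlanID(), at.Uuid)
	}
}
```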
servmanager/servmanager.go (new file, 181 lines)
@@ -0,0 +1,181 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>
+*/
+package servmanager
+
+import (
+    "errors"
+    "reflect"
+    "strings"
+    "sync"
+
+    "github.com/cgrates/cgrates/config"
+    "github.com/cgrates/cgrates/engine"
+    "github.com/cgrates/cgrates/scheduler"
+    "github.com/cgrates/cgrates/utils"
+    "github.com/cgrates/rpcclient"
+)
+
+func NewServiceManager(cfg *config.CGRConfig, ratingDB engine.RatingStorage, engineShutdown chan bool, cacheDoneChan chan struct{}) *ServiceManager {
+    return &ServiceManager{cfg: cfg, ratingDB: ratingDB, engineShutdown: engineShutdown, cacheDoneChan: cacheDoneChan}
+}
+
+// ServiceManager handles starting/stopping of the services ran by the engine
+type ServiceManager struct {
+    sync.RWMutex // lock access to any shared data
+    cfg            *config.CGRConfig
+    ratingDB       engine.RatingStorage
+    engineShutdown chan bool
+    cacheDoneChan  chan struct{} // Wait for cache to load
+    sched          *scheduler.Scheduler
+}
+
+func (srvMngr *ServiceManager) StartScheduler(waitCache bool) error {
+    srvMngr.RLock()
+    schedRunning := srvMngr.sched != nil
+    srvMngr.RUnlock()
+    if schedRunning {
+        return utils.NewCGRError(utils.ServiceManager,
+            utils.CapitalizedMessage(utils.ServiceAlreadyRunning),
+            utils.ServiceAlreadyRunning,
+            "the scheduler is already running")
+    }
+    if waitCache { // Wait for cache to load data before starting
+        cacheDone := <-srvMngr.cacheDoneChan
+        srvMngr.cacheDoneChan <- cacheDone
+    }
+    utils.Logger.Info("<ServiceManager> Starting CGRateS Scheduler.")
+    sched := scheduler.NewScheduler(srvMngr.ratingDB)
+    srvMngr.Lock()
+    srvMngr.sched = sched
+    srvMngr.Unlock()
+    go func() {
+        sched.Loop()
+        srvMngr.Lock()
+        srvMngr.sched = nil // if we are after loop, the service is down
+        srvMngr.Unlock()
+        if srvMngr.cfg.SchedulerEnabled {
+            srvMngr.engineShutdown <- true // shutdown engine since this service should be running
+        }
+    }()
+    return nil
+}
+
+func (srvMngr *ServiceManager) StopScheduler() error {
+    var sched *scheduler.Scheduler
+    srvMngr.Lock()
+    schedRunning := srvMngr.sched != nil
+    if schedRunning {
+        sched = srvMngr.sched
+        srvMngr.sched = nil // optimize the lock and release here
+    }
+    srvMngr.Unlock()
+    if !schedRunning {
+        return utils.NewCGRError(utils.ServiceManager,
+            utils.CapitalizedMessage(utils.ServiceAlreadyRunning),
+            utils.ServiceAlreadyRunning,
+            "the scheduler is not running")
+    }
+    sched.Shutdown()
+    return nil
+}
+
+func (srvMngr *ServiceManager) GetScheduler() *scheduler.Scheduler {
+    srvMngr.RLock()
+    defer srvMngr.RUnlock()
+    return srvMngr.sched
+}
+
+func (srvMngr *ServiceManager) Call(serviceMethod string, args interface{}, reply interface{}) error {
+    parts := strings.Split(serviceMethod, ".")
+    if len(parts) != 2 {
+        return rpcclient.ErrUnsupporteServiceMethod
+    }
+    // get method
+    method := reflect.ValueOf(srvMngr).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
+    if !method.IsValid() {
+        return rpcclient.ErrUnsupporteServiceMethod
+    }
+    // construct the params
+    params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
+    ret := method.Call(params)
+    if len(ret) != 1 {
+        return utils.ErrServerError
+    }
+    if ret[0].Interface() == nil {
+        return nil
+    }
+    err, ok := ret[0].Interface().(error)
+    if !ok {
+        return utils.ErrServerError
+    }
+    return err
+}
+
+// ArgShutdownService are passed to ShutdownService RPC method
+type ArgStartService struct {
+    ServiceID string
+}
+
+// ShutdownService shuts-down a service with ID
+func (srvMngr *ServiceManager) V1StartService(args ArgStartService, reply *string) (err error) {
+    switch args.ServiceID {
+    case utils.MetaScheduler:
+        err = srvMngr.StartScheduler(false)
+    default:
+        err = errors.New(utils.UnsupportedServiceIDCaps)
+    }
+    if err != nil {
+        return err
+    }
+    *reply = utils.OK
+    return
+}
+
+// ShutdownService shuts-down a service with ID
+func (srvMngr *ServiceManager) V1StopService(args ArgStartService, reply *string) (err error) {
+    switch args.ServiceID {
+    case utils.MetaScheduler:
+        err = srvMngr.StopScheduler()
+    default:
+        err = errors.New(utils.UnsupportedServiceIDCaps)
+    }
+    if err != nil {
+        return err
+    }
+    *reply = utils.OK
+    return
+}
+
+// ShutdownService shuts-down a service with ID
+func (srvMngr *ServiceManager) V1ServiceStatus(args ArgStartService, reply *string) error {
+    srvMngr.RLock()
+    defer srvMngr.RUnlock()
+    var running bool
+    switch args.ServiceID {
+    case utils.MetaScheduler:
+        running = srvMngr.sched != nil
+    default:
+        return errors.New(utils.UnsupportedServiceIDCaps)
+    }
+    if running {
+        *reply = utils.RunningCaps
+    } else {
+        *reply = utils.StoppedCaps
+    }
+    return nil
+}
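The Call method above is what exposes the V1* methods over the engine's RPC layer: it splits the "Service.Method" string, keeps the trailing two characters of the service name as the version ("V1") and resolves, for example, StartService to V1StartService by reflection. A hedged sketch of a direct in-process call follows; startSchedulerOverAPI is a hypothetical helper, and the service name used is illustrative (the diff does not show the registered name, but any name ending in "V1" resolves the same way).

```go
package example

import (
	"fmt"

	"github.com/cgrates/cgrates/servmanager"
	"github.com/cgrates/cgrates/utils"
)

// startSchedulerOverAPI drives the ServiceManager through its Call dispatcher,
// the same entry point an RPC server would use.
func startSchedulerOverAPI(srvMngr *servmanager.ServiceManager) {
	var reply string
	// "ServiceManagerV1.StartService": the last two chars of the service name ("V1")
	// plus the method name resolve to V1StartService(ArgStartService, *string).
	err := srvMngr.Call("ServiceManagerV1.StartService",
		servmanager.ArgStartService{ServiceID: utils.MetaScheduler}, &reply)
	if err != nil {
		fmt.Println("start failed:", err)
		return
	}
	fmt.Println(reply) // utils.OK on success
}
```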
@@ -313,4 +313,12 @@ const (
    UnsupportedCachePrefix = "unsupported cache prefix"
    CDRSCtx = "cdrs"
    MandatoryInfoMissing = "mandatory information missing"
+   UnsupportedServiceIDCaps = "UNSUPPORTED_SERVICE_ID"
+   ServiceManager = "service_manager"
+   ServiceAlreadyRunning = "service already running"
+   ServiceNotRunning = "service not running"
+   RunningCaps = "RUNNING"
+   StoppedCaps = "STOPPED"
+   SchedulerNotRunningCaps = "SCHEDULLER_NOT_RUNNING"
+   MetaScheduler = "*scheduler"
 )
 
@@ -623,3 +623,10 @@ func (slc Int64Slice) Swap(i, j int) {
|
||||
func (slc Int64Slice) Less(i, j int) bool {
|
||||
return slc[i] < slc[j]
|
||||
}
|
||||
|
||||
// CapitalizeErrorMessage returns the capitalized version of an error, useful in APIs
|
||||
func CapitalizedMessage(errMessage string) (capStr string) {
|
||||
capStr = strings.ToUpper(errMessage)
|
||||
capStr = strings.Replace(capStr, " ", "_", -1)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -732,3 +732,9 @@ func TestLess(t *testing.T) {
|
||||
t.Error("Expected:", expected, ", received:", t1.Less(1, 2))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCapitalizedMessage(t *testing.T) {
|
||||
if capMsg := CapitalizedMessage(ServiceAlreadyRunning); capMsg != "SERVICE_ALREADY_RUNNING" {
|
||||
t.Errorf("Received: <%s>", capMsg)
|
||||
}
|
||||
}
|
||||
|
||||
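CapitalizedMessage is the glue between the lower-case error constants added above and the upper-case identifiers used in API replies; the ServiceManager feeds its output to utils.NewCGRError. A small runnable sketch, using only the constant and function introduced in this commit:

```go
package main

import (
	"fmt"

	"github.com/cgrates/cgrates/utils"
)

func main() {
	// "service not running" -> "SERVICE_NOT_RUNNING"
	fmt.Println(utils.CapitalizedMessage(utils.ServiceNotRunning))
}
```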