Added tests for scheduler as service

This commit is contained in:
Trial97
2019-09-19 14:34:22 +03:00
committed by Dan Christian Bogos
parent 55c3e50911
commit d18f9da6d6
5 changed files with 143 additions and 29 deletions

View File

@@ -1098,25 +1098,6 @@ func initLogger(cfg *config.CGRConfig) error {
return nil
}
func schedCDRsConns(internalCDRSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
intChan := internalCDRSChan
if cfg.DispatcherSCfg().Enabled {
intChan = internalDispatcherSChan
}
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SchedulerCfg().CDRsConns, intChan, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
exitChan <- true
return
}
engine.SetSchedCdrsConns(cdrsConn)
}
func initConfigSv1(internalConfigChan chan rpcclient.RpcClientConnection,
server *utils.Server) {
cfgSv1 := v1.NewConfigSv1(cfg)
@@ -1419,11 +1400,6 @@ func main() {
internalDispatcherSChan, cdrDb, dm, server, filterSChan, exitChan)
}
// Create connection to CDR Server and share it in engine(used for *cdrlog action)
if len(cfg.SchedulerCfg().CDRsConns) != 0 {
go schedCDRsConns(internalCdrSChan, internalDispatcherSChan, exitChan)
}
// Start CDRC components if necessary
go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan)

View File

@@ -248,6 +248,34 @@ func TestCGRConfigReloadSupplierS(t *testing.T) {
}
}
func TestCGRConfigReloadSchedulerS(t *testing.T) {
cfg, err := NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
var reply string
if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"),
Section: SCHEDULER_JSN,
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expected OK received: %s", reply)
}
expAttr := &SchedulerCfg{
Enabled: true,
CDRsConns: []*RemoteHost{
&RemoteHost{
Address: "127.0.0.1:2012",
Transport: utils.MetaJSONrpc,
},
},
}
if !reflect.DeepEqual(expAttr, cfg.SchedulerCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SchedulerCfg()))
}
}
func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} {
if err := os.RemoveAll(dir); err != nil {

View File

@@ -23,6 +23,7 @@ import (
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -51,21 +52,30 @@ func (schS *SchedulerService) Start(sp servmanager.ServiceProvider, waitCache bo
}
schS.Lock()
if !waitCache { // Wait for cache to load data before starting
if waitCache { // Wait for cache to load data before starting
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
}
utils.Logger.Info("<ServiceManager> Starting CGRateS Scheduler.")
schS.schS = scheduler.NewScheduler(sp.GetDM())
schS.Unlock()
go schS.schS.Loop()
schS.rpc = v1.NewSchedulerSv1(sp.GetConfig())
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(schS.rpc)
}
schS.Unlock()
schS.connChan <- schS.rpc
// Create connection to CDR Server and share it in engine(used for *cdrlog action)
cdrsConn, err := sp.GetConnection(utils.CDRs, sp.GetConfig().SchedulerCfg().CDRsConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
return
}
// ToDo: this should be send to scheduler
engine.SetSchedCdrsConns(cdrsConn)
return
}
@@ -76,7 +86,9 @@ func (schS *SchedulerService) GetIntenternalChan() (conn chan rpcclient.RpcClien
// Reload handles the change of config
func (schS *SchedulerService) Reload(sp servmanager.ServiceProvider) (err error) {
schS.RLock()
schS.schS.Reload()
defer schS.RUnlock()
return
}
@@ -93,11 +105,15 @@ func (schS *SchedulerService) Shutdown() (err error) {
// GetRPCInterface returns the interface to register for server
func (schS *SchedulerService) GetRPCInterface() interface{} {
schS.RLock()
defer schS.RUnlock()
return schS.rpc
}
// IsRunning returns if the service is running
func (schS *SchedulerService) IsRunning() bool {
schS.RLock()
defer schS.RUnlock()
return schS != nil && schS.schS != nil
}

View File

@@ -0,0 +1,91 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
func TestSchedulerSReload(t *testing.T) {
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
engineShutdown := make(chan bool, 1)
chS := engine.NewCacheS(cfg, nil)
close(chS.GetPrecacheChannel(utils.CacheActionPlans))
server := utils.NewServer()
dm, err := engine.ConfigureDataStorage(cfg.DataDbCfg().DataDbType,
cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort,
cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser,
cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding,
cfg.CacheCfg(), cfg.DataDbCfg().DataDbSentinelName)
if err != nil {
t.Fatal(err)
}
srvMngr := servmanager.NewServiceManager(cfg, dm,
chS /*cdrStorage*/, nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
schS := NewSchedulerService()
srvMngr.AddService(schS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}
if schS.IsRunning() {
t.Errorf("Expected service to be down")
}
var reply string
if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"),
Section: config.SCHEDULER_JSN,
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expecting OK ,received %s", reply)
}
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
if !schS.IsRunning() {
t.Errorf("Expected service to be running")
}
cfg.SchedulerCfg().Enabled = false
fmt.Println("1")
cfg.GetReloadChan(config.SCHEDULER_JSN) <- struct{}{}
fmt.Println("2")
time.Sleep(10 * time.Millisecond)
if schS.IsRunning() {
t.Errorf("Expected service to be down")
}
fmt.Println("3")
engineShutdown <- true
fmt.Println("4")
}

View File

@@ -207,7 +207,11 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R
}
// srvMngr.RLock()
// defer srvMngr.RUnlock()
internalChan := srvMngr.subsystems[subsystem].GetIntenternalChan()
service, has := srvMngr.subsystems[subsystem]
if !has { // used to bypass the not implemented services
return nil, nil
}
internalChan := service.GetIntenternalChan()
if srvMngr.GetConfig().DispatcherSCfg().Enabled {
internalChan = srvMngr.dispatcherSChan
}
@@ -288,7 +292,6 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
}()
}
fmt.Println(srvMngr.cfg.SchedulerCfg().Enabled)
if srvMngr.cfg.SchedulerCfg().Enabled {
go func() {
if supS, has := srvMngr.subsystems[utils.SchedulerS]; !has {