diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a0b536c58..00f606609 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -1098,25 +1098,6 @@ func initLogger(cfg *config.CGRConfig) error { return nil } -func schedCDRsConns(internalCDRSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { - intChan := internalCDRSChan - if cfg.DispatcherSCfg().Enabled { - intChan = internalDispatcherSChan - } - cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.SchedulerCfg().CDRsConns, intChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) - exitChan <- true - return - } - engine.SetSchedCdrsConns(cdrsConn) -} - func initConfigSv1(internalConfigChan chan rpcclient.RpcClientConnection, server *utils.Server) { cfgSv1 := v1.NewConfigSv1(cfg) @@ -1419,11 +1400,6 @@ func main() { internalDispatcherSChan, cdrDb, dm, server, filterSChan, exitChan) } - // Create connection to CDR Server and share it in engine(used for *cdrlog action) - if len(cfg.SchedulerCfg().CDRsConns) != 0 { - go schedCDRsConns(internalCdrSChan, internalDispatcherSChan, exitChan) - } - // Start CDRC components if necessary go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) diff --git a/config/config_it_test.go b/config/config_it_test.go index 938bdcaaf..4eb2dd117 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -248,6 +248,34 @@ func TestCGRConfigReloadSupplierS(t *testing.T) { } } +func TestCGRConfigReloadSchedulerS(t *testing.T) { + cfg, err := NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + var reply string + if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"), + Section: SCHEDULER_JSN, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + expAttr := &SchedulerCfg{ + Enabled: true, + CDRsConns: []*RemoteHost{ + &RemoteHost{ + Address: "127.0.0.1:2012", + Transport: utils.MetaJSONrpc, + }, + }, + } + if !reflect.DeepEqual(expAttr, cfg.SchedulerCfg()) { + t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SchedulerCfg())) + } +} + func TestCgrCfgV1ReloadConfigSection(t *testing.T) { for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} { if err := os.RemoveAll(dir); err != nil { diff --git a/services/schedulers.go b/services/schedulers.go index 26764c64e..2dcda1760 100644 --- a/services/schedulers.go +++ b/services/schedulers.go @@ -23,6 +23,7 @@ import ( "sync" v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -51,21 +52,30 @@ func (schS *SchedulerService) Start(sp servmanager.ServiceProvider, waitCache bo } schS.Lock() - - if !waitCache { // Wait for cache to load data before starting + if waitCache { // Wait for cache to load data before starting <-sp.GetCacheS().GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached } utils.Logger.Info(" Starting CGRateS Scheduler.") schS.schS = scheduler.NewScheduler(sp.GetDM()) - schS.Unlock() go schS.schS.Loop() schS.rpc = v1.NewSchedulerSv1(sp.GetConfig()) if !sp.GetConfig().DispatcherSCfg().Enabled { sp.GetServer().RpcRegister(schS.rpc) } + schS.Unlock() schS.connChan <- schS.rpc + // Create connection to CDR Server and share it in engine(used for *cdrlog action) + cdrsConn, err := sp.GetConnection(utils.CDRs, sp.GetConfig().SchedulerCfg().CDRsConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) + return + } + + // ToDo: this should be send to scheduler + engine.SetSchedCdrsConns(cdrsConn) + return } @@ -76,7 +86,9 @@ func (schS *SchedulerService) GetIntenternalChan() (conn chan rpcclient.RpcClien // Reload handles the change of config func (schS *SchedulerService) Reload(sp servmanager.ServiceProvider) (err error) { + schS.RLock() schS.schS.Reload() + defer schS.RUnlock() return } @@ -93,11 +105,15 @@ func (schS *SchedulerService) Shutdown() (err error) { // GetRPCInterface returns the interface to register for server func (schS *SchedulerService) GetRPCInterface() interface{} { + schS.RLock() + defer schS.RUnlock() return schS.rpc } // IsRunning returns if the service is running func (schS *SchedulerService) IsRunning() bool { + schS.RLock() + defer schS.RUnlock() return schS != nil && schS.schS != nil } diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go new file mode 100644 index 000000000..656aa4aa1 --- /dev/null +++ b/services/schedulers_it_test.go @@ -0,0 +1,91 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "fmt" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +func TestSchedulerSReload(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + engineShutdown := make(chan bool, 1) + chS := engine.NewCacheS(cfg, nil) + close(chS.GetPrecacheChannel(utils.CacheActionPlans)) + server := utils.NewServer() + dm, err := engine.ConfigureDataStorage(cfg.DataDbCfg().DataDbType, + cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort, + cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser, + cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, + cfg.CacheCfg(), cfg.DataDbCfg().DataDbSentinelName) + if err != nil { + t.Fatal(err) + } + srvMngr := servmanager.NewServiceManager(cfg, dm, + chS /*cdrStorage*/, nil, + /*loadStorage*/ nil, filterSChan, + server, nil, engineShutdown) + schS := NewSchedulerService() + srvMngr.AddService(schS) + if err = srvMngr.StartServices(); err != nil { + t.Error(err) + } + if schS.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), + Section: config.SCHEDULER_JSN, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !schS.IsRunning() { + t.Errorf("Expected service to be running") + } + cfg.SchedulerCfg().Enabled = false + fmt.Println("1") + cfg.GetReloadChan(config.SCHEDULER_JSN) <- struct{}{} + fmt.Println("2") + time.Sleep(10 * time.Millisecond) + if schS.IsRunning() { + t.Errorf("Expected service to be down") + } + fmt.Println("3") + engineShutdown <- true + fmt.Println("4") +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index f550199bb..7ebfd1cde 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -207,7 +207,11 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R } // srvMngr.RLock() // defer srvMngr.RUnlock() - internalChan := srvMngr.subsystems[subsystem].GetIntenternalChan() + service, has := srvMngr.subsystems[subsystem] + if !has { // used to bypass the not implemented services + return nil, nil + } + internalChan := service.GetIntenternalChan() if srvMngr.GetConfig().DispatcherSCfg().Enabled { internalChan = srvMngr.dispatcherSChan } @@ -288,7 +292,6 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } }() } - fmt.Println(srvMngr.cfg.SchedulerCfg().Enabled) if srvMngr.cfg.SchedulerCfg().Enabled { go func() { if supS, has := srvMngr.subsystems[utils.SchedulerS]; !has {