From 7f88e7e153ecba5e546d955e9a676ea41645b0e9 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 3 Oct 2019 14:22:05 +0300 Subject: [PATCH] Updated Scheduler Service --- cmd/cgr-engine/cgr-engine.go | 41 +++++++++++++--------- services/schedulers.go | 63 +++++++++++++++++++++++----------- services/schedulers_it_test.go | 12 +++---- servmanager/servmanager.go | 20 +++++------ 4 files changed, 82 insertions(+), 54 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 820317dda..c4661e80e 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -655,9 +655,11 @@ func main() { internalServeManagerChan := make(chan rpcclient.RpcClientConnection, 1) internalConfigChan := make(chan rpcclient.RpcClientConnection, 1) internalCoreSv1Chan := make(chan rpcclient.RpcClientConnection, 1) - internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1) + internalRaterChan := make(chan rpcclient.RpcClientConnection, 1) + // internalRALsv1Chan := make(chan rpcclient.RpcClientConnection, 1) + // internalSMGChan := make(chan rpcclient.RpcClientConnection, 1) // init CacheS cacheS := initCacheS(internalCacheSChan, server, dm, exitChan) @@ -679,16 +681,22 @@ func main() { reS := services.NewResourceService(cfg, dm, cacheS, filterSChan, server, tS.GetIntenternalChan(), internalDispatcherSChan) supS := services.NewSupplierService(cfg, dm, cacheS, filterSChan, server, - attrS.GetIntenternalChan(), stS.GetIntenternalChan(), reS.GetIntenternalChan(), internalDispatcherSChan) - /* schS := services.NewSchedulerService() - cdrS := services.NewCDRServer() - rals := services.NewRalService(srvManager) - apiv1, _ := srvManager.GetService(utils.ApierV1) - apiv2, _ := srvManager.GetService(utils.ApierV2) - resp, _ := srvManager.GetService(utils.ResponderS) - smg := services.NewSessionService() - grd := services.NewGuardianService()*/ - srvManager.AddServices( /*chS, */ attrS, chrS, tS, stS, reS, supS) /*, schS, cdrS, rals, smg, grd, + attrS.GetIntenternalChan(), stS.GetIntenternalChan(), + reS.GetIntenternalChan(), internalDispatcherSChan) + schS := services.NewSchedulerService(cfg, dm, cacheS, server, internalDispatcherSChan) + // cdrS := services.NewCDRServer(cfg, dm, cdrDb, filterSChan, server, + // chrS.GetIntenternalChan(),internalRALsv1Chan, + // attrS.GetIntenternalChan(), tS.GetIntenternalChan(), + // stS.GetIntenternalChan(), internalDispatcherSChan) + // schS.SetCdrsConns(cdrS.GetIntenternalChan()) + /* + rals := services.NewRalService(srvManager) + apiv1, _ := srvManager.GetService(utils.ApierV1) + apiv2, _ := srvManager.GetService(utils.ApierV2) + resp, _ := srvManager.GetService(utils.ResponderS) + smg := services.NewSessionService() + grd := services.NewGuardianService()*/ + srvManager.AddServices( /*chS, */ attrS, chrS, tS, stS, reS, supS, schS) /*, cdrS, rals, smg, grd, services.NewEventReaderService(), services.NewDNSAgent(), services.NewFreeswitchAgent(), @@ -701,7 +709,6 @@ func main() { /* internalSchedSChan := schS.GetIntenternalChan() - internalCdrSChan := cdrS.GetIntenternalChan() internalCacheSChan := chS.GetIntenternalChan() internalApierV1Chan := apiv1.GetIntenternalChan() internalApierV2Chan := apiv2.GetIntenternalChan() @@ -727,14 +734,14 @@ func main() { // engine.IntRPC.AddInternalRPCClient(utils.ApierV2, internalApierV2Chan) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier - // engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCdrSChan) - // engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCdrSChan) + // engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan()) + // engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan()) // engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan) engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan) engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan()) // engine.IntRPC.AddInternalRPCClient(utils.Responder, internalRaterChan) - // engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, internalSchedSChan) // server or from apier + engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan()) // engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSMGChan) // server or from apier engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan()) @@ -748,7 +755,7 @@ func main() { initConfigSv1(internalConfigChan, server) // Start CDRC components if necessary - // go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) + // go startCdrcs(cdrS.GetIntenternalChan(), internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) if cfg.DispatcherSCfg().Enabled { go startDispatcherService(internalDispatcherSChan, @@ -764,7 +771,7 @@ func main() { // Serve rpc connections /* - go startRpc(server, internalRaterChan, internalCdrSChan, + go startRpc(server, internalRaterChan, cdrS.GetIntenternalChan(), reS.GetIntenternalChan(), stS.GetIntenternalChan(), attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(), supS.GetIntenternalChan(), internalSMGChan, internalAnalyzerSChan, diff --git a/services/schedulers.go b/services/schedulers.go index 9bf81a71a..539a29d3a 100644 --- a/services/schedulers.go +++ b/services/schedulers.go @@ -23,53 +23,65 @@ import ( "sync" v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/scheduler" - "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) // NewSchedulerService returns the Scheduler Service -func NewSchedulerService() *SchedulerService { +func NewSchedulerService(cfg *config.CGRConfig, dm *engine.DataManager, + cacheS *engine.CacheS, server *utils.Server, + dispatcherChan chan rpcclient.RpcClientConnection) *SchedulerService { return &SchedulerService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + dm: dm, + cacheS: cacheS, + server: server, + dispatcherChan: dispatcherChan, } } // SchedulerService implements Service interface type SchedulerService struct { sync.RWMutex + cfg *config.CGRConfig + dm *engine.DataManager + cacheS *engine.CacheS + server *utils.Server + cdrSChan chan rpcclient.RpcClientConnection + dispatcherChan chan rpcclient.RpcClientConnection + schS *scheduler.Scheduler rpc *v1.SchedulerSv1 connChan chan rpcclient.RpcClientConnection } // Start should handle the sercive start -func (schS *SchedulerService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { +func (schS *SchedulerService) Start() (err error) { if schS.IsRunning() { return fmt.Errorf("service aleady running") } - if waitCache { // Wait for cache to load data before starting - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached - } + <-schS.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached schS.Lock() defer schS.Unlock() utils.Logger.Info(" Starting CGRateS Scheduler.") - schS.schS = scheduler.NewScheduler(sp.GetDM()) + schS.schS = scheduler.NewScheduler(schS.dm) go schS.schS.Loop() - schS.rpc = v1.NewSchedulerSv1(sp.GetConfig()) - if !sp.GetConfig().DispatcherSCfg().Enabled { - sp.GetServer().RpcRegister(schS.rpc) + schS.rpc = v1.NewSchedulerSv1(schS.cfg) + if !schS.cfg.DispatcherSCfg().Enabled { + schS.server.RpcRegister(schS.rpc) } schS.connChan <- schS.rpc // Create connection to CDR Server and share it in engine(used for *cdrlog action) - cdrsConn, err := sp.NewConnection(utils.CDRServer, sp.GetConfig().SchedulerCfg().CDRsConns) + cdrsConn, err := NewConnection(schS.cfg, schS.cdrSChan, schS.dispatcherChan, schS.cfg.SchedulerCfg().CDRsConns) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) return @@ -87,7 +99,14 @@ func (schS *SchedulerService) GetIntenternalChan() (conn chan rpcclient.RpcClien } // Reload handles the change of config -func (schS *SchedulerService) Reload(sp servmanager.ServiceProvider) (err error) { +func (schS *SchedulerService) Reload() (err error) { + cdrsConn, err := NewConnection(schS.cfg, schS.cdrSChan, schS.dispatcherChan, schS.cfg.SchedulerCfg().CDRsConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) + return + } + // ToDo: this should be send to scheduler + engine.SetSchedCdrsConns(cdrsConn) schS.Lock() schS.schS.Reload() schS.Unlock() @@ -105,13 +124,6 @@ func (schS *SchedulerService) Shutdown() (err error) { return } -// GetRPCInterface returns the interface to register for server -func (schS *SchedulerService) GetRPCInterface() interface{} { - schS.RLock() - defer schS.RUnlock() - return schS.rpc -} - // IsRunning returns if the service is running func (schS *SchedulerService) IsRunning() bool { schS.RLock() @@ -130,3 +142,14 @@ func (schS *SchedulerService) GetScheduler() *scheduler.Scheduler { defer schS.RUnlock() return schS.schS } + +// ShouldRun returns if the service should be running +func (schS *SchedulerService) ShouldRun() bool { + return schS.cfg.SchedulerCfg().Enabled +} + +// SetCdrsConns sets the value for cdrSChan +// this needs to be called before StartServices +func (schS *SchedulerService) SetCdrsConns(cdrSChan chan rpcclient.RpcClientConnection) { + schS.cdrSChan = cdrSChan +} diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index cbfacabe1..a6bb972a2 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -38,8 +38,6 @@ func TestSchedulerSReload(t *testing.T) { } utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) - filterSChan := make(chan *engine.FilterS, 1) - filterSChan <- nil engineShutdown := make(chan bool, 1) chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheActionPlans)) @@ -52,15 +50,15 @@ func TestSchedulerSReload(t *testing.T) { if err != nil { t.Fatal(err) } - srvMngr := servmanager.NewServiceManager(cfg, dm, - /*cdrStorage*/ nil, - /*loadStorage*/ nil, filterSChan, + srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, + /*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil, server, nil, engineShutdown) srvMngr.SetCacheS(chS) - schS := NewSchedulerService() + schS := NewSchedulerService(cfg, dm, chS, server, nil) internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1) internalCdrSChan <- nil - srvMngr.AddService(schS, &CDRServer{connChan: internalCdrSChan}) + schS.SetCdrsConns(internalCdrSChan) + srvMngr.AddServices(schS) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 5d9154b97..bd75bbe22 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -258,10 +258,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.GetConfig().SupplierSCfg().Enabled { go srvMngr.startService(utils.SupplierS) + } + if srvMngr.GetConfig().SchedulerCfg().Enabled { + go srvMngr.startService(utils.SchedulerS) } /* - if srvMngr.GetConfig().SchedulerCfg().Enabled { - go srvMngr.startService(utils.SchedulerS) - } if srvMngr.GetConfig().CdrsCfg().Enabled { go srvMngr.startService(utils.CDRServer) } @@ -351,15 +351,15 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.SupplierSJson): if err = srvMngr.reloadService(utils.SupplierS); err != nil { return + } + case <-srvMngr.GetConfig().GetReloadChan(config.SCHEDULER_JSN): + if err = srvMngr.reloadService(utils.SchedulerS); err != nil { + return } /* - case <-srvMngr.GetConfig().GetReloadChan(config.SCHEDULER_JSN): - if err = srvMngr.reloadService(utils.SchedulerS); err != nil { - return - } case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN): - if err = srvMngr.reloadService(utils.CDRServer); err != nil { - return - } + if err = srvMngr.reloadService(utils.CDRServer); err != nil { + return + } case <-srvMngr.GetConfig().GetReloadChan(config.RALS_JSN): if err = srvMngr.reloadService(utils.RALService); err != nil { return