From b79524e9abac27eafef7cbc5cb1103730c13747e Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 14 Jul 2020 11:30:41 +0300 Subject: [PATCH] Add -preload=LoderID1,LoaderID2,... flag to cgr-engine --- cmd/cgr-engine/cgr-engine.go | 47 +++++++++++++++++++++++++++--------- services/loaders.go | 1 + 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c94fe9fab..bf156edae 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -32,6 +32,8 @@ import ( "syscall" "time" + "github.com/cgrates/cgrates/loaders" + v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -118,6 +120,7 @@ func startRpc(server *utils.Server, internalRaterChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, internalEEsChan, internalRateSChan chan rpcclient.ClientConnector, + internalPreloadChan chan struct{}, exitChan chan bool) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests @@ -151,6 +154,8 @@ func startRpc(server *utils.Server, internalRaterChan, internalEEsChan <- eeS case rateS := <-internalRateSChan: internalRateSChan <- rateS + case preload := <-internalPreloadChan: + internalPreloadChan <- preload } } else { select { @@ -321,6 +326,31 @@ func singnalHandler(exitChan chan bool) { } } +func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclient.ClientConnector, + internalPreloadChan chan struct{}, exitChan chan bool) { + + if !cfg.LoaderCfg().Enabled() { + utils.Logger.Err(fmt.Sprintf("<%s> not enabled but required by preload mechanism", utils.LoaderS)) + exitChan <- true + return + } + + ldr := <-internalLoaderSChan + internalLoaderSChan <- ldr + + var reply string + for _, loaderID := range strings.Split(*preload, utils.FIELDS_SEP) { + if err := loader.GetLoaderS().V1Load(&loaders.ArgsProcessFolder{ + LoaderID: loaderID, + }, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err.Error())) + exitChan <- true + return + } + } + internalPreloadChan <- struct{}{} +} + func main() { if err := cgrEngineFlags.Parse(os.Args[1:]); err != nil { return @@ -421,6 +451,7 @@ func main() { internalLoaderSChan := make(chan rpcclient.ClientConnector, 1) internalEEsChan := make(chan rpcclient.ClientConnector, 1) internalRateSChan := make(chan rpcclient.ClientConnector, 1) + internalPreloadChan := make(chan struct{}, 1) // initialize the connManager before creating the DMService // because we need to pass the connection to it @@ -514,6 +545,7 @@ func main() { ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, exitChan, internalLoaderSChan, connManager) + anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan) srvManager.AddServices(attrS, chrS, tS, stS, reS, routeS, schS, rals, @@ -567,26 +599,17 @@ func main() { initConfigSv1(internalConfigChan, server) - // verify if we need to do preload if *preload != utils.EmptyString { - if !cfg.LoaderCfg().Enabled() { - fmt.Println("*preload flag required LoaderS to be enabled") - exitChan <- true - return - } - if err = ldrs.GetLoaderS().Preload(strings.Split(*preload, utils.FIELDS_SEP)); err != nil { - fmt.Println(err) - exitChan <- true - return - } + runPreload(ldrs, internalLoaderSChan, internalPreloadChan, exitChan) } + // Serve rpc connections go startRpc(server, internalResponderChan, internalCDRServerChan, internalResourceSChan, internalStatSChan, internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsChan, - internalCacheSChan, internalEEsChan, internalRateSChan, exitChan) + internalCacheSChan, internalEEsChan, internalRateSChan, internalPreloadChan, exitChan) <-exitChan if *cpuProfDir != "" { // wait to end cpuProfiling diff --git a/services/loaders.go b/services/loaders.go index 40a431cf0..7a70d5424 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -77,6 +77,7 @@ func (ldrs *LoaderService) Start() (err error) { ldrs.ldrs = loaders.NewLoaderService(datadb, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, ldrs.connMgr) + if !ldrs.ldrs.Enabled() { return }