Add -preload=LoderID1,LoaderID2,... flag to cgr-engine

This commit is contained in:
TeoV
2020-07-14 11:30:41 +03:00
committed by Dan Christian Bogos
parent 1a8195c5e7
commit b79524e9ab
2 changed files with 36 additions and 12 deletions

View File

@@ -32,6 +32,8 @@ import (
"syscall"
"time"
"github.com/cgrates/cgrates/loaders"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -118,6 +120,7 @@ func startRpc(server *utils.Server, internalRaterChan,
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
internalLoaderSChan, internalRALsv1Chan, internalCacheSChan,
internalEEsChan, internalRateSChan chan rpcclient.ClientConnector,
internalPreloadChan chan struct{},
exitChan chan bool) {
if !cfg.DispatcherSCfg().Enabled {
select { // Any of the rpc methods will unlock listening to rpc requests
@@ -151,6 +154,8 @@ func startRpc(server *utils.Server, internalRaterChan,
internalEEsChan <- eeS
case rateS := <-internalRateSChan:
internalRateSChan <- rateS
case preload := <-internalPreloadChan:
internalPreloadChan <- preload
}
} else {
select {
@@ -321,6 +326,31 @@ func singnalHandler(exitChan chan bool) {
}
}
func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclient.ClientConnector,
internalPreloadChan chan struct{}, exitChan chan bool) {
if !cfg.LoaderCfg().Enabled() {
utils.Logger.Err(fmt.Sprintf("<%s> not enabled but required by preload mechanism", utils.LoaderS))
exitChan <- true
return
}
ldr := <-internalLoaderSChan
internalLoaderSChan <- ldr
var reply string
for _, loaderID := range strings.Split(*preload, utils.FIELDS_SEP) {
if err := loader.GetLoaderS().V1Load(&loaders.ArgsProcessFolder{
LoaderID: loaderID,
}, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err.Error()))
exitChan <- true
return
}
}
internalPreloadChan <- struct{}{}
}
func main() {
if err := cgrEngineFlags.Parse(os.Args[1:]); err != nil {
return
@@ -421,6 +451,7 @@ func main() {
internalLoaderSChan := make(chan rpcclient.ClientConnector, 1)
internalEEsChan := make(chan rpcclient.ClientConnector, 1)
internalRateSChan := make(chan rpcclient.ClientConnector, 1)
internalPreloadChan := make(chan struct{}, 1)
// initialize the connManager before creating the DMService
// because we need to pass the connection to it
@@ -514,6 +545,7 @@ func main() {
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, exitChan,
internalLoaderSChan, connManager)
anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan)
srvManager.AddServices(attrS, chrS, tS, stS, reS, routeS, schS, rals,
@@ -567,26 +599,17 @@ func main() {
initConfigSv1(internalConfigChan, server)
// verify if we need to do preload
if *preload != utils.EmptyString {
if !cfg.LoaderCfg().Enabled() {
fmt.Println("*preload flag required LoaderS to be enabled")
exitChan <- true
return
}
if err = ldrs.GetLoaderS().Preload(strings.Split(*preload, utils.FIELDS_SEP)); err != nil {
fmt.Println(err)
exitChan <- true
return
}
runPreload(ldrs, internalLoaderSChan, internalPreloadChan, exitChan)
}
// Serve rpc connections
go startRpc(server, internalResponderChan, internalCDRServerChan,
internalResourceSChan, internalStatSChan,
internalAttributeSChan, internalChargerSChan, internalThresholdSChan,
internalRouteSChan, internalSessionSChan, internalAnalyzerSChan,
internalDispatcherSChan, internalLoaderSChan, internalRALsChan,
internalCacheSChan, internalEEsChan, internalRateSChan, exitChan)
internalCacheSChan, internalEEsChan, internalRateSChan, internalPreloadChan, exitChan)
<-exitChan
if *cpuProfDir != "" { // wait to end cpuProfiling

View File

@@ -77,6 +77,7 @@ func (ldrs *LoaderService) Start() (err error) {
ldrs.ldrs = loaders.NewLoaderService(datadb, ldrs.cfg.LoaderCfg(),
ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, ldrs.connMgr)
if !ldrs.ldrs.Enabled() {
return
}