From 4900a70eafeab0e6fb6c866349c94980fb3a39e2 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 28 Mar 2018 16:30:12 +0200 Subject: [PATCH] Adding Loader service in cgr-engine --- apier/v1/loaders.go | 39 ++++++++++++++++++++++++++++++++++++ cmd/cgr-engine/cgr-engine.go | 14 +++++++++++++ loaders/loader.go | 17 +++++++++++----- loaders/loaders.go | 36 ++++++++++++++++++++++++++------- 4 files changed, 94 insertions(+), 12 deletions(-) create mode 100644 apier/v1/loaders.go diff --git a/apier/v1/loaders.go b/apier/v1/loaders.go new file mode 100644 index 000000000..d685f7890 --- /dev/null +++ b/apier/v1/loaders.go @@ -0,0 +1,39 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "github.com/cgrates/cgrates/loaders" + "github.com/cgrates/cgrates/utils" +) + +func NewLoaderSv1(ldrS *loaders.LoaderService) *LoaderSv1 { + return &LoaderSv1{ldrS: ldrS} +} + +// Exports RPC from LoaderService +type LoaderSv1 struct { + ldrS *loaders.LoaderService +} + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (ldrSv1 *LoaderSv1) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(ldrSv1, serviceMethod, args, reply) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 44ba4a72d..3adfdaecc 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -34,6 +34,7 @@ import ( "github.com/cgrates/cgrates/cdrc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/sessions" @@ -698,6 +699,17 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, } +// loaderService will start and register APIs for LoaderService if enabled +func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig, + dm *engine.DataManager, server *utils.Server, exitChan chan bool) { + ldrS := loaders.NewLoaderService(dm, cfg.LoaderSCfg(), cfg.DefaultTimezone) + if !ldrS.Enabled() { + return + } + go ldrS.ListenAndServe(exitChan) + server.RpcRegister(v1.NewLoaderSv1(ldrS)) +} + func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan chan rpcclient.RpcClientConnection) { @@ -995,6 +1007,8 @@ func main() { cfg, dm, server, exitChan, filterSChan) } + go loaderService(cacheS, cfg, dm, server, exitChan) + // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan) diff --git a/loaders/loader.go b/loaders/loader.go index 22f4395e4..a8c209f19 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -38,7 +38,7 @@ type openedCSVFile struct { } func NewLoader(dm *engine.DataManager, cfg *config.LoaderSConfig, - timezone string) (ldr *Loader, err error) { + timezone string) (ldr *Loader) { ldr = &Loader{ enabled: cfg.Enabled, dryRun: cfg.DryRun, @@ -87,6 +87,13 @@ type Loader struct { timezone string } +func (ldr *Loader) ListenAndServe(exitChan chan struct{}) (err error) { + utils.Logger.Info(fmt.Sprintf("Starting <%s-%s>", utils.LoaderS, ldr.ldrID)) + e := <-exitChan + exitChan <- e // put back for the others listening for shutdown request + return +} + // ProcessFolder will process the content in the folder with locking func (ldr *Loader) ProcessFolder() (err error) { if err = ldr.lockFolder(); err != nil { @@ -95,13 +102,13 @@ func (ldr *Loader) ProcessFolder() (err error) { defer ldr.unlockFolder() for ldrType := range ldr.rdrs { if err = ldr.openFiles(ldrType); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s> cannot open files, err: %s", - utils.LoaderS, ldrType, err.Error())) + utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s", + utils.LoaderS, ldr.ldrID, ldrType, err.Error())) continue } if err = ldr.processContent(ldrType); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s>, err: %s", - utils.LoaderS, ldrType, err.Error())) + utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s>, err: %s", + utils.LoaderS, ldr.ldrID, ldrType, err.Error())) } } return diff --git a/loaders/loaders.go b/loaders/loaders.go index eb10059fc..8e3cd2400 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -23,22 +23,44 @@ import ( "github.com/cgrates/cgrates/engine" ) -func NewLoaderS(dm *engine.DataManager, cfg []*config.LoaderSConfig, - timezone string) (ldrS *LoaderS, err error) { +func NewLoaderService(dm *engine.DataManager, ldrsCfg []*config.LoaderSConfig, + timezone string) (ldrS *LoaderService) { + ldrS = &LoaderService{ldrs: make(map[string]*Loader)} + for _, ldrCfg := range ldrsCfg { + if !ldrCfg.Enabled { + continue + } + ldrS.ldrs[ldrCfg.Id] = NewLoader(dm, ldrCfg, timezone) + } return } // LoaderS is the Loader service handling independent Loaders -type LoaderS struct { - loaders map[string]*Loader +type LoaderService struct { + ldrs map[string]*Loader } -// isEnabled returns true if at least one loader is enabled -func (ldrS *LoaderS) isEnabled() bool { - for _, ldr := range ldrS.loaders { +// IsEnabled returns true if at least one loader is enabled +func (ldrS *LoaderService) Enabled() bool { + for _, ldr := range ldrS.ldrs { if ldr.enabled { return true } } return false } + +func (ldrS *LoaderService) ListenAndServe(exitChan chan bool) (err error) { + ldrExitChan := make(chan struct{}) + for _, ldr := range ldrS.ldrs { + go ldr.ListenAndServe(ldrExitChan) + } + select { // exit on errors coming from server or any loader + case e := <-exitChan: + close(ldrExitChan) + exitChan <- e // put back for the others listening for shutdown request + case <-ldrExitChan: + exitChan <- true + } + return +}