Adding Loader service in cgr-engine

This commit is contained in:
DanB
2018-03-28 16:30:12 +02:00
parent d687c98b3a
commit 4900a70eaf
4 changed files with 94 additions and 12 deletions

39
apier/v1/loaders.go Normal file
View File

@@ -0,0 +1,39 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/utils"
)
func NewLoaderSv1(ldrS *loaders.LoaderService) *LoaderSv1 {
return &LoaderSv1{ldrS: ldrS}
}
// Exports RPC from LoaderService
type LoaderSv1 struct {
ldrS *loaders.LoaderService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (ldrSv1 *LoaderSv1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(ldrSv1, serviceMethod, args, reply)
}

View File

@@ -34,6 +34,7 @@ import (
"github.com/cgrates/cgrates/cdrc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/sessions"
@@ -698,6 +699,17 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
}
// loaderService will start and register APIs for LoaderService if enabled
func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool) {
ldrS := loaders.NewLoaderService(dm, cfg.LoaderSCfg(), cfg.DefaultTimezone)
if !ldrS.Enabled() {
return
}
go ldrS.ListenAndServe(exitChan)
server.RpcRegister(v1.NewLoaderSv1(ldrS))
}
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan chan rpcclient.RpcClientConnection) {
@@ -995,6 +1007,8 @@ func main() {
cfg, dm, server, exitChan, filterSChan)
}
go loaderService(cacheS, cfg, dm, server, exitChan)
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan)

View File

@@ -38,7 +38,7 @@ type openedCSVFile struct {
}
func NewLoader(dm *engine.DataManager, cfg *config.LoaderSConfig,
timezone string) (ldr *Loader, err error) {
timezone string) (ldr *Loader) {
ldr = &Loader{
enabled: cfg.Enabled,
dryRun: cfg.DryRun,
@@ -87,6 +87,13 @@ type Loader struct {
timezone string
}
func (ldr *Loader) ListenAndServe(exitChan chan struct{}) (err error) {
utils.Logger.Info(fmt.Sprintf("Starting <%s-%s>", utils.LoaderS, ldr.ldrID))
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return
}
// ProcessFolder will process the content in the folder with locking
func (ldr *Loader) ProcessFolder() (err error) {
if err = ldr.lockFolder(); err != nil {
@@ -95,13 +102,13 @@ func (ldr *Loader) ProcessFolder() (err error) {
defer ldr.unlockFolder()
for ldrType := range ldr.rdrs {
if err = ldr.openFiles(ldrType); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldrType, err.Error()))
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
continue
}
if err = ldr.processContent(ldrType); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s>, err: %s",
utils.LoaderS, ldrType, err.Error()))
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s>, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
}
}
return

View File

@@ -23,22 +23,44 @@ import (
"github.com/cgrates/cgrates/engine"
)
func NewLoaderS(dm *engine.DataManager, cfg []*config.LoaderSConfig,
timezone string) (ldrS *LoaderS, err error) {
func NewLoaderService(dm *engine.DataManager, ldrsCfg []*config.LoaderSConfig,
timezone string) (ldrS *LoaderService) {
ldrS = &LoaderService{ldrs: make(map[string]*Loader)}
for _, ldrCfg := range ldrsCfg {
if !ldrCfg.Enabled {
continue
}
ldrS.ldrs[ldrCfg.Id] = NewLoader(dm, ldrCfg, timezone)
}
return
}
// LoaderS is the Loader service handling independent Loaders
type LoaderS struct {
loaders map[string]*Loader
type LoaderService struct {
ldrs map[string]*Loader
}
// isEnabled returns true if at least one loader is enabled
func (ldrS *LoaderS) isEnabled() bool {
for _, ldr := range ldrS.loaders {
// IsEnabled returns true if at least one loader is enabled
func (ldrS *LoaderService) Enabled() bool {
for _, ldr := range ldrS.ldrs {
if ldr.enabled {
return true
}
}
return false
}
func (ldrS *LoaderService) ListenAndServe(exitChan chan bool) (err error) {
ldrExitChan := make(chan struct{})
for _, ldr := range ldrS.ldrs {
go ldr.ListenAndServe(ldrExitChan)
}
select { // exit on errors coming from server or any loader
case e := <-exitChan:
close(ldrExitChan)
exitChan <- e // put back for the others listening for shutdown request
case <-ldrExitChan:
exitChan <- true
}
return
}