From 0786d31c619522a8832d66be9f5b7d4f59b9ecc7 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Sun, 6 Oct 2019 13:40:55 +0300 Subject: [PATCH] Added LoaderS as service in ServiceManager --- cmd/cgr-engine/cgr-engine.go | 31 ++------ config/config.go | 6 +- loaders/loaders.go | 22 +++++- services/loaders.go | 139 +++++++++++++++++++++++++++++++++++ servmanager/servmanager.go | 70 ++++-------------- 5 files changed, 181 insertions(+), 87 deletions(-) create mode 100644 services/loaders.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d519e8071..0d14a1e96 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -37,7 +37,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -145,24 +144,6 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, filterSChan <- engine.NewFilterS(cfg, internalStatSChan, internalResourceSChan, internalRalSChan, dm) } -// loaderService will start and register APIs for LoaderService if enabled -func startLoaderS(internalLoaderSChan, cacheSChan chan rpcclient.RpcClientConnection, - cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, - filterSChan chan *engine.FilterS, exitChan chan bool) { - filterS := <-filterSChan - filterSChan <- filterS - - ldrS := loaders.NewLoaderService(dm, cfg.LoaderCfg(), - cfg.GeneralCfg().DefaultTimezone, exitChan, filterS, cacheSChan) - if !ldrS.Enabled() { - return - } - go ldrS.ListenAndServe(exitChan) - ldrSv1 := v1.NewLoaderSv1(ldrS) - server.RpcRegister(ldrSv1) - internalLoaderSChan <- ldrSv1 -} - // startDispatcherService fires up the DispatcherS func startDispatcherService(internalDispatcherSChan, internalAttributeSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, @@ -658,7 +639,6 @@ func main() { filterSChan := make(chan *engine.FilterS, 1) internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1) - internalLoaderSChan := make(chan rpcclient.RpcClientConnection, 1) internalServeManagerChan := make(chan rpcclient.RpcClientConnection, 1) internalConfigChan := make(chan rpcclient.RpcClientConnection, 1) @@ -678,8 +658,7 @@ func main() { initCoreSv1(internalCoreSv1Chan, server) // Start ServiceManager - srvManager := servmanager.NewServiceManager(cfg, dm, cdrDb, - loadDb, filterSChan, server, internalDispatcherSChan, exitChan) + srvManager := servmanager.NewServiceManager(cfg, exitChan) attrS := services.NewAttributeService(cfg, dm, cacheS, filterSChan, server) chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server, @@ -707,6 +686,7 @@ func main() { tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(), attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), internalDispatcherSChan, exitChan) + ldrs := services.NewLoaderService(cfg, dm, filterSChan, server, internalCacheSChan, internalDispatcherSChan, exitChan) srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals, rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg, services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), @@ -717,6 +697,7 @@ func main() { services.NewRadiusAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload services.NewDiameterAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload services.NewHTTPAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, server), // no reload + ldrs, ) srvManager.StartServices() @@ -740,7 +721,7 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan) - engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan) + engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, ldrs.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan()) @@ -769,14 +750,12 @@ func main() { go startAnalyzerService(internalAnalyzerSChan, server, exitChan) } - go startLoaderS(internalLoaderSChan, internalCacheSChan, cfg, dm, server, filterSChan, exitChan) - // Serve rpc connections go startRpc(server, rals.GetResponder().GetIntenternalChan(), cdrS.GetIntenternalChan(), reS.GetIntenternalChan(), stS.GetIntenternalChan(), attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(), supS.GetIntenternalChan(), smg.GetIntenternalChan(), internalAnalyzerSChan, - internalDispatcherSChan, internalLoaderSChan, rals.GetIntenternalChan(), internalCacheSChan, exitChan) + internalDispatcherSChan, ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), internalCacheSChan, exitChan) <-exitChan if *cpuProfDir != "" { // wait to end cpuProfiling diff --git a/config/config.go b/config/config.go index 7d49cff17..3d4a2dcae 100755 --- a/config/config.go +++ b/config/config.go @@ -1654,11 +1654,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case LoaderJson: - if !fall { - break - } - fallthrough - case CgrLoaderCfgJson: + cfg.rldChans[LoaderJson] <- struct{}{} if !fall { break } diff --git a/loaders/loaders.go b/loaders/loaders.go index 4c9869977..d98cec9ac 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -21,6 +21,7 @@ package loaders import ( "errors" "fmt" + "sync" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -43,6 +44,7 @@ func NewLoaderService(dm *engine.DataManager, ldrsCfg []*config.LoaderSCfg, // LoaderS is the Loader service handling independent Loaders type LoaderService struct { + sync.RWMutex ldrs map[string]*Loader } @@ -57,7 +59,6 @@ func (ldrS *LoaderService) Enabled() bool { } func (ldrS *LoaderService) ListenAndServe(exitChan chan bool) (err error) { - // seems useless ldrExitChan := make(chan struct{}) for _, ldr := range ldrS.ldrs { go ldr.ListenAndServe(ldrExitChan) @@ -80,6 +81,8 @@ type ArgsProcessFolder struct { func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder, rply *string) (err error) { + ldrS.RLock() + defer ldrS.RUnlock() if args.LoaderID == "" { args.LoaderID = utils.META_DEFAULT } @@ -111,6 +114,8 @@ func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder, func (ldrS *LoaderService) V1Remove(args *ArgsProcessFolder, rply *string) (err error) { + ldrS.RLock() + defer ldrS.RUnlock() if args.LoaderID == "" { args.LoaderID = utils.META_DEFAULT } @@ -139,3 +144,18 @@ func (ldrS *LoaderService) V1Remove(args *ArgsProcessFolder, *rply = utils.OK return } + +// Reload recreates the loaders map thread safe +func (ldrS *LoaderService) Reload(dm *engine.DataManager, ldrsCfg []*config.LoaderSCfg, + timezone string, exitChan chan bool, filterS *engine.FilterS, + internalCacheSChan chan rpcclient.RpcClientConnection) { + ldrS.Lock() + ldrS.ldrs = make(map[string]*Loader) + for _, ldrCfg := range ldrsCfg { + if !ldrCfg.Enabled { + continue + } + ldrS.ldrs[ldrCfg.Id] = NewLoader(dm, ldrCfg, timezone, exitChan, filterS, internalCacheSChan) + } + ldrS.Unlock() +} diff --git a/services/loaders.go b/services/loaders.go new file mode 100644 index 000000000..5f95d361e --- /dev/null +++ b/services/loaders.go @@ -0,0 +1,139 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/loaders" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewLoaderService returns the Loader Service +func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager, + filterSChan chan *engine.FilterS, server *utils.Server, + cacheSChan, dispatcherChan chan rpcclient.RpcClientConnection, + exitChan chan bool) servmanager.Service { + return &LoaderService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + dm: dm, + cacheSChan: cacheSChan, + dispatcherChan: dispatcherChan, + filterSChan: filterSChan, + server: server, + exitChan: exitChan, + } +} + +// LoaderService implements Service interface +type LoaderService struct { + sync.RWMutex + cfg *config.CGRConfig + dm *engine.DataManager + filterSChan chan *engine.FilterS + server *utils.Server + cacheSChan chan rpcclient.RpcClientConnection + dispatcherChan chan rpcclient.RpcClientConnection + exitChan chan bool + + ldrs *loaders.LoaderService + rpc *v1.LoaderSv1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (ldrs *LoaderService) Start() (err error) { + if ldrs.IsRunning() { + return fmt.Errorf("service aleady running") + } + + filterS := <-ldrs.filterSChan + ldrs.filterSChan <- filterS + internalChan := ldrs.cacheSChan + if ldrs.cfg.DispatcherSCfg().Enabled { + internalChan = ldrs.dispatcherChan + } + + ldrs.Lock() + defer ldrs.Unlock() + + ldrs.ldrs = loaders.NewLoaderService(ldrs.dm, ldrs.cfg.LoaderCfg(), + ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, internalChan) + if !ldrs.ldrs.Enabled() { + return + } + ldrs.rpc = v1.NewLoaderSv1(ldrs.ldrs) + ldrs.server.RpcRegister(ldrs.rpc) + ldrs.connChan <- ldrs.rpc + return +} + +// GetIntenternalChan returns the internal connection chanel +func (ldrs *LoaderService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return ldrs.connChan +} + +// Reload handles the change of config +func (ldrs *LoaderService) Reload() (err error) { + filterS := <-ldrs.filterSChan + ldrs.filterSChan <- filterS + ldrs.RLock() + internalChan := ldrs.cacheSChan + if ldrs.cfg.DispatcherSCfg().Enabled { + internalChan = ldrs.dispatcherChan + } + ldrs.ldrs.Reload(ldrs.dm, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, + ldrs.exitChan, filterS, internalChan) + ldrs.RUnlock() + return +} + +// Shutdown stops the service +func (ldrs *LoaderService) Shutdown() (err error) { + ldrs.Lock() + ldrs.ldrs = nil + ldrs.rpc = nil + <-ldrs.connChan + ldrs.Unlock() + return +} + +// IsRunning returns if the service is running +func (ldrs *LoaderService) IsRunning() bool { + ldrs.RLock() + defer ldrs.RUnlock() + return ldrs != nil && ldrs.ldrs != nil && ldrs.ldrs.Enabled() +} + +// ServiceName returns the service name +func (ldrs *LoaderService) ServiceName() string { + return utils.LoaderS +} + +// ShouldRun returns if the service should be running +func (ldrs *LoaderService) ShouldRun() bool { + return true +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 8fc7ea765..fa0b9b974 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -215,60 +215,13 @@ func (srvMngr *ServiceManager) GetExitChan() chan bool { // StartServices starts all enabled services func (srvMngr *ServiceManager) StartServices() (err error) { go srvMngr.handleReload() - - if srvMngr.GetConfig().AttributeSCfg().Enabled { - go srvMngr.startService(utils.AttributeS) - } - if srvMngr.GetConfig().ChargerSCfg().Enabled { - go srvMngr.startService(utils.ChargerS) - } - if srvMngr.GetConfig().ThresholdSCfg().Enabled { - go srvMngr.startService(utils.ThresholdS) - } - if srvMngr.GetConfig().StatSCfg().Enabled { - go srvMngr.startService(utils.StatS) - } - if srvMngr.GetConfig().ResourceSCfg().Enabled { - go srvMngr.startService(utils.ResourceS) - } - if srvMngr.GetConfig().SupplierSCfg().Enabled { - go srvMngr.startService(utils.SupplierS) - } - if srvMngr.GetConfig().SchedulerCfg().Enabled { - go srvMngr.startService(utils.SchedulerS) - } - if srvMngr.GetConfig().RalsCfg().Enabled { - go srvMngr.startService(utils.RALService) - } - if srvMngr.GetConfig().CdrsCfg().Enabled { - go srvMngr.startService(utils.CDRServer) - } - if srvMngr.GetConfig().SessionSCfg().Enabled { - go srvMngr.startService(utils.SessionS) - } - if srvMngr.GetConfig().ERsCfg().Enabled { - go srvMngr.startService(utils.ERs) - } - if srvMngr.GetConfig().DNSAgentCfg().Enabled { - go srvMngr.startService(utils.DNSAgent) - } - if srvMngr.GetConfig().FsAgentCfg().Enabled { - go srvMngr.startService(utils.FreeSWITCHAgent) - } - if srvMngr.GetConfig().KamAgentCfg().Enabled { - go srvMngr.startService(utils.KamailioAgent) - } - if srvMngr.GetConfig().AsteriskAgentCfg().Enabled { - go srvMngr.startService(utils.AsteriskAgent) - } - if srvMngr.GetConfig().RadiusAgentCfg().Enabled { - go srvMngr.startService(utils.RadiusAgent) - } - if srvMngr.GetConfig().DiameterAgentCfg().Enabled { - go srvMngr.startService(utils.DiameterAgent) - } - if len(srvMngr.GetConfig().HttpAgentCfg()) != 0 { - go srvMngr.startService(utils.HTTPAgent) + for _, serviceName := range []string{utils.AttributeS, utils.ChargerS, + utils.ThresholdS, utils.StatS, utils.ResourceS, utils.SupplierS, + utils.SchedulerS, utils.RALService, utils.CDRServer, utils.SessionS, + utils.ERs, utils.DNSAgent, utils.FreeSWITCHAgent, utils.KamailioAgent, + utils.AsteriskAgent, utils.RadiusAgent, utils.DiameterAgent, utils.HTTPAgent, + utils.LoaderS} { + go srvMngr.startServiceIfNeeded(serviceName) } // startServer() return @@ -378,6 +331,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.HTTPAgent); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.LoaderJson): + if err = srvMngr.reloadService(utils.LoaderS); err != nil { + return + } } // handle RPC server } @@ -410,8 +367,11 @@ func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) { return } -func (srvMngr *ServiceManager) startService(srviceName string) { +func (srvMngr *ServiceManager) startServiceIfNeeded(srviceName string) { srv := srvMngr.GetService(srviceName) + if !srv.ShouldRun() { + return + } if err := srv.Start(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, srviceName, err)) srvMngr.engineShutdown <- true