Added LoaderS as service in ServiceManager

This commit is contained in:
Trial97
2019-10-06 13:40:55 +03:00
committed by Dan Christian Bogos
parent 44be111066
commit 0786d31c61
5 changed files with 181 additions and 87 deletions

View File

@@ -37,7 +37,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/services"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -145,24 +144,6 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
filterSChan <- engine.NewFilterS(cfg, internalStatSChan, internalResourceSChan, internalRalSChan, dm)
}
// loaderService will start and register APIs for LoaderService if enabled
func startLoaderS(internalLoaderSChan, cacheSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server,
filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
ldrS := loaders.NewLoaderService(dm, cfg.LoaderCfg(),
cfg.GeneralCfg().DefaultTimezone, exitChan, filterS, cacheSChan)
if !ldrS.Enabled() {
return
}
go ldrS.ListenAndServe(exitChan)
ldrSv1 := v1.NewLoaderSv1(ldrS)
server.RpcRegister(ldrSv1)
internalLoaderSChan <- ldrSv1
}
// startDispatcherService fires up the DispatcherS
func startDispatcherService(internalDispatcherSChan, internalAttributeSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig, cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
@@ -658,7 +639,6 @@ func main() {
filterSChan := make(chan *engine.FilterS, 1)
internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1)
internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1)
internalLoaderSChan := make(chan rpcclient.RpcClientConnection, 1)
internalServeManagerChan := make(chan rpcclient.RpcClientConnection, 1)
internalConfigChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -678,8 +658,7 @@ func main() {
initCoreSv1(internalCoreSv1Chan, server)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, cdrDb,
loadDb, filterSChan, server, internalDispatcherSChan, exitChan)
srvManager := servmanager.NewServiceManager(cfg, exitChan)
attrS := services.NewAttributeService(cfg, dm, cacheS, filterSChan, server)
chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server,
@@ -707,6 +686,7 @@ func main() {
tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(),
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), internalDispatcherSChan, exitChan)
ldrs := services.NewLoaderService(cfg, dm, filterSChan, server, internalCacheSChan, internalDispatcherSChan, exitChan)
srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan),
@@ -717,6 +697,7 @@ func main() {
services.NewRadiusAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload
services.NewDiameterAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload
services.NewHTTPAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, server), // no reload
ldrs,
)
srvManager.StartServices()
@@ -740,7 +721,7 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan)
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, ldrs.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan())
@@ -769,14 +750,12 @@ func main() {
go startAnalyzerService(internalAnalyzerSChan, server, exitChan)
}
go startLoaderS(internalLoaderSChan, internalCacheSChan, cfg, dm, server, filterSChan, exitChan)
// Serve rpc connections
go startRpc(server, rals.GetResponder().GetIntenternalChan(), cdrS.GetIntenternalChan(),
reS.GetIntenternalChan(), stS.GetIntenternalChan(),
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
supS.GetIntenternalChan(), smg.GetIntenternalChan(), internalAnalyzerSChan,
internalDispatcherSChan, internalLoaderSChan, rals.GetIntenternalChan(), internalCacheSChan, exitChan)
internalDispatcherSChan, ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), internalCacheSChan, exitChan)
<-exitChan
if *cpuProfDir != "" { // wait to end cpuProfiling

View File

@@ -1654,11 +1654,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
}
fallthrough
case LoaderJson:
if !fall {
break
}
fallthrough
case CgrLoaderCfgJson:
cfg.rldChans[LoaderJson] <- struct{}{}
if !fall {
break
}

View File

@@ -21,6 +21,7 @@ package loaders
import (
"errors"
"fmt"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -43,6 +44,7 @@ func NewLoaderService(dm *engine.DataManager, ldrsCfg []*config.LoaderSCfg,
// LoaderS is the Loader service handling independent Loaders
type LoaderService struct {
sync.RWMutex
ldrs map[string]*Loader
}
@@ -57,7 +59,6 @@ func (ldrS *LoaderService) Enabled() bool {
}
func (ldrS *LoaderService) ListenAndServe(exitChan chan bool) (err error) {
// seems useless
ldrExitChan := make(chan struct{})
for _, ldr := range ldrS.ldrs {
go ldr.ListenAndServe(ldrExitChan)
@@ -80,6 +81,8 @@ type ArgsProcessFolder struct {
func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder,
rply *string) (err error) {
ldrS.RLock()
defer ldrS.RUnlock()
if args.LoaderID == "" {
args.LoaderID = utils.META_DEFAULT
}
@@ -111,6 +114,8 @@ func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder,
func (ldrS *LoaderService) V1Remove(args *ArgsProcessFolder,
rply *string) (err error) {
ldrS.RLock()
defer ldrS.RUnlock()
if args.LoaderID == "" {
args.LoaderID = utils.META_DEFAULT
}
@@ -139,3 +144,18 @@ func (ldrS *LoaderService) V1Remove(args *ArgsProcessFolder,
*rply = utils.OK
return
}
// Reload recreates the loaders map thread safe
func (ldrS *LoaderService) Reload(dm *engine.DataManager, ldrsCfg []*config.LoaderSCfg,
timezone string, exitChan chan bool, filterS *engine.FilterS,
internalCacheSChan chan rpcclient.RpcClientConnection) {
ldrS.Lock()
ldrS.ldrs = make(map[string]*Loader)
for _, ldrCfg := range ldrsCfg {
if !ldrCfg.Enabled {
continue
}
ldrS.ldrs[ldrCfg.Id] = NewLoader(dm, ldrCfg, timezone, exitChan, filterS, internalCacheSChan)
}
ldrS.Unlock()
}

139
services/loaders.go Normal file
View File

@@ -0,0 +1,139 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewLoaderService returns the Loader Service
func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager,
filterSChan chan *engine.FilterS, server *utils.Server,
cacheSChan, dispatcherChan chan rpcclient.RpcClientConnection,
exitChan chan bool) servmanager.Service {
return &LoaderService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheSChan: cacheSChan,
dispatcherChan: dispatcherChan,
filterSChan: filterSChan,
server: server,
exitChan: exitChan,
}
}
// LoaderService implements Service interface
type LoaderService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
filterSChan chan *engine.FilterS
server *utils.Server
cacheSChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
exitChan chan bool
ldrs *loaders.LoaderService
rpc *v1.LoaderSv1
connChan chan rpcclient.RpcClientConnection
}
// Start should handle the sercive start
func (ldrs *LoaderService) Start() (err error) {
if ldrs.IsRunning() {
return fmt.Errorf("service aleady running")
}
filterS := <-ldrs.filterSChan
ldrs.filterSChan <- filterS
internalChan := ldrs.cacheSChan
if ldrs.cfg.DispatcherSCfg().Enabled {
internalChan = ldrs.dispatcherChan
}
ldrs.Lock()
defer ldrs.Unlock()
ldrs.ldrs = loaders.NewLoaderService(ldrs.dm, ldrs.cfg.LoaderCfg(),
ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, internalChan)
if !ldrs.ldrs.Enabled() {
return
}
ldrs.rpc = v1.NewLoaderSv1(ldrs.ldrs)
ldrs.server.RpcRegister(ldrs.rpc)
ldrs.connChan <- ldrs.rpc
return
}
// GetIntenternalChan returns the internal connection chanel
func (ldrs *LoaderService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
return ldrs.connChan
}
// Reload handles the change of config
func (ldrs *LoaderService) Reload() (err error) {
filterS := <-ldrs.filterSChan
ldrs.filterSChan <- filterS
ldrs.RLock()
internalChan := ldrs.cacheSChan
if ldrs.cfg.DispatcherSCfg().Enabled {
internalChan = ldrs.dispatcherChan
}
ldrs.ldrs.Reload(ldrs.dm, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone,
ldrs.exitChan, filterS, internalChan)
ldrs.RUnlock()
return
}
// Shutdown stops the service
func (ldrs *LoaderService) Shutdown() (err error) {
ldrs.Lock()
ldrs.ldrs = nil
ldrs.rpc = nil
<-ldrs.connChan
ldrs.Unlock()
return
}
// IsRunning returns if the service is running
func (ldrs *LoaderService) IsRunning() bool {
ldrs.RLock()
defer ldrs.RUnlock()
return ldrs != nil && ldrs.ldrs != nil && ldrs.ldrs.Enabled()
}
// ServiceName returns the service name
func (ldrs *LoaderService) ServiceName() string {
return utils.LoaderS
}
// ShouldRun returns if the service should be running
func (ldrs *LoaderService) ShouldRun() bool {
return true
}

View File

@@ -215,60 +215,13 @@ func (srvMngr *ServiceManager) GetExitChan() chan bool {
// StartServices starts all enabled services
func (srvMngr *ServiceManager) StartServices() (err error) {
go srvMngr.handleReload()
if srvMngr.GetConfig().AttributeSCfg().Enabled {
go srvMngr.startService(utils.AttributeS)
}
if srvMngr.GetConfig().ChargerSCfg().Enabled {
go srvMngr.startService(utils.ChargerS)
}
if srvMngr.GetConfig().ThresholdSCfg().Enabled {
go srvMngr.startService(utils.ThresholdS)
}
if srvMngr.GetConfig().StatSCfg().Enabled {
go srvMngr.startService(utils.StatS)
}
if srvMngr.GetConfig().ResourceSCfg().Enabled {
go srvMngr.startService(utils.ResourceS)
}
if srvMngr.GetConfig().SupplierSCfg().Enabled {
go srvMngr.startService(utils.SupplierS)
}
if srvMngr.GetConfig().SchedulerCfg().Enabled {
go srvMngr.startService(utils.SchedulerS)
}
if srvMngr.GetConfig().RalsCfg().Enabled {
go srvMngr.startService(utils.RALService)
}
if srvMngr.GetConfig().CdrsCfg().Enabled {
go srvMngr.startService(utils.CDRServer)
}
if srvMngr.GetConfig().SessionSCfg().Enabled {
go srvMngr.startService(utils.SessionS)
}
if srvMngr.GetConfig().ERsCfg().Enabled {
go srvMngr.startService(utils.ERs)
}
if srvMngr.GetConfig().DNSAgentCfg().Enabled {
go srvMngr.startService(utils.DNSAgent)
}
if srvMngr.GetConfig().FsAgentCfg().Enabled {
go srvMngr.startService(utils.FreeSWITCHAgent)
}
if srvMngr.GetConfig().KamAgentCfg().Enabled {
go srvMngr.startService(utils.KamailioAgent)
}
if srvMngr.GetConfig().AsteriskAgentCfg().Enabled {
go srvMngr.startService(utils.AsteriskAgent)
}
if srvMngr.GetConfig().RadiusAgentCfg().Enabled {
go srvMngr.startService(utils.RadiusAgent)
}
if srvMngr.GetConfig().DiameterAgentCfg().Enabled {
go srvMngr.startService(utils.DiameterAgent)
}
if len(srvMngr.GetConfig().HttpAgentCfg()) != 0 {
go srvMngr.startService(utils.HTTPAgent)
for _, serviceName := range []string{utils.AttributeS, utils.ChargerS,
utils.ThresholdS, utils.StatS, utils.ResourceS, utils.SupplierS,
utils.SchedulerS, utils.RALService, utils.CDRServer, utils.SessionS,
utils.ERs, utils.DNSAgent, utils.FreeSWITCHAgent, utils.KamailioAgent,
utils.AsteriskAgent, utils.RadiusAgent, utils.DiameterAgent, utils.HTTPAgent,
utils.LoaderS} {
go srvMngr.startServiceIfNeeded(serviceName)
}
// startServer()
return
@@ -378,6 +331,10 @@ func (srvMngr *ServiceManager) handleReload() {
if err = srvMngr.reloadService(utils.HTTPAgent); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.LoaderJson):
if err = srvMngr.reloadService(utils.LoaderS); err != nil {
return
}
}
// handle RPC server
}
@@ -410,8 +367,11 @@ func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) {
return
}
func (srvMngr *ServiceManager) startService(srviceName string) {
func (srvMngr *ServiceManager) startServiceIfNeeded(srviceName string) {
srv := srvMngr.GetService(srviceName)
if !srv.ShouldRun() {
return
}
if err := srv.Start(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, srviceName, err))
srvMngr.engineShutdown <- true