From 3d10a8c390a162150f9621ecccac7aad28d1a9cd Mon Sep 17 00:00:00 2001 From: Trial97 Date: Sun, 6 Oct 2019 15:21:50 +0300 Subject: [PATCH] Added Analyzer as service in ServiceManager --- cmd/cgr-engine/cgr-engine.go | 12 ++-- services/analyzers.go | 116 +++++++++++++++++++++++++++++++++++ servmanager/servmanager.go | 5 ++ 3 files changed, 125 insertions(+), 8 deletions(-) create mode 100644 services/analyzers.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0d14a1e96..7385be195 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -638,7 +638,6 @@ func main() { // Define internal connections via channels filterSChan := make(chan *engine.FilterS, 1) internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) - internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1) internalServeManagerChan := make(chan rpcclient.RpcClientConnection, 1) internalConfigChan := make(chan rpcclient.RpcClientConnection, 1) @@ -687,6 +686,7 @@ func main() { attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), internalDispatcherSChan, exitChan) ldrs := services.NewLoaderService(cfg, dm, filterSChan, server, internalCacheSChan, internalDispatcherSChan, exitChan) + anz := services.NewAnalyzerService(cfg, server, exitChan) srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals, rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg, services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), @@ -697,7 +697,7 @@ func main() { services.NewRadiusAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload services.NewDiameterAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload services.NewHTTPAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, server), // no reload - ldrs, + ldrs, anz, ) srvManager.StartServices() @@ -712,7 +712,7 @@ func main() { // init internalRPCSet engine.IntRPC = engine.NewRPCClientSet() if cfg.DispatcherSCfg().Enabled { - engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, internalAnalyzerSChan) + engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ApierV1, rals.GetAPIv1().GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ApierV2, rals.GetAPIv2().GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan()) @@ -746,15 +746,11 @@ func main() { dm, server, exitChan) } - if cfg.AnalyzerSCfg().Enabled { - go startAnalyzerService(internalAnalyzerSChan, server, exitChan) - } - // Serve rpc connections go startRpc(server, rals.GetResponder().GetIntenternalChan(), cdrS.GetIntenternalChan(), reS.GetIntenternalChan(), stS.GetIntenternalChan(), attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(), - supS.GetIntenternalChan(), smg.GetIntenternalChan(), internalAnalyzerSChan, + supS.GetIntenternalChan(), smg.GetIntenternalChan(), anz.GetIntenternalChan(), internalDispatcherSChan, ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), internalCacheSChan, exitChan) <-exitChan diff --git a/services/analyzers.go b/services/analyzers.go new file mode 100644 index 000000000..6c0858f10 --- /dev/null +++ b/services/analyzers.go @@ -0,0 +1,116 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/analyzers" + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewAnalyzerService returns the Analyzer Service +func NewAnalyzerService(cfg *config.CGRConfig, server *utils.Server, exitChan chan bool) servmanager.Service { + return &AnalyzerService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + server: server, + exitChan: exitChan, + } +} + +// AnalyzerService implements Service interface +type AnalyzerService struct { + sync.RWMutex + cfg *config.CGRConfig + server *utils.Server + exitChan chan bool + + anz *analyzers.AnalyzerService + rpc *v1.AnalyzerSv1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (anz *AnalyzerService) Start() (err error) { + if anz.IsRunning() { + return fmt.Errorf("service aleady running") + } + if anz.anz, err = analyzers.NewAnalyzerService(); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) + anz.exitChan <- true + return + } + go func() { + if err := anz.anz.ListenAndServe(anz.exitChan); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error())) + } + anz.anz.Shutdown() + anz.exitChan <- true + return + }() + anz.rpc = v1.NewAnalyzerSv1(anz.anz) + anz.server.RpcRegister(anz.rpc) + anz.connChan <- anz.rpc + + return +} + +// GetIntenternalChan returns the internal connection chanel +func (anz *AnalyzerService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return anz.connChan +} + +// Reload handles the change of config +func (anz *AnalyzerService) Reload() (err error) { + return // for the momment nothing to reload +} + +// Shutdown stops the service +func (anz *AnalyzerService) Shutdown() (err error) { + anz.Lock() + anz.anz.Shutdown() + anz.anz = nil + anz.rpc = nil + <-anz.connChan + anz.Unlock() + return +} + +// IsRunning returns if the service is running +func (anz *AnalyzerService) IsRunning() bool { + anz.RLock() + defer anz.RUnlock() + return anz != nil && anz.anz != nil +} + +// ServiceName returns the service name +func (anz *AnalyzerService) ServiceName() string { + return utils.AnalyzerS +} + +// ShouldRun returns if the service should be running +func (anz *AnalyzerService) ShouldRun() bool { + return anz.cfg.AnalyzerSCfg().Enabled +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index a22cda57a..f3c704c9e 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -166,6 +166,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) { utils.DiameterAgent: srvMngr.GetConfig().DiameterAgentCfg().Enabled, utils.HTTPAgent: len(srvMngr.GetConfig().HttpAgentCfg()) != 0, utils.LoaderS: true, + utils.AnalyzerS: srvMngr.GetConfig().AnalyzerSCfg().Enabled, } { if shouldRun { go srvMngr.startService(serviceName) @@ -283,6 +284,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.LoaderS); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.AnalyzerCfgJson): + if err = srvMngr.reloadService(utils.AnalyzerS); err != nil { + return + } } // handle RPC server }