mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added Analyzer as service in ServiceManager
This commit is contained in:
committed by
Dan Christian Bogos
parent
42a6396bcb
commit
3d10a8c390
@@ -638,7 +638,6 @@ func main() {
|
||||
// Define internal connections via channels
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
|
||||
internalServeManagerChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalConfigChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
@@ -687,6 +686,7 @@ func main() {
|
||||
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), internalDispatcherSChan, exitChan)
|
||||
|
||||
ldrs := services.NewLoaderService(cfg, dm, filterSChan, server, internalCacheSChan, internalDispatcherSChan, exitChan)
|
||||
anz := services.NewAnalyzerService(cfg, server, exitChan)
|
||||
srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals,
|
||||
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
|
||||
services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan),
|
||||
@@ -697,7 +697,7 @@ func main() {
|
||||
services.NewRadiusAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload
|
||||
services.NewDiameterAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload
|
||||
services.NewHTTPAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, server), // no reload
|
||||
ldrs,
|
||||
ldrs, anz,
|
||||
)
|
||||
|
||||
srvManager.StartServices()
|
||||
@@ -712,7 +712,7 @@ func main() {
|
||||
// init internalRPCSet
|
||||
engine.IntRPC = engine.NewRPCClientSet()
|
||||
if cfg.DispatcherSCfg().Enabled {
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, internalAnalyzerSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ApierV1, rals.GetAPIv1().GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ApierV2, rals.GetAPIv2().GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan())
|
||||
@@ -746,15 +746,11 @@ func main() {
|
||||
dm, server, exitChan)
|
||||
}
|
||||
|
||||
if cfg.AnalyzerSCfg().Enabled {
|
||||
go startAnalyzerService(internalAnalyzerSChan, server, exitChan)
|
||||
}
|
||||
|
||||
// Serve rpc connections
|
||||
go startRpc(server, rals.GetResponder().GetIntenternalChan(), cdrS.GetIntenternalChan(),
|
||||
reS.GetIntenternalChan(), stS.GetIntenternalChan(),
|
||||
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
|
||||
supS.GetIntenternalChan(), smg.GetIntenternalChan(), internalAnalyzerSChan,
|
||||
supS.GetIntenternalChan(), smg.GetIntenternalChan(), anz.GetIntenternalChan(),
|
||||
internalDispatcherSChan, ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), internalCacheSChan, exitChan)
|
||||
<-exitChan
|
||||
|
||||
|
||||
116
services/analyzers.go
Normal file
116
services/analyzers.go
Normal file
@@ -0,0 +1,116 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/analyzers"
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewAnalyzerService returns the Analyzer Service
|
||||
func NewAnalyzerService(cfg *config.CGRConfig, server *utils.Server, exitChan chan bool) servmanager.Service {
|
||||
return &AnalyzerService{
|
||||
connChan: make(chan rpcclient.RpcClientConnection, 1),
|
||||
cfg: cfg,
|
||||
server: server,
|
||||
exitChan: exitChan,
|
||||
}
|
||||
}
|
||||
|
||||
// AnalyzerService implements Service interface
|
||||
type AnalyzerService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
server *utils.Server
|
||||
exitChan chan bool
|
||||
|
||||
anz *analyzers.AnalyzerService
|
||||
rpc *v1.AnalyzerSv1
|
||||
connChan chan rpcclient.RpcClientConnection
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (anz *AnalyzerService) Start() (err error) {
|
||||
if anz.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
if anz.anz, err = analyzers.NewAnalyzerService(); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error()))
|
||||
anz.exitChan <- true
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
if err := anz.anz.ListenAndServe(anz.exitChan); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error()))
|
||||
}
|
||||
anz.anz.Shutdown()
|
||||
anz.exitChan <- true
|
||||
return
|
||||
}()
|
||||
anz.rpc = v1.NewAnalyzerSv1(anz.anz)
|
||||
anz.server.RpcRegister(anz.rpc)
|
||||
anz.connChan <- anz.rpc
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
func (anz *AnalyzerService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
|
||||
return anz.connChan
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (anz *AnalyzerService) Reload() (err error) {
|
||||
return // for the momment nothing to reload
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (anz *AnalyzerService) Shutdown() (err error) {
|
||||
anz.Lock()
|
||||
anz.anz.Shutdown()
|
||||
anz.anz = nil
|
||||
anz.rpc = nil
|
||||
<-anz.connChan
|
||||
anz.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (anz *AnalyzerService) IsRunning() bool {
|
||||
anz.RLock()
|
||||
defer anz.RUnlock()
|
||||
return anz != nil && anz.anz != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (anz *AnalyzerService) ServiceName() string {
|
||||
return utils.AnalyzerS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (anz *AnalyzerService) ShouldRun() bool {
|
||||
return anz.cfg.AnalyzerSCfg().Enabled
|
||||
}
|
||||
@@ -166,6 +166,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
utils.DiameterAgent: srvMngr.GetConfig().DiameterAgentCfg().Enabled,
|
||||
utils.HTTPAgent: len(srvMngr.GetConfig().HttpAgentCfg()) != 0,
|
||||
utils.LoaderS: true,
|
||||
utils.AnalyzerS: srvMngr.GetConfig().AnalyzerSCfg().Enabled,
|
||||
} {
|
||||
if shouldRun {
|
||||
go srvMngr.startService(serviceName)
|
||||
@@ -283,6 +284,10 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
if err = srvMngr.reloadService(utils.LoaderS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.AnalyzerCfgJson):
|
||||
if err = srvMngr.reloadService(utils.AnalyzerS); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
// handle RPC server
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user