mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added CDRServer service implementation
This commit is contained in:
committed by
Dan Christian Bogos
parent
0fd0813b24
commit
92df8864b8
@@ -34,7 +34,6 @@ import (
|
||||
"github.com/cgrates/cgrates/agents"
|
||||
"github.com/cgrates/cgrates/analyzers"
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
v2 "github.com/cgrates/cgrates/apier/v2"
|
||||
"github.com/cgrates/cgrates/cdrc"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/dispatchers"
|
||||
@@ -681,110 +680,6 @@ func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcC
|
||||
}
|
||||
}
|
||||
|
||||
func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, internalThresholdSChan,
|
||||
internalStatSChan, internalChargerSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
|
||||
cdrDb engine.CdrStorage, dm *engine.DataManager, server *utils.Server,
|
||||
filterSChan chan *engine.FilterS, exitChan chan bool) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
var err error
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs))
|
||||
|
||||
var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection
|
||||
|
||||
intChargerSChan := internalChargerSChan
|
||||
intRaterChan := internalRaterChan
|
||||
intAttributeSChan := internalAttributeSChan
|
||||
intThresholdSChan := internalThresholdSChan
|
||||
intStatSChan := internalStatSChan
|
||||
if cfg.DispatcherSCfg().Enabled {
|
||||
intChargerSChan = internalDispatcherSChan
|
||||
intRaterChan = internalDispatcherSChan
|
||||
intAttributeSChan = internalDispatcherSChan
|
||||
intThresholdSChan = internalDispatcherSChan
|
||||
intStatSChan = internalDispatcherSChan
|
||||
}
|
||||
if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL
|
||||
chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.CdrsCfg().CDRSChargerSConns, intChargerSChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
|
||||
utils.ChargerS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL
|
||||
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.CdrsCfg().CDRSRaterConns, intRaterChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init
|
||||
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.CdrsCfg().CDRSAttributeSConns, intAttributeSChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
|
||||
utils.AttributeS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init
|
||||
thresholdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.CdrsCfg().CDRSThresholdSConns, intThresholdSChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to ThresholdS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init
|
||||
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.CdrsCfg().CDRSStatSConns, intStatSChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
cdrServer := engine.NewCDRServer(cfg, cdrDb, dm,
|
||||
ralConn, attrSConn,
|
||||
thresholdSConn, statsConn, chargerSConn, filterS)
|
||||
utils.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
cdrServer.RegisterHandlersToServer(server)
|
||||
utils.Logger.Info("Registering CDRS RPC service.")
|
||||
cdrSrv := v1.NewCDRsV1(cdrServer)
|
||||
server.RpcRegister(cdrSrv)
|
||||
server.RpcRegister(&v2.CDRsV2{CDRsV1: *cdrSrv})
|
||||
// Make the cdr server available for internal communication
|
||||
server.RpcRegister(cdrServer) // register CdrServer for internal usage (TODO: refactor this)
|
||||
internalCdrSChan <- cdrServer // Signal that cdrS is operational
|
||||
}
|
||||
|
||||
// startFilterService fires up the FilterS
|
||||
func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
|
||||
internalStatSChan, internalResourceSChan, internalRalSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
@@ -1345,7 +1240,8 @@ func main() {
|
||||
reS := services.NewResourceService()
|
||||
supS := services.NewSupplierService()
|
||||
schS := services.NewSchedulerService()
|
||||
srvManager.AddService(attrS, chrS, tS, stS, reS, supS, schS, services.NewCDRServer(internalCdrSChan))
|
||||
cdrS := services.NewCDRServer()
|
||||
srvManager.AddService(attrS, chrS, tS, stS, reS, supS, schS, cdrS, services.NewResponderService(internalRaterChan))
|
||||
internalAttributeSChan = attrS.GetIntenternalChan()
|
||||
internalChargerSChan = chrS.GetIntenternalChan()
|
||||
internalThresholdSChan = tS.GetIntenternalChan()
|
||||
@@ -1353,6 +1249,7 @@ func main() {
|
||||
internalRsChan = reS.GetIntenternalChan()
|
||||
internalSupplierSChan = supS.GetIntenternalChan()
|
||||
internalSchedSChan = schS.GetIntenternalChan()
|
||||
internalCdrSChan = cdrS.GetIntenternalChan()
|
||||
go srvManager.StartServices()
|
||||
|
||||
initServiceManagerV1(internalServeManagerChan, srvManager, server)
|
||||
@@ -1393,13 +1290,6 @@ func main() {
|
||||
/*srvManager*/ schS, server, dm, loadDb, cdrDb, cacheS, filterSChan, exitChan)
|
||||
}
|
||||
|
||||
// Start CDR Server
|
||||
if cfg.CdrsCfg().CDRSEnabled {
|
||||
go startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan,
|
||||
internalThresholdSChan, internalStatSChan, internalChargerSChan,
|
||||
internalDispatcherSChan, cdrDb, dm, server, filterSChan, exitChan)
|
||||
}
|
||||
|
||||
// Start CDRC components if necessary
|
||||
go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan)
|
||||
|
||||
|
||||
@@ -1262,7 +1262,10 @@ func (cfg *CGRConfig) RalsCfg() *RalsCfg {
|
||||
return cfg.ralsCfg
|
||||
}
|
||||
|
||||
// CdrsCfg returns the config for CDR Server
|
||||
func (cfg *CGRConfig) CdrsCfg() *CdrsCfg {
|
||||
cfg.lks[CDRS_JSN].Lock()
|
||||
defer cfg.lks[CDRS_JSN].Unlock()
|
||||
return cfg.cdrsCfg
|
||||
}
|
||||
|
||||
@@ -1496,6 +1499,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
|
||||
}
|
||||
fallthrough
|
||||
case CDRS_JSN:
|
||||
cfg.rldChans[CDRS_JSN] <- struct{}{}
|
||||
if !fall {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -19,33 +19,89 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
v2 "github.com/cgrates/cgrates/apier/v2"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewCDRServer returns the CDR Service
|
||||
func NewCDRServer(connChan chan rpcclient.RpcClientConnection) servmanager.Service {
|
||||
// NewCDRServer returns the CDR Server
|
||||
func NewCDRServer() servmanager.Service {
|
||||
return &CDRServer{
|
||||
connChan: connChan,
|
||||
connChan: make(chan rpcclient.RpcClientConnection, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// CDRServer implements Service interface
|
||||
// ToDo: Add the rest of functionality
|
||||
// only the chanel without reload functionality
|
||||
type CDRServer struct {
|
||||
// cdrS *engine.CDRServer
|
||||
// rpc *v1.CDRsV1
|
||||
sync.RWMutex
|
||||
cdrS *engine.CDRServer
|
||||
rpcv1 *v1.CDRsV1
|
||||
rpcv2 *v2.CDRsV2
|
||||
connChan chan rpcclient.RpcClientConnection
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (cdrS *CDRServer) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
|
||||
// if cdrS.IsRunning() {
|
||||
// return fmt.Errorf("service aleady running")
|
||||
// }
|
||||
return utils.ErrNotImplemented
|
||||
if cdrS.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs))
|
||||
|
||||
var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection
|
||||
|
||||
chargerSConn, err = sp.GetConnection(utils.ChargerS, sp.GetConfig().CdrsCfg().CDRSChargerSConns)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
|
||||
utils.ChargerS, err.Error()))
|
||||
return
|
||||
}
|
||||
ralConn, err = sp.GetConnection(utils.ResponderS, sp.GetConfig().CdrsCfg().CDRSRaterConns)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
|
||||
utils.RALService, err.Error()))
|
||||
return
|
||||
}
|
||||
attrSConn, err = sp.GetConnection(utils.AttributeS, sp.GetConfig().CdrsCfg().CDRSAttributeSConns)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
|
||||
utils.AttributeS, err.Error()))
|
||||
return
|
||||
}
|
||||
thresholdSConn, err = sp.GetConnection(utils.ThresholdS, sp.GetConfig().CdrsCfg().CDRSThresholdSConns)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
|
||||
utils.ThresholdS, err.Error()))
|
||||
return
|
||||
}
|
||||
statsConn, err = sp.GetConnection(utils.StatS, sp.GetConfig().CdrsCfg().CDRSStatSConns)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
|
||||
utils.StatS, err.Error()))
|
||||
return
|
||||
}
|
||||
cdrS.Lock()
|
||||
defer cdrS.Unlock()
|
||||
cdrS.cdrS = engine.NewCDRServer(sp.GetConfig(), sp.GetCDRStorage(), sp.GetDM(),
|
||||
ralConn, attrSConn,
|
||||
thresholdSConn, statsConn, chargerSConn, sp.GetFilterS())
|
||||
utils.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
cdrS.cdrS.RegisterHandlersToServer(sp.GetServer())
|
||||
utils.Logger.Info("Registering CDRS RPC service.")
|
||||
cdrS.rpcv1 = v1.NewCDRsV1(cdrS.cdrS)
|
||||
cdrS.rpcv2 = &v2.CDRsV2{CDRsV1: *cdrS.rpcv1}
|
||||
sp.GetServer().RpcRegister(cdrS.rpcv1)
|
||||
sp.GetServer().RpcRegister(cdrS.rpcv2)
|
||||
// Make the cdr server available for internal communication
|
||||
sp.GetServer().RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this)
|
||||
cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational
|
||||
return
|
||||
}
|
||||
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
@@ -55,29 +111,26 @@ func (cdrS *CDRServer) GetIntenternalChan() (conn chan rpcclient.RpcClientConnec
|
||||
|
||||
// Reload handles the change of config
|
||||
func (cdrS *CDRServer) Reload(sp servmanager.ServiceProvider) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (cdrS *CDRServer) Shutdown() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
// if err = cdrS.cdrS.Shutdown(); err != nil {
|
||||
// return
|
||||
// }
|
||||
// cdrS.cdrS = nil
|
||||
// cdrS.rpc = nil
|
||||
// <-cdrS.connChan
|
||||
// return
|
||||
cdrS.cdrS = nil
|
||||
cdrS.rpcv1 = nil
|
||||
cdrS.rpcv2 = nil
|
||||
<-cdrS.connChan
|
||||
return
|
||||
}
|
||||
|
||||
// GetRPCInterface returns the interface to register for server
|
||||
func (cdrS *CDRServer) GetRPCInterface() interface{} {
|
||||
return nil //cdrS.rpc
|
||||
return cdrS.cdrS
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (cdrS *CDRServer) IsRunning() bool {
|
||||
return cdrS != nil // && cdrS.cdrS != nil
|
||||
return cdrS != nil && cdrS.cdrS != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
|
||||
86
services/responders.go
Normal file
86
services/responders.go
Normal file
@@ -0,0 +1,86 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewResponderService returns the Resonder Service
|
||||
func NewResponderService(connChan chan rpcclient.RpcClientConnection) servmanager.Service {
|
||||
return &ResponderService{
|
||||
connChan: connChan,
|
||||
}
|
||||
}
|
||||
|
||||
// ResponderService implements Service interface
|
||||
// ToDo: Add the rest of functionality
|
||||
// only the chanel without reload functionality
|
||||
type ResponderService struct {
|
||||
// resp *engine.ResponderService
|
||||
// rpc *v1.respV1
|
||||
connChan chan rpcclient.RpcClientConnection
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (resp *ResponderService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
|
||||
// if resp.IsRunning() {
|
||||
// return fmt.Errorf("service aleady running")
|
||||
// }
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
func (resp *ResponderService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
|
||||
return resp.connChan
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (resp *ResponderService) Reload(sp servmanager.ServiceProvider) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (resp *ResponderService) Shutdown() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
// if err = resp.resp.Shutdown(); err != nil {
|
||||
// return
|
||||
// }
|
||||
// resp.resp = nil
|
||||
// resp.rpc = nil
|
||||
// <-resp.connChan
|
||||
// return
|
||||
}
|
||||
|
||||
// GetRPCInterface returns the interface to register for server
|
||||
func (resp *ResponderService) GetRPCInterface() interface{} {
|
||||
return nil //resp.rpc
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (resp *ResponderService) IsRunning() bool {
|
||||
return resp != nil // && resp.resp != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (resp *ResponderService) ServiceName() string {
|
||||
return utils.ResponderS
|
||||
}
|
||||
@@ -59,7 +59,7 @@ func TestSchedulerSReload(t *testing.T) {
|
||||
schS := NewSchedulerService()
|
||||
internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalCdrSChan <- nil
|
||||
srvMngr.AddService(schS, NewCDRServer(internalCdrSChan))
|
||||
srvMngr.AddService(schS, &CDRServer{connChan: internalCdrSChan})
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -241,10 +241,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
go srvMngr.handleReload()
|
||||
if srvMngr.cfg.AttributeSCfg().Enabled {
|
||||
go func() {
|
||||
if attrS, has := srvMngr.subsystems[utils.AttributeS]; !has {
|
||||
if srv, has := srvMngr.subsystems[utils.AttributeS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.AttributeS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = attrS.Start(srvMngr, true); err != nil {
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.AttributeS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
@@ -252,10 +252,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.ChargerSCfg().Enabled {
|
||||
go func() {
|
||||
if chrS, has := srvMngr.subsystems[utils.ChargerS]; !has {
|
||||
if srv, has := srvMngr.subsystems[utils.ChargerS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = chrS.Start(srvMngr, true); err != nil {
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
@@ -263,10 +263,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.ThresholdSCfg().Enabled {
|
||||
go func() {
|
||||
if thrS, has := srvMngr.subsystems[utils.ThresholdS]; !has {
|
||||
if srv, has := srvMngr.subsystems[utils.ThresholdS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = thrS.Start(srvMngr, true); err != nil {
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ThresholdS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
@@ -274,10 +274,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.StatSCfg().Enabled {
|
||||
go func() {
|
||||
if stS, has := srvMngr.subsystems[utils.StatS]; !has {
|
||||
if srv, has := srvMngr.subsystems[utils.StatS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = stS.Start(srvMngr, true); err != nil {
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.StatS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
@@ -285,10 +285,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.ResourceSCfg().Enabled {
|
||||
go func() {
|
||||
if reS, has := srvMngr.subsystems[utils.ResourceS]; !has {
|
||||
if srv, has := srvMngr.subsystems[utils.ResourceS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ResourceS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = reS.Start(srvMngr, true); err != nil {
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ResourceS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
@@ -296,10 +296,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.SupplierSCfg().Enabled {
|
||||
go func() {
|
||||
if supS, has := srvMngr.subsystems[utils.SupplierS]; !has {
|
||||
if srv, has := srvMngr.subsystems[utils.SupplierS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = supS.Start(srvMngr, true); err != nil {
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SupplierS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
@@ -307,15 +307,26 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
}
|
||||
if srvMngr.cfg.SchedulerCfg().Enabled {
|
||||
go func() {
|
||||
if supS, has := srvMngr.subsystems[utils.SchedulerS]; !has {
|
||||
if srv, has := srvMngr.subsystems[utils.SchedulerS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SchedulerS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = supS.Start(srvMngr, true); err != nil {
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SchedulerS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
}
|
||||
if srvMngr.cfg.CdrsCfg().CDRSEnabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.CDRServer]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.CDRServer))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.CDRServer, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
}
|
||||
// startServer()
|
||||
return
|
||||
}
|
||||
@@ -407,6 +418,16 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.SchedulerCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.CDRS_JSN):
|
||||
srv, has := srvMngr.subsystems[utils.CDRServer]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.CDRServer))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.SchedulerCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
// handle RPC server
|
||||
}
|
||||
|
||||
@@ -601,6 +601,7 @@ const (
|
||||
CacheS = "CacheS"
|
||||
AnalyzerS = "AnalyzerS"
|
||||
CDRServer = "CDRServer"
|
||||
ResponderS = "ResponderS"
|
||||
)
|
||||
|
||||
// Lower service names
|
||||
|
||||
Reference in New Issue
Block a user