From 92df8864b87641ac22f0b15a9e2f507c57b68cab Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 19 Sep 2019 18:03:50 +0300 Subject: [PATCH] Added CDRServer service implementation --- cmd/cgr-engine/cgr-engine.go | 116 +-------------------------------- config/config.go | 4 ++ services/cdrs.go | 97 ++++++++++++++++++++------- services/responders.go | 86 ++++++++++++++++++++++++ services/schedulers_it_test.go | 2 +- servmanager/servmanager.go | 49 ++++++++++---- utils/consts.go | 1 + 7 files changed, 205 insertions(+), 150 deletions(-) create mode 100644 services/responders.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ea2125e3d..1d84df05a 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -34,7 +34,6 @@ import ( "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/analyzers" v1 "github.com/cgrates/cgrates/apier/v1" - v2 "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/cdrc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/dispatchers" @@ -681,110 +680,6 @@ func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcC } } -func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, internalThresholdSChan, - internalStatSChan, internalChargerSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, - cdrDb engine.CdrStorage, dm *engine.DataManager, server *utils.Server, - filterSChan chan *engine.FilterS, exitChan chan bool) { - filterS := <-filterSChan - filterSChan <- filterS - var err error - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs)) - - var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection - - intChargerSChan := internalChargerSChan - intRaterChan := internalRaterChan - intAttributeSChan := internalAttributeSChan - intThresholdSChan := internalThresholdSChan - intStatSChan := internalStatSChan - if cfg.DispatcherSCfg().Enabled { - intChargerSChan = internalDispatcherSChan - intRaterChan = internalDispatcherSChan - intAttributeSChan = internalDispatcherSChan - intThresholdSChan = internalDispatcherSChan - intStatSChan = internalDispatcherSChan - } - if len(cfg.CdrsCfg().CDRSChargerSConns) != 0 { // Conn pool towards RAL - chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSChargerSConns, intChargerSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", - utils.ChargerS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.CdrsCfg().CDRSRaterConns) != 0 { // Conn pool towards RAL - ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSRaterConns, intRaterChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) - exitChan <- true - return - } - } - if len(cfg.CdrsCfg().CDRSAttributeSConns) != 0 { // Users connection init - attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSAttributeSConns, intAttributeSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", - utils.AttributeS, err.Error())) - exitChan <- true - return - } - } - if len(cfg.CdrsCfg().CDRSThresholdSConns) != 0 { // Stats connection init - thresholdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSThresholdSConns, intThresholdSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) - exitChan <- true - return - } - } - if len(cfg.CdrsCfg().CDRSStatSConns) != 0 { // Stats connection init - statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.CdrsCfg().CDRSStatSConns, intStatSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) - exitChan <- true - return - } - } - cdrServer := engine.NewCDRServer(cfg, cdrDb, dm, - ralConn, attrSConn, - thresholdSConn, statsConn, chargerSConn, filterS) - utils.Logger.Info("Registering CDRS HTTP Handlers.") - cdrServer.RegisterHandlersToServer(server) - utils.Logger.Info("Registering CDRS RPC service.") - cdrSrv := v1.NewCDRsV1(cdrServer) - server.RpcRegister(cdrSrv) - server.RpcRegister(&v2.CDRsV2{CDRsV1: *cdrSrv}) - // Make the cdr server available for internal communication - server.RpcRegister(cdrServer) // register CdrServer for internal usage (TODO: refactor this) - internalCdrSChan <- cdrServer // Signal that cdrS is operational -} - // startFilterService fires up the FilterS func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, internalStatSChan, internalResourceSChan, internalRalSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, @@ -1345,7 +1240,8 @@ func main() { reS := services.NewResourceService() supS := services.NewSupplierService() schS := services.NewSchedulerService() - srvManager.AddService(attrS, chrS, tS, stS, reS, supS, schS, services.NewCDRServer(internalCdrSChan)) + cdrS := services.NewCDRServer() + srvManager.AddService(attrS, chrS, tS, stS, reS, supS, schS, cdrS, services.NewResponderService(internalRaterChan)) internalAttributeSChan = attrS.GetIntenternalChan() internalChargerSChan = chrS.GetIntenternalChan() internalThresholdSChan = tS.GetIntenternalChan() @@ -1353,6 +1249,7 @@ func main() { internalRsChan = reS.GetIntenternalChan() internalSupplierSChan = supS.GetIntenternalChan() internalSchedSChan = schS.GetIntenternalChan() + internalCdrSChan = cdrS.GetIntenternalChan() go srvManager.StartServices() initServiceManagerV1(internalServeManagerChan, srvManager, server) @@ -1393,13 +1290,6 @@ func main() { /*srvManager*/ schS, server, dm, loadDb, cdrDb, cacheS, filterSChan, exitChan) } - // Start CDR Server - if cfg.CdrsCfg().CDRSEnabled { - go startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, - internalThresholdSChan, internalStatSChan, internalChargerSChan, - internalDispatcherSChan, cdrDb, dm, server, filterSChan, exitChan) - } - // Start CDRC components if necessary go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) diff --git a/config/config.go b/config/config.go index e174b1aac..c3ddad165 100755 --- a/config/config.go +++ b/config/config.go @@ -1262,7 +1262,10 @@ func (cfg *CGRConfig) RalsCfg() *RalsCfg { return cfg.ralsCfg } +// CdrsCfg returns the config for CDR Server func (cfg *CGRConfig) CdrsCfg() *CdrsCfg { + cfg.lks[CDRS_JSN].Lock() + defer cfg.lks[CDRS_JSN].Unlock() return cfg.cdrsCfg } @@ -1496,6 +1499,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case CDRS_JSN: + cfg.rldChans[CDRS_JSN] <- struct{}{} if !fall { break } diff --git a/services/cdrs.go b/services/cdrs.go index d265ce438..c6028c048 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -19,33 +19,89 @@ along with this program. If not, see package services import ( + "fmt" + "sync" + + v1 "github.com/cgrates/cgrates/apier/v1" + v2 "github.com/cgrates/cgrates/apier/v2" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) -// NewCDRServer returns the CDR Service -func NewCDRServer(connChan chan rpcclient.RpcClientConnection) servmanager.Service { +// NewCDRServer returns the CDR Server +func NewCDRServer() servmanager.Service { return &CDRServer{ - connChan: connChan, + connChan: make(chan rpcclient.RpcClientConnection, 1), } } // CDRServer implements Service interface -// ToDo: Add the rest of functionality -// only the chanel without reload functionality type CDRServer struct { - // cdrS *engine.CDRServer - // rpc *v1.CDRsV1 + sync.RWMutex + cdrS *engine.CDRServer + rpcv1 *v1.CDRsV1 + rpcv2 *v2.CDRsV2 connChan chan rpcclient.RpcClientConnection } // Start should handle the sercive start func (cdrS *CDRServer) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { - // if cdrS.IsRunning() { - // return fmt.Errorf("service aleady running") - // } - return utils.ErrNotImplemented + if cdrS.IsRunning() { + return fmt.Errorf("service aleady running") + } + + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs)) + + var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection + + chargerSConn, err = sp.GetConnection(utils.ChargerS, sp.GetConfig().CdrsCfg().CDRSChargerSConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", + utils.ChargerS, err.Error())) + return + } + ralConn, err = sp.GetConnection(utils.ResponderS, sp.GetConfig().CdrsCfg().CDRSRaterConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", + utils.RALService, err.Error())) + return + } + attrSConn, err = sp.GetConnection(utils.AttributeS, sp.GetConfig().CdrsCfg().CDRSAttributeSConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", + utils.AttributeS, err.Error())) + return + } + thresholdSConn, err = sp.GetConnection(utils.ThresholdS, sp.GetConfig().CdrsCfg().CDRSThresholdSConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", + utils.ThresholdS, err.Error())) + return + } + statsConn, err = sp.GetConnection(utils.StatS, sp.GetConfig().CdrsCfg().CDRSStatSConns) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", + utils.StatS, err.Error())) + return + } + cdrS.Lock() + defer cdrS.Unlock() + cdrS.cdrS = engine.NewCDRServer(sp.GetConfig(), sp.GetCDRStorage(), sp.GetDM(), + ralConn, attrSConn, + thresholdSConn, statsConn, chargerSConn, sp.GetFilterS()) + utils.Logger.Info("Registering CDRS HTTP Handlers.") + cdrS.cdrS.RegisterHandlersToServer(sp.GetServer()) + utils.Logger.Info("Registering CDRS RPC service.") + cdrS.rpcv1 = v1.NewCDRsV1(cdrS.cdrS) + cdrS.rpcv2 = &v2.CDRsV2{CDRsV1: *cdrS.rpcv1} + sp.GetServer().RpcRegister(cdrS.rpcv1) + sp.GetServer().RpcRegister(cdrS.rpcv2) + // Make the cdr server available for internal communication + sp.GetServer().RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this) + cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational + return } // GetIntenternalChan returns the internal connection chanel @@ -55,29 +111,26 @@ func (cdrS *CDRServer) GetIntenternalChan() (conn chan rpcclient.RpcClientConnec // Reload handles the change of config func (cdrS *CDRServer) Reload(sp servmanager.ServiceProvider) (err error) { - return utils.ErrNotImplemented + return } // Shutdown stops the service func (cdrS *CDRServer) Shutdown() (err error) { - return utils.ErrNotImplemented - // if err = cdrS.cdrS.Shutdown(); err != nil { - // return - // } - // cdrS.cdrS = nil - // cdrS.rpc = nil - // <-cdrS.connChan - // return + cdrS.cdrS = nil + cdrS.rpcv1 = nil + cdrS.rpcv2 = nil + <-cdrS.connChan + return } // GetRPCInterface returns the interface to register for server func (cdrS *CDRServer) GetRPCInterface() interface{} { - return nil //cdrS.rpc + return cdrS.cdrS } // IsRunning returns if the service is running func (cdrS *CDRServer) IsRunning() bool { - return cdrS != nil // && cdrS.cdrS != nil + return cdrS != nil && cdrS.cdrS != nil } // ServiceName returns the service name diff --git a/services/responders.go b/services/responders.go new file mode 100644 index 000000000..bd92ee745 --- /dev/null +++ b/services/responders.go @@ -0,0 +1,86 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewResponderService returns the Resonder Service +func NewResponderService(connChan chan rpcclient.RpcClientConnection) servmanager.Service { + return &ResponderService{ + connChan: connChan, + } +} + +// ResponderService implements Service interface +// ToDo: Add the rest of functionality +// only the chanel without reload functionality +type ResponderService struct { + // resp *engine.ResponderService + // rpc *v1.respV1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (resp *ResponderService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + // if resp.IsRunning() { + // return fmt.Errorf("service aleady running") + // } + return utils.ErrNotImplemented +} + +// GetIntenternalChan returns the internal connection chanel +func (resp *ResponderService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return resp.connChan +} + +// Reload handles the change of config +func (resp *ResponderService) Reload(sp servmanager.ServiceProvider) (err error) { + return utils.ErrNotImplemented +} + +// Shutdown stops the service +func (resp *ResponderService) Shutdown() (err error) { + return utils.ErrNotImplemented + // if err = resp.resp.Shutdown(); err != nil { + // return + // } + // resp.resp = nil + // resp.rpc = nil + // <-resp.connChan + // return +} + +// GetRPCInterface returns the interface to register for server +func (resp *ResponderService) GetRPCInterface() interface{} { + return nil //resp.rpc +} + +// IsRunning returns if the service is running +func (resp *ResponderService) IsRunning() bool { + return resp != nil // && resp.resp != nil +} + +// ServiceName returns the service name +func (resp *ResponderService) ServiceName() string { + return utils.ResponderS +} diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index f56ee8547..7448a8e91 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -59,7 +59,7 @@ func TestSchedulerSReload(t *testing.T) { schS := NewSchedulerService() internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1) internalCdrSChan <- nil - srvMngr.AddService(schS, NewCDRServer(internalCdrSChan)) + srvMngr.AddService(schS, &CDRServer{connChan: internalCdrSChan}) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 456dbaf18..64688991d 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -241,10 +241,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { go srvMngr.handleReload() if srvMngr.cfg.AttributeSCfg().Enabled { go func() { - if attrS, has := srvMngr.subsystems[utils.AttributeS]; !has { + if srv, has := srvMngr.subsystems[utils.AttributeS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.AttributeS)) srvMngr.engineShutdown <- true - } else if err = attrS.Start(srvMngr, true); err != nil { + } else if err = srv.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.AttributeS, err)) srvMngr.engineShutdown <- true } @@ -252,10 +252,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.ChargerSCfg().Enabled { go func() { - if chrS, has := srvMngr.subsystems[utils.ChargerS]; !has { + if srv, has := srvMngr.subsystems[utils.ChargerS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS)) srvMngr.engineShutdown <- true - } else if err = chrS.Start(srvMngr, true); err != nil { + } else if err = srv.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err)) srvMngr.engineShutdown <- true } @@ -263,10 +263,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.ThresholdSCfg().Enabled { go func() { - if thrS, has := srvMngr.subsystems[utils.ThresholdS]; !has { + if srv, has := srvMngr.subsystems[utils.ThresholdS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS)) srvMngr.engineShutdown <- true - } else if err = thrS.Start(srvMngr, true); err != nil { + } else if err = srv.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ThresholdS, err)) srvMngr.engineShutdown <- true } @@ -274,10 +274,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.StatSCfg().Enabled { go func() { - if stS, has := srvMngr.subsystems[utils.StatS]; !has { + if srv, has := srvMngr.subsystems[utils.StatS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS)) srvMngr.engineShutdown <- true - } else if err = stS.Start(srvMngr, true); err != nil { + } else if err = srv.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.StatS, err)) srvMngr.engineShutdown <- true } @@ -285,10 +285,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.ResourceSCfg().Enabled { go func() { - if reS, has := srvMngr.subsystems[utils.ResourceS]; !has { + if srv, has := srvMngr.subsystems[utils.ResourceS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ResourceS)) srvMngr.engineShutdown <- true - } else if err = reS.Start(srvMngr, true); err != nil { + } else if err = srv.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ResourceS, err)) srvMngr.engineShutdown <- true } @@ -296,10 +296,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.SupplierSCfg().Enabled { go func() { - if supS, has := srvMngr.subsystems[utils.SupplierS]; !has { + if srv, has := srvMngr.subsystems[utils.SupplierS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS)) srvMngr.engineShutdown <- true - } else if err = supS.Start(srvMngr, true); err != nil { + } else if err = srv.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SupplierS, err)) srvMngr.engineShutdown <- true } @@ -307,15 +307,26 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.cfg.SchedulerCfg().Enabled { go func() { - if supS, has := srvMngr.subsystems[utils.SchedulerS]; !has { + if srv, has := srvMngr.subsystems[utils.SchedulerS]; !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SchedulerS)) srvMngr.engineShutdown <- true - } else if err = supS.Start(srvMngr, true); err != nil { + } else if err = srv.Start(srvMngr, true); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SchedulerS, err)) srvMngr.engineShutdown <- true } }() } + if srvMngr.cfg.CdrsCfg().CDRSEnabled { + go func() { + if srv, has := srvMngr.subsystems[utils.CDRServer]; !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.CDRServer)) + srvMngr.engineShutdown <- true + } else if err = srv.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.CDRServer, err)) + srvMngr.engineShutdown <- true + } + }() + } // startServer() return } @@ -407,6 +418,16 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(srv, srvMngr.cfg.SchedulerCfg().Enabled); err != nil { return } + case <-srvMngr.cfg.GetReloadChan(config.CDRS_JSN): + srv, has := srvMngr.subsystems[utils.CDRServer] + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.CDRServer)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + if err = srvMngr.reloadService(srv, srvMngr.cfg.SchedulerCfg().Enabled); err != nil { + return + } } // handle RPC server } diff --git a/utils/consts.go b/utils/consts.go index 24bb3321c..dc4fd12e8 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -601,6 +601,7 @@ const ( CacheS = "CacheS" AnalyzerS = "AnalyzerS" CDRServer = "CDRServer" + ResponderS = "ResponderS" ) // Lower service names