cgrates/cgrates (mirror of https://github.com/cgrates/cgrates.git)

Added StatS as service in ServiceManager

commit 4763133501, parent 8c826e1e33
committed by Dan Christian Bogos
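This commit removes the ad-hoc startStatService boot code from cgr-engine and instead exposes StatS as a service that the ServiceManager can start, reload on config change, and shut down. The snippet below is a hypothetical, self-contained Go sketch of that pattern only, not the CGRateS API: the names Service, Manager, statService and OnConfigReload are illustrative stand-ins, while the real interface lives in the servmanager package and has more methods (see services/stats.go later in this diff).

package main

import "fmt"

// Service is an illustrative stand-in for the servmanager.Service interface.
type Service interface {
	Start() error
	Reload() error
	Shutdown() error
	IsRunning() bool
	ServiceName() string
}

// statService is a toy subsystem standing in for services.StatService.
type statService struct{ running bool }

func (s *statService) Start() error        { s.running = true; return nil }
func (s *statService) Reload() error       { return nil }
func (s *statService) Shutdown() error     { s.running = false; return nil }
func (s *statService) IsRunning() bool     { return s.running }
func (s *statService) ServiceName() string { return "StatS" }

// Manager is a toy stand-in for servmanager.ServiceManager.
type Manager struct{ services map[string]Service }

func (m *Manager) AddService(svcs ...Service) {
	if m.services == nil {
		m.services = make(map[string]Service)
	}
	for _, s := range svcs {
		m.services[s.ServiceName()] = s
	}
}

// OnConfigReload mirrors the decision handleReload makes for the stats section:
// reload if enabled and running, start if enabled and stopped, stop otherwise.
func (m *Manager) OnConfigReload(name string, enabled bool) error {
	s, has := m.services[name]
	if !has {
		return fmt.Errorf("unknown service %s", name)
	}
	switch {
	case enabled && s.IsRunning():
		return s.Reload()
	case enabled:
		return s.Start()
	case s.IsRunning():
		return s.Shutdown()
	}
	return nil
}

func main() {
	m := new(Manager)
	m.AddService(&statService{})
	fmt.Println(m.OnConfigReload("StatS", true)) // <nil>: service started
}

The servmanager hunks at the end of this diff implement the same enabled/running/stopped decision for the real StatS subsystem.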
@@ -851,59 +851,6 @@ func startResourceService(internalRsChan, internalThresholdSChan,
 	internalRsChan <- rsV1
 }
 
-// startStatService fires up the StatS
-func startStatService(internalStatSChan, internalThresholdSChan,
-	internalDispatcherSChan chan rpcclient.RpcClientConnection,
-	cacheS *engine.CacheS, cfg *config.CGRConfig,
-	dm *engine.DataManager, server *utils.Server,
-	filterSChan chan *engine.FilterS, exitChan chan bool) {
-	var err error
-	var thdSConn rpcclient.RpcClientConnection
-	filterS := <-filterSChan
-	filterSChan <- filterS
-	intThresholdSChan := internalThresholdSChan
-	if cfg.DispatcherSCfg().Enabled {
-		intThresholdSChan = internalDispatcherSChan
-	}
-	if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init
-		thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
-			cfg.TlsCfg().ClientKey,
-			cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
-			cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
-			cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
-			cfg.StatSCfg().ThresholdSConns, intThresholdSChan, false)
-		if err != nil {
-			utils.Logger.Crit(fmt.Sprintf("<StatS> Could not connect to ThresholdS: %s", err.Error()))
-			exitChan <- true
-			return
-		}
-	}
-	<-cacheS.GetPrecacheChannel(utils.CacheStatQueueProfiles)
-	<-cacheS.GetPrecacheChannel(utils.CacheStatQueues)
-	<-cacheS.GetPrecacheChannel(utils.CacheStatFilterIndexes)
-
-	sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval,
-		thdSConn, filterS, cfg.StatSCfg().StringIndexedFields, cfg.StatSCfg().PrefixIndexedFields)
-	if err != nil {
-		utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
-		exitChan <- true
-		return
-	}
-	go func() {
-		if err := sS.ListenAndServe(exitChan); err != nil {
-			utils.Logger.Crit(fmt.Sprintf("<StatS> Error: %s listening for packets", err.Error()))
-		}
-		sS.Shutdown()
-		exitChan <- true
-		return
-	}()
-	stsV1 := v1.NewStatSv1(sS)
-	if !cfg.DispatcherSCfg().Enabled {
-		server.RpcRegister(stsV1)
-	}
-	internalStatSChan <- stsV1
-}
-
 // startSupplierService fires up the SupplierS
 func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan,
 	internalAttrSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
@@ -1575,10 +1522,12 @@ func main() {
 	attrS := services.NewAttributeService()
 	chrS := services.NewChargerService()
 	tS := services.NewThresholdService()
-	srvManager.AddService(attrS, chrS, tS)
+	stS := services.NewStatService()
+	srvManager.AddService(attrS, chrS, tS, stS)
 	internalAttributeSChan = attrS.GetIntenternalChan()
 	internalChargerSChan = chrS.GetIntenternalChan()
 	internalThresholdSChan = tS.GetIntenternalChan()
+	internalStatSChan = stS.GetIntenternalChan()
 	go srvManager.StartServices()
 
 	initServiceManagerV1(internalServeManagerChan, srvManager, server)
@@ -1695,12 +1644,6 @@ func main() {
 		filterSChan, exitChan)
 	}
 
-	if cfg.StatSCfg().Enabled {
-		go startStatService(internalStatSChan, internalThresholdSChan,
-			internalDispatcherSChan, cacheS, cfg, dm, server,
-			filterSChan, exitChan)
-	}
-
 	if cfg.SupplierSCfg().Enabled {
 		go startSupplierService(internalSupplierSChan, internalRsChan,
 			internalStatSChan, internalAttributeSChan, internalDispatcherSChan,
@@ -1167,8 +1167,10 @@ func (self *CGRConfig) ResourceSCfg() *ResourceSConfig {
 	return self.resourceSCfg
 }
 
-// ToDo: fix locking
-func (cfg *CGRConfig) StatSCfg() *StatSCfg {
+// StatSCfg returns the config for StatS
+func (cfg *CGRConfig) StatSCfg() *StatSCfg { // not done
+	cfg.lks[STATS_JSON].Lock()
+	defer cfg.lks[STATS_JSON].Unlock()
 	return cfg.statsCfg
 }
 
@@ -1572,6 +1574,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
 		}
 		fallthrough
 	case STATS_JSON:
+		cfg.rldChans[STATS_JSON] <- struct{}{}
 		if !fall {
 			break
 		}
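reloadSection itself only signals the section's reload channel; the actual restart of the subsystem happens in ServiceManager.handleReload (see the servmanager hunk near the end of this diff). Below is a minimal, hypothetical sketch of that notify-over-channel handoff; the map, the "stats" key and the messages are made up for illustration and are not the CGRateS names.

package main

import "fmt"

func main() {
	// one signal channel per config section, in the spirit of cfg.rldChans
	reload := map[string]chan struct{}{"stats": make(chan struct{}, 1)}
	done := make(chan struct{})

	// listener side: roughly what ServiceManager.handleReload does in its goroutine
	go func() {
		<-reload["stats"]
		fmt.Println("stats section changed, reload the subsystem")
		close(done)
	}()

	// config side: roughly what reloadSection does after re-parsing the section
	reload["stats"] <- struct{}{}
	<-done
}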
@@ -163,6 +163,34 @@ func TestCGRConfigReloadThresholdS(t *testing.T) {
 	}
 }
 
+func TestCGRConfigReloadStatS(t *testing.T) {
+	cfg, err := NewDefaultCGRConfig()
+	if err != nil {
+		t.Fatal(err)
+	}
+	var reply string
+	if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{
+		Path:    path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"),
+		Section: STATS_JSON,
+	}, &reply); err != nil {
+		t.Error(err)
+	} else if reply != utils.OK {
+		t.Errorf("Expected OK received: %s", reply)
+	}
+	expAttr := &StatSCfg{
+		Enabled:             true,
+		StringIndexedFields: &[]string{utils.Account},
+		PrefixIndexedFields: &[]string{},
+		IndexedSelects:      true,
+		ThresholdSConns: []*RemoteHost{
+			&RemoteHost{Address: "127.0.0.1:2012", Transport: utils.MetaJSONrpc},
+		},
+	}
+	if !reflect.DeepEqual(expAttr, cfg.StatSCfg()) {
+		t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.StatSCfg()))
+	}
+}
+
 func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
 	for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} {
 		if err := os.RemoveAll(dir); err != nil {
@@ -32,33 +32,31 @@ import (
 )
 
 // NewStatService initializes a StatService
-func NewStatService(dm *DataManager, storeInterval time.Duration,
-	thdS rpcclient.RpcClientConnection, filterS *FilterS, stringIndexedFields, prefixIndexedFields *[]string) (ss *StatService, err error) {
+func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig,
+	thdS rpcclient.RpcClientConnection, filterS *FilterS) (ss *StatService, err error) {
 	if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface
 		thdS = nil
 	}
 	return &StatService{
-		dm:                  dm,
-		storeInterval:       storeInterval,
-		thdS:                thdS,
-		filterS:             filterS,
-		stringIndexedFields: stringIndexedFields,
-		prefixIndexedFields: prefixIndexedFields,
-		storedStatQueues:    make(utils.StringMap),
-		stopBackup:          make(chan struct{})}, nil
+		dm:               dm,
+		thdS:             thdS,
+		filterS:          filterS,
+		cgrcfg:           cgrcfg,
+		storedStatQueues: make(utils.StringMap),
+		loopStoped:       make(chan struct{}),
+		stopBackup:       make(chan struct{})}, nil
 }
 
 // StatService builds stats for events
 type StatService struct {
-	dm                  *DataManager
-	storeInterval       time.Duration
-	thdS                rpcclient.RpcClientConnection // rpc connection towards ThresholdS
-	filterS             *FilterS
-	stringIndexedFields *[]string
-	prefixIndexedFields *[]string
-	stopBackup          chan struct{}
-	storedStatQueues    utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
-	ssqMux              sync.RWMutex    // protects storedStatQueues
+	dm               *DataManager
+	thdS             rpcclient.RpcClientConnection // rpc connection towards ThresholdS
+	filterS          *FilterS
+	cgrcfg           *config.CGRConfig
+	loopStoped       chan struct{}
+	stopBackup       chan struct{}
+	storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
+	ssqMux           sync.RWMutex    // protects storedStatQueues
 }
 
 // ListenAndServe loops keeps the service alive
@@ -81,17 +79,19 @@ func (sS *StatService) Shutdown() error {
 
 // runBackup will regularly store resources changed to dataDB
 func (sS *StatService) runBackup() {
-	if sS.storeInterval <= 0 {
+	storeInterval := sS.cgrcfg.StatSCfg().StoreInterval
+	if storeInterval <= 0 {
+		sS.loopStoped <- struct{}{}
 		return
 	}
 	for {
+		sS.storeStats()
 		select {
 		case <-sS.stopBackup:
+			sS.loopStoped <- struct{}{}
 			return
-		default:
+		case <-time.After(storeInterval):
 		}
-		sS.storeStats()
-		time.Sleep(sS.storeInterval)
 	}
 }
 
@@ -160,7 +160,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (
 	if len(args.StatIDs) != 0 {
 		sqIDs = args.StatIDs
 	} else {
-		mapIDs, err := MatchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields,
+		mapIDs, err := MatchingItemIDsForEvent(args.Event, sS.cgrcfg.StatSCfg().StringIndexedFields, sS.cgrcfg.StatSCfg().PrefixIndexedFields,
 			sS.dm, utils.CacheStatFilterIndexes, args.Tenant, sS.filterS.cfg.StatSCfg().IndexedSelects)
 		if err != nil {
 			return nil, err
@@ -257,8 +257,8 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [
 				sq.TenantID(), args.TenantID(), err.Error()))
 			withErrors = true
 		}
-		if sS.storeInterval != 0 && sq.dirty != nil { // don't save
-			if sS.storeInterval == -1 {
+		if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save
+			if sS.cgrcfg.StatSCfg().StoreInterval == -1 {
 				sS.StoreStatQueue(sq)
 			} else {
 				*sq.dirty = true // mark it to be saved
@@ -409,3 +409,16 @@ func (sS *StatService) V1GetQueueIDs(tenant string, qIDs *[]string) (err error)
 	*qIDs = retIDs
 	return
 }
+
+// Reload stops the backupLoop and restarts it
+func (sS *StatService) Reload() {
+	close(sS.stopBackup)
+	<-sS.loopStoped // wait until the loop is done
+	sS.stopBackup = make(chan struct{})
+	go sS.runBackup()
+}
+
+// StartLoop starsS the gorutine with the backup loop
+func (sS *StatService) StartLoop() {
+	go sS.runBackup()
+}
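Reload works because runBackup now blocks in a select on stopBackup and time.After instead of sleeping unconditionally, and signals through loopStoped once it has fully exited: close the stop channel, wait for the loop to report, recreate the channel, restart the goroutine. The following is a minimal, self-contained sketch of that stop-and-restart loop; the names runBackup, store, stop and done are hypothetical stand-ins, not the CGRateS types.

package main

import (
	"fmt"
	"time"
)

// runBackup stores periodically until stop is closed, then reports on done.
func runBackup(interval time.Duration, store func(), stop, done chan struct{}) {
	if interval <= 0 {
		done <- struct{}{}
		return
	}
	for {
		store()
		select {
		case <-stop:
			done <- struct{}{}
			return
		case <-time.After(interval):
		}
	}
}

func main() {
	stop, done := make(chan struct{}), make(chan struct{})
	go runBackup(10*time.Millisecond, func() { fmt.Println("store") }, stop, done)
	time.Sleep(25 * time.Millisecond)
	close(stop) // what Reload does: stop, wait for done, then start a new loop
	<-done
}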
@@ -149,8 +149,11 @@ func TestStatQueuesPopulateService(t *testing.T) {
 		t.Errorf("Error: %+v", err)
 	}
 
-	statService, err = NewStatService(dmSTS, time.Duration(1),
-		nil, &FilterS{dm: dmSTS, cfg: defaultCfg}, nil, nil)
+	defaultCfg.StatSCfg().StoreInterval = 1
+	defaultCfg.StatSCfg().StringIndexedFields = nil
+	defaultCfg.StatSCfg().PrefixIndexedFields = nil
+	statService, err = NewStatService(dmSTS, defaultCfg,
+		nil, &FilterS{dm: dmSTS, cfg: defaultCfg})
 	if err != nil {
 		t.Errorf("Error: %+v", err)
 	}
services/stats.go (new file, 111 lines)
@@ -0,0 +1,111 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>
+*/
+
+package services
+
+import (
+	"fmt"
+
+	v1 "github.com/cgrates/cgrates/apier/v1"
+	"github.com/cgrates/cgrates/engine"
+	"github.com/cgrates/cgrates/servmanager"
+	"github.com/cgrates/cgrates/utils"
+	"github.com/cgrates/rpcclient"
+)
+
+// NewStatService returns the Stat Service
+func NewStatService() servmanager.Service {
+	return &StatService{
+		connChan: make(chan rpcclient.RpcClientConnection, 1),
+	}
+}
+
+// StatService implements Service interface
+type StatService struct {
+	sts      *engine.StatService
+	rpc      *v1.StatSv1
+	connChan chan rpcclient.RpcClientConnection
+}
+
+// Start should handle the sercive start
+func (sts *StatService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
+	if sts.IsRunning() {
+		return fmt.Errorf("service aleady running")
+	}
+	if waitCache {
+		<-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatQueueProfiles)
+		<-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatQueues)
+		<-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatFilterIndexes)
+	}
+
+	var thdSConn rpcclient.RpcClientConnection
+	if thdSConn, err = sp.GetConnection(utils.ThresholdS, sp.GetConfig().StatSCfg().ThresholdSConns); err != nil {
+		utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error()))
+		return
+	}
+	sts.sts, err = engine.NewStatService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS())
+	if err != nil {
+		utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
+		return
+	}
+	utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS))
+	sts.sts.StartLoop()
+	sts.rpc = v1.NewStatSv1(sts.sts)
+	if !sp.GetConfig().DispatcherSCfg().Enabled {
+		sp.GetServer().RpcRegister(sts.rpc)
+	}
+	sts.connChan <- sts.rpc
+	return
+}
+
+// GetIntenternalChan returns the internal connection chanel
+func (sts *StatService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
+	return sts.connChan
+}
+
+// Reload handles the change of config
+func (sts *StatService) Reload(sp servmanager.ServiceProvider) (err error) {
+	sts.sts.Reload()
+	return
+}
+
+// Shutdown stops the service
+func (sts *StatService) Shutdown() (err error) {
+	if err = sts.sts.Shutdown(); err != nil {
+		return
+	}
+	sts.sts = nil
+	sts.rpc = nil
+	<-sts.connChan
+	return
+}
+
+// GetRPCInterface returns the interface to register for server
+func (sts *StatService) GetRPCInterface() interface{} {
+	return sts.rpc
+}
+
+// IsRunning returns if the service is running
+func (sts *StatService) IsRunning() bool {
+	return sts != nil && sts.sts != nil
+}
+
+// ServiceName returns the service name
+func (sts *StatService) ServiceName() string {
+	return utils.StatS
+}
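Start hands the StatSv1 object to the rest of the engine through connChan, a buffered channel of size one, and Shutdown drains that slot so a stopped service can no longer be reached through it. Below is a hypothetical sketch of that one-slot handoff pattern; rpcConn and svc are illustrative names, not the CGRateS types.

package main

import "fmt"

type rpcConn string

type svc struct{ connChan chan rpcConn }

func (s *svc) Start()    { s.connChan <- "StatSv1" } // publish the live connection
func (s *svc) Shutdown() { <-s.connChan }            // drain so nothing stale is handed out

func main() {
	s := &svc{connChan: make(chan rpcConn, 1)}
	s.Start()
	conn := <-s.connChan // a consumer borrows the connection...
	fmt.Println("got", conn)
	s.connChan <- conn // ...and puts it back for the next consumer
	s.Shutdown()
}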
services/stats_it_test.go (new file, 83 lines)
@@ -0,0 +1,83 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>
+*/
+package services
+
+import (
+	"path"
+	"testing"
+	"time"
+
+	"github.com/cgrates/cgrates/config"
+	"github.com/cgrates/cgrates/engine"
+	"github.com/cgrates/cgrates/servmanager"
+	"github.com/cgrates/cgrates/utils"
+)
+
+func TestStatSReload(t *testing.T) {
+	// utils.Logger.SetLogLevel(7)
+	cfg, err := config.NewDefaultCGRConfig()
+	if err != nil {
+		t.Fatal(err)
+	}
+	cfg.ThresholdSCfg().Enabled = true
+	filterSChan := make(chan *engine.FilterS, 1)
+	filterSChan <- nil
+	engineShutdown := make(chan bool, 1)
+	chS := engine.NewCacheS(cfg, nil)
+	close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles))
+	close(chS.GetPrecacheChannel(utils.CacheThresholds))
+	close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
+	close(chS.GetPrecacheChannel(utils.CacheStatQueueProfiles))
+	close(chS.GetPrecacheChannel(utils.CacheStatQueues))
+	close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes))
+	server := utils.NewServer()
+	srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
+		chS /*cdrStorage*/, nil,
+		/*loadStorage*/ nil, filterSChan,
+		server, nil, engineShutdown)
+	sS := NewStatService()
+	srvMngr.AddService(NewThresholdService(), sS)
+	if err = srvMngr.StartServices(); err != nil {
+		t.Error(err)
+	}
+	if sS.IsRunning() {
+		t.Errorf("Expected service to be down")
+	}
+	var reply string
+	if err = cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
+		Path:    path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"),
+		Section: config.STATS_JSON,
+	}, &reply); err != nil {
+		t.Error(err)
+	} else if reply != utils.OK {
+		t.Errorf("Expecting OK ,received %s", reply)
+	}
+	time.Sleep(10 * time.Millisecond) //need to switch to gorutine
+	if !sS.IsRunning() {
+		t.Errorf("Expected service to be running")
+	}
+	cfg.StatSCfg().Enabled = false
+	cfg.GetReloadChan(config.STATS_JSON) <- struct{}{}
+	time.Sleep(10 * time.Millisecond)
+	if sS.IsRunning() {
+		t.Errorf("Expected service to be down")
+	}
+	engineShutdown <- true
+}
@@ -59,6 +59,7 @@ func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bo
 		utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error()))
 		return
 	}
+	utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS))
 	thrs.thrs.StartLoop()
 	thrs.rpc = v1.NewThresholdSv1(thrs.thrs)
 	if !sp.GetConfig().DispatcherSCfg().Enabled {
@@ -309,10 +309,21 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
 	if srvMngr.cfg.ThresholdSCfg().Enabled {
 		go func() {
 			if chrS, has := srvMngr.subsystems[utils.ThresholdS]; !has {
-				utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS))
+				utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS))
 				srvMngr.engineShutdown <- true
 			} else if err = chrS.Start(srvMngr, true); err != nil {
-				utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err))
+				utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ThresholdS, err))
 				srvMngr.engineShutdown <- true
 			}
 		}()
+	}
+	if srvMngr.cfg.StatSCfg().Enabled {
+		go func() {
+			if chrS, has := srvMngr.subsystems[utils.StatS]; !has {
+				utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS))
+				srvMngr.engineShutdown <- true
+			} else if err = chrS.Start(srvMngr, true); err != nil {
+				utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.StatS, err))
+				srvMngr.engineShutdown <- true
+			}
+		}()
@@ -422,6 +433,34 @@ func (srvMngr *ServiceManager) handleReload() {
 					return // stop if we encounter an error
 				}
 			}
+		case <-srvMngr.cfg.GetReloadChan(config.STATS_JSON):
+			tS, has := srvMngr.subsystems[utils.StatS]
+			if !has {
+				utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS))
+				srvMngr.engineShutdown <- true
+				return // stop if we encounter an error
+			}
+			if srvMngr.cfg.StatSCfg().Enabled {
+				if tS.IsRunning() {
+					if err = tS.Reload(srvMngr); err != nil {
+						utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, utils.StatS))
+						srvMngr.engineShutdown <- true
+						return // stop if we encounter an error
+					}
+				} else {
+					if err = tS.Start(srvMngr, true); err != nil {
+						utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS))
+						srvMngr.engineShutdown <- true
+						return // stop if we encounter an error
+					}
+				}
+			} else if tS.IsRunning() {
+				if err = tS.Shutdown(); err != nil {
+					utils.Logger.Err(fmt.Sprintf("<%s> Failed to stop service <%s>", utils.ServiceManager, utils.StatS))
+					srvMngr.engineShutdown <- true
+					return // stop if we encounter an error
+				}
+			}
 		}
 		// handle RPC server
 	}