Updated Stat Service

This commit is contained in:
Trial97
2019-10-03 10:56:19 +03:00
committed by Dan Christian Bogos
parent 32ce25ebb1
commit f76caab4c7
4 changed files with 56 additions and 30 deletions

View File

@@ -662,7 +662,6 @@ func main() {
internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1)
// tmp
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
internalRsChan := make(chan rpcclient.RpcClientConnection, 1)
internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -672,9 +671,6 @@ func main() {
// init CoreSv1
initCoreSv1(internalCoreSv1Chan, server)
// Start FilterS
go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, cdrDb,
loadDb, filterSChan, server, internalDispatcherSChan, exitChan)
@@ -684,8 +680,9 @@ func main() {
chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), internalDispatcherSChan)
tS := services.NewThresholdService(cfg, dm, cacheS, filterSChan, server)
stS := services.NewStatService(cfg, dm, cacheS, filterSChan, server,
tS.GetIntenternalChan(), internalDispatcherSChan)
/*
stS := services.NewStatService()
reS := services.NewResourceService()
supS := services.NewSupplierService()
schS := services.NewSchedulerService()
@@ -723,6 +720,10 @@ func main() {
internalSMGChan := smg.GetIntenternalChan()
internalGuardianSChan := grd.GetIntenternalChan()*/
srvManager.StartServices()
// Start FilterS
go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(), internalRsChan, internalRaterChan, cfg, dm, exitChan)
/*
cacheS := srvManager.GetCacheS()

View File

@@ -23,6 +23,7 @@ import (
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -30,39 +31,58 @@ import (
)
// NewStatService returns the Stat Service
func NewStatService() servmanager.Service {
func NewStatService(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, thrsChan chan rpcclient.RpcClientConnection,
dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &StatService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
thrsChan: thrsChan,
dispatcherChan: dispatcherChan,
}
}
// StatService implements Service interface
type StatService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
thrsChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
sts *engine.StatService
rpc *v1.StatSv1
connChan chan rpcclient.RpcClientConnection
}
// Start should handle the sercive start
func (sts *StatService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (sts *StatService) Start() (err error) {
if sts.IsRunning() {
return fmt.Errorf("service aleady running")
}
if waitCache {
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatQueueProfiles)
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatQueues)
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatFilterIndexes)
}
sts.cacheS.GetPrecacheChannel(utils.CacheStatQueueProfiles)
sts.cacheS.GetPrecacheChannel(utils.CacheStatQueues)
sts.cacheS.GetPrecacheChannel(utils.CacheStatFilterIndexes)
filterS := <-sts.filterSChan
sts.filterSChan <- filterS
var thdSConn rpcclient.RpcClientConnection
if thdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().StatSCfg().ThresholdSConns); err != nil {
if thdSConn, err = NewConnection(sts.cfg, sts.thrsChan, sts.dispatcherChan, sts.cfg.StatSCfg().ThresholdSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error()))
return
}
sts.Lock()
defer sts.Unlock()
sts.sts, err = engine.NewStatService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS())
sts.sts, err = engine.NewStatService(sts.dm, sts.cfg, thdSConn, filterS)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
return
@@ -70,8 +90,8 @@ func (sts *StatService) Start(sp servmanager.ServiceProvider, waitCache bool) (e
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS))
sts.sts.StartLoop()
sts.rpc = v1.NewStatSv1(sts.sts)
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(sts.rpc)
if !sts.cfg.DispatcherSCfg().Enabled {
sts.server.RpcRegister(sts.rpc)
}
sts.connChan <- sts.rpc
return
@@ -83,9 +103,9 @@ func (sts *StatService) GetIntenternalChan() (conn chan rpcclient.RpcClientConne
}
// Reload handles the change of config
func (sts *StatService) Reload(sp servmanager.ServiceProvider) (err error) {
func (sts *StatService) Reload() (err error) {
var thdSConn rpcclient.RpcClientConnection
if thdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().StatSCfg().ThresholdSConns); err != nil {
if thdSConn, err = NewConnection(sts.cfg, sts.thrsChan, sts.dispatcherChan, sts.cfg.StatSCfg().ThresholdSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error()))
return
}
@@ -125,3 +145,8 @@ func (sts *StatService) IsRunning() bool {
func (sts *StatService) ServiceName() string {
return utils.StatS
}
// ShouldRun returns if the service should be running
func (sts *StatService) ShouldRun() bool {
return sts.cfg.StatSCfg().Enabled
}

View File

@@ -51,12 +51,12 @@ func TestStatSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
/*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
sS := NewStatService()
srvMngr.AddService(&CacheService{chS: chS}, NewThresholdService(), sS)
tS := NewThresholdService(cfg, nil, chS, filterSChan, server)
sS := NewStatService(cfg, nil, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
srvMngr.AddServices(tS, sS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -249,10 +249,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().ThresholdSCfg().Enabled {
go srvMngr.startService(utils.ThresholdS)
}
if srvMngr.GetConfig().StatSCfg().Enabled {
go srvMngr.startService(utils.StatS)
} /*
if srvMngr.GetConfig().StatSCfg().Enabled {
go srvMngr.startService(utils.StatS)
}
if srvMngr.GetConfig().ResourceSCfg().Enabled {
go srvMngr.startService(utils.ResourceS)
}
@@ -339,11 +339,11 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON):
if err = srvMngr.reloadService(utils.ThresholdS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON):
if err = srvMngr.reloadService(utils.StatS); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON):
if err = srvMngr.reloadService(utils.StatS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON):
if err = srvMngr.reloadService(utils.ResourceS); err != nil {
return