From f76caab4c797cd7034b511672a31b3b4614ecbf4 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 3 Oct 2019 10:56:19 +0300 Subject: [PATCH] Updated Stat Service --- cmd/cgr-engine/cgr-engine.go | 11 ++++---- services/stats.go | 53 ++++++++++++++++++++++++++---------- services/stats_it_test.go | 8 +++--- servmanager/servmanager.go | 14 +++++----- 4 files changed, 56 insertions(+), 30 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index adc1ba469..005cc1beb 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -662,7 +662,6 @@ func main() { internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1) // tmp - internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) internalRsChan := make(chan rpcclient.RpcClientConnection, 1) internalRaterChan := make(chan rpcclient.RpcClientConnection, 1) @@ -672,9 +671,6 @@ func main() { // init CoreSv1 initCoreSv1(internalCoreSv1Chan, server) - // Start FilterS - go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan) - // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dm, cdrDb, loadDb, filterSChan, server, internalDispatcherSChan, exitChan) @@ -684,8 +680,9 @@ func main() { chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server, attrS.GetIntenternalChan(), internalDispatcherSChan) tS := services.NewThresholdService(cfg, dm, cacheS, filterSChan, server) + stS := services.NewStatService(cfg, dm, cacheS, filterSChan, server, + tS.GetIntenternalChan(), internalDispatcherSChan) /* - stS := services.NewStatService() reS := services.NewResourceService() supS := services.NewSupplierService() schS := services.NewSchedulerService() @@ -723,6 +720,10 @@ func main() { internalSMGChan := smg.GetIntenternalChan() internalGuardianSChan := grd.GetIntenternalChan()*/ srvManager.StartServices() + + // Start FilterS + go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(), internalRsChan, internalRaterChan, cfg, dm, exitChan) + /* cacheS := srvManager.GetCacheS() diff --git a/services/stats.go b/services/stats.go index 34b45a0bf..d5bd0675a 100644 --- a/services/stats.go +++ b/services/stats.go @@ -23,6 +23,7 @@ import ( "sync" v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -30,39 +31,58 @@ import ( ) // NewStatService returns the Stat Service -func NewStatService() servmanager.Service { +func NewStatService(cfg *config.CGRConfig, dm *engine.DataManager, + cacheS *engine.CacheS, filterSChan chan *engine.FilterS, + server *utils.Server, thrsChan chan rpcclient.RpcClientConnection, + dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service { return &StatService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + thrsChan: thrsChan, + dispatcherChan: dispatcherChan, } } // StatService implements Service interface type StatService struct { sync.RWMutex + cfg *config.CGRConfig + dm *engine.DataManager + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *utils.Server + thrsChan chan rpcclient.RpcClientConnection + dispatcherChan chan rpcclient.RpcClientConnection + sts *engine.StatService rpc *v1.StatSv1 connChan chan rpcclient.RpcClientConnection } // Start should handle the sercive start -func (sts *StatService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { +func (sts *StatService) Start() (err error) { if sts.IsRunning() { return fmt.Errorf("service aleady running") } - if waitCache { - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatQueueProfiles) - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatQueues) - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatFilterIndexes) - } + sts.cacheS.GetPrecacheChannel(utils.CacheStatQueueProfiles) + sts.cacheS.GetPrecacheChannel(utils.CacheStatQueues) + sts.cacheS.GetPrecacheChannel(utils.CacheStatFilterIndexes) + + filterS := <-sts.filterSChan + sts.filterSChan <- filterS var thdSConn rpcclient.RpcClientConnection - if thdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().StatSCfg().ThresholdSConns); err != nil { + if thdSConn, err = NewConnection(sts.cfg, sts.thrsChan, sts.dispatcherChan, sts.cfg.StatSCfg().ThresholdSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error())) return } sts.Lock() defer sts.Unlock() - sts.sts, err = engine.NewStatService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS()) + sts.sts, err = engine.NewStatService(sts.dm, sts.cfg, thdSConn, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) return @@ -70,8 +90,8 @@ func (sts *StatService) Start(sp servmanager.ServiceProvider, waitCache bool) (e utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) sts.sts.StartLoop() sts.rpc = v1.NewStatSv1(sts.sts) - if !sp.GetConfig().DispatcherSCfg().Enabled { - sp.GetServer().RpcRegister(sts.rpc) + if !sts.cfg.DispatcherSCfg().Enabled { + sts.server.RpcRegister(sts.rpc) } sts.connChan <- sts.rpc return @@ -83,9 +103,9 @@ func (sts *StatService) GetIntenternalChan() (conn chan rpcclient.RpcClientConne } // Reload handles the change of config -func (sts *StatService) Reload(sp servmanager.ServiceProvider) (err error) { +func (sts *StatService) Reload() (err error) { var thdSConn rpcclient.RpcClientConnection - if thdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().StatSCfg().ThresholdSConns); err != nil { + if thdSConn, err = NewConnection(sts.cfg, sts.thrsChan, sts.dispatcherChan, sts.cfg.StatSCfg().ThresholdSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error())) return } @@ -125,3 +145,8 @@ func (sts *StatService) IsRunning() bool { func (sts *StatService) ServiceName() string { return utils.StatS } + +// ShouldRun returns if the service should be running +func (sts *StatService) ShouldRun() bool { + return sts.cfg.StatSCfg().Enabled +} diff --git a/services/stats_it_test.go b/services/stats_it_test.go index f0d5b7a90..2096fa754 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -51,12 +51,12 @@ func TestStatSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - /*cdrStorage*/ nil, - /*loadStorage*/ nil, filterSChan, + /*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil, server, nil, engineShutdown) srvMngr.SetCacheS(chS) - sS := NewStatService() - srvMngr.AddService(&CacheService{chS: chS}, NewThresholdService(), sS) + tS := NewThresholdService(cfg, nil, chS, filterSChan, server) + sS := NewStatService(cfg, nil, chS, filterSChan, server, tS.GetIntenternalChan(), nil) + srvMngr.AddServices(tS, sS) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 0fa3d1eee..07a783313 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -249,10 +249,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.GetConfig().ThresholdSCfg().Enabled { go srvMngr.startService(utils.ThresholdS) + } + if srvMngr.GetConfig().StatSCfg().Enabled { + go srvMngr.startService(utils.StatS) } /* - if srvMngr.GetConfig().StatSCfg().Enabled { - go srvMngr.startService(utils.StatS) - } if srvMngr.GetConfig().ResourceSCfg().Enabled { go srvMngr.startService(utils.ResourceS) } @@ -339,11 +339,11 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON): if err = srvMngr.reloadService(utils.ThresholdS); err != nil { return + } + case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON): + if err = srvMngr.reloadService(utils.StatS); err != nil { + return } /* - case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON): - if err = srvMngr.reloadService(utils.StatS); err != nil { - return - } case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON): if err = srvMngr.reloadService(utils.ResourceS); err != nil { return