From 32ce25ebb1ec4651c3013211f9ba6fe255a66a74 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 3 Oct 2019 10:37:53 +0300 Subject: [PATCH] Updated Threshold Service --- cmd/cgr-engine/cgr-engine.go | 4 +-- services/attributes.go | 2 +- services/chargers.go | 3 +-- services/thresholds.go | 49 ++++++++++++++++++++++------------ services/thresholds_it_test.go | 7 +++-- servmanager/servmanager.go | 14 +++++----- 6 files changed, 46 insertions(+), 33 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 45168f854..adc1ba469 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -683,8 +683,8 @@ func main() { attrS := services.NewAttributeService(cfg, dm, cacheS, filterSChan, server) chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server, attrS.GetIntenternalChan(), internalDispatcherSChan) + tS := services.NewThresholdService(cfg, dm, cacheS, filterSChan, server) /* - tS := services.NewThresholdService() stS := services.NewStatService() reS := services.NewResourceService() supS := services.NewSupplierService() @@ -696,7 +696,7 @@ func main() { resp, _ := srvManager.GetService(utils.ResponderS) smg := services.NewSessionService() grd := services.NewGuardianService()*/ - srvManager.AddServices( /*chS, */ attrS, chrS) /*, tS, stS, reS, supS, schS, cdrS, rals, smg, grd, + srvManager.AddServices( /*chS, */ attrS, chrS, tS) /* stS, reS, supS, schS, cdrS, rals, smg, grd, services.NewEventReaderService(), services.NewDNSAgent(), services.NewFreeswitchAgent(), diff --git a/services/attributes.go b/services/attributes.go index fec07a13a..3fe69368e 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -35,12 +35,12 @@ func NewAttributeService(cfg *config.CGRConfig, dm *engine.DataManager, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server) servmanager.Service { return &AttributeService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), cfg: cfg, dm: dm, cacheS: cacheS, filterSChan: filterSChan, server: server, - connChan: make(chan rpcclient.RpcClientConnection, 1), } } diff --git a/services/chargers.go b/services/chargers.go index 7fd590982..700e03498 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -35,8 +35,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *engine.DataManager, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server, attrsChan, dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service { return &ChargerService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), - + connChan: make(chan rpcclient.RpcClientConnection, 1), cfg: cfg, dm: dm, cacheS: cacheS, diff --git a/services/thresholds.go b/services/thresholds.go index 5db8f2bb9..4d9af2a79 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -23,6 +23,7 @@ import ( "sync" v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -30,35 +31,49 @@ import ( ) // NewThresholdService returns the Threshold Service -func NewThresholdService() servmanager.Service { +func NewThresholdService(cfg *config.CGRConfig, dm *engine.DataManager, + cacheS *engine.CacheS, filterSChan chan *engine.FilterS, + server *utils.Server) servmanager.Service { return &ThresholdService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, } } // ThresholdService implements Service interface type ThresholdService struct { sync.RWMutex + cfg *config.CGRConfig + dm *engine.DataManager + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *utils.Server + thrs *engine.ThresholdService rpc *v1.ThresholdSv1 connChan chan rpcclient.RpcClientConnection } // Start should handle the sercive start -func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { +func (thrs *ThresholdService) Start() (err error) { if thrs.IsRunning() { return fmt.Errorf("service aleady running") } - if waitCache { - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdProfiles) - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholds) - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdFilterIndexes) - } + thrs.cacheS.GetPrecacheChannel(utils.CacheThresholdProfiles) + thrs.cacheS.GetPrecacheChannel(utils.CacheThresholds) + thrs.cacheS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes) + + filterS := <-thrs.filterSChan + thrs.filterSChan <- filterS thrs.Lock() defer thrs.Unlock() - thrs.thrs, err = engine.NewThresholdService(sp.GetDM(), sp.GetConfig(), sp.GetFilterS()) + thrs.thrs, err = engine.NewThresholdService(thrs.dm, thrs.cfg, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error())) return @@ -66,8 +81,8 @@ func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bo utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS)) thrs.thrs.StartLoop() thrs.rpc = v1.NewThresholdSv1(thrs.thrs) - if !sp.GetConfig().DispatcherSCfg().Enabled { - sp.GetServer().RpcRegister(thrs.rpc) + if !thrs.cfg.DispatcherSCfg().Enabled { + thrs.server.RpcRegister(thrs.rpc) } thrs.connChan <- thrs.rpc return @@ -79,7 +94,7 @@ func (thrs *ThresholdService) GetIntenternalChan() (conn chan rpcclient.RpcClien } // Reload handles the change of config -func (thrs *ThresholdService) Reload(sp servmanager.ServiceProvider) (err error) { +func (thrs *ThresholdService) Reload() (err error) { thrs.Lock() thrs.thrs.Reload() thrs.Unlock() @@ -99,11 +114,6 @@ func (thrs *ThresholdService) Shutdown() (err error) { return } -// GetRPCInterface returns the interface to register for server -func (thrs *ThresholdService) GetRPCInterface() interface{} { - return thrs.rpc -} - // IsRunning returns if the service is running func (thrs *ThresholdService) IsRunning() bool { thrs.RLock() @@ -115,3 +125,8 @@ func (thrs *ThresholdService) IsRunning() bool { func (thrs *ThresholdService) ServiceName() string { return utils.ThresholdS } + +// ShouldRun returns if the service should be running +func (thrs *ThresholdService) ShouldRun() bool { + return thrs.cfg.ThresholdSCfg().Enabled +} diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index e84f4afeb..93560dfaf 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -47,12 +47,11 @@ func TestThresholdSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - /*cdrStorage*/ nil, - /*loadStorage*/ nil, filterSChan, + /*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil, server, nil, engineShutdown) srvMngr.SetCacheS(chS) - tS := NewThresholdService() - srvMngr.AddService(tS) + tS := NewThresholdService(cfg, nil, chS, filterSChan, server) + srvMngr.AddServices(tS) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 95f0e7df9..0fa3d1eee 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -246,10 +246,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.GetConfig().ChargerSCfg().Enabled { go srvMngr.startService(utils.ChargerS) + } + if srvMngr.GetConfig().ThresholdSCfg().Enabled { + go srvMngr.startService(utils.ThresholdS) } /* - if srvMngr.GetConfig().ThresholdSCfg().Enabled { - go srvMngr.startService(utils.ThresholdS) - } if srvMngr.GetConfig().StatSCfg().Enabled { go srvMngr.startService(utils.StatS) } @@ -335,11 +335,11 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.ChargerSCfgJson): if err = srvMngr.reloadService(utils.ChargerS); err != nil { return + } + case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON): + if err = srvMngr.reloadService(utils.ThresholdS); err != nil { + return } /* - case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON): - if err = srvMngr.reloadService(utils.ThresholdS); err != nil { - return - } case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON): if err = srvMngr.reloadService(utils.StatS); err != nil { return