From 3d612deb6b5a1f7ddf5791af0b2e89c682d82153 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 3 Oct 2019 11:25:54 +0300 Subject: [PATCH] Updated Resource Service --- cmd/cgr-engine/cgr-engine.go | 11 +++---- services/resources.go | 60 ++++++++++++++++++++++++----------- services/resources_it_test.go | 8 ++--- services/stats.go | 5 --- servmanager/servmanager.go | 14 ++++---- 5 files changed, 57 insertions(+), 41 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 005cc1beb..8700426f2 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -662,7 +662,6 @@ func main() { internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1) // tmp - internalRsChan := make(chan rpcclient.RpcClientConnection, 1) internalRaterChan := make(chan rpcclient.RpcClientConnection, 1) // init CacheS @@ -682,9 +681,9 @@ func main() { tS := services.NewThresholdService(cfg, dm, cacheS, filterSChan, server) stS := services.NewStatService(cfg, dm, cacheS, filterSChan, server, tS.GetIntenternalChan(), internalDispatcherSChan) - /* - reS := services.NewResourceService() - supS := services.NewSupplierService() + reS := services.NewResourceService(cfg, dm, cacheS, filterSChan, server, + tS.GetIntenternalChan(), internalDispatcherSChan) + /* supS := services.NewSupplierService() schS := services.NewSchedulerService() cdrS := services.NewCDRServer() rals := services.NewRalService(srvManager) @@ -693,7 +692,7 @@ func main() { resp, _ := srvManager.GetService(utils.ResponderS) smg := services.NewSessionService() grd := services.NewGuardianService()*/ - srvManager.AddServices( /*chS, */ attrS, chrS, tS) /* stS, reS, supS, schS, cdrS, rals, smg, grd, + srvManager.AddServices( /*chS, */ attrS, chrS, tS, stS, reS) /*, supS, schS, cdrS, rals, smg, grd, services.NewEventReaderService(), services.NewDNSAgent(), services.NewFreeswitchAgent(), @@ -722,7 +721,7 @@ func main() { srvManager.StartServices() // Start FilterS - go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(), internalRsChan, internalRaterChan, cfg, dm, exitChan) + go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(), reS.GetIntenternalChan(), internalRaterChan, cfg, dm, exitChan) /* cacheS := srvManager.GetCacheS() diff --git a/services/resources.go b/services/resources.go index d983e32b5..021286a8d 100644 --- a/services/resources.go +++ b/services/resources.go @@ -23,6 +23,7 @@ import ( "sync" v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -30,39 +31,60 @@ import ( ) // NewResourceService returns the Resource Service -func NewResourceService() servmanager.Service { +func NewResourceService(cfg *config.CGRConfig, dm *engine.DataManager, + cacheS *engine.CacheS, filterSChan chan *engine.FilterS, + server *utils.Server, thrsChan chan rpcclient.RpcClientConnection, + dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service { return &ResourceService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + thrsChan: thrsChan, + dispatcherChan: dispatcherChan, } } // ResourceService implements Service interface type ResourceService struct { sync.RWMutex + cfg *config.CGRConfig + dm *engine.DataManager + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *utils.Server + thrsChan chan rpcclient.RpcClientConnection + dispatcherChan chan rpcclient.RpcClientConnection + reS *engine.ResourceService rpc *v1.ResourceSv1 connChan chan rpcclient.RpcClientConnection } // Start should handle the sercive start -func (reS *ResourceService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { +func (reS *ResourceService) Start() (err error) { if reS.IsRunning() { return fmt.Errorf("service aleady running") } - if waitCache { - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheResourceProfiles) - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheResources) - <-sp.GetCacheS().GetPrecacheChannel(utils.CacheResourceFilterIndexes) - } + + reS.cacheS.GetPrecacheChannel(utils.CacheResourceProfiles) + reS.cacheS.GetPrecacheChannel(utils.CacheResources) + reS.cacheS.GetPrecacheChannel(utils.CacheResourceFilterIndexes) + + filterS := <-reS.filterSChan + reS.filterSChan <- filterS + var thdSConn rpcclient.RpcClientConnection - if thdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().ResourceSCfg().ThresholdSConns); err != nil { + if thdSConn, err = NewConnection(reS.cfg, reS.thrsChan, reS.dispatcherChan, reS.cfg.ResourceSCfg().ThresholdSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.ResourceS, err.Error())) return } reS.Lock() defer reS.Unlock() - reS.reS, err = engine.NewResourceService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS()) + reS.reS, err = engine.NewResourceService(reS.dm, reS.cfg, thdSConn, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error())) return @@ -70,8 +92,8 @@ func (reS *ResourceService) Start(sp servmanager.ServiceProvider, waitCache bool utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS)) reS.reS.StartLoop() reS.rpc = v1.NewResourceSv1(reS.reS) - if !sp.GetConfig().DispatcherSCfg().Enabled { - sp.GetServer().RpcRegister(reS.rpc) + if !reS.cfg.DispatcherSCfg().Enabled { + reS.server.RpcRegister(reS.rpc) } reS.connChan <- reS.rpc return @@ -83,9 +105,9 @@ func (reS *ResourceService) GetIntenternalChan() (conn chan rpcclient.RpcClientC } // Reload handles the change of config -func (reS *ResourceService) Reload(sp servmanager.ServiceProvider) (err error) { +func (reS *ResourceService) Reload() (err error) { var thdSConn rpcclient.RpcClientConnection - if thdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().ResourceSCfg().ThresholdSConns); err != nil { + if thdSConn, err = NewConnection(reS.cfg, reS.thrsChan, reS.dispatcherChan, reS.cfg.ResourceSCfg().ThresholdSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.ResourceS, err.Error())) return } @@ -109,11 +131,6 @@ func (reS *ResourceService) Shutdown() (err error) { return } -// GetRPCInterface returns the interface to register for server -func (reS *ResourceService) GetRPCInterface() interface{} { - return reS.rpc -} - // IsRunning returns if the service is running func (reS *ResourceService) IsRunning() bool { reS.RLock() @@ -125,3 +142,8 @@ func (reS *ResourceService) IsRunning() bool { func (reS *ResourceService) ServiceName() string { return utils.ResourceS } + +// ShouldRun returns if the service should be running +func (reS *ResourceService) ShouldRun() bool { + return reS.cfg.ResourceSCfg().Enabled +} diff --git a/services/resources_it_test.go b/services/resources_it_test.go index 18c2493ab..1f0ee5312 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -51,12 +51,12 @@ func TestResourceSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheResourceFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - /*cdrStorage*/ nil, - /*loadStorage*/ nil, filterSChan, + /*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil, server, nil, engineShutdown) srvMngr.SetCacheS(chS) - reS := NewResourceService() - srvMngr.AddService(NewThresholdService(), reS) + tS := NewThresholdService(cfg, nil, chS, filterSChan, server) + reS := NewResourceService(cfg, nil, chS, filterSChan, server, tS.GetIntenternalChan(), nil) + srvMngr.AddServices(tS, reS) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/stats.go b/services/stats.go index d5bd0675a..cb398decb 100644 --- a/services/stats.go +++ b/services/stats.go @@ -129,11 +129,6 @@ func (sts *StatService) Shutdown() (err error) { return } -// GetRPCInterface returns the interface to register for server -func (sts *StatService) GetRPCInterface() interface{} { - return sts.rpc -} - // IsRunning returns if the service is running func (sts *StatService) IsRunning() bool { sts.RLock() diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 07a783313..430427d19 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -252,10 +252,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.GetConfig().StatSCfg().Enabled { go srvMngr.startService(utils.StatS) + } + if srvMngr.GetConfig().ResourceSCfg().Enabled { + go srvMngr.startService(utils.ResourceS) } /* - if srvMngr.GetConfig().ResourceSCfg().Enabled { - go srvMngr.startService(utils.ResourceS) - } if srvMngr.GetConfig().SupplierSCfg().Enabled { go srvMngr.startService(utils.SupplierS) } @@ -343,11 +343,11 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON): if err = srvMngr.reloadService(utils.StatS); err != nil { return + } + case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON): + if err = srvMngr.reloadService(utils.ResourceS); err != nil { + return } /* - case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON): - if err = srvMngr.reloadService(utils.ResourceS); err != nil { - return - } case <-srvMngr.GetConfig().GetReloadChan(config.SupplierSJson): if err = srvMngr.reloadService(utils.SupplierS); err != nil { return