Updated Resource Service

This commit is contained in:
Trial97
2019-10-03 11:25:54 +03:00
committed by Dan Christian Bogos
parent f76caab4c7
commit 3d612deb6b
5 changed files with 57 additions and 41 deletions

View File

@@ -662,7 +662,6 @@ func main() {
internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1)
// tmp
internalRsChan := make(chan rpcclient.RpcClientConnection, 1)
internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
// init CacheS
@@ -682,9 +681,9 @@ func main() {
tS := services.NewThresholdService(cfg, dm, cacheS, filterSChan, server)
stS := services.NewStatService(cfg, dm, cacheS, filterSChan, server,
tS.GetIntenternalChan(), internalDispatcherSChan)
/*
reS := services.NewResourceService()
supS := services.NewSupplierService()
reS := services.NewResourceService(cfg, dm, cacheS, filterSChan, server,
tS.GetIntenternalChan(), internalDispatcherSChan)
/* supS := services.NewSupplierService()
schS := services.NewSchedulerService()
cdrS := services.NewCDRServer()
rals := services.NewRalService(srvManager)
@@ -693,7 +692,7 @@ func main() {
resp, _ := srvManager.GetService(utils.ResponderS)
smg := services.NewSessionService()
grd := services.NewGuardianService()*/
srvManager.AddServices( /*chS, */ attrS, chrS, tS) /* stS, reS, supS, schS, cdrS, rals, smg, grd,
srvManager.AddServices( /*chS, */ attrS, chrS, tS, stS, reS) /*, supS, schS, cdrS, rals, smg, grd,
services.NewEventReaderService(),
services.NewDNSAgent(),
services.NewFreeswitchAgent(),
@@ -722,7 +721,7 @@ func main() {
srvManager.StartServices()
// Start FilterS
go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(), internalRsChan, internalRaterChan, cfg, dm, exitChan)
go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(), reS.GetIntenternalChan(), internalRaterChan, cfg, dm, exitChan)
/*
cacheS := srvManager.GetCacheS()

View File

@@ -23,6 +23,7 @@ import (
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -30,39 +31,60 @@ import (
)
// NewResourceService returns the Resource Service
func NewResourceService() servmanager.Service {
func NewResourceService(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, thrsChan chan rpcclient.RpcClientConnection,
dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &ResourceService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
thrsChan: thrsChan,
dispatcherChan: dispatcherChan,
}
}
// ResourceService implements Service interface
type ResourceService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
thrsChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
reS *engine.ResourceService
rpc *v1.ResourceSv1
connChan chan rpcclient.RpcClientConnection
}
// Start should handle the sercive start
func (reS *ResourceService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (reS *ResourceService) Start() (err error) {
if reS.IsRunning() {
return fmt.Errorf("service aleady running")
}
if waitCache {
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheResourceProfiles)
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheResources)
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheResourceFilterIndexes)
}
reS.cacheS.GetPrecacheChannel(utils.CacheResourceProfiles)
reS.cacheS.GetPrecacheChannel(utils.CacheResources)
reS.cacheS.GetPrecacheChannel(utils.CacheResourceFilterIndexes)
filterS := <-reS.filterSChan
reS.filterSChan <- filterS
var thdSConn rpcclient.RpcClientConnection
if thdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().ResourceSCfg().ThresholdSConns); err != nil {
if thdSConn, err = NewConnection(reS.cfg, reS.thrsChan, reS.dispatcherChan, reS.cfg.ResourceSCfg().ThresholdSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.ResourceS, err.Error()))
return
}
reS.Lock()
defer reS.Unlock()
reS.reS, err = engine.NewResourceService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS())
reS.reS, err = engine.NewResourceService(reS.dm, reS.cfg, thdSConn, filterS)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error()))
return
@@ -70,8 +92,8 @@ func (reS *ResourceService) Start(sp servmanager.ServiceProvider, waitCache bool
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS))
reS.reS.StartLoop()
reS.rpc = v1.NewResourceSv1(reS.reS)
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(reS.rpc)
if !reS.cfg.DispatcherSCfg().Enabled {
reS.server.RpcRegister(reS.rpc)
}
reS.connChan <- reS.rpc
return
@@ -83,9 +105,9 @@ func (reS *ResourceService) GetIntenternalChan() (conn chan rpcclient.RpcClientC
}
// Reload handles the change of config
func (reS *ResourceService) Reload(sp servmanager.ServiceProvider) (err error) {
func (reS *ResourceService) Reload() (err error) {
var thdSConn rpcclient.RpcClientConnection
if thdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().ResourceSCfg().ThresholdSConns); err != nil {
if thdSConn, err = NewConnection(reS.cfg, reS.thrsChan, reS.dispatcherChan, reS.cfg.ResourceSCfg().ThresholdSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.ResourceS, err.Error()))
return
}
@@ -109,11 +131,6 @@ func (reS *ResourceService) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (reS *ResourceService) GetRPCInterface() interface{} {
return reS.rpc
}
// IsRunning returns if the service is running
func (reS *ResourceService) IsRunning() bool {
reS.RLock()
@@ -125,3 +142,8 @@ func (reS *ResourceService) IsRunning() bool {
func (reS *ResourceService) ServiceName() string {
return utils.ResourceS
}
// ShouldRun returns if the service should be running
func (reS *ResourceService) ShouldRun() bool {
return reS.cfg.ResourceSCfg().Enabled
}

View File

@@ -51,12 +51,12 @@ func TestResourceSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheResourceFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
/*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
reS := NewResourceService()
srvMngr.AddService(NewThresholdService(), reS)
tS := NewThresholdService(cfg, nil, chS, filterSChan, server)
reS := NewResourceService(cfg, nil, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
srvMngr.AddServices(tS, reS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -129,11 +129,6 @@ func (sts *StatService) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (sts *StatService) GetRPCInterface() interface{} {
return sts.rpc
}
// IsRunning returns if the service is running
func (sts *StatService) IsRunning() bool {
sts.RLock()

View File

@@ -252,10 +252,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().StatSCfg().Enabled {
go srvMngr.startService(utils.StatS)
}
if srvMngr.GetConfig().ResourceSCfg().Enabled {
go srvMngr.startService(utils.ResourceS)
} /*
if srvMngr.GetConfig().ResourceSCfg().Enabled {
go srvMngr.startService(utils.ResourceS)
}
if srvMngr.GetConfig().SupplierSCfg().Enabled {
go srvMngr.startService(utils.SupplierS)
}
@@ -343,11 +343,11 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON):
if err = srvMngr.reloadService(utils.StatS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON):
if err = srvMngr.reloadService(utils.ResourceS); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON):
if err = srvMngr.reloadService(utils.ResourceS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.SupplierSJson):
if err = srvMngr.reloadService(utils.SupplierS); err != nil {
return