Updated Threshold Service

This commit is contained in:
Trial97
2019-10-03 10:37:53 +03:00
committed by Dan Christian Bogos
parent 8349748441
commit 32ce25ebb1
6 changed files with 46 additions and 33 deletions

View File

@@ -683,8 +683,8 @@ func main() {
attrS := services.NewAttributeService(cfg, dm, cacheS, filterSChan, server)
chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), internalDispatcherSChan)
tS := services.NewThresholdService(cfg, dm, cacheS, filterSChan, server)
/*
tS := services.NewThresholdService()
stS := services.NewStatService()
reS := services.NewResourceService()
supS := services.NewSupplierService()
@@ -696,7 +696,7 @@ func main() {
resp, _ := srvManager.GetService(utils.ResponderS)
smg := services.NewSessionService()
grd := services.NewGuardianService()*/
srvManager.AddServices( /*chS, */ attrS, chrS) /*, tS, stS, reS, supS, schS, cdrS, rals, smg, grd,
srvManager.AddServices( /*chS, */ attrS, chrS, tS) /* stS, reS, supS, schS, cdrS, rals, smg, grd,
services.NewEventReaderService(),
services.NewDNSAgent(),
services.NewFreeswitchAgent(),

View File

@@ -35,12 +35,12 @@ func NewAttributeService(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server) servmanager.Service {
return &AttributeService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
connChan: make(chan rpcclient.RpcClientConnection, 1),
}
}

View File

@@ -35,8 +35,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server,
attrsChan, dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &ChargerService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,

View File

@@ -23,6 +23,7 @@ import (
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -30,35 +31,49 @@ import (
)
// NewThresholdService returns the Threshold Service
func NewThresholdService() servmanager.Service {
func NewThresholdService(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server) servmanager.Service {
return &ThresholdService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
}
}
// ThresholdService implements Service interface
type ThresholdService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
thrs *engine.ThresholdService
rpc *v1.ThresholdSv1
connChan chan rpcclient.RpcClientConnection
}
// Start should handle the sercive start
func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (thrs *ThresholdService) Start() (err error) {
if thrs.IsRunning() {
return fmt.Errorf("service aleady running")
}
if waitCache {
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdProfiles)
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholds)
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdFilterIndexes)
}
thrs.cacheS.GetPrecacheChannel(utils.CacheThresholdProfiles)
thrs.cacheS.GetPrecacheChannel(utils.CacheThresholds)
thrs.cacheS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)
filterS := <-thrs.filterSChan
thrs.filterSChan <- filterS
thrs.Lock()
defer thrs.Unlock()
thrs.thrs, err = engine.NewThresholdService(sp.GetDM(), sp.GetConfig(), sp.GetFilterS())
thrs.thrs, err = engine.NewThresholdService(thrs.dm, thrs.cfg, filterS)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error()))
return
@@ -66,8 +81,8 @@ func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bo
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS))
thrs.thrs.StartLoop()
thrs.rpc = v1.NewThresholdSv1(thrs.thrs)
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(thrs.rpc)
if !thrs.cfg.DispatcherSCfg().Enabled {
thrs.server.RpcRegister(thrs.rpc)
}
thrs.connChan <- thrs.rpc
return
@@ -79,7 +94,7 @@ func (thrs *ThresholdService) GetIntenternalChan() (conn chan rpcclient.RpcClien
}
// Reload handles the change of config
func (thrs *ThresholdService) Reload(sp servmanager.ServiceProvider) (err error) {
func (thrs *ThresholdService) Reload() (err error) {
thrs.Lock()
thrs.thrs.Reload()
thrs.Unlock()
@@ -99,11 +114,6 @@ func (thrs *ThresholdService) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (thrs *ThresholdService) GetRPCInterface() interface{} {
return thrs.rpc
}
// IsRunning returns if the service is running
func (thrs *ThresholdService) IsRunning() bool {
thrs.RLock()
@@ -115,3 +125,8 @@ func (thrs *ThresholdService) IsRunning() bool {
func (thrs *ThresholdService) ServiceName() string {
return utils.ThresholdS
}
// ShouldRun returns if the service should be running
func (thrs *ThresholdService) ShouldRun() bool {
return thrs.cfg.ThresholdSCfg().Enabled
}

View File

@@ -47,12 +47,11 @@ func TestThresholdSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
/*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
tS := NewThresholdService()
srvMngr.AddService(tS)
tS := NewThresholdService(cfg, nil, chS, filterSChan, server)
srvMngr.AddServices(tS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -246,10 +246,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().ChargerSCfg().Enabled {
go srvMngr.startService(utils.ChargerS)
}
if srvMngr.GetConfig().ThresholdSCfg().Enabled {
go srvMngr.startService(utils.ThresholdS)
} /*
if srvMngr.GetConfig().ThresholdSCfg().Enabled {
go srvMngr.startService(utils.ThresholdS)
}
if srvMngr.GetConfig().StatSCfg().Enabled {
go srvMngr.startService(utils.StatS)
}
@@ -335,11 +335,11 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.ChargerSCfgJson):
if err = srvMngr.reloadService(utils.ChargerS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON):
if err = srvMngr.reloadService(utils.ThresholdS); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON):
if err = srvMngr.reloadService(utils.ThresholdS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON):
if err = srvMngr.reloadService(utils.StatS); err != nil {
return