Updated how FilterS is passed to service

This commit is contained in:
Trial97
2019-10-03 10:22:26 +03:00
committed by Dan Christian Bogos
parent 9e2e755776
commit 8349748441
5 changed files with 33 additions and 27 deletions

View File

@@ -673,18 +673,15 @@ func main() {
initCoreSv1(internalCoreSv1Chan, server)
// Start FilterS
// force a litl
startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan)
filterS := <-filterSChan
filterSChan <- filterS
go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, cdrDb,
loadDb, filterSChan, server, internalDispatcherSChan, exitChan)
// chS := services.NewCacheService()
attrS := services.NewAttributeService(cfg, dm, cacheS, filterS, server)
chrS := services.NewChargerService(cfg, dm, cacheS, filterS, server,
attrS := services.NewAttributeService(cfg, dm, cacheS, filterSChan, server)
chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), internalDispatcherSChan)
/*
tS := services.NewThresholdService()

View File

@@ -32,26 +32,26 @@ import (
// NewAttributeService returns the Attribute Service
func NewAttributeService(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, filterS *engine.FilterS,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server) servmanager.Service {
return &AttributeService{
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterS: filterS,
server: server,
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
connChan: make(chan rpcclient.RpcClientConnection, 1),
}
}
// AttributeService implements Service interface
type AttributeService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
cacheS *engine.CacheS
filterS *engine.FilterS
server *utils.Server
cfg *config.CGRConfig
dm *engine.DataManager
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
attrS *engine.AttributeService
rpc *v1.AttributeSv1
@@ -67,10 +67,12 @@ func (attrS *AttributeService) Start() (err error) {
<-attrS.cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles)
<-attrS.cacheS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)
filterS := <-attrS.filterSChan
attrS.filterSChan <- filterS
attrS.Lock()
defer attrS.Unlock()
attrS.attrS, err = engine.NewAttributeService(attrS.dm,
attrS.filterS, attrS.cfg)
attrS.attrS, err = engine.NewAttributeService(attrS.dm, filterS, attrS.cfg)
if err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> Could not init, error: %s",

View File

@@ -40,6 +40,8 @@ func TestAttributeSReload(t *testing.T) {
engineShutdown := make(chan bool, 1)
chS := engine.NewCacheS(cfg, nil)
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles))
close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes))
server := utils.NewServer()
@@ -49,7 +51,7 @@ func TestAttributeSReload(t *testing.T) {
engineShutdown)
srvMngr.SetCacheS(chS)
attrS := NewAttributeService(cfg, nil,
chS, nil, server)
chS, filterSChan, server)
srvMngr.AddServices(attrS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)

View File

@@ -32,7 +32,7 @@ import (
// NewChargerService returns the Charger Service
func NewChargerService(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, filterS *engine.FilterS, server *utils.Server,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server,
attrsChan, dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &ChargerService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
@@ -40,7 +40,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *engine.DataManager,
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterS: filterS,
filterSChan: filterSChan,
server: server,
attrsChan: attrsChan,
dispatcherChan: dispatcherChan,
@@ -53,7 +53,7 @@ type ChargerService struct {
cfg *config.CGRConfig
dm *engine.DataManager
cacheS *engine.CacheS
filterS *engine.FilterS
filterSChan chan *engine.FilterS
server *utils.Server
attrsChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
@@ -71,6 +71,9 @@ func (chrS *ChargerService) Start() (err error) {
chrS.cacheS.GetPrecacheChannel(utils.CacheChargerProfiles)
chrS.cacheS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)
filterS := <-chrS.filterSChan
chrS.filterSChan <- filterS
var attrSConn rpcclient.RpcClientConnection
if attrSConn, err = NewConnection(chrS.cfg, chrS.attrsChan, chrS.dispatcherChan, chrS.cfg.ChargerSCfg().AttributeSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
@@ -79,7 +82,7 @@ func (chrS *ChargerService) Start() (err error) {
}
chrS.Lock()
defer chrS.Unlock()
if chrS.chrS, err = engine.NewChargerService(chrS.dm, chrS.filterS, attrSConn, chrS.cfg); err != nil {
if chrS.chrS, err = engine.NewChargerService(chrS.dm, filterS, attrSConn, chrS.cfg); err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> Could not init, error: %s",
utils.ChargerS, err.Error()))

View File

@@ -44,13 +44,15 @@ func TestChargerSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes))
close(chS.GetPrecacheChannel(utils.CacheChargerProfiles))
close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes))
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
/*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
attrS := NewAttributeService(cfg, nil, chS, nil, server)
chrS := NewChargerService(cfg, nil, chS, nil, server, attrS.GetIntenternalChan(), nil)
attrS := NewAttributeService(cfg, nil, chS, filterSChan, server)
chrS := NewChargerService(cfg, nil, chS, filterSChan, server, attrS.GetIntenternalChan(), nil)
srvMngr.AddServices(attrS, chrS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)