From 834974844184e22865d89ef85862521116a32167 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 3 Oct 2019 10:22:26 +0300 Subject: [PATCH] Updated how FilterS is passed to service --- cmd/cgr-engine/cgr-engine.go | 9 +++------ services/attributes.go | 30 ++++++++++++++++-------------- services/attributes_it_test.go | 4 +++- services/chargers.go | 11 +++++++---- services/chargers_it_test.go | 6 ++++-- 5 files changed, 33 insertions(+), 27 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0dcc077ed..45168f854 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -673,18 +673,15 @@ func main() { initCoreSv1(internalCoreSv1Chan, server) // Start FilterS - // force a litl - startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan) - filterS := <-filterSChan - filterSChan <- filterS + go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan) // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dm, cdrDb, loadDb, filterSChan, server, internalDispatcherSChan, exitChan) // chS := services.NewCacheService() - attrS := services.NewAttributeService(cfg, dm, cacheS, filterS, server) - chrS := services.NewChargerService(cfg, dm, cacheS, filterS, server, + attrS := services.NewAttributeService(cfg, dm, cacheS, filterSChan, server) + chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server, attrS.GetIntenternalChan(), internalDispatcherSChan) /* tS := services.NewThresholdService() diff --git a/services/attributes.go b/services/attributes.go index 53250e263..fec07a13a 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -32,26 +32,26 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, dm *engine.DataManager, - cacheS *engine.CacheS, filterS *engine.FilterS, + cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server) servmanager.Service { return &AttributeService{ - cfg: cfg, - dm: dm, - cacheS: cacheS, - filterS: filterS, - server: server, - connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + connChan: make(chan rpcclient.RpcClientConnection, 1), } } // AttributeService implements Service interface type AttributeService struct { sync.RWMutex - cfg *config.CGRConfig - dm *engine.DataManager - cacheS *engine.CacheS - filterS *engine.FilterS - server *utils.Server + cfg *config.CGRConfig + dm *engine.DataManager + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *utils.Server attrS *engine.AttributeService rpc *v1.AttributeSv1 @@ -67,10 +67,12 @@ func (attrS *AttributeService) Start() (err error) { <-attrS.cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles) <-attrS.cacheS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes) + filterS := <-attrS.filterSChan + attrS.filterSChan <- filterS + attrS.Lock() defer attrS.Unlock() - attrS.attrS, err = engine.NewAttributeService(attrS.dm, - attrS.filterS, attrS.cfg) + attrS.attrS, err = engine.NewAttributeService(attrS.dm, filterS, attrS.cfg) if err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> Could not init, error: %s", diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index 2ea245f49..576d4f86b 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -40,6 +40,8 @@ func TestAttributeSReload(t *testing.T) { engineShutdown := make(chan bool, 1) chS := engine.NewCacheS(cfg, nil) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) server := utils.NewServer() @@ -49,7 +51,7 @@ func TestAttributeSReload(t *testing.T) { engineShutdown) srvMngr.SetCacheS(chS) attrS := NewAttributeService(cfg, nil, - chS, nil, server) + chS, filterSChan, server) srvMngr.AddServices(attrS) if err = srvMngr.StartServices(); err != nil { t.Error(err) diff --git a/services/chargers.go b/services/chargers.go index cacfccb07..7fd590982 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -32,7 +32,7 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, dm *engine.DataManager, - cacheS *engine.CacheS, filterS *engine.FilterS, server *utils.Server, + cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server, attrsChan, dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service { return &ChargerService{ connChan: make(chan rpcclient.RpcClientConnection, 1), @@ -40,7 +40,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *engine.DataManager, cfg: cfg, dm: dm, cacheS: cacheS, - filterS: filterS, + filterSChan: filterSChan, server: server, attrsChan: attrsChan, dispatcherChan: dispatcherChan, @@ -53,7 +53,7 @@ type ChargerService struct { cfg *config.CGRConfig dm *engine.DataManager cacheS *engine.CacheS - filterS *engine.FilterS + filterSChan chan *engine.FilterS server *utils.Server attrsChan chan rpcclient.RpcClientConnection dispatcherChan chan rpcclient.RpcClientConnection @@ -71,6 +71,9 @@ func (chrS *ChargerService) Start() (err error) { chrS.cacheS.GetPrecacheChannel(utils.CacheChargerProfiles) chrS.cacheS.GetPrecacheChannel(utils.CacheChargerFilterIndexes) + filterS := <-chrS.filterSChan + chrS.filterSChan <- filterS + var attrSConn rpcclient.RpcClientConnection if attrSConn, err = NewConnection(chrS.cfg, chrS.attrsChan, chrS.dispatcherChan, chrS.cfg.ChargerSCfg().AttributeSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", @@ -79,7 +82,7 @@ func (chrS *ChargerService) Start() (err error) { } chrS.Lock() defer chrS.Unlock() - if chrS.chrS, err = engine.NewChargerService(chrS.dm, chrS.filterS, attrSConn, chrS.cfg); err != nil { + if chrS.chrS, err = engine.NewChargerService(chrS.dm, filterS, attrSConn, chrS.cfg); err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> Could not init, error: %s", utils.ChargerS, err.Error())) diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index 709f3e65d..4535ffc7f 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -44,13 +44,15 @@ func TestChargerSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, /*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil, server, nil, engineShutdown) srvMngr.SetCacheS(chS) - attrS := NewAttributeService(cfg, nil, chS, nil, server) - chrS := NewChargerService(cfg, nil, chS, nil, server, attrS.GetIntenternalChan(), nil) + attrS := NewAttributeService(cfg, nil, chS, filterSChan, server) + chrS := NewChargerService(cfg, nil, chS, filterSChan, server, attrS.GetIntenternalChan(), nil) srvMngr.AddServices(attrS, chrS) if err = srvMngr.StartServices(); err != nil { t.Error(err)