Updated ErS

This commit is contained in:
Trial97
2019-10-04 11:02:41 +03:00
committed by Dan Christian Bogos
parent 82771490a2
commit cc00a80f51
4 changed files with 47 additions and 29 deletions

View File

@@ -707,9 +707,10 @@ func main() {
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), internalDispatcherSChan, exitChan)
srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg)
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan),
)
/*
services.NewEventReaderService(),
services.NewDNSAgent(),
services.NewFreeswitchAgent(),
services.NewKamailioAgent(),
@@ -719,9 +720,6 @@ func main() {
services.NewHTTPAgent(), // no reload
*/
/*
internalSMGChan := smg.GetIntenternalChan()
*/
srvManager.StartServices()
// Start FilterS

View File

@@ -22,6 +22,8 @@ import (
"fmt"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -29,22 +31,35 @@ import (
)
// NewEventReaderService returns the EventReader Service
func NewEventReaderService() servmanager.Service {
func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
sSChan, dispatcherChan chan rpcclient.RpcClientConnection,
exitChan chan bool) servmanager.Service {
return &EventReaderService{
rldChan: make(chan struct{}, 1),
rldChan: make(chan struct{}, 1),
cfg: cfg,
filterSChan: filterSChan,
sSChan: sSChan,
dispatcherChan: dispatcherChan,
exitChan: exitChan,
}
}
// EventReaderService implements Service interface
type EventReaderService struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
sSChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
exitChan chan bool
ers *ers.ERService
rldChan chan struct{}
stopChan chan struct{}
}
// Start should handle the sercive start
func (erS *EventReaderService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (erS *EventReaderService) Start() (err error) {
if erS.IsRunning() {
return fmt.Errorf("service aleady running")
}
@@ -52,22 +67,25 @@ func (erS *EventReaderService) Start(sp servmanager.ServiceProvider, waitCache b
erS.Lock()
defer erS.Unlock()
filterS := <-erS.filterSChan
erS.filterSChan <- filterS
// remake tht stop chan
erS.stopChan = make(chan struct{}, 1)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
var sS rpcclient.RpcClientConnection
if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().ERsCfg().SessionSConns); err != nil {
if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
return
}
// build the service
erS.ers = ers.NewERService(sp.GetConfig(), sp.GetFilterS(), sS, erS.stopChan)
go func(erS *ers.ERService, rldChan chan struct{}) {
if err := erS.ListenAndServe(rldChan); err != nil {
erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan)
go func(ers *ers.ERService, rldChan chan struct{}) {
if err := ers.ListenAndServe(rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
sp.GetExitChan() <- true
erS.exitChan <- true
}
}(erS.ers, erS.rldChan)
return
@@ -79,9 +97,9 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie
}
// Reload handles the change of config
func (erS *EventReaderService) Reload(sp servmanager.ServiceProvider) (err error) {
func (erS *EventReaderService) Reload() (err error) {
var sS rpcclient.RpcClientConnection
if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().ERsCfg().SessionSConns); err != nil {
if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
return
@@ -102,11 +120,6 @@ func (erS *EventReaderService) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (erS *EventReaderService) GetRPCInterface() interface{} {
return erS.ers
}
// IsRunning returns if the service is running
func (erS *EventReaderService) IsRunning() bool {
erS.RLock()
@@ -118,3 +131,8 @@ func (erS *EventReaderService) IsRunning() bool {
func (erS *EventReaderService) ServiceName() string {
return utils.ERs
}
// ShouldRun returns if the service should be running
func (erS *EventReaderService) ShouldRun() bool {
return erS.cfg.ERsCfg().Enabled
}

View File

@@ -57,8 +57,10 @@ func TestEventReaderSReload(t *testing.T) {
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
attrS := NewEventReaderService()
srvMngr.AddService(attrS, NewSessionService())
sS := NewSessionService(cfg, nil, server, nil,
nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown)
attrS := NewEventReaderService(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown)
srvMngr.AddServices(attrS, sS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -270,10 +270,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().SessionSCfg().Enabled {
go srvMngr.startService(utils.SessionS)
}
if srvMngr.GetConfig().ERsCfg().Enabled {
go srvMngr.startService(utils.ERs)
} /*
if srvMngr.GetConfig().ERsCfg().Enabled {
go srvMngr.startService(utils.ERs)
}
if srvMngr.GetConfig().DNSAgentCfg().Enabled {
go srvMngr.startService(utils.DNSAgent)
}
@@ -371,11 +371,11 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson):
if err = srvMngr.reloadService(utils.SessionS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson):
if err = srvMngr.reloadService(utils.ERs); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson):
if err = srvMngr.reloadService(utils.ERs); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJson):
if err = srvMngr.reloadService(utils.DNSAgent); err != nil {
return