Added EventReader as service in ServiceManager

This commit is contained in:
Trial97
2019-09-24 16:25:18 +03:00
committed by Dan Christian Bogos
parent 2037cdda82
commit 083cc56eba
4 changed files with 130 additions and 47 deletions

View File

@@ -38,7 +38,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/services"
"github.com/cgrates/cgrates/servmanager"
@@ -140,39 +139,6 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
}
}
// startERs handles starting of the EventReader Service
func startERs(sSChan, dspSChan chan rpcclient.RpcClientConnection,
filterSChan chan *engine.FilterS,
cfgRld chan struct{}, exitChan chan bool) {
var err error
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
filterS := <-filterSChan
filterSChan <- filterS
// overwrite the session service channel with dispatcher one
if cfg.DispatcherSCfg().Enabled {
sSChan = dspSChan
}
var sS rpcclient.RpcClientConnection
if sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ERsCfg().SessionSConns, sSChan, false); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
exitChan <- true
return
}
// build the service
erS := ers.NewERService(cfg, filterS, sS, exitChan)
if err = erS.ListenAndServe(cfgRld); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
}
exitChan <- true
}
func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
var sS rpcclient.RpcClientConnection
@@ -1032,7 +998,7 @@ func main() {
apiv2, _ := srvManager.GetService(utils.ApierV2)
resp, _ := srvManager.GetService(utils.ResponderS)
smg := services.NewSessionService()
srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg)
srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg, services.NewEventReaderService())
internalAttributeSChan := attrS.GetIntenternalChan()
internalChargerSChan := chrS.GetIntenternalChan()
internalThresholdSChan := tS.GetIntenternalChan()
@@ -1084,10 +1050,6 @@ func main() {
// Start CDRC components if necessary
go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan)
if cfg.ERsCfg().Enabled {
go startERs(internalSMGChan, internalDispatcherSChan,
filterSChan, cfg.GetReloadChan(config.ERsJson), exitChan)
}
// Start FreeSWITCHAgent
if cfg.FsAgentCfg().Enabled {
go startFsAgent(internalSMGChan, internalDispatcherSChan, exitChan)

View File

@@ -37,7 +37,7 @@ type erEvent struct {
// NewERService instantiates the ERService
func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
sS rpcclient.RpcClientConnection, exitChan chan bool) *ERService {
sS rpcclient.RpcClientConnection, stopChan chan struct{}) *ERService {
return &ERService{
cfg: cfg,
rdrs: make(map[string]EventReader),
@@ -47,7 +47,7 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
rdrErr: make(chan error),
filterS: filterS,
sS: sS,
exitChan: exitChan,
stopChan: stopChan,
}
}
@@ -63,7 +63,7 @@ type ERService struct {
filterS *engine.FilterS
sS rpcclient.RpcClientConnection // connection towards SessionS
exitChan chan bool
stopChan chan struct{}
}
// ListenAndServe keeps the service alive
@@ -84,8 +84,7 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
fmt.Sprintf("<%s> running reader got error: <%s>",
utils.ERs, err.Error()))
return
case e := <-erS.exitChan:
erS.exitChan <- e // put back for the others listening for shutdown request
case <-erS.stopChan:
return
case erEv := <-erS.rdrEvents:
if err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg); err != nil {
@@ -95,7 +94,6 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
}
}
}
return
}
// addReader will add a new reader to the service
@@ -115,7 +113,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) {
func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
for {
select {
case <-erS.exitChan:
case <-erS.stopChan:
return
case <-cfgRldChan:
cfgIDs := make(map[string]int)
@@ -150,7 +148,7 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
utils.Logger.Crit(
fmt.Sprintf("<%s> adding reader <%s> got error: <%s>",
utils.ERs, id, err.Error()))
erS.exitChan <- true
erS.rdrErr <- err
}
}
erS.Unlock()

116
services/ers.go Normal file
View File

@@ -0,0 +1,116 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
"sync"
"github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewEventReaderService returns the EventReader Service
func NewEventReaderService() servmanager.Service {
return &EventReaderService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
rldChan: make(chan struct{}, 1),
}
}
// EventReaderService implements Service interface
type EventReaderService struct {
sync.RWMutex
ers *ers.ERService
rldChan chan struct{}
stopChan chan struct{}
connChan chan rpcclient.RpcClientConnection
}
// Start should handle the sercive start
func (erS *EventReaderService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
if erS.IsRunning() {
return fmt.Errorf("service aleady running")
}
erS.Lock()
defer erS.Unlock()
// remake tht stop chan
erS.stopChan = make(chan struct{}, 1)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
var sS rpcclient.RpcClientConnection
if sS, err = sp.GetConnection(utils.SessionS, sp.GetConfig().ERsCfg().SessionSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
return
}
// build the service
erS.ers = ers.NewERService(sp.GetConfig(), sp.GetFilterS(), sS, erS.stopChan)
go func(erS *ers.ERService, rldChan chan struct{}) {
if err = erS.ListenAndServe(rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
}
sp.GetExitChan() <- true
}(erS.ers, erS.rldChan)
return
}
// GetIntenternalChan returns the internal connection chanel
func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
return erS.connChan
}
// Reload handles the change of config
func (erS *EventReaderService) Reload(sp servmanager.ServiceProvider) (err error) {
erS.Lock()
erS.rldChan <- struct{}{}
erS.Unlock()
return
}
// Shutdown stops the service
func (erS *EventReaderService) Shutdown() (err error) {
erS.Lock()
close(erS.stopChan)
erS.ers = nil
<-erS.connChan
erS.Unlock()
return
}
// GetRPCInterface returns the interface to register for server
func (erS *EventReaderService) GetRPCInterface() interface{} {
return erS.ers
}
// IsRunning returns if the service is running
func (erS *EventReaderService) IsRunning() bool {
erS.RLock()
defer erS.RUnlock()
return erS != nil && erS.ers != nil
}
// ServiceName returns the service name
func (erS *EventReaderService) ServiceName() string {
return utils.ERs
}

View File

@@ -277,6 +277,9 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
if srvMngr.GetConfig().SessionSCfg().Enabled {
go srvMngr.startService(utils.SessionS)
}
if srvMngr.GetConfig().ERsCfg().Enabled {
go srvMngr.startService(utils.ERs)
}
// startServer()
return
}
@@ -340,6 +343,10 @@ func (srvMngr *ServiceManager) handleReload() {
if err = srvMngr.reloadService(utils.SessionS, srvMngr.GetConfig().SessionSCfg().Enabled); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson):
if err = srvMngr.reloadService(utils.ERs, srvMngr.GetConfig().ERsCfg().Enabled); err != nil {
return
}
}
// handle RPC server
}