mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-22 23:58:44 +05:00
EventExporterService implementation
This commit is contained in:
@@ -114,7 +114,8 @@ func startRpc(server *utils.Server, internalRaterChan,
|
||||
internalCdrSChan, internalRsChan, internalStatSChan,
|
||||
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
|
||||
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
|
||||
internalLoaderSChan, internalRALsv1Chan, internalCacheSChan chan rpcclient.ClientConnector,
|
||||
internalLoaderSChan, internalRALsv1Chan, internalCacheSChan,
|
||||
internalEEsChan chan rpcclient.ClientConnector,
|
||||
exitChan chan bool) {
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
select { // Any of the rpc methods will unlock listening to rpc requests
|
||||
@@ -144,6 +145,8 @@ func startRpc(server *utils.Server, internalRaterChan,
|
||||
internalRALsv1Chan <- ralS
|
||||
case chS := <-internalCacheSChan: // added in order to start the RPC before precaching is done
|
||||
internalCacheSChan <- chS
|
||||
case eeS := <-internalEEsChan:
|
||||
internalEEsChan <- eeS
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
@@ -518,6 +521,8 @@ func main() {
|
||||
services.NewDiameterAgent(cfg, filterSChan, exitChan, connManager), // partial reload
|
||||
services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload
|
||||
ldrs, anz, dspS, dmService, storDBService,
|
||||
services.NewEventExporterService(cfg, filterSChan,
|
||||
connManager, server, exitChan, internalEEsChan),
|
||||
)
|
||||
srvManager.StartServices()
|
||||
// Start FilterS
|
||||
@@ -526,7 +531,7 @@ func main() {
|
||||
|
||||
initServiceManagerV1(internalServeManagerChan, srvManager, server)
|
||||
|
||||
// init internalRPCSet because we can have double connections in rpc_conns and one of it could be *internal
|
||||
// init internalRPCSet to share internal connections among the engine
|
||||
engine.IntRPC = engine.NewRPCClientSet()
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, apiSv1.GetIntenternalChan())
|
||||
@@ -557,7 +562,8 @@ func main() {
|
||||
reS.GetIntenternalChan(), stS.GetIntenternalChan(),
|
||||
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
|
||||
routeS.GetIntenternalChan(), smg.GetIntenternalChan(), anz.GetIntenternalChan(),
|
||||
dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), internalCacheSChan, exitChan)
|
||||
dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(),
|
||||
internalCacheSChan, internalEEsChan, exitChan)
|
||||
<-exitChan
|
||||
|
||||
if *cpuProfDir != "" { // wait to end cpuProfiling
|
||||
|
||||
35
ees/ees.go
35
ees/ees.go
@@ -34,10 +34,10 @@ func onCacheEvicted(itmID string, value interface{}) {
|
||||
ee.OnEvicted(itmID, value)
|
||||
}
|
||||
|
||||
// NewERService instantiates the EEService
|
||||
func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
connMgr *engine.ConnManager) *EEService {
|
||||
return &EEService{
|
||||
// NewERService instantiates the EventExporterS
|
||||
func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
connMgr *engine.ConnManager) *EventExporterS {
|
||||
return &EventExporterS{
|
||||
cfg: cfg,
|
||||
filterS: filterS,
|
||||
connMgr: connMgr,
|
||||
@@ -45,8 +45,8 @@ func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
}
|
||||
}
|
||||
|
||||
// EEService is managing the EventExporters
|
||||
type EEService struct {
|
||||
// EventExporterS is managing the EventExporters
|
||||
type EventExporterS struct {
|
||||
cfg *config.CGRConfig
|
||||
filterS *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
@@ -56,9 +56,9 @@ type EEService struct {
|
||||
}
|
||||
|
||||
// ListenAndServe keeps the service alive
|
||||
func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) {
|
||||
func (eeS *EventExporterS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>",
|
||||
utils.CoreS, utils.EventExporterService))
|
||||
utils.CoreS, utils.EventExporterS))
|
||||
for {
|
||||
select {
|
||||
case e := <-exitChan: // global exit
|
||||
@@ -68,7 +68,7 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (
|
||||
case rld := <-cfgRld: // configuration was reloaded, destroy the cache
|
||||
cfgRld <- rld
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.",
|
||||
utils.EventExporterService))
|
||||
utils.EventExporterS))
|
||||
eeS.setupCache(eeS.cfg.EEsCfg().Cache)
|
||||
}
|
||||
}
|
||||
@@ -76,14 +76,19 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (
|
||||
}
|
||||
|
||||
// Shutdown is called to shutdown the service
|
||||
func (eeS *EEService) Shutdown() (err error) {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterService))
|
||||
func (eeS *EventExporterS) Shutdown() (err error) {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterS))
|
||||
eeS.setupCache(nil) // cleanup exporters
|
||||
return
|
||||
}
|
||||
|
||||
// Call implements rpcclient.ClientConnector interface for internal RPC
|
||||
func (eeS *EventExporterS) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
return utils.RPCCall(eeS, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
// setupCache deals with cleanup and initialization of the cache of EventExporters
|
||||
func (eeS *EEService) setupCache(chCfgs map[string]*config.CacheParamCfg) {
|
||||
func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) {
|
||||
eeS.eesMux.Lock()
|
||||
for chID, ch := range eeS.eesChs { // cleanup
|
||||
ch.Clear()
|
||||
@@ -99,7 +104,7 @@ func (eeS *EEService) setupCache(chCfgs map[string]*config.CacheParamCfg) {
|
||||
eeS.eesMux.Unlock()
|
||||
}
|
||||
|
||||
func (eeS *EEService) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs []string, ctx string) (err error) {
|
||||
func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs []string, ctx string) (err error) {
|
||||
var rplyEv engine.AttrSProcessEventReply
|
||||
attrArgs := &engine.AttrArgsProcessEvent{
|
||||
AttributeIDs: attrIDs,
|
||||
@@ -121,7 +126,7 @@ func (eeS *EEService) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs [
|
||||
}
|
||||
|
||||
// ProcessEvent will be called each time a new event is received from readers
|
||||
func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) {
|
||||
func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) {
|
||||
eeS.cfg.RLocks(config.EEsJson)
|
||||
defer eeS.cfg.RUnlocks(config.EEsJson)
|
||||
|
||||
@@ -180,7 +185,7 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error)
|
||||
if err := ee.ExportEvent(cgrEv.CGREvent); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> with id <%s>, error: <%s>",
|
||||
utils.EventExporterService, ee.ID(), err.Error()))
|
||||
utils.EventExporterS, ee.ID(), err.Error()))
|
||||
withErr = true
|
||||
}
|
||||
if evict {
|
||||
|
||||
@@ -54,8 +54,8 @@ type AttributeService struct {
|
||||
server *utils.Server
|
||||
|
||||
attrS *engine.AttributeService
|
||||
rpc *v1.AttributeSv1
|
||||
connChan chan rpcclient.ClientConnector
|
||||
rpc *v1.AttributeSv1 // useful on restart
|
||||
connChan chan rpcclient.ClientConnector // publish the internal Subsystem when available
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
|
||||
124
services/ees.go
Normal file
124
services/ees.go
Normal file
@@ -0,0 +1,124 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/ees"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewEventExporterService constructs EventExporterService
|
||||
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
|
||||
connMgr *engine.ConnManager, server *utils.Server, exitChan chan bool,
|
||||
intConnChan chan rpcclient.ClientConnector) servmanager.Service {
|
||||
return &EventExporterService{
|
||||
cfg: cfg,
|
||||
filterSChan: filterSChan,
|
||||
connMgr: connMgr,
|
||||
server: server,
|
||||
exitChan: exitChan,
|
||||
intConnChan: intConnChan,
|
||||
rldChan: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// EventExporterService is the service structure for EventExporterS
|
||||
type EventExporterService struct {
|
||||
sync.RWMutex
|
||||
|
||||
cfg *config.CGRConfig
|
||||
filterSChan chan *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
server *utils.Server
|
||||
exitChan chan bool
|
||||
intConnChan chan rpcclient.ClientConnector
|
||||
rldChan chan struct{}
|
||||
|
||||
eeS *ees.EventExporterS
|
||||
}
|
||||
|
||||
// GetIntenternalChan is deprecated and it will be removed shortly
|
||||
func (es *EventExporterService) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
|
||||
panic("deprecated method")
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (es *EventExporterService) IsRunning() bool {
|
||||
es.RLock()
|
||||
defer es.RUnlock()
|
||||
return es != nil && es.eeS != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (es *EventExporterService) ServiceName() string {
|
||||
return utils.EventExporterS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (es *EventExporterService) ShouldRun() (should bool) {
|
||||
es.cfg.RLocks(config.EEsJson)
|
||||
should = es.cfg.EEsCfg().Enabled
|
||||
es.cfg.RUnlocks(config.EEsJson)
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (es *EventExporterService) Reload() (err error) {
|
||||
es.rldChan <- struct{}{}
|
||||
return // for the momment nothing to reload
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (es *EventExporterService) Shutdown() (err error) {
|
||||
es.Lock()
|
||||
defer es.Unlock()
|
||||
if err = es.eeS.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
es.eeS = nil
|
||||
<-es.intConnChan
|
||||
return
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (es *EventExporterService) Start() (err error) {
|
||||
if es.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
|
||||
fltrS := <-es.filterSChan
|
||||
es.filterSChan <- fltrS
|
||||
|
||||
es.Lock()
|
||||
es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr)
|
||||
es.Unlock()
|
||||
if err != nil {
|
||||
|
||||
return
|
||||
}
|
||||
es.intConnChan <- es.eeS
|
||||
return es.eeS.ListenAndServe(es.exitChan, es.rldChan)
|
||||
}
|
||||
@@ -78,12 +78,15 @@ func (sts *StatService) Start() (err error) {
|
||||
|
||||
sts.Lock()
|
||||
defer sts.Unlock()
|
||||
sts.sts, err = engine.NewStatService(datadb, sts.cfg, filterS, sts.connMgr)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
|
||||
if sts.sts, err = engine.NewStatService(datadb,
|
||||
sts.cfg, filterS, sts.connMgr); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<StatS> Could not init, error: %s",
|
||||
err.Error()))
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS))
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
|
||||
utils.CoreS, utils.StatS))
|
||||
sts.sts.StartLoop()
|
||||
sts.rpc = v1.NewStatSv1(sts.sts)
|
||||
if !sts.cfg.DispatcherSCfg().Enabled {
|
||||
|
||||
@@ -720,7 +720,7 @@ const (
|
||||
Count = "Count"
|
||||
ProfileID = "ProfileID"
|
||||
SortedRoutes = "SortedRoutes"
|
||||
EventExporterService = "EventExporterService"
|
||||
EventExporterS = "EventExporterS"
|
||||
MetaMonthly = "*monthly"
|
||||
MetaYearly = "*yearly"
|
||||
MetaDaily = "*daily"
|
||||
|
||||
Reference in New Issue
Block a user