Engine with startERs section supporting configuration reloads

This commit is contained in:
DanB
2019-08-26 22:05:02 +02:00
parent 673ad90b95
commit bee505b278
3 changed files with 135 additions and 71 deletions

View File

@@ -39,6 +39,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
@@ -320,6 +321,46 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
}
}
// startERs handles starting of the EventReader Service
func startERs(sSChan, dspSChan chan rpcclient.RpcClientConnection,
filterSChan chan *engine.FilterS,
cfgRld chan struct{}, exitChan chan bool) {
var err error
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
filterS := <-filterSChan
filterSChan <- filterS
// overwrite the session service channel with dispatcher one
if cfg.DispatcherSCfg().Enabled {
sSChan = dspSChan
}
var sS rpcclient.RpcClientConnection
if sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ERsCfg().SessionSConns, sSChan, false); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>",
utils.ERs, utils.SessionS, err.Error()))
exitChan <- true
return
}
var erS *ers.ERService
if erS, err = ers.NewERService(cfg, filterS, sS); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
exitChan <- true
return
}
if err = erS.ListenAndServe(cfgRld, exitChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
}
exitChan <- true
}
func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
var sS rpcclient.RpcClientConnection
@@ -1712,6 +1753,11 @@ func main() {
internalAttributeSChan, internalCdrSChan, internalChargerSChan,
internalDispatcherSChan, server, dm, exitChan)
}
if cfg.ERsCfg().Enabled {
go startERs(internalSMGChan, internalDispatcherSChan,
filterSChan, cfg.GetReloadChan(config.ERsJson), exitChan)
}
// Start FreeSWITCHAgent
if cfg.FsAgentCfg().Enabled {
go startFsAgent(internalSMGChan, internalDispatcherSChan, exitChan)

View File

@@ -27,6 +27,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
@@ -128,6 +129,8 @@ func SetCgrConfig(cfg *CGRConfig) {
func NewDefaultCGRConfig() (*CGRConfig, error) {
cfg := new(CGRConfig)
cfg.lks = make(map[string]*sync.RWMutex)
cfg.lks[ERsJson] = new(sync.RWMutex)
cfg.DataFolderPath = "/usr/share/cgrates/"
cfg.MaxCallDuration = time.Duration(3) * time.Hour // Hardcoded for now
@@ -182,6 +185,9 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
cfg.ConfigReloads[utils.SMAsterisk] = make(chan struct{}, 1)
cfg.ConfigReloads[utils.SMAsterisk] <- struct{}{} // Unlock the channel
cfg.rldChans = make(map[string]chan struct{})
cfg.rldChans[ERsJson] = make(chan struct{}, 1)
cgrJsonCfg, err := NewCgrJsonCfgFromReader(strings.NewReader(CGRATES_CFG_JSON))
if err != nil {
return nil, err
@@ -311,6 +317,7 @@ func loadConfigFromHttp(cfg *CGRConfig, urlPaths string) (*CGRConfig, error) {
// Holds system configuration, defaults are overwritten with values from config file if found
type CGRConfig struct {
lks map[string]*sync.RWMutex
MaxCallDuration time.Duration // The maximum call duration (used by responder when querying DerivedCharging) // ToDo: export it in configuration file
DataFolderPath string // Path towards data folder, for tests internal usage, not loading out of .json options
ConfigPath string // Path towards config
@@ -326,6 +333,7 @@ type CGRConfig struct {
httpAgentCfg HttpAgentCfgs // HttpAgent configs
ConfigReloads map[string]chan struct{} // Signals to specific entities that a config reload should occur
rldChans map[string]chan struct{} // index here the channels used for reloads
generalCfg *GeneralCfg // General config
dataDbCfg *DataDbCfg // Database config
@@ -1233,10 +1241,17 @@ func (cfg *CGRConfig) ApierCfg() *ApierCfg {
return cfg.apier
}
// ERsCfg reads the EventReader configuration
func (cfg *CGRConfig) ERsCfg() *ERsCfg {
cfg.lks[ERsJson].RLock()
defer cfg.lks[ERsJson].RUnlock()
return cfg.ersCfg
}
func (cfg *CGRConfig) GetReloadChan(sectID string) chan struct{} {
return cfg.rldChans[sectID]
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (cSv1 *CGRConfig) Call(serviceMethod string,
args interface{}, reply interface{}) error {

View File

@@ -32,15 +32,12 @@ import (
)
// NewERService instantiates the ERService
func NewERService(cfg *config.CGRConfig,
filterS *engine.FilterS,
sS rpcclient.RpcClientConnection,
cfgRld chan struct{}) (erS *ERService, err error) {
func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
sS rpcclient.RpcClientConnection) (erS *ERService, err error) {
erS = &ERService{
cfg: cfg,
rdrs: make(map[string][]EventReader),
stopLsn: make(map[string]chan struct{}),
cfgRld: cfgRld,
sS: sS,
}
return
@@ -50,16 +47,16 @@ func NewERService(cfg *config.CGRConfig,
type ERService struct {
sync.RWMutex
cfg *config.CGRConfig
rdrs map[string][]EventReader // list of readers on specific paths map[path]reader
stopLsn map[string]chan struct{} // stops listening on paths
filterS *engine.FilterS
rdrs map[string][]EventReader // list of readers on specific paths map[path]reader
stopLsn map[string]chan struct{} // stops listening on paths
cfgRld chan struct{} // signal the need of config reloading - chan path / *any
sS rpcclient.RpcClientConnection // connection towards SessionS
}
// ListenAndServe loops keeps the service alive
func (erS *ERService) ListenAndServe(exitChan chan bool) (err error) {
func (erS *ERService) ListenAndServe(cfgRldChan chan struct{},
exitChan chan bool) (err error) {
for _, rdrCfg := range erS.cfg.ERsCfg().Readers {
var rdr EventReader
if rdr, err = NewEventReader(rdrCfg); err != nil {
@@ -76,7 +73,7 @@ func (erS *ERService) ListenAndServe(exitChan chan bool) (err error) {
erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr)
}
go erS.handleReloads()
go erS.handleReloads(cfgRldChan, exitChan)
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return
@@ -88,78 +85,83 @@ type erCfgRef struct {
idx int
}
func (erS *ERService) handleReloads() {
// handleReloads will handle the config reloads which are signaled over cfgRldChan
func (erS *ERService) handleReloads(cfgRldChan chan struct{}, exitChan chan bool) {
for {
<-erS.cfgRld
cfgIDs := make(map[string]int) // IDs which are configured in EventReader profiles as map[id]cfgIdx
inUseIDs := make(map[string]*erCfgRef) // IDs which are running in ERService map[id]rdrIdx
addIDs := make(map[string]struct{}) // IDs which need to be added to ERService
remIDs := make(map[string]struct{}) // IDs which need to be removed from ERService
// index config IDs
for i, rdrCfg := range erS.cfg.ERsCfg().Readers {
cfgIDs[rdrCfg.ID] = i
}
erS.Lock()
// index in use IDs
for path, rdrs := range erS.rdrs {
for i, rdr := range rdrs {
inUseIDs[rdr.ID()] = &erCfgRef{path: path, idx: i}
select {
case <-exitChan:
return
case <-cfgRldChan:
cfgIDs := make(map[string]int) // IDs which are configured in EventReader profiles as map[id]cfgIdx
inUseIDs := make(map[string]*erCfgRef) // IDs which are running in ERService map[id]rdrIdx
addIDs := make(map[string]struct{}) // IDs which need to be added to ERService
remIDs := make(map[string]struct{}) // IDs which need to be removed from ERService
// index config IDs
for i, rdrCfg := range erS.cfg.ERsCfg().Readers {
cfgIDs[rdrCfg.ID] = i
}
}
// find out removed ids
for id := range inUseIDs {
if _, has := cfgIDs[id]; !has {
remIDs[id] = struct{}{}
}
}
// find out added ids
for id := range cfgIDs {
if _, has := inUseIDs[id]; !has {
addIDs[id] = struct{}{}
}
}
for id := range remIDs {
ref := inUseIDs[id]
rdrSlc := erS.rdrs[ref.path]
// remove the ids
copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:])
rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected
rdrSlc = rdrSlc[:len(rdrSlc)-1]
if len(rdrSlc) == 0 { // no more
delete(erS.rdrs, ref.path)
if chn, has := erS.stopLsn[ref.path]; has {
close(chn)
erS.Lock()
// index in use IDs
for path, rdrs := range erS.rdrs {
for i, rdr := range rdrs {
inUseIDs[rdr.ID()] = &erCfgRef{path: path, idx: i}
}
}
}
// add new ids:
for id := range addIDs {
rdrCfg := erS.cfg.ERsCfg().Readers[cfgIDs[id]]
if rdr, err := NewEventReader(rdrCfg); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> error reloading config with ID: <%s>, err: <%s>",
utils.ERs, id, err.Error()))
} else {
if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath &&
rdrCfg.Type == utils.MetaFileCSV &&
rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop
erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{})
if err := erS.watchDir(rdrCfg.SourcePath); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> error scheduling dir watch for config: <%s>, err: <%s>",
utils.ERs, id, err.Error()))
// find out removed ids
for id := range inUseIDs {
if _, has := cfgIDs[id]; !has {
remIDs[id] = struct{}{}
}
}
// find out added ids
for id := range cfgIDs {
if _, has := inUseIDs[id]; !has {
addIDs[id] = struct{}{}
}
}
for id := range remIDs {
ref := inUseIDs[id]
rdrSlc := erS.rdrs[ref.path]
// remove the ids
copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:])
rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected
rdrSlc = rdrSlc[:len(rdrSlc)-1]
if len(rdrSlc) == 0 { // no more
delete(erS.rdrs, ref.path)
if chn, has := erS.stopLsn[ref.path]; has {
close(chn)
}
}
erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr)
}
// add new ids:
for id := range addIDs {
rdrCfg := erS.cfg.ERsCfg().Readers[cfgIDs[id]]
if rdr, err := NewEventReader(rdrCfg); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> error reloading config with ID: <%s>, err: <%s>",
utils.ERs, id, err.Error()))
} else {
if _, hasPath := erS.rdrs[rdrCfg.SourcePath]; !hasPath &&
rdrCfg.Type == utils.MetaFileCSV &&
rdrCfg.RunDelay == time.Duration(-1) { // set the channel to control listen stop
erS.stopLsn[rdrCfg.SourcePath] = make(chan struct{})
if err := erS.watchDir(rdrCfg.SourcePath); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> error scheduling dir watch for config: <%s>, err: <%s>",
utils.ERs, id, err.Error()))
}
}
erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr)
}
}
erS.Unlock()
}
erS.Unlock()
}
}
// trackFiles
// watchDir sets up a watcher via inotify to be triggered on new files
func (erS *ERService) watchDir(dirPath string) (err error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
@@ -194,5 +196,6 @@ func (erS *ERService) watchDir(dirPath string) (err error) {
}
func (erS *ERService) processPath(path string) (err error) {
fmt.Printf("ERService processPath: <%s>", path)
return
}