mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
CDRE config reload, fixes #110, improved handling of the CDRC reloads without sync
This commit is contained in:
@@ -23,12 +23,12 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type AttrReloadCdrcConfig struct {
|
||||
type AttrReloadConfig struct {
|
||||
ConfigDir string
|
||||
}
|
||||
|
||||
// Retrieves the callCost out of CGR logDb
|
||||
func (apier *ApierV1) ReloadCdrcConfig(attrs AttrReloadCdrcConfig, reply *string) error {
|
||||
func (apier *ApierV1) ReloadCdrcConfig(attrs AttrReloadConfig, reply *string) error {
|
||||
if attrs.ConfigDir == "" {
|
||||
attrs.ConfigDir = utils.CONFIG_DIR
|
||||
}
|
||||
@@ -36,9 +36,8 @@ func (apier *ApierV1) ReloadCdrcConfig(attrs AttrReloadCdrcConfig, reply *string
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
apier.Config.CdrcProfiles = newCfg.CdrcProfiles
|
||||
reloadChan := apier.Config.GetConfigReloadsItem(utils.CDRC)
|
||||
close(reloadChan) // Ask for reload
|
||||
apier.Config.CdrcProfiles = newCfg.CdrcProfiles // ToDo: Check if there can be any concurency involved here so we need to lock maybe
|
||||
apier.Config.ConfigReloads[utils.CDRC] <- struct{}{}
|
||||
*reply = OK
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -34,6 +34,8 @@ import (
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/cgrates/cgrates/cdre"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -92,6 +94,9 @@ func (self *ApierV1) ExportCdrsToZipString(attr utils.AttrExpFileCdrs, reply *st
|
||||
// Export Cdrs to file
|
||||
func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.ExportedFileCdrs) error {
|
||||
var err error
|
||||
|
||||
cdreReloadStruct := <-self.Config.ConfigReloads[utils.CDRE] // Read the content of the channel, locking it
|
||||
defer func() { self.Config.ConfigReloads[utils.CDRE] <- cdreReloadStruct }() // Unlock reloads at exit
|
||||
exportTemplate := self.Config.CdreProfiles[utils.META_DEFAULT]
|
||||
if attr.ExportTemplate != nil && len(*attr.ExportTemplate) != 0 { // Export template prefered, use it
|
||||
var hasIt bool
|
||||
@@ -206,3 +211,20 @@ func (self *ApierV1) RemCdrs(attrs utils.AttrRemCdrs, reply *string) error {
|
||||
*reply = "OK"
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reloads CDRE configuration out of folder specified
|
||||
func (apier *ApierV1) ReloadCdreConfig(attrs AttrReloadConfig, reply *string) error {
|
||||
if attrs.ConfigDir == "" {
|
||||
attrs.ConfigDir = utils.CONFIG_DIR
|
||||
}
|
||||
newCfg, err := config.NewCGRConfigFromFolder(attrs.ConfigDir)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
cdreReloadStruct := <-apier.Config.ConfigReloads[utils.CDRE] // Get the CDRE reload channel // Read the content of the channel, locking it
|
||||
apier.Config.CdreProfiles = newCfg.CdreProfiles
|
||||
apier.Config.ConfigReloads[utils.CDRE] <- cdreReloadStruct // Unlock reloads
|
||||
engine.Logger.Info("<CDRE> Configuration reloaded")
|
||||
*reply = OK
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -33,6 +33,8 @@ import (
|
||||
// Export Cdrs to file
|
||||
func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *utils.ExportedFileCdrs) error {
|
||||
var err error
|
||||
cdreReloadStruct := <-self.Config.ConfigReloads[utils.CDRE] // Read the content of the channel, locking it
|
||||
defer func() { self.Config.ConfigReloads[utils.CDRE] <- cdreReloadStruct }() // Unlock reloads at exit
|
||||
exportTemplate := self.Config.CdreProfiles[utils.META_DEFAULT]
|
||||
if attr.ExportTemplate != nil && len(*attr.ExportTemplate) != 0 { // Export template prefered, use it
|
||||
var hasIt bool
|
||||
|
||||
@@ -174,8 +174,7 @@ func (self *Cdrc) Run() error {
|
||||
// Not automated, process and sleep approach
|
||||
for {
|
||||
select {
|
||||
case closeChan := <-self.closeChan: // Exit, reinject closeChan for other CDRCs
|
||||
self.closeChan <- closeChan
|
||||
case <-self.closeChan: // Exit, reinject closeChan for other CDRCs
|
||||
engine.Logger.Info(fmt.Sprintf("<Cdrc> Shutting down CDRC on path %s.", self.cdrInDir))
|
||||
return nil
|
||||
default:
|
||||
|
||||
@@ -71,8 +71,8 @@ var (
|
||||
)
|
||||
|
||||
func startCdrcs(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *engine.Responder, exitChan chan bool) {
|
||||
cfgReloadsChan := cfg.GetConfigReloadsItem(utils.CDRC)
|
||||
for {
|
||||
cdrcChildrenChan := make(chan struct{}) // Will use it to communicate with the children of one fork
|
||||
for _, cdrcCfgs := range cfg.CdrcProfiles {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take a random config out since they should be the same
|
||||
@@ -81,15 +81,14 @@ func startCdrcs(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan
|
||||
if cdrcCfg.Enabled == false {
|
||||
continue // Ignore not enabled
|
||||
}
|
||||
go startCdrc(internalCdrSChan, internalRaterChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfgReloadsChan, exitChan)
|
||||
go startCdrc(internalCdrSChan, internalRaterChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cdrcChildrenChan, exitChan)
|
||||
}
|
||||
select {
|
||||
case <-exitChan: // Stop forking CDRCs
|
||||
break
|
||||
case <-cfgReloadsChan: // Will release another start as soon as channel will be closed
|
||||
engine.Logger.Info("Reloading CDRC configuration")
|
||||
cfgReloadsChan = make(chan struct{}) // Schedule waiting again for another reload
|
||||
cfg.SetConfigReloadsItem(utils.CDRC, cfgReloadsChan)
|
||||
case <-cfg.ConfigReloads[utils.CDRC]: // Consume the load request and wait for a new one
|
||||
close(cdrcChildrenChan) // Stop all the children of the previous run
|
||||
engine.Logger.Info("<CDRC> Configuration reload")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -65,8 +64,11 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
|
||||
cfg.SmFsConfig = new(SmFsConfig)
|
||||
cfg.SmKamConfig = new(SmKamConfig)
|
||||
cfg.SmOsipsConfig = new(SmOsipsConfig)
|
||||
cfg.configReloads = make(map[string]chan struct{})
|
||||
cfg.configReloads[utils.CDRC] = make(chan struct{})
|
||||
cfg.ConfigReloads = make(map[string]chan struct{})
|
||||
cfg.ConfigReloads[utils.CDRC] = make(chan struct{}, 1)
|
||||
cfg.ConfigReloads[utils.CDRC] <- struct{}{} // Unlock the channel
|
||||
cfg.ConfigReloads[utils.CDRE] = make(chan struct{}, 1)
|
||||
cfg.ConfigReloads[utils.CDRE] <- struct{}{} // Unlock the channel
|
||||
cgrJsonCfg, err := NewCgrJsonCfgFromReader(strings.NewReader(CGRATES_CFG_JSON))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -79,7 +81,6 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
|
||||
cfg.dfltCdrcProfile = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT].Clone()
|
||||
dfltFsConnConfig = cfg.SmFsConfig.Connections[0] // We leave it crashing here on purpose if no Connection defaults defined
|
||||
dfltKamConnConfig = cfg.SmKamConfig.Connections[0]
|
||||
cfg.cfgReloadsMutex = new(sync.RWMutex)
|
||||
if err := cfg.checkConfigSanity(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -235,11 +236,10 @@ type CGRConfig struct {
|
||||
MailerAuthPass string // Authenticate to email server with this password
|
||||
MailerFromAddr string // From address used when sending emails out
|
||||
DataFolderPath string // Path towards data folder, for tests internal usage, not loading out of .json options
|
||||
configReloads map[string]chan struct{} // Signals to specific entities that a config reload should occur
|
||||
ConfigReloads map[string]chan struct{} // Signals to specific entities that a config reload should occur
|
||||
// Cache defaults loaded from json and needing clones
|
||||
dfltCdreProfile *CdreConfig // Default cdreConfig profile
|
||||
dfltCdrcProfile *CdrcConfig // Default cdrcConfig profile
|
||||
cfgReloadsMutex *sync.RWMutex // cfgReloadsMutex parts of the configuration for reloads and such
|
||||
dfltCdreProfile *CdreConfig // Default cdreConfig profile
|
||||
dfltCdrcProfile *CdrcConfig // Default cdrcConfig profile
|
||||
}
|
||||
|
||||
func (self *CGRConfig) checkConfigSanity() error {
|
||||
@@ -784,17 +784,3 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Used to access config reloads map from outside
|
||||
func (self *CGRConfig) GetConfigReloadsItem(itemKey string) chan struct{} {
|
||||
self.cfgReloadsMutex.RLock()
|
||||
defer self.cfgReloadsMutex.RUnlock()
|
||||
return self.configReloads[itemKey]
|
||||
}
|
||||
|
||||
// Used to set config reloads map from outside
|
||||
func (self *CGRConfig) SetConfigReloadsItem(itemKey string, itemVal chan struct{}) {
|
||||
self.cfgReloadsMutex.Lock()
|
||||
defer self.cfgReloadsMutex.Unlock()
|
||||
self.configReloads[itemKey] = itemVal
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ func init() {
|
||||
type CmdCdrcConfigReload struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *v1.AttrReloadCdrcConfig
|
||||
rpcParams *v1.AttrReloadConfig
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ func (self *CmdCdrcConfigReload) RpcMethod() string {
|
||||
|
||||
func (self *CmdCdrcConfigReload) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = new(v1.AttrReloadCdrcConfig)
|
||||
self.rpcParams = new(v1.AttrReloadConfig)
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user