Updated config reload

This commit is contained in:
Trial97
2020-01-08 18:00:59 +02:00
parent fc883ad930
commit 47e975d0d0
8 changed files with 194 additions and 322 deletions

View File

@@ -496,11 +496,6 @@ func main() {
return
}
}
if storDBService.ShouldRun() {
if err = storDBService.Start(); err != nil {
return
}
}
// Done initing DBs
engine.SetRoundingDecimals(cfg.GeneralCfg().RoundingDecimals)

View File

@@ -1186,7 +1186,7 @@ func (cfg *CGRConfig) V1ReloadConfigFromPath(args *ConfigReloadWithArgDispatcher
if missing := utils.MissingStructFields(args, []string{"Path"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if err = cfg.loadConfig(args.Path, args.Section); err != nil {
if err = cfg.loadCfgWithLocks(args.Path, args.Section); err != nil {
return
}
// lock all sections
@@ -1200,252 +1200,18 @@ func (cfg *CGRConfig) V1ReloadConfigFromPath(args *ConfigReloadWithArgDispatcher
return
}
if err = cfg.reloadSection(args.Section); err != nil {
if args.Section == utils.EmptyString || args.Section == utils.MetaAll {
err = cfg.reloadSections(sortedCfgSections...)
} else {
err = cfg.reloadSections(args.Section)
}
if err != nil {
return
}
*reply = utils.OK
return
}
func (cfg *CGRConfig) reloadSection(section string) (err error) {
var fall bool
switch section {
default:
return fmt.Errorf("Invalid section: <%s>", section)
case utils.EmptyString, utils.MetaAll:
fall = true
fallthrough
case GENERAL_JSN: // nothing to reload
if !fall {
break
}
fallthrough
case RPCConnsJsonName: // nothing to reload
if !fall {
break
}
fallthrough
case DATADB_JSN:
cfg.rldChans[DATADB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
if !fall {
break
}
fallthrough
case STORDB_JSN:
cfg.rldChans[STORDB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
if !fall {
cfg.rldChans[CDRS_JSN] <- struct{}{}
cfg.rldChans[Apier] <- struct{}{}
break
}
fallthrough
case LISTEN_JSN:
if !fall {
break
}
fallthrough
case TlsCfgJson: // nothing to reload
if !fall {
break
}
fallthrough
case HTTP_JSN:
if !fall {
break
}
fallthrough
case SCHEDULER_JSN:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[SCHEDULER_JSN] <- struct{}{}
if !fall {
break
}
fallthrough
case RALS_JSN:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
cfg.rldChans[STORDB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[RALS_JSN] <- struct{}{}
if !fall {
break
}
fallthrough
case CDRS_JSN:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
cfg.rldChans[STORDB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[CDRS_JSN] <- struct{}{}
if !fall {
break
}
fallthrough
case CDRC_JSN:
if !fall {
break
}
fallthrough
case ERsJson:
cfg.rldChans[ERsJson] <- struct{}{}
if !fall {
break
}
fallthrough
case SessionSJson:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[SessionSJson] <- struct{}{}
if !fall {
break
}
fallthrough
case AsteriskAgentJSN:
cfg.rldChans[AsteriskAgentJSN] <- struct{}{}
if !fall {
break
}
fallthrough
case FreeSWITCHAgentJSN:
cfg.rldChans[FreeSWITCHAgentJSN] <- struct{}{}
if !fall {
break
}
fallthrough
case KamailioAgentJSN:
cfg.rldChans[KamailioAgentJSN] <- struct{}{}
if !fall {
break
}
fallthrough
case DA_JSN:
cfg.rldChans[DA_JSN] <- struct{}{}
if !fall {
break
}
fallthrough
case RA_JSN:
cfg.rldChans[RA_JSN] <- struct{}{}
if !fall {
break
}
fallthrough
case HttpAgentJson:
cfg.rldChans[HttpAgentJson] <- struct{}{}
if !fall {
break
}
fallthrough
case DNSAgentJson:
cfg.rldChans[DNSAgentJson] <- struct{}{}
if !fall {
break
}
fallthrough
case ATTRIBUTE_JSN:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[ATTRIBUTE_JSN] <- struct{}{}
if !fall {
break
}
fallthrough
case ChargerSCfgJson:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[ChargerSCfgJson] <- struct{}{}
if !fall {
break
}
fallthrough
case RESOURCES_JSON:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[RESOURCES_JSON] <- struct{}{}
if !fall {
break
}
fallthrough
case STATS_JSON:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[STATS_JSON] <- struct{}{}
if !fall {
break
}
fallthrough
case THRESHOLDS_JSON:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[THRESHOLDS_JSON] <- struct{}{}
if !fall {
break
}
fallthrough
case SupplierSJson:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[SupplierSJson] <- struct{}{}
if !fall {
break
}
fallthrough
case LoaderJson:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[LoaderJson] <- struct{}{}
if !fall {
break
}
fallthrough
case DispatcherSJson:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[DispatcherSJson] <- struct{}{}
if !fall {
break
}
fallthrough
case AnalyzerCfgJson:
if !fall {
break
}
fallthrough
case Apier:
if !fall {
cfg.rldChans[STORDB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[Apier] <- struct{}{}
}
return
}
func (cfg *CGRConfig) getLoadFunctions() map[string]func(*CgrJsonCfg) error {
return map[string]func(*CgrJsonCfg) error{
GENERAL_JSN: cfg.loadGeneralCfg,
@@ -1487,7 +1253,8 @@ func (cfg *CGRConfig) getLoadFunctions() map[string]func(*CgrJsonCfg) error {
RPCConnsJsonName: cfg.loadRPCConns,
}
}
func (cfg *CGRConfig) loadConfig(path, section string) (err error) {
func (cfg *CGRConfig) loadCfgWithLocks(path, section string) (err error) {
var loadFuncs []func(*CgrJsonCfg) error
loadMap := cfg.getLoadFunctions()
if section == utils.EmptyString || section == utils.MetaAll {
@@ -1638,7 +1405,7 @@ func (cfg *CGRConfig) V1ReloadConfigFromJSON(args *JSONReloadWithArgDispatcher,
return
}
if err = cfg.loadConfigFromJSON(bytes.NewBuffer(b), sections); err != nil {
if err = cfg.loadCfgFromJSONWithLocks(bytes.NewBuffer(b), sections); err != nil {
return
}
@@ -1649,46 +1416,15 @@ func (cfg *CGRConfig) V1ReloadConfigFromJSON(args *JSONReloadWithArgDispatcher,
cfg.rUnlockSections() // unlock before checking the error
err = cfg.reloadSections(sections...)
if err != nil {
return
}
subsystemsThatNeedDataDB := utils.NewStringSet([]string{DATADB_JSN, SCHEDULER_JSN,
RALS_JSN, CDRS_JSN, SessionSJson, ATTRIBUTE_JSN,
ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON,
SupplierSJson, LoaderJson, DispatcherSJson})
needsDataDB := false
for _, section := range sections {
if subsystemsThatNeedDataDB.Has(section) {
needsDataDB = true
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
break
}
}
subsystemsThatNeedStorDB := utils.NewStringSet([]string{STORDB_JSN, RALS_JSN, CDRS_JSN, Apier})
needsStorDB := false
for _, section := range sections {
if subsystemsThatNeedStorDB.Has(section) {
needsStorDB = true
cfg.rldChans[STORDB_JSN] <- struct{}{} // reload stordb before
break
}
}
time.Sleep(1)
for _, section := range sections {
if needsDataDB && section == DATADB_JSN { // already reloaded
continue
}
if needsStorDB && section == STORDB_JSN {
continue
}
cfg.rldChans[section] <- struct{}{}
}
*reply = utils.OK
return
}
func (cfg *CGRConfig) loadConfigFromJSON(rdr io.Reader, sections []string) (err error) {
func (cfg *CGRConfig) loadCfgFromJSONWithLocks(rdr io.Reader, sections []string) (err error) {
var loadFuncs []func(*CgrJsonCfg) error
loadMap := cfg.getLoadFunctions()
for _, section := range sections {
@@ -1702,3 +1438,86 @@ func (cfg *CGRConfig) loadConfigFromJSON(rdr io.Reader, sections []string) (err
}
return cfg.loadConfigFromReader(rdr, loadFuncs)
}
func (cfg *CGRConfig) reloadSections(sections ...string) (err error) {
subsystemsThatNeedDataDB := utils.NewStringSet([]string{DATADB_JSN, SCHEDULER_JSN,
RALS_JSN, CDRS_JSN, SessionSJson, ATTRIBUTE_JSN,
ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON,
SupplierSJson, LoaderJson, DispatcherSJson})
subsystemsThatNeedStorDB := utils.NewStringSet([]string{STORDB_JSN, RALS_JSN, CDRS_JSN, Apier})
needsDataDB := false
needsStorDB := false
for _, section := range sections {
if !needsDataDB && subsystemsThatNeedDataDB.Has(section) {
needsDataDB = true
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
}
if !needsStorDB && subsystemsThatNeedStorDB.Has(section) {
needsStorDB = true
cfg.rldChans[STORDB_JSN] <- struct{}{} // reload stordb before
}
if needsDataDB && needsStorDB {
break
}
}
time.Sleep(1)
for _, section := range sections {
switch section {
default:
return fmt.Errorf("Invalid section: <%s>", section)
case GENERAL_JSN: // nothing to reload
case RPCConnsJsonName: // nothing to reload
case DATADB_JSN: // reloaded before
case STORDB_JSN: // reloaded before
case LISTEN_JSN:
case TlsCfgJson: // nothing to reload
case HTTP_JSN:
case SCHEDULER_JSN:
cfg.rldChans[SCHEDULER_JSN] <- struct{}{}
case RALS_JSN:
cfg.rldChans[RALS_JSN] <- struct{}{}
case CDRS_JSN:
cfg.rldChans[CDRS_JSN] <- struct{}{}
case CDRC_JSN:
case ERsJson:
cfg.rldChans[ERsJson] <- struct{}{}
case SessionSJson:
cfg.rldChans[SessionSJson] <- struct{}{}
case AsteriskAgentJSN:
cfg.rldChans[AsteriskAgentJSN] <- struct{}{}
case FreeSWITCHAgentJSN:
cfg.rldChans[FreeSWITCHAgentJSN] <- struct{}{}
case KamailioAgentJSN:
cfg.rldChans[KamailioAgentJSN] <- struct{}{}
case DA_JSN:
cfg.rldChans[DA_JSN] <- struct{}{}
case RA_JSN:
cfg.rldChans[RA_JSN] <- struct{}{}
case HttpAgentJson:
cfg.rldChans[HttpAgentJson] <- struct{}{}
case DNSAgentJson:
cfg.rldChans[DNSAgentJson] <- struct{}{}
case ATTRIBUTE_JSN:
cfg.rldChans[ATTRIBUTE_JSN] <- struct{}{}
case ChargerSCfgJson:
cfg.rldChans[ChargerSCfgJson] <- struct{}{}
case RESOURCES_JSON:
cfg.rldChans[RESOURCES_JSON] <- struct{}{}
case STATS_JSON:
cfg.rldChans[STATS_JSON] <- struct{}{}
case THRESHOLDS_JSON:
cfg.rldChans[THRESHOLDS_JSON] <- struct{}{}
case SupplierSJson:
cfg.rldChans[SupplierSJson] <- struct{}{}
case LoaderJson:
cfg.rldChans[LoaderJson] <- struct{}{}
case DispatcherSJson:
cfg.rldChans[DispatcherSJson] <- struct{}{}
case AnalyzerCfgJson:
case Apier:
cfg.rldChans[Apier] <- struct{}{}
}
return
}
return
}

View File

@@ -64,6 +64,9 @@ type ApierV1Service struct {
api *v1.ApierV1
connChan chan rpcclient.ClientConnector
syncStop chan struct{}
storDBChan chan engine.StorDB
}
// Start should handle the sercive start
@@ -79,10 +82,15 @@ func (api *ApierV1Service) Start() (err error) {
api.Lock()
defer api.Unlock()
api.storDBChan = make(chan engine.StorDB, 1)
api.syncStop = make(chan struct{})
api.storDB.RegisterSyncChan(api.storDBChan)
stordb := <-api.storDBChan
api.api = &v1.ApierV1{
DataManager: api.dm.GetDM(),
CdrDb: api.storDB.GetDM(),
StorDb: api.storDB.GetDM(),
CdrDb: stordb,
StorDb: stordb,
Config: api.cfg,
Responder: api.responderService.GetResponder(),
SchedulerService: api.schedService,
@@ -102,6 +110,7 @@ func (api *ApierV1Service) Start() (err error) {
utils.RegisterRpcParams("", api.api)
api.connChan <- api.api
go api.sync()
return
}
@@ -113,20 +122,16 @@ func (api *ApierV1Service) GetIntenternalChan() (conn chan rpcclient.ClientConne
// Reload handles the change of config
func (api *ApierV1Service) Reload() (err error) {
api.Lock()
if api.storDB.WasReconnected() { // rewrite the connection if was changed
api.api.SetStorDB(api.storDB.GetDM())
}
api.Unlock()
return
}
// Shutdown stops the service
func (api *ApierV1Service) Shutdown() (err error) {
api.Lock()
defer api.Unlock()
close(api.syncStop)
api.api = nil
<-api.connChan
api.Unlock()
return
}
@@ -153,3 +158,22 @@ func (api *ApierV1Service) GetApierV1() *v1.ApierV1 {
func (api *ApierV1Service) ShouldRun() bool {
return api.cfg.RalsCfg().Enabled
}
// sync handles stordb sync
func (api *ApierV1Service) sync() {
for {
select {
case <-api.syncStop:
return
case stordb, ok := <-api.storDBChan:
if !ok { // the chanel was closed by the shutdown of stordbService
return
}
api.Lock()
if api.api != nil {
api.api.SetStorDB(stordb)
}
api.Unlock()
}
}
}

View File

@@ -61,6 +61,9 @@ type CDRServer struct {
rpcv2 *v2.CDRsV2
connChan chan rpcclient.ClientConnector
connMgr *engine.ConnManager
syncStop chan struct{}
storDBChan chan engine.StorDB
}
// Start should handle the sercive start
@@ -76,7 +79,13 @@ func (cdrS *CDRServer) Start() (err error) {
cdrS.Lock()
defer cdrS.Unlock()
cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, cdrS.storDB.GetDM(), cdrS.dm.GetDM(),
cdrS.storDBChan = make(chan engine.StorDB, 1)
cdrS.syncStop = make(chan struct{})
cdrS.storDB.RegisterSyncChan(cdrS.storDBChan)
stordb := <-cdrS.storDBChan
cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, stordb, cdrS.dm.GetDM(),
filterS, cdrS.connMgr)
utils.Logger.Info("Registering CDRS HTTP Handlers.")
cdrS.cdrS.RegisterHandlersToServer(cdrS.server)
@@ -88,6 +97,7 @@ func (cdrS *CDRServer) Start() (err error) {
// Make the cdr server available for internal communication
cdrS.server.RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this)
cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational
go cdrS.sync()
return
}
@@ -98,18 +108,13 @@ func (cdrS *CDRServer) GetIntenternalChan() (conn chan rpcclient.ClientConnector
// Reload handles the change of config
func (cdrS *CDRServer) Reload() (err error) {
cdrS.Lock()
if cdrS.storDB.WasReconnected() { // rewrite the connection if was changed
cdrS.cdrS.SetStorDB(cdrS.storDB.GetDM())
}
cdrS.Unlock()
return
}
// Shutdown stops the service
func (cdrS *CDRServer) Shutdown() (err error) {
cdrS.Lock()
close(cdrS.syncStop)
cdrS.cdrS = nil
cdrS.rpcv1 = nil
cdrS.rpcv2 = nil
@@ -134,3 +139,22 @@ func (cdrS *CDRServer) ServiceName() string {
func (cdrS *CDRServer) ShouldRun() bool {
return cdrS.cfg.CdrsCfg().Enabled
}
// sync handles stordb sync
func (cdrS *CDRServer) sync() {
for {
select {
case <-cdrS.syncStop:
return
case stordb, ok := <-cdrS.storDBChan:
if !ok { // the chanel was closed by the shutdown of stordbService
return
}
cdrS.Lock()
if cdrS.cdrS != nil {
cdrS.cdrS.SetStorDB(stordb)
}
cdrS.Unlock()
}
}
}

View File

@@ -58,7 +58,6 @@ func TestCdrsReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheTimings))
cfg.ChargerSCfg().Enabled = true
cfg.RalsCfg().Enabled = true
internalChan := make(chan rpcclient.ClientConnector, 1)
internalChan <- nil
cacheSChan := make(chan rpcclient.ClientConnector, 1)
@@ -95,6 +94,7 @@ func TestCdrsReload(t *testing.T) {
if stordb.IsRunning() {
t.Errorf("Expected service to be down")
}
cfg.RalsCfg().Enabled = true
var reply string
if err := cfg.V1ReloadConfigFromPath(&config.ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"),

View File

@@ -32,8 +32,7 @@ import (
// NewStorDBService returns the StorDB Service
func NewStorDBService(cfg *config.CGRConfig) *StorDBService {
return &StorDBService{
cfg: cfg,
dbchan: make(chan engine.StorDB, 1),
cfg: cfg,
// db: engine.NewInternalDB([]string{}, []string{}), // to be removed
}
}
@@ -44,8 +43,8 @@ type StorDBService struct {
cfg *config.CGRConfig
oldDBCfg *config.StorDbCfg
db engine.StorDB
dbchan chan engine.StorDB
db engine.StorDB
syncChans []chan engine.StorDB
reconnected bool
}
@@ -73,7 +72,7 @@ func (db *StorDBService) Start() (err error) {
fmt.Println(err)
return
}
db.dbchan <- db.db
db.sync()
return
}
@@ -98,6 +97,7 @@ func (db *StorDBService) Reload() (err error) {
db.db.Close()
db.db = d
db.oldDBCfg = db.cfg.StorDbCfg().Clone()
db.sync() // sync only if needed
return
}
if db.cfg.StorDbCfg().Type == utils.MONGO {
@@ -135,6 +135,10 @@ func (db *StorDBService) Shutdown() (err error) {
db.db.Close()
db.db = nil
db.reconnected = false
for _, c := range db.syncChans {
close(c)
}
db.syncChans = nil
db.Unlock()
return
}
@@ -143,6 +147,11 @@ func (db *StorDBService) Shutdown() (err error) {
func (db *StorDBService) IsRunning() bool {
db.RLock()
defer db.RUnlock()
return db.isRunning()
}
// isRunning returns if the service is running (not thread safe)
func (db *StorDBService) isRunning() bool {
return db != nil && db.db != nil
}
@@ -156,11 +165,25 @@ func (db *StorDBService) ShouldRun() bool {
return db.cfg.RalsCfg().Enabled || db.cfg.CdrsCfg().Enabled
}
// GetDM returns the StorDB
func (db *StorDBService) GetDM() engine.StorDB {
db.RLock()
defer db.RUnlock()
return db.db
// RegisterSyncChan used by dependent subsystems to register a chanel to reload only the storDB(thread safe)
func (db *StorDBService) RegisterSyncChan(c chan engine.StorDB) {
db.Lock()
db.syncChans = append(db.syncChans, c)
if db.isRunning() {
for _, c := range db.syncChans {
c <- db.db
}
}
db.Unlock()
}
// sync sends the storDB over syncChansv (not thrad safe)
func (db *StorDBService) sync() {
if db.isRunning() {
for _, c := range db.syncChans {
c <- db.db
}
}
}
// needsConnectionReload returns if the DB connection needs to reloaded
@@ -179,17 +202,3 @@ func (db *StorDBService) needsConnectionReload() bool {
}
return false
}
// GetStorDBchan returns the StorDB chanel
func (db *StorDBService) GetStorDBchan() chan engine.StorDB {
db.RLock()
defer db.RUnlock()
return db.dbchan
}
// WasReconnected returns if after reload the DB was recreated
func (db *StorDBService) WasReconnected() bool {
db.RLock()
defer db.RUnlock()
return db.reconnected
}

View File

@@ -147,6 +147,7 @@ func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig {
func (srvMngr *ServiceManager) StartServices() (err error) {
go srvMngr.handleReload()
for serviceName, shouldRun := range map[string]bool{
utils.StorDB: srvMngr.GetConfig().RalsCfg().Enabled || srvMngr.GetConfig().CdrsCfg().Enabled,
utils.AttributeS: srvMngr.GetConfig().AttributeSCfg().Enabled,
utils.ChargerS: srvMngr.GetConfig().ChargerSCfg().Enabled,
utils.ThresholdS: srvMngr.GetConfig().ThresholdSCfg().Enabled,
@@ -344,7 +345,7 @@ func (srvMngr *ServiceManager) startService(srviceName string) {
}
// GetService returns the named service
func (srvMngr ServiceManager) GetService(subsystem string) (srv Service) {
func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service) {
var has bool
srvMngr.RLock()
srv, has = srvMngr.subsystems[subsystem]