From eff74851b7e24a9724b0bff0cb986e05b95ff9dd Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 9 Jan 2020 10:49:25 +0200 Subject: [PATCH] Updated dataDB reload --- services/apierv1.go | 7 +++++-- services/attributes.go | 5 ++++- services/cdrs.go | 6 ++++-- services/chargers.go | 5 ++++- services/datadb.go | 1 - services/dispatchers.go | 5 ++++- services/loaders.go | 11 +++++++++-- services/resources.go | 5 ++++- services/schedulers.go | 5 ++++- services/sessions.go | 9 +++++++-- services/stats.go | 5 ++++- services/suppliers.go | 5 ++++- services/thresholds.go | 5 ++++- 13 files changed, 57 insertions(+), 17 deletions(-) diff --git a/services/apierv1.go b/services/apierv1.go index 403c28bbf..cfb348806 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -78,6 +78,9 @@ func (api *ApierV1Service) Start() (err error) { filterS := <-api.filterSChan api.filterSChan <- filterS + dbchan := api.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb api.Lock() defer api.Unlock() @@ -88,7 +91,7 @@ func (api *ApierV1Service) Start() (err error) { stordb := <-api.storDBChan api.api = &v1.ApierV1{ - DataManager: api.dm.GetDM(), + DataManager: datadb, CdrDb: stordb, StorDb: stordb, Config: api.cfg, @@ -102,7 +105,7 @@ func (api *ApierV1Service) Start() (err error) { if !api.cfg.DispatcherSCfg().Enabled { api.server.RpcRegister(api.api) - api.server.RpcRegister(v1.NewReplicatorSv1(api.dm.GetDM())) + api.server.RpcRegister(v1.NewReplicatorSv1(datadb)) } utils.RegisterRpcParams("", &v1.CDRsV1{}) diff --git a/services/attributes.go b/services/attributes.go index 1fa3a8a53..1d1828b59 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -69,10 +69,13 @@ func (attrS *AttributeService) Start() (err error) { filterS := <-attrS.filterSChan attrS.filterSChan <- filterS + dbchan := attrS.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb attrS.Lock() defer attrS.Unlock() - attrS.attrS, err = engine.NewAttributeService(attrS.dm.GetDM(), filterS, attrS.cfg) + attrS.attrS, err = engine.NewAttributeService(datadb, filterS, attrS.cfg) if err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> Could not init, error: %s", diff --git a/services/cdrs.go b/services/cdrs.go index 8f16b17d0..dee2c4bd7 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -76,6 +76,9 @@ func (cdrS *CDRServer) Start() (err error) { filterS := <-cdrS.filterSChan cdrS.filterSChan <- filterS + dbchan := cdrS.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb cdrS.Lock() defer cdrS.Unlock() @@ -85,8 +88,7 @@ func (cdrS *CDRServer) Start() (err error) { cdrS.storDB.RegisterSyncChan(cdrS.storDBChan) stordb := <-cdrS.storDBChan - cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, stordb, cdrS.dm.GetDM(), - filterS, cdrS.connMgr) + cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, stordb, datadb, filterS, cdrS.connMgr) utils.Logger.Info("Registering CDRS HTTP Handlers.") cdrS.cdrS.RegisterHandlersToServer(cdrS.server) utils.Logger.Info("Registering CDRS RPC service.") diff --git a/services/chargers.go b/services/chargers.go index 715945057..9f384e56a 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -71,10 +71,13 @@ func (chrS *ChargerService) Start() (err error) { filterS := <-chrS.filterSChan chrS.filterSChan <- filterS + dbchan := chrS.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb chrS.Lock() defer chrS.Unlock() - if chrS.chrS, err = engine.NewChargerService(chrS.dm.GetDM(), filterS, chrS.cfg, chrS.connMgr); err != nil { + if chrS.chrS, err = engine.NewChargerService(datadb, filterS, chrS.cfg, chrS.connMgr); err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> Could not init, error: %s", utils.ChargerS, err.Error())) diff --git a/services/datadb.go b/services/datadb.go index 3ea41ad63..2fadb2a1a 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -150,7 +150,6 @@ func (db *DataDBService) GetDM() *engine.DataManager { // needsConnectionReload returns if the DB connection needs to reloaded func (db *DataDBService) needsConnectionReload() bool { - if db.oldDBCfg.DataDbType != db.cfg.DataDbCfg().DataDbType || db.oldDBCfg.DataDbHost != db.cfg.DataDbCfg().DataDbHost || db.oldDBCfg.DataDbName != db.cfg.DataDbCfg().DataDbName || diff --git a/services/dispatchers.go b/services/dispatchers.go index 41ddc8d2c..4b6ec29fa 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -72,11 +72,14 @@ func (dspS *DispatcherService) Start() (err error) { <-dspS.cacheS.GetPrecacheChannel(utils.CacheDispatcherProfiles) <-dspS.cacheS.GetPrecacheChannel(utils.CacheDispatcherHosts) <-dspS.cacheS.GetPrecacheChannel(utils.CacheDispatcherFilterIndexes) + dbchan := dspS.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb dspS.Lock() defer dspS.Unlock() - if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm.GetDM(), dspS.cfg, fltrS, dspS.connMgr); err != nil { + if dspS.dspS, err = dispatchers.NewDispatcherService(datadb, dspS.cfg, fltrS, dspS.connMgr); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) return } diff --git a/services/loaders.go b/services/loaders.go index c5b0faa53..74bd5250b 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -70,11 +70,14 @@ func (ldrs *LoaderService) Start() (err error) { filterS := <-ldrs.filterSChan ldrs.filterSChan <- filterS + dbchan := ldrs.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb ldrs.Lock() defer ldrs.Unlock() - ldrs.ldrs = loaders.NewLoaderService(ldrs.dm.GetDM(), ldrs.cfg.LoaderCfg(), + ldrs.ldrs = loaders.NewLoaderService(datadb, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, ldrs.connMgr) if !ldrs.ldrs.Enabled() { return @@ -94,9 +97,13 @@ func (ldrs *LoaderService) GetIntenternalChan() (conn chan rpcclient.ClientConne func (ldrs *LoaderService) Reload() (err error) { filterS := <-ldrs.filterSChan ldrs.filterSChan <- filterS + dbchan := ldrs.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb + ldrs.RLock() - ldrs.ldrs.Reload(ldrs.dm.GetDM(), ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, + ldrs.ldrs.Reload(datadb, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, ldrs.connMgr) ldrs.RUnlock() return diff --git a/services/resources.go b/services/resources.go index ab9fd474d..3e96c1583 100644 --- a/services/resources.go +++ b/services/resources.go @@ -73,10 +73,13 @@ func (reS *ResourceService) Start() (err error) { filterS := <-reS.filterSChan reS.filterSChan <- filterS + dbchan := reS.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb reS.Lock() defer reS.Unlock() - reS.reS, err = engine.NewResourceService(reS.dm.GetDM(), reS.cfg, filterS, reS.connMgr) + reS.reS, err = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error())) return diff --git a/services/schedulers.go b/services/schedulers.go index 6753af3d1..1ba2196e2 100644 --- a/services/schedulers.go +++ b/services/schedulers.go @@ -71,11 +71,14 @@ func (schS *SchedulerService) Start() (err error) { fltrS := <-schS.fltrSChan schS.fltrSChan <- fltrS + dbchan := schS.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb schS.Lock() defer schS.Unlock() utils.Logger.Info(" Starting CGRateS Scheduler.") - schS.schS = scheduler.NewScheduler(schS.dm.GetDM(), schS.cfg, fltrS) + schS.schS = scheduler.NewScheduler(datadb, schS.cfg, fltrS) go schS.schS.Loop() schS.rpc = v1.NewSchedulerSv1(schS.cfg) diff --git a/services/sessions.go b/services/sessions.go index 0e09a3e51..ab6f30e6d 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -69,11 +69,16 @@ func (smg *SessionService) Start() (err error) { if smg.IsRunning() { return fmt.Errorf("service aleady running") } - + var datadb *engine.DataManager + if smg.dm.IsRunning() { + dbchan := smg.dm.GetDMChan() + datadb = <-dbchan + dbchan <- datadb + } smg.Lock() defer smg.Unlock() - smg.sm = sessions.NewSessionS(smg.cfg, smg.dm.GetDM(), smg.connMgr) + smg.sm = sessions.NewSessionS(smg.cfg, datadb, smg.connMgr) //start sync session in a separate gorutine go func(sm *sessions.SessionS) { if err = sm.ListenAndServe(smg.exitChan); err != nil { diff --git a/services/stats.go b/services/stats.go index 7d3ae4914..a9e2edfa3 100644 --- a/services/stats.go +++ b/services/stats.go @@ -72,10 +72,13 @@ func (sts *StatService) Start() (err error) { filterS := <-sts.filterSChan sts.filterSChan <- filterS + dbchan := sts.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb sts.Lock() defer sts.Unlock() - sts.sts, err = engine.NewStatService(sts.dm.GetDM(), sts.cfg, filterS, sts.connMgr) + sts.sts, err = engine.NewStatService(datadb, sts.cfg, filterS, sts.connMgr) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) return diff --git a/services/suppliers.go b/services/suppliers.go index 507397565..0c22090d5 100644 --- a/services/suppliers.go +++ b/services/suppliers.go @@ -72,10 +72,13 @@ func (splS *SupplierService) Start() (err error) { filterS := <-splS.filterSChan splS.filterSChan <- filterS + dbchan := splS.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb splS.Lock() defer splS.Unlock() - splS.splS, err = engine.NewSupplierService(splS.dm.GetDM(), filterS, splS.cfg, + splS.splS, err = engine.NewSupplierService(datadb, filterS, splS.cfg, splS.connMgr) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", diff --git a/services/thresholds.go b/services/thresholds.go index 9c6413b3e..6944ee1f5 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -70,10 +70,13 @@ func (thrs *ThresholdService) Start() (err error) { filterS := <-thrs.filterSChan thrs.filterSChan <- filterS + dbchan := thrs.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb thrs.Lock() defer thrs.Unlock() - thrs.thrs, err = engine.NewThresholdService(thrs.dm.GetDM(), thrs.cfg, filterS) + thrs.thrs, err = engine.NewThresholdService(datadb, thrs.cfg, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error())) return