mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated dataDB reload
This commit is contained in:
@@ -78,6 +78,9 @@ func (api *ApierV1Service) Start() (err error) {
|
||||
|
||||
filterS := <-api.filterSChan
|
||||
api.filterSChan <- filterS
|
||||
dbchan := api.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
api.Lock()
|
||||
defer api.Unlock()
|
||||
@@ -88,7 +91,7 @@ func (api *ApierV1Service) Start() (err error) {
|
||||
stordb := <-api.storDBChan
|
||||
|
||||
api.api = &v1.ApierV1{
|
||||
DataManager: api.dm.GetDM(),
|
||||
DataManager: datadb,
|
||||
CdrDb: stordb,
|
||||
StorDb: stordb,
|
||||
Config: api.cfg,
|
||||
@@ -102,7 +105,7 @@ func (api *ApierV1Service) Start() (err error) {
|
||||
|
||||
if !api.cfg.DispatcherSCfg().Enabled {
|
||||
api.server.RpcRegister(api.api)
|
||||
api.server.RpcRegister(v1.NewReplicatorSv1(api.dm.GetDM()))
|
||||
api.server.RpcRegister(v1.NewReplicatorSv1(datadb))
|
||||
}
|
||||
|
||||
utils.RegisterRpcParams("", &v1.CDRsV1{})
|
||||
|
||||
@@ -69,10 +69,13 @@ func (attrS *AttributeService) Start() (err error) {
|
||||
|
||||
filterS := <-attrS.filterSChan
|
||||
attrS.filterSChan <- filterS
|
||||
dbchan := attrS.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
attrS.Lock()
|
||||
defer attrS.Unlock()
|
||||
attrS.attrS, err = engine.NewAttributeService(attrS.dm.GetDM(), filterS, attrS.cfg)
|
||||
attrS.attrS, err = engine.NewAttributeService(datadb, filterS, attrS.cfg)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> Could not init, error: %s",
|
||||
|
||||
@@ -76,6 +76,9 @@ func (cdrS *CDRServer) Start() (err error) {
|
||||
|
||||
filterS := <-cdrS.filterSChan
|
||||
cdrS.filterSChan <- filterS
|
||||
dbchan := cdrS.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
cdrS.Lock()
|
||||
defer cdrS.Unlock()
|
||||
@@ -85,8 +88,7 @@ func (cdrS *CDRServer) Start() (err error) {
|
||||
cdrS.storDB.RegisterSyncChan(cdrS.storDBChan)
|
||||
stordb := <-cdrS.storDBChan
|
||||
|
||||
cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, stordb, cdrS.dm.GetDM(),
|
||||
filterS, cdrS.connMgr)
|
||||
cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, stordb, datadb, filterS, cdrS.connMgr)
|
||||
utils.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
cdrS.cdrS.RegisterHandlersToServer(cdrS.server)
|
||||
utils.Logger.Info("Registering CDRS RPC service.")
|
||||
|
||||
@@ -71,10 +71,13 @@ func (chrS *ChargerService) Start() (err error) {
|
||||
|
||||
filterS := <-chrS.filterSChan
|
||||
chrS.filterSChan <- filterS
|
||||
dbchan := chrS.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
chrS.Lock()
|
||||
defer chrS.Unlock()
|
||||
if chrS.chrS, err = engine.NewChargerService(chrS.dm.GetDM(), filterS, chrS.cfg, chrS.connMgr); err != nil {
|
||||
if chrS.chrS, err = engine.NewChargerService(datadb, filterS, chrS.cfg, chrS.connMgr); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> Could not init, error: %s",
|
||||
utils.ChargerS, err.Error()))
|
||||
|
||||
@@ -150,7 +150,6 @@ func (db *DataDBService) GetDM() *engine.DataManager {
|
||||
|
||||
// needsConnectionReload returns if the DB connection needs to reloaded
|
||||
func (db *DataDBService) needsConnectionReload() bool {
|
||||
|
||||
if db.oldDBCfg.DataDbType != db.cfg.DataDbCfg().DataDbType ||
|
||||
db.oldDBCfg.DataDbHost != db.cfg.DataDbCfg().DataDbHost ||
|
||||
db.oldDBCfg.DataDbName != db.cfg.DataDbCfg().DataDbName ||
|
||||
|
||||
@@ -72,11 +72,14 @@ func (dspS *DispatcherService) Start() (err error) {
|
||||
<-dspS.cacheS.GetPrecacheChannel(utils.CacheDispatcherProfiles)
|
||||
<-dspS.cacheS.GetPrecacheChannel(utils.CacheDispatcherHosts)
|
||||
<-dspS.cacheS.GetPrecacheChannel(utils.CacheDispatcherFilterIndexes)
|
||||
dbchan := dspS.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
dspS.Lock()
|
||||
defer dspS.Unlock()
|
||||
|
||||
if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm.GetDM(), dspS.cfg, fltrS, dspS.connMgr); err != nil {
|
||||
if dspS.dspS, err = dispatchers.NewDispatcherService(datadb, dspS.cfg, fltrS, dspS.connMgr); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -70,11 +70,14 @@ func (ldrs *LoaderService) Start() (err error) {
|
||||
|
||||
filterS := <-ldrs.filterSChan
|
||||
ldrs.filterSChan <- filterS
|
||||
dbchan := ldrs.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
ldrs.Lock()
|
||||
defer ldrs.Unlock()
|
||||
|
||||
ldrs.ldrs = loaders.NewLoaderService(ldrs.dm.GetDM(), ldrs.cfg.LoaderCfg(),
|
||||
ldrs.ldrs = loaders.NewLoaderService(datadb, ldrs.cfg.LoaderCfg(),
|
||||
ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, ldrs.connMgr)
|
||||
if !ldrs.ldrs.Enabled() {
|
||||
return
|
||||
@@ -94,9 +97,13 @@ func (ldrs *LoaderService) GetIntenternalChan() (conn chan rpcclient.ClientConne
|
||||
func (ldrs *LoaderService) Reload() (err error) {
|
||||
filterS := <-ldrs.filterSChan
|
||||
ldrs.filterSChan <- filterS
|
||||
dbchan := ldrs.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
ldrs.RLock()
|
||||
|
||||
ldrs.ldrs.Reload(ldrs.dm.GetDM(), ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone,
|
||||
ldrs.ldrs.Reload(datadb, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone,
|
||||
ldrs.exitChan, filterS, ldrs.connMgr)
|
||||
ldrs.RUnlock()
|
||||
return
|
||||
|
||||
@@ -73,10 +73,13 @@ func (reS *ResourceService) Start() (err error) {
|
||||
|
||||
filterS := <-reS.filterSChan
|
||||
reS.filterSChan <- filterS
|
||||
dbchan := reS.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
reS.Lock()
|
||||
defer reS.Unlock()
|
||||
reS.reS, err = engine.NewResourceService(reS.dm.GetDM(), reS.cfg, filterS, reS.connMgr)
|
||||
reS.reS, err = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error()))
|
||||
return
|
||||
|
||||
@@ -71,11 +71,14 @@ func (schS *SchedulerService) Start() (err error) {
|
||||
|
||||
fltrS := <-schS.fltrSChan
|
||||
schS.fltrSChan <- fltrS
|
||||
dbchan := schS.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
schS.Lock()
|
||||
defer schS.Unlock()
|
||||
utils.Logger.Info("<ServiceManager> Starting CGRateS Scheduler.")
|
||||
schS.schS = scheduler.NewScheduler(schS.dm.GetDM(), schS.cfg, fltrS)
|
||||
schS.schS = scheduler.NewScheduler(datadb, schS.cfg, fltrS)
|
||||
go schS.schS.Loop()
|
||||
|
||||
schS.rpc = v1.NewSchedulerSv1(schS.cfg)
|
||||
|
||||
@@ -69,11 +69,16 @@ func (smg *SessionService) Start() (err error) {
|
||||
if smg.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
|
||||
var datadb *engine.DataManager
|
||||
if smg.dm.IsRunning() {
|
||||
dbchan := smg.dm.GetDMChan()
|
||||
datadb = <-dbchan
|
||||
dbchan <- datadb
|
||||
}
|
||||
smg.Lock()
|
||||
defer smg.Unlock()
|
||||
|
||||
smg.sm = sessions.NewSessionS(smg.cfg, smg.dm.GetDM(), smg.connMgr)
|
||||
smg.sm = sessions.NewSessionS(smg.cfg, datadb, smg.connMgr)
|
||||
//start sync session in a separate gorutine
|
||||
go func(sm *sessions.SessionS) {
|
||||
if err = sm.ListenAndServe(smg.exitChan); err != nil {
|
||||
|
||||
@@ -72,10 +72,13 @@ func (sts *StatService) Start() (err error) {
|
||||
|
||||
filterS := <-sts.filterSChan
|
||||
sts.filterSChan <- filterS
|
||||
dbchan := sts.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
sts.Lock()
|
||||
defer sts.Unlock()
|
||||
sts.sts, err = engine.NewStatService(sts.dm.GetDM(), sts.cfg, filterS, sts.connMgr)
|
||||
sts.sts, err = engine.NewStatService(datadb, sts.cfg, filterS, sts.connMgr)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
|
||||
return
|
||||
|
||||
@@ -72,10 +72,13 @@ func (splS *SupplierService) Start() (err error) {
|
||||
|
||||
filterS := <-splS.filterSChan
|
||||
splS.filterSChan <- filterS
|
||||
dbchan := splS.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
splS.Lock()
|
||||
defer splS.Unlock()
|
||||
splS.splS, err = engine.NewSupplierService(splS.dm.GetDM(), filterS, splS.cfg,
|
||||
splS.splS, err = engine.NewSupplierService(datadb, filterS, splS.cfg,
|
||||
splS.connMgr)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s",
|
||||
|
||||
@@ -70,10 +70,13 @@ func (thrs *ThresholdService) Start() (err error) {
|
||||
|
||||
filterS := <-thrs.filterSChan
|
||||
thrs.filterSChan <- filterS
|
||||
dbchan := thrs.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
thrs.Lock()
|
||||
defer thrs.Unlock()
|
||||
thrs.thrs, err = engine.NewThresholdService(thrs.dm.GetDM(), thrs.cfg, filterS)
|
||||
thrs.thrs, err = engine.NewThresholdService(datadb, thrs.cfg, filterS)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error()))
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user