diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e32883cee..14eb89df1 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -456,13 +456,11 @@ func main() { var loadDb engine.LoadStorage var cdrDb engine.CdrStorage - var dm *engine.DataManager dmService := services.NewDataDBService(cfg) if dmService.ShouldRun() { // Some services can run without db, ie: CDRC if err = dmService.Start(); err != nil { return } - dm = dmService.GetDM() } if cfg.RalsCfg().Enabled || cfg.CdrsCfg().Enabled { storDb, err := engine.ConfigureStorStorage(cfg.StorDbCfg().StorDBType, @@ -510,7 +508,7 @@ func main() { internalCDRServerChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency // init CacheS - cacheS := initCacheS(internalCacheSChan, server, dm, exitChan) + cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan) // init GuardianSv1 initGuardianSv1(internalGuardianSChan, server) @@ -522,33 +520,33 @@ func main() { srvManager := servmanager.NewServiceManager(cfg, exitChan) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server) - dspS := services.NewDispatcherService(cfg, dm, cacheS, filterSChan, server, attrS.GetIntenternalChan()) - chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server, + dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan()) + chrS := services.NewChargerService(cfg, dmService.GetDM(), cacheS, filterSChan, server, attrS.GetIntenternalChan(), dspS.GetIntenternalChan()) - tS := services.NewThresholdService(cfg, dm, cacheS, filterSChan, server) - stS := services.NewStatService(cfg, dm, cacheS, filterSChan, server, + tS := services.NewThresholdService(cfg, dmService.GetDM(), cacheS, filterSChan, server) + stS := services.NewStatService(cfg, dmService.GetDM(), cacheS, filterSChan, server, tS.GetIntenternalChan(), dspS.GetIntenternalChan()) - reS := services.NewResourceService(cfg, dm, cacheS, filterSChan, server, + reS := services.NewResourceService(cfg, dmService.GetDM(), cacheS, filterSChan, server, tS.GetIntenternalChan(), dspS.GetIntenternalChan()) - supS := services.NewSupplierService(cfg, dm, cacheS, filterSChan, server, + supS := services.NewSupplierService(cfg, dmService.GetDM(), cacheS, filterSChan, server, attrS.GetIntenternalChan(), stS.GetIntenternalChan(), reS.GetIntenternalChan(), dspS.GetIntenternalChan()) - schS := services.NewSchedulerService(cfg, dm, cacheS, server, internalCDRServerChan, dspS.GetIntenternalChan()) - rals := services.NewRalService(cfg, dm, cdrDb, loadDb, cacheS, filterSChan, server, + schS := services.NewSchedulerService(cfg, dmService.GetDM(), cacheS, server, internalCDRServerChan, dspS.GetIntenternalChan()) + rals := services.NewRalService(cfg, dmService.GetDM(), cdrDb, loadDb, cacheS, filterSChan, server, tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan, schS.GetIntenternalChan(), attrS.GetIntenternalChan(), dspS.GetIntenternalChan(), schS, exitChan) - cdrS := services.NewCDRServer(cfg, dm, cdrDb, filterSChan, server, internalCDRServerChan, + cdrS := services.NewCDRServer(cfg, dmService.GetDM(), cdrDb, filterSChan, server, internalCDRServerChan, chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), attrS.GetIntenternalChan(), tS.GetIntenternalChan(), stS.GetIntenternalChan(), dspS.GetIntenternalChan()) - smg := services.NewSessionService(cfg, dm, server, chrS.GetIntenternalChan(), + smg := services.NewSessionService(cfg, dmService.GetDM(), server, chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(), tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(), attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan) - ldrs := services.NewLoaderService(cfg, dm, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan) + ldrs := services.NewLoaderService(cfg, dmService.GetDM(), filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan) anz := services.NewAnalyzerService(cfg, server, exitChan) srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals, rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg, @@ -568,7 +566,7 @@ func main() { // Start FilterS go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(), reS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), - cfg, dm, exitChan) + cfg, dmService.GetDM(), exitChan) initServiceManagerV1(internalServeManagerChan, srvManager, server) diff --git a/config/config.go b/config/config.go index a7bee3da3..9a04566e0 100755 --- a/config/config.go +++ b/config/config.go @@ -1662,6 +1662,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case DispatcherSJson: + cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before cfg.rldChans[DispatcherSJson] <- struct{}{} if !fall { break diff --git a/services/dispatchers.go b/services/dispatchers.go index 47119d2e4..f0daf1cbf 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -32,7 +32,7 @@ import ( ) // NewDispatcherService returns the Dispatcher Service -func NewDispatcherService(cfg *config.CGRConfig, dm *engine.DataManager, +func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server, attrsChan chan rpcclient.RpcClientConnection) servmanager.Service { return &DispatcherService{ @@ -50,7 +50,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *engine.DataManager, type DispatcherService struct { sync.RWMutex cfg *config.CGRConfig - dm *engine.DataManager + dm *DataDBService cacheS *engine.CacheS filterSChan chan *engine.FilterS server *utils.Server @@ -89,7 +89,7 @@ func (dspS *DispatcherService) Start() (err error) { return } } - if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm, dspS.cfg, fltrS, attrSConn); err != nil { + if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm.GetDM(), dspS.cfg, fltrS, attrSConn); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) return } diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index c6e684a0a..8e289d126 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -51,7 +51,7 @@ func TestDispatcherSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) attrS := NewAttributeService(cfg, db, chS, filterSChan, server) - srv := NewDispatcherService(cfg, nil, chS, filterSChan, server, attrS.GetIntenternalChan()) + srv := NewDispatcherService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan()) srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) @@ -59,6 +59,9 @@ func TestDispatcherSReload(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } + if db.IsRunning() { + t.Errorf("Expected service to be down") + } var reply string if err = cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dispatchers", "dispatchers"), @@ -72,6 +75,9 @@ func TestDispatcherSReload(t *testing.T) { if !srv.IsRunning() { t.Errorf("Expected service to be running") } + if !db.IsRunning() { + t.Errorf("Expected service to be running") + } cfg.DispatcherSCfg().Enabled = false cfg.GetReloadChan(config.DispatcherSJson) <- struct{}{} time.Sleep(10 * time.Millisecond)