Added DB reload for DispatcherS

This commit is contained in:
Trial97
2019-10-08 11:52:20 +03:00
committed by Dan Christian Bogos
parent c18a0c83c4
commit 0189ad71da
4 changed files with 24 additions and 19 deletions

View File

@@ -456,13 +456,11 @@ func main() {
var loadDb engine.LoadStorage
var cdrDb engine.CdrStorage
var dm *engine.DataManager
dmService := services.NewDataDBService(cfg)
if dmService.ShouldRun() { // Some services can run without db, ie: CDRC
if err = dmService.Start(); err != nil {
return
}
dm = dmService.GetDM()
}
if cfg.RalsCfg().Enabled || cfg.CdrsCfg().Enabled {
storDb, err := engine.ConfigureStorStorage(cfg.StorDbCfg().StorDBType,
@@ -510,7 +508,7 @@ func main() {
internalCDRServerChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dm, exitChan)
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
// init GuardianSv1
initGuardianSv1(internalGuardianSChan, server)
@@ -522,33 +520,33 @@ func main() {
srvManager := servmanager.NewServiceManager(cfg, exitChan)
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server)
dspS := services.NewDispatcherService(cfg, dm, cacheS, filterSChan, server, attrS.GetIntenternalChan())
chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server,
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan())
chrS := services.NewChargerService(cfg, dmService.GetDM(), cacheS, filterSChan, server,
attrS.GetIntenternalChan(), dspS.GetIntenternalChan())
tS := services.NewThresholdService(cfg, dm, cacheS, filterSChan, server)
stS := services.NewStatService(cfg, dm, cacheS, filterSChan, server,
tS := services.NewThresholdService(cfg, dmService.GetDM(), cacheS, filterSChan, server)
stS := services.NewStatService(cfg, dmService.GetDM(), cacheS, filterSChan, server,
tS.GetIntenternalChan(), dspS.GetIntenternalChan())
reS := services.NewResourceService(cfg, dm, cacheS, filterSChan, server,
reS := services.NewResourceService(cfg, dmService.GetDM(), cacheS, filterSChan, server,
tS.GetIntenternalChan(), dspS.GetIntenternalChan())
supS := services.NewSupplierService(cfg, dm, cacheS, filterSChan, server,
supS := services.NewSupplierService(cfg, dmService.GetDM(), cacheS, filterSChan, server,
attrS.GetIntenternalChan(), stS.GetIntenternalChan(),
reS.GetIntenternalChan(), dspS.GetIntenternalChan())
schS := services.NewSchedulerService(cfg, dm, cacheS, server, internalCDRServerChan, dspS.GetIntenternalChan())
rals := services.NewRalService(cfg, dm, cdrDb, loadDb, cacheS, filterSChan, server,
schS := services.NewSchedulerService(cfg, dmService.GetDM(), cacheS, server, internalCDRServerChan, dspS.GetIntenternalChan())
rals := services.NewRalService(cfg, dmService.GetDM(), cdrDb, loadDb, cacheS, filterSChan, server,
tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan,
schS.GetIntenternalChan(), attrS.GetIntenternalChan(), dspS.GetIntenternalChan(),
schS, exitChan)
cdrS := services.NewCDRServer(cfg, dm, cdrDb, filterSChan, server, internalCDRServerChan,
cdrS := services.NewCDRServer(cfg, dmService.GetDM(), cdrDb, filterSChan, server, internalCDRServerChan,
chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(),
attrS.GetIntenternalChan(), tS.GetIntenternalChan(),
stS.GetIntenternalChan(), dspS.GetIntenternalChan())
smg := services.NewSessionService(cfg, dm, server, chrS.GetIntenternalChan(),
smg := services.NewSessionService(cfg, dmService.GetDM(), server, chrS.GetIntenternalChan(),
rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(),
tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(),
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan)
ldrs := services.NewLoaderService(cfg, dm, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan)
ldrs := services.NewLoaderService(cfg, dmService.GetDM(), filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan)
anz := services.NewAnalyzerService(cfg, server, exitChan)
srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
@@ -568,7 +566,7 @@ func main() {
// Start FilterS
go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(),
reS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(),
cfg, dm, exitChan)
cfg, dmService.GetDM(), exitChan)
initServiceManagerV1(internalServeManagerChan, srvManager, server)

View File

@@ -1662,6 +1662,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
}
fallthrough
case DispatcherSJson:
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
cfg.rldChans[DispatcherSJson] <- struct{}{}
if !fall {
break

View File

@@ -32,7 +32,7 @@ import (
)
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *engine.DataManager,
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, attrsChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &DispatcherService{
@@ -50,7 +50,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *engine.DataManager,
type DispatcherService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
dm *DataDBService
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
@@ -89,7 +89,7 @@ func (dspS *DispatcherService) Start() (err error) {
return
}
}
if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm, dspS.cfg, fltrS, attrSConn); err != nil {
if dspS.dspS, err = dispatchers.NewDispatcherService(dspS.dm.GetDM(), dspS.cfg, fltrS, attrSConn); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
return
}

View File

@@ -51,7 +51,7 @@ func TestDispatcherSReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
attrS := NewAttributeService(cfg, db, chS, filterSChan, server)
srv := NewDispatcherService(cfg, nil, chS, filterSChan, server, attrS.GetIntenternalChan())
srv := NewDispatcherService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan())
srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
@@ -59,6 +59,9 @@ func TestDispatcherSReload(t *testing.T) {
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
if db.IsRunning() {
t.Errorf("Expected service to be down")
}
var reply string
if err = cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dispatchers", "dispatchers"),
@@ -72,6 +75,9 @@ func TestDispatcherSReload(t *testing.T) {
if !srv.IsRunning() {
t.Errorf("Expected service to be running")
}
if !db.IsRunning() {
t.Errorf("Expected service to be running")
}
cfg.DispatcherSCfg().Enabled = false
cfg.GetReloadChan(config.DispatcherSJson) <- struct{}{}
time.Sleep(10 * time.Millisecond)