mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added DB reload for LoaderS
This commit is contained in:
committed by
Dan Christian Bogos
parent
6d273d107c
commit
f17b4140c3
@@ -546,7 +546,7 @@ func main() {
|
||||
tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(),
|
||||
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan)
|
||||
|
||||
ldrs := services.NewLoaderService(cfg, dmService.GetDM(), filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan)
|
||||
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan)
|
||||
anz := services.NewAnalyzerService(cfg, server, exitChan)
|
||||
srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals,
|
||||
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
|
||||
|
||||
@@ -20,6 +20,7 @@ package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
|
||||
@@ -49,7 +49,7 @@ func TestAttributeSReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg)
|
||||
attrS := NewAttributeService(cfg, db,
|
||||
chS, filterSChan, server)
|
||||
srvMngr.AddServices(attrS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(attrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -76,7 +76,7 @@ func TestCdrsReload(t *testing.T) {
|
||||
make(chan rpcclient.RpcClientConnection, 1),
|
||||
chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(),
|
||||
nil, nil, nil, nil)
|
||||
srvMngr.AddServices(cdrS, ralS, schS, chrS, NewLoaderService(cfg, nil, filterSChan, server, cacheSChan, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(cdrS, ralS, schS, chrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestChargerSReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg)
|
||||
attrS := NewAttributeService(cfg, db, chS, filterSChan, server)
|
||||
chrS := NewChargerService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan(), nil)
|
||||
srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ func TestDataDBReload(t *testing.T) {
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg)
|
||||
srvMngr.AddServices(NewAttributeService(cfg, db,
|
||||
chS, filterSChan, server), NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
chS, filterSChan, server), NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func TestDispatcherSReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg)
|
||||
attrS := NewAttributeService(cfg, db, chS, filterSChan, server)
|
||||
srv := NewDispatcherService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan())
|
||||
srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestDNSAgentReload(t *testing.T) {
|
||||
sS := NewSessionService(cfg, db, server, nil,
|
||||
nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown)
|
||||
srv := NewDNSAgent(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown)
|
||||
srvMngr.AddServices(srv, sS, NewLoaderService(cfg, nil, filterSChan, server, cacheSChan, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ func TestEventReaderSReload(t *testing.T) {
|
||||
sS := NewSessionService(cfg, db, server, nil,
|
||||
nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown)
|
||||
attrS := NewEventReaderService(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown)
|
||||
srvMngr.AddServices(attrS, sS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(attrS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ import (
|
||||
)
|
||||
|
||||
// NewLoaderService returns the Loader Service
|
||||
func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager,
|
||||
func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan chan *engine.FilterS, server *utils.Server,
|
||||
cacheSChan, dispatcherChan chan rpcclient.RpcClientConnection,
|
||||
exitChan chan bool) servmanager.Service {
|
||||
@@ -52,7 +52,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager,
|
||||
type LoaderService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
dm *engine.DataManager
|
||||
dm *DataDBService
|
||||
filterSChan chan *engine.FilterS
|
||||
server *utils.Server
|
||||
cacheSChan chan rpcclient.RpcClientConnection
|
||||
@@ -80,7 +80,7 @@ func (ldrs *LoaderService) Start() (err error) {
|
||||
ldrs.Lock()
|
||||
defer ldrs.Unlock()
|
||||
|
||||
ldrs.ldrs = loaders.NewLoaderService(ldrs.dm, ldrs.cfg.LoaderCfg(),
|
||||
ldrs.ldrs = loaders.NewLoaderService(ldrs.dm.GetDM(), ldrs.cfg.LoaderCfg(),
|
||||
ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, internalChan)
|
||||
if !ldrs.ldrs.Enabled() {
|
||||
return
|
||||
@@ -105,7 +105,7 @@ func (ldrs *LoaderService) Reload() (err error) {
|
||||
if ldrs.cfg.DispatcherSCfg().Enabled {
|
||||
internalChan = ldrs.dispatcherChan
|
||||
}
|
||||
ldrs.ldrs.Reload(ldrs.dm, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone,
|
||||
ldrs.ldrs.Reload(ldrs.dm.GetDM(), ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone,
|
||||
ldrs.exitChan, filterS, internalChan)
|
||||
ldrs.RUnlock()
|
||||
return
|
||||
|
||||
@@ -70,7 +70,7 @@ func TestRalsReload(t *testing.T) {
|
||||
ralS := NewRalService(cfg, db, nil, nil, chS, filterSChan, server,
|
||||
tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan,
|
||||
internalChan, schS, engineShutdown)
|
||||
srvMngr.AddServices(ralS, schS, tS, NewLoaderService(cfg, nil, filterSChan, server, cacheSChan, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(ralS, schS, tS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ func TestResourceSReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
|
||||
reS := NewResourceService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
|
||||
srvMngr.AddServices(tS, reS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -68,6 +68,9 @@ func (schS *SchedulerService) Start() (err error) {
|
||||
|
||||
<-schS.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
|
||||
|
||||
if !schS.dm.IsRunning() {
|
||||
return fmt.Errorf("schedulerS needs DB")
|
||||
}
|
||||
schS.Lock()
|
||||
defer schS.Unlock()
|
||||
|
||||
|
||||
@@ -49,7 +49,7 @@ func TestSchedulerSReload(t *testing.T) {
|
||||
internalCdrSChan <- nil
|
||||
db := NewDataDBService(cfg)
|
||||
schS := NewSchedulerService(cfg, db, chS, server, internalCdrSChan, nil)
|
||||
srvMngr.AddServices(schS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(schS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ func TestSessionSReload(t *testing.T) {
|
||||
nil, nil, nil, nil)
|
||||
srv := NewSessionService(cfg, db, server, chrS.GetIntenternalChan(),
|
||||
ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil, nil, cdrS.GetIntenternalChan(), nil, engineShutdown)
|
||||
srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, nil, filterSChan, server, cacheSChan, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ func TestStatSReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
|
||||
sS := NewStatService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
|
||||
srvMngr.AddServices(tS, sS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(tS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func TestSupplierSReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg)
|
||||
sts := NewStatService(cfg, db, chS, filterSChan, server, nil, nil)
|
||||
supS := NewSupplierService(cfg, db, chS, filterSChan, server, nil, sts.GetIntenternalChan(), nil, nil)
|
||||
srvMngr.AddServices(supS, sts, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ func TestThresholdSReload(t *testing.T) {
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
|
||||
srvMngr.AddServices(tS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
srvMngr.AddServices(tS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user