From f17b4140c310db236dc53f18bc543e7876f656c0 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 9 Oct 2019 14:15:30 +0300 Subject: [PATCH] Added DB reload for LoaderS --- cmd/cgr-engine/cgr-engine.go | 2 +- migrator/attributes.go | 1 + services/attributes_it_test.go | 2 +- services/cdrs_it_test.go | 2 +- services/chargers_it_test.go | 2 +- services/datadb_it_test.go | 2 +- services/dispatchers_it_test.go | 2 +- services/dnsagent_it_test.go | 2 +- services/ers_it_test.go | 2 +- services/loaders.go | 8 ++++---- services/rals_it_test.go | 2 +- services/resources_it_test.go | 2 +- services/schedulers.go | 3 +++ services/schedulers_it_test.go | 2 +- services/sessions_it_test.go | 2 +- services/stats_it_test.go | 2 +- services/suppliers_it_test.go | 2 +- services/thresholds_it_test.go | 2 +- 18 files changed, 23 insertions(+), 19 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 781b6ff4b..06f8d3852 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -546,7 +546,7 @@ func main() { tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(), attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan) - ldrs := services.NewLoaderService(cfg, dmService.GetDM(), filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan) + ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan) anz := services.NewAnalyzerService(cfg, server, exitChan) srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals, rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg, diff --git a/migrator/attributes.go b/migrator/attributes.go index d0a32be4e..fb2eacbff 100644 --- a/migrator/attributes.go +++ b/migrator/attributes.go @@ -20,6 +20,7 @@ package migrator import ( "fmt" + "reflect" "strings" "github.com/cgrates/cgrates/config" diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index d23899390..ad6520755 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -49,7 +49,7 @@ func TestAttributeSReload(t *testing.T) { db := NewDataDBService(cfg) attrS := NewAttributeService(cfg, db, chS, filterSChan, server) - srvMngr.AddServices(attrS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(attrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index 45b1ee972..77fd178ac 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -76,7 +76,7 @@ func TestCdrsReload(t *testing.T) { make(chan rpcclient.RpcClientConnection, 1), chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil) - srvMngr.AddServices(cdrS, ralS, schS, chrS, NewLoaderService(cfg, nil, filterSChan, server, cacheSChan, nil, engineShutdown), db) + srvMngr.AddServices(cdrS, ralS, schS, chrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index 405697f38..12b87359d 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -51,7 +51,7 @@ func TestChargerSReload(t *testing.T) { db := NewDataDBService(cfg) attrS := NewAttributeService(cfg, db, chS, filterSChan, server) chrS := NewChargerService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan(), nil) - srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index 2b5e83cb2..08cac78ce 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -49,7 +49,7 @@ func TestDataDBReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) srvMngr.AddServices(NewAttributeService(cfg, db, - chS, filterSChan, server), NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + chS, filterSChan, server), NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index 8e289d126..86e8c679f 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -52,7 +52,7 @@ func TestDispatcherSReload(t *testing.T) { db := NewDataDBService(cfg) attrS := NewAttributeService(cfg, db, chS, filterSChan, server) srv := NewDispatcherService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan()) - srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index 115910c6d..4e1e81486 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -53,7 +53,7 @@ func TestDNSAgentReload(t *testing.T) { sS := NewSessionService(cfg, db, server, nil, nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown) srv := NewDNSAgent(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown) - srvMngr.AddServices(srv, sS, NewLoaderService(cfg, nil, filterSChan, server, cacheSChan, nil, engineShutdown), db) + srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 3e9b230a4..237c0fb33 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -56,7 +56,7 @@ func TestEventReaderSReload(t *testing.T) { sS := NewSessionService(cfg, db, server, nil, nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown) attrS := NewEventReaderService(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown) - srvMngr.AddServices(attrS, sS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(attrS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/loaders.go b/services/loaders.go index 5f95d361e..87e01579d 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -32,7 +32,7 @@ import ( ) // NewLoaderService returns the Loader Service -func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager, +func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, server *utils.Server, cacheSChan, dispatcherChan chan rpcclient.RpcClientConnection, exitChan chan bool) servmanager.Service { @@ -52,7 +52,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager, type LoaderService struct { sync.RWMutex cfg *config.CGRConfig - dm *engine.DataManager + dm *DataDBService filterSChan chan *engine.FilterS server *utils.Server cacheSChan chan rpcclient.RpcClientConnection @@ -80,7 +80,7 @@ func (ldrs *LoaderService) Start() (err error) { ldrs.Lock() defer ldrs.Unlock() - ldrs.ldrs = loaders.NewLoaderService(ldrs.dm, ldrs.cfg.LoaderCfg(), + ldrs.ldrs = loaders.NewLoaderService(ldrs.dm.GetDM(), ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, internalChan) if !ldrs.ldrs.Enabled() { return @@ -105,7 +105,7 @@ func (ldrs *LoaderService) Reload() (err error) { if ldrs.cfg.DispatcherSCfg().Enabled { internalChan = ldrs.dispatcherChan } - ldrs.ldrs.Reload(ldrs.dm, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, + ldrs.ldrs.Reload(ldrs.dm.GetDM(), ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.exitChan, filterS, internalChan) ldrs.RUnlock() return diff --git a/services/rals_it_test.go b/services/rals_it_test.go index 65a6dd6a2..3399fa33f 100644 --- a/services/rals_it_test.go +++ b/services/rals_it_test.go @@ -70,7 +70,7 @@ func TestRalsReload(t *testing.T) { ralS := NewRalService(cfg, db, nil, nil, chS, filterSChan, server, tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan, internalChan, schS, engineShutdown) - srvMngr.AddServices(ralS, schS, tS, NewLoaderService(cfg, nil, filterSChan, server, cacheSChan, nil, engineShutdown), db) + srvMngr.AddServices(ralS, schS, tS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/resources_it_test.go b/services/resources_it_test.go index 2e5780dc8..50119d9dd 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -54,7 +54,7 @@ func TestResourceSReload(t *testing.T) { db := NewDataDBService(cfg) tS := NewThresholdService(cfg, db, chS, filterSChan, server) reS := NewResourceService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil) - srvMngr.AddServices(tS, reS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/schedulers.go b/services/schedulers.go index 43312debf..8747bcaa6 100644 --- a/services/schedulers.go +++ b/services/schedulers.go @@ -68,6 +68,9 @@ func (schS *SchedulerService) Start() (err error) { <-schS.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached + if !schS.dm.IsRunning() { + return fmt.Errorf("schedulerS needs DB") + } schS.Lock() defer schS.Unlock() diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index 4a059c67e..0604b870c 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -49,7 +49,7 @@ func TestSchedulerSReload(t *testing.T) { internalCdrSChan <- nil db := NewDataDBService(cfg) schS := NewSchedulerService(cfg, db, chS, server, internalCdrSChan, nil) - srvMngr.AddServices(schS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(schS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index 9e5f1889d..775d74137 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -79,7 +79,7 @@ func TestSessionSReload(t *testing.T) { nil, nil, nil, nil) srv := NewSessionService(cfg, db, server, chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil, nil, cdrS.GetIntenternalChan(), nil, engineShutdown) - srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, nil, filterSChan, server, cacheSChan, nil, engineShutdown), db) + srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/stats_it_test.go b/services/stats_it_test.go index 30d45940f..02f5e8f4d 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -54,7 +54,7 @@ func TestStatSReload(t *testing.T) { db := NewDataDBService(cfg) tS := NewThresholdService(cfg, db, chS, filterSChan, server) sS := NewStatService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil) - srvMngr.AddServices(tS, sS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(tS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/suppliers_it_test.go b/services/suppliers_it_test.go index 298386faf..a5dcddd3f 100644 --- a/services/suppliers_it_test.go +++ b/services/suppliers_it_test.go @@ -52,7 +52,7 @@ func TestSupplierSReload(t *testing.T) { db := NewDataDBService(cfg) sts := NewStatService(cfg, db, chS, filterSChan, server, nil, nil) supS := NewSupplierService(cfg, db, chS, filterSChan, server, nil, sts.GetIntenternalChan(), nil, nil) - srvMngr.AddServices(supS, sts, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index 7477f84c0..ca9e008c3 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -49,7 +49,7 @@ func TestThresholdSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) tS := NewThresholdService(cfg, db, chS, filterSChan, server) - srvMngr.AddServices(tS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(tS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) }