Updated LoaderS

This commit is contained in:
Trial97
2019-10-09 17:36:41 +03:00
committed by Dan Christian Bogos
parent f17b4140c3
commit 5549369fbf
8 changed files with 38 additions and 15 deletions

View File

@@ -170,7 +170,7 @@ func NewDefaultCGRConfig() (cfg *CGRConfig, err error) {
cfg.loaderCgrCfg = new(LoaderCgrCfg)
cfg.migratorCgrCfg = new(MigratorCgrCfg)
cfg.mailerCfg = new(MailerCfg)
cfg.loaderCfg = make([]*LoaderSCfg, 0)
cfg.loaderCfg = make(LoaderSCfgs, 0)
cfg.apier = new(ApierCfg)
cfg.ersCfg = new(ERsCfg)
@@ -255,7 +255,7 @@ type CGRConfig struct {
CdreProfiles map[string]*CdreCfg // Cdre config profiles
CdrcProfiles map[string][]*CdrcCfg // Number of CDRC instances running imports, format map[dirPath][]{Configs}
loaderCfg []*LoaderSCfg // LoaderS configs
loaderCfg LoaderSCfgs // LoaderS configs
httpAgentCfg HttpAgentCfgs // HttpAgent configs
ConfigReloads map[string]chan struct{} // Signals to specific entities that a config reload should occur
@@ -1021,7 +1021,7 @@ func (cfg *CGRConfig) loadLoaderSCfg(jsnCfg *CgrJsonCfg) (err error) {
return
}
if jsnLoaderCfg != nil {
// cfg.loaderCfg = make([]*LoaderSCfg, len(jsnLoaderCfg))
// cfg.loaderCfg = make(LoaderSCfgs, len(jsnLoaderCfg))
for _, profile := range jsnLoaderCfg {
loadSCfgp := NewDfltLoaderSCfg()
loadSCfgp.loadFromJsonCfg(profile, cfg.GeneralCfg().RSRSep)
@@ -1232,7 +1232,7 @@ func (cfg *CGRConfig) CacheCfg() CacheCfg {
}
// LoaderCfg returns the Loader Service
func (cfg *CGRConfig) LoaderCfg() []*LoaderSCfg {
func (cfg *CGRConfig) LoaderCfg() LoaderSCfgs {
cfg.lks[LoaderJson].Lock()
defer cfg.lks[LoaderJson].Unlock()
return cfg.loaderCfg
@@ -1685,6 +1685,9 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
}
fallthrough
case LoaderJson:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
}
cfg.rldChans[LoaderJson] <- struct{}{}
if !fall {
break

View File

@@ -1038,7 +1038,7 @@ func TestDbDefaults(t *testing.T) {
}
func TestCgrLoaderCfgITDefaults(t *testing.T) {
eCfg := []*LoaderSCfg{
eCfg := LoaderSCfgs{
{
Id: utils.META_DEFAULT,
Enabled: false,

View File

@@ -30,6 +30,19 @@ func NewDfltLoaderSCfg() *LoaderSCfg {
return &dfltVal
}
// LoaderSCfgs to export some methods for LoaderS profiles
type LoaderSCfgs []*LoaderSCfg
// Enabled returns true if Loader Service is enabled
func (ldrs LoaderSCfgs) Enabled() bool {
for _, ldr := range ldrs {
if ldr.Enabled {
return true
}
}
return false
}
type LoaderSCfg struct {
Id string
Enabled bool

View File

@@ -20,7 +20,6 @@ package migrator
import (
"fmt"
"reflect"
"strings"
"github.com/cgrates/cgrates/config"

View File

@@ -31,8 +31,9 @@ import (
// NewDataDBService returns the DataDB Service
func NewDataDBService(cfg *config.CGRConfig) *DataDBService {
return &DataDBService{
cfg: cfg,
db: engine.NewDataManager(nil, cfg.CacheCfg()),
cfg: cfg,
dbchan: make(chan *engine.DataManager, 1),
db: engine.NewDataManager(nil, cfg.CacheCfg()), // to be removed
}
}
@@ -42,7 +43,8 @@ type DataDBService struct {
cfg *config.CGRConfig
oldDBCfg *config.DataDbCfg
db *engine.DataManager
db *engine.DataManager
dbchan chan *engine.DataManager
}
// Start should handle the sercive start
@@ -70,6 +72,7 @@ func (db *DataDBService) Start() (err error) {
fmt.Println(err)
return
}
db.dbchan <- db.db
return
}
@@ -131,7 +134,8 @@ func (db *DataDBService) ShouldRun() bool {
func (db *DataDBService) needsDB() bool {
return db.cfg.RalsCfg().Enabled || db.cfg.SchedulerCfg().Enabled || db.cfg.ChargerSCfg().Enabled ||
db.cfg.AttributeSCfg().Enabled || db.cfg.ResourceSCfg().Enabled || db.cfg.StatSCfg().Enabled ||
db.cfg.ThresholdSCfg().Enabled || db.cfg.SupplierSCfg().Enabled || db.cfg.DispatcherSCfg().Enabled
db.cfg.ThresholdSCfg().Enabled || db.cfg.SupplierSCfg().Enabled || db.cfg.DispatcherSCfg().Enabled ||
db.cfg.LoaderCfg().Enabled()
}
// GetDM returns the DataManager
@@ -156,3 +160,10 @@ func (db *DataDBService) needsConnectionReload() bool {
}
return false
}
// GetDMChan returns the DataManager chanel
func (db *DataDBService) GetDMChan() chan *engine.DataManager {
db.RLock()
defer db.RUnlock()
return db.dbchan
}

View File

@@ -135,5 +135,5 @@ func (ldrs *LoaderService) ServiceName() string {
// ShouldRun returns if the service should be running
func (ldrs *LoaderService) ShouldRun() bool {
return true
return ldrs.cfg.LoaderCfg().Enabled()
}

View File

@@ -68,9 +68,6 @@ func (schS *SchedulerService) Start() (err error) {
<-schS.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
if !schS.dm.IsRunning() {
return fmt.Errorf("schedulerS needs DB")
}
schS.Lock()
defer schS.Unlock()

View File

@@ -165,7 +165,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
utils.RadiusAgent: srvMngr.GetConfig().RadiusAgentCfg().Enabled,
utils.DiameterAgent: srvMngr.GetConfig().DiameterAgentCfg().Enabled,
utils.HTTPAgent: len(srvMngr.GetConfig().HttpAgentCfg()) != 0,
utils.LoaderS: true,
utils.LoaderS: srvMngr.GetConfig().LoaderCfg().Enabled(),
utils.AnalyzerS: srvMngr.GetConfig().AnalyzerSCfg().Enabled,
utils.DispatcherS: srvMngr.GetConfig().DispatcherSCfg().Enabled,
} {