mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Fixed typos and updated tests
This commit is contained in:
committed by
Dan Christian Bogos
parent
da81afcdea
commit
4ce5caea76
@@ -46,7 +46,8 @@ type DNSAgent struct {
|
||||
server *dns.Server
|
||||
}
|
||||
|
||||
func (da *DNSAgent) newDNSServer() (err error) {
|
||||
// initDNSServer instantiates the DNS server
|
||||
func (da *DNSAgent) initDNSServer() (err error) {
|
||||
handler := dns.HandlerFunc(func(w dns.ResponseWriter, m *dns.Msg) {
|
||||
go da.handleMessage(w, m)
|
||||
})
|
||||
@@ -80,13 +81,13 @@ func (da *DNSAgent) ListenAndServe() (err error) {
|
||||
errChan := make(chan error, 1)
|
||||
rldChan := da.cgrCfg.GetReloadChan(config.DNSAgentJson)
|
||||
|
||||
if err = da.newDNSServer(); err != nil {
|
||||
if err = da.initDNSServer(); err != nil {
|
||||
return
|
||||
}
|
||||
lisenAndServe := func() {
|
||||
listenAndServe := func() {
|
||||
errChan <- da.server.ListenAndServe()
|
||||
}
|
||||
go lisenAndServe()
|
||||
go listenAndServe()
|
||||
for {
|
||||
select {
|
||||
case err = <-errChan:
|
||||
@@ -95,10 +96,10 @@ func (da *DNSAgent) ListenAndServe() (err error) {
|
||||
if err = da.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
if err = da.newDNSServer(); err != nil {
|
||||
if err = da.initDNSServer(); err != nil {
|
||||
return
|
||||
}
|
||||
go lisenAndServe() //restart the gorutine
|
||||
go listenAndServe() //restart the gorutine
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -798,95 +798,6 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh
|
||||
exitChan <- true // Should not get out of loop though
|
||||
}
|
||||
|
||||
// startAttributeService fires up the AttributeS
|
||||
func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection,
|
||||
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager,
|
||||
server *utils.Server, filterSChan chan *engine.FilterS, exitChan chan bool) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)
|
||||
|
||||
aS, err := engine.NewAttributeService(dm, filterS, cfg)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> Could not init, error: %s",
|
||||
utils.AttributeS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
if err := aS.ListenAndServe(exitChan); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> Error: %s listening for packets",
|
||||
utils.AttributeS, err.Error()))
|
||||
}
|
||||
aS.Shutdown()
|
||||
exitChan <- true
|
||||
return
|
||||
}()
|
||||
aSv1 := v1.NewAttributeSv1(aS)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(aSv1)
|
||||
}
|
||||
internalAttributeSChan <- aSv1
|
||||
}
|
||||
|
||||
// startChargerService fires up the ChargerS
|
||||
func startChargerService(internalChargerSChan, internalAttributeSChan,
|
||||
internalDispatcherSChan chan rpcclient.RpcClientConnection,
|
||||
cacheS *engine.CacheS, cfg *config.CGRConfig,
|
||||
dm *engine.DataManager, server *utils.Server,
|
||||
filterSChan chan *engine.FilterS, exitChan chan bool) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheChargerProfiles)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)
|
||||
var attrSConn rpcclient.RpcClientConnection
|
||||
var err error
|
||||
intAttributeSChan := internalAttributeSChan
|
||||
if cfg.DispatcherSCfg().Enabled {
|
||||
intAttributeSChan = internalDispatcherSChan
|
||||
}
|
||||
if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init
|
||||
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.ChargerSCfg().AttributeSConns, intAttributeSChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.ChargerS, utils.AttributeS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
cS, err := engine.NewChargerService(dm, filterS, attrSConn, cfg)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> Could not init, error: %s",
|
||||
utils.ChargerS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
if err := cS.ListenAndServe(exitChan); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> Error: %s listening for packets",
|
||||
utils.ChargerS, err.Error()))
|
||||
}
|
||||
cS.Shutdown()
|
||||
exitChan <- true
|
||||
return
|
||||
}()
|
||||
cSv1 := v1.NewChargerSv1(cS)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(cSv1)
|
||||
}
|
||||
internalChargerSChan <- cSv1
|
||||
}
|
||||
|
||||
func startResourceService(internalRsChan, internalThresholdSChan,
|
||||
internalDispatcherSChan chan rpcclient.RpcClientConnection,
|
||||
cacheS *engine.CacheS, cfg *config.CGRConfig,
|
||||
@@ -1807,16 +1718,6 @@ func main() {
|
||||
// Start FilterS
|
||||
go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan)
|
||||
|
||||
// if cfg.AttributeSCfg().Enabled {
|
||||
// go startAttributeService(internalAttributeSChan, cacheS,
|
||||
// cfg, dm, server, filterSChan, exitChan)
|
||||
// }
|
||||
// if cfg.ChargerSCfg().Enabled {
|
||||
// go startChargerService(internalChargerSChan, internalAttributeSChan,
|
||||
// internalDispatcherSChan, cacheS, cfg, dm, server,
|
||||
// filterSChan, exitChan)
|
||||
// }
|
||||
|
||||
// Start RL service
|
||||
if cfg.ResourceSCfg().Enabled {
|
||||
go startResourceService(internalRsChan, internalThresholdSChan,
|
||||
|
||||
@@ -131,7 +131,7 @@ func SetCgrConfig(cfg *CGRConfig) {
|
||||
|
||||
func NewDefaultCGRConfig() (cfg *CGRConfig, err error) {
|
||||
cfg = new(CGRConfig)
|
||||
cfg.populteChanels()
|
||||
cfg.initChanels()
|
||||
cfg.DataFolderPath = "/usr/share/cgrates/"
|
||||
cfg.MaxCallDuration = time.Duration(3) * time.Hour // Hardcoded for now
|
||||
|
||||
@@ -300,7 +300,7 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
|
||||
utils.MetaSuppliers, utils.MetaThresholds, utils.MetaChargers,
|
||||
utils.MetaDispatchers, utils.MetaDispatcherHosts})
|
||||
|
||||
var poisbleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap})
|
||||
var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap})
|
||||
|
||||
func (self *CGRConfig) checkConfigSanity() error {
|
||||
// Rater checks
|
||||
@@ -670,7 +670,7 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
for _, rdr := range self.ersCfg.Readers {
|
||||
if !poisbleReaderTypes.Has(rdr.Type) {
|
||||
if !possibleReaderTypes.Has(rdr.Type) {
|
||||
return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID)
|
||||
}
|
||||
|
||||
@@ -1557,6 +1557,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
|
||||
}
|
||||
fallthrough
|
||||
case ChargerSCfgJson:
|
||||
cfg.rldChans[ChargerSCfgJson] <- struct{}{}
|
||||
if !fall {
|
||||
break
|
||||
}
|
||||
@@ -2021,7 +2022,7 @@ func (cfg *CGRConfig) loadConfigFromHttp(urlPaths string, loadFuncs []func(jsnCf
|
||||
}
|
||||
|
||||
// populates the config locks and the reload channels
|
||||
func (cfg *CGRConfig) populteChanels() {
|
||||
func (cfg *CGRConfig) initChanels() {
|
||||
cfg.lks = make(map[string]*sync.RWMutex)
|
||||
cfg.rldChans = make(map[string]chan struct{})
|
||||
for _, section := range []string{GENERAL_JSN, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN, SCHEDULER_JSN, CACHE_JSN, FILTERS_JSON, RALS_JSN,
|
||||
|
||||
@@ -441,7 +441,6 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
map[string]interface{}{
|
||||
"ConcurrentReqs": 1024,
|
||||
"ContentFields": content,
|
||||
"Continue": false,
|
||||
"FieldSep": ",",
|
||||
"Filters": []interface{}{},
|
||||
"Flags": map[string]interface{}{},
|
||||
@@ -449,7 +448,6 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
"ID": "*default",
|
||||
"ProcessedPath": "/var/spool/cgrates/cdrc/out",
|
||||
"RunDelay": 0,
|
||||
"SourceID": "ers_csv",
|
||||
"SourcePath": "/var/spool/cgrates/cdrc/in",
|
||||
"Tenant": nil,
|
||||
"Timezone": "",
|
||||
@@ -459,7 +457,6 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
},
|
||||
map[string]interface{}{
|
||||
"ConcurrentReqs": 1024,
|
||||
"Continue": false,
|
||||
"FieldSep": ",",
|
||||
"Filters": nil,
|
||||
"Flags": map[string]interface{}{
|
||||
@@ -473,7 +470,6 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
"Tenant": nil,
|
||||
"Timezone": "",
|
||||
"TrailerFields": []interface{}{},
|
||||
"SourceID": "ers_csv",
|
||||
"Type": "*file_csv",
|
||||
"XmlRootPath": "",
|
||||
"ContentFields": content,
|
||||
|
||||
@@ -28,7 +28,6 @@ import (
|
||||
)
|
||||
|
||||
func TestNewInvalidReader(t *testing.T) {
|
||||
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
reader := cfg.ERsCfg().Readers[0]
|
||||
reader.Type = "Invalid"
|
||||
@@ -43,7 +42,6 @@ func TestNewInvalidReader(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewCsvReader(t *testing.T) {
|
||||
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
fltr := &engine.FilterS{}
|
||||
reader := cfg.ERsCfg().Readers[0]
|
||||
@@ -66,12 +64,12 @@ func TestNewCsvReader(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewKafkaReader(t *testing.T) {
|
||||
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
fltr := &engine.FilterS{}
|
||||
reader := cfg.ERsCfg().Readers[0]
|
||||
reader.Type = utils.MetaKafkajsonMap
|
||||
reader.ID = "file_reader"
|
||||
reader.ConcurrentReqs = -1
|
||||
cfg.ERsCfg().Readers = append(cfg.ERsCfg().Readers, reader)
|
||||
if len(cfg.ERsCfg().Readers) != 2 {
|
||||
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
|
||||
|
||||
Reference in New Issue
Block a user