From 4ce5caea763d87cfcb6f13918b69e21f7aefef93 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 16 Sep 2019 11:44:16 +0300 Subject: [PATCH] Fixed typos and updated tests --- agents/dnsagent.go | 13 ++--- cmd/cgr-engine/cgr-engine.go | 99 ------------------------------------ config/config.go | 9 ++-- config/config_it_test.go | 4 -- ers/readers_test.go | 4 +- 5 files changed, 13 insertions(+), 116 deletions(-) diff --git a/agents/dnsagent.go b/agents/dnsagent.go index 199eb09fd..294a5a34c 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -46,7 +46,8 @@ type DNSAgent struct { server *dns.Server } -func (da *DNSAgent) newDNSServer() (err error) { +// initDNSServer instantiates the DNS server +func (da *DNSAgent) initDNSServer() (err error) { handler := dns.HandlerFunc(func(w dns.ResponseWriter, m *dns.Msg) { go da.handleMessage(w, m) }) @@ -80,13 +81,13 @@ func (da *DNSAgent) ListenAndServe() (err error) { errChan := make(chan error, 1) rldChan := da.cgrCfg.GetReloadChan(config.DNSAgentJson) - if err = da.newDNSServer(); err != nil { + if err = da.initDNSServer(); err != nil { return } - lisenAndServe := func() { + listenAndServe := func() { errChan <- da.server.ListenAndServe() } - go lisenAndServe() + go listenAndServe() for { select { case err = <-errChan: @@ -95,10 +96,10 @@ func (da *DNSAgent) ListenAndServe() (err error) { if err = da.Shutdown(); err != nil { return } - if err = da.newDNSServer(); err != nil { + if err = da.initDNSServer(); err != nil { return } - go lisenAndServe() //restart the gorutine + go listenAndServe() //restart the gorutine } } } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index bef98c979..08f8302a2 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -798,95 +798,6 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, cacheDoneCh exitChan <- true // Should not get out of loop though } -// startAttributeService fires up the AttributeS -func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection, - cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, - server *utils.Server, filterSChan chan *engine.FilterS, exitChan chan bool) { - filterS := <-filterSChan - filterSChan <- filterS - <-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles) - <-cacheS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes) - - aS, err := engine.NewAttributeService(dm, filterS, cfg) - if err != nil { - utils.Logger.Crit( - fmt.Sprintf("<%s> Could not init, error: %s", - utils.AttributeS, err.Error())) - exitChan <- true - return - } - go func() { - if err := aS.ListenAndServe(exitChan); err != nil { - utils.Logger.Crit( - fmt.Sprintf("<%s> Error: %s listening for packets", - utils.AttributeS, err.Error())) - } - aS.Shutdown() - exitChan <- true - return - }() - aSv1 := v1.NewAttributeSv1(aS) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(aSv1) - } - internalAttributeSChan <- aSv1 -} - -// startChargerService fires up the ChargerS -func startChargerService(internalChargerSChan, internalAttributeSChan, - internalDispatcherSChan chan rpcclient.RpcClientConnection, - cacheS *engine.CacheS, cfg *config.CGRConfig, - dm *engine.DataManager, server *utils.Server, - filterSChan chan *engine.FilterS, exitChan chan bool) { - filterS := <-filterSChan - filterSChan <- filterS - <-cacheS.GetPrecacheChannel(utils.CacheChargerProfiles) - <-cacheS.GetPrecacheChannel(utils.CacheChargerFilterIndexes) - var attrSConn rpcclient.RpcClientConnection - var err error - intAttributeSChan := internalAttributeSChan - if cfg.DispatcherSCfg().Enabled { - intAttributeSChan = internalDispatcherSChan - } - if len(cfg.ChargerSCfg().AttributeSConns) != 0 { // AttributeS connection init - attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ChargerSCfg().AttributeSConns, intAttributeSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.ChargerS, utils.AttributeS, err.Error())) - exitChan <- true - return - } - } - cS, err := engine.NewChargerService(dm, filterS, attrSConn, cfg) - if err != nil { - utils.Logger.Crit( - fmt.Sprintf("<%s> Could not init, error: %s", - utils.ChargerS, err.Error())) - exitChan <- true - return - } - go func() { - if err := cS.ListenAndServe(exitChan); err != nil { - utils.Logger.Crit( - fmt.Sprintf("<%s> Error: %s listening for packets", - utils.ChargerS, err.Error())) - } - cS.Shutdown() - exitChan <- true - return - }() - cSv1 := v1.NewChargerSv1(cS) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(cSv1) - } - internalChargerSChan <- cSv1 -} - func startResourceService(internalRsChan, internalThresholdSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, cfg *config.CGRConfig, @@ -1807,16 +1718,6 @@ func main() { // Start FilterS go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan) - // if cfg.AttributeSCfg().Enabled { - // go startAttributeService(internalAttributeSChan, cacheS, - // cfg, dm, server, filterSChan, exitChan) - // } - // if cfg.ChargerSCfg().Enabled { - // go startChargerService(internalChargerSChan, internalAttributeSChan, - // internalDispatcherSChan, cacheS, cfg, dm, server, - // filterSChan, exitChan) - // } - // Start RL service if cfg.ResourceSCfg().Enabled { go startResourceService(internalRsChan, internalThresholdSChan, diff --git a/config/config.go b/config/config.go index d2cda7aef..31fe4ce12 100755 --- a/config/config.go +++ b/config/config.go @@ -131,7 +131,7 @@ func SetCgrConfig(cfg *CGRConfig) { func NewDefaultCGRConfig() (cfg *CGRConfig, err error) { cfg = new(CGRConfig) - cfg.populteChanels() + cfg.initChanels() cfg.DataFolderPath = "/usr/share/cgrates/" cfg.MaxCallDuration = time.Duration(3) * time.Hour // Hardcoded for now @@ -300,7 +300,7 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes, utils.MetaSuppliers, utils.MetaThresholds, utils.MetaChargers, utils.MetaDispatchers, utils.MetaDispatcherHosts}) -var poisbleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap}) +var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap}) func (self *CGRConfig) checkConfigSanity() error { // Rater checks @@ -670,7 +670,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } for _, rdr := range self.ersCfg.Readers { - if !poisbleReaderTypes.Has(rdr.Type) { + if !possibleReaderTypes.Has(rdr.Type) { return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID) } @@ -1557,6 +1557,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case ChargerSCfgJson: + cfg.rldChans[ChargerSCfgJson] <- struct{}{} if !fall { break } @@ -2021,7 +2022,7 @@ func (cfg *CGRConfig) loadConfigFromHttp(urlPaths string, loadFuncs []func(jsnCf } // populates the config locks and the reload channels -func (cfg *CGRConfig) populteChanels() { +func (cfg *CGRConfig) initChanels() { cfg.lks = make(map[string]*sync.RWMutex) cfg.rldChans = make(map[string]chan struct{}) for _, section := range []string{GENERAL_JSN, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN, SCHEDULER_JSN, CACHE_JSN, FILTERS_JSON, RALS_JSN, diff --git a/config/config_it_test.go b/config/config_it_test.go index 21de6aea2..c22eb35ec 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -441,7 +441,6 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) { map[string]interface{}{ "ConcurrentReqs": 1024, "ContentFields": content, - "Continue": false, "FieldSep": ",", "Filters": []interface{}{}, "Flags": map[string]interface{}{}, @@ -449,7 +448,6 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) { "ID": "*default", "ProcessedPath": "/var/spool/cgrates/cdrc/out", "RunDelay": 0, - "SourceID": "ers_csv", "SourcePath": "/var/spool/cgrates/cdrc/in", "Tenant": nil, "Timezone": "", @@ -459,7 +457,6 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) { }, map[string]interface{}{ "ConcurrentReqs": 1024, - "Continue": false, "FieldSep": ",", "Filters": nil, "Flags": map[string]interface{}{ @@ -473,7 +470,6 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) { "Tenant": nil, "Timezone": "", "TrailerFields": []interface{}{}, - "SourceID": "ers_csv", "Type": "*file_csv", "XmlRootPath": "", "ContentFields": content, diff --git a/ers/readers_test.go b/ers/readers_test.go index 404bf2653..c9800c640 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -28,7 +28,6 @@ import ( ) func TestNewInvalidReader(t *testing.T) { - cfg, _ := config.NewDefaultCGRConfig() reader := cfg.ERsCfg().Readers[0] reader.Type = "Invalid" @@ -43,7 +42,6 @@ func TestNewInvalidReader(t *testing.T) { } func TestNewCsvReader(t *testing.T) { - cfg, _ := config.NewDefaultCGRConfig() fltr := &engine.FilterS{} reader := cfg.ERsCfg().Readers[0] @@ -66,12 +64,12 @@ func TestNewCsvReader(t *testing.T) { } func TestNewKafkaReader(t *testing.T) { - cfg, _ := config.NewDefaultCGRConfig() fltr := &engine.FilterS{} reader := cfg.ERsCfg().Readers[0] reader.Type = utils.MetaKafkajsonMap reader.ID = "file_reader" + reader.ConcurrentReqs = -1 cfg.ERsCfg().Readers = append(cfg.ERsCfg().Readers, reader) if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))