From b1607b46c7554512e86ca7a5541a5747c4de6343 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 9 Sep 2019 12:48:58 +0300 Subject: [PATCH] Updated config check sanity for erS --- config/config.go | 26 ++++++++++++++------------ config/config_json.go | 1 - ers/kafka.go | 4 ++-- ers/kafka_it_test.go | 3 +-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/config/config.go b/config/config.go index 33c1d4986..d9304aba5 100755 --- a/config/config.go +++ b/config/config.go @@ -306,7 +306,7 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes, utils.MetaSuppliers, utils.MetaThresholds, utils.MetaChargers, utils.MetaDispatchers, utils.MetaDispatcherHosts}) -var poisbleReaderType = utils.NewStringSet([]string{utils.MetaFileCSV}) +var poisbleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap}) func (self *CGRConfig) checkConfigSanity() error { // Rater checks @@ -676,17 +676,19 @@ func (self *CGRConfig) checkConfigSanity() error { } } for _, rdr := range self.ersCfg.Readers { - for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} { - if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { - return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID) - } - } - - if !poisbleReaderType.Has(rdr.Type) { + if !poisbleReaderTypes.Has(rdr.Type) { return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID) } - if rdr.FieldSep == utils.EmptyString { - return fmt.Errorf("<%s> empty FieldSep for reader with ID: %s", utils.ERs, rdr.ID) + + if rdr.Type == utils.MetaFileCSV { + for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} { + if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { + return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID) + } + } + if rdr.FieldSep == utils.EmptyString { + return fmt.Errorf("<%s> empty FieldSep for reader with ID: %s", utils.ERs, rdr.ID) + } } } } @@ -1339,7 +1341,7 @@ func (cfg *CGRConfig) V1GetConfigSection(args *StringWithArgDispatcher, reply *m jsonString = utils.ToJSON(cfg.SupplierSCfg()) case SURETAX_JSON: jsonString = utils.ToJSON(cfg.SureTaxCfg()) - case DispatcherJson: + case DispatcherSJson: jsonString = utils.ToJSON(cfg.DispatcherSCfg()) case LoaderJson: jsonString = utils.ToJSON(cfg.LoaderCfg()) @@ -1491,7 +1493,7 @@ func (cfg *CGRConfig) loadConfigFromPath(path string, loadFuncs []func(jsnCfg *C } return } else if !fi.IsDir() && path != utils.CONFIG_PATH { // If config dir defined, needs to exist, not checking for default - return fmt.Errorf("Path: %s not a directory.", path) + return fmt.Errorf("path: %s not a directory", path) } if fi.IsDir() { return cfg.loadConfigFromFolder(path, loadFuncs) diff --git a/config/config_json.go b/config/config_json.go index 807ae344c..58e7541f2 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -53,7 +53,6 @@ const ( LoaderJson = "loaders" MAILER_JSN = "mailer" SURETAX_JSON = "suretax" - DispatcherJson = "dispatcher" DispatcherSJson = "dispatchers" CgrLoaderCfgJson = "loader" CgrMigratorCfgJson = "migrator" diff --git a/ers/kafka.go b/ers/kafka.go index fa5ab9c0c..b537bd4c3 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -117,8 +117,8 @@ func (rdr *KafkaER) Serve() (err error) { } if err := rdr.processMessage(msg.Value); err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> processing message error: %s", - utils.ERs, err.Error())) + fmt.Sprintf("<%s> processing message %s error: %s", + utils.ERs, string(msg.Key), err.Error())) } } }(r) diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index d44d14f4f..1b4564b24 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -97,7 +97,7 @@ func TestKafkaER(t *testing.T) { case ev := <-rdrEvents: // fmt.Printf("It took %s to proccess the message.\n", time.Now().Sub(tStart)) if ev.rdrCfg.ID != "kafka" { - t.Errorf("Expected ....") + t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID) } expected := &utils.CGREvent{ Tenant: "cgrates.org", @@ -110,7 +110,6 @@ func TestKafkaER(t *testing.T) { if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) } - case <-time.After(30 * time.Second): t.Errorf("Timeout") }