Updated config check sanity for erS

This commit is contained in:
Trial97
2019-09-09 12:48:58 +03:00
committed by Dan Christian Bogos
parent ca6f3b3319
commit b1607b46c7
4 changed files with 17 additions and 17 deletions

View File

@@ -306,7 +306,7 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
utils.MetaSuppliers, utils.MetaThresholds, utils.MetaChargers,
utils.MetaDispatchers, utils.MetaDispatcherHosts})
var poisbleReaderType = utils.NewStringSet([]string{utils.MetaFileCSV})
var poisbleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap})
func (self *CGRConfig) checkConfigSanity() error {
// Rater checks
@@ -676,17 +676,19 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
for _, rdr := range self.ersCfg.Readers {
for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID)
}
}
if !poisbleReaderType.Has(rdr.Type) {
if !poisbleReaderTypes.Has(rdr.Type) {
return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID)
}
if rdr.FieldSep == utils.EmptyString {
return fmt.Errorf("<%s> empty FieldSep for reader with ID: %s", utils.ERs, rdr.ID)
if rdr.Type == utils.MetaFileCSV {
for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID)
}
}
if rdr.FieldSep == utils.EmptyString {
return fmt.Errorf("<%s> empty FieldSep for reader with ID: %s", utils.ERs, rdr.ID)
}
}
}
}
@@ -1339,7 +1341,7 @@ func (cfg *CGRConfig) V1GetConfigSection(args *StringWithArgDispatcher, reply *m
jsonString = utils.ToJSON(cfg.SupplierSCfg())
case SURETAX_JSON:
jsonString = utils.ToJSON(cfg.SureTaxCfg())
case DispatcherJson:
case DispatcherSJson:
jsonString = utils.ToJSON(cfg.DispatcherSCfg())
case LoaderJson:
jsonString = utils.ToJSON(cfg.LoaderCfg())
@@ -1491,7 +1493,7 @@ func (cfg *CGRConfig) loadConfigFromPath(path string, loadFuncs []func(jsnCfg *C
}
return
} else if !fi.IsDir() && path != utils.CONFIG_PATH { // If config dir defined, needs to exist, not checking for default
return fmt.Errorf("Path: %s not a directory.", path)
return fmt.Errorf("path: %s not a directory", path)
}
if fi.IsDir() {
return cfg.loadConfigFromFolder(path, loadFuncs)

View File

@@ -53,7 +53,6 @@ const (
LoaderJson = "loaders"
MAILER_JSN = "mailer"
SURETAX_JSON = "suretax"
DispatcherJson = "dispatcher"
DispatcherSJson = "dispatchers"
CgrLoaderCfgJson = "loader"
CgrMigratorCfgJson = "migrator"

View File

@@ -117,8 +117,8 @@ func (rdr *KafkaER) Serve() (err error) {
}
if err := rdr.processMessage(msg.Value); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing message error: %s",
utils.ERs, err.Error()))
fmt.Sprintf("<%s> processing message %s error: %s",
utils.ERs, string(msg.Key), err.Error()))
}
}
}(r)

View File

@@ -97,7 +97,7 @@ func TestKafkaER(t *testing.T) {
case ev := <-rdrEvents:
// fmt.Printf("It took %s to proccess the message.\n", time.Now().Sub(tStart))
if ev.rdrCfg.ID != "kafka" {
t.Errorf("Expected ....")
t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
@@ -110,7 +110,6 @@ func TestKafkaER(t *testing.T) {
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
}
case <-time.After(30 * time.Second):
t.Errorf("Timeout")
}