Updated Kafka checkConfigSanity

This commit is contained in:
Trial97
2019-09-10 15:26:42 +03:00
committed by Dan Christian Bogos
parent 8b70d13eae
commit 577343a94c
4 changed files with 10 additions and 6 deletions

View File

@@ -690,6 +690,9 @@ func (self *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> empty FieldSep for reader with ID: %s", utils.ERs, rdr.ID)
}
}
if rdr.Type == utils.MetaKafkajsonMap && rdr.RunDelay > 0 {
return fmt.Errorf("<%s> RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID)
}
}
}
@@ -1463,8 +1466,8 @@ func (cfg *CGRConfig) loadConfig(path, section string) (err error) {
return cfg.loadConfigFromPath(path, loadFuncs)
}
func (_ *CGRConfig) loadConfigFromReader(rdr io.Reader, loadFuncs []func(jsnCfg *CgrJsonCfg) error) (err error) {
var jsnCfg *CgrJsonCfg = new(CgrJsonCfg)
func (*CGRConfig) loadConfigFromReader(rdr io.Reader, loadFuncs []func(jsnCfg *CgrJsonCfg) error) (err error) {
jsnCfg := new(CgrJsonCfg)
var rjr *rjReader
if rjr, err = NewRjReader(rdr); err != nil {
return

View File

@@ -36,8 +36,8 @@ import (
)
const (
defaultTopic = "cgrates_cdrc"
defaultGroupID = "cgrates_consumer"
defaultTopic = "cgrates"
defaultGroupID = "cgrates"
defaultMaxWait = time.Millisecond
)

View File

@@ -64,8 +64,9 @@ func TestKafkaER(t *testing.T) {
},
}`)
if err != nil {
t.Error(err)
t.Fatal(err)
}
rdrEvents = make(chan *erEvent, 1)
rdrErr = make(chan error, 1)
rdrExit = make(chan struct{}, 1)

View File

@@ -487,7 +487,7 @@ func TestCDRsOnExpKafkaPosterFileFailover(t *testing.T) {
defer reader.Close()
for i := 0; i < 6; i++ {
for i := 0; i < 4; i++ {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
if m, err := reader.ReadMessage(ctx); err != nil {
t.Fatal(err)