diff --git a/config/config.go b/config/config.go index d9304aba5..6365e3dfc 100755 --- a/config/config.go +++ b/config/config.go @@ -690,6 +690,9 @@ func (self *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> empty FieldSep for reader with ID: %s", utils.ERs, rdr.ID) } } + if rdr.Type == utils.MetaKafkajsonMap && rdr.RunDelay > 0 { + return fmt.Errorf("<%s> RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID) + } } } @@ -1463,8 +1466,8 @@ func (cfg *CGRConfig) loadConfig(path, section string) (err error) { return cfg.loadConfigFromPath(path, loadFuncs) } -func (_ *CGRConfig) loadConfigFromReader(rdr io.Reader, loadFuncs []func(jsnCfg *CgrJsonCfg) error) (err error) { - var jsnCfg *CgrJsonCfg = new(CgrJsonCfg) +func (*CGRConfig) loadConfigFromReader(rdr io.Reader, loadFuncs []func(jsnCfg *CgrJsonCfg) error) (err error) { + jsnCfg := new(CgrJsonCfg) var rjr *rjReader if rjr, err = NewRjReader(rdr); err != nil { return diff --git a/ers/kafka.go b/ers/kafka.go index 28a38b52b..a1e6a3cdf 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -36,8 +36,8 @@ import ( ) const ( - defaultTopic = "cgrates_cdrc" - defaultGroupID = "cgrates_consumer" + defaultTopic = "cgrates" + defaultGroupID = "cgrates" defaultMaxWait = time.Millisecond ) diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index d346cecfd..c625a72c7 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -64,8 +64,9 @@ func TestKafkaER(t *testing.T) { }, }`) if err != nil { - t.Error(err) + t.Fatal(err) } + rdrEvents = make(chan *erEvent, 1) rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 4ade74059..f720cbc08 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -487,7 +487,7 @@ func TestCDRsOnExpKafkaPosterFileFailover(t *testing.T) { defer reader.Close() - for i := 0; i < 6; i++ { + for i := 0; i < 4; i++ { ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) if m, err := reader.ReadMessage(ctx); err != nil { t.Fatal(err)