diff --git a/ers/ers.go b/ers/ers.go index 42324b00c..bc7a9674c 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -87,7 +87,6 @@ func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) (err er erS.closeAllRdrs() return case erEv := <-erS.rdrEvents: - fmt.Println("yay") if err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> reading event: <%s> got error: <%s>", diff --git a/ers/ers_it_test.go b/ers/ers_it_test.go index aae352dfe..c2a3b347e 100644 --- a/ers/ers_it_test.go +++ b/ers/ers_it_test.go @@ -231,7 +231,7 @@ func TestERsListenAndServeRdrErr(t *testing.T) { cfgRldChan := make(chan struct{}, 1) srv.rdrErr = make(chan error, 1) srv.rdrErr <- utils.ErrNotFound - time.Sleep(30 * time.Millisecond) + time.Sleep(10 * time.Millisecond) err := srv.ListenAndServe(stopChan, cfgRldChan) if err == nil || err != utils.ErrNotFound { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrNotFound, err) @@ -269,14 +269,13 @@ func TestERsListenAndServeStopchan(t *testing.T) { stopChan := make(chan struct{}, 1) cfgRldChan := make(chan struct{}, 1) stopChan <- struct{}{} - time.Sleep(30 * time.Millisecond) + time.Sleep(10 * time.Millisecond) err := srv.ListenAndServe(stopChan, cfgRldChan) if err != nil { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err) } } -/* func TestERsListenAndServeRdrEvents(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ @@ -307,6 +306,7 @@ func TestERsListenAndServeRdrEvents(t *testing.T) { srv := NewERService(cfg, fltrS, nil) stopChan := make(chan struct{}, 1) cfgRldChan := make(chan struct{}, 1) + srv.rdrErr = make(chan error, 1) srv.rdrEvents = make(chan *erEvent, 1) srv.rdrEvents <- &erEvent{ cgrEvent: &utils.CGREvent{ @@ -339,10 +339,67 @@ func TestERsListenAndServeRdrEvents(t *testing.T) { CacheDumpFields: nil, }, } - time.Sleep(30 * time.Millisecond) + time.Sleep(10 * time.Millisecond) + srv.rdrErr <- utils.ErrNotFound + time.Sleep(15 * time.Millisecond) err := srv.ListenAndServe(stopChan, cfgRldChan) - if err != nil { - t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err) + if err == nil || err != utils.ErrNotFound { + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrNotFound, err) + } +} + +func TestERsListenAndServeCfgRldChan(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers = []*config.EventReaderCfg{ + { + ID: "", + Type: utils.MetaNone, + }, + } + fltrS := &engine.FilterS{} + srv := NewERService(cfg, fltrS, nil) + stopChan := make(chan struct{}, 1) + cfgRldChan := make(chan struct{}, 1) + srv.rdrErr = make(chan error, 1) + cfgRldChan <- struct{}{} + time.Sleep(10 * time.Millisecond) + srv.rdrErr <- utils.ErrNotFound + time.Sleep(15 * time.Millisecond) + err := srv.ListenAndServe(stopChan, cfgRldChan) + if err == nil || err != utils.ErrNotFound { + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrNotFound, err) + } +} + +func TestERsListenAndServeCfgRldChan2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers = []*config.EventReaderCfg{ + { + ID: "test", + Type: utils.MetaNone, + }, + } + fltrS := &engine.FilterS{} + srv := NewERService(cfg, fltrS, nil) + exp := &CSVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + } + var expected EventReader = exp + srv.rdrs = map[string]EventReader{ + "test": expected, + } + srv.stopLsn["test"] = make(chan struct{}) + stopChan := make(chan struct{}, 1) + cfgRldChan := make(chan struct{}, 1) + srv.rdrErr = make(chan error, 1) + + cfgRldChan <- struct{}{} + time.Sleep(10 * time.Millisecond) + srv.rdrErr <- utils.ErrNotFound + time.Sleep(15 * time.Millisecond) + err := srv.ListenAndServe(stopChan, cfgRldChan) + if err == nil || err != utils.ErrNotFound { + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrNotFound, err) } } -*/