diff --git a/ers/nats.go b/ers/nats.go index 4c02990c2..8e87c6335 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -113,40 +113,43 @@ func (rdr *NatsER) Serve() (err error) { return } } - for { - if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap // do not try to read if the limit is reached - } - select { - case <-rdr.rdrExit: - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring nats path <%s>", - utils.ERs, rdr.Config().SourcePath)) - nc.Drain() - if rdr.poster != nil { - rdr.poster.Close() + go func() { + for { + if rdr.Config().ConcurrentReqs != -1 { + <-rdr.cap // do not try to read if the limit is reached } - return - case msg := <-ch: - go func(msg *nats.Msg) { - if err := rdr.processMessage(msg.Data); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing message %s error: %s", - utils.ERs, string(msg.Data), err.Error())) + select { + case <-rdr.rdrExit: + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring nats path <%s>", + utils.ERs, rdr.Config().SourcePath)) + nc.Drain() + if rdr.poster != nil { + rdr.poster.Close() } - if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil { + return + case msg := <-ch: + go func(msg *nats.Msg) { + if err := rdr.processMessage(msg.Data); err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> writing message %s error: %s", + fmt.Sprintf("<%s> processing message %s error: %s", utils.ERs, string(msg.Data), err.Error())) } - } - if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} - } - }(msg) + if rdr.poster != nil { // post it + if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> writing message %s error: %s", + utils.ERs, string(msg.Data), err.Error())) + } + } + if rdr.Config().ConcurrentReqs != -1 { + rdr.cap <- struct{}{} + } + }(msg) + } } - } + }() + return } func (rdr *NatsER) processMessage(msg []byte) (err error) {