Updated nats ERs

This commit is contained in:
Trial97
2021-08-30 10:35:13 +03:00
committed by Dan Christian Bogos
parent cec7899bc9
commit 76bc9f0cd8

View File

@@ -113,40 +113,43 @@ func (rdr *NatsER) Serve() (err error) {
return
}
}
for {
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap // do not try to read if the limit is reached
}
select {
case <-rdr.rdrExit:
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring nats path <%s>",
utils.ERs, rdr.Config().SourcePath))
nc.Drain()
if rdr.poster != nil {
rdr.poster.Close()
go func() {
for {
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap // do not try to read if the limit is reached
}
return
case msg := <-ch:
go func(msg *nats.Msg) {
if err := rdr.processMessage(msg.Data); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing message %s error: %s",
utils.ERs, string(msg.Data), err.Error()))
select {
case <-rdr.rdrExit:
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring nats path <%s>",
utils.ERs, rdr.Config().SourcePath))
nc.Drain()
if rdr.poster != nil {
rdr.poster.Close()
}
if rdr.poster != nil { // post it
if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil {
return
case msg := <-ch:
go func(msg *nats.Msg) {
if err := rdr.processMessage(msg.Data); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
fmt.Sprintf("<%s> processing message %s error: %s",
utils.ERs, string(msg.Data), err.Error()))
}
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
}
}(msg)
if rdr.poster != nil { // post it
if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, string(msg.Data), err.Error()))
}
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
}
}(msg)
}
}
}
}()
return
}
func (rdr *NatsER) processMessage(msg []byte) (err error) {