From 8d70e355d79baf9e46a32f8bb6e273bff763293a Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Fri, 31 May 2019 10:22:09 +0300 Subject: [PATCH] Updated AWSPoster to reset connection if is forced disconnected --- engine/poster.go | 51 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/engine/poster.go b/engine/poster.go index bcb94efb4..588ffc751 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -24,6 +24,7 @@ import ( "crypto/tls" "fmt" "io/ioutil" + "net" "net/http" "net/url" "os" @@ -492,6 +493,9 @@ func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (err error) if s, err = pstr.newPosterSession(); err == nil { break } + // reset client and try again + // used in case of closed conection because of idle time + pstr.client = nil time.Sleep(time.Duration(fib()) * time.Second) } if err != nil { @@ -502,30 +506,47 @@ func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (err error) return err } + ctx := context.Background() for i := 0; i < pstr.attempts; i++ { sender, err := s.NewSender( amqpv1.LinkTargetAddress(pstr.queueID), ) if err != nil { time.Sleep(time.Duration(fib()) * time.Second) + // if pstr.isRecoverableError(err) { + // s.Close(ctx) + // pstr.client.Close() + // pstr.client = nil + // stmp, err := pstr.newPosterSession() + // if err == nil { + // s = stmp + // } + // } continue } - ctx := context.Background() - // Send message err = sender.Send(ctx, amqpv1.NewMessage(content)) + sender.Close(ctx) if err == nil { - sender.Close(ctx) break } time.Sleep(time.Duration(fib()) * time.Second) + // if pstr.isRecoverableError(err) { + // s.Close(ctx) + // pstr.client.Close() + // pstr.client = nil + // stmp, err := pstr.newPosterSession() + // if err == nil { + // s = stmp + // } + // } } if err != nil && fallbackFileName != utils.META_NONE { err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) return err } if s != nil { - s.Close(context.Background()) + s.Close(ctx) } return } @@ -544,6 +565,28 @@ func (pstr *AWSPoster) newPosterSession() (s *amqpv1.Session, err error) { return pstr.client.NewSession() } +func isRecoverableCloseError(err error) bool { + return err == amqpv1.ErrConnClosed || + err == amqpv1.ErrLinkClosed || + err == amqpv1.ErrSessionClosed +} + +func (pstr *AWSPoster) isRecoverableError(err error) bool { + switch err.(type) { + case *amqpv1.Error, *amqpv1.DetachError, net.Error: + if netErr, ok := err.(net.Error); ok { + if !netErr.Temporary() { + return false + } + } + default: + if !isRecoverableCloseError(err) { + return false + } + } + return true +} + func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { pstr := &SQSPoster{ attempts: attempts,