Updated AWSPoster to reset connection if is forced disconnected

This commit is contained in:
Tripon Alexandru-Ionut
2019-05-31 10:22:09 +03:00
committed by Dan Christian Bogos
parent 33412f9bb5
commit 8d70e355d7

View File

@@ -24,6 +24,7 @@ import (
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
@@ -492,6 +493,9 @@ func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (err error)
if s, err = pstr.newPosterSession(); err == nil {
break
}
// reset client and try again
// used in case of closed conection because of idle time
pstr.client = nil
time.Sleep(time.Duration(fib()) * time.Second)
}
if err != nil {
@@ -502,30 +506,47 @@ func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (err error)
return err
}
ctx := context.Background()
for i := 0; i < pstr.attempts; i++ {
sender, err := s.NewSender(
amqpv1.LinkTargetAddress(pstr.queueID),
)
if err != nil {
time.Sleep(time.Duration(fib()) * time.Second)
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()
// pstr.client = nil
// stmp, err := pstr.newPosterSession()
// if err == nil {
// s = stmp
// }
// }
continue
}
ctx := context.Background()
// Send message
err = sender.Send(ctx, amqpv1.NewMessage(content))
sender.Close(ctx)
if err == nil {
sender.Close(ctx)
break
}
time.Sleep(time.Duration(fib()) * time.Second)
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()
// pstr.client = nil
// stmp, err := pstr.newPosterSession()
// if err == nil {
// s = stmp
// }
// }
}
if err != nil && fallbackFileName != utils.META_NONE {
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
return err
}
if s != nil {
s.Close(context.Background())
s.Close(ctx)
}
return
}
@@ -544,6 +565,28 @@ func (pstr *AWSPoster) newPosterSession() (s *amqpv1.Session, err error) {
return pstr.client.NewSession()
}
func isRecoverableCloseError(err error) bool {
return err == amqpv1.ErrConnClosed ||
err == amqpv1.ErrLinkClosed ||
err == amqpv1.ErrSessionClosed
}
func (pstr *AWSPoster) isRecoverableError(err error) bool {
switch err.(type) {
case *amqpv1.Error, *amqpv1.DetachError, net.Error:
if netErr, ok := err.(net.Error); ok {
if !netErr.Temporary() {
return false
}
}
default:
if !isRecoverableCloseError(err) {
return false
}
}
return true
}
func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
pstr := &SQSPoster{
attempts: attempts,