From fec0de9ef8c6327d52717817cb9a874e8b2d2dfc Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 18 Dec 2019 13:29:51 +0200 Subject: [PATCH] Updated Posters --- engine/pstr_amqpv1.go | 12 +++++++++--- engine/pstr_http.go | 15 +++++++++++---- engine/pstr_s3.go | 7 ++++++- engine/pstr_sqs.go | 7 ++++++- 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index b3f6a65c1..a121831fc 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -74,7 +74,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err pstr.client.Close() // Make shure the connection is closed before reseting it } pstr.client = nil - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil { if fallbackFileName != utils.META_NONE { @@ -90,7 +92,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err amqpv1.LinkTargetAddress(pstr.queueID), ) if err != nil { - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } // if pstr.isRecoverableError(err) { // s.Close(ctx) // pstr.client.Close() @@ -108,7 +112,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err if err == nil { break } - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } // if pstr.isRecoverableError(err) { // s.Close(ctx) // pstr.client.Close() diff --git a/engine/pstr_http.go b/engine/pstr_http.go index 9de52d1af..43b14b01e 100644 --- a/engine/pstr_http.go +++ b/engine/pstr_http.go @@ -28,6 +28,7 @@ import ( "os" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" ) @@ -93,19 +94,25 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac } if err != nil { utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } continue } defer resp.Body.Close() respBody, err = ioutil.ReadAll(resp.Body) if err != nil { utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } continue } if resp.StatusCode > 299 { utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } continue } return respBody, nil @@ -120,7 +127,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac _, err = fileOut.Write(body) fileOut.Close() return nil, err - }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath) + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+fallbackFilePath) } return } diff --git a/engine/pstr_s3.go b/engine/pstr_s3.go index 8f7558da7..4604deb3d 100644 --- a/engine/pstr_s3.go +++ b/engine/pstr_s3.go @@ -91,7 +91,9 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er if svc, err = pstr.newPosterSession(); err == nil { break } - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil { if fallbackFileName != utils.META_NONE { @@ -118,6 +120,9 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er }); err == nil { break } + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil && fallbackFileName != utils.META_NONE { utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go index 41f3015c5..95976ae02 100644 --- a/engine/pstr_sqs.go +++ b/engine/pstr_sqs.go @@ -123,7 +123,9 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err if svc, err = pstr.newPosterSession(); err == nil { break } - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil { if fallbackFileName != utils.META_NONE { @@ -142,6 +144,9 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err ); err == nil { break } + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil && fallbackFileName != utils.META_NONE { utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error()))