Updated Posters

This commit is contained in:
Trial97
2019-12-18 13:29:51 +02:00
parent ea5c691e73
commit fec0de9ef8
4 changed files with 32 additions and 9 deletions

View File

@@ -74,7 +74,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
pstr.client.Close() // Make shure the connection is closed before reseting it
}
pstr.client = nil
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
if fallbackFileName != utils.META_NONE {
@@ -90,7 +92,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
amqpv1.LinkTargetAddress(pstr.queueID),
)
if err != nil {
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()
@@ -108,7 +112,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
if err == nil {
break
}
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()

View File

@@ -28,6 +28,7 @@ import (
"os"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
@@ -93,19 +94,25 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
continue
}
defer resp.Body.Close()
respBody, err = ioutil.ReadAll(resp.Body)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
continue
}
if resp.StatusCode > 299 {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
continue
}
return respBody, nil
@@ -120,7 +127,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
_, err = fileOut.Write(body)
fileOut.Close()
return nil, err
}, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+fallbackFilePath)
}
return
}

View File

@@ -91,7 +91,9 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er
if svc, err = pstr.newPosterSession(); err == nil {
break
}
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
if fallbackFileName != utils.META_NONE {
@@ -118,6 +120,9 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er
}); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil && fallbackFileName != utils.META_NONE {
utils.Logger.Warning(fmt.Sprintf("<S3Poster> posting new message, err: %s", err.Error()))

View File

@@ -123,7 +123,9 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err
if svc, err = pstr.newPosterSession(); err == nil {
break
}
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
if fallbackFileName != utils.META_NONE {
@@ -142,6 +144,9 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err
); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil && fallbackFileName != utils.META_NONE {
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> posting new message, err: %s", err.Error()))