From fec0de9ef8c6327d52717817cb9a874e8b2d2dfc Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 18 Dec 2019 13:29:51 +0200 Subject: [PATCH 1/2] Updated Posters --- engine/pstr_amqpv1.go | 12 +++++++++--- engine/pstr_http.go | 15 +++++++++++---- engine/pstr_s3.go | 7 ++++++- engine/pstr_sqs.go | 7 ++++++- 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index b3f6a65c1..a121831fc 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -74,7 +74,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err pstr.client.Close() // Make shure the connection is closed before reseting it } pstr.client = nil - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil { if fallbackFileName != utils.META_NONE { @@ -90,7 +92,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err amqpv1.LinkTargetAddress(pstr.queueID), ) if err != nil { - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } // if pstr.isRecoverableError(err) { // s.Close(ctx) // pstr.client.Close() @@ -108,7 +112,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err if err == nil { break } - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } // if pstr.isRecoverableError(err) { // s.Close(ctx) // pstr.client.Close() diff --git a/engine/pstr_http.go b/engine/pstr_http.go index 9de52d1af..43b14b01e 100644 --- a/engine/pstr_http.go +++ b/engine/pstr_http.go @@ -28,6 +28,7 @@ import ( "os" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" ) @@ -93,19 +94,25 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac } if err != nil { utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } continue } defer resp.Body.Close() respBody, err = ioutil.ReadAll(resp.Body) if err != nil { utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } continue } if resp.StatusCode > 299 { utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } continue } return respBody, nil @@ -120,7 +127,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac _, err = fileOut.Write(body) fileOut.Close() return nil, err - }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath) + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+fallbackFilePath) } return } diff --git a/engine/pstr_s3.go b/engine/pstr_s3.go index 8f7558da7..4604deb3d 100644 --- a/engine/pstr_s3.go +++ b/engine/pstr_s3.go @@ -91,7 +91,9 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er if svc, err = pstr.newPosterSession(); err == nil { break } - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil { if fallbackFileName != utils.META_NONE { @@ -118,6 +120,9 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er }); err == nil { break } + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil && fallbackFileName != utils.META_NONE { utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go index 41f3015c5..95976ae02 100644 --- a/engine/pstr_sqs.go +++ b/engine/pstr_sqs.go @@ -123,7 +123,9 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err if svc, err = pstr.newPosterSession(); err == nil { break } - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil { if fallbackFileName != utils.META_NONE { @@ -142,6 +144,9 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err ); err == nil { break } + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil && fallbackFileName != utils.META_NONE { utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) From 8af6eeb7a9d4f0c150cbf9044df6bd2a43c98a66 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 18 Dec 2019 14:21:52 +0200 Subject: [PATCH 2/2] Updated *sql integration tests --- ers/sql_it_test.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index b673da954..966ace851 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -21,6 +21,7 @@ along with this program. If not, see package ers import ( + "fmt" "reflect" "testing" "time" @@ -37,6 +38,7 @@ var ( sqlCfg *config.CGRConfig sqlTests = []func(t *testing.T){ testSQLInitConfig, + testSQLInitDBs, testSQLInitCdrDb, testSQLInitDB, testSQLReader, @@ -53,7 +55,7 @@ var ( RunID: "RunID", } db *gorm.DB - dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'" + dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'" ) func TestSQL(t *testing.T) { @@ -98,16 +100,9 @@ func testSQLInitConfig(t *testing.T) { } func testSQLInitCdrDb(t *testing.T) { - rdrsql := sqlCfg.StorDbCfg().Clone() if err := engine.InitStorDb(sqlCfg); err != nil { t.Fatal(err) } - sqlCfg.StorDbCfg().Name = "cgrates2" - if err := engine.InitStorDb(sqlCfg); err != nil { - t.Fatal(err) - } - *sqlCfg.StorDbCfg() = *rdrsql - } type testModelSql struct { @@ -141,15 +136,28 @@ func (_ *testModelSql) TableName() string { return "cdrs2" } +func testSQLInitDBs(t *testing.T) { + var err error + if db, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "cgrates")); err != nil { + t.Fatal(err) + } + + if _, err = db.DB().Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`); err != nil { + t.Fatal(err) + } +} func testSQLInitDB(t *testing.T) { cdr.CGRID = utils.UUIDSha1Prefix() var err error - db, err = gorm.Open("mysql", dbConnString) + db, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "cgrates2")) if err != nil { t.Fatal(err) } + if !db.HasTable("cdrs") { + db = db.CreateTable(new(engine.CDRsql)) + } if !db.HasTable("cdrs2") { - db = db.CreateTable(&testModelSql{}) + db = db.CreateTable(new(testModelSql)) } db = db.Table(utils.CDRsTBL) tx := db.Begin()