Merge pull request #1833 from Trial97/master

Updated *sql integration tests
This commit is contained in:
Dan Christian Bogos
2019-12-18 13:56:46 +01:00
committed by GitHub
5 changed files with 50 additions and 19 deletions

View File

@@ -74,7 +74,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
pstr.client.Close() // Make shure the connection is closed before reseting it
}
pstr.client = nil
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
if fallbackFileName != utils.META_NONE {
@@ -90,7 +92,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
amqpv1.LinkTargetAddress(pstr.queueID),
)
if err != nil {
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()
@@ -108,7 +112,9 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
if err == nil {
break
}
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()

View File

@@ -28,6 +28,7 @@ import (
"os"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
@@ -93,19 +94,25 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
continue
}
defer resp.Body.Close()
respBody, err = ioutil.ReadAll(resp.Body)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
continue
}
if resp.StatusCode > 299 {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
continue
}
return respBody, nil
@@ -120,7 +127,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
_, err = fileOut.Write(body)
fileOut.Close()
return nil, err
}, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+fallbackFilePath)
}
return
}

View File

@@ -91,7 +91,9 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er
if svc, err = pstr.newPosterSession(); err == nil {
break
}
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
if fallbackFileName != utils.META_NONE {
@@ -118,6 +120,9 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er
}); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil && fallbackFileName != utils.META_NONE {
utils.Logger.Warning(fmt.Sprintf("<S3Poster> posting new message, err: %s", err.Error()))

View File

@@ -123,7 +123,9 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err
if svc, err = pstr.newPosterSession(); err == nil {
break
}
time.Sleep(time.Duration(fib()) * time.Second)
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
if fallbackFileName != utils.META_NONE {
@@ -142,6 +144,9 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err
); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil && fallbackFileName != utils.META_NONE {
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> posting new message, err: %s", err.Error()))

View File

@@ -21,6 +21,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"fmt"
"reflect"
"testing"
"time"
@@ -37,6 +38,7 @@ var (
sqlCfg *config.CGRConfig
sqlTests = []func(t *testing.T){
testSQLInitConfig,
testSQLInitDBs,
testSQLInitCdrDb,
testSQLInitDB,
testSQLReader,
@@ -53,7 +55,7 @@ var (
RunID: "RunID",
}
db *gorm.DB
dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'"
dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'"
)
func TestSQL(t *testing.T) {
@@ -98,16 +100,9 @@ func testSQLInitConfig(t *testing.T) {
}
func testSQLInitCdrDb(t *testing.T) {
rdrsql := sqlCfg.StorDbCfg().Clone()
if err := engine.InitStorDb(sqlCfg); err != nil {
t.Fatal(err)
}
sqlCfg.StorDbCfg().Name = "cgrates2"
if err := engine.InitStorDb(sqlCfg); err != nil {
t.Fatal(err)
}
*sqlCfg.StorDbCfg() = *rdrsql
}
type testModelSql struct {
@@ -141,15 +136,28 @@ func (_ *testModelSql) TableName() string {
return "cdrs2"
}
func testSQLInitDBs(t *testing.T) {
var err error
if db, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "cgrates")); err != nil {
t.Fatal(err)
}
if _, err = db.DB().Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`); err != nil {
t.Fatal(err)
}
}
func testSQLInitDB(t *testing.T) {
cdr.CGRID = utils.UUIDSha1Prefix()
var err error
db, err = gorm.Open("mysql", dbConnString)
db, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "cgrates2"))
if err != nil {
t.Fatal(err)
}
if !db.HasTable("cdrs") {
db = db.CreateTable(new(engine.CDRsql))
}
if !db.HasTable("cdrs2") {
db = db.CreateTable(&testModelSql{})
db = db.CreateTable(new(testModelSql))
}
db = db.Table(utils.CDRsTBL)
tx := db.Begin()