Update to latest amqp 1.0 package version and fix compilation errors

This commit is contained in:
ionutboangiu
2023-02-22 05:30:33 -05:00
committed by Dan Christian Bogos
parent 5a0e973c5e
commit dbd497c4d4
5 changed files with 24 additions and 70 deletions

View File

@@ -216,7 +216,7 @@ func testAMQPv1ExportCDRs(t *testing.T) {
func testAMQPv1VerifyExport(t *testing.T) {
// Create client
client, err := amqp.Dial(amqpv1DialURL)
client, err := amqp.Dial(amqpv1DialURL, nil)
/* an alternative way to create the client
client, err := amqp.Dial("amqps://name-space.servicebus.windows.net",
amqp.ConnSASLPlain("access-key-name", "access-key"),
@@ -227,14 +227,14 @@ func testAMQPv1VerifyExport(t *testing.T) {
}
defer client.Close()
ctx := context.Background()
// Open a session
session, err := client.NewSession()
session, err := client.NewSession(ctx, nil)
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
ctx := context.Background()
expCDRs := []string{
`{"Account":"1001","CGRID":"Cdr2","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR2","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"5s"}`,
`{"Account":"1001","CGRID":"Cdr3","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR3","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"30s"}`,
@@ -242,10 +242,7 @@ func testAMQPv1VerifyExport(t *testing.T) {
rplyCDRs := make([]string, 0)
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkSourceAddress("/cgrates_cdrs"),
amqp.LinkCredit(10),
)
receiver, err := session.NewReceiver(ctx, "/cgrates_cdrs", nil)
if err != nil {
t.Fatal("Creating receiver link:", err)
}

View File

@@ -298,14 +298,14 @@ func TestAMQPv1Poster(t *testing.T) {
t.Fatal(err)
}
// Create client
client, err := amqpv1.Dial(endpoint)
client, err := amqpv1.Dial(endpoint, nil)
if err != nil {
t.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
session, err := client.NewSession(context.Background(), nil)
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
@@ -313,9 +313,7 @@ func TestAMQPv1Poster(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// Create a receiver
receiver, err := session.NewReceiver(
amqpv1.LinkSourceAddress("/" + qname),
)
receiver, err := session.NewReceiver(ctx, "/"+qname, nil)
if err != nil {
t.Fatal("Creating receiver link:", err)
}

View File

@@ -21,7 +21,6 @@ package engine
import (
"context"
"fmt"
"net"
"sync"
"time"
@@ -48,16 +47,16 @@ type AMQPv1Poster struct {
dialURL string
queueID string // identifier of the CDR queue where we publish
attempts int
client *amqpv1.Client
conn *amqpv1.Conn
}
// Close closes the connections
func (pstr *AMQPv1Poster) Close() {
pstr.Lock()
if pstr.client != nil {
pstr.client.Close()
if pstr.conn != nil {
pstr.conn.Close()
}
pstr.client = nil
pstr.conn = nil
pstr.Unlock()
}
@@ -72,10 +71,10 @@ func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) {
}
// reset client and try again
// used in case of closed connection because of idle time
if pstr.client != nil {
pstr.client.Close() // Make shure the connection is closed before reseting it
if pstr.conn != nil {
pstr.conn.Close() // Make shure the connection is closed before reseting it
}
pstr.client = nil
pstr.conn = nil
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
@@ -87,22 +86,11 @@ func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) {
ctx := context.Background()
for i := 0; i < pstr.attempts; i++ {
sender, err := s.NewSender(
amqpv1.LinkTargetAddress(pstr.queueID),
)
sender, err := s.NewSender(ctx, pstr.queueID, nil)
if err != nil {
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()
// pstr.client = nil
// stmp, err := pstr.newPosterSession()
// if err == nil {
// s = stmp
// }
// }
continue
}
// Send message
@@ -114,15 +102,6 @@ func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) {
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()
// pstr.client = nil
// stmp, err := pstr.newPosterSession()
// if err == nil {
// s = stmp
// }
// }
}
if err != nil {
return
@@ -136,35 +115,13 @@ func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) {
func (pstr *AMQPv1Poster) newPosterSession() (s *amqpv1.Session, err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.client == nil {
var client *amqpv1.Client
client, err = amqpv1.Dial(pstr.dialURL)
if pstr.conn == nil {
var client *amqpv1.Conn
client, err = amqpv1.Dial(pstr.dialURL, nil)
if err != nil {
return nil, err
}
pstr.client = client
pstr.conn = client
}
return pstr.client.NewSession()
}
func isRecoverableCloseError(err error) bool {
return err == amqpv1.ErrConnClosed ||
err == amqpv1.ErrLinkClosed ||
err == amqpv1.ErrSessionClosed
}
func (pstr *AMQPv1Poster) isRecoverableError(err error) bool {
switch err.(type) {
case *amqpv1.Error, *amqpv1.DetachError, net.Error:
if netErr, ok := err.(net.Error); ok {
if !netErr.Temporary() {
return false
}
}
default:
if !isRecoverableCloseError(err) {
return false
}
}
return true
return pstr.conn.NewSession(context.TODO(), nil)
}

2
go.mod
View File

@@ -35,7 +35,7 @@ require (
google.golang.org/api v0.103.0
)
require github.com/Azure/go-amqp v0.17.5
require github.com/Azure/go-amqp v0.18.1
require (
cloud.google.com/go/compute v1.12.1 // indirect

2
go.sum
View File

@@ -7,6 +7,8 @@ cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxB
cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs=
github.com/Azure/go-amqp v0.17.5 h1:7Lsi9H9ijCAfqOaMiNmQ4c+GL9bdrpCjebNKhV/eQ+c=
github.com/Azure/go-amqp v0.17.5/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.18.1 h1:D5Ca+uijuTcj5g76sF+zT4OQZcFFY397+IGf/5Ip5Sc=
github.com/Azure/go-amqp v0.18.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=