Update to latest amqp 1.0 package version and fix compilation errors

This commit is contained in:
ionutboangiu
2023-02-22 03:58:30 -05:00
committed by Dan Christian Bogos
parent 748764d4a7
commit 73dd31a8ca
7 changed files with 31 additions and 38 deletions

View File

@@ -44,7 +44,7 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1
// AMQPv1EE a poster for amqpv1
type AMQPv1EE struct {
queueID string // identifier of the CDR queue where we publish
client *amqpv1.Client
conn *amqpv1.Conn
session *amqpv1.Session
cfg *config.EventExporterCfg
@@ -59,19 +59,19 @@ func (pstr *AMQPv1EE) Cfg() *config.EventExporterCfg { return pstr.cfg }
func (pstr *AMQPv1EE) Connect() (err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.client == nil {
if pstr.client, err = amqpv1.Dial(pstr.Cfg().ExportPath); err != nil {
if pstr.conn == nil {
if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, nil); err != nil {
return
}
}
if pstr.session == nil {
pstr.session, err = pstr.client.NewSession()
pstr.session, err = pstr.conn.NewSession(context.TODO(), nil)
if err != nil {
// reset client and try again
// used in case of closed connection because of idle time
if pstr.client != nil {
pstr.client.Close() // Make shure the connection is closed before reseting it
pstr.client = nil
if pstr.conn != nil {
pstr.conn.Close() // Make shure the connection is closed before reseting it
pstr.conn = nil
}
}
}
@@ -88,9 +88,7 @@ func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content, _ interface{})
if pstr.session == nil {
return utils.ErrDisconnected
}
sender, err := pstr.session.NewSender(
amqpv1.LinkTargetAddress(pstr.queueID),
)
sender, err := pstr.session.NewSender(ctx, pstr.queueID, nil)
if err != nil {
return
}
@@ -106,9 +104,9 @@ func (pstr *AMQPv1EE) Close() (err error) {
pstr.session.Close(context.Background())
pstr.session = nil
}
if pstr.client != nil {
err = pstr.client.Close()
pstr.client = nil
if pstr.conn != nil {
err = pstr.conn.Close()
pstr.conn = nil
}
pstr.Unlock()
return

View File

@@ -45,8 +45,10 @@ var (
testAMQPv1ResetDataDB,
testAMQPv1StartEngine,
testAMQPv1RPCConn,
testAMQPv1ExportEvent,
testAMQPv1VerifyExport,
testStopCgrEngine,
}
)
@@ -131,7 +133,7 @@ func testAMQPv1ExportEvent(t *testing.T) {
func testAMQPv1VerifyExport(t *testing.T) {
// Create client
client, err := amqpv1.Dial(amqpv1DialURL)
client, err := amqpv1.Dial(amqpv1DialURL, nil)
/* an alternative way to create the client
client, err := amqpv1.Dial("amqps://cgratescdrs.servicebus.windows.net",
amqpv1.ConnSASLPlain("access-key-name", "access-key"),
@@ -143,7 +145,7 @@ func testAMQPv1VerifyExport(t *testing.T) {
defer client.Close()
// Open a session
session, err := client.NewSession()
session, err := client.NewSession(context.Background(), nil)
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
@@ -151,10 +153,7 @@ func testAMQPv1VerifyExport(t *testing.T) {
ctx := context.Background()
// Create a receiver
receiver, err := session.NewReceiver(
amqpv1.LinkSourceAddress("/cgrates_cdrs"),
amqpv1.LinkCredit(10),
)
receiver, err := session.NewReceiver(context.Background(), "/cgrates_cdrs", nil)
if err != nil {
t.Fatal("Creating receiver link:", err)
}

View File

@@ -298,14 +298,14 @@ func TestAMQPv1Poster(t *testing.T) {
t.Fatal(err)
}
// Create client
client, err := amqpv1.Dial(endpoint)
client, err := amqpv1.Dial(endpoint, nil)
if err != nil {
t.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
session, err := client.NewSession(context.Background(), nil)
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
@@ -313,9 +313,7 @@ func TestAMQPv1Poster(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// Create a receiver
receiver, err := session.NewReceiver(
amqpv1.LinkSourceAddress("/" + qname),
)
receiver, err := session.NewReceiver(ctx, "/"+qname, nil)
if err != nil {
t.Fatal("Creating receiver link:", err)
}

View File

@@ -75,7 +75,7 @@ type AMQPv1ER struct {
rdrErr chan error
cap chan struct{}
conn *amqpv1.Client
conn *amqpv1.Conn
ses *amqpv1.Session
poster *ees.AMQPv1EE
@@ -88,10 +88,10 @@ func (rdr *AMQPv1ER) Config() *config.EventReaderCfg {
// Serve will start the gorutines needed to watch the amqpv1 topic
func (rdr *AMQPv1ER) Serve() (err error) {
if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath); err != nil {
if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, nil); err != nil {
return
}
if rdr.ses, err = rdr.conn.NewSession(); err != nil {
if rdr.ses, err = rdr.conn.NewSession(context.TODO(), nil); err != nil {
rdr.close()
return
}
@@ -100,9 +100,8 @@ func (rdr *AMQPv1ER) Serve() (err error) {
}
var receiver *amqpv1.Receiver
if receiver, err = rdr.ses.NewReceiver(
amqpv1.LinkSourceAddress(rdr.queueID),
); err != nil {
if receiver, err = rdr.ses.NewReceiver(context.TODO(), rdr.queueID,
nil); err != nil {
return
}
go func() {
@@ -125,7 +124,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
ctx := context.Background()
var msg *amqpv1.Message
if msg, err = recv.Receive(ctx); err != nil {
if err == amqpv1.ErrLinkClosed {
if err.Error() == "amqp: link closed" {
err = nil
return
}

View File

@@ -84,20 +84,20 @@ func TestAMQPERv1(t *testing.T) {
t.Fatal(err)
}
amqpv1Rdr := rdr.(*AMQPv1ER)
connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net")
connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net", nil)
if err != nil {
t.Fatal(err)
}
defer connection.Close()
channel, err := connection.NewSession()
channel, err := connection.NewSession(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
defer channel.Close(context.Background())
randomOriginID := utils.UUIDSha1Prefix()
sndr, err := channel.NewSender(amqpv1.LinkTargetAddress(amqpv1Rdr.queueID))
sndr, err := channel.NewSender(context.Background(), amqpv1Rdr.queueID, nil)
if err != nil {
t.Fatal(err)
}

2
go.mod
View File

@@ -7,7 +7,7 @@ go 1.18
// replace github.com/cgrates/rpcclient => ../rpcclient
require (
github.com/Azure/go-amqp v0.17.5
github.com/Azure/go-amqp v0.18.1
github.com/antchfx/xmlquery v1.3.11
github.com/aws/aws-sdk-go v1.44.43
github.com/blevesearch/bleve v1.0.14

5
go.sum
View File

@@ -55,8 +55,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/go-amqp v0.17.5 h1:7Lsi9H9ijCAfqOaMiNmQ4c+GL9bdrpCjebNKhV/eQ+c=
github.com/Azure/go-amqp v0.17.5/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.18.1 h1:D5Ca+uijuTcj5g76sF+zT4OQZcFFY397+IGf/5Ip5Sc=
github.com/Azure/go-amqp v0.18.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc=
@@ -203,7 +203,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+ne
github.com/fiorix/go-diameter/v4 v4.0.4 h1:/nw5zEmEW7pmP9YUYjOfU1GomR0LupKdYy52yd1j3NM=
github.com/fiorix/go-diameter/v4 v4.0.4/go.mod h1:Qx/+pf+c9sBUHWq1d7EH3bkdwN8U0mUpdy9BieDw6UQ=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=