Update to latest amqp 1.0 package version and fix compilation errors

This commit is contained in:
ionutboangiu
2023-02-22 04:32:11 -05:00
committed by Dan Christian Bogos
parent 0a9f6d90d9
commit a1449499cf
7 changed files with 32 additions and 41 deletions

View File

@@ -44,7 +44,7 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1
// AMQPv1EE a poster for amqpv1
type AMQPv1EE struct {
queueID string // identifier of the CDR queue where we publish
client *amqpv1.Client
conn *amqpv1.Conn
session *amqpv1.Session
cfg *config.EventExporterCfg
@@ -59,19 +59,19 @@ func (pstr *AMQPv1EE) Cfg() *config.EventExporterCfg { return pstr.cfg }
func (pstr *AMQPv1EE) Connect() (err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.client == nil {
if pstr.client, err = amqpv1.Dial(pstr.Cfg().ExportPath); err != nil {
if pstr.conn == nil {
if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, nil); err != nil {
return
}
}
if pstr.session == nil {
pstr.session, err = pstr.client.NewSession()
pstr.session, err = pstr.conn.NewSession(context.TODO(), nil)
if err != nil {
// reset client and try again
// used in case of closed connection because of idle time
if pstr.client != nil {
pstr.client.Close() // Make shure the connection is closed before reseting it
pstr.client = nil
if pstr.conn != nil {
pstr.conn.Close() // Make shure the connection is closed before reseting it
pstr.conn = nil
}
}
}
@@ -88,9 +88,7 @@ func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) {
if pstr.session == nil {
return utils.ErrDisconnected
}
sender, err := pstr.session.NewSender(
amqpv1.LinkTargetAddress(pstr.queueID),
)
sender, err := pstr.session.NewSender(context.TODO(), pstr.queueID, nil)
if err != nil {
return
}
@@ -107,9 +105,9 @@ func (pstr *AMQPv1EE) Close() (err error) {
pstr.session.Close(context.Background())
pstr.session = nil
}
if pstr.client != nil {
err = pstr.client.Close()
pstr.client = nil
if pstr.conn != nil {
err = pstr.conn.Close()
pstr.conn = nil
}
pstr.Unlock()
return

View File

@@ -137,7 +137,7 @@ func testAMQPv1ExportEvent(t *testing.T) {
func testAMQPv1VerifyExport(t *testing.T) {
// Create client
client, err := amqpv1.Dial(amqpv1DialURL)
client, err := amqpv1.Dial(amqpv1DialURL, nil)
/* an alternative way to create the client
client, err := amqpv1.Dial("amqps://cgratescdrs.servicebus.windows.net",
amqpv1.ConnSASLPlain("access-key-name", "access-key"),
@@ -148,19 +148,16 @@ func testAMQPv1VerifyExport(t *testing.T) {
}
defer client.Close()
ctx := context.Background()
// Open a session
session, err := client.NewSession()
session, err := client.NewSession(ctx, nil)
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
ctx := context.Background()
// Create a receiver
receiver, err := session.NewReceiver(
amqpv1.LinkSourceAddress("/cgrates_cdrs"),
amqpv1.LinkCredit(10),
)
receiver, err := session.NewReceiver(ctx, "/cgrates_cdrs", nil)
if err != nil {
t.Fatal("Creating receiver link:", err)
}

View File

@@ -317,14 +317,14 @@ func TestAMQPv1Poster(t *testing.T) {
t.Fatal(err)
}
// Create client
client, err := amqpv1.Dial(endpoint)
client, err := amqpv1.Dial(endpoint, nil)
if err != nil {
t.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
session, err := client.NewSession(context.Background(), nil)
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
@@ -332,9 +332,7 @@ func TestAMQPv1Poster(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// Create a receiver
receiver, err := session.NewReceiver(
amqpv1.LinkSourceAddress("/" + qname),
)
receiver, err := session.NewReceiver(ctx, "/"+qname, nil)
if err != nil {
t.Fatal("Creating receiver link:", err)
}

View File

@@ -73,7 +73,7 @@ type AMQPv1ER struct {
rdrErr chan error
cap chan struct{}
conn *amqpv1.Client
conn *amqpv1.Conn
ses *amqpv1.Session
poster *ees.AMQPv1EE
@@ -86,10 +86,10 @@ func (rdr *AMQPv1ER) Config() *config.EventReaderCfg {
// Serve will start the gorutines needed to watch the amqpv1 topic
func (rdr *AMQPv1ER) Serve() (err error) {
if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath); err != nil {
if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, nil); err != nil {
return
}
if rdr.ses, err = rdr.conn.NewSession(); err != nil {
if rdr.ses, err = rdr.conn.NewSession(context.TODO(), nil); err != nil {
rdr.close()
return
}
@@ -98,9 +98,8 @@ func (rdr *AMQPv1ER) Serve() (err error) {
}
var receiver *amqpv1.Receiver
if receiver, err = rdr.ses.NewReceiver(
amqpv1.LinkSourceAddress(rdr.queueID),
); err != nil {
if receiver, err = rdr.ses.NewReceiver(context.TODO(), rdr.queueID,
nil); err != nil {
return
}
go func() {
@@ -121,7 +120,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
ctx := context.Background()
var msg *amqpv1.Message
if msg, err = recv.Receive(ctx); err != nil {
if err == amqpv1.ErrLinkClosed {
if err.Error() == "amqp: link closed" {
err = nil
return
}

View File

@@ -86,20 +86,20 @@ func TestAMQPERv1(t *testing.T) {
t.Fatal(err)
}
amqpv1Rdr := rdr.(*AMQPv1ER)
connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net")
connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net", nil)
if err != nil {
t.Fatal(err)
}
defer connection.Close()
channel, err := connection.NewSession()
channel, err := connection.NewSession(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
defer channel.Close(context.Background())
randomCGRID := utils.UUIDSha1Prefix()
sndr, err := channel.NewSender(amqpv1.LinkTargetAddress(amqpv1Rdr.queueID))
sndr, err := channel.NewSender(context.Background(), amqpv1Rdr.queueID, nil)
if err != nil {
t.Fatal(err)
}

2
go.mod
View File

@@ -13,7 +13,7 @@ go 1.20
// replace github.com/cgrates/aringo => ../aringo
require (
github.com/Azure/go-amqp v0.17.5
github.com/Azure/go-amqp v0.18.1
github.com/antchfx/xmlquery v1.3.3
github.com/aws/aws-sdk-go v1.36.24
github.com/blevesearch/bleve v1.0.14

7
go.sum
View File

@@ -34,8 +34,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/go-amqp v0.17.5 h1:7Lsi9H9ijCAfqOaMiNmQ4c+GL9bdrpCjebNKhV/eQ+c=
github.com/Azure/go-amqp v0.17.5/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.18.1 h1:D5Ca+uijuTcj5g76sF+zT4OQZcFFY397+IGf/5Ip5Sc=
github.com/Azure/go-amqp v0.18.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
@@ -148,7 +148,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+ne
github.com/fiorix/go-diameter/v4 v4.0.2 h1:JVxecvfWqqvZWVO6PLTwUj/CV7CusaqYtz40WNYfWvI=
github.com/fiorix/go-diameter/v4 v4.0.2/go.mod h1:Qx/+pf+c9sBUHWq1d7EH3bkdwN8U0mUpdy9BieDw6UQ=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -211,8 +210,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=