diff --git a/ees/amqpv1.go b/ees/amqpv1.go index 2256d32f3..a6b9d2390 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -44,7 +44,7 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1 // AMQPv1EE a poster for amqpv1 type AMQPv1EE struct { queueID string // identifier of the CDR queue where we publish - client *amqpv1.Client + conn *amqpv1.Conn session *amqpv1.Session cfg *config.EventExporterCfg @@ -59,19 +59,19 @@ func (pstr *AMQPv1EE) Cfg() *config.EventExporterCfg { return pstr.cfg } func (pstr *AMQPv1EE) Connect() (err error) { pstr.Lock() defer pstr.Unlock() - if pstr.client == nil { - if pstr.client, err = amqpv1.Dial(pstr.Cfg().ExportPath); err != nil { + if pstr.conn == nil { + if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, nil); err != nil { return } } if pstr.session == nil { - pstr.session, err = pstr.client.NewSession() + pstr.session, err = pstr.conn.NewSession(context.TODO(), nil) if err != nil { // reset client and try again // used in case of closed connection because of idle time - if pstr.client != nil { - pstr.client.Close() // Make shure the connection is closed before reseting it - pstr.client = nil + if pstr.conn != nil { + pstr.conn.Close() // Make shure the connection is closed before reseting it + pstr.conn = nil } } } @@ -88,9 +88,7 @@ func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) { if pstr.session == nil { return utils.ErrDisconnected } - sender, err := pstr.session.NewSender( - amqpv1.LinkTargetAddress(pstr.queueID), - ) + sender, err := pstr.session.NewSender(context.TODO(), pstr.queueID, nil) if err != nil { return } @@ -107,9 +105,9 @@ func (pstr *AMQPv1EE) Close() (err error) { pstr.session.Close(context.Background()) pstr.session = nil } - if pstr.client != nil { - err = pstr.client.Close() - pstr.client = nil + if pstr.conn != nil { + err = pstr.conn.Close() + pstr.conn = nil } pstr.Unlock() return diff --git a/ees/amqpv1_it_test.go b/ees/amqpv1_it_test.go index 8bee9622f..d01844618 100644 --- a/ees/amqpv1_it_test.go +++ b/ees/amqpv1_it_test.go @@ -137,7 +137,7 @@ func testAMQPv1ExportEvent(t *testing.T) { func testAMQPv1VerifyExport(t *testing.T) { // Create client - client, err := amqpv1.Dial(amqpv1DialURL) + client, err := amqpv1.Dial(amqpv1DialURL, nil) /* an alternative way to create the client client, err := amqpv1.Dial("amqps://cgratescdrs.servicebus.windows.net", amqpv1.ConnSASLPlain("access-key-name", "access-key"), @@ -148,19 +148,16 @@ func testAMQPv1VerifyExport(t *testing.T) { } defer client.Close() + ctx := context.Background() + // Open a session - session, err := client.NewSession() + session, err := client.NewSession(ctx, nil) if err != nil { t.Fatal("Creating AMQP session:", err) } - ctx := context.Background() - // Create a receiver - receiver, err := session.NewReceiver( - amqpv1.LinkSourceAddress("/cgrates_cdrs"), - amqpv1.LinkCredit(10), - ) + receiver, err := session.NewReceiver(ctx, "/cgrates_cdrs", nil) if err != nil { t.Fatal("Creating receiver link:", err) } diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index e4ecc0fc8..818285ec5 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -317,14 +317,14 @@ func TestAMQPv1Poster(t *testing.T) { t.Fatal(err) } // Create client - client, err := amqpv1.Dial(endpoint) + client, err := amqpv1.Dial(endpoint, nil) if err != nil { t.Fatal("Dialing AMQP server:", err) } defer client.Close() // Open a session - session, err := client.NewSession() + session, err := client.NewSession(context.Background(), nil) if err != nil { t.Fatal("Creating AMQP session:", err) } @@ -332,9 +332,7 @@ func TestAMQPv1Poster(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) // Create a receiver - receiver, err := session.NewReceiver( - amqpv1.LinkSourceAddress("/" + qname), - ) + receiver, err := session.NewReceiver(ctx, "/"+qname, nil) if err != nil { t.Fatal("Creating receiver link:", err) } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 4d853bab2..ba965adc6 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -73,7 +73,7 @@ type AMQPv1ER struct { rdrErr chan error cap chan struct{} - conn *amqpv1.Client + conn *amqpv1.Conn ses *amqpv1.Session poster *ees.AMQPv1EE @@ -86,10 +86,10 @@ func (rdr *AMQPv1ER) Config() *config.EventReaderCfg { // Serve will start the gorutines needed to watch the amqpv1 topic func (rdr *AMQPv1ER) Serve() (err error) { - if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath); err != nil { + if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, nil); err != nil { return } - if rdr.ses, err = rdr.conn.NewSession(); err != nil { + if rdr.ses, err = rdr.conn.NewSession(context.TODO(), nil); err != nil { rdr.close() return } @@ -98,9 +98,8 @@ func (rdr *AMQPv1ER) Serve() (err error) { } var receiver *amqpv1.Receiver - if receiver, err = rdr.ses.NewReceiver( - amqpv1.LinkSourceAddress(rdr.queueID), - ); err != nil { + if receiver, err = rdr.ses.NewReceiver(context.TODO(), rdr.queueID, + nil); err != nil { return } go func() { @@ -121,7 +120,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) { ctx := context.Background() var msg *amqpv1.Message if msg, err = recv.Receive(ctx); err != nil { - if err == amqpv1.ErrLinkClosed { + if err.Error() == "amqp: link closed" { err = nil return } diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go index ec0f47d4a..a614f45d7 100644 --- a/ers/amqpv1_it_test.go +++ b/ers/amqpv1_it_test.go @@ -86,20 +86,20 @@ func TestAMQPERv1(t *testing.T) { t.Fatal(err) } amqpv1Rdr := rdr.(*AMQPv1ER) - connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net") + connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net", nil) if err != nil { t.Fatal(err) } defer connection.Close() - channel, err := connection.NewSession() + channel, err := connection.NewSession(context.Background(), nil) if err != nil { t.Fatal(err) } defer channel.Close(context.Background()) randomCGRID := utils.UUIDSha1Prefix() - sndr, err := channel.NewSender(amqpv1.LinkTargetAddress(amqpv1Rdr.queueID)) + sndr, err := channel.NewSender(context.Background(), amqpv1Rdr.queueID, nil) if err != nil { t.Fatal(err) } diff --git a/go.mod b/go.mod index 0380f9646..a48c0e46a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ go 1.20 // replace github.com/cgrates/aringo => ../aringo require ( - github.com/Azure/go-amqp v0.17.5 + github.com/Azure/go-amqp v0.18.1 github.com/antchfx/xmlquery v1.3.3 github.com/aws/aws-sdk-go v1.36.24 github.com/blevesearch/bleve v1.0.14 diff --git a/go.sum b/go.sum index 1f0c288cb..c36333e6a 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/Azure/go-amqp v0.17.5 h1:7Lsi9H9ijCAfqOaMiNmQ4c+GL9bdrpCjebNKhV/eQ+c= -github.com/Azure/go-amqp v0.17.5/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= +github.com/Azure/go-amqp v0.18.1 h1:D5Ca+uijuTcj5g76sF+zT4OQZcFFY397+IGf/5Ip5Sc= +github.com/Azure/go-amqp v0.18.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= @@ -148,7 +148,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+ne github.com/fiorix/go-diameter/v4 v4.0.2 h1:JVxecvfWqqvZWVO6PLTwUj/CV7CusaqYtz40WNYfWvI= github.com/fiorix/go-diameter/v4 v4.0.2/go.mod h1:Qx/+pf+c9sBUHWq1d7EH3bkdwN8U0mUpdy9BieDw6UQ= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -211,8 +210,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=