diff --git a/ees/amqpv1.go b/ees/amqpv1.go index 21226589b..499a62072 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -44,7 +44,7 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1 // AMQPv1EE a poster for amqpv1 type AMQPv1EE struct { queueID string // identifier of the CDR queue where we publish - client *amqpv1.Client + conn *amqpv1.Conn session *amqpv1.Session cfg *config.EventExporterCfg @@ -59,19 +59,19 @@ func (pstr *AMQPv1EE) Cfg() *config.EventExporterCfg { return pstr.cfg } func (pstr *AMQPv1EE) Connect() (err error) { pstr.Lock() defer pstr.Unlock() - if pstr.client == nil { - if pstr.client, err = amqpv1.Dial(pstr.Cfg().ExportPath); err != nil { + if pstr.conn == nil { + if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, nil); err != nil { return } } if pstr.session == nil { - pstr.session, err = pstr.client.NewSession() + pstr.session, err = pstr.conn.NewSession(context.TODO(), nil) if err != nil { // reset client and try again // used in case of closed connection because of idle time - if pstr.client != nil { - pstr.client.Close() // Make shure the connection is closed before reseting it - pstr.client = nil + if pstr.conn != nil { + pstr.conn.Close() // Make shure the connection is closed before reseting it + pstr.conn = nil } } } @@ -88,9 +88,7 @@ func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content, _ interface{}) if pstr.session == nil { return utils.ErrDisconnected } - sender, err := pstr.session.NewSender( - amqpv1.LinkTargetAddress(pstr.queueID), - ) + sender, err := pstr.session.NewSender(ctx, pstr.queueID, nil) if err != nil { return } @@ -106,9 +104,9 @@ func (pstr *AMQPv1EE) Close() (err error) { pstr.session.Close(context.Background()) pstr.session = nil } - if pstr.client != nil { - err = pstr.client.Close() - pstr.client = nil + if pstr.conn != nil { + err = pstr.conn.Close() + pstr.conn = nil } pstr.Unlock() return diff --git a/ees/amqpv1_it_test.go b/ees/amqpv1_it_test.go index 62c390ef9..2ac8714dc 100644 --- a/ees/amqpv1_it_test.go +++ b/ees/amqpv1_it_test.go @@ -45,8 +45,10 @@ var ( testAMQPv1ResetDataDB, testAMQPv1StartEngine, testAMQPv1RPCConn, + testAMQPv1ExportEvent, testAMQPv1VerifyExport, + testStopCgrEngine, } ) @@ -131,7 +133,7 @@ func testAMQPv1ExportEvent(t *testing.T) { func testAMQPv1VerifyExport(t *testing.T) { // Create client - client, err := amqpv1.Dial(amqpv1DialURL) + client, err := amqpv1.Dial(amqpv1DialURL, nil) /* an alternative way to create the client client, err := amqpv1.Dial("amqps://cgratescdrs.servicebus.windows.net", amqpv1.ConnSASLPlain("access-key-name", "access-key"), @@ -143,7 +145,7 @@ func testAMQPv1VerifyExport(t *testing.T) { defer client.Close() // Open a session - session, err := client.NewSession() + session, err := client.NewSession(context.Background(), nil) if err != nil { t.Fatal("Creating AMQP session:", err) } @@ -151,10 +153,7 @@ func testAMQPv1VerifyExport(t *testing.T) { ctx := context.Background() // Create a receiver - receiver, err := session.NewReceiver( - amqpv1.LinkSourceAddress("/cgrates_cdrs"), - amqpv1.LinkCredit(10), - ) + receiver, err := session.NewReceiver(context.Background(), "/cgrates_cdrs", nil) if err != nil { t.Fatal("Creating receiver link:", err) } diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index 7be032be8..c4eae4526 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -298,14 +298,14 @@ func TestAMQPv1Poster(t *testing.T) { t.Fatal(err) } // Create client - client, err := amqpv1.Dial(endpoint) + client, err := amqpv1.Dial(endpoint, nil) if err != nil { t.Fatal("Dialing AMQP server:", err) } defer client.Close() // Open a session - session, err := client.NewSession() + session, err := client.NewSession(context.Background(), nil) if err != nil { t.Fatal("Creating AMQP session:", err) } @@ -313,9 +313,7 @@ func TestAMQPv1Poster(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) // Create a receiver - receiver, err := session.NewReceiver( - amqpv1.LinkSourceAddress("/" + qname), - ) + receiver, err := session.NewReceiver(ctx, "/"+qname, nil) if err != nil { t.Fatal("Creating receiver link:", err) } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index f70969ee7..1771303d1 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -75,7 +75,7 @@ type AMQPv1ER struct { rdrErr chan error cap chan struct{} - conn *amqpv1.Client + conn *amqpv1.Conn ses *amqpv1.Session poster *ees.AMQPv1EE @@ -88,10 +88,10 @@ func (rdr *AMQPv1ER) Config() *config.EventReaderCfg { // Serve will start the gorutines needed to watch the amqpv1 topic func (rdr *AMQPv1ER) Serve() (err error) { - if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath); err != nil { + if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, nil); err != nil { return } - if rdr.ses, err = rdr.conn.NewSession(); err != nil { + if rdr.ses, err = rdr.conn.NewSession(context.TODO(), nil); err != nil { rdr.close() return } @@ -100,9 +100,8 @@ func (rdr *AMQPv1ER) Serve() (err error) { } var receiver *amqpv1.Receiver - if receiver, err = rdr.ses.NewReceiver( - amqpv1.LinkSourceAddress(rdr.queueID), - ); err != nil { + if receiver, err = rdr.ses.NewReceiver(context.TODO(), rdr.queueID, + nil); err != nil { return } go func() { @@ -125,7 +124,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) { ctx := context.Background() var msg *amqpv1.Message if msg, err = recv.Receive(ctx); err != nil { - if err == amqpv1.ErrLinkClosed { + if err.Error() == "amqp: link closed" { err = nil return } diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go index f23e86367..05d277314 100644 --- a/ers/amqpv1_it_test.go +++ b/ers/amqpv1_it_test.go @@ -84,20 +84,20 @@ func TestAMQPERv1(t *testing.T) { t.Fatal(err) } amqpv1Rdr := rdr.(*AMQPv1ER) - connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net") + connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net", nil) if err != nil { t.Fatal(err) } defer connection.Close() - channel, err := connection.NewSession() + channel, err := connection.NewSession(context.Background(), nil) if err != nil { t.Fatal(err) } defer channel.Close(context.Background()) randomOriginID := utils.UUIDSha1Prefix() - sndr, err := channel.NewSender(amqpv1.LinkTargetAddress(amqpv1Rdr.queueID)) + sndr, err := channel.NewSender(context.Background(), amqpv1Rdr.queueID, nil) if err != nil { t.Fatal(err) } diff --git a/go.mod b/go.mod index 707c43fff..add6eddd7 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.18 // replace github.com/cgrates/rpcclient => ../rpcclient require ( - github.com/Azure/go-amqp v0.17.5 + github.com/Azure/go-amqp v0.18.1 github.com/antchfx/xmlquery v1.3.11 github.com/aws/aws-sdk-go v1.44.43 github.com/blevesearch/bleve v1.0.14 diff --git a/go.sum b/go.sum index 9ed918663..13473dc7c 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/Azure/go-amqp v0.17.5 h1:7Lsi9H9ijCAfqOaMiNmQ4c+GL9bdrpCjebNKhV/eQ+c= -github.com/Azure/go-amqp v0.17.5/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= +github.com/Azure/go-amqp v0.18.1 h1:D5Ca+uijuTcj5g76sF+zT4OQZcFFY397+IGf/5Ip5Sc= +github.com/Azure/go-amqp v0.18.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= @@ -203,7 +203,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+ne github.com/fiorix/go-diameter/v4 v4.0.4 h1:/nw5zEmEW7pmP9YUYjOfU1GomR0LupKdYy52yd1j3NM= github.com/fiorix/go-diameter/v4 v4.0.4/go.mod h1:Qx/+pf+c9sBUHWq1d7EH3bkdwN8U0mUpdy9BieDw6UQ= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=