From dbd497c4d4efdee0a053838be4bb2e681fba0d1d Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 22 Feb 2023 05:30:33 -0500 Subject: [PATCH] Update to latest amqp 1.0 package version and fix compilation errors --- apier/v1/cdre_amqpv1_it_test.go | 13 +++---- engine/poster_it_test.go | 8 ++-- engine/pstr_amqpv1.go | 69 +++++++-------------------------- go.mod | 2 +- go.sum | 2 + 5 files changed, 24 insertions(+), 70 deletions(-) diff --git a/apier/v1/cdre_amqpv1_it_test.go b/apier/v1/cdre_amqpv1_it_test.go index 529ab7780..203662bae 100644 --- a/apier/v1/cdre_amqpv1_it_test.go +++ b/apier/v1/cdre_amqpv1_it_test.go @@ -216,7 +216,7 @@ func testAMQPv1ExportCDRs(t *testing.T) { func testAMQPv1VerifyExport(t *testing.T) { // Create client - client, err := amqp.Dial(amqpv1DialURL) + client, err := amqp.Dial(amqpv1DialURL, nil) /* an alternative way to create the client client, err := amqp.Dial("amqps://name-space.servicebus.windows.net", amqp.ConnSASLPlain("access-key-name", "access-key"), @@ -227,14 +227,14 @@ func testAMQPv1VerifyExport(t *testing.T) { } defer client.Close() + ctx := context.Background() + // Open a session - session, err := client.NewSession() + session, err := client.NewSession(ctx, nil) if err != nil { t.Fatal("Creating AMQP session:", err) } - ctx := context.Background() - expCDRs := []string{ `{"Account":"1001","CGRID":"Cdr2","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR2","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"5s"}`, `{"Account":"1001","CGRID":"Cdr3","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR3","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"30s"}`, @@ -242,10 +242,7 @@ func testAMQPv1VerifyExport(t *testing.T) { rplyCDRs := make([]string, 0) // Create a receiver - receiver, err := session.NewReceiver( - amqp.LinkSourceAddress("/cgrates_cdrs"), - amqp.LinkCredit(10), - ) + receiver, err := session.NewReceiver(ctx, "/cgrates_cdrs", nil) if err != nil { t.Fatal("Creating receiver link:", err) } diff --git a/engine/poster_it_test.go b/engine/poster_it_test.go index 772e462db..3ee66ac96 100644 --- a/engine/poster_it_test.go +++ b/engine/poster_it_test.go @@ -298,14 +298,14 @@ func TestAMQPv1Poster(t *testing.T) { t.Fatal(err) } // Create client - client, err := amqpv1.Dial(endpoint) + client, err := amqpv1.Dial(endpoint, nil) if err != nil { t.Fatal("Dialing AMQP server:", err) } defer client.Close() // Open a session - session, err := client.NewSession() + session, err := client.NewSession(context.Background(), nil) if err != nil { t.Fatal("Creating AMQP session:", err) } @@ -313,9 +313,7 @@ func TestAMQPv1Poster(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) // Create a receiver - receiver, err := session.NewReceiver( - amqpv1.LinkSourceAddress("/" + qname), - ) + receiver, err := session.NewReceiver(ctx, "/"+qname, nil) if err != nil { t.Fatal("Creating receiver link:", err) } diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index e038ae335..35c42267d 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -21,7 +21,6 @@ package engine import ( "context" "fmt" - "net" "sync" "time" @@ -48,16 +47,16 @@ type AMQPv1Poster struct { dialURL string queueID string // identifier of the CDR queue where we publish attempts int - client *amqpv1.Client + conn *amqpv1.Conn } // Close closes the connections func (pstr *AMQPv1Poster) Close() { pstr.Lock() - if pstr.client != nil { - pstr.client.Close() + if pstr.conn != nil { + pstr.conn.Close() } - pstr.client = nil + pstr.conn = nil pstr.Unlock() } @@ -72,10 +71,10 @@ func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) { } // reset client and try again // used in case of closed connection because of idle time - if pstr.client != nil { - pstr.client.Close() // Make shure the connection is closed before reseting it + if pstr.conn != nil { + pstr.conn.Close() // Make shure the connection is closed before reseting it } - pstr.client = nil + pstr.conn = nil if i+1 < pstr.attempts { time.Sleep(time.Duration(fib()) * time.Second) } @@ -87,22 +86,11 @@ func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) { ctx := context.Background() for i := 0; i < pstr.attempts; i++ { - sender, err := s.NewSender( - amqpv1.LinkTargetAddress(pstr.queueID), - ) + sender, err := s.NewSender(ctx, pstr.queueID, nil) if err != nil { if i+1 < pstr.attempts { time.Sleep(time.Duration(fib()) * time.Second) } - // if pstr.isRecoverableError(err) { - // s.Close(ctx) - // pstr.client.Close() - // pstr.client = nil - // stmp, err := pstr.newPosterSession() - // if err == nil { - // s = stmp - // } - // } continue } // Send message @@ -114,15 +102,6 @@ func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) { if i+1 < pstr.attempts { time.Sleep(time.Duration(fib()) * time.Second) } - // if pstr.isRecoverableError(err) { - // s.Close(ctx) - // pstr.client.Close() - // pstr.client = nil - // stmp, err := pstr.newPosterSession() - // if err == nil { - // s = stmp - // } - // } } if err != nil { return @@ -136,35 +115,13 @@ func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) { func (pstr *AMQPv1Poster) newPosterSession() (s *amqpv1.Session, err error) { pstr.Lock() defer pstr.Unlock() - if pstr.client == nil { - var client *amqpv1.Client - client, err = amqpv1.Dial(pstr.dialURL) + if pstr.conn == nil { + var client *amqpv1.Conn + client, err = amqpv1.Dial(pstr.dialURL, nil) if err != nil { return nil, err } - pstr.client = client + pstr.conn = client } - return pstr.client.NewSession() -} - -func isRecoverableCloseError(err error) bool { - return err == amqpv1.ErrConnClosed || - err == amqpv1.ErrLinkClosed || - err == amqpv1.ErrSessionClosed -} - -func (pstr *AMQPv1Poster) isRecoverableError(err error) bool { - switch err.(type) { - case *amqpv1.Error, *amqpv1.DetachError, net.Error: - if netErr, ok := err.(net.Error); ok { - if !netErr.Temporary() { - return false - } - } - default: - if !isRecoverableCloseError(err) { - return false - } - } - return true + return pstr.conn.NewSession(context.TODO(), nil) } diff --git a/go.mod b/go.mod index 1de0a643f..dfc68bf03 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( google.golang.org/api v0.103.0 ) -require github.com/Azure/go-amqp v0.17.5 +require github.com/Azure/go-amqp v0.18.1 require ( cloud.google.com/go/compute v1.12.1 // indirect diff --git a/go.sum b/go.sum index 4b5f4ab07..ec6131d84 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxB cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs= github.com/Azure/go-amqp v0.17.5 h1:7Lsi9H9ijCAfqOaMiNmQ4c+GL9bdrpCjebNKhV/eQ+c= github.com/Azure/go-amqp v0.17.5/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= +github.com/Azure/go-amqp v0.18.1 h1:D5Ca+uijuTcj5g76sF+zT4OQZcFFY397+IGf/5Ip5Sc= +github.com/Azure/go-amqp v0.18.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=