From b3378d62afa233de418b9fb9c09e8a06f11eacaa Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 19 Dec 2018 12:21:59 +0200 Subject: [PATCH] Added suport for RabbitMQ routing keys. Fix #1332 --- engine/poster.go | 112 ++++++++++++++++++++++++++++++++++++------ engine/poster_test.go | 43 ++++++++++++++++ 2 files changed, 139 insertions(+), 16 deletions(-) create mode 100644 engine/poster_test.go diff --git a/engine/poster.go b/engine/poster.go index b8fdacb00..578b857a3 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -36,6 +36,17 @@ import ( "github.com/streadway/amqp" ) +var AMQPQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"} + +const ( + defaultQueueID = "cgrates_cdrs" + defaultExchangeType = "direct" + queueID = "queue_id" + exchange = "exchange" + exchangeType = "exchange_type" + routingKey = "routing_key" +) + func init() { AMQPPostersCache = &AMQPCachedPosters{cache: make(map[string]*AMQPPoster)} // Initialize the cache for amqpPosters } @@ -161,29 +172,59 @@ func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) { - u, err := url.Parse(dialURL) - if err != nil { + amqp := &AMQPPoster{ + attempts: attempts, + fallbackFileDir: fallbackFileDir, + } + if err := amqp.parseURL(dialURL); err != nil { return nil, err } - qry := u.Query() - posterQueueID := "cgrates_cdrs" - if vals, has := qry["queue_id"]; has && len(vals) != 0 { - posterQueueID = vals[0] - } - dialURL = strings.Split(dialURL, "?")[0] // Take query params out of dialURL - return &AMQPPoster{dialURL: dialURL, posterQueueID: posterQueueID, - attempts: attempts, fallbackFileDir: fallbackFileDir}, nil + return amqp, nil } type AMQPPoster struct { dialURL string - posterQueueID string // identifier of the CDR queue where we publish + queueID string // identifier of the CDR queue where we publish + exchange string + exchangeType string + routingKey string attempts int fallbackFileDir string sync.Mutex // protect connection conn *amqp.Connection } +func (pstr *AMQPPoster) parseURL(dialURL string) error { + u, err := url.Parse(dialURL) + if err != nil { + return err + } + qry := u.Query() + q := url.Values{} + for _, key := range AMQPQuery { + if vals, has := qry[key]; has && len(vals) != 0 { + q.Add(key, vals[0]) + } + } + pstr.dialURL = strings.Split(dialURL, "?")[0] + "?" + q.Encode() + pstr.queueID = defaultQueueID + pstr.routingKey = defaultQueueID + if vals, has := qry[queueID]; has && len(vals) != 0 { + pstr.queueID = vals[0] + } + if vals, has := qry[routingKey]; has && len(vals) != 0 { + pstr.routingKey = vals[0] + } + if vals, has := qry[exchange]; has && len(vals) != 0 { + pstr.exchange = vals[0] + pstr.exchangeType = defaultExchangeType + } + if vals, has := qry[exchangeType]; has && len(vals) != 0 { + pstr.exchangeType = vals[0] + } + return nil +} + // Post is the method being called when we need to post anything in the queue // the optional chn will permits channel caching func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []byte, @@ -208,10 +249,10 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by } for i := 0; i < pstr.attempts; i++ { if err = chn.Publish( - "", // exchange - pstr.posterQueueID, // routing key - false, // mandatory - false, // immediate + pstr.exchange, // exchange + pstr.queueID, // routing key + false, // mandatory + false, // immediate amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: contentType, @@ -259,7 +300,46 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) { if postChan, err = pstr.conn.Channel(); err != nil { return } - _, err = postChan.QueueDeclare(pstr.posterQueueID, true, false, false, false, nil) + + if pstr.exchange != "" { + err = postChan.ExchangeDeclare( + pstr.exchange, // name + pstr.exchangeType, // type + true, // durable + false, // audo-delete + false, // internal + false, // no-wait + nil, // args + ) + if err != nil { + return + } + } + + _, err = postChan.QueueDeclare( + pstr.queueID, // name + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, // args + ) + if err != nil { + return + } + + if pstr.exchange != "" { + err = postChan.QueueBind( + pstr.queueID, // queue + routingKey, // key + pstr.exchange, // exchange + false, // no-wait + nil, // args + ) + if err != nil { + return + } + } return } diff --git a/engine/poster_test.go b/engine/poster_test.go new file mode 100644 index 000000000..7cf764283 --- /dev/null +++ b/engine/poster_test.go @@ -0,0 +1,43 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "reflect" + "testing" + + "github.com/cgrates/cgrates/utils" +) + +func TestAMQPPosterParseURL(t *testing.T) { + amqp := &AMQPPoster{} + expected := &AMQPPoster{ + dialURL: "amqp://guest:guest@localhost:5672/?heartbeat=5", + queueID: "q1", + exchange: "E1", + exchangeType: "fanout", + routingKey: "CGRCDR", + } + dialURL := "amqp://guest:guest@localhost:5672/?queue_id=q1&exchange=E1&routing_key=CGRCDR&heartbeat=5&exchange_type=fanout" + if err := amqp.parseURL(dialURL); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expected, amqp) { + t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(expected), utils.ToJSON(amqp)) + } +}