Added suport for RabbitMQ routing keys. Fix #1332

This commit is contained in:
Trial97
2018-12-19 12:21:59 +02:00
committed by Dan Christian Bogos
parent 26655606f5
commit b3378d62af
2 changed files with 139 additions and 16 deletions

View File

@@ -36,6 +36,17 @@ import (
"github.com/streadway/amqp"
)
var AMQPQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"}
const (
defaultQueueID = "cgrates_cdrs"
defaultExchangeType = "direct"
queueID = "queue_id"
exchange = "exchange"
exchangeType = "exchange_type"
routingKey = "routing_key"
)
func init() {
AMQPPostersCache = &AMQPCachedPosters{cache: make(map[string]*AMQPPoster)} // Initialize the cache for amqpPosters
}
@@ -161,29 +172,59 @@ func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int,
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) {
u, err := url.Parse(dialURL)
if err != nil {
amqp := &AMQPPoster{
attempts: attempts,
fallbackFileDir: fallbackFileDir,
}
if err := amqp.parseURL(dialURL); err != nil {
return nil, err
}
qry := u.Query()
posterQueueID := "cgrates_cdrs"
if vals, has := qry["queue_id"]; has && len(vals) != 0 {
posterQueueID = vals[0]
}
dialURL = strings.Split(dialURL, "?")[0] // Take query params out of dialURL
return &AMQPPoster{dialURL: dialURL, posterQueueID: posterQueueID,
attempts: attempts, fallbackFileDir: fallbackFileDir}, nil
return amqp, nil
}
type AMQPPoster struct {
dialURL string
posterQueueID string // identifier of the CDR queue where we publish
queueID string // identifier of the CDR queue where we publish
exchange string
exchangeType string
routingKey string
attempts int
fallbackFileDir string
sync.Mutex // protect connection
conn *amqp.Connection
}
func (pstr *AMQPPoster) parseURL(dialURL string) error {
u, err := url.Parse(dialURL)
if err != nil {
return err
}
qry := u.Query()
q := url.Values{}
for _, key := range AMQPQuery {
if vals, has := qry[key]; has && len(vals) != 0 {
q.Add(key, vals[0])
}
}
pstr.dialURL = strings.Split(dialURL, "?")[0] + "?" + q.Encode()
pstr.queueID = defaultQueueID
pstr.routingKey = defaultQueueID
if vals, has := qry[queueID]; has && len(vals) != 0 {
pstr.queueID = vals[0]
}
if vals, has := qry[routingKey]; has && len(vals) != 0 {
pstr.routingKey = vals[0]
}
if vals, has := qry[exchange]; has && len(vals) != 0 {
pstr.exchange = vals[0]
pstr.exchangeType = defaultExchangeType
}
if vals, has := qry[exchangeType]; has && len(vals) != 0 {
pstr.exchangeType = vals[0]
}
return nil
}
// Post is the method being called when we need to post anything in the queue
// the optional chn will permits channel caching
func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []byte,
@@ -208,10 +249,10 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
}
for i := 0; i < pstr.attempts; i++ {
if err = chn.Publish(
"", // exchange
pstr.posterQueueID, // routing key
false, // mandatory
false, // immediate
pstr.exchange, // exchange
pstr.queueID, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: contentType,
@@ -259,7 +300,46 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) {
if postChan, err = pstr.conn.Channel(); err != nil {
return
}
_, err = postChan.QueueDeclare(pstr.posterQueueID, true, false, false, false, nil)
if pstr.exchange != "" {
err = postChan.ExchangeDeclare(
pstr.exchange, // name
pstr.exchangeType, // type
true, // durable
false, // audo-delete
false, // internal
false, // no-wait
nil, // args
)
if err != nil {
return
}
}
_, err = postChan.QueueDeclare(
pstr.queueID, // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // args
)
if err != nil {
return
}
if pstr.exchange != "" {
err = postChan.QueueBind(
pstr.queueID, // queue
routingKey, // key
pstr.exchange, // exchange
false, // no-wait
nil, // args
)
if err != nil {
return
}
}
return
}

43
engine/poster_test.go Normal file
View File

@@ -0,0 +1,43 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"reflect"
"testing"
"github.com/cgrates/cgrates/utils"
)
func TestAMQPPosterParseURL(t *testing.T) {
amqp := &AMQPPoster{}
expected := &AMQPPoster{
dialURL: "amqp://guest:guest@localhost:5672/?heartbeat=5",
queueID: "q1",
exchange: "E1",
exchangeType: "fanout",
routingKey: "CGRCDR",
}
dialURL := "amqp://guest:guest@localhost:5672/?queue_id=q1&exchange=E1&routing_key=CGRCDR&heartbeat=5&exchange_type=fanout"
if err := amqp.parseURL(dialURL); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, amqp) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(expected), utils.ToJSON(amqp))
}
}