Add options for SASL PLAIN auth (amqp 1.0)

This commit is contained in:
ionutboangiu
2023-02-22 11:54:16 -05:00
committed by Dan Christian Bogos
parent f5d007a3e0
commit 25e08ebe79
10 changed files with 133 additions and 29 deletions

View File

@@ -389,11 +389,16 @@ const CGRATES_CFG_JSON = `
// "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP and AMQPv1 readers from were the events are read
// "amqpQueueIDProcessed": "", // the queue id for AMQP and AMQPv1 readers were the events are sent after they are processed
// AMQP
// "amqpConsumerTag": "cgrates", // the ID of the consumer
// "amqpExchange": "",
// "amqpExchangeType": "",
// "amqpRoutingKey": "",
// "amqpUsername": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, usually represents the policy name
// "amqpPassword": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, populated with one of its policy's keys
// "amqpUsernameProcessed": "",
// "amqpPasswordProcessed": "",
// "amqpConsumerTag": "cgrates", // the ID of the consumer, amqp 0.9.1 exclusive
// "amqpExchange": "", // amqp 0.9.1 exclusive
// "amqpExchangeType": "", // amqp 0.9.1 exclusive
// "amqpRoutingKey": "", // amqp 0.9.1 exclusive
// "amqpExchangeProcessed": "",
// "amqpExchangeTypeProcessed": "",
@@ -532,9 +537,11 @@ const CGRATES_CFG_JSON = `
// AMQP
// "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP exporters from were the events are exported
// "amqpRoutingKey": "", // RoutingKey
// "amqpExchange": "", // Exchange
// "amqpExchangeType": "", // ExchangeType
// "amqpRoutingKey": "", // RoutingKey, amqp 0.9.1 exclusive
// "amqpExchange": "", // Exchange, amqp 0.9.1 exclusive
// "amqpExchangeType": "", // ExchangeType, amqp 0.9.1 exclusive
// "amqpUsername": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, usually represents the policy name
// "amqpPassword": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, populated with one of its policy's keys
// SQS and S3

View File

@@ -177,6 +177,8 @@ type EventExporterOpts struct {
AMQPQueueID *string
AMQPExchange *string
AMQPExchangeType *string
AMQPUsername *string
AMQPPassword *string
AWSRegion *string
AWSKey *string
AWSSecret *string
@@ -307,6 +309,12 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.AMQPExchangeType != nil {
eeOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType
}
if jsnCfg.AMQPUsername != nil {
eeOpts.AMQPUsername = jsnCfg.AMQPUsername
}
if jsnCfg.AMQPPassword != nil {
eeOpts.AMQPPassword = jsnCfg.AMQPPassword
}
if jsnCfg.AWSRegion != nil {
eeOpts.AWSRegion = jsnCfg.AWSRegion
}
@@ -567,6 +575,12 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
if eeOpts.AMQPExchangeType != nil {
cln.AMQPExchangeType = utils.StringPointer(*eeOpts.AMQPExchangeType)
}
if eeOpts.AMQPUsername != nil {
cln.AMQPUsername = utils.StringPointer(*eeOpts.AMQPUsername)
}
if eeOpts.AMQPPassword != nil {
cln.AMQPPassword = utils.StringPointer(*eeOpts.AMQPPassword)
}
if eeOpts.AWSRegion != nil {
cln.AWSRegion = utils.StringPointer(*eeOpts.AWSRegion)
}
@@ -766,6 +780,12 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
if eeC.Opts.AMQPExchangeType != nil {
opts[utils.AMQPExchangeType] = *eeC.Opts.AMQPExchangeType
}
if eeC.Opts.AMQPUsername != nil {
opts[utils.AMQPUsername] = *eeC.Opts.AMQPUsername
}
if eeC.Opts.AMQPPassword != nil {
opts[utils.AMQPPassword] = *eeC.Opts.AMQPPassword
}
if eeC.Opts.AWSRegion != nil {
opts[utils.AWSRegion] = *eeC.Opts.AWSRegion
}

View File

@@ -147,6 +147,10 @@ type EventReaderOpts struct {
XMLRootPath *string
AMQPQueueID *string
AMQPQueueIDProcessed *string
AMQPUsername *string
AMQPPassword *string
AMQPUsernameProcessed *string
AMQPPasswordProcessed *string
AMQPConsumerTag *string
AMQPExchange *string
AMQPExchangeType *string
@@ -252,6 +256,18 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
if jsnCfg.AMQPQueueIDProcessed != nil {
erOpts.AMQPQueueIDProcessed = jsnCfg.AMQPQueueIDProcessed
}
if jsnCfg.AMQPUsername != nil {
erOpts.AMQPUsername = jsnCfg.AMQPUsername
}
if jsnCfg.AMQPPassword != nil {
erOpts.AMQPPassword = jsnCfg.AMQPPassword
}
if jsnCfg.AMQPUsernameProcessed != nil {
erOpts.AMQPUsernameProcessed = jsnCfg.AMQPUsernameProcessed
}
if jsnCfg.AMQPPasswordProcessed != nil {
erOpts.AMQPPasswordProcessed = jsnCfg.AMQPPasswordProcessed
}
if jsnCfg.AMQPConsumerTag != nil {
erOpts.AMQPConsumerTag = jsnCfg.AMQPConsumerTag
}
@@ -523,6 +539,18 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
if erOpts.AMQPQueueIDProcessed != nil {
cln.AMQPQueueIDProcessed = utils.StringPointer(*erOpts.AMQPQueueIDProcessed)
}
if erOpts.AMQPUsername != nil {
cln.AMQPUsername = utils.StringPointer(*erOpts.AMQPUsername)
}
if erOpts.AMQPPassword != nil {
cln.AMQPPassword = utils.StringPointer(*erOpts.AMQPPassword)
}
if erOpts.AMQPUsernameProcessed != nil {
cln.AMQPUsernameProcessed = utils.StringPointer(*erOpts.AMQPUsernameProcessed)
}
if erOpts.AMQPPasswordProcessed != nil {
cln.AMQPPasswordProcessed = utils.StringPointer(*erOpts.AMQPPasswordProcessed)
}
if erOpts.AMQPConsumerTag != nil {
cln.AMQPConsumerTag = utils.StringPointer(*erOpts.AMQPConsumerTag)
}
@@ -748,6 +776,18 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
if er.Opts.AMQPQueueIDProcessed != nil {
opts[utils.AMQPQueueIDProcessedCfg] = *er.Opts.AMQPQueueIDProcessed
}
if er.Opts.AMQPUsername != nil {
opts[utils.AMQPUsername] = *er.Opts.AMQPUsername
}
if er.Opts.AMQPPassword != nil {
opts[utils.AMQPPassword] = *er.Opts.AMQPPassword
}
if er.Opts.AMQPUsernameProcessed != nil {
opts[utils.AMQPUsernameProcessedCfg] = *er.Opts.AMQPUsernameProcessed
}
if er.Opts.AMQPPasswordProcessed != nil {
opts[utils.AMQPPasswordProcessedCfg] = *er.Opts.AMQPPasswordProcessed
}
if er.Opts.AMQPConsumerTag != nil {
opts[utils.AMQPConsumerTag] = *er.Opts.AMQPConsumerTag
}

View File

@@ -217,6 +217,10 @@ type EventReaderOptsJson struct {
XMLRootPath *string `json:"xmlRootPath"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPQueueIDProcessed *string `json:"amqpQueueIDProcessed"`
AMQPUsername *string `json:"amqpUsername"`
AMQPPassword *string `json:"amqpPassword"`
AMQPUsernameProcessed *string `json:"amqpUsernameProcessed"`
AMQPPasswordProcessed *string `json:"amqpPasswordProcessed"`
AMQPConsumerTag *string `json:"amqpConsumerTag"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
@@ -317,6 +321,8 @@ type EventExporterOptsJson struct {
AMQPRoutingKey *string `json:"amqpRoutingKey"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AMQPUsername *string `json:"amqpUsername"`
AMQPPassword *string `json:"amqpPassword"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`

View File

@@ -58,9 +58,11 @@
// connection string from Azure Portal can be found here: Home > [Service Bus namespace] >
// > Shared access policies > RootManageSharedAccessKey > Primary Connection String where
// access-key-name is RootManageSharedAccessKey and access-key is the primary connection string
"export_path": "amqps://access-key-name:access-key@name-space.servicebus.windows.net",
"export_path": "amqps://name-space.servicebus.windows.net",
"opts": {
"amqpQueueID": "cgrates_cdrs"
"amqpQueueID": "cgrates_cdrs",
"amqpUsername": "access-key-name",
"amqpPassword": "access-key"
},
"attempts": 1,
"failed_posts_dir": "/var/spool/cgrates/failed_posts2",

View File

@@ -38,14 +38,20 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1
if cfg.Opts.AMQPQueueID != nil {
pstr.queueID = "/" + *cfg.Opts.AMQPQueueID
}
if cfg.Opts.AMQPUsername != nil && cfg.Opts.AMQPPassword != nil {
pstr.connOpts = &amqpv1.ConnOptions{
SASLType: amqpv1.SASLTypePlain(*cfg.Opts.AMQPUsername, *cfg.Opts.AMQPPassword),
}
}
return pstr
}
// AMQPv1EE a poster for amqpv1
type AMQPv1EE struct {
queueID string // identifier of the CDR queue where we publish
conn *amqpv1.Conn
session *amqpv1.Session
queueID string // identifier of the CDR queue where we publish
conn *amqpv1.Conn
connOpts *amqpv1.ConnOptions
session *amqpv1.Session
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
@@ -60,7 +66,7 @@ func (pstr *AMQPv1EE) Connect() (err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.conn == nil {
if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, nil); err != nil {
if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, pstr.connOpts); err != nil {
return
}
}

View File

@@ -33,12 +33,13 @@ import (
)
var (
runAMQPv1Test = flag.Bool("amqpv1_ees", false, "Run the integration test for the AMQPv1 exporter")
amqpv1ConfDir string
amqpv1CfgPath string
amqpv1Cfg *config.CGRConfig
amqpv1RPC *rpc.Client
amqpv1DialURL string
runAMQPv1Test = flag.Bool("amqpv1_ees", false, "Run the integration test for the AMQPv1 exporter")
amqpv1ConfDir string
amqpv1CfgPath string
amqpv1Cfg *config.CGRConfig
amqpv1RPC *rpc.Client
amqpv1DialURL string
amqpv1ConnOpts *amqpv1.ConnOptions
sTestsAMQPv1 = []func(t *testing.T){
testAMQPv1LoadConfig,
@@ -73,6 +74,11 @@ func testAMQPv1LoadConfig(t *testing.T) {
for _, value := range amqpv1Cfg.EEsCfg().Exporters {
if value.ID == "amqpv1_test_file" {
amqpv1DialURL = value.ExportPath
if value.Opts.AMQPUsername != nil && value.Opts.AMQPPassword != nil {
amqpv1ConnOpts = &amqpv1.ConnOptions{
SASLType: amqpv1.SASLTypePlain(*value.Opts.AMQPUsername, *value.Opts.AMQPPassword),
}
}
}
}
}
@@ -137,12 +143,7 @@ func testAMQPv1ExportEvent(t *testing.T) {
func testAMQPv1VerifyExport(t *testing.T) {
// Create client
client, err := amqpv1.Dial(amqpv1DialURL, nil)
/* an alternative way to create the client
client, err := amqpv1.Dial("amqps://cgratescdrs.servicebus.windows.net",
amqpv1.ConnSASLPlain("access-key-name", "access-key"),
)
*/
client, err := amqpv1.Dial(amqpv1DialURL, amqpv1ConnOpts)
if err != nil {
t.Fatal("Dialing AMQP server:", err)
}

View File

@@ -54,6 +54,11 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int,
if rdr.Config().Opts.AMQPQueueID != nil {
rdr.queueID = "/" + *rdr.Config().Opts.AMQPQueueID
}
if rdr.Config().Opts.AMQPUsername != nil && rdr.Config().Opts.AMQPPassword != nil {
rdr.connOpts = &amqpv1.ConnOptions{
SASLType: amqpv1.SASLTypePlain(*rdr.Config().Opts.AMQPUsername, *rdr.Config().Opts.AMQPPassword),
}
}
rdr.createPoster()
return rdr, nil
}
@@ -73,8 +78,9 @@ type AMQPv1ER struct {
rdrErr chan error
cap chan struct{}
conn *amqpv1.Conn
ses *amqpv1.Session
conn *amqpv1.Conn
connOpts *amqpv1.ConnOptions
ses *amqpv1.Session
poster *ees.AMQPv1EE
}
@@ -86,7 +92,7 @@ func (rdr *AMQPv1ER) Config() *config.EventReaderCfg {
// Serve will start the gorutines needed to watch the amqpv1 topic
func (rdr *AMQPv1ER) Serve() (err error) {
if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, nil); err != nil {
if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, rdr.connOpts); err != nil {
return
}
if rdr.ses, err = rdr.conn.NewSession(context.TODO(), nil); err != nil {

View File

@@ -54,6 +54,18 @@ func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExpo
}
eeOpts.AMQPRoutingKey = erOpts.AMQPRoutingKeyProcessed
}
if erOpts.AMQPUsernameProcessed != nil {
if eeOpts == nil {
eeOpts = new(config.EventExporterOpts)
}
eeOpts.AMQPUsername = erOpts.AMQPUsernameProcessed
}
if erOpts.AMQPPasswordProcessed != nil {
if eeOpts == nil {
eeOpts = new(config.EventExporterOpts)
}
eeOpts.AMQPPassword = erOpts.AMQPPasswordProcessed
}
if erOpts.AWSKeyProcessed != nil {
if eeOpts == nil {
eeOpts = new(config.EventExporterOpts)

View File

@@ -2596,6 +2596,8 @@ const (
AMQPExchange = "amqpExchange"
AMQPExchangeType = "amqpExchangeType"
AMQPRoutingKey = "amqpRoutingKey"
AMQPUsername = "amqpUsername"
AMQPPassword = "amqpPassword"
// kafka
KafkaDefaultTopic = "cgrates"
@@ -2652,6 +2654,8 @@ const (
// processed opts
AMQPQueueIDProcessedCfg = "amqpQueueIDProcessed"
AMQPUsernameProcessedCfg = "amqpUsernameProcessed"
AMQPPasswordProcessedCfg = "amqpPasswordProcessed"
AMQPExchangeProcessedCfg = "amqpExchangeProcessed"
AMQPExchangeTypeProcessedCfg = "amqpExchangeTypeProcessed"
AMQPRoutingKeyProcessedCfg = "amqpRoutingKeyProcessed"