From 25e08ebe797eea8588eae360206d6b1744766f67 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 22 Feb 2023 11:54:16 -0500 Subject: [PATCH] Add options for SASL PLAIN auth (amqp 1.0) --- config/config_defaults.go | 23 +++++++++----- config/eescfg.go | 20 ++++++++++++ config/erscfg.go | 40 ++++++++++++++++++++++++ config/libconfig_json.go | 6 ++++ data/conf/samples/ees_cloud/cgrates.json | 6 ++-- ees/amqpv1.go | 14 ++++++--- ees/amqpv1_it_test.go | 25 ++++++++------- ers/amqpv1.go | 12 +++++-- ers/libers.go | 12 +++++++ utils/consts.go | 4 +++ 10 files changed, 133 insertions(+), 29 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 65edc8fc4..6d520580b 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -389,11 +389,16 @@ const CGRATES_CFG_JSON = ` // "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP and AMQPv1 readers from were the events are read // "amqpQueueIDProcessed": "", // the queue id for AMQP and AMQPv1 readers were the events are sent after they are processed - // AMQP - // "amqpConsumerTag": "cgrates", // the ID of the consumer - // "amqpExchange": "", - // "amqpExchangeType": "", - // "amqpRoutingKey": "", + // "amqpUsername": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, usually represents the policy name + // "amqpPassword": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, populated with one of its policy's keys + + // "amqpUsernameProcessed": "", + // "amqpPasswordProcessed": "", + + // "amqpConsumerTag": "cgrates", // the ID of the consumer, amqp 0.9.1 exclusive + // "amqpExchange": "", // amqp 0.9.1 exclusive + // "amqpExchangeType": "", // amqp 0.9.1 exclusive + // "amqpRoutingKey": "", // amqp 0.9.1 exclusive // "amqpExchangeProcessed": "", // "amqpExchangeTypeProcessed": "", @@ -532,9 +537,11 @@ const CGRATES_CFG_JSON = ` // AMQP // "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP exporters from were the events are exported - // "amqpRoutingKey": "", // RoutingKey - // "amqpExchange": "", // Exchange - // "amqpExchangeType": "", // ExchangeType + // "amqpRoutingKey": "", // RoutingKey, amqp 0.9.1 exclusive + // "amqpExchange": "", // Exchange, amqp 0.9.1 exclusive + // "amqpExchangeType": "", // ExchangeType, amqp 0.9.1 exclusive + // "amqpUsername": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, usually represents the policy name + // "amqpPassword": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, populated with one of its policy's keys // SQS and S3 diff --git a/config/eescfg.go b/config/eescfg.go index d3c271857..2182516a1 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -177,6 +177,8 @@ type EventExporterOpts struct { AMQPQueueID *string AMQPExchange *string AMQPExchangeType *string + AMQPUsername *string + AMQPPassword *string AWSRegion *string AWSKey *string AWSSecret *string @@ -307,6 +309,12 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.AMQPExchangeType != nil { eeOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType } + if jsnCfg.AMQPUsername != nil { + eeOpts.AMQPUsername = jsnCfg.AMQPUsername + } + if jsnCfg.AMQPPassword != nil { + eeOpts.AMQPPassword = jsnCfg.AMQPPassword + } if jsnCfg.AWSRegion != nil { eeOpts.AWSRegion = jsnCfg.AWSRegion } @@ -567,6 +575,12 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { if eeOpts.AMQPExchangeType != nil { cln.AMQPExchangeType = utils.StringPointer(*eeOpts.AMQPExchangeType) } + if eeOpts.AMQPUsername != nil { + cln.AMQPUsername = utils.StringPointer(*eeOpts.AMQPUsername) + } + if eeOpts.AMQPPassword != nil { + cln.AMQPPassword = utils.StringPointer(*eeOpts.AMQPPassword) + } if eeOpts.AWSRegion != nil { cln.AWSRegion = utils.StringPointer(*eeOpts.AWSRegion) } @@ -766,6 +780,12 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str if eeC.Opts.AMQPExchangeType != nil { opts[utils.AMQPExchangeType] = *eeC.Opts.AMQPExchangeType } + if eeC.Opts.AMQPUsername != nil { + opts[utils.AMQPUsername] = *eeC.Opts.AMQPUsername + } + if eeC.Opts.AMQPPassword != nil { + opts[utils.AMQPPassword] = *eeC.Opts.AMQPPassword + } if eeC.Opts.AWSRegion != nil { opts[utils.AWSRegion] = *eeC.Opts.AWSRegion } diff --git a/config/erscfg.go b/config/erscfg.go index c1c0f35d0..4bedd6029 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -147,6 +147,10 @@ type EventReaderOpts struct { XMLRootPath *string AMQPQueueID *string AMQPQueueIDProcessed *string + AMQPUsername *string + AMQPPassword *string + AMQPUsernameProcessed *string + AMQPPasswordProcessed *string AMQPConsumerTag *string AMQPExchange *string AMQPExchangeType *string @@ -252,6 +256,18 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err if jsnCfg.AMQPQueueIDProcessed != nil { erOpts.AMQPQueueIDProcessed = jsnCfg.AMQPQueueIDProcessed } + if jsnCfg.AMQPUsername != nil { + erOpts.AMQPUsername = jsnCfg.AMQPUsername + } + if jsnCfg.AMQPPassword != nil { + erOpts.AMQPPassword = jsnCfg.AMQPPassword + } + if jsnCfg.AMQPUsernameProcessed != nil { + erOpts.AMQPUsernameProcessed = jsnCfg.AMQPUsernameProcessed + } + if jsnCfg.AMQPPasswordProcessed != nil { + erOpts.AMQPPasswordProcessed = jsnCfg.AMQPPasswordProcessed + } if jsnCfg.AMQPConsumerTag != nil { erOpts.AMQPConsumerTag = jsnCfg.AMQPConsumerTag } @@ -523,6 +539,18 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts { if erOpts.AMQPQueueIDProcessed != nil { cln.AMQPQueueIDProcessed = utils.StringPointer(*erOpts.AMQPQueueIDProcessed) } + if erOpts.AMQPUsername != nil { + cln.AMQPUsername = utils.StringPointer(*erOpts.AMQPUsername) + } + if erOpts.AMQPPassword != nil { + cln.AMQPPassword = utils.StringPointer(*erOpts.AMQPPassword) + } + if erOpts.AMQPUsernameProcessed != nil { + cln.AMQPUsernameProcessed = utils.StringPointer(*erOpts.AMQPUsernameProcessed) + } + if erOpts.AMQPPasswordProcessed != nil { + cln.AMQPPasswordProcessed = utils.StringPointer(*erOpts.AMQPPasswordProcessed) + } if erOpts.AMQPConsumerTag != nil { cln.AMQPConsumerTag = utils.StringPointer(*erOpts.AMQPConsumerTag) } @@ -748,6 +776,18 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string if er.Opts.AMQPQueueIDProcessed != nil { opts[utils.AMQPQueueIDProcessedCfg] = *er.Opts.AMQPQueueIDProcessed } + if er.Opts.AMQPUsername != nil { + opts[utils.AMQPUsername] = *er.Opts.AMQPUsername + } + if er.Opts.AMQPPassword != nil { + opts[utils.AMQPPassword] = *er.Opts.AMQPPassword + } + if er.Opts.AMQPUsernameProcessed != nil { + opts[utils.AMQPUsernameProcessedCfg] = *er.Opts.AMQPUsernameProcessed + } + if er.Opts.AMQPPasswordProcessed != nil { + opts[utils.AMQPPasswordProcessedCfg] = *er.Opts.AMQPPasswordProcessed + } if er.Opts.AMQPConsumerTag != nil { opts[utils.AMQPConsumerTag] = *er.Opts.AMQPConsumerTag } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index eeda5f106..27d31962a 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -217,6 +217,10 @@ type EventReaderOptsJson struct { XMLRootPath *string `json:"xmlRootPath"` AMQPQueueID *string `json:"amqpQueueID"` AMQPQueueIDProcessed *string `json:"amqpQueueIDProcessed"` + AMQPUsername *string `json:"amqpUsername"` + AMQPPassword *string `json:"amqpPassword"` + AMQPUsernameProcessed *string `json:"amqpUsernameProcessed"` + AMQPPasswordProcessed *string `json:"amqpPasswordProcessed"` AMQPConsumerTag *string `json:"amqpConsumerTag"` AMQPExchange *string `json:"amqpExchange"` AMQPExchangeType *string `json:"amqpExchangeType"` @@ -317,6 +321,8 @@ type EventExporterOptsJson struct { AMQPRoutingKey *string `json:"amqpRoutingKey"` AMQPExchange *string `json:"amqpExchange"` AMQPExchangeType *string `json:"amqpExchangeType"` + AMQPUsername *string `json:"amqpUsername"` + AMQPPassword *string `json:"amqpPassword"` AWSRegion *string `json:"awsRegion"` AWSKey *string `json:"awsKey"` AWSSecret *string `json:"awsSecret"` diff --git a/data/conf/samples/ees_cloud/cgrates.json b/data/conf/samples/ees_cloud/cgrates.json index 993f5bc7e..9705778aa 100644 --- a/data/conf/samples/ees_cloud/cgrates.json +++ b/data/conf/samples/ees_cloud/cgrates.json @@ -58,9 +58,11 @@ // connection string from Azure Portal can be found here: Home > [Service Bus namespace] > // > Shared access policies > RootManageSharedAccessKey > Primary Connection String where // access-key-name is RootManageSharedAccessKey and access-key is the primary connection string - "export_path": "amqps://access-key-name:access-key@name-space.servicebus.windows.net", + "export_path": "amqps://name-space.servicebus.windows.net", "opts": { - "amqpQueueID": "cgrates_cdrs" + "amqpQueueID": "cgrates_cdrs", + "amqpUsername": "access-key-name", + "amqpPassword": "access-key" }, "attempts": 1, "failed_posts_dir": "/var/spool/cgrates/failed_posts2", diff --git a/ees/amqpv1.go b/ees/amqpv1.go index a6b9d2390..9d2fad755 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -38,14 +38,20 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1 if cfg.Opts.AMQPQueueID != nil { pstr.queueID = "/" + *cfg.Opts.AMQPQueueID } + if cfg.Opts.AMQPUsername != nil && cfg.Opts.AMQPPassword != nil { + pstr.connOpts = &amqpv1.ConnOptions{ + SASLType: amqpv1.SASLTypePlain(*cfg.Opts.AMQPUsername, *cfg.Opts.AMQPPassword), + } + } return pstr } // AMQPv1EE a poster for amqpv1 type AMQPv1EE struct { - queueID string // identifier of the CDR queue where we publish - conn *amqpv1.Conn - session *amqpv1.Session + queueID string // identifier of the CDR queue where we publish + conn *amqpv1.Conn + connOpts *amqpv1.ConnOptions + session *amqpv1.Session cfg *config.EventExporterCfg dc *utils.SafeMapStorage @@ -60,7 +66,7 @@ func (pstr *AMQPv1EE) Connect() (err error) { pstr.Lock() defer pstr.Unlock() if pstr.conn == nil { - if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, nil); err != nil { + if pstr.conn, err = amqpv1.Dial(pstr.Cfg().ExportPath, pstr.connOpts); err != nil { return } } diff --git a/ees/amqpv1_it_test.go b/ees/amqpv1_it_test.go index d01844618..93b424608 100644 --- a/ees/amqpv1_it_test.go +++ b/ees/amqpv1_it_test.go @@ -33,12 +33,13 @@ import ( ) var ( - runAMQPv1Test = flag.Bool("amqpv1_ees", false, "Run the integration test for the AMQPv1 exporter") - amqpv1ConfDir string - amqpv1CfgPath string - amqpv1Cfg *config.CGRConfig - amqpv1RPC *rpc.Client - amqpv1DialURL string + runAMQPv1Test = flag.Bool("amqpv1_ees", false, "Run the integration test for the AMQPv1 exporter") + amqpv1ConfDir string + amqpv1CfgPath string + amqpv1Cfg *config.CGRConfig + amqpv1RPC *rpc.Client + amqpv1DialURL string + amqpv1ConnOpts *amqpv1.ConnOptions sTestsAMQPv1 = []func(t *testing.T){ testAMQPv1LoadConfig, @@ -73,6 +74,11 @@ func testAMQPv1LoadConfig(t *testing.T) { for _, value := range amqpv1Cfg.EEsCfg().Exporters { if value.ID == "amqpv1_test_file" { amqpv1DialURL = value.ExportPath + if value.Opts.AMQPUsername != nil && value.Opts.AMQPPassword != nil { + amqpv1ConnOpts = &amqpv1.ConnOptions{ + SASLType: amqpv1.SASLTypePlain(*value.Opts.AMQPUsername, *value.Opts.AMQPPassword), + } + } } } } @@ -137,12 +143,7 @@ func testAMQPv1ExportEvent(t *testing.T) { func testAMQPv1VerifyExport(t *testing.T) { // Create client - client, err := amqpv1.Dial(amqpv1DialURL, nil) - /* an alternative way to create the client - client, err := amqpv1.Dial("amqps://cgratescdrs.servicebus.windows.net", - amqpv1.ConnSASLPlain("access-key-name", "access-key"), - ) - */ + client, err := amqpv1.Dial(amqpv1DialURL, amqpv1ConnOpts) if err != nil { t.Fatal("Dialing AMQP server:", err) } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index ba965adc6..1f99b0e7a 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -54,6 +54,11 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int, if rdr.Config().Opts.AMQPQueueID != nil { rdr.queueID = "/" + *rdr.Config().Opts.AMQPQueueID } + if rdr.Config().Opts.AMQPUsername != nil && rdr.Config().Opts.AMQPPassword != nil { + rdr.connOpts = &amqpv1.ConnOptions{ + SASLType: amqpv1.SASLTypePlain(*rdr.Config().Opts.AMQPUsername, *rdr.Config().Opts.AMQPPassword), + } + } rdr.createPoster() return rdr, nil } @@ -73,8 +78,9 @@ type AMQPv1ER struct { rdrErr chan error cap chan struct{} - conn *amqpv1.Conn - ses *amqpv1.Session + conn *amqpv1.Conn + connOpts *amqpv1.ConnOptions + ses *amqpv1.Session poster *ees.AMQPv1EE } @@ -86,7 +92,7 @@ func (rdr *AMQPv1ER) Config() *config.EventReaderCfg { // Serve will start the gorutines needed to watch the amqpv1 topic func (rdr *AMQPv1ER) Serve() (err error) { - if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, nil); err != nil { + if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath, rdr.connOpts); err != nil { return } if rdr.ses, err = rdr.conn.NewSession(context.TODO(), nil); err != nil { diff --git a/ers/libers.go b/ers/libers.go index 387969ef7..89af398ec 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -54,6 +54,18 @@ func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExpo } eeOpts.AMQPRoutingKey = erOpts.AMQPRoutingKeyProcessed } + if erOpts.AMQPUsernameProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AMQPUsername = erOpts.AMQPUsernameProcessed + } + if erOpts.AMQPPasswordProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AMQPPassword = erOpts.AMQPPasswordProcessed + } if erOpts.AWSKeyProcessed != nil { if eeOpts == nil { eeOpts = new(config.EventExporterOpts) diff --git a/utils/consts.go b/utils/consts.go index f6c9d9c00..6fe50ef76 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2596,6 +2596,8 @@ const ( AMQPExchange = "amqpExchange" AMQPExchangeType = "amqpExchangeType" AMQPRoutingKey = "amqpRoutingKey" + AMQPUsername = "amqpUsername" + AMQPPassword = "amqpPassword" // kafka KafkaDefaultTopic = "cgrates" @@ -2652,6 +2654,8 @@ const ( // processed opts AMQPQueueIDProcessedCfg = "amqpQueueIDProcessed" + AMQPUsernameProcessedCfg = "amqpUsernameProcessed" + AMQPPasswordProcessedCfg = "amqpPasswordProcessed" AMQPExchangeProcessedCfg = "amqpExchangeProcessed" AMQPExchangeTypeProcessedCfg = "amqpExchangeTypeProcessed" AMQPRoutingKeyProcessedCfg = "amqpRoutingKeyProcessed"