diff --git a/engine/poster.go b/engine/poster.go index 4928b70b5..ebb82ab45 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -34,14 +34,10 @@ const ( defaultQueueID = "cgrates_cdrs" defaultExchangeType = "direct" queueID = "queue_id" - topic = "topic" exchange = "exchange" exchangeType = "exchange_type" routingKey = "routing_key" - //awsRegion = "aws_region" - //awsID = "aws_key" - //awsSecret = "aws_secret" awsToken = "aws_token" folderPath = "folder_path" ) diff --git a/engine/pstr_kafka.go b/engine/pstr_kafka.go index d7df61d26..5d6f21e85 100644 --- a/engine/pstr_kafka.go +++ b/engine/pstr_kafka.go @@ -57,7 +57,7 @@ func (pstr *KafkaPoster) parseURL(dialURL string) error { pstr.dialURL = strings.Split(dialURL, "?")[0] pstr.topic = defaultQueueID - if vals, has := qry[topic]; has && len(vals) != 0 { + if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 { pstr.topic = vals[0] } return nil diff --git a/ers/kafka.go b/ers/kafka.go index 9a0ec1f89..fa5ab9c0c 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -38,12 +38,9 @@ import ( const ( defaultTopic = "cgrates_cdrc" defaultGroupID = "cgrates_consumer" - - // ToDo: export it to utils - topic = "topic" - groupID = "group_id" ) +// NewKafkaER return a new kafka event reader func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, rdrEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { @@ -57,7 +54,7 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, rdrErr: rdrErr, } er = rdr - err = rdr.setUrl(rdr.Config().SourcePath) + err = rdr.setURL(rdr.Config().SourcePath) return } @@ -157,7 +154,7 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { return } -func (rdr *KafkaER) setUrl(dialURL string) (err error) { +func (rdr *KafkaER) setURL(dialURL string) (err error) { var u *url.URL if u, err = url.Parse(dialURL); err != nil { return @@ -166,11 +163,11 @@ func (rdr *KafkaER) setUrl(dialURL string) (err error) { rdr.dialURL = strings.Split(dialURL, "?")[0] rdr.topic = defaultTopic - if vals, has := qry[topic]; has && len(vals) != 0 { + if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 { rdr.topic = vals[0] } rdr.groupID = defaultGroupID - if vals, has := qry[groupID]; has && len(vals) != 0 { + if vals, has := qry[utils.KafkaGroupID]; has && len(vals) != 0 { rdr.groupID = vals[0] } return diff --git a/utils/consts.go b/utils/consts.go index 081a50cb9..ee13b1ae1 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1171,11 +1171,13 @@ const ( // Poster const ( - SQSPoster = "SQSPoster" - S3Poster = "S3Poster" - AWSRegion = "aws_region" - AWSKey = "aws_key" - AWSSecret = "aws_secret" + SQSPoster = "SQSPoster" + S3Poster = "S3Poster" + AWSRegion = "aws_region" + AWSKey = "aws_key" + AWSSecret = "aws_secret" + KafkaTopic = "topic" + KafkaGroupID = "group_id" ) // Google_API