mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 02:26:26 +05:00
Moved Kafka constants in utils
This commit is contained in:
committed by
Dan Christian Bogos
parent
d686d3b40a
commit
ca6f3b3319
@@ -34,14 +34,10 @@ const (
|
||||
defaultQueueID = "cgrates_cdrs"
|
||||
defaultExchangeType = "direct"
|
||||
queueID = "queue_id"
|
||||
topic = "topic"
|
||||
exchange = "exchange"
|
||||
exchangeType = "exchange_type"
|
||||
routingKey = "routing_key"
|
||||
|
||||
//awsRegion = "aws_region"
|
||||
//awsID = "aws_key"
|
||||
//awsSecret = "aws_secret"
|
||||
awsToken = "aws_token"
|
||||
folderPath = "folder_path"
|
||||
)
|
||||
|
||||
@@ -57,7 +57,7 @@ func (pstr *KafkaPoster) parseURL(dialURL string) error {
|
||||
|
||||
pstr.dialURL = strings.Split(dialURL, "?")[0]
|
||||
pstr.topic = defaultQueueID
|
||||
if vals, has := qry[topic]; has && len(vals) != 0 {
|
||||
if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 {
|
||||
pstr.topic = vals[0]
|
||||
}
|
||||
return nil
|
||||
|
||||
13
ers/kafka.go
13
ers/kafka.go
@@ -38,12 +38,9 @@ import (
|
||||
const (
|
||||
defaultTopic = "cgrates_cdrc"
|
||||
defaultGroupID = "cgrates_consumer"
|
||||
|
||||
// ToDo: export it to utils
|
||||
topic = "topic"
|
||||
groupID = "group_id"
|
||||
)
|
||||
|
||||
// NewKafkaER return a new kafka event reader
|
||||
func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
@@ -57,7 +54,7 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrErr: rdrErr,
|
||||
}
|
||||
er = rdr
|
||||
err = rdr.setUrl(rdr.Config().SourcePath)
|
||||
err = rdr.setURL(rdr.Config().SourcePath)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -157,7 +154,7 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rdr *KafkaER) setUrl(dialURL string) (err error) {
|
||||
func (rdr *KafkaER) setURL(dialURL string) (err error) {
|
||||
var u *url.URL
|
||||
if u, err = url.Parse(dialURL); err != nil {
|
||||
return
|
||||
@@ -166,11 +163,11 @@ func (rdr *KafkaER) setUrl(dialURL string) (err error) {
|
||||
|
||||
rdr.dialURL = strings.Split(dialURL, "?")[0]
|
||||
rdr.topic = defaultTopic
|
||||
if vals, has := qry[topic]; has && len(vals) != 0 {
|
||||
if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 {
|
||||
rdr.topic = vals[0]
|
||||
}
|
||||
rdr.groupID = defaultGroupID
|
||||
if vals, has := qry[groupID]; has && len(vals) != 0 {
|
||||
if vals, has := qry[utils.KafkaGroupID]; has && len(vals) != 0 {
|
||||
rdr.groupID = vals[0]
|
||||
}
|
||||
return
|
||||
|
||||
@@ -1171,11 +1171,13 @@ const (
|
||||
|
||||
// Poster
|
||||
const (
|
||||
SQSPoster = "SQSPoster"
|
||||
S3Poster = "S3Poster"
|
||||
AWSRegion = "aws_region"
|
||||
AWSKey = "aws_key"
|
||||
AWSSecret = "aws_secret"
|
||||
SQSPoster = "SQSPoster"
|
||||
S3Poster = "S3Poster"
|
||||
AWSRegion = "aws_region"
|
||||
AWSKey = "aws_key"
|
||||
AWSSecret = "aws_secret"
|
||||
KafkaTopic = "topic"
|
||||
KafkaGroupID = "group_id"
|
||||
)
|
||||
|
||||
// Google_API
|
||||
|
||||
Reference in New Issue
Block a user