Updated Kafka URL parsing

This commit is contained in:
Trial97
2020-08-17 12:46:35 +03:00
committed by Dan Christian Bogos
parent 23746ab9dc
commit d453a177bb
4 changed files with 71 additions and 15 deletions

View File

@@ -41,3 +41,28 @@ func TestAMQPPosterParseURL(t *testing.T) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(expected), utils.ToJSON(amqp))
}
}
func TestKafkaParseURL(t *testing.T) {
u := "127.0.0.1:9092?topic=cdr_billing"
exp := &KafkaPoster{
dialURL: "127.0.0.1:9092",
topic: "cdr_billing",
attempts: 10,
}
if kfk, err := NewKafkaPoster(u, 10); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
}
u = "localhost:9092?topic=cdr_billing"
exp = &KafkaPoster{
dialURL: "localhost:9092",
topic: "cdr_billing",
attempts: 10,
}
if kfk, err := NewKafkaPoster(u, 10); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
}
}

View File

@@ -29,13 +29,13 @@ import (
// NewKafkaPoster creates a kafka poster
func NewKafkaPoster(dialURL string, attempts int) (*KafkaPoster, error) {
amqp := &KafkaPoster{
kfkPstr := &KafkaPoster{
attempts: attempts,
}
if err := amqp.parseURL(dialURL); err != nil {
if err := kfkPstr.parseURL(dialURL); err != nil {
return nil, err
}
return amqp, nil
return kfkPstr, nil
}
// KafkaPoster is a kafka poster
@@ -48,14 +48,19 @@ type KafkaPoster struct {
}
func (pstr *KafkaPoster) parseURL(dialURL string) error {
u, err := url.Parse(dialURL)
pstr.topic = defaultQueueID
i := strings.IndexByte(dialURL, '?')
if i < 0 {
pstr.dialURL = dialURL
return nil
}
pstr.dialURL = dialURL[:i]
rawQuery := dialURL[i+1:]
qry, err := url.ParseQuery(rawQuery)
if err != nil {
return err
}
qry := u.Query()
pstr.dialURL = strings.Split(dialURL, "?")[0]
pstr.topic = defaultQueueID
if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 {
pstr.topic = vals[0]
}

View File

@@ -182,22 +182,28 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) {
}
func (rdr *KafkaER) setURL(dialURL string) (err error) {
var u *url.URL
if u, err = url.Parse(dialURL); err != nil {
rdr.topic = defaultTopic
rdr.groupID = defaultGroupID
rdr.maxWait = defaultMaxWait
i := strings.IndexByte(dialURL, '?')
if i < 0 {
rdr.dialURL = dialURL
return
}
rdr.dialURL = dialURL[:i]
rawQuery := dialURL[i+1:]
var qry url.Values
if qry, err = url.ParseQuery(rawQuery); err != nil {
return
}
qry := u.Query()
rdr.dialURL = strings.Split(dialURL, "?")[0]
rdr.topic = defaultTopic
if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 {
rdr.topic = vals[0]
}
rdr.groupID = defaultGroupID
if vals, has := qry[utils.KafkaGroupID]; has && len(vals) != 0 {
rdr.groupID = vals[0]
}
rdr.maxWait = defaultMaxWait
if vals, has := qry[utils.KafkaMaxWait]; has && len(vals) != 0 {
rdr.maxWait, err = time.ParseDuration(vals[0])
}

View File

@@ -69,7 +69,27 @@ func TestKafkaSetURL(t *testing.T) {
groupID: "cgrates",
maxWait: time.Millisecond,
}
if err := k.setURL(":"); err == nil {
if err := k.setURL("127.0.0.1?%"); err == nil {
t.Errorf("Expected error received: %v", err)
}
k = new(KafkaER)
expKafka = &KafkaER{
dialURL: "127.0.0.1:2013",
topic: "cdrs",
groupID: "new",
maxWait: time.Second,
}
url = "127.0.0.1:2013?topic=cdrs&group_id=new&max_wait=1s"
if err := k.setURL(url); err != nil {
t.Fatal(err)
} else if expKafka.dialURL != k.dialURL {
t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL)
} else if expKafka.topic != k.topic {
t.Errorf("Expected: %s ,received: %s", expKafka.topic, k.topic)
} else if expKafka.groupID != k.groupID {
t.Errorf("Expected: %s ,received: %s", expKafka.groupID, k.groupID)
} else if expKafka.maxWait != k.maxWait {
t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait)
}
}