diff --git a/engine/poster_test.go b/engine/poster_test.go index 7cf764283..b0573f35f 100644 --- a/engine/poster_test.go +++ b/engine/poster_test.go @@ -41,3 +41,28 @@ func TestAMQPPosterParseURL(t *testing.T) { t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(expected), utils.ToJSON(amqp)) } } + +func TestKafkaParseURL(t *testing.T) { + u := "127.0.0.1:9092?topic=cdr_billing" + exp := &KafkaPoster{ + dialURL: "127.0.0.1:9092", + topic: "cdr_billing", + attempts: 10, + } + if kfk, err := NewKafkaPoster(u, 10); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(exp, kfk) { + t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) + } + u = "localhost:9092?topic=cdr_billing" + exp = &KafkaPoster{ + dialURL: "localhost:9092", + topic: "cdr_billing", + attempts: 10, + } + if kfk, err := NewKafkaPoster(u, 10); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(exp, kfk) { + t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) + } +} diff --git a/engine/pstr_kafka.go b/engine/pstr_kafka.go index c88996a0a..a02fe8620 100644 --- a/engine/pstr_kafka.go +++ b/engine/pstr_kafka.go @@ -29,13 +29,13 @@ import ( // NewKafkaPoster creates a kafka poster func NewKafkaPoster(dialURL string, attempts int) (*KafkaPoster, error) { - amqp := &KafkaPoster{ + kfkPstr := &KafkaPoster{ attempts: attempts, } - if err := amqp.parseURL(dialURL); err != nil { + if err := kfkPstr.parseURL(dialURL); err != nil { return nil, err } - return amqp, nil + return kfkPstr, nil } // KafkaPoster is a kafka poster @@ -48,14 +48,19 @@ type KafkaPoster struct { } func (pstr *KafkaPoster) parseURL(dialURL string) error { - u, err := url.Parse(dialURL) + pstr.topic = defaultQueueID + i := strings.IndexByte(dialURL, '?') + if i < 0 { + pstr.dialURL = dialURL + return nil + } + pstr.dialURL = dialURL[:i] + rawQuery := dialURL[i+1:] + qry, err := url.ParseQuery(rawQuery) if err != nil { return err } - qry := u.Query() - pstr.dialURL = strings.Split(dialURL, "?")[0] - pstr.topic = defaultQueueID if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 { pstr.topic = vals[0] } diff --git a/ers/kafka.go b/ers/kafka.go index 1944c6b3b..dda7cb60c 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -182,22 +182,28 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { } func (rdr *KafkaER) setURL(dialURL string) (err error) { - var u *url.URL - if u, err = url.Parse(dialURL); err != nil { + rdr.topic = defaultTopic + rdr.groupID = defaultGroupID + rdr.maxWait = defaultMaxWait + + i := strings.IndexByte(dialURL, '?') + if i < 0 { + rdr.dialURL = dialURL + return + } + rdr.dialURL = dialURL[:i] + rawQuery := dialURL[i+1:] + var qry url.Values + if qry, err = url.ParseQuery(rawQuery); err != nil { return } - qry := u.Query() - rdr.dialURL = strings.Split(dialURL, "?")[0] - rdr.topic = defaultTopic if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 { rdr.topic = vals[0] } - rdr.groupID = defaultGroupID if vals, has := qry[utils.KafkaGroupID]; has && len(vals) != 0 { rdr.groupID = vals[0] } - rdr.maxWait = defaultMaxWait if vals, has := qry[utils.KafkaMaxWait]; has && len(vals) != 0 { rdr.maxWait, err = time.ParseDuration(vals[0]) } diff --git a/ers/kafka_test.go b/ers/kafka_test.go index 0eb0301ec..40de4f6f8 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -69,7 +69,27 @@ func TestKafkaSetURL(t *testing.T) { groupID: "cgrates", maxWait: time.Millisecond, } - if err := k.setURL(":"); err == nil { + if err := k.setURL("127.0.0.1?%"); err == nil { t.Errorf("Expected error received: %v", err) } + + k = new(KafkaER) + expKafka = &KafkaER{ + dialURL: "127.0.0.1:2013", + topic: "cdrs", + groupID: "new", + maxWait: time.Second, + } + url = "127.0.0.1:2013?topic=cdrs&group_id=new&max_wait=1s" + if err := k.setURL(url); err != nil { + t.Fatal(err) + } else if expKafka.dialURL != k.dialURL { + t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) + } else if expKafka.topic != k.topic { + t.Errorf("Expected: %s ,received: %s", expKafka.topic, k.topic) + } else if expKafka.groupID != k.groupID { + t.Errorf("Expected: %s ,received: %s", expKafka.groupID, k.groupID) + } else if expKafka.maxWait != k.maxWait { + t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait) + } }