Mirror of https://github.com/cgrates/cgrates.git

Added kafka reader integration test

commit 3ebba5c3e0 (parent 883750b58d)
committed by Dan Christian Bogos

ers/kafka.go | 32

@@ -45,8 +45,8 @@ const (
 )
 
 func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
-	rdrEvents chan *erEvent, fltrS *engine.FilterS,
-	rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) {
+	rdrEvents chan *erEvent, rdrErr chan error,
+	fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
 	rdr := &KafkaER{
 		cgrCfg: cfg,
@@ -54,8 +54,9 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
 		fltrS:     fltrS,
 		rdrEvents: rdrEvents,
 		rdrExit:   rdrExit,
-		appExit:   appExit,
+		rdrErr:    rdrErr,
 	}
 	er = rdr
 	err = rdr.setUrl(rdr.Config().SourcePath)
 	return
 }
@@ -73,7 +74,7 @@ type KafkaER struct {
 
 	rdrEvents chan *erEvent // channel to dispatch the events created to
 	rdrExit   chan struct{}
-	appExit   chan bool
+	rdrErr    chan error
 }
 
 func (rdr *KafkaER) Config() *config.EventReaderCfg {
@@ -82,11 +83,12 @@ func (rdr *KafkaER) Config() *config.EventReaderCfg {
 
 func (rdr *KafkaER) Serve() (err error) {
 	r := kafka.NewReader(kafka.ReaderConfig{
-		Brokers:  []string{rdr.dialURL},
-		GroupID:  rdr.groupID,
-		Topic:    rdr.topic,
-		MinBytes: 10e3, // 10KB
-		MaxBytes: 10e6, // 10MB
+		Brokers:          []string{rdr.dialURL},
+		GroupID:          rdr.groupID,
+		Topic:            rdr.topic,
+		MinBytes:         10e3, // 10KB
+		MaxBytes:         10e6, // 10MB
+		RebalanceTimeout: time.Second,
 	})
 
 	if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
@@ -106,10 +108,14 @@ func (rdr *KafkaER) Serve() (err error) {
 	go func(r *kafka.Reader) { // read until the connection is closed
 		for {
 			msg, err := r.ReadMessage(context.Background())
-			if err != nil && err != io.EOF { // ignore io.EOF received from closing the connection
-				utils.Logger.Warning(
-					fmt.Sprintf("<%s> processing message error: %s",
-						utils.ERs, err.Error()))
+			if err != nil {
+				if err == io.EOF {
+					// ignore io.EOF received from closing the connection from our side
+					// this is happening when we stop the reader
+					return
+				}
+				// send it to the error channel
+				rdr.rdrErr <- err
+				return
 			}
 			if err := rdr.processMessage(msg.Value); err != nil {
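
With this change a permanent read failure is pushed on rdrErr and the goroutine returns instead of being logged and looped over; io.EOF is treated as the clean shutdown triggered from our own side. Below is a minimal, self-contained sketch of the consuming side, assuming (this is an editor's illustration, not code from the commit) that whoever owns rdrErr logs the failure and then signals rdrExit, which the reader is expected to watch in order to close its kafka.Reader:

// Supervision sketch: only the two channel shapes mirror the code above,
// everything else is illustrative.
package main

import (
	"errors"
	"log"
)

func main() {
	rdrErr := make(chan error, 1)
	rdrExit := make(chan struct{}, 1)

	// Stand-in for the KafkaER read goroutine reporting a permanent failure.
	go func() { rdrErr <- errors.New("broker unreachable") }()

	// Supervisor side: log the error, then ask the reader to shut down; the
	// read loop above then sees io.EOF from the closed connection and returns.
	err := <-rdrErr
	log.Printf("<ERs> kafka reader stopped: %v", err)
	rdrExit <- struct{}{}
}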

ers/kafka_test.go | 119 (new file)

@@ -0,0 +1,119 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>
+*/
+
+package ers
+
+import (
+	"context"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/cgrates/cgrates/config"
+	"github.com/cgrates/cgrates/engine"
+	"github.com/cgrates/cgrates/utils"
+	kafka "github.com/segmentio/kafka-go"
+)
+
+var (
+	rdrEvents chan *erEvent
+	rdrErr    chan error
+	rdrExit   chan struct{}
+	kfk       EventReader
+)
+
+func TestKafkaER(t *testing.T) {
+	cfg, err := config.NewCGRConfigFromJsonStringWithDefaults(`{
+		"ers": { // EventReaderService
+			"enabled": true, // starts the EventReader service: <true|false>
+			"readers": [
+				{
+					"id": "kafka", // identifier of the EventReader profile
+					"type": "*kafka_json_map", // reader type <*file_csv>
+					"run_delay": -1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
+					// "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
+					"source_path": "localhost:9092", // read data from this path
+					// "processed_path": "/var/spool/cgrates/cdrc/out", // move processed data here
+					// "source_id": "ers_csv", // free form field, tag identifying the source of the CDRs within CDRS database
+					"tenant": "cgrates.org", // tenant used by import
+					"filters": [], // limit parsing based on the filters
+					"flags": [], // flags to influence the event processing
+					// "header_fields": [], // template of the import header fields
+					"content_fields": [ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
+						{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "field_id": "CGRID"},
+					],
+					// "trailer_fields": [], // template of the import trailer fields
+					"continue": false, // continue to the next template if executed
+				},
+			],
+		},
+	}`)
+	if err != nil {
+		t.Error(err)
+	}
+	rdrEvents = make(chan *erEvent, 1)
+	rdrErr = make(chan error, 1)
+	rdrExit = make(chan struct{}, 1)
+
+	if kfk, err = NewKafkaER(cfg, 1, rdrEvents,
+		rdrErr, new(engine.FilterS), rdrExit); err != nil {
+		t.Fatal(err)
+	}
+	kfk.Serve()
+	w := kafka.NewWriter(kafka.WriterConfig{
+		Brokers: []string{"localhost:9092"},
+		Topic:   defaultTopic,
+	})
+
+	w.WriteMessages(context.Background(),
+		kafka.Message{
+			Key:   []byte("TestKey"), // for the moment we do not process the key
+			Value: []byte(`{"CGRID": "RandomCGRID"}`),
+		},
+	)
+
+	w.Close()
+	// tStart := time.Now()
+	select {
+	case err = <-rdrErr:
+		t.Error(err)
+	case ev := <-rdrEvents:
+		// fmt.Printf("It took %s to process the message.\n", time.Now().Sub(tStart))
+		if ev.rdrCfg.ID != "kafka" {
+			t.Errorf("Expected reader ID %q, received %q", "kafka", ev.rdrCfg.ID)
+		}
+		expected := &utils.CGREvent{
+			Tenant: "cgrates.org",
+			ID:     ev.cgrEvent.ID,
+			Time:   ev.cgrEvent.Time,
+			Event: map[string]interface{}{
+				"CGRID": "RandomCGRID",
+			},
+		}
+		if !reflect.DeepEqual(ev.cgrEvent, expected) {
+			t.Errorf("Expected %s, received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
+		}
+	case <-time.After(30 * time.Second):
+		t.Errorf("Timeout")
+	}
+	rdrExit <- struct{}{}
+}
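
The file carries the integration build tag, so the test above only builds when that tag is supplied, and it assumes a Kafka broker reachable at localhost:9092. Note that the errors returned by WriteMessages and Close are discarded; the standalone producer sketch below checks them (an editor's illustration, not part of the commit; the topic name is a placeholder for the package's defaultTopic constant, whose value is not shown in this diff):

// Minimal kafka-go producer with error handling, mirroring the write done in TestKafkaER.
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "cgrates", // placeholder: the test writes to the ers defaultTopic constant
	})
	defer func() {
		if err := w.Close(); err != nil {
			log.Printf("closing kafka writer: %v", err)
		}
	}()
	if err := w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte(`{"CGRID": "RandomCGRID"}`)},
	); err != nil {
		log.Fatalf("writing kafka message: %v", err)
	}
}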

@@ -41,7 +41,7 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
 	case utils.MetaFileCSV:
 		return NewCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
 	case utils.MetaKafkajsonMap:
-		return NewKafkaER(cfg, cfgIdx, rdrEvents, fltrS, rdrExit, appExit)
+		return NewKafkaER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
 	}
 	return
 }
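
The hunk above only swaps the argument list at the call site; for readers unfamiliar with the factory, the dispatch pattern looks roughly like the self-contained sketch below (simplified stand-ins, not the real ers types; only the literal reader-type values *file_csv and *kafka_json_map are taken from the source):

// Factory-dispatch sketch: pick a concrete EventReader from the configured type.
package main

import (
	"errors"
	"fmt"
)

type EventReader interface{ Serve() error }

type csvReader struct{}

func (csvReader) Serve() error { return nil }

type kafkaReader struct{}

func (kafkaReader) Serve() error { return nil }

func newEventReader(rdrType string) (EventReader, error) {
	switch rdrType {
	case "*file_csv":
		return csvReader{}, nil
	case "*kafka_json_map":
		return kafkaReader{}, nil
	default:
		return nil, errors.New("unsupported reader type: " + rdrType)
	}
}

func main() {
	rdr, err := newEventReader("*kafka_json_map")
	fmt.Printf("%T %v\n", rdr, err)
}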

@@ -479,9 +479,12 @@ func TestCDRsOnExpKafkaPosterFileFailover(t *testing.T) {
 	failoverContent := [][]byte{[]byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`), []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)}
 
 	reader := kafka.NewReader(kafka.ReaderConfig{
-		Brokers: []string{"localhost:9092"},
-		Topic:   "cgrates_cdrs",
-		GroupID: "tmp",
+		Brokers:          []string{"localhost:9092"},
+		Topic:            "cgrates_cdrs",
+		GroupID:          "tmp",
+		MinBytes:         10e3, // 10KB
+		MaxBytes:         10e6, // 10MB
+		RebalanceTimeout: 1,
 	})
 
 	defer reader.Close()
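
One detail worth noting in the hunk above: kafka-go's ReaderConfig.RebalanceTimeout is a time.Duration, so the bare literal 1 amounts to one nanosecond, whereas ers/kafka.go uses time.Second. A small sketch making the units explicit (illustrative only, not a change made by this commit):

// Shows the unit difference between a bare integer literal and time.Second.
package main

import (
	"fmt"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	cfg := kafka.ReaderConfig{
		Brokers:          []string{"localhost:9092"},
		Topic:            "cgrates_cdrs",
		GroupID:          "tmp",
		MinBytes:         10e3, // 10KB
		MaxBytes:         10e6, // 10MB
		RebalanceTimeout: time.Second, // explicit duration; a bare 1 would be 1ns
	}
	fmt.Println(cfg.RebalanceTimeout, time.Duration(1)) // prints "1s 1ns"
}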

go.mod | 2

@@ -37,7 +37,7 @@ require (
 	github.com/nyaruka/phonenumbers v1.0.43
 	github.com/peterh/liner v1.1.1-0.20190305032635-6f820f8f90ce
 	github.com/pkg/errors v0.8.2-0.20190227000051-27936f6d90f9
-	github.com/segmentio/kafka-go v0.2.6-0.20190708214315-03ea927bad14
+	github.com/segmentio/kafka-go v0.3.3
 	github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
 	github.com/ugorji/go v0.0.0-20171112025056-5a66da2e74af
 	github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c

go.sum | 2

@@ -140,6 +140,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/segmentio/kafka-go v0.2.6-0.20190708214315-03ea927bad14 h1:PR5Jl7oQea/vTqr+cS+w1SX8s0txlCxbhL823uSu4Lo=
 github.com/segmentio/kafka-go v0.2.6-0.20190708214315-03ea927bad14/go.mod h1:/D8aoUTJYhf4JKa28ZKxIZszXialN+H5b1Deh224FS4=
+github.com/segmentio/kafka-go v0.3.3 h1:V4Ou5vOe0HXux6G/ZdheugcvgmSRFG3IA69btTGrYdo=
+github.com/segmentio/kafka-go v0.3.3/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k=
 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=