diff --git a/ers/kafka.go b/ers/kafka.go index 8f4571aa0..9a0ec1f89 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -45,8 +45,8 @@ const ( ) func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, fltrS *engine.FilterS, - rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) { + rdrEvents chan *erEvent, rdrErr chan error, + fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { rdr := &KafkaER{ cgrCfg: cfg, @@ -54,8 +54,9 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, fltrS: fltrS, rdrEvents: rdrEvents, rdrExit: rdrExit, - appExit: appExit, + rdrErr: rdrErr, } + er = rdr err = rdr.setUrl(rdr.Config().SourcePath) return } @@ -73,7 +74,7 @@ type KafkaER struct { rdrEvents chan *erEvent // channel to dispatch the events created to rdrExit chan struct{} - appExit chan bool + rdrErr chan error } func (rdr *KafkaER) Config() *config.EventReaderCfg { @@ -82,11 +83,12 @@ func (rdr *KafkaER) Config() *config.EventReaderCfg { func (rdr *KafkaER) Serve() (err error) { r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{rdr.dialURL}, - GroupID: rdr.groupID, - Topic: rdr.topic, - MinBytes: 10e3, // 10KB - MaxBytes: 10e6, // 10MB + Brokers: []string{rdr.dialURL}, + GroupID: rdr.groupID, + Topic: rdr.topic, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + RebalanceTimeout: time.Second, }) if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API @@ -106,10 +108,14 @@ func (rdr *KafkaER) Serve() (err error) { go func(r *kafka.Reader) { // read until the conection is closed for { msg, err := r.ReadMessage(context.Background()) - if err != nil && err != io.EOF { // ignore io.EOF received from closing the connection - utils.Logger.Warning( - fmt.Sprintf("<%s> processing message error: %s", - utils.ERs, err.Error())) + if err != nil { + if err == io.EOF { + // ignore io.EOF received from closing the connection from our side + // this is happening when we stop the reader + return + } + // send it to the error channel + rdr.rdrErr <- err return } if err := rdr.processMessage(msg.Value); err != nil { diff --git a/ers/kafka_test.go b/ers/kafka_test.go new file mode 100644 index 000000000..d44d14f4f --- /dev/null +++ b/ers/kafka_test.go @@ -0,0 +1,119 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + kafka "github.com/segmentio/kafka-go" +) + +var ( + rdrEvents chan *erEvent + rdrErr chan error + rdrExit chan struct{} + kfk EventReader +) + +func TestKafkaER(t *testing.T) { + cfg, err := config.NewCGRConfigFromJsonStringWithDefaults(`{ +"ers": { // EventReaderService + "enabled": true, // starts the EventReader service: + "readers": [ + { + "id": "kafka", // identifier of the EventReader profile + "type": "*kafka_json_map", // reader type <*file_csv> + "run_delay": -1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together + // "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited + "source_path": "localhost:9092", // read data from this path + // "processed_path": "/var/spool/cgrates/cdrc/out", // move processed data here + // "source_id": "ers_csv", // free form field, tag identifying the source of the CDRs within CDRS database + "tenant": "cgrates.org", // tenant used by import + "filters": [], // limit parsing based on the filters + "flags": [], // flags to influence the event processing + // "header_fields": [], // template of the import header fields + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "field_id": "CGRID"}, + ], + // "trailer_fields": [], // template of the import trailer fields + "continue": false, // continue to the next template if executed + }, + ], +}, +}`) + if err != nil { + t.Error(err) + } + rdrEvents = make(chan *erEvent, 1) + rdrErr = make(chan error, 1) + rdrExit = make(chan struct{}, 1) + + if kfk, err = NewKafkaER(cfg, 1, rdrEvents, + rdrErr, new(engine.FilterS), rdrExit); err != nil { + t.Fatal(err) + } + kfk.Serve() + w := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{"localhost:9092"}, + Topic: defaultTopic, + }) + + w.WriteMessages(context.Background(), + kafka.Message{ + Key: []byte("TestKey"), // for the momment we do not proccess the key + Value: []byte(`{"CGRID": "RandomCGRID"}`), + }, + ) + + w.Close() + // tStart := time.Now() + select { + case err = <-rdrErr: + t.Error(err) + case ev := <-rdrEvents: + // fmt.Printf("It took %s to proccess the message.\n", time.Now().Sub(tStart)) + if ev.rdrCfg.ID != "kafka" { + t.Errorf("Expected ....") + } + expected := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: ev.cgrEvent.ID, + Time: ev.cgrEvent.Time, + Event: map[string]interface{}{ + "CGRID": "RandomCGRID", + }, + } + if !reflect.DeepEqual(ev.cgrEvent, expected) { + t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) + } + + case <-time.After(30 * time.Second): + t.Errorf("Timeout") + } + rdrExit <- struct{}{} + +} diff --git a/ers/reader.go b/ers/reader.go index 075aca6d4..bc520e2d3 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -41,7 +41,7 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int, case utils.MetaFileCSV: return NewCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) case utils.MetaKafkajsonMap: - return NewKafkaER(cfg, cfgIdx, rdrEvents, fltrS, rdrExit, appExit) + return NewKafkaER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) } return } diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 03cc19d59..bdc492f7a 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -479,9 +479,12 @@ func TestCDRsOnExpKafkaPosterFileFailover(t *testing.T) { failoverContent := [][]byte{[]byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`), []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)} reader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: "cgrates_cdrs", - GroupID: "tmp", + Brokers: []string{"localhost:9092"}, + Topic: "cgrates_cdrs", + GroupID: "tmp", + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + RebalanceTimeout: 1, }) defer reader.Close() diff --git a/go.mod b/go.mod index 02d78aa0a..f5ba8e650 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/nyaruka/phonenumbers v1.0.43 github.com/peterh/liner v1.1.1-0.20190305032635-6f820f8f90ce github.com/pkg/errors v0.8.2-0.20190227000051-27936f6d90f9 - github.com/segmentio/kafka-go v0.2.6-0.20190708214315-03ea927bad14 + github.com/segmentio/kafka-go v0.3.3 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 github.com/ugorji/go v0.0.0-20171112025056-5a66da2e74af github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c diff --git a/go.sum b/go.sum index 8d5cecbe6..d63c21106 100644 --- a/go.sum +++ b/go.sum @@ -140,6 +140,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/segmentio/kafka-go v0.2.6-0.20190708214315-03ea927bad14 h1:PR5Jl7oQea/vTqr+cS+w1SX8s0txlCxbhL823uSu4Lo= github.com/segmentio/kafka-go v0.2.6-0.20190708214315-03ea927bad14/go.mod h1:/D8aoUTJYhf4JKa28ZKxIZszXialN+H5b1Deh224FS4= +github.com/segmentio/kafka-go v0.3.3 h1:V4Ou5vOe0HXux6G/ZdheugcvgmSRFG3IA69btTGrYdo= +github.com/segmentio/kafka-go v0.3.3/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=