From 8b70d13eaeedfafa9826a5577d79cab9c897db9e Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 9 Sep 2019 18:54:00 +0300 Subject: [PATCH] Updated Kafka event reader --- config/config_json_test.go | 2 - config/config_test.go | 2 - config/erscfg.go | 10 ---- config/erscfg_test.go | 6 --- config/libconfig_json.go | 2 - ers/kafka.go | 72 ++++++++++++++++++++-------- ers/kafka_it_test.go | 15 +++--- general_tests/cdrs_onlexp_it_test.go | 10 ++-- utils/consts.go | 1 + 9 files changed, 63 insertions(+), 57 deletions(-) diff --git a/config/config_json_test.go b/config/config_json_test.go index 5e2e4e2c2..484bcd0a3 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1625,7 +1625,6 @@ func TestDfEventReaderCfg(t *testing.T) { Source_path: utils.StringPointer("/var/spool/cgrates/cdrc/in"), Processed_path: utils.StringPointer("/var/spool/cgrates/cdrc/out"), Xml_root_path: utils.StringPointer(utils.EmptyString), - Source_id: utils.StringPointer("ers_csv"), Tenant: utils.StringPointer(utils.EmptyString), Timezone: utils.StringPointer(utils.EmptyString), Filters: &[]string{}, @@ -1633,7 +1632,6 @@ func TestDfEventReaderCfg(t *testing.T) { Header_fields: &[]*FcTemplateJsonCfg{}, Content_fields: &cdrFields, Trailer_fields: &[]*FcTemplateJsonCfg{}, - Continue: utils.BoolPointer(false), }, }, } diff --git a/config/config_test.go b/config/config_test.go index 21632187c..483afe5a8 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1821,7 +1821,6 @@ func TestCgrCdfEventReader(t *testing.T) { SourcePath: "/var/spool/cgrates/cdrc/in", ProcessedPath: "/var/spool/cgrates/cdrc/out", XmlRootPath: utils.EmptyString, - SourceID: "ers_csv", Tenant: nil, Timezone: utils.EmptyString, Filters: []string{}, @@ -1870,7 +1869,6 @@ func TestCgrCfgEventReaderDefault(t *testing.T) { SourcePath: "/var/spool/cgrates/cdrc/in", ProcessedPath: "/var/spool/cgrates/cdrc/out", XmlRootPath: utils.EmptyString, - SourceID: "ers_csv", Tenant: nil, Timezone: utils.EmptyString, Filters: nil, diff --git a/config/erscfg.go b/config/erscfg.go index 14fec8623..67ac042f1 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -104,7 +104,6 @@ type EventReaderCfg struct { SourcePath string ProcessedPath string XmlRootPath string - SourceID string Tenant RSRParsers Timezone string Filters []string @@ -112,7 +111,6 @@ type EventReaderCfg struct { HeaderFields []*FCTemplate ContentFields []*FCTemplate TrailerFields []*FCTemplate - Continue bool } func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string) (err error) { @@ -143,9 +141,6 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string if jsnCfg.Xml_root_path != nil { er.XmlRootPath = *jsnCfg.Xml_root_path } - if jsnCfg.Source_id != nil { - er.SourceID = *jsnCfg.Source_id - } if jsnCfg.Tenant != nil { if er.Tenant, err = NewRSRParsers(*jsnCfg.Tenant, true, sep); err != nil { return err @@ -180,9 +175,6 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string return err } } - if jsnCfg.Continue != nil { - er.Continue = *jsnCfg.Continue - } return } @@ -197,7 +189,6 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) { cln.SourcePath = er.SourcePath cln.ProcessedPath = er.ProcessedPath cln.XmlRootPath = er.XmlRootPath - cln.SourceID = er.SourceID if len(er.Tenant) != 0 { cln.Tenant = make(RSRParsers, len(er.Tenant)) for idx, val := range er.Tenant { @@ -225,6 +216,5 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) { for idx, fld := range er.TrailerFields { cln.TrailerFields[idx] = fld.Clone() } - cln.Continue = er.Continue return } diff --git a/config/erscfg_test.go b/config/erscfg_test.go index 409c0fa85..73f75654f 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -30,7 +30,6 @@ func TestEventRedearClone(t *testing.T) { ID: utils.MetaDefault, Type: "RandomType", FieldSep: ",", - SourceID: "RandomSource", Filters: []string{"Filter1", "Filter2"}, Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP), HeaderFields: []*FCTemplate{}, @@ -51,7 +50,6 @@ func TestEventRedearClone(t *testing.T) { }, }, TrailerFields: []*FCTemplate{}, - Continue: true, } cloned := orig.Clone() if !reflect.DeepEqual(cloned, orig) { @@ -61,7 +59,6 @@ func TestEventRedearClone(t *testing.T) { ID: utils.MetaDefault, Type: "RandomType", FieldSep: ",", - SourceID: "RandomSource", Filters: []string{"Filter1", "Filter2"}, Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP), HeaderFields: []*FCTemplate{}, @@ -82,7 +79,6 @@ func TestEventRedearClone(t *testing.T) { }, }, TrailerFields: []*FCTemplate{}, - Continue: true, } orig.Filters = []string{"SingleFilter"} orig.ContentFields = []*FCTemplate{ @@ -117,7 +113,6 @@ func TestEventReaderLoadFromJSON(t *testing.T) { SourcePath: "/var/spool/cgrates/cdrc/in", ProcessedPath: "/var/spool/cgrates/cdrc/out", XmlRootPath: utils.EmptyString, - SourceID: "ers_csv", Tenant: nil, Timezone: utils.EmptyString, Filters: []string{}, @@ -158,7 +153,6 @@ func TestEventReaderLoadFromJSON(t *testing.T) { SourcePath: "/tmp/ers/in", ProcessedPath: "/tmp/ers/out", XmlRootPath: utils.EmptyString, - SourceID: "ers_csv", Tenant: nil, Timezone: utils.EmptyString, Filters: nil, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index c4d9a8447..b9473152a 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -187,7 +187,6 @@ type EventReaderJsonCfg struct { Source_path *string Processed_path *string Xml_root_path *string - Source_id *string Tenant *string Timezone *string Filters *[]string @@ -195,7 +194,6 @@ type EventReaderJsonCfg struct { Header_fields *[]*FcTemplateJsonCfg Content_fields *[]*FcTemplateJsonCfg Trailer_fields *[]*FcTemplateJsonCfg - Continue *bool } // SM-Generic config section diff --git a/ers/kafka.go b/ers/kafka.go index b537bd4c3..28a38b52b 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -38,6 +38,7 @@ import ( const ( defaultTopic = "cgrates_cdrc" defaultGroupID = "cgrates_consumer" + defaultMaxWait = time.Millisecond ) // NewKafkaER return a new kafka event reader @@ -53,6 +54,12 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, rdrExit: rdrExit, rdrErr: rdrErr, } + if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { + rdr.cap = make(chan struct{}, concReq) + for i := 0; i < concReq; i++ { + rdr.cap <- struct{}{} + } + } er = rdr err = rdr.setURL(rdr.Config().SourcePath) return @@ -68,24 +75,26 @@ type KafkaER struct { dialURL string topic string groupID string + maxWait time.Duration rdrEvents chan *erEvent // channel to dispatch the events created to rdrExit chan struct{} rdrErr chan error + cap chan struct{} } +// Config returns the curent configuration func (rdr *KafkaER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } +// Serve will start the gorutines needed to watch the kafka topic func (rdr *KafkaER) Serve() (err error) { r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{rdr.dialURL}, - GroupID: rdr.groupID, - Topic: rdr.topic, - MinBytes: 10e3, // 10KB - MaxBytes: 10e6, // 10MB - RebalanceTimeout: time.Second, + Brokers: []string{rdr.dialURL}, + GroupID: rdr.groupID, + Topic: rdr.topic, + MaxWait: rdr.maxWait, }) if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API @@ -102,27 +111,46 @@ func (rdr *KafkaER) Serve() (err error) { return } }(r) - go func(r *kafka.Reader) { // read until the conection is closed - for { - msg, err := r.ReadMessage(context.Background()) - if err != nil { - if err == io.EOF { - // ignore io.EOF received from closing the connection from our side - // this is happening when we stop the reader - return - } - // send it to the error channel - rdr.rdrErr <- err + go rdr.readLoop(r) // read until the conection is closed + return +} + +func (rdr *KafkaER) readLoop(r *kafka.Reader) { + for { + if rdr.Config().ConcurrentReqs != -1 { + <-rdr.cap // do not try to read if the limit is reached + } + msg, err := r.ReadMessage(context.Background()) + if err != nil { + if err == io.EOF { + // ignore io.EOF received from closing the connection from our side + // this is happening when we stop the reader return } + // send it to the error channel + rdr.rdrErr <- err + return + } + go func(msg kafka.Message) { if err := rdr.processMessage(msg.Value); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing message %s error: %s", utils.ERs, string(msg.Key), err.Error())) } - } - }(r) - return + if rdr.Config().ProcessedPath != utils.EmptyString { // post it + if err := engine.PostersCache.PostKafka(rdr.Config().ProcessedPath, + rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, "", + utils.META_NONE, string(msg.Key)); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> writing message %s error: %s", + utils.ERs, string(msg.Key), err.Error())) + } + } + if rdr.Config().ConcurrentReqs != -1 { + rdr.cap <- struct{}{} + } + }(msg) + } } func (rdr *KafkaER) processMessage(msg []byte) (err error) { @@ -170,5 +198,9 @@ func (rdr *KafkaER) setURL(dialURL string) (err error) { if vals, has := qry[utils.KafkaGroupID]; has && len(vals) != 0 { rdr.groupID = vals[0] } + rdr.maxWait = defaultMaxWait + if vals, has := qry[utils.KafkaMaxWait]; has && len(vals) != 0 { + rdr.maxWait, err = time.ParseDuration(vals[0]) + } return } diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index 1b4564b24..d346cecfd 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -45,13 +45,12 @@ func TestKafkaER(t *testing.T) { "enabled": true, // starts the EventReader service: "readers": [ { - "id": "kafka", // identifier of the EventReader profile - "type": "*kafka_json_map", // reader type <*file_csv> - "run_delay": -1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together - // "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited - "source_path": "localhost:9092", // read data from this path + "id": "kafka", // identifier of the EventReader profile + "type": "*kafka_json_map", // reader type <*file_csv> + "run_delay": -1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together + "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited + "source_path": "localhost:9092", // read data from this path // "processed_path": "/var/spool/cgrates/cdrc/out", // move processed data here - // "source_id": "ers_csv", // free form field, tag identifying the source of the CDRs within CDRS database "tenant": "cgrates.org", // tenant used by import "filters": [], // limit parsing based on the filters "flags": [], // flags to influence the event processing @@ -60,7 +59,6 @@ func TestKafkaER(t *testing.T) { {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "field_id": "CGRID"}, ], // "trailer_fields": [], // template of the import trailer fields - "continue": false, // continue to the next template if executed }, ], }, @@ -110,9 +108,8 @@ func TestKafkaER(t *testing.T) { if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) } - case <-time.After(30 * time.Second): + case <-time.After(10 * time.Second): t.Errorf("Timeout") } rdrExit <- struct{}{} - } diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index bdc492f7a..4ade74059 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -479,12 +479,10 @@ func TestCDRsOnExpKafkaPosterFileFailover(t *testing.T) { failoverContent := [][]byte{[]byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`), []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)} reader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: "cgrates_cdrs", - GroupID: "tmp", - MinBytes: 10e3, // 10KB - MaxBytes: 10e6, // 10MB - RebalanceTimeout: 1, + Brokers: []string{"localhost:9092"}, + Topic: "cgrates_cdrs", + GroupID: "tmp", + MaxWait: time.Millisecond, }) defer reader.Close() diff --git a/utils/consts.go b/utils/consts.go index ee13b1ae1..1f2cd0f07 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1178,6 +1178,7 @@ const ( AWSSecret = "aws_secret" KafkaTopic = "topic" KafkaGroupID = "group_id" + KafkaMaxWait = "max_wait" ) // Google_API