Updated Kafka event reader

This commit is contained in:
Trial97
2019-09-09 18:54:00 +03:00
committed by Dan Christian Bogos
parent b1607b46c7
commit 8b70d13eae
9 changed files with 63 additions and 57 deletions

View File

@@ -1625,7 +1625,6 @@ func TestDfEventReaderCfg(t *testing.T) {
Source_path: utils.StringPointer("/var/spool/cgrates/cdrc/in"),
Processed_path: utils.StringPointer("/var/spool/cgrates/cdrc/out"),
Xml_root_path: utils.StringPointer(utils.EmptyString),
-Source_id: utils.StringPointer("ers_csv"),
Tenant: utils.StringPointer(utils.EmptyString),
Timezone: utils.StringPointer(utils.EmptyString),
Filters: &[]string{},
@@ -1633,7 +1632,6 @@ func TestDfEventReaderCfg(t *testing.T) {
Header_fields: &[]*FcTemplateJsonCfg{},
Content_fields: &cdrFields,
Trailer_fields: &[]*FcTemplateJsonCfg{},
-Continue: utils.BoolPointer(false),
},
},
}

View File

@@ -1821,7 +1821,6 @@ func TestCgrCdfEventReader(t *testing.T) {
SourcePath: "/var/spool/cgrates/cdrc/in",
ProcessedPath: "/var/spool/cgrates/cdrc/out",
XmlRootPath: utils.EmptyString,
-SourceID: "ers_csv",
Tenant: nil,
Timezone: utils.EmptyString,
Filters: []string{},
@@ -1870,7 +1869,6 @@ func TestCgrCfgEventReaderDefault(t *testing.T) {
SourcePath: "/var/spool/cgrates/cdrc/in",
ProcessedPath: "/var/spool/cgrates/cdrc/out",
XmlRootPath: utils.EmptyString,
-SourceID: "ers_csv",
Tenant: nil,
Timezone: utils.EmptyString,
Filters: nil,

View File

@@ -104,7 +104,6 @@ type EventReaderCfg struct {
SourcePath string
ProcessedPath string
XmlRootPath string
-SourceID string
Tenant RSRParsers
Timezone string
Filters []string
@@ -112,7 +111,6 @@ type EventReaderCfg struct {
HeaderFields []*FCTemplate
ContentFields []*FCTemplate
TrailerFields []*FCTemplate
-Continue bool
}
func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string) (err error) {
@@ -143,9 +141,6 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string
if jsnCfg.Xml_root_path != nil {
er.XmlRootPath = *jsnCfg.Xml_root_path
}
-if jsnCfg.Source_id != nil {
-er.SourceID = *jsnCfg.Source_id
-}
if jsnCfg.Tenant != nil {
if er.Tenant, err = NewRSRParsers(*jsnCfg.Tenant, true, sep); err != nil {
return err
@@ -180,9 +175,6 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string
return err
}
}
-if jsnCfg.Continue != nil {
-er.Continue = *jsnCfg.Continue
-}
return
}
@@ -197,7 +189,6 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) {
cln.SourcePath = er.SourcePath
cln.ProcessedPath = er.ProcessedPath
cln.XmlRootPath = er.XmlRootPath
-cln.SourceID = er.SourceID
if len(er.Tenant) != 0 {
cln.Tenant = make(RSRParsers, len(er.Tenant))
for idx, val := range er.Tenant {
@@ -225,6 +216,5 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) {
for idx, fld := range er.TrailerFields {
cln.TrailerFields[idx] = fld.Clone()
}
-cln.Continue = er.Continue
return
}

View File

@@ -30,7 +30,6 @@ func TestEventRedearClone(t *testing.T) {
ID: utils.MetaDefault,
Type: "RandomType",
FieldSep: ",",
-SourceID: "RandomSource",
Filters: []string{"Filter1", "Filter2"},
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
HeaderFields: []*FCTemplate{},
@@ -51,7 +50,6 @@ func TestEventRedearClone(t *testing.T) {
},
},
TrailerFields: []*FCTemplate{},
-Continue: true,
}
cloned := orig.Clone()
if !reflect.DeepEqual(cloned, orig) {
@@ -61,7 +59,6 @@ func TestEventRedearClone(t *testing.T) {
ID: utils.MetaDefault,
Type: "RandomType",
FieldSep: ",",
-SourceID: "RandomSource",
Filters: []string{"Filter1", "Filter2"},
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
HeaderFields: []*FCTemplate{},
@@ -82,7 +79,6 @@ func TestEventRedearClone(t *testing.T) {
},
},
TrailerFields: []*FCTemplate{},
-Continue: true,
}
orig.Filters = []string{"SingleFilter"}
orig.ContentFields = []*FCTemplate{
@@ -117,7 +113,6 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
SourcePath: "/var/spool/cgrates/cdrc/in",
ProcessedPath: "/var/spool/cgrates/cdrc/out",
XmlRootPath: utils.EmptyString,
-SourceID: "ers_csv",
Tenant: nil,
Timezone: utils.EmptyString,
Filters: []string{},
@@ -158,7 +153,6 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
SourcePath: "/tmp/ers/in",
ProcessedPath: "/tmp/ers/out",
XmlRootPath: utils.EmptyString,
-SourceID: "ers_csv",
Tenant: nil,
Timezone: utils.EmptyString,
Filters: nil,

View File

@@ -187,7 +187,6 @@ type EventReaderJsonCfg struct {
Source_path *string
Processed_path *string
Xml_root_path *string
-Source_id *string
Tenant *string
Timezone *string
Filters *[]string
@@ -195,7 +194,6 @@ type EventReaderJsonCfg struct {
Header_fields *[]*FcTemplateJsonCfg
Content_fields *[]*FcTemplateJsonCfg
Trailer_fields *[]*FcTemplateJsonCfg
-Continue *bool
}
// SM-Generic config section

View File

@@ -38,6 +38,7 @@ import (
const (
defaultTopic = "cgrates_cdrc"
defaultGroupID = "cgrates_consumer"
+defaultMaxWait = time.Millisecond
)
// NewKafkaER returns a new kafka event reader
@@ -53,6 +54,12 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
rdrExit: rdrExit,
rdrErr: rdrErr,
}
+if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
+rdr.cap = make(chan struct{}, concReq)
+for i := 0; i < concReq; i++ {
+rdr.cap <- struct{}{}
+}
+}
er = rdr
err = rdr.setURL(rdr.Config().SourcePath)
return
@@ -68,24 +75,26 @@ type KafkaER struct {
dialURL string
topic string
groupID string
+maxWait time.Duration
rdrEvents chan *erEvent // channel to dispatch the events created to
rdrExit chan struct{}
rdrErr chan error
+cap chan struct{}
}
// Config returns the current configuration
func (rdr *KafkaER) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
// Serve will start the goroutines needed to watch the kafka topic
func (rdr *KafkaER) Serve() (err error) {
r := kafka.NewReader(kafka.ReaderConfig{
-Brokers: []string{rdr.dialURL},
-GroupID: rdr.groupID,
-Topic: rdr.topic,
-MinBytes: 10e3, // 10KB
-MaxBytes: 10e6, // 10MB
-RebalanceTimeout: time.Second,
+Brokers: []string{rdr.dialURL},
+GroupID: rdr.groupID,
+Topic: rdr.topic,
+MaxWait: rdr.maxWait,
})
if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
@@ -102,27 +111,46 @@ func (rdr *KafkaER) Serve() (err error) {
return
}
}(r)
-go func(r *kafka.Reader) { // read until the connection is closed
-for {
-msg, err := r.ReadMessage(context.Background())
-if err != nil {
-if err == io.EOF {
-// ignore io.EOF received from closing the connection from our side
-// this is happening when we stop the reader
-return
-}
-// send it to the error channel
-rdr.rdrErr <- err
+go rdr.readLoop(r) // read until the connection is closed
+return
+}
+func (rdr *KafkaER) readLoop(r *kafka.Reader) {
+for {
+if rdr.Config().ConcurrentReqs != -1 {
+<-rdr.cap // do not try to read if the limit is reached
+}
+msg, err := r.ReadMessage(context.Background())
+if err != nil {
+if err == io.EOF {
+// ignore io.EOF received from closing the connection from our side
+// this is happening when we stop the reader
+return
+}
+// send it to the error channel
+rdr.rdrErr <- err
+return
+}
go func(msg kafka.Message) {
if err := rdr.processMessage(msg.Value); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing message %s error: %s",
utils.ERs, string(msg.Key), err.Error()))
}
-}
-}(r)
-return
+if rdr.Config().ProcessedPath != utils.EmptyString { // post it
+if err := engine.PostersCache.PostKafka(rdr.Config().ProcessedPath,
+rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, "",
+utils.META_NONE, string(msg.Key)); err != nil {
+utils.Logger.Warning(
+fmt.Sprintf("<%s> writing message %s error: %s",
+utils.ERs, string(msg.Key), err.Error()))
+}
+}
+if rdr.Config().ConcurrentReqs != -1 {
+rdr.cap <- struct{}{}
+}
+}(msg)
}
}
func (rdr *KafkaER) processMessage(msg []byte) (err error) {
@@ -170,5 +198,9 @@ func (rdr *KafkaER) setURL(dialURL string) (err error) {
if vals, has := qry[utils.KafkaGroupID]; has && len(vals) != 0 {
rdr.groupID = vals[0]
}
+rdr.maxWait = defaultMaxWait
+if vals, has := qry[utils.KafkaMaxWait]; has && len(vals) != 0 {
+rdr.maxWait, err = time.ParseDuration(vals[0])
+}
return
}
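
For context: the cap channel introduced above is a plain counting semaphore. NewKafkaER pre-loads it with one token per allowed in-flight message, readLoop takes a token before every ReadMessage call, and the processing goroutine puts the token back once the message (and the optional posting) is done. Below is a minimal self-contained sketch of the same pattern; the names are illustrative, not taken from the commit:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const concurrentReqs = 3 // plays the role of the reader's ConcurrentReqs

	// Pre-fill the semaphore with one token per allowed request,
	// as NewKafkaER does with rdr.cap.
	sem := make(chan struct{}, concurrentReqs)
	for i := 0; i < concurrentReqs; i++ {
		sem <- struct{}{}
	}

	var wg sync.WaitGroup
	for msg := 1; msg <= 10; msg++ {
		<-sem // blocks once concurrentReqs messages are in flight
		wg.Add(1)
		go func(msg int) {
			defer wg.Done()
			time.Sleep(50 * time.Millisecond) // stands in for processMessage plus posting
			fmt.Println("processed message", msg)
			sem <- struct{}{} // return the token so reading can resume
		}(msg)
	}
	wg.Wait()
}

With ConcurrentReqs set to -1 the reader skips the semaphore entirely, which is what the != -1 guards in readLoop are for.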

View File

@@ -45,13 +45,12 @@ func TestKafkaER(t *testing.T) {
"enabled": true, // starts the EventReader service: <true|false>
"readers": [
{
"id": "kafka", // identifier of the EventReader profile
"type": "*kafka_json_map", // reader type <*file_csv>
"run_delay": -1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
// "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "localhost:9092", // read data from this path
"id": "kafka", // identifier of the EventReader profile
"type": "*kafka_json_map", // reader type <*file_csv>
"run_delay": -1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "localhost:9092", // read data from this path
// "processed_path": "/var/spool/cgrates/cdrc/out", // move processed data here
-// "source_id": "ers_csv", // free form field, tag identifying the source of the CDRs within CDRS database
"tenant": "cgrates.org", // tenant used by import
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
@@ -60,7 +59,6 @@ func TestKafkaER(t *testing.T) {
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "field_id": "CGRID"},
],
// "trailer_fields": [], // template of the import trailer fields
"continue": false, // continue to the next template if executed
},
],
},
@@ -110,9 +108,8 @@ func TestKafkaER(t *testing.T) {
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
}
-case <-time.After(30 * time.Second):
+case <-time.After(10 * time.Second):
t.Errorf("Timeout")
}
rdrExit <- struct{}{}
}
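
The test dials the broker with a bare localhost:9092 source path, so the reader falls back to defaultTopic, defaultGroupID and the new defaultMaxWait. Judging by the query-string handling in setURL, the same settings could presumably be passed inline; an illustrative (not tested here) value would be:

	"source_path": "localhost:9092?topic=cgrates_cdrc&group_id=cgrates_consumer&max_wait=500ms",

Since max_wait goes through time.ParseDuration, any Go duration string (500ms, 2s, ...) should be accepted, while an unparsable one makes setURL return the parse error.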

View File

@@ -479,12 +479,10 @@ func TestCDRsOnExpKafkaPosterFileFailover(t *testing.T) {
failoverContent := [][]byte{[]byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`), []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)}
reader := kafka.NewReader(kafka.ReaderConfig{
-Brokers: []string{"localhost:9092"},
-Topic: "cgrates_cdrs",
-GroupID: "tmp",
-MinBytes: 10e3, // 10KB
-MaxBytes: 10e6, // 10MB
-RebalanceTimeout: 1,
+Brokers: []string{"localhost:9092"},
+Topic: "cgrates_cdrs",
+GroupID: "tmp",
+MaxWait: time.Millisecond,
})
defer reader.Close()
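
Two things are worth noting in this cleanup: the old configuration requested 10KB-10MB fetch batches, and RebalanceTimeout: 1 is an untyped constant, i.e. one nanosecond, which was almost certainly unintended. Dropping those fields falls back to kafka-go's defaults, while MaxWait: time.Millisecond bounds how long a fetch blocks waiting for new data, so the failover check returns quickly even when the topic is idle.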

View File

@@ -1178,6 +1178,7 @@ const (
AWSSecret = "aws_secret"
KafkaTopic = "topic"
KafkaGroupID = "group_id"
+KafkaMaxWait = "max_wait"
)
// Google_API