From fc9eb41e7ef60d37d27714fd847f16b8b93a7b6f Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 19 Oct 2020 17:10:04 +0300 Subject: [PATCH] Added AMQPv1 Event Reader --- config/rjreader.go | 8 +- config/rjreader_test.go | 4 +- engine/pstr_amqpv1.go | 22 +--- ers/amqp.go | 8 +- ers/amqpv1.go | 206 ++++++++++++++++++++++++++++++++++++++ ers/amqpv1_it_test.go | 131 ++++++++++++++++++++++++ ers/kafka.go | 3 + ers/reader.go | 2 + ers/s3.go | 4 +- ers/sql.go | 4 +- ers/sqs.go | 6 +- packages/debian/changelog | 1 + 12 files changed, 364 insertions(+), 35 deletions(-) create mode 100644 ers/amqpv1.go create mode 100644 ers/amqpv1_it_test.go diff --git a/config/rjreader.go b/config/rjreader.go index 32abdb8bc..5121e7f23 100644 --- a/config/rjreader.go +++ b/config/rjreader.go @@ -30,7 +30,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -// creates a new rjReader from a io.Reader +// NewRjReader creates a new rjReader from a io.Reader func NewRjReader(rdr io.Reader) (r *rjReader, err error) { var b []byte b, err = ioutil.ReadAll(rdr) @@ -40,7 +40,7 @@ func NewRjReader(rdr io.Reader) (r *rjReader, err error) { return NewRjReaderFromBytes(b), nil } -// creates a new rjReader from a slice of bytes +// NewRjReaderFromBytes creates a new rjReader from a slice of bytes func NewRjReaderFromBytes(b []byte) *rjReader { return &rjReader{buf: b} } @@ -289,12 +289,12 @@ func (rjr *rjReader) HandleJSONError(err error) error { } rjr.indx = 0 - line, character := rjr.getJsonOffsetLine(offset) + line, character := rjr.getJSONOffsetLine(offset) return fmt.Errorf("%s around line %v and position %v\n line: %q", err.Error(), line, character, strings.Split(string(rjr.buf), "\n")[line-1]) } -func (rjr *rjReader) getJsonOffsetLine(offset int64) (line, character int64) { +func (rjr *rjReader) getJSONOffsetLine(offset int64) (line, character int64) { line = 1 // start line counting from 1 var lastChar byte diff --git a/config/rjreader_test.go b/config/rjreader_test.go index 9d9a1dced..19242b04e 100644 --- a/config/rjreader_test.go +++ b/config/rjreader_test.go @@ -309,7 +309,7 @@ func TestGetErrorLine(t *testing.T) { r := NewRjReaderFromBytes([]byte(jsonstr)) var offset int64 = 31 var expLine, expChar int64 = 10, 23 - if line, character := r.getJsonOffsetLine(offset); expLine != line { + if line, character := r.getJSONOffsetLine(offset); expLine != line { t.Errorf("Expected line %v received:%v", expLine, line) } else if expChar != character { t.Errorf("Expected line %v received:%v", expChar, character) @@ -343,7 +343,7 @@ func TestGetErrorLine2(t *testing.T) { r := NewRjReaderFromBytes([]byte(jsonstr)) var offset int64 = 31 var expLine, expChar int64 = 10, 46 - if line, character := r.getJsonOffsetLine(offset); expLine != line { + if line, character := r.getJSONOffsetLine(offset); expLine != line { t.Errorf("Expected line %v received:%v", expLine, line) } else if expChar != character { t.Errorf("Expected line %v received:%v", expChar, character) diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index 88c0c56dc..85b6305e5 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -147,24 +147,10 @@ func (pstr *AMQPv1Poster) newPosterSession() (s *amqpv1.Session, err error) { return pstr.client.NewSession() } -func isRecoverableCloseError(err error) bool { - return err == amqpv1.ErrConnClosed || +func (pstr *AMQPv1Poster) isRecoverableError(err error) bool { + netErr, ok := err.(net.Error) + return (ok && netErr.Temporary()) || + err == amqpv1.ErrConnClosed || err == amqpv1.ErrLinkClosed || err == amqpv1.ErrSessionClosed } - -func (pstr *AMQPv1Poster) isRecoverableError(err error) bool { - switch err.(type) { - case *amqpv1.Error, *amqpv1.DetachError, net.Error: - if netErr, ok := err.(net.Error); ok { - if !netErr.Temporary() { - return false - } - } - default: - if !isRecoverableCloseError(err) { - return false - } - } - return true -} diff --git a/ers/amqp.go b/ers/amqp.go index 340b436b7..53c36e00d 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -30,7 +30,7 @@ import ( "github.com/streadway/amqp" ) -// NewAMQPER return a new kafka event reader +// NewAMQPER return a new amqp event reader func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, rdrEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { @@ -54,7 +54,7 @@ func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, return rdr, nil } -// AMQPER implements EventReader interface for kafka message +// AMQPER implements EventReader interface for amqp message type AMQPER struct { // sync.RWMutex cgrCfg *config.CGRConfig @@ -84,7 +84,7 @@ func (rdr *AMQPER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } -// Serve will start the gorutines needed to watch the kafka topic +// Serve will start the gorutines needed to watch the amqp topic func (rdr *AMQPER) Serve() (err error) { if rdr.conn, err = amqp.Dial(rdr.dialURL); err != nil { return @@ -151,7 +151,7 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { select { case <-rdr.rdrExit: utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring kafka path <%s>", + fmt.Sprintf("<%s> stop monitoring amqp path <%s>", utils.ERs, rdr.dialURL)) rdr.close() return diff --git a/ers/amqpv1.go b/ers/amqpv1.go new file mode 100644 index 000000000..1c9aa6926 --- /dev/null +++ b/ers/amqpv1.go @@ -0,0 +1,206 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + amqpv1 "pack.ag/amqp" +) + +// NewAMQPv1ER return a new amqpv1 event reader +func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int, + rdrEvents chan *erEvent, rdrErr chan error, + fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + rdr := &AMQPv1ER{ + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, + } + if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { + rdr.cap = make(chan struct{}, concReq) + for i := 0; i < concReq; i++ { + rdr.cap <- struct{}{} + } + } + if vals, has := rdr.Config().Opts[utils.QueueID]; has { + rdr.queueID = "/" + utils.IfaceAsString(vals) + } + rdr.createPoster() + return rdr, nil +} + +// AMQPv1ER implements EventReader interface for amqpv1 message +type AMQPv1ER struct { + // sync.RWMutex + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + + queueID string + + rdrEvents chan *erEvent // channel to dispatch the events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} + + conn *amqpv1.Client + ses *amqpv1.Session + + poster engine.Poster +} + +// Config returns the curent configuration +func (rdr *AMQPv1ER) Config() *config.EventReaderCfg { + return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] +} + +// Serve will start the gorutines needed to watch the amqpv1 topic +func (rdr *AMQPv1ER) Serve() (err error) { + if rdr.conn, err = amqpv1.Dial(rdr.Config().SourcePath); err != nil { + return + } + if rdr.ses, err = rdr.conn.NewSession(); err != nil { + rdr.close() + return + } + if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API + return + } + + var receiver *amqpv1.Receiver + if receiver, err = rdr.ses.NewReceiver( + amqpv1.LinkSourceAddress(rdr.queueID), + ); err != nil { + return + } + go func() { + select { + case <-rdr.rdrExit: + receiver.Close(context.Background()) + rdr.close() + } + }() + + go rdr.readLoop(receiver) // read until the connection is closed + return +} + +func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) { + for { + if rdr.Config().ConcurrentReqs != -1 { + <-rdr.cap // do not try to read if the limit is reached + } + ctx := context.Background() + var msg *amqpv1.Message + if msg, err = recv.Receive(ctx); err != nil { + if err == amqpv1.ErrLinkClosed { + err = nil + return + } + rdr.rdrErr <- err + return + } + if err = msg.Accept(); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> unable to accept message error: %s", + utils.ERs, err.Error())) + continue + } + + go func(msg *amqpv1.Message) { + body := msg.GetData() + if err := rdr.processMessage(body); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing message error: %s", + utils.ERs, err.Error())) + } + if rdr.poster != nil { // post it + if err := rdr.poster.Post(body, utils.EmptyString); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> writing message error: %s", + utils.ERs, err.Error())) + } + } + if rdr.Config().ConcurrentReqs != -1 { + rdr.cap <- struct{}{} + } + }(msg) + } +} + +func (rdr *AMQPv1ER) processMessage(msg []byte) (err error) { + var decodedMessage map[string]interface{} + if err = json.Unmarshal(msg, &decodedMessage); err != nil { + return + } + agReq := agents.NewAgentRequest( + utils.MapStorage(decodedMessage), nil, + nil, nil, nil, rdr.Config().Tenant, + rdr.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(rdr.Config().Timezone, + rdr.cgrCfg.GeneralCfg().DefaultTimezone), + rdr.fltrS, nil, nil) // create an AgentRequest + var pass bool + if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, + agReq); err != nil || !pass { + return + } + if err = agReq.SetFields(rdr.Config().Fields); err != nil { + return + } + rdr.rdrEvents <- &erEvent{ + cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep), + rdrCfg: rdr.Config(), + opts: config.NMAsMapInterface(agReq.Opts, utils.NestingSep), + } + return +} + +func (rdr *AMQPv1ER) close() (err error) { + if rdr.poster != nil { + rdr.poster.Close() + } + if rdr.ses != nil { + if err = rdr.ses.Close(context.Background()); err != nil { + return + } + } + return rdr.conn.Close() +} + +func (rdr *AMQPv1ER) createPoster() { + processedOpt := getProcessOptions(rdr.Config().Opts) + if len(processedOpt) == 0 && + len(rdr.Config().ProcessedPath) == 0 { + return + } + rdr.poster = engine.NewAMQPv1Poster(utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) +} diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go new file mode 100644 index 000000000..c6005ff66 --- /dev/null +++ b/ers/amqpv1_it_test.go @@ -0,0 +1,131 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "context" + "flag" + "fmt" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + amqpv1 "pack.ag/amqp" +) + +var ( + itTestAMQPv1 = flag.Bool("amqpv1", false, "Run the test for AMQPv1Reader") +) + +func TestAMQPERv1(t *testing.T) { + if !*itTestAMQPv1 { + t.SkipNow() + } + cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ +"ers": { // EventReaderService + "enabled": true, // starts the EventReader service: + "readers": [ + { + "id": "amqpv1", // identifier of the EventReader profile + "type": "*amqpv1_json_map", // reader type <*file_csv> + "run_delay": "-1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together + "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited + "source_path": "amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net",// read data from this path + "opts": { + "queueID": "cdrs3", + }, + "processed_path": "", // move processed data here + "tenant": "cgrates.org", // tenant used by import + "filters": [], // limit parsing based on the filters + "flags": [], // flags to influence the event processing + "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + ], + }, + ], +}, +}`) + if err != nil { + t.Fatal(err) + } + utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + + rdrEvents = make(chan *erEvent, 1) + rdrErr = make(chan error, 1) + rdrExit = make(chan struct{}, 1) + + if rdr, err = NewAMQPv1ER(cfg, 1, rdrEvents, + rdrErr, new(engine.FilterS), rdrExit); err != nil { + t.Fatal(err) + } + amqpv1Rdr := rdr.(*AMQPv1ER) + connection, err := amqpv1.Dial("amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net") + if err != nil { + t.Fatal(err) + } + defer connection.Close() + + channel, err := connection.NewSession() + if err != nil { + t.Fatal(err) + } + defer channel.Close(context.Background()) + + randomCGRID := utils.UUIDSha1Prefix() + sndr, err := channel.NewSender(amqpv1.LinkTargetAddress(amqpv1Rdr.queueID)) + if err != nil { + t.Fatal(err) + } + if err = sndr.Send(context.Background(), + amqpv1.NewMessage([]byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)))); err != nil { + t.Fatal(err) + } + if err = rdr.Serve(); err != nil { + t.Fatal(err) + } + select { + case err = <-rdrErr: + t.Error(err) + case ev := <-rdrEvents: + if ev.rdrCfg.ID != "amqpv1" { + t.Errorf("Expected 'amqpv1' received `%s`", ev.rdrCfg.ID) + } + expected := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: ev.cgrEvent.ID, + Time: ev.cgrEvent.Time, + Event: map[string]interface{}{ + "CGRID": randomCGRID, + }, + } + if !reflect.DeepEqual(ev.cgrEvent, expected) { + t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) + } + case <-time.After(10 * time.Second): + t.Fatal("Timeout") + } + + close(rdrExit) +} diff --git a/ers/kafka.go b/ers/kafka.go index 00ed4cad4..449124067 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -104,6 +104,9 @@ func (rdr *KafkaER) Serve() (err error) { utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring kafka path <%s>", utils.ERs, rdr.dialURL)) + if rdr.poster != nil { + rdr.poster.Close() + } r.Close() // already locked in library return } diff --git a/ers/reader.go b/ers/reader.go index 15183f67d..3332a1be3 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -61,6 +61,8 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int, return NewS3ER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) case utils.MetaSQSjsonMap: return NewSQSER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + case utils.MetaAMQPV1jsonMap: + return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) } return } diff --git a/ers/s3.go b/ers/s3.go index ced81408d..43fc8bef4 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -57,7 +57,7 @@ func NewS3ER(cfg *config.CGRConfig, cfgIdx int, return rdr, nil } -// S3ER implements EventReader interface for kafka message +// S3ER implements EventReader interface for s3 message type S3ER struct { // sync.RWMutex cgrCfg *config.CGRConfig @@ -84,7 +84,7 @@ func (rdr *S3ER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } -// Serve will start the gorutines needed to watch the kafka topic +// Serve will start the gorutines needed to watch the s3 topic func (rdr *S3ER) Serve() (err error) { var sess *session.Session cfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)} diff --git a/ers/sql.go b/ers/sql.go index 445818282..434f9261e 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -35,7 +35,7 @@ import ( _ "github.com/lib/pq" ) -// NewSQLEventReader return a new kafka event reader +// NewSQLEventReader return a new sql event reader func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { @@ -88,7 +88,7 @@ func (rdr *SQLEventReader) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } -// Serve will start the gorutines needed to watch the kafka topic +// Serve will start the gorutines needed to watch the sql topic func (rdr *SQLEventReader) Serve() (err error) { var db *gorm.DB if db, err = gorm.Open(rdr.connType, rdr.connString); err != nil { diff --git a/ers/sqs.go b/ers/sqs.go index 80c5c9376..3e5a2cba2 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -34,7 +34,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -// NewSQSER return a new s3 event reader +// NewSQSER return a new sqs event reader func NewSQSER(cfg *config.CGRConfig, cfgIdx int, rdrEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { @@ -57,7 +57,7 @@ func NewSQSER(cfg *config.CGRConfig, cfgIdx int, return rdr, nil } -// SQSER implements EventReader interface for kafka message +// SQSER implements EventReader interface for sqs message type SQSER struct { // sync.RWMutex cgrCfg *config.CGRConfig @@ -85,7 +85,7 @@ func (rdr *SQSER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } -// Serve will start the gorutines needed to watch the kafka topic +// Serve will start the gorutines needed to watch the sqs topic func (rdr *SQSER) Serve() (err error) { if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API return diff --git a/packages/debian/changelog b/packages/debian/changelog index 4e7d4efd7..6c7a9f6e9 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -114,6 +114,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [DataDB] Add support for redis with TLS connection ( + integration test ) * [ERs] Added support for *s3_json_map type * [ERs] Added support for *sqs_json_map type + * [ERs] Added support for *amqpv1_json_map type -- DanB Wed, 19 Feb 2020 13:25:52 +0200