diff --git a/ers/reader.go b/ers/reader.go index 11e936e87..15183f67d 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -59,6 +59,8 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int, return NewAMQPER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) case utils.MetaS3jsonMap: return NewS3ER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + case utils.MetaSQSjsonMap: + return NewSQSER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) } return } diff --git a/ers/s3.go b/ers/s3.go index a8e11b2fb..ced81408d 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -186,7 +186,7 @@ func (rdr *S3ER) createPoster() { len(rdr.Config().ProcessedPath) == 0 { return } - rdr.poster = engine.NewKafkaPoster(utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.poster = engine.NewS3Poster(utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) } diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go index bf8af074f..1ef083948 100644 --- a/ers/s3_it_test.go +++ b/ers/s3_it_test.go @@ -38,7 +38,7 @@ import ( ) var ( - itTestS3 = flag.Bool("s4", false, "Run the test for S3Reader") + itTestS3 = flag.Bool("s3", false, "Run the test for S3Reader") ) func TestS3ER(t *testing.T) { diff --git a/ers/sqs.go b/ers/sqs.go new file mode 100644 index 000000000..80c5c9376 --- /dev/null +++ b/ers/sqs.go @@ -0,0 +1,267 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// NewSQSER return a new s3 event reader +func NewSQSER(cfg *config.CGRConfig, cfgIdx int, + rdrEvents chan *erEvent, rdrErr chan error, + fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, + } + if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { + rdr.cap = make(chan struct{}, concReq) + for i := 0; i < concReq; i++ { + rdr.cap <- struct{}{} + } + } + rdr.parseOpts(rdr.Config().Opts) + return rdr, nil +} + +// SQSER implements EventReader interface for kafka message +type SQSER struct { + // sync.RWMutex + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + + rdrEvents chan *erEvent // channel to dispatch the events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} + + queueURL *string + awsRegion string + awsID string + awsKey string + awsToken string + queueID string + session *session.Session + + poster engine.Poster +} + +// Config returns the curent configuration +func (rdr *SQSER) Config() *config.EventReaderCfg { + return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] +} + +// Serve will start the gorutines needed to watch the kafka topic +func (rdr *SQSER) Serve() (err error) { + if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API + return + } + go rdr.readLoop() // read until the connection is closed + return +} + +func (rdr *SQSER) processMessage(body []byte) (err error) { + var decodedMessage map[string]interface{} + if err = json.Unmarshal(body, &decodedMessage); err != nil { + return + } + + agReq := agents.NewAgentRequest( + utils.MapStorage(decodedMessage), nil, + nil, nil, nil, rdr.Config().Tenant, + rdr.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(rdr.Config().Timezone, + rdr.cgrCfg.GeneralCfg().DefaultTimezone), + rdr.fltrS, nil, nil) // create an AgentRequest + var pass bool + if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, + agReq); err != nil || !pass { + return + } + if err = agReq.SetFields(rdr.Config().Fields); err != nil { + return + } + rdr.rdrEvents <- &erEvent{ + cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep), + rdrCfg: rdr.Config(), + opts: config.NMAsMapInterface(agReq.Opts, utils.NestingSep), + } + return +} + +func (rdr *SQSER) parseOpts(opts map[string]interface{}) { + rdr.queueID = utils.DefaultQueueID + if val, has := opts[utils.QueueID]; has { + rdr.queueID = utils.IfaceAsString(val) + } + if val, has := opts[utils.AWSRegion]; has { + rdr.awsRegion = utils.IfaceAsString(val) + } + if val, has := opts[utils.AWSKey]; has { + rdr.awsID = utils.IfaceAsString(val) + } + if val, has := opts[utils.AWSSecret]; has { + rdr.awsKey = utils.IfaceAsString(val) + } + if val, has := opts[utils.AWSToken]; has { + rdr.awsToken = utils.IfaceAsString(val) + } + rdr.getQueueURL() +} + +func (rdr *SQSER) getQueueURL() (err error) { + if rdr.queueURL != nil { + return nil + } + if err = rdr.newSession(); err != nil { + return + } + svc := sqs.New(rdr.session) + var result *sqs.GetQueueUrlOutput + if result, err = svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(rdr.queueID), + }); err == nil { + rdr.queueURL = new(string) + *(rdr.queueURL) = *result.QueueUrl + return + } + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + // For CreateQueue + var createResult *sqs.CreateQueueOutput + if createResult, err = svc.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(rdr.queueID), + }); err == nil { + rdr.queueURL = utils.StringPointer(*createResult.QueueUrl) + return + } + } + utils.Logger.Warning(fmt.Sprintf(" can not get url for queue with ID=%s because err: %v", rdr.queueID, err)) + return +} + +func (rdr *SQSER) readLoop() (err error) { + scv := sqs.New(rdr.session) + for !rdr.isClosed() { + if rdr.Config().ConcurrentReqs != -1 { + <-rdr.cap // do not try to read if the limit is reached + } + var msgs *sqs.ReceiveMessageOutput + if msgs, err = scv.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: rdr.queueURL, + MaxNumberOfMessages: aws.Int64(1), + WaitTimeSeconds: aws.Int64(1), + }); err != nil { + return + } + if len(msgs.Messages) != 0 { + go rdr.readMsg(scv, msgs.Messages[0]) + } else if rdr.Config().ConcurrentReqs != -1 { + rdr.cap <- struct{}{} + } + + } + + return +} + +func (rdr *SQSER) createPoster() { + processedOpt := getProcessOptions(rdr.Config().Opts) + if len(processedOpt) == 0 && + len(rdr.Config().ProcessedPath) == 0 { + return + } + rdr.poster = engine.NewSQSPoster(utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) +} + +func (rdr *SQSER) isClosed() bool { + select { + case <-rdr.rdrExit: + return true + default: + return false + } +} + +func (rdr *SQSER) readMsg(scv *sqs.SQS, msg *sqs.Message) (err error) { + if rdr.Config().ConcurrentReqs != -1 { + defer func() { rdr.cap <- struct{}{} }() + } + body := []byte(*msg.Body) + key := *msg.MessageId + if err = rdr.processMessage(body); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing message %s error: %s", + utils.ERs, key, err.Error())) + return + } + if _, err = scv.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: rdr.queueURL, + ReceiptHandle: msg.ReceiptHandle, + }); err != nil { + rdr.rdrErr <- err + return + } + + if rdr.poster != nil { // post it + if err = rdr.poster.Post(body, key); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> writing message %s error: %s", + utils.ERs, key, err.Error())) + return + } + } + + return +} + +func (rdr *SQSER) newSession() (err error) { + cfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)} + if len(rdr.awsRegion) != 0 { + cfg.Region = aws.String(rdr.awsRegion) + } + if len(rdr.awsID) != 0 && + len(rdr.awsKey) != 0 { + cfg.Credentials = credentials.NewStaticCredentials(rdr.awsID, rdr.awsKey, rdr.awsToken) + } + rdr.session, err = session.NewSessionWithOptions( + session.Options{ + Config: cfg, + }, + ) + return +} diff --git a/ers/sqs_it_test.go b/ers/sqs_it_test.go new file mode 100644 index 000000000..aaf80f194 --- /dev/null +++ b/ers/sqs_it_test.go @@ -0,0 +1,130 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "flag" + "fmt" + "reflect" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + itTestSQS = flag.Bool("sqs", false, "Run the test for SQSReader") +) + +func TestSQSER(t *testing.T) { + if !*itTestSQS { + t.SkipNow() + } + cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ +"ers": { // EventReaderService + "enabled": true, // starts the EventReader service: + "readers": [ + { + "id": "sqs", // identifier of the EventReader profile + "type": "*sqs_json_map", // reader type <*file_csv> + "run_delay": "-1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together + "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited + "source_path": "sqs.us-east-2.amazonaws.com", // read data from this path + // "processed_path": "/var/spool/cgrates/ers/out", // move processed data here + "tenant": "cgrates.org", // tenant used by import + "filters": [], // limit parsing based on the filters + "flags": [], // flags to influence the event processing + "opts": { + "queueID": "cgrates-cdrs", + "awsRegion": "us-east-2", + "awsKey": "AWSAccessKeyId", + "awsSecret": "AWSSecretKey", + // "awsToken": "". + }, + "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + ], + }, + ], +}, +}`) + if err != nil { + t.Fatal(err) + } + + rdrEvents = make(chan *erEvent, 1) + rdrErr = make(chan error, 1) + rdrExit = make(chan struct{}, 1) + + if rdr, err = NewSQSER(cfg, 1, rdrEvents, + rdrErr, new(engine.FilterS), rdrExit); err != nil { + t.Fatal(err) + } + sqsRdr := rdr.(*SQSER) + var sess *session.Session + awsCfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)} + awsCfg.Region = aws.String(sqsRdr.awsRegion) + awsCfg.Credentials = credentials.NewStaticCredentials(sqsRdr.awsID, sqsRdr.awsKey, sqsRdr.awsToken) + + if sess, err = session.NewSessionWithOptions(session.Options{Config: awsCfg}); err != nil { + return + } + scv := sqs.New(sess) + + randomCGRID := utils.UUIDSha1Prefix() + scv.SendMessage(&sqs.SendMessageInput{ + MessageBody: aws.String(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)), + QueueUrl: sqsRdr.queueURL, + }) + + if err = rdr.Serve(); err != nil { + t.Fatal(err) + } + + select { + case err = <-rdrErr: + t.Error(err) + case ev := <-rdrEvents: + if ev.rdrCfg.ID != "sqs" { + t.Errorf("Expected 'sqs' received `%s`", ev.rdrCfg.ID) + } + expected := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: ev.cgrEvent.ID, + Time: ev.cgrEvent.Time, + Event: map[string]interface{}{ + "CGRID": randomCGRID, + }, + } + if !reflect.DeepEqual(ev.cgrEvent, expected) { + t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) + } + case <-time.After(10 * time.Second): + t.Fatal("Timeout") + } + close(rdrExit) +} diff --git a/packages/debian/changelog b/packages/debian/changelog index 7df8e94aa..4e7d4efd7 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -113,6 +113,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [ConfigS] Add "redis_" prefix to "dataDB" option for redis * [DataDB] Add support for redis with TLS connection ( + integration test ) * [ERs] Added support for *s3_json_map type + * [ERs] Added support for *sqs_json_map type -- DanB Wed, 19 Feb 2020 13:25:52 +0200