From 92286c9d2ed81d414e022bd590e1ebaf9bf0a4ee Mon Sep 17 00:00:00 2001 From: nickolasdaniel Date: Tue, 4 May 2021 13:35:28 +0300 Subject: [PATCH] Coverage tests for ers --- ers/sqs.go | 12 +- ers/sqs_test.go | 324 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 322 insertions(+), 14 deletions(-) diff --git a/ers/sqs.go b/ers/sqs.go index 98e10f9e3..822f2b4b1 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -81,6 +81,11 @@ type SQSER struct { poster engine.Poster } +type sqsClient interface { + ReceiveMessage(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) + DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) +} + // Config returns the curent configuration func (rdr *SQSER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] @@ -91,7 +96,7 @@ func (rdr *SQSER) Serve() (err error) { if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API return } - go rdr.readLoop() // read until the connection is closed + go rdr.readLoop(sqs.New(rdr.session)) // read until the connection is closed return } @@ -174,8 +179,7 @@ func (rdr *SQSER) getQueueURL() (err error) { return } -func (rdr *SQSER) readLoop() (err error) { - scv := sqs.New(rdr.session) +func (rdr *SQSER) readLoop(scv sqsClient) (err error) { for !rdr.isClosed() { if rdr.Config().ConcurrentReqs != -1 { <-rdr.cap // do not try to read if the limit is reached @@ -218,7 +222,7 @@ func (rdr *SQSER) isClosed() bool { } } -func (rdr *SQSER) readMsg(scv *sqs.SQS, msg *sqs.Message) (err error) { +func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) { if rdr.Config().ConcurrentReqs != -1 { defer func() { rdr.cap <- struct{}{} }() } diff --git a/ers/sqs_test.go b/ers/sqs_test.go index 8558b26b1..afbbb041e 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -327,6 +327,98 @@ func TestSQSERIsClosed(t *testing.T) { } } +// Mock the SCV +type sqsClientMock struct { + ReceiveMessageF func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) + DeleteMessageF func(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) + // GetQueueUrlF func(input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) + // CreateQueueF func(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) +} + +func (s *sqsClientMock) ReceiveMessage(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { + if s.ReceiveMessageF != nil { + return s.ReceiveMessageF(input) + } + return nil, utils.ErrNotFound +} + +func (s *sqsClientMock) DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) { + if s.DeleteMessageF != nil { + return s.DeleteMessageF(input) + } + return nil, utils.ErrNotImplemented +} + +// func (s *sqsClientMock) GetQueueUrl(input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) { +// if s.GetQueueUrlF != nil { +// return s.GetQueueUrlF(input) +// } +// return nil, nil +// } + +// func (s *sqsClientMock) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) { +// if s.CreateQueueF != nil { +// return s.CreateQueueF(input) +// } +// return nil, utils.ErrInvalidPath +// } + +func TestSQSERReadMsg(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + // queueURL: utils.StringPointer("url"), + session: nil, + poster: nil, + } + awsCfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)} + rdr.session, _ = session.NewSessionWithOptions( + session.Options{ + Config: awsCfg, + }, + ) + + rdr.Config().ConcurrentReqs = -1 + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "Tor", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep), + Path: "*cgreq.ToR", + }, + } + rdr.Config().Fields[0].ComputePath() + receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { + return nil, nil + } + deleteMessage := func(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) { + return nil, nil + } + scv := &sqsClientMock{ + ReceiveMessageF: receiveMessage, + DeleteMessageF: deleteMessage, + } + msg := &sqs.Message{ + Body: utils.StringPointer(`{"msgBody":"BODY"}`), + MessageId: utils.StringPointer(`{"msgId":"MESSAGE"}`), + ReceiptHandle: utils.StringPointer(`{"msgReceiptHandle":"RECEIPT_HANDLE"}`), + } + if err := rdr.readMsg(scv, msg); err != nil { + t.Error(err) + } +} + func TestSQSERReadMsgError1(t *testing.T) { cfg := config.NewDefaultCGRConfig() rdr := &SQSER{ @@ -352,16 +444,6 @@ func TestSQSERReadMsgError1(t *testing.T) { Config: awsCfg, }, ) - // rdrEvents := make(chan *erEvent, 1) - // rdrErr := make(chan error, 1) - // rdrExit := make(chan struct{}, 1) - - // sqsRdr, err := NewSQSER(cfg, 1, rdrEvents, - // rdrErr, new(engine.FilterS), rdrExit) - // if err != nil { - // t.Error(err) - // } - // rdr := sqsRdr.(*SQSER) rdr.Config().ConcurrentReqs = -1 rdr.Config().Fields = []*config.FCTemplate{ { @@ -383,3 +465,225 @@ func TestSQSERReadMsgError1(t *testing.T) { t.Errorf("Expected %v but received %v", errExp, err) } } + +func TestSQSERReadMsgError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + awsCfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)} + rdr.session, _ = session.NewSessionWithOptions( + session.Options{ + Config: awsCfg, + }, + ) + rdr.Config().ConcurrentReqs = -1 + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "Tor", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep), + Path: "*cgreq.ToR", + }, + } + rdr.Config().Fields[0].ComputePath() + receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { + return nil, nil + } + scv := &sqsClientMock{ + ReceiveMessageF: receiveMessage, + } + msg := &sqs.Message{ + Body: utils.StringPointer(`{"msgBody":"BODY"}`), + MessageId: utils.StringPointer(`{"msgId":"MESSAGE"}`), + ReceiptHandle: utils.StringPointer(`{"msgReceiptHandle":"RECEIPT_HANDLE"}`), + } + errExp := "NOT_IMPLEMENTED" + if err := rdr.readMsg(scv, msg); err == nil || err.Error() != errExp { + t.Errorf("Expected %v but received %v", errExp, err) + } +} + +func TestSQSERReadMsgError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: engine.NewSQSPoster("url", 1, make(map[string]interface{})), + } + awsCfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)} + rdr.session, _ = session.NewSessionWithOptions( + session.Options{ + Config: awsCfg, + }, + ) + rdr.Config().ConcurrentReqs = -1 + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "Tor", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep), + Path: "*cgreq.ToR", + }, + } + rdr.Config().Fields[0].ComputePath() + receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { + return nil, nil + } + deleteMessage := func(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) { + return nil, nil + } + scv := &sqsClientMock{ + ReceiveMessageF: receiveMessage, + DeleteMessageF: deleteMessage, + } + msg := &sqs.Message{ + Body: utils.StringPointer(`{"msgBody":"BODY"}`), + MessageId: utils.StringPointer(`{"msgId":"MESSAGE"}`), + ReceiptHandle: utils.StringPointer(`{"msgReceiptHandle":"RECEIPT_HANDLE"}`), + } + errExp := "MissingRegion: could not find region configuration" + if err := rdr.readMsg(scv, msg); err == nil || err.Error() != errExp { + t.Errorf("Expected %v but received %v", errExp, err) + } +} + +func TestSQSERReadLoop(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: make(chan struct{}, 1), + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + queueURL: utils.StringPointer("testQueueURL"), + session: nil, + poster: nil, + } + rdr.cap <- struct{}{} + rdr.Config().ConcurrentReqs = 1 + counter := 0 + receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { + msg := &sqs.ReceiveMessageOutput{ + Messages: []*sqs.Message{ + { + Body: utils.StringPointer(`{"msgBody":"BODY"`), + MessageId: utils.StringPointer(`{"msgId":"MESSAGE"}`), + }, + }, + } + if counter == 0 { + counter++ + return msg, nil + } + return nil, utils.ErrNotImplemented + } + scv := &sqsClientMock{ + ReceiveMessageF: receiveMessage, + } + errExpect := "NOT_IMPLEMENTED" + if err := rdr.readLoop(scv); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestSQSERReadLoop2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: make(chan struct{}, 1), + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + queueURL: utils.StringPointer("testQueueURL"), + session: nil, + poster: nil, + } + rdr.cap <- struct{}{} + rdr.Config().ConcurrentReqs = 1 + counter := 0 + receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { + msg := &sqs.ReceiveMessageOutput{ + Messages: []*sqs.Message{}, + } + if counter == 0 { + counter++ + return msg, nil + } + return nil, utils.ErrNotImplemented + } + scv := &sqsClientMock{ + ReceiveMessageF: receiveMessage, + } + errExpect := "NOT_IMPLEMENTED" + if err := rdr.readLoop(scv); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + rdr.rdrExit <- struct{}{} + if err := rdr.readLoop(scv); err != nil { + t.Error(err) + } +} + +func TestSQSERGetQueueURL(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + // scv := &sqsClientMock{} + rdr.queueURL = utils.StringPointer("queueURL") + if err := rdr.getQueueURL(); err != nil { + t.Error(err) + } +}