diff --git a/ers/sqs.go b/ers/sqs.go index 822f2b4b1..2285a10ac 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -84,6 +84,8 @@ type SQSER struct { type sqsClient interface { ReceiveMessage(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) + GetQueueUrl(input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) + CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) } // Config returns the curent configuration @@ -156,7 +158,10 @@ func (rdr *SQSER) getQueueURL() (err error) { if err = rdr.newSession(); err != nil { return } - svc := sqs.New(rdr.session) + return rdr.getQueueURLWithClient(sqs.New(rdr.session)) +} + +func (rdr *SQSER) getQueueURLWithClient(svc sqsClient) (err error) { var result *sqs.GetQueueUrlOutput if result, err = svc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String(rdr.queueID), diff --git a/ers/sqs_test.go b/ers/sqs_test.go index afbbb041e..60d4af929 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -331,8 +331,8 @@ func TestSQSERIsClosed(t *testing.T) { type sqsClientMock struct { ReceiveMessageF func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) DeleteMessageF func(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) - // GetQueueUrlF func(input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) - // CreateQueueF func(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) + GetQueueUrlF func(input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) + CreateQueueF func(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) } func (s *sqsClientMock) ReceiveMessage(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { @@ -349,19 +349,19 @@ func (s *sqsClientMock) DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.Delet return nil, utils.ErrNotImplemented } -// func (s *sqsClientMock) GetQueueUrl(input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) { -// if s.GetQueueUrlF != nil { -// return s.GetQueueUrlF(input) -// } -// return nil, nil -// } +func (s *sqsClientMock) GetQueueUrl(input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) { + if s.GetQueueUrlF != nil { + return s.GetQueueUrlF(input) + } + return nil, nil +} -// func (s *sqsClientMock) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) { -// if s.CreateQueueF != nil { -// return s.CreateQueueF(input) -// } -// return nil, utils.ErrInvalidPath -// } +func (s *sqsClientMock) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) { + if s.CreateQueueF != nil { + return s.CreateQueueF(input) + } + return nil, utils.ErrInvalidPath +} func TestSQSERReadMsg(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -687,3 +687,99 @@ func TestSQSERGetQueueURL(t *testing.T) { t.Error(err) } } + +func TestSQSERGetQueueURLWithClient(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + getQueueUrl := func(input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) { + output := &sqs.GetQueueUrlOutput{ + QueueUrl: utils.StringPointer("queueURL"), + } + return output, nil + } + scv := &sqsClientMock{ + GetQueueUrlF: getQueueUrl, + } + // rdr.queueURL = utils.StringPointer("queueURL") + if err := rdr.getQueueURLWithClient(scv); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(rdr.queueURL, utils.StringPointer("queueURL")) { + t.Errorf("Expected %v but received %v", "queueURL", rdr.queueURL) + } +} + +type awserrMock struct { + error +} + +func (awserrMock) Code() string { + return sqs.ErrCodeQueueDoesNotExist +} + +func (awserrMock) Message() string { + return "" +} + +func (awserrMock) OrigErr() error { + return utils.ErrNotImplemented +} + +func TestSQSERGetQueueURLWithClient2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + getQueueUrl := func(input *sqs.GetQueueUrlInput) (output *sqs.GetQueueUrlOutput, err error) { + output = &sqs.GetQueueUrlOutput{ + QueueUrl: utils.StringPointer("queueURL"), + } + aerr := &awserrMock{} + return output, aerr + } + createQueue := func(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) { + output := &sqs.CreateQueueOutput{ + QueueUrl: utils.StringPointer("queueURL"), + } + return output, nil + } + scv := &sqsClientMock{ + GetQueueUrlF: getQueueUrl, + CreateQueueF: createQueue, + } + // rdr.queueURL = utils.StringPointer("queueURL") + if err := rdr.getQueueURLWithClient(scv); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(rdr.queueURL, utils.StringPointer("queueURL")) { + t.Errorf("Expected %v but received %v", "queueURL", rdr.queueURL) + } +}