From c86cbb71550b6d19487aebf5ac4a0e4dc16425e8 Mon Sep 17 00:00:00 2001 From: nickolasdaniel Date: Thu, 29 Apr 2021 14:44:19 +0300 Subject: [PATCH] Coverage tests for ers --- ers/filefwv.go | 4 +- ers/filefwv_it_test.go | 284 +++++++++++++++++++++++++++++++++++++++++ ers/s3.go | 1 - ers/s3_test.go | 282 ++++++++++++++++++++++++++++++++++++++++ ers/sqs_test.go | 60 +++++++++ 5 files changed, 628 insertions(+), 3 deletions(-) diff --git a/ers/filefwv.go b/ers/filefwv.go index 2d4cad7c6..3ac66eadb 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -299,7 +299,7 @@ func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPat rdr.fltrS, nil, rdr.trailerDP) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { - return nil + return err } if err := agReq.SetFields(trailerFields); err != nil { utils.Logger.Warning( @@ -339,7 +339,7 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa rdr.fltrS, rdr.headerDP, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { - return nil + return err } if err := agReq.SetFields(hdrFields); err != nil { utils.Logger.Warning( diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index 71c2d0b46..7d911b33e 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -437,3 +437,287 @@ func TestFileFWVExit(t *testing.T) { } eR.rdrExit <- struct{}{} } + +func TestFileFWVProcessTrailer(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := engine.NewFilterS(cfg, nil, dm) + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "OriginID": "testOriginID", + }, + APIOpts: map[string]interface{}{}, + } + eR.conReqs <- struct{}{} + filePath := "/tmp/TestFileFWVProcessTrailer/" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, "file1.txt")) + if err != nil { + t.Error(err) + } + trailerFields := []*config.FCTemplate{ + { + Tag: "OriginId", + Path: "*cgreq.OriginID", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep), + }, + } + eR.Config().Fields = trailerFields + eR.Config().Fields[0].ComputePath() + if err := eR.processTrailer(file, 0, 0, "/tmp/fwvErs/out", trailerFields); err != nil { + t.Error(err) + } + select { + case data := <-eR.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +func TestFileFWVProcessTrailerError1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := engine.NewFilterS(cfg, nil, dm) + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + filePath := "/tmp/TestFileFWVProcessTrailer/" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, "file1.txt")) + if err != nil { + t.Error(err) + } + trailerFields := []*config.FCTemplate{ + {}, + } + errExpect := "unsupported type: <>" + if err := eR.processTrailer(file, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +func TestFileFWVProcessTrailerError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + fltrs := engine.NewFilterS(cfg, nil, dm) + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + eR.Config().Tenant = config.RSRParsers{ + { + Rules: "cgrates.org", + }, + } + filePath := "/tmp/TestFileFWVProcessTrailer/" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, "file1.txt")) + if err != nil { + t.Error(err) + } + + trailerFields := []*config.FCTemplate{ + { + Tag: "OriginId", + Path: "*cgreq.OriginID", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep), + }, + } + + // + eR.Config().Filters = []string{"Filter1"} + errExpect := "NOT_FOUND:Filter1" + if err := eR.processTrailer(file, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestFileFWVProcessTrailerError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + fltrs := engine.NewFilterS(cfg, nil, dm) + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + trailerFields := []*config.FCTemplate{ + { + Tag: "OriginId", + Path: "*cgreq.OriginID", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep), + }, + } + var file *os.File + errExp := "invalid argument" + if err := eR.processTrailer(file, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExp { + t.Errorf("Expected %v but received %v", errExp, err) + } +} + +func TestFileFWVCreateHeaderMap(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + fltrs := engine.NewFilterS(cfg, nil, dm) + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "OriginID": "testOriginID", + }, + APIOpts: map[string]interface{}{}, + } + hdrFields := []*config.FCTemplate{ + { + Tag: "OriginId", + Path: "*cgreq.OriginID", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep), + }, + } + eR.Config().Fields = hdrFields + eR.Config().Fields[0].ComputePath() + record := "testRecord" + if err := eR.createHeaderMap(record, 0, 0, "/tmp/fwvErs/out", hdrFields); err != nil { + t.Error(err) + } + select { + case data := <-eR.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } +} + +func TestFileFWVCreateHeaderMapError1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + fltrs := engine.NewFilterS(cfg, nil, dm) + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + trailerFields := []*config.FCTemplate{ + {}, + } + record := "testRecord" + errExpect := "unsupported type: <>" + if err := eR.createHeaderMap(record, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestFileFWVCreateHeaderMapError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + fltrs := engine.NewFilterS(cfg, nil, dm) + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + record := "testRecord" + trailerFields := []*config.FCTemplate{ + { + Tag: "OriginId", + Path: "*cgreq.OriginID", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep), + }, + } + + // + eR.Config().Filters = []string{"Filter1"} + errExpect := "NOT_FOUND:Filter1" + if err := eR.createHeaderMap(record, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} diff --git a/ers/s3.go b/ers/s3.go index c051752ce..1b31841ba 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -214,7 +214,6 @@ func (rdr *S3ER) readMsg(scv s3Client, key string) (err error) { } obj, err := scv.GetObject(&s3.GetObjectInput{Bucket: &rdr.queueID, Key: &key}) - fmt.Println(obj) if err != nil { rdr.rdrErr <- err return diff --git a/ers/s3_test.go b/ers/s3_test.go index 403d3439d..eb3a53a31 100644 --- a/ers/s3_test.go +++ b/ers/s3_test.go @@ -19,10 +19,13 @@ along with this program. If not, see package ers import ( + "bytes" + "io" "reflect" "testing" "time" + "github.com/aws/aws-sdk-go/service/s3" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -264,3 +267,282 @@ func TestS3ERIsClosed(t *testing.T) { t.Errorf("Expected %v but received %v", true, false) } } + +type s3ClientMock struct { + ListObjectsV2PagesF func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error + GetObjectF func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) + DeleteObjectF func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) +} + +func (s *s3ClientMock) ListObjectsV2Pages(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error { + if s.ListObjectsV2PagesF != nil { + return s.ListObjectsV2PagesF(input, fn) + } + return utils.ErrNotFound +} + +func (s *s3ClientMock) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { + if s.GetObjectF != nil { + return s.GetObjectF(input) + } + return nil, utils.ErrNotImplemented +} + +func (s *s3ClientMock) DeleteObject(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) { + // return nil, nil + if s.DeleteObjectF != nil { + return s.DeleteObjectF(input) + } + return nil, utils.ErrInvalidPath +} + +func TestS3ERReadLoop(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error { + return nil + } + scv := &s3ClientMock{ + ListObjectsV2PagesF: listObjects, + } + if err := rdr.readLoop(scv); err != nil { + t.Error(err) + } +} + +func TestS3ERReadLoopIsClosed(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error { + return nil + } + scv := &s3ClientMock{ + ListObjectsV2PagesF: listObjects, + } + rdr.rdrExit <- struct{}{} + if err := rdr.readLoop(scv); err != nil { + t.Error(err) + } +} + +func TestS3ERReadMsg(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + // rdr.poster = engine.NewS3Poster(rdr.Config().SourcePath, 1, make(map[string]interface{})) + rdr.Config().SourcePath = rdr.awsRegion + rdr.Config().ConcurrentReqs = -1 + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "Tor", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep), + Path: "*cgreq.ToR", + }, + } + rdr.Config().Fields[0].ComputePath() + + listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error { + return nil + } + getObject := func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewBuffer([]byte(`{"key":"value"}`)))}, nil + } + deleteObject := func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) { + return nil, nil + } + scv := &s3ClientMock{ + ListObjectsV2PagesF: listObjects, + GetObjectF: getObject, + DeleteObjectF: deleteObject, + } + if err := rdr.readMsg(scv, "AWSKey"); err != nil { + t.Error(err) + } +} + +func TestS3ERReadMsgError1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: make(chan struct{}, 1), + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + rdr.Config().ConcurrentReqs = 1 + listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error { + return nil + } + getObject := func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewBuffer([]byte(`{"key":"value"}`)))}, nil + } + deleteObject := func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) { + return nil, nil + } + scv := &s3ClientMock{ + ListObjectsV2PagesF: listObjects, + GetObjectF: getObject, + DeleteObjectF: deleteObject, + } + rdr.cap <- struct{}{} + errExp := "NOT_FOUND:ToR" + if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp { + t.Errorf("Expected %v but received %v", errExp, err) + } +} + +func TestS3ERReadMsgError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: make(chan struct{}, 1), + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + rdr.Config().ConcurrentReqs = 1 + scv := &s3ClientMock{} + rdr.cap <- struct{}{} + rdr.rdrExit <- struct{}{} + if err := rdr.readMsg(scv, "AWSKey"); err != nil { + t.Error(err) + } +} + +func TestS3ERReadMsgError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: make(chan struct{}, 1), + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + rdr.Config().ConcurrentReqs = -1 + scv := &s3ClientMock{} + errExp := "NOT_IMPLEMENTED" + if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp { + t.Errorf("Expected %v but received %v", errExp, err) + } +} + +func TestS3ERReadMsgError4(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: make(chan struct{}, 1), + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + rdr.Config().SourcePath = rdr.awsRegion + rdr.Config().ConcurrentReqs = -1 + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "Tor", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep), + Path: "*cgreq.ToR", + }, + } + rdr.Config().Fields[0].ComputePath() + listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error { + return nil + } + getObject := func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewBuffer([]byte(`{"key":"value"}`)))}, nil + } + deleteObject := func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) { + return nil, utils.ErrInvalidPath + } + scv := &s3ClientMock{ + ListObjectsV2PagesF: listObjects, + GetObjectF: getObject, + DeleteObjectF: deleteObject, + } + errExp := "INVALID_PATH" + if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp { + t.Errorf("Expected %v but received %v", errExp, err) + } +} diff --git a/ers/sqs_test.go b/ers/sqs_test.go index 0e3c5fb1d..8558b26b1 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -23,6 +23,9 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -323,3 +326,60 @@ func TestSQSERIsClosed(t *testing.T) { t.Errorf("Expected %v but received %v", true, false) } } + +func TestSQSERReadMsgError1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + // queueURL: utils.StringPointer("url"), + session: nil, + poster: nil, + } + awsCfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)} + rdr.session, _ = session.NewSessionWithOptions( + session.Options{ + Config: awsCfg, + }, + ) + // rdrEvents := make(chan *erEvent, 1) + // rdrErr := make(chan error, 1) + // rdrExit := make(chan struct{}, 1) + + // sqsRdr, err := NewSQSER(cfg, 1, rdrEvents, + // rdrErr, new(engine.FilterS), rdrExit) + // if err != nil { + // t.Error(err) + // } + // rdr := sqsRdr.(*SQSER) + rdr.Config().ConcurrentReqs = -1 + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "Tor", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep), + Path: "*cgreq.ToR", + }, + } + rdr.Config().Fields[0].ComputePath() + scv := &sqs.SQS{} + msg := &sqs.Message{ + Body: utils.StringPointer(`{"msgBody":"BODY"`), + MessageId: utils.StringPointer(`{"msgId":"MESSAGE"}`), + ReceiptHandle: utils.StringPointer(`{"msgReceiptHandle":"RECEIPT_HANDLE"}`), + } + errExp := "unexpected end of JSON input" + if err := rdr.readMsg(scv, msg); err == nil || err.Error() != errExp { + t.Errorf("Expected %v but received %v", errExp, err) + } +}