mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Coverage tests for ers
This commit is contained in:
committed by
Dan Christian Bogos
parent
3ef60c98d4
commit
c86cbb7155
@@ -299,7 +299,7 @@ func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPat
|
||||
rdr.fltrS, nil, rdr.trailerDP) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
if err := agReq.SetFields(trailerFields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
@@ -339,7 +339,7 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa
|
||||
rdr.fltrS, rdr.headerDP, nil) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
if err := agReq.SetFields(hdrFields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
|
||||
@@ -437,3 +437,287 @@ func TestFileFWVExit(t *testing.T) {
|
||||
}
|
||||
eR.rdrExit <- struct{}{}
|
||||
}
|
||||
|
||||
func TestFileFWVProcessTrailer(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := engine.NewInternalDB(nil, nil, true)
|
||||
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
cfg.ERsCfg().Readers[0].ProcessedPath = ""
|
||||
fltrs := engine.NewFilterS(cfg, nil, dm)
|
||||
eR := &FWVFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrDir: "/tmp/fwvErs/out",
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
expEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"OriginID": "testOriginID",
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
filePath := "/tmp/TestFileFWVProcessTrailer/"
|
||||
if err := os.MkdirAll(filePath, 0777); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
file, err := os.Create(path.Join(filePath, "file1.txt"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{
|
||||
Tag: "OriginId",
|
||||
Path: "*cgreq.OriginID",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
eR.Config().Fields = trailerFields
|
||||
eR.Config().Fields[0].ComputePath()
|
||||
if err := eR.processTrailer(file, 0, 0, "/tmp/fwvErs/out", trailerFields); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
select {
|
||||
case data := <-eR.rdrEvents:
|
||||
expEvent.ID = data.cgrEvent.ID
|
||||
expEvent.Time = data.cgrEvent.Time
|
||||
if !reflect.DeepEqual(data.cgrEvent, expEvent) {
|
||||
t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
|
||||
}
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Error("Time limit exceeded")
|
||||
}
|
||||
if err := os.RemoveAll(filePath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileFWVProcessTrailerError1(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := engine.NewInternalDB(nil, nil, true)
|
||||
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
cfg.ERsCfg().Readers[0].ProcessedPath = ""
|
||||
fltrs := engine.NewFilterS(cfg, nil, dm)
|
||||
eR := &FWVFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrDir: "/tmp/fwvErs/out",
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
filePath := "/tmp/TestFileFWVProcessTrailer/"
|
||||
if err := os.MkdirAll(filePath, 0777); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
file, err := os.Create(path.Join(filePath, "file1.txt"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{},
|
||||
}
|
||||
errExpect := "unsupported type: <>"
|
||||
if err := eR.processTrailer(file, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
}
|
||||
if err := os.RemoveAll(filePath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileFWVProcessTrailerError2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := engine.NewInternalDB(nil, nil, true)
|
||||
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
fltrs := engine.NewFilterS(cfg, nil, dm)
|
||||
eR := &FWVFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrDir: "/tmp/fwvErs/out",
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
eR.Config().Tenant = config.RSRParsers{
|
||||
{
|
||||
Rules: "cgrates.org",
|
||||
},
|
||||
}
|
||||
filePath := "/tmp/TestFileFWVProcessTrailer/"
|
||||
if err := os.MkdirAll(filePath, 0777); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
file, err := os.Create(path.Join(filePath, "file1.txt"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{
|
||||
Tag: "OriginId",
|
||||
Path: "*cgreq.OriginID",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
|
||||
//
|
||||
eR.Config().Filters = []string{"Filter1"}
|
||||
errExpect := "NOT_FOUND:Filter1"
|
||||
if err := eR.processTrailer(file, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileFWVProcessTrailerError3(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := engine.NewInternalDB(nil, nil, true)
|
||||
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
fltrs := engine.NewFilterS(cfg, nil, dm)
|
||||
eR := &FWVFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrDir: "/tmp/fwvErs/out",
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{
|
||||
Tag: "OriginId",
|
||||
Path: "*cgreq.OriginID",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
var file *os.File
|
||||
errExp := "invalid argument"
|
||||
if err := eR.processTrailer(file, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExp {
|
||||
t.Errorf("Expected %v but received %v", errExp, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileFWVCreateHeaderMap(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := engine.NewInternalDB(nil, nil, true)
|
||||
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
fltrs := engine.NewFilterS(cfg, nil, dm)
|
||||
eR := &FWVFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrDir: "/tmp/fwvErs/out",
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
expEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"OriginID": "testOriginID",
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
hdrFields := []*config.FCTemplate{
|
||||
{
|
||||
Tag: "OriginId",
|
||||
Path: "*cgreq.OriginID",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
eR.Config().Fields = hdrFields
|
||||
eR.Config().Fields[0].ComputePath()
|
||||
record := "testRecord"
|
||||
if err := eR.createHeaderMap(record, 0, 0, "/tmp/fwvErs/out", hdrFields); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
select {
|
||||
case data := <-eR.rdrEvents:
|
||||
expEvent.ID = data.cgrEvent.ID
|
||||
expEvent.Time = data.cgrEvent.Time
|
||||
if !reflect.DeepEqual(data.cgrEvent, expEvent) {
|
||||
t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
|
||||
}
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Error("Time limit exceeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileFWVCreateHeaderMapError1(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := engine.NewInternalDB(nil, nil, true)
|
||||
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
fltrs := engine.NewFilterS(cfg, nil, dm)
|
||||
eR := &FWVFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrDir: "/tmp/fwvErs/out",
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{},
|
||||
}
|
||||
record := "testRecord"
|
||||
errExpect := "unsupported type: <>"
|
||||
if err := eR.createHeaderMap(record, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileFWVCreateHeaderMapError2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := engine.NewInternalDB(nil, nil, true)
|
||||
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
fltrs := engine.NewFilterS(cfg, nil, dm)
|
||||
eR := &FWVFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrDir: "/tmp/fwvErs/out",
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
record := "testRecord"
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{
|
||||
Tag: "OriginId",
|
||||
Path: "*cgreq.OriginID",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("testOriginID", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
|
||||
//
|
||||
eR.Config().Filters = []string{"Filter1"}
|
||||
errExpect := "NOT_FOUND:Filter1"
|
||||
if err := eR.createHeaderMap(record, 0, 0, "/tmp/fwvErs/out", trailerFields); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,7 +214,6 @@ func (rdr *S3ER) readMsg(scv s3Client, key string) (err error) {
|
||||
}
|
||||
|
||||
obj, err := scv.GetObject(&s3.GetObjectInput{Bucket: &rdr.queueID, Key: &key})
|
||||
fmt.Println(obj)
|
||||
if err != nil {
|
||||
rdr.rdrErr <- err
|
||||
return
|
||||
|
||||
282
ers/s3_test.go
282
ers/s3_test.go
@@ -19,10 +19,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package ers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -264,3 +267,282 @@ func TestS3ERIsClosed(t *testing.T) {
|
||||
t.Errorf("Expected %v but received %v", true, false)
|
||||
}
|
||||
}
|
||||
|
||||
type s3ClientMock struct {
|
||||
ListObjectsV2PagesF func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error
|
||||
GetObjectF func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
|
||||
DeleteObjectF func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error)
|
||||
}
|
||||
|
||||
func (s *s3ClientMock) ListObjectsV2Pages(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
|
||||
if s.ListObjectsV2PagesF != nil {
|
||||
return s.ListObjectsV2PagesF(input, fn)
|
||||
}
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
|
||||
func (s *s3ClientMock) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
|
||||
if s.GetObjectF != nil {
|
||||
return s.GetObjectF(input)
|
||||
}
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (s *s3ClientMock) DeleteObject(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
|
||||
// return nil, nil
|
||||
if s.DeleteObjectF != nil {
|
||||
return s.DeleteObjectF(input)
|
||||
}
|
||||
return nil, utils.ErrInvalidPath
|
||||
}
|
||||
|
||||
func TestS3ERReadLoop(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &S3ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}, 1),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: nil,
|
||||
awsRegion: "us-east-2",
|
||||
awsID: "AWSId",
|
||||
awsKey: "AWSAccessKeyId",
|
||||
awsToken: "",
|
||||
queueID: "cgrates_cdrs",
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
|
||||
return nil
|
||||
}
|
||||
scv := &s3ClientMock{
|
||||
ListObjectsV2PagesF: listObjects,
|
||||
}
|
||||
if err := rdr.readLoop(scv); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3ERReadLoopIsClosed(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &S3ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}, 1),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: nil,
|
||||
awsRegion: "us-east-2",
|
||||
awsID: "AWSId",
|
||||
awsKey: "AWSAccessKeyId",
|
||||
awsToken: "",
|
||||
queueID: "cgrates_cdrs",
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
|
||||
return nil
|
||||
}
|
||||
scv := &s3ClientMock{
|
||||
ListObjectsV2PagesF: listObjects,
|
||||
}
|
||||
rdr.rdrExit <- struct{}{}
|
||||
if err := rdr.readLoop(scv); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3ERReadMsg(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &S3ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}, 1),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: nil,
|
||||
awsRegion: "us-east-2",
|
||||
awsID: "AWSId",
|
||||
awsKey: "AWSAccessKeyId",
|
||||
awsToken: "",
|
||||
queueID: "cgrates_cdrs",
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
// rdr.poster = engine.NewS3Poster(rdr.Config().SourcePath, 1, make(map[string]interface{}))
|
||||
rdr.Config().SourcePath = rdr.awsRegion
|
||||
rdr.Config().ConcurrentReqs = -1
|
||||
rdr.Config().Fields = []*config.FCTemplate{
|
||||
{
|
||||
Tag: "Tor",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep),
|
||||
Path: "*cgreq.ToR",
|
||||
},
|
||||
}
|
||||
rdr.Config().Fields[0].ComputePath()
|
||||
|
||||
listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
|
||||
return nil
|
||||
}
|
||||
getObject := func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
|
||||
return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewBuffer([]byte(`{"key":"value"}`)))}, nil
|
||||
}
|
||||
deleteObject := func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
|
||||
return nil, nil
|
||||
}
|
||||
scv := &s3ClientMock{
|
||||
ListObjectsV2PagesF: listObjects,
|
||||
GetObjectF: getObject,
|
||||
DeleteObjectF: deleteObject,
|
||||
}
|
||||
if err := rdr.readMsg(scv, "AWSKey"); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3ERReadMsgError1(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &S3ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}, 1),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: make(chan struct{}, 1),
|
||||
awsRegion: "us-east-2",
|
||||
awsID: "AWSId",
|
||||
awsKey: "AWSAccessKeyId",
|
||||
awsToken: "",
|
||||
queueID: "cgrates_cdrs",
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
rdr.Config().ConcurrentReqs = 1
|
||||
listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
|
||||
return nil
|
||||
}
|
||||
getObject := func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
|
||||
return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewBuffer([]byte(`{"key":"value"}`)))}, nil
|
||||
}
|
||||
deleteObject := func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
|
||||
return nil, nil
|
||||
}
|
||||
scv := &s3ClientMock{
|
||||
ListObjectsV2PagesF: listObjects,
|
||||
GetObjectF: getObject,
|
||||
DeleteObjectF: deleteObject,
|
||||
}
|
||||
rdr.cap <- struct{}{}
|
||||
errExp := "NOT_FOUND:ToR"
|
||||
if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp {
|
||||
t.Errorf("Expected %v but received %v", errExp, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3ERReadMsgError2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &S3ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}, 1),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: make(chan struct{}, 1),
|
||||
awsRegion: "us-east-2",
|
||||
awsID: "AWSId",
|
||||
awsKey: "AWSAccessKeyId",
|
||||
awsToken: "",
|
||||
queueID: "cgrates_cdrs",
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
rdr.Config().ConcurrentReqs = 1
|
||||
scv := &s3ClientMock{}
|
||||
rdr.cap <- struct{}{}
|
||||
rdr.rdrExit <- struct{}{}
|
||||
if err := rdr.readMsg(scv, "AWSKey"); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3ERReadMsgError3(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &S3ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}, 1),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: make(chan struct{}, 1),
|
||||
awsRegion: "us-east-2",
|
||||
awsID: "AWSId",
|
||||
awsKey: "AWSAccessKeyId",
|
||||
awsToken: "",
|
||||
queueID: "cgrates_cdrs",
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
rdr.Config().ConcurrentReqs = -1
|
||||
scv := &s3ClientMock{}
|
||||
errExp := "NOT_IMPLEMENTED"
|
||||
if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp {
|
||||
t.Errorf("Expected %v but received %v", errExp, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestS3ERReadMsgError4(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &S3ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}, 1),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: make(chan struct{}, 1),
|
||||
awsRegion: "us-east-2",
|
||||
awsID: "AWSId",
|
||||
awsKey: "AWSAccessKeyId",
|
||||
awsToken: "",
|
||||
queueID: "cgrates_cdrs",
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
rdr.Config().SourcePath = rdr.awsRegion
|
||||
rdr.Config().ConcurrentReqs = -1
|
||||
rdr.Config().Fields = []*config.FCTemplate{
|
||||
{
|
||||
Tag: "Tor",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep),
|
||||
Path: "*cgreq.ToR",
|
||||
},
|
||||
}
|
||||
rdr.Config().Fields[0].ComputePath()
|
||||
listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
|
||||
return nil
|
||||
}
|
||||
getObject := func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
|
||||
return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewBuffer([]byte(`{"key":"value"}`)))}, nil
|
||||
}
|
||||
deleteObject := func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
|
||||
return nil, utils.ErrInvalidPath
|
||||
}
|
||||
scv := &s3ClientMock{
|
||||
ListObjectsV2PagesF: listObjects,
|
||||
GetObjectF: getObject,
|
||||
DeleteObjectF: deleteObject,
|
||||
}
|
||||
errExp := "INVALID_PATH"
|
||||
if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp {
|
||||
t.Errorf("Expected %v but received %v", errExp, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/sqs"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -323,3 +326,60 @@ func TestSQSERIsClosed(t *testing.T) {
|
||||
t.Errorf("Expected %v but received %v", true, false)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSQSERReadMsgError1(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &SQSER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}, 1),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: nil,
|
||||
awsRegion: "us-east-2",
|
||||
awsID: "AWSId",
|
||||
awsKey: "AWSAccessKeyId",
|
||||
awsToken: "",
|
||||
queueID: "cgrates_cdrs",
|
||||
// queueURL: utils.StringPointer("url"),
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
awsCfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)}
|
||||
rdr.session, _ = session.NewSessionWithOptions(
|
||||
session.Options{
|
||||
Config: awsCfg,
|
||||
},
|
||||
)
|
||||
// rdrEvents := make(chan *erEvent, 1)
|
||||
// rdrErr := make(chan error, 1)
|
||||
// rdrExit := make(chan struct{}, 1)
|
||||
|
||||
// sqsRdr, err := NewSQSER(cfg, 1, rdrEvents,
|
||||
// rdrErr, new(engine.FilterS), rdrExit)
|
||||
// if err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
// rdr := sqsRdr.(*SQSER)
|
||||
rdr.Config().ConcurrentReqs = -1
|
||||
rdr.Config().Fields = []*config.FCTemplate{
|
||||
{
|
||||
Tag: "Tor",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep),
|
||||
Path: "*cgreq.ToR",
|
||||
},
|
||||
}
|
||||
rdr.Config().Fields[0].ComputePath()
|
||||
scv := &sqs.SQS{}
|
||||
msg := &sqs.Message{
|
||||
Body: utils.StringPointer(`{"msgBody":"BODY"`),
|
||||
MessageId: utils.StringPointer(`{"msgId":"MESSAGE"}`),
|
||||
ReceiptHandle: utils.StringPointer(`{"msgReceiptHandle":"RECEIPT_HANDLE"}`),
|
||||
}
|
||||
errExp := "unexpected end of JSON input"
|
||||
if err := rdr.readMsg(scv, msg); err == nil || err.Error() != errExp {
|
||||
t.Errorf("Expected %v but received %v", errExp, err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user