Coverage tests for ers

This commit is contained in:
nickolasdaniel
2021-04-26 13:35:31 +03:00
committed by Dan Christian Bogos
parent 4b7b9fa247
commit b451279779
11 changed files with 770 additions and 129 deletions

View File

@@ -75,6 +75,36 @@ func (rdr *CSVFileER) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
// serveDefault polls rdrDir on a reusable timer, spawning one goroutine per
// CSV file found; it returns when rdrExit is closed.
func (rdr *CSVFileER) serveDefault() {
	tm := time.NewTimer(0)
	for {
		// Not automated, process and sleep approach
		select {
		case <-rdr.rdrExit:
			tm.Stop()
			utils.Logger.Info(
				fmt.Sprintf("<%s> stop monitoring path <%s>",
					utils.ERs, rdr.rdrDir))
			return
		case <-tm.C:
		}
		// NOTE(review): ReadDir error is deliberately ignored — a missing
		// directory simply yields an empty file list for this cycle
		filesInDir, _ := os.ReadDir(rdr.rdrDir)
		for _, file := range filesInDir {
			if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader
				continue // used in order to filter the files from directory
			}
			// one goroutine per file; errors are logged, not propagated
			go func(fileName string) {
				if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
					utils.Logger.Warning(
						fmt.Sprintf("<%s> processing file %s, error: %s",
							utils.ERs, fileName, err.Error()))
				}
			}(file.Name())
		}
		tm.Reset(rdr.Config().RunDelay)
	}
}
func (rdr *CSVFileER) Serve() (err error) {
switch rdr.Config().RunDelay {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
@@ -83,35 +113,7 @@ func (rdr *CSVFileER) Serve() (err error) {
return utils.WatchDir(rdr.rdrDir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go func() {
tm := time.NewTimer(0)
for {
// Not automated, process and sleep approach
select {
case <-rdr.rdrExit:
tm.Stop()
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
return
case <-tm.C:
}
filesInDir, _ := os.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
tm.Reset(rdr.Config().RunDelay)
}
}()
go rdr.serveDefault()
}
return
}

View File

@@ -667,8 +667,29 @@ func TestFileCSV(t *testing.T) {
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
}
func TestFileCSVServeDefault(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
var err error
eR.conReqs <- struct{}{}
filePath := "/tmp/ers/out/"
err := os.MkdirAll(filePath, 0777)
err = os.MkdirAll(filePath, 0777)
if err != nil {
t.Error(err)
}
@@ -679,8 +700,12 @@ func TestFileCSV(t *testing.T) {
}
eR.Config().RunDelay = 1 * time.Millisecond
os.Create(path.Join(filePath, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
go func() {
time.Sleep(20 * time.Millisecond)
close(eR.rdrExit)
}()
eR.serveDefault()
if err := os.RemoveAll(filePath); err != nil {
t.Error(err)
}
}

View File

@@ -81,6 +81,36 @@ func (rdr *FWVFileER) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
// serveDefault polls rdrDir on a reusable timer, spawning one goroutine per
// FWV file found; it returns when rdrExit is closed.
func (rdr *FWVFileER) serveDefault() {
	tm := time.NewTimer(0)
	for {
		// Not automated, process and sleep approach
		select {
		case <-rdr.rdrExit:
			tm.Stop()
			utils.Logger.Info(
				fmt.Sprintf("<%s> stop monitoring path <%s>",
					utils.ERs, rdr.rdrDir))
			return
		case <-tm.C:
		}
		// NOTE(review): ReadDir error is deliberately ignored — a missing
		// directory simply yields an empty file list for this cycle
		filesInDir, _ := os.ReadDir(rdr.rdrDir)
		for _, file := range filesInDir {
			if !strings.HasSuffix(file.Name(), utils.FWVSuffix) { // hardcoded file extension for fwv event reader (was mislabeled "xml")
				continue // used in order to filter the files from directory
			}
			// one goroutine per file; errors are logged, not propagated
			go func(fileName string) {
				if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
					utils.Logger.Warning(
						fmt.Sprintf("<%s> processing file %s, error: %s",
							utils.ERs, fileName, err.Error()))
				}
			}(file.Name())
		}
		tm.Reset(rdr.Config().RunDelay)
	}
}
func (rdr *FWVFileER) Serve() (err error) {
switch rdr.Config().RunDelay {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
@@ -89,35 +119,7 @@ func (rdr *FWVFileER) Serve() (err error) {
return utils.WatchDir(rdr.rdrDir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go func() {
tm := time.NewTimer(0)
for {
// Not automated, process and sleep approach
select {
case <-rdr.rdrExit:
tm.Stop()
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
return
case <-tm.C:
}
filesInDir, _ := os.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.FWVSuffix) { // hardcoded file extension for xml event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
tm.Reset(rdr.Config().RunDelay)
}
}()
go rdr.serveDefault()
}
return
}

View File

@@ -381,6 +381,42 @@ func TestFileFWV(t *testing.T) {
}
}
// TestFileFWVServeDefault drives serveDefault end-to-end: it seeds a temp
// dir with .fwv files (plus one .txt that the suffix filter must skip),
// then closes rdrExit after 20ms so serveDefault returns.
func TestFileFWVServeDefault(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	fltrs := &engine.FilterS{}
	eR := &FWVFileER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     fltrs,
		rdrDir:    "/tmp/fwvErs/out",
		rdrEvents: make(chan *erEvent, 1),
		rdrError:  make(chan error, 1),
		rdrExit:   make(chan struct{}),
		conReqs:   make(chan struct{}, 1),
	}
	eR.conReqs <- struct{}{} // free the single concurrent-request slot
	filePath := "/tmp/fwvErs/out"
	err := os.MkdirAll(filePath, 0777)
	if err != nil {
		t.Error(err)
	}
	for i := 1; i < 4; i++ {
		if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.fwv", i))); err != nil {
			t.Error(err)
		}
	}
	// non-.fwv file: must be ignored by the suffix filter
	os.Create(path.Join(filePath, "file1.txt"))
	eR.Config().RunDelay = 1 * time.Millisecond
	go func() {
		time.Sleep(20 * time.Millisecond)
		close(eR.rdrExit) // unblocks serveDefault's exit select case
	}()
	eR.serveDefault()
	if err := os.RemoveAll(filePath); err != nil {
		t.Error(err)
	}
}
func TestFileFWVExit(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}

View File

@@ -75,6 +75,36 @@ func (rdr *JSONFileER) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
// serveDefault polls rdrDir on a reusable timer, spawning one goroutine per
// JSON file found; it returns when rdrExit is closed.
func (rdr *JSONFileER) serveDefault() {
	tm := time.NewTimer(0)
	for {
		// Not automated, process and sleep approach
		select {
		case <-rdr.rdrExit:
			tm.Stop()
			utils.Logger.Info(
				fmt.Sprintf("<%s> stop monitoring path <%s>",
					utils.ERs, rdr.rdrDir))
			return
		case <-tm.C:
		}
		// NOTE(review): ReadDir error is deliberately ignored — a missing
		// directory simply yields an empty file list for this cycle
		filesInDir, _ := os.ReadDir(rdr.rdrDir)
		for _, file := range filesInDir {
			if !strings.HasSuffix(file.Name(), utils.JSNSuffix) { // hardcoded file extension for json event reader
				continue // used in order to filter the files from directory
			}
			// one goroutine per file; errors are logged, not propagated
			go func(fileName string) {
				if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
					utils.Logger.Warning(
						fmt.Sprintf("<%s> processing file %s, error: %s",
							utils.ERs, fileName, err.Error()))
				}
			}(file.Name())
		}
		tm.Reset(rdr.Config().RunDelay)
	}
}
func (rdr *JSONFileER) Serve() (err error) {
switch rdr.Config().RunDelay {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
@@ -83,35 +113,7 @@ func (rdr *JSONFileER) Serve() (err error) {
return utils.WatchDir(rdr.rdrDir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go func() {
tm := time.NewTimer(0)
for {
// Not automated, process and sleep approach
select {
case <-rdr.rdrExit:
tm.Stop()
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
return
case <-tm.C:
}
filesInDir, _ := os.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.JSNSuffix) { // hardcoded file extension for json event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
tm.Reset(rdr.Config().RunDelay)
}
}()
go rdr.serveDefault()
}
return
}

View File

@@ -524,6 +524,25 @@ func TestFileJSONProcessEventError3(t *testing.T) {
}
// TestFileJSON checks that Serve returns no error for a JSON file reader
// built with the default config (RunDelay 0 disables automatic reading).
func TestFileJSON(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	fltrs := &engine.FilterS{}
	eR := &JSONFileER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     fltrs,
		rdrDir:    "/tmp/ErsJSON/out/",
		rdrEvents: make(chan *erEvent, 1),
		rdrError:  make(chan error, 1),
		rdrExit:   make(chan struct{}),
		conReqs:   make(chan struct{}, 1),
	}
	eR.conReqs <- struct{}{} // free the single concurrent-request slot
	if err := eR.Serve(); err != nil {
		t.Error(err)
	}
}
func TestFileJSONServeDefault(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &JSONFileER{
@@ -548,12 +567,13 @@ func TestFileJSON(t *testing.T) {
}
}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
os.Create(path.Join(filePath, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
go func() {
time.Sleep(20 * time.Millisecond)
close(eR.rdrExit)
}()
eR.serveDefault()
if err := os.RemoveAll(filePath); err != nil {
t.Error(err)
}
}

View File

@@ -82,6 +82,36 @@ func (rdr *FlatstoreER) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
// serveDefault polls rdrDir on a reusable timer, spawning one goroutine per
// CSV file found (flatstore records arrive as CSV); it returns when rdrExit
// is closed.
func (rdr *FlatstoreER) serveDefault() {
	tm := time.NewTimer(0)
	for {
		// Not automated, process and sleep approach
		select {
		case <-rdr.rdrExit:
			tm.Stop()
			utils.Logger.Info(
				fmt.Sprintf("<%s> stop monitoring path <%s>",
					utils.ERs, rdr.rdrDir))
			return
		case <-tm.C:
		}
		// NOTE(review): ReadDir error is deliberately ignored — a missing
		// directory simply yields an empty file list for this cycle
		filesInDir, _ := os.ReadDir(rdr.rdrDir)
		for _, file := range filesInDir {
			if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader
				continue // used in order to filter the files from directory
			}
			// one goroutine per file; errors are logged, not propagated
			go func(fileName string) {
				if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
					utils.Logger.Warning(
						fmt.Sprintf("<%s> processing file %s, error: %s",
							utils.ERs, fileName, err.Error()))
				}
			}(file.Name())
		}
		tm.Reset(rdr.Config().RunDelay)
	}
}
func (rdr *FlatstoreER) Serve() (err error) {
switch rdr.Config().RunDelay {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
@@ -90,35 +120,7 @@ func (rdr *FlatstoreER) Serve() (err error) {
return utils.WatchDir(rdr.rdrDir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go func() {
tm := time.NewTimer(0)
for {
// Not automated, process and sleep approach
select {
case <-rdr.rdrExit:
tm.Stop()
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
return
case <-tm.C:
}
filesInDir, _ := os.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
tm.Reset(rdr.Config().RunDelay)
}
}()
go rdr.serveDefault()
}
return
}

View File

@@ -619,6 +619,25 @@ func TestFlatstoreProcessEventDirError(t *testing.T) {
}
// TestFlatstore checks that Serve returns no error for a flatstore reader
// built with the default config (RunDelay 0 disables automatic reading).
func TestFlatstore(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	fltrs := &engine.FilterS{}
	eR := &FlatstoreER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     fltrs,
		rdrDir:    "/tmp/flatstoreErs/out",
		rdrEvents: make(chan *erEvent, 1),
		rdrError:  make(chan error, 1),
		rdrExit:   make(chan struct{}),
		conReqs:   make(chan struct{}, 1),
	}
	eR.conReqs <- struct{}{} // free the single concurrent-request slot
	if err := eR.Serve(); err != nil {
		t.Error(err)
	}
}
func TestFlatstoreServeDefault(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &FlatstoreER{
@@ -643,12 +662,13 @@ func TestFlatstore(t *testing.T) {
}
}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
os.Create(path.Join(filePath, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
go func() {
time.Sleep(20 * time.Millisecond)
close(eR.rdrExit)
}()
eR.serveDefault()
if err := os.RemoveAll(filePath); err != nil {
t.Error(err)
}
}

View File

@@ -294,3 +294,67 @@ func TestNewAMQPv1Reader(t *testing.T) {
t.Errorf("Expected \n%v but received \n%v", expected, rcv)
}
}
// TestNewS3Reader verifies that NewEventReader builds an S3ER equal to a
// hand-constructed expected value when the reader type is *s3_json_map.
func TestNewS3Reader(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	fltr := &engine.FilterS{}
	cfg.ERsCfg().Readers[0].Type = utils.MetaS3jsonMap
	cfg.ERsCfg().Readers[0].ConcurrentReqs = -1
	exp := &S3ER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     fltr,
		rdrEvents: nil,
		rdrExit:   nil,
		rdrErr:    nil,
		queueID:   "cgrates_cdrs",
	}
	// empty ProcessedPath/Opts so createPoster mirrors the constructor path
	exp.Config().ProcessedPath = ""
	exp.Config().Opts = map[string]interface{}{}
	exp.createPoster()
	var expected EventReader = exp
	rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
	if err != nil {
		t.Error(err)
	} else if !reflect.DeepEqual(expected, rcv) {
		t.Errorf("Expected \n%v but received \n%v", expected, rcv)
	}
}
// TestNewSQSReader verifies that NewEventReader builds an SQSER equal to a
// hand-constructed expected value when the reader type is *sqs_json_map.
// The AWS session cannot be reproduced deterministically, so it is copied
// from the received reader before comparison.
func TestNewSQSReader(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	fltr := &engine.FilterS{}
	cfg.ERsCfg().Readers[0].Type = utils.MetaSQSjsonMap
	cfg.ERsCfg().Readers[0].ConcurrentReqs = -1
	exp := &SQSER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     fltr,
		rdrEvents: nil,
		rdrExit:   nil,
		rdrErr:    nil,
		queueID:   "cgrates_cdrs",
	}
	exp.Config().SourcePath = "string"
	// var err error
	// awsCfg := aws.Config{Endpoint: aws.String(exp.Config().SourcePath)}
	// exp.session, err = session.NewSessionWithOptions(
	// 	session.Options{
	// 		Config: awsCfg,
	// 	},
	// )
	// if err != nil {
	// 	t.Error(err)
	// }
	exp.Config().ProcessedPath = ""
	exp.Config().Opts = map[string]interface{}{}
	exp.createPoster()
	var expected EventReader = exp
	rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
	exp.session = rcv.(*SQSER).session // session is non-deterministic; align before DeepEqual
	if err != nil {
		t.Error(err)
	} else if !reflect.DeepEqual(expected, rcv) {
		t.Errorf("Expected \n%v but received \n%v", expected, rcv)
	}
}

266
ers/s3_test.go Normal file
View File

@@ -0,0 +1,266 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ers
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// TestS3ERServe checks that Serve returns no error for a freshly built S3
// reader with a non-zero RunDelay.
func TestS3ERServe(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr, err := NewS3ER(cfg, 0, nil,
		nil, nil, nil)
	if err != nil {
		t.Error(err)
	}
	rdr.Config().RunDelay = 1 * time.Millisecond
	if err := rdr.Serve(); err != nil {
		t.Error(err)
	}
}
// TestS3ERServe2 checks that Serve returns no error for a hand-built S3
// reader with the default (zero) RunDelay, which disables automatic reads.
func TestS3ERServe2(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &S3ER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     nil,
		rdrEvents: nil,
		rdrExit:   nil,
		rdrErr:    nil,
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	if err := rdr.Serve(); err != nil {
		t.Error(err)
	}
}
// TestS3ERProcessMessage feeds a JSON body through processMessage with a
// constant CGRID field template and asserts the produced CGREvent.
func TestS3ERProcessMessage(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &S3ER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	expEvent := &utils.CGREvent{
		Tenant: "cgrates.org",
		Event: map[string]interface{}{
			utils.CGRID: "testCgrId",
		},
		APIOpts: map[string]interface{}{},
	}
	body := []byte(`{"CGRID":"testCgrId"}`)
	rdr.Config().Fields = []*config.FCTemplate{
		{
			Tag:   "CGRID",
			Type:  utils.MetaConstant,
			Value: config.NewRSRParsersMustCompile("testCgrId", utils.InfieldSep),
			Path:  "*cgreq.CGRID",
		},
	}
	rdr.Config().Fields[0].ComputePath()
	if err := rdr.processMessage(body); err != nil {
		t.Error(err)
	}
	select {
	case data := <-rdr.rdrEvents:
		// ID and Time are generated per event; copy them before comparing
		expEvent.ID = data.cgrEvent.ID
		expEvent.Time = data.cgrEvent.Time
		if !reflect.DeepEqual(data.cgrEvent, expEvent) {
			t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
		}
	case <-time.After(50 * time.Millisecond):
		t.Error("Time limit exceeded")
	}
}
// TestS3ERProcessMessageError1 asserts that an empty (typeless) field
// template makes processMessage fail with "unsupported type".
func TestS3ERProcessMessageError1(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &S3ER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	rdr.Config().Fields = []*config.FCTemplate{
		{}, // no Type set -> processing must reject it
	}
	body := []byte(`{"CGRID":"testCgrId"}`)
	errExpect := "unsupported type: <>"
	if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
		t.Errorf("Expected %v but received %v", errExpect, err)
	}
}
// TestS3ERProcessMessageError2 checks filter handling: an unknown filter ID
// yields NOT_FOUND, while a passing inline *exists filter processes cleanly.
func TestS3ERProcessMessageError2(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	data := engine.NewInternalDB(nil, nil, true)
	dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
	cfg.ERsCfg().Readers[0].ProcessedPath = ""
	fltrs := engine.NewFilterS(cfg, nil, dm)
	rdr := &S3ER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     fltrs,
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	body := []byte(`{"CGRID":"testCgrId"}`)
	rdr.Config().Filters = []string{"Filter1"}
	errExpect := "NOT_FOUND:Filter1"
	if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
		t.Errorf("Expected %v but received %v", errExpect, err)
	}
	// same body with a passing inline filter must process without error
	rdr.Config().Filters = []string{"*exists:~*req..Account:"}
	if err := rdr.processMessage(body); err != nil {
		t.Error(err)
	}
}
// TestS3ERProcessMessageError3 asserts that a non-JSON body makes
// processMessage fail with the JSON decoder's error.
func TestS3ERProcessMessageError3(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &S3ER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	body := []byte("invalid_format")
	errExpect := "invalid character 'i' looking for beginning of value"
	if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
		t.Errorf("Expected %v but received %v", errExpect, err)
	}
}
// TestS3ERParseOpts verifies that parseOpts copies every AWS-related option
// from the opts map onto the matching reader fields, then exercises
// createPoster with empty Opts/ProcessedPath for coverage.
func TestS3ERParseOpts(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &S3ER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	opts := map[string]interface{}{
		utils.QueueID:   "QueueID",
		utils.AWSRegion: "AWSRegion",
		utils.AWSKey:    "AWSKey",
		utils.AWSSecret: "AWSSecret",
		utils.AWSToken:  "AWSToken",
	}
	rdr.parseOpts(opts)
	if rdr.queueID != opts[utils.QueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] {
		t.Error("Fields do not correspond") // fixed typo: was "corespond"
	}
	rdr.Config().Opts = map[string]interface{}{}
	rdr.Config().ProcessedPath = utils.EmptyString
	rdr.createPoster()
}
// TestS3ERIsClosed checks that isClosed reports false on an empty rdrExit
// channel and true once a value is queued on it.
func TestS3ERIsClosed(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &S3ER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}, 1),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	if rcv := rdr.isClosed(); rcv != false {
		t.Errorf("Expected %v but received %v", false, true)
	}
	rdr.rdrExit <- struct{}{} // buffered send; next isClosed must see it
	if rcv := rdr.isClosed(); rcv != true {
		t.Errorf("Expected %v but received %v", true, false)
	}
}

View File

@@ -24,6 +24,7 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -121,3 +122,204 @@ func TestSQSERServe(t *testing.T) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
}
}
// TestSQSERProcessMessage feeds a JSON body through processMessage with a
// constant CGRID field template and asserts the produced CGREvent.
func TestSQSERProcessMessage(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &SQSER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	expEvent := &utils.CGREvent{
		Tenant: "cgrates.org",
		Event: map[string]interface{}{
			utils.CGRID: "testCgrId",
		},
		APIOpts: map[string]interface{}{},
	}
	body := []byte(`{"CGRID":"testCgrId"}`)
	rdr.Config().Fields = []*config.FCTemplate{
		{
			Tag:   "CGRID",
			Type:  utils.MetaConstant,
			Value: config.NewRSRParsersMustCompile("testCgrId", utils.InfieldSep),
			Path:  "*cgreq.CGRID",
		},
	}
	rdr.Config().Fields[0].ComputePath()
	if err := rdr.processMessage(body); err != nil {
		t.Error(err)
	}
	select {
	case data := <-rdr.rdrEvents:
		// ID and Time are generated per event; copy them before comparing
		expEvent.ID = data.cgrEvent.ID
		expEvent.Time = data.cgrEvent.Time
		if !reflect.DeepEqual(data.cgrEvent, expEvent) {
			t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
		}
	case <-time.After(50 * time.Millisecond):
		t.Error("Time limit exceeded")
	}
}
// TestSQSERProcessMessageError1 asserts that an empty (typeless) field
// template makes processMessage fail with "unsupported type".
func TestSQSERProcessMessageError1(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &SQSER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	rdr.Config().Fields = []*config.FCTemplate{
		{}, // no Type set -> processing must reject it
	}
	body := []byte(`{"CGRID":"testCgrId"}`)
	errExpect := "unsupported type: <>"
	if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
		t.Errorf("Expected %v but received %v", errExpect, err)
	}
}
// TestSQSERProcessMessageError2 checks filter handling: an unknown filter ID
// yields NOT_FOUND, while a passing inline *exists filter processes cleanly.
func TestSQSERProcessMessageError2(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	data := engine.NewInternalDB(nil, nil, true)
	dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
	cfg.ERsCfg().Readers[0].ProcessedPath = ""
	fltrs := engine.NewFilterS(cfg, nil, dm)
	rdr := &SQSER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     fltrs,
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	body := []byte(`{"CGRID":"testCgrId"}`)
	rdr.Config().Filters = []string{"Filter1"}
	errExpect := "NOT_FOUND:Filter1"
	if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
		t.Errorf("Expected %v but received %v", errExpect, err)
	}
	// same body with a passing inline filter must process without error
	rdr.Config().Filters = []string{"*exists:~*req..Account:"}
	if err := rdr.processMessage(body); err != nil {
		t.Error(err)
	}
}
// TestSQSERProcessMessageError3 asserts that a non-JSON body makes
// processMessage fail with the JSON decoder's error.
func TestSQSERProcessMessageError3(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &SQSER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	body := []byte("invalid_format")
	errExpect := "invalid character 'i' looking for beginning of value"
	if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
		t.Errorf("Expected %v but received %v", errExpect, err)
	}
}
// TestSQSERParseOpts verifies that parseOpts copies every AWS-related option
// from the opts map onto the matching reader fields, then exercises
// createPoster with empty Opts/ProcessedPath for coverage.
func TestSQSERParseOpts(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &SQSER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	opts := map[string]interface{}{
		utils.QueueID:   "QueueID",
		utils.AWSRegion: "AWSRegion",
		utils.AWSKey:    "AWSKey",
		utils.AWSSecret: "AWSSecret",
		utils.AWSToken:  "AWSToken",
	}
	rdr.parseOpts(opts)
	if rdr.queueID != opts[utils.QueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] {
		t.Error("Fields do not correspond") // fixed typo: was "corespond"
	}
	rdr.Config().Opts = map[string]interface{}{}
	rdr.Config().ProcessedPath = utils.EmptyString
	rdr.createPoster()
}
// TestSQSERIsClosed checks that isClosed reports false on an empty rdrExit
// channel and true once a value is queued on it.
func TestSQSERIsClosed(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	rdr := &SQSER{
		cgrCfg:    cfg,
		cfgIdx:    0,
		fltrS:     new(engine.FilterS),
		rdrEvents: make(chan *erEvent, 1),
		rdrExit:   make(chan struct{}, 1),
		rdrErr:    make(chan error, 1),
		cap:       nil,
		awsRegion: "us-east-2",
		awsID:     "AWSId",
		awsKey:    "AWSAccessKeyId",
		awsToken:  "",
		queueID:   "cgrates_cdrs",
		session:   nil,
		poster:    nil,
	}
	if rcv := rdr.isClosed(); rcv != false {
		t.Errorf("Expected %v but received %v", false, true)
	}
	rdr.rdrExit <- struct{}{} // buffered send; next isClosed must see it
	if rcv := rdr.isClosed(); rcv != true {
		t.Errorf("Expected %v but received %v", true, false)
	}
}