Coverage tests for ers

This commit is contained in:
nickolasdaniel
2021-05-05 13:31:53 +03:00
committed by Dan Christian Bogos
parent 76a53556ce
commit 93346fb354
3 changed files with 229 additions and 1 deletions

View File

@@ -19,8 +19,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestKafkasetOpts(t *testing.T) {
@@ -91,3 +96,169 @@ func TestKafkasetOpts(t *testing.T) {
t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait)
}
}
func TestKafkaERServe(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrS := new(engine.FilterS)
rdrEvents := make(chan *erEvent, 1)
rdrExit := make(chan struct{}, 1)
rdrErr := make(chan error, 1)
rdr, err := NewKafkaER(cfg, 0, rdrEvents, rdrErr, fltrS, rdrExit)
if err != nil {
t.Error(err)
}
if err := rdr.Serve(); err != nil {
t.Error(err)
}
rdr.Config().RunDelay = 1 * time.Millisecond
if err := rdr.Serve(); err != nil {
t.Error(err)
}
rdr.Config().Opts = map[string]interface{}{}
rdr.Config().ProcessedPath = ""
rdr.(*KafkaER).createPoster()
}
func TestKafkaERServe2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
rdr := &KafkaER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: new(engine.FilterS),
rdrEvents: make(chan *erEvent, 1),
rdrExit: make(chan struct{}, 1),
rdrErr: make(chan error, 1),
dialURL: "testURL",
groupID: "testGroupID",
topic: "testTopic",
maxWait: time.Duration(1),
cap: make(chan struct{}, 1),
poster: engine.NewKafkaPoster("url", 1, make(map[string]interface{})),
}
rdr.rdrExit <- struct{}{}
rdr.Config().RunDelay = 1 * time.Millisecond
if err := rdr.Serve(); err != nil {
t.Error(err)
}
}
func TestKafkaERProcessMessage(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
rdr := &KafkaER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: new(engine.FilterS),
rdrEvents: make(chan *erEvent, 1),
rdrExit: make(chan struct{}, 1),
rdrErr: make(chan error, 1),
dialURL: "testURL",
groupID: "testGroupID",
topic: "testTopic",
maxWait: time.Duration(1),
cap: make(chan struct{}, 1),
}
expEvent := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.ToR: "*voice",
},
APIOpts: map[string]interface{}{},
}
rdr.Config().Fields = []*config.FCTemplate{
{
Tag: "Tor",
Type: utils.MetaConstant,
Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep),
Path: "*cgreq.ToR",
},
}
rdr.Config().Fields[0].ComputePath()
msg := []byte(`{"test":"input"}`)
if err := rdr.processMessage(msg); err != nil {
t.Error(err)
}
select {
case data := <-rdr.rdrEvents:
expEvent.ID = data.cgrEvent.ID
expEvent.Time = data.cgrEvent.Time
if !reflect.DeepEqual(data.cgrEvent, expEvent) {
t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
}
case <-time.After(50 * time.Millisecond):
t.Error("Time limit exceeded")
}
}
func TestKafkaERProcessMessageError1(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
rdr := &KafkaER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: new(engine.FilterS),
rdrEvents: make(chan *erEvent, 1),
rdrExit: make(chan struct{}, 1),
rdrErr: make(chan error, 1),
dialURL: "testURL",
groupID: "testGroupID",
topic: "testTopic",
maxWait: time.Duration(1),
cap: make(chan struct{}, 1),
}
rdr.Config().Fields = []*config.FCTemplate{
{},
}
msg := []byte(`{"test":"input"}`)
errExpect := "unsupported type: <>"
if err := rdr.processMessage(msg); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestKafkaERProcessMessageError2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
data := engine.NewInternalDB(nil, nil, true)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
rdr := &KafkaER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrEvents: make(chan *erEvent, 1),
rdrExit: make(chan struct{}, 1),
rdrErr: make(chan error, 1),
dialURL: "testURL",
groupID: "testGroupID",
topic: "testTopic",
maxWait: time.Duration(1),
cap: make(chan struct{}, 1),
}
rdr.Config().Filters = []string{"Filter1"}
msg := []byte(`{"test":"input"}`)
errExpect := "NOT_FOUND:Filter1"
if err := rdr.processMessage(msg); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestKafkaERProcessMessageError3(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
rdr := &KafkaER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: new(engine.FilterS),
rdrEvents: make(chan *erEvent, 1),
rdrExit: make(chan struct{}, 1),
rdrErr: make(chan error, 1),
dialURL: "testURL",
groupID: "testGroupID",
topic: "testTopic",
maxWait: time.Duration(1),
cap: make(chan struct{}, 1),
}
msg := []byte(`{"invalid":"input"`)
errExpect := "unexpected end of JSON input"
if err := rdr.processMessage(msg); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}

View File

@@ -165,7 +165,7 @@ func (rdr *S3ER) parseOpts(opts map[string]interface{}) {
func (rdr *S3ER) readLoop(scv s3Client) (err error) {
var keys []string
if err = scv.ListObjectsV2Pages(&s3.ListObjectsV2Input{Bucket: aws.String(rdr.queueID)},
func(lovo *s3.ListObjectsV2Output, b bool) bool {
func(lovo *s3.ListObjectsV2Output, _ bool) bool {
for _, objMeta := range lovo.Contents {
if objMeta.Key != nil {
keys = append(keys, *objMeta.Key)

View File

@@ -315,6 +315,14 @@ func TestS3ERReadLoop(t *testing.T) {
poster: nil,
}
listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
obj := &s3.ListObjectsV2Output{
Contents: []*s3.Object{
{
Key: utils.StringPointer("Key"),
},
},
}
fn(obj, false)
return nil
}
scv := &s3ClientMock{
@@ -546,3 +554,52 @@ func TestS3ERReadMsgError4(t *testing.T) {
t.Errorf("Expected %v but received %v", errExp, err)
}
}
func TestS3ERReadMsgError5(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
rdr := &S3ER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: new(engine.FilterS),
rdrEvents: make(chan *erEvent, 1),
rdrExit: make(chan struct{}, 1),
rdrErr: make(chan error, 1),
cap: make(chan struct{}, 1),
awsRegion: "us-east-2",
awsID: "AWSId",
awsKey: "AWSAccessKeyId",
awsToken: "",
queueID: "cgrates_cdrs",
session: nil,
poster: engine.NewSQSPoster("url", 1, make(map[string]interface{})),
}
rdr.Config().SourcePath = rdr.awsRegion
rdr.Config().ConcurrentReqs = -1
rdr.Config().Fields = []*config.FCTemplate{
{
Tag: "Tor",
Type: utils.MetaConstant,
Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep),
Path: "*cgreq.ToR",
},
}
rdr.Config().Fields[0].ComputePath()
listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error {
return nil
}
getObject := func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewBuffer([]byte(`{"key":"value"}`)))}, nil
}
deleteObject := func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
return nil, nil
}
scv := &s3ClientMock{
ListObjectsV2PagesF: listObjects,
GetObjectF: getObject,
DeleteObjectF: deleteObject,
}
errExp := "MissingRegion: could not find region configuration"
if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp {
t.Errorf("Expected %v but received %v", errExp, err)
}
}