diff --git a/ers/kafka_test.go b/ers/kafka_test.go index 79935683d..3b862dc42 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -19,8 +19,13 @@ along with this program. If not, see package ers import ( + "reflect" "testing" "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) func TestKafkasetOpts(t *testing.T) { @@ -91,3 +96,169 @@ func TestKafkasetOpts(t *testing.T) { t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait) } } + +func TestKafkaERServe(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrS := new(engine.FilterS) + rdrEvents := make(chan *erEvent, 1) + rdrExit := make(chan struct{}, 1) + rdrErr := make(chan error, 1) + rdr, err := NewKafkaER(cfg, 0, rdrEvents, rdrErr, fltrS, rdrExit) + if err != nil { + t.Error(err) + } + if err := rdr.Serve(); err != nil { + t.Error(err) + } + rdr.Config().RunDelay = 1 * time.Millisecond + if err := rdr.Serve(); err != nil { + t.Error(err) + } + rdr.Config().Opts = map[string]interface{}{} + rdr.Config().ProcessedPath = "" + rdr.(*KafkaER).createPoster() +} + +func TestKafkaERServe2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &KafkaER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + dialURL: "testURL", + groupID: "testGroupID", + topic: "testTopic", + maxWait: time.Duration(1), + cap: make(chan struct{}, 1), + poster: engine.NewKafkaPoster("url", 1, make(map[string]interface{})), + } + rdr.rdrExit <- struct{}{} + rdr.Config().RunDelay = 1 * time.Millisecond + if err := rdr.Serve(); err != nil { + t.Error(err) + } +} + +func TestKafkaERProcessMessage(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &KafkaER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + dialURL: "testURL", + groupID: "testGroupID", + topic: "testTopic", + maxWait: time.Duration(1), + cap: make(chan struct{}, 1), + } + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.ToR: "*voice", + }, + APIOpts: map[string]interface{}{}, + } + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "Tor", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep), + Path: "*cgreq.ToR", + }, + } + rdr.Config().Fields[0].ComputePath() + + msg := []byte(`{"test":"input"}`) + if err := rdr.processMessage(msg); err != nil { + t.Error(err) + } + select { + case data := <-rdr.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } +} + +func TestKafkaERProcessMessageError1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &KafkaER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + dialURL: "testURL", + groupID: "testGroupID", + topic: "testTopic", + maxWait: time.Duration(1), + cap: make(chan struct{}, 1), + } + rdr.Config().Fields = []*config.FCTemplate{ + {}, + } + msg := []byte(`{"test":"input"}`) + errExpect := "unsupported type: <>" + if err := rdr.processMessage(msg); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestKafkaERProcessMessageError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + fltrs := engine.NewFilterS(cfg, nil, dm) + rdr := &KafkaER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + dialURL: "testURL", + groupID: "testGroupID", + topic: "testTopic", + maxWait: time.Duration(1), + cap: make(chan struct{}, 1), + } + rdr.Config().Filters = []string{"Filter1"} + msg := []byte(`{"test":"input"}`) + errExpect := "NOT_FOUND:Filter1" + if err := rdr.processMessage(msg); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestKafkaERProcessMessageError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &KafkaER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + dialURL: "testURL", + groupID: "testGroupID", + topic: "testTopic", + maxWait: time.Duration(1), + cap: make(chan struct{}, 1), + } + msg := []byte(`{"invalid":"input"`) + errExpect := "unexpected end of JSON input" + if err := rdr.processMessage(msg); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} diff --git a/ers/s3.go b/ers/s3.go index 1b31841ba..feffe7927 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -165,7 +165,7 @@ func (rdr *S3ER) parseOpts(opts map[string]interface{}) { func (rdr *S3ER) readLoop(scv s3Client) (err error) { var keys []string if err = scv.ListObjectsV2Pages(&s3.ListObjectsV2Input{Bucket: aws.String(rdr.queueID)}, - func(lovo *s3.ListObjectsV2Output, b bool) bool { + func(lovo *s3.ListObjectsV2Output, _ bool) bool { for _, objMeta := range lovo.Contents { if objMeta.Key != nil { keys = append(keys, *objMeta.Key) diff --git a/ers/s3_test.go b/ers/s3_test.go index eb3a53a31..2fcffde35 100644 --- a/ers/s3_test.go +++ b/ers/s3_test.go @@ -315,6 +315,14 @@ func TestS3ERReadLoop(t *testing.T) { poster: nil, } listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error { + obj := &s3.ListObjectsV2Output{ + Contents: []*s3.Object{ + { + Key: utils.StringPointer("Key"), + }, + }, + } + fn(obj, false) return nil } scv := &s3ClientMock{ @@ -546,3 +554,52 @@ func TestS3ERReadMsgError4(t *testing.T) { t.Errorf("Expected %v but received %v", errExp, err) } } + +func TestS3ERReadMsgError5(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: make(chan struct{}, 1), + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: engine.NewSQSPoster("url", 1, make(map[string]interface{})), + } + rdr.Config().SourcePath = rdr.awsRegion + rdr.Config().ConcurrentReqs = -1 + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "Tor", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("*voice", utils.InfieldSep), + Path: "*cgreq.ToR", + }, + } + rdr.Config().Fields[0].ComputePath() + listObjects := func(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error { + return nil + } + getObject := func(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewBuffer([]byte(`{"key":"value"}`)))}, nil + } + deleteObject := func(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) { + return nil, nil + } + scv := &s3ClientMock{ + ListObjectsV2PagesF: listObjects, + GetObjectF: getObject, + DeleteObjectF: deleteObject, + } + errExp := "MissingRegion: could not find region configuration" + if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp { + t.Errorf("Expected %v but received %v", errExp, err) + } +}