From 2382e547dcfc7bdf122060d1b5a87c963632df1e Mon Sep 17 00:00:00 2001 From: nickolasdaniel Date: Tue, 27 Apr 2021 13:31:37 +0300 Subject: [PATCH] Coverage tests for ers --- ers/amqpv1_test.go | 148 +++++++++++++++++++++++++++++++++++++++++++++ ers/s3.go | 15 +++-- 2 files changed, 158 insertions(+), 5 deletions(-) create mode 100644 ers/amqpv1_test.go diff --git a/ers/amqpv1_test.go b/ers/amqpv1_test.go new file mode 100644 index 000000000..655e983ec --- /dev/null +++ b/ers/amqpv1_test.go @@ -0,0 +1,148 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestAMQPv1ERProcessMessage(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &AMQPv1ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + queueID: "cgrates_cdrs", + poster: nil, + } + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.CGRID: "testCgrId", + }, + APIOpts: map[string]interface{}{}, + } + body := []byte(`{"CGRID":"testCgrId"}`) + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "CGRID", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("testCgrId", utils.InfieldSep), + Path: "*cgreq.CGRID", + }, + } + rdr.Config().Fields[0].ComputePath() + if err := rdr.processMessage(body); err != nil { + t.Error(err) + } + select { + case data := <-rdr.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } +} + +func TestAMQPv1ERProcessMessageError1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &AMQPv1ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + queueID: "cgrates_cdrs", + poster: nil, + } + rdr.Config().Fields = []*config.FCTemplate{ + {}, + } + body := []byte(`{"CGRID":"testCgrId"}`) + errExpect := "unsupported type: <>" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestAMQPv1ERProcessMessageError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := engine.NewFilterS(cfg, nil, dm) + rdr := &AMQPv1ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + queueID: "cgrates_cdrs", + poster: nil, + } + body := []byte(`{"CGRID":"testCgrId"}`) + rdr.Config().Filters = []string{"Filter1"} + errExpect := "NOT_FOUND:Filter1" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + + // + rdr.Config().Filters = []string{"*exists:~*req..Account:"} + if err := rdr.processMessage(body); err != nil { + t.Error(err) + } +} + +func TestAMQPv1ERProcessMessageError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &AMQPv1ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + queueID: "cgrates_cdrs", + poster: nil, + } + body := []byte("invalid_format") + errExpect := "invalid character 'i' looking for beginning of value" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} diff --git a/ers/s3.go b/ers/s3.go index f3fe6a5e9..c051752ce 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -80,6 +80,12 @@ type S3ER struct { poster engine.Poster } +type s3Client interface { + ListObjectsV2Pages(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error + GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) + DeleteObject(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) +} + // Config returns the curent configuration func (rdr *S3ER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] @@ -104,8 +110,7 @@ func (rdr *S3ER) Serve() (err error) { if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API return } - - go rdr.readLoop() // read until the connection is closed + go rdr.readLoop(s3.New(rdr.session)) // read until the connection is closed return } @@ -157,8 +162,7 @@ func (rdr *S3ER) parseOpts(opts map[string]interface{}) { } } -func (rdr *S3ER) readLoop() (err error) { - scv := s3.New(rdr.session) +func (rdr *S3ER) readLoop(scv s3Client) (err error) { var keys []string if err = scv.ListObjectsV2Pages(&s3.ListObjectsV2Input{Bucket: aws.String(rdr.queueID)}, func(lovo *s3.ListObjectsV2Output, b bool) bool { @@ -200,7 +204,7 @@ func (rdr *S3ER) isClosed() bool { } } -func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) { +func (rdr *S3ER) readMsg(scv s3Client, key string) (err error) { if rdr.Config().ConcurrentReqs != -1 { <-rdr.cap // do not try to read if the limit is reached defer func() { rdr.cap <- struct{}{} }() @@ -210,6 +214,7 @@ func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) { } obj, err := scv.GetObject(&s3.GetObjectInput{Bucket: &rdr.queueID, Key: &key}) + fmt.Println(obj) if err != nil { rdr.rdrErr <- err return