diff --git a/ers/filecsv.go b/ers/filecsv.go index 8bb314b54..056d5352c 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -75,6 +75,36 @@ func (rdr *CSVFileER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } +func (rdr *CSVFileER) serveDefault() { + tm := time.NewTimer(0) + for { + // Not automated, process and sleep approach + select { + case <-rdr.rdrExit: + tm.Stop() + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring path <%s>", + utils.ERs, rdr.rdrDir)) + return + case <-tm.C: + } + filesInDir, _ := os.ReadDir(rdr.rdrDir) + for _, file := range filesInDir { + if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader + continue // used in order to filter the files from directory + } + go func(fileName string) { + if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing file %s, error: %s", + utils.ERs, fileName, err.Error())) + } + }(file.Name()) + } + tm.Reset(rdr.Config().RunDelay) + } +} + func (rdr *CSVFileER) Serve() (err error) { switch rdr.Config().RunDelay { case time.Duration(0): // 0 disables the automatic read, maybe done per API @@ -83,35 +113,7 @@ func (rdr *CSVFileER) Serve() (err error) { return utils.WatchDir(rdr.rdrDir, rdr.processFile, utils.ERs, rdr.rdrExit) default: - go func() { - tm := time.NewTimer(0) - for { - // Not automated, process and sleep approach - select { - case <-rdr.rdrExit: - tm.Stop() - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) - return - case <-tm.C: - } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } - tm.Reset(rdr.Config().RunDelay) - } - }() + go rdr.serveDefault() } return } diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 3bd7af6f4..b06bc5cfa 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -667,8 +667,29 @@ func TestFileCSV(t *testing.T) { conReqs: make(chan struct{}, 1), } eR.conReqs <- struct{}{} + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) + } +} + +func TestFileCSVServeDefault(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &CSVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ers/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + var err error + eR.conReqs <- struct{}{} filePath := "/tmp/ers/out/" - err := os.MkdirAll(filePath, 0777) + err = os.MkdirAll(filePath, 0777) if err != nil { t.Error(err) } @@ -679,8 +700,12 @@ func TestFileCSV(t *testing.T) { } eR.Config().RunDelay = 1 * time.Millisecond os.Create(path.Join(filePath, "file1.txt")) - eR.Config().RunDelay = 1 * time.Millisecond - if err := eR.Serve(); err != nil { + go func() { + time.Sleep(20 * time.Millisecond) + close(eR.rdrExit) + }() + eR.serveDefault() + if err := os.RemoveAll(filePath); err != nil { t.Error(err) } } diff --git a/ers/filefwv.go b/ers/filefwv.go index c699b8c6c..2d4cad7c6 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -81,6 +81,36 @@ func (rdr *FWVFileER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } +func (rdr *FWVFileER) serveDefault() { + tm := time.NewTimer(0) + for { + // Not automated, process and sleep approach + select { + case <-rdr.rdrExit: + tm.Stop() + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring path <%s>", + utils.ERs, rdr.rdrDir)) + return + case <-tm.C: + } + filesInDir, _ := os.ReadDir(rdr.rdrDir) + for _, file := range filesInDir { + if !strings.HasSuffix(file.Name(), utils.FWVSuffix) { // hardcoded file extension for xml event reader + continue // used in order to filter the files from directory + } + go func(fileName string) { + if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing file %s, error: %s", + utils.ERs, fileName, err.Error())) + } + }(file.Name()) + } + tm.Reset(rdr.Config().RunDelay) + } +} + func (rdr *FWVFileER) Serve() (err error) { switch rdr.Config().RunDelay { case time.Duration(0): // 0 disables the automatic read, maybe done per API @@ -89,35 +119,7 @@ func (rdr *FWVFileER) Serve() (err error) { return utils.WatchDir(rdr.rdrDir, rdr.processFile, utils.ERs, rdr.rdrExit) default: - go func() { - tm := time.NewTimer(0) - for { - // Not automated, process and sleep approach - select { - case <-rdr.rdrExit: - tm.Stop() - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) - return - case <-tm.C: - } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.FWVSuffix) { // hardcoded file extension for xml event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } - tm.Reset(rdr.Config().RunDelay) - } - }() + go rdr.serveDefault() } return } diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index 166a67780..71c2d0b46 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -381,6 +381,42 @@ func TestFileFWV(t *testing.T) { } } +func TestFileFWVServeDefault(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + filePath := "/tmp/fwvErs/out" + err := os.MkdirAll(filePath, 0777) + if err != nil { + t.Error(err) + } + for i := 1; i < 4; i++ { + if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.fwv", i))); err != nil { + t.Error(err) + } + } + os.Create(path.Join(filePath, "file1.txt")) + eR.Config().RunDelay = 1 * time.Millisecond + go func() { + time.Sleep(20 * time.Millisecond) + close(eR.rdrExit) + }() + eR.serveDefault() + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + func TestFileFWVExit(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} diff --git a/ers/filejson.go b/ers/filejson.go index 8cc86d8c2..a3a98f2c3 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -75,6 +75,36 @@ func (rdr *JSONFileER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } +func (rdr *JSONFileER) serveDefault() { + tm := time.NewTimer(0) + for { + // Not automated, process and sleep approach + select { + case <-rdr.rdrExit: + tm.Stop() + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring path <%s>", + utils.ERs, rdr.rdrDir)) + return + case <-tm.C: + } + filesInDir, _ := os.ReadDir(rdr.rdrDir) + for _, file := range filesInDir { + if !strings.HasSuffix(file.Name(), utils.JSNSuffix) { // hardcoded file extension for json event reader + continue // used in order to filter the files from directory + } + go func(fileName string) { + if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing file %s, error: %s", + utils.ERs, fileName, err.Error())) + } + }(file.Name()) + } + tm.Reset(rdr.Config().RunDelay) + } +} + func (rdr *JSONFileER) Serve() (err error) { switch rdr.Config().RunDelay { case time.Duration(0): // 0 disables the automatic read, maybe done per API @@ -83,35 +113,7 @@ func (rdr *JSONFileER) Serve() (err error) { return utils.WatchDir(rdr.rdrDir, rdr.processFile, utils.ERs, rdr.rdrExit) default: - go func() { - tm := time.NewTimer(0) - for { - // Not automated, process and sleep approach - select { - case <-rdr.rdrExit: - tm.Stop() - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) - return - case <-tm.C: - } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.JSNSuffix) { // hardcoded file extension for json event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } - tm.Reset(rdr.Config().RunDelay) - } - }() + go rdr.serveDefault() } return } diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index deaa58d1b..0d4456080 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -524,6 +524,25 @@ func TestFileJSONProcessEventError3(t *testing.T) { } func TestFileJSON(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &JSONFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ErsJSON/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + if err := eR.Serve(); err != nil { + t.Error(err) + } +} + +func TestFileJSONServeDefault(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} eR := &JSONFileER{ @@ -548,12 +567,13 @@ func TestFileJSON(t *testing.T) { } } eR.Config().RunDelay = 1 * time.Millisecond - if err := eR.Serve(); err != nil { - t.Error(err) - } os.Create(path.Join(filePath, "file1.txt")) - eR.Config().RunDelay = 1 * time.Millisecond - if err := eR.Serve(); err != nil { + go func() { + time.Sleep(20 * time.Millisecond) + close(eR.rdrExit) + }() + eR.serveDefault() + if err := os.RemoveAll(filePath); err != nil { t.Error(err) } } diff --git a/ers/flatstore.go b/ers/flatstore.go index 35a502bd2..99bfaa622 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -82,6 +82,36 @@ func (rdr *FlatstoreER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } +func (rdr *FlatstoreER) serveDefault() { + tm := time.NewTimer(0) + for { + // Not automated, process and sleep approach + select { + case <-rdr.rdrExit: + tm.Stop() + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring path <%s>", + utils.ERs, rdr.rdrDir)) + return + case <-tm.C: + } + filesInDir, _ := os.ReadDir(rdr.rdrDir) + for _, file := range filesInDir { + if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader + continue // used in order to filter the files from directory + } + go func(fileName string) { + if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing file %s, error: %s", + utils.ERs, fileName, err.Error())) + } + }(file.Name()) + } + tm.Reset(rdr.Config().RunDelay) + } +} + func (rdr *FlatstoreER) Serve() (err error) { switch rdr.Config().RunDelay { case time.Duration(0): // 0 disables the automatic read, maybe done per API @@ -90,35 +120,7 @@ func (rdr *FlatstoreER) Serve() (err error) { return utils.WatchDir(rdr.rdrDir, rdr.processFile, utils.ERs, rdr.rdrExit) default: - go func() { - tm := time.NewTimer(0) - for { - // Not automated, process and sleep approach - select { - case <-rdr.rdrExit: - tm.Stop() - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) - return - case <-tm.C: - } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } - tm.Reset(rdr.Config().RunDelay) - } - }() + go rdr.serveDefault() } return } diff --git a/ers/flatstore_it_test.go b/ers/flatstore_it_test.go index e622c4cfc..5909b2e53 100644 --- a/ers/flatstore_it_test.go +++ b/ers/flatstore_it_test.go @@ -619,6 +619,25 @@ func TestFlatstoreProcessEventDirError(t *testing.T) { } func TestFlatstore(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + if err := eR.Serve(); err != nil { + t.Error(err) + } +} + +func TestFlatstoreServeDefault(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} eR := &FlatstoreER{ @@ -643,12 +662,13 @@ func TestFlatstore(t *testing.T) { } } eR.Config().RunDelay = 1 * time.Millisecond - if err := eR.Serve(); err != nil { - t.Error(err) - } os.Create(path.Join(filePath, "file1.txt")) - eR.Config().RunDelay = 1 * time.Millisecond - if err := eR.Serve(); err != nil { + go func() { + time.Sleep(20 * time.Millisecond) + close(eR.rdrExit) + }() + eR.serveDefault() + if err := os.RemoveAll(filePath); err != nil { t.Error(err) } } diff --git a/ers/readers_test.go b/ers/readers_test.go index 7542834db..49bd55a73 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -294,3 +294,67 @@ func TestNewAMQPv1Reader(t *testing.T) { t.Errorf("Expected \n%v but received \n%v", expected, rcv) } } + +func TestNewS3Reader(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltr := &engine.FilterS{} + cfg.ERsCfg().Readers[0].Type = utils.MetaS3jsonMap + cfg.ERsCfg().Readers[0].ConcurrentReqs = -1 + exp := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltr, + rdrEvents: nil, + rdrExit: nil, + rdrErr: nil, + queueID: "cgrates_cdrs", + } + exp.Config().ProcessedPath = "" + exp.Config().Opts = map[string]interface{}{} + exp.createPoster() + var expected EventReader = exp + rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + if err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expected, rcv) { + t.Errorf("Expected \n%v but received \n%v", expected, rcv) + } +} + +func TestNewSQSReader(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltr := &engine.FilterS{} + cfg.ERsCfg().Readers[0].Type = utils.MetaSQSjsonMap + cfg.ERsCfg().Readers[0].ConcurrentReqs = -1 + exp := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltr, + rdrEvents: nil, + rdrExit: nil, + rdrErr: nil, + queueID: "cgrates_cdrs", + } + exp.Config().SourcePath = "string" + // var err error + // awsCfg := aws.Config{Endpoint: aws.String(exp.Config().SourcePath)} + // exp.session, err = session.NewSessionWithOptions( + // session.Options{ + // Config: awsCfg, + // }, + // ) + // if err != nil { + // t.Error(err) + // } + exp.Config().ProcessedPath = "" + exp.Config().Opts = map[string]interface{}{} + exp.createPoster() + var expected EventReader = exp + rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + exp.session = rcv.(*SQSER).session + if err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expected, rcv) { + t.Errorf("Expected \n%v but received \n%v", expected, rcv) + } +} diff --git a/ers/s3_test.go b/ers/s3_test.go new file mode 100644 index 000000000..403d3439d --- /dev/null +++ b/ers/s3_test.go @@ -0,0 +1,266 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestS3ERServe(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr, err := NewS3ER(cfg, 0, nil, + nil, nil, nil) + if err != nil { + t.Error(err) + } + rdr.Config().RunDelay = 1 * time.Millisecond + if err := rdr.Serve(); err != nil { + t.Error(err) + } +} + +func TestS3ERServe2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: nil, + rdrEvents: nil, + rdrExit: nil, + rdrErr: nil, + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + if err := rdr.Serve(); err != nil { + t.Error(err) + } +} + +func TestS3ERProcessMessage(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.CGRID: "testCgrId", + }, + APIOpts: map[string]interface{}{}, + } + body := []byte(`{"CGRID":"testCgrId"}`) + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "CGRID", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("testCgrId", utils.InfieldSep), + Path: "*cgreq.CGRID", + }, + } + rdr.Config().Fields[0].ComputePath() + if err := rdr.processMessage(body); err != nil { + t.Error(err) + } + select { + case data := <-rdr.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } +} + +func TestS3ERProcessMessageError1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + rdr.Config().Fields = []*config.FCTemplate{ + {}, + } + body := []byte(`{"CGRID":"testCgrId"}`) + errExpect := "unsupported type: <>" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestS3ERProcessMessageError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := engine.NewFilterS(cfg, nil, dm) + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + body := []byte(`{"CGRID":"testCgrId"}`) + rdr.Config().Filters = []string{"Filter1"} + errExpect := "NOT_FOUND:Filter1" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + + // + rdr.Config().Filters = []string{"*exists:~*req..Account:"} + if err := rdr.processMessage(body); err != nil { + t.Error(err) + } +} + +func TestS3ERProcessMessageError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + body := []byte("invalid_format") + errExpect := "invalid character 'i' looking for beginning of value" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestS3ERParseOpts(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + + opts := map[string]interface{}{ + utils.QueueID: "QueueID", + utils.AWSRegion: "AWSRegion", + utils.AWSKey: "AWSKey", + utils.AWSSecret: "AWSSecret", + utils.AWSToken: "AWSToken", + } + rdr.parseOpts(opts) + if rdr.queueID != opts[utils.QueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] { + t.Error("Fields do not corespond") + } + rdr.Config().Opts = map[string]interface{}{} + rdr.Config().ProcessedPath = utils.EmptyString + rdr.createPoster() +} + +func TestS3ERIsClosed(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &S3ER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + if rcv := rdr.isClosed(); rcv != false { + t.Errorf("Expected %v but received %v", false, true) + } + rdr.rdrExit <- struct{}{} + if rcv := rdr.isClosed(); rcv != true { + t.Errorf("Expected %v but received %v", true, false) + } +} diff --git a/ers/sqs_test.go b/ers/sqs_test.go index bd1400793..0e3c5fb1d 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -121,3 +122,204 @@ func TestSQSERServe(t *testing.T) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) } } + +func TestSQSERProcessMessage(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.CGRID: "testCgrId", + }, + APIOpts: map[string]interface{}{}, + } + body := []byte(`{"CGRID":"testCgrId"}`) + rdr.Config().Fields = []*config.FCTemplate{ + { + Tag: "CGRID", + Type: utils.MetaConstant, + Value: config.NewRSRParsersMustCompile("testCgrId", utils.InfieldSep), + Path: "*cgreq.CGRID", + }, + } + rdr.Config().Fields[0].ComputePath() + if err := rdr.processMessage(body); err != nil { + t.Error(err) + } + select { + case data := <-rdr.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } +} + +func TestSQSERProcessMessageError1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + rdr.Config().Fields = []*config.FCTemplate{ + {}, + } + body := []byte(`{"CGRID":"testCgrId"}`) + errExpect := "unsupported type: <>" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestSQSERProcessMessageError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := engine.NewFilterS(cfg, nil, dm) + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + body := []byte(`{"CGRID":"testCgrId"}`) + rdr.Config().Filters = []string{"Filter1"} + errExpect := "NOT_FOUND:Filter1" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + + // + rdr.Config().Filters = []string{"*exists:~*req..Account:"} + if err := rdr.processMessage(body); err != nil { + t.Error(err) + } +} + +func TestSQSERProcessMessageError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + body := []byte("invalid_format") + errExpect := "invalid character 'i' looking for beginning of value" + if err := rdr.processMessage(body); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestSQSERParseOpts(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + + opts := map[string]interface{}{ + utils.QueueID: "QueueID", + utils.AWSRegion: "AWSRegion", + utils.AWSKey: "AWSKey", + utils.AWSSecret: "AWSSecret", + utils.AWSToken: "AWSToken", + } + rdr.parseOpts(opts) + if rdr.queueID != opts[utils.QueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] { + t.Error("Fields do not corespond") + } + rdr.Config().Opts = map[string]interface{}{} + rdr.Config().ProcessedPath = utils.EmptyString + rdr.createPoster() +} + +func TestSQSERIsClosed(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQSER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: new(engine.FilterS), + rdrEvents: make(chan *erEvent, 1), + rdrExit: make(chan struct{}, 1), + rdrErr: make(chan error, 1), + cap: nil, + awsRegion: "us-east-2", + awsID: "AWSId", + awsKey: "AWSAccessKeyId", + awsToken: "", + queueID: "cgrates_cdrs", + session: nil, + poster: nil, + } + if rcv := rdr.isClosed(); rcv != false { + t.Errorf("Expected %v but received %v", false, true) + } + rdr.rdrExit <- struct{}{} + if rcv := rdr.isClosed(); rcv != true { + t.Errorf("Expected %v but received %v", true, false) + } +}