mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Coverage tests for ers
This commit is contained in:
committed by
Dan Christian Bogos
parent
4b132a40eb
commit
2382e547dc
148
ers/amqpv1_test.go
Normal file
148
ers/amqpv1_test.go
Normal file
@@ -0,0 +1,148 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ers
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestAMQPv1ERProcessMessage(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &AMQPv1ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: nil,
|
||||
queueID: "cgrates_cdrs",
|
||||
poster: nil,
|
||||
}
|
||||
expEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
utils.CGRID: "testCgrId",
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
body := []byte(`{"CGRID":"testCgrId"}`)
|
||||
rdr.Config().Fields = []*config.FCTemplate{
|
||||
{
|
||||
Tag: "CGRID",
|
||||
Type: utils.MetaConstant,
|
||||
Value: config.NewRSRParsersMustCompile("testCgrId", utils.InfieldSep),
|
||||
Path: "*cgreq.CGRID",
|
||||
},
|
||||
}
|
||||
rdr.Config().Fields[0].ComputePath()
|
||||
if err := rdr.processMessage(body); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
select {
|
||||
case data := <-rdr.rdrEvents:
|
||||
expEvent.ID = data.cgrEvent.ID
|
||||
expEvent.Time = data.cgrEvent.Time
|
||||
if !reflect.DeepEqual(data.cgrEvent, expEvent) {
|
||||
t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
|
||||
}
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Error("Time limit exceeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAMQPv1ERProcessMessageError1(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &AMQPv1ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: nil,
|
||||
queueID: "cgrates_cdrs",
|
||||
poster: nil,
|
||||
}
|
||||
rdr.Config().Fields = []*config.FCTemplate{
|
||||
{},
|
||||
}
|
||||
body := []byte(`{"CGRID":"testCgrId"}`)
|
||||
errExpect := "unsupported type: <>"
|
||||
if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAMQPv1ERProcessMessageError2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := engine.NewInternalDB(nil, nil, true)
|
||||
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
cfg.ERsCfg().Readers[0].ProcessedPath = ""
|
||||
fltrs := engine.NewFilterS(cfg, nil, dm)
|
||||
rdr := &AMQPv1ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: nil,
|
||||
queueID: "cgrates_cdrs",
|
||||
poster: nil,
|
||||
}
|
||||
body := []byte(`{"CGRID":"testCgrId"}`)
|
||||
rdr.Config().Filters = []string{"Filter1"}
|
||||
errExpect := "NOT_FOUND:Filter1"
|
||||
if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
}
|
||||
|
||||
//
|
||||
rdr.Config().Filters = []string{"*exists:~*req..Account:"}
|
||||
if err := rdr.processMessage(body); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAMQPv1ERProcessMessageError3(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
rdr := &AMQPv1ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: new(engine.FilterS),
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
rdrErr: make(chan error, 1),
|
||||
cap: nil,
|
||||
queueID: "cgrates_cdrs",
|
||||
poster: nil,
|
||||
}
|
||||
body := []byte("invalid_format")
|
||||
errExpect := "invalid character 'i' looking for beginning of value"
|
||||
if err := rdr.processMessage(body); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
}
|
||||
}
|
||||
15
ers/s3.go
15
ers/s3.go
@@ -80,6 +80,12 @@ type S3ER struct {
|
||||
poster engine.Poster
|
||||
}
|
||||
|
||||
type s3Client interface {
|
||||
ListObjectsV2Pages(input *s3.ListObjectsV2Input, fn func(*s3.ListObjectsV2Output, bool) bool) error
|
||||
GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
|
||||
DeleteObject(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error)
|
||||
}
|
||||
|
||||
// Config returns the curent configuration
|
||||
func (rdr *S3ER) Config() *config.EventReaderCfg {
|
||||
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
|
||||
@@ -104,8 +110,7 @@ func (rdr *S3ER) Serve() (err error) {
|
||||
if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
|
||||
return
|
||||
}
|
||||
|
||||
go rdr.readLoop() // read until the connection is closed
|
||||
go rdr.readLoop(s3.New(rdr.session)) // read until the connection is closed
|
||||
return
|
||||
}
|
||||
|
||||
@@ -157,8 +162,7 @@ func (rdr *S3ER) parseOpts(opts map[string]interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (rdr *S3ER) readLoop() (err error) {
|
||||
scv := s3.New(rdr.session)
|
||||
func (rdr *S3ER) readLoop(scv s3Client) (err error) {
|
||||
var keys []string
|
||||
if err = scv.ListObjectsV2Pages(&s3.ListObjectsV2Input{Bucket: aws.String(rdr.queueID)},
|
||||
func(lovo *s3.ListObjectsV2Output, b bool) bool {
|
||||
@@ -200,7 +204,7 @@ func (rdr *S3ER) isClosed() bool {
|
||||
}
|
||||
}
|
||||
|
||||
func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) {
|
||||
func (rdr *S3ER) readMsg(scv s3Client, key string) (err error) {
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap // do not try to read if the limit is reached
|
||||
defer func() { rdr.cap <- struct{}{} }()
|
||||
@@ -210,6 +214,7 @@ func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) {
|
||||
}
|
||||
|
||||
obj, err := scv.GetObject(&s3.GetObjectInput{Bucket: &rdr.queueID, Key: &key})
|
||||
fmt.Println(obj)
|
||||
if err != nil {
|
||||
rdr.rdrErr <- err
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user