Coverage tests for ers

This commit is contained in:
nickolasdaniel
2021-04-12 14:48:12 +03:00
committed by Dan Christian Bogos
parent 26cb5c0133
commit 3fd5237e14
5 changed files with 432 additions and 49 deletions

View File

@@ -458,7 +458,7 @@ func TestFileCSVProcessEventError(t *testing.T) {
}
}
func TestFileCSV(t *testing.T) {
func TestFileCSVDirErr(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &CSVFileER{
@@ -477,6 +477,21 @@ func TestFileCSV(t *testing.T) {
if err := eR.Serve(); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestFileCSV(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
filePath := "/tmp/ers/out/"
err := os.MkdirAll(filePath, 0777)
if err != nil {
@@ -491,6 +506,11 @@ func TestFileCSV(t *testing.T) {
if err := eR.Serve(); err != nil {
t.Error(err)
}
os.Create(path.Join(filePath, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
}
func TestFileCSVExit(t *testing.T) {

View File

@@ -21,6 +21,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"fmt"
"net/rpc"
"os"
"path"
@@ -268,3 +269,135 @@ func TestFWVFileConfig(t *testing.T) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestFileFWVProcessEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers[0].ProcessedPath = ""
fltrs := &engine.FilterS{}
filePath := "/tmp/TestFileFWVProcessEvent/"
if err := os.MkdirAll(filePath, 0777); err != nil {
t.Error(err)
}
file, err := os.Create(path.Join(filePath, "file1.fwv"))
if err != nil {
t.Error(err)
}
file.Write([]byte("test,test2"))
file.Close()
eR := &FWVFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
fname := "file1.fwv"
errExpect := "unsupported field prefix: <> when set fields"
eR.Config().Fields = []*config.FCTemplate{
{
Value: config.RSRParsers{
{
Rules: "~*hdr",
},
},
Type: utils.MetaRemove,
// Path: utils.MetaVars,
},
}
eR.Config().Fields[0].ComputePath()
if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
if err := os.RemoveAll(filePath); err != nil {
t.Error(err)
}
}
func TestFileFWVServeErrTimeDuration0(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
rdr.Config().RunDelay = time.Duration(0)
result := rdr.Serve()
if !reflect.DeepEqual(result, nil) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
}
}
func TestFileFWVServeErrTimeDurationNeg1(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
rdr.Config().RunDelay = time.Duration(-1)
expected := "no such file or directory"
err = rdr.Serve()
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
}
}
func TestFileFWV(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &FWVFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
filePath := "/tmp/fwvErs/out"
err := os.MkdirAll(filePath, 0777)
if err != nil {
t.Error(err)
}
for i := 1; i < 4; i++ {
if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.fwv", i))); err != nil {
t.Error(err)
}
}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
os.Create(path.Join(filePath, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
}
func TestFileFWVExit(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &FWVFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
eR.rdrExit <- struct{}{}
}

View File

@@ -21,9 +21,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"encoding/json"
"fmt"
"net/rpc"
"os"
"path"
"reflect"
"testing"
"time"
@@ -226,7 +229,6 @@ func testJSONKillEngine(t *testing.T) {
}
}
/*
func TestFileJSONServeErrTimeDuration0(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
@@ -241,7 +243,6 @@ func TestFileJSONServeErrTimeDuration0(t *testing.T) {
}
}
func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
@@ -257,47 +258,203 @@ func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) {
}
}
func TestFileJSONServeTimeDefault(t *testing.T) {
// func TestFileJSONServeTimeDefault(t *testing.T) {
// cfg := config.NewDefaultCGRConfig()
// cfgIdx := 0
// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil)
// if err != nil {
// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
// }
// rdr.Config().RunDelay = time.Duration(1)
// result := rdr.Serve()
// if !reflect.DeepEqual(result, nil) {
// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
// }
// }
// func TestFileJSONServeTimeDefaultChanExit(t *testing.T) {
// cfg := config.NewDefaultCGRConfig()
// cfgIdx := 0
// rdrExit := make(chan struct{}, 1)
// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, rdrExit)
// if err != nil {
// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
// }
// rdrExit <- struct{}{}
// rdr.Config().RunDelay = time.Duration(1)
// result := rdr.Serve()
// if !reflect.DeepEqual(result, nil) {
// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
// }
// }
// func TestFileJSONProcessFile(t *testing.T) {
// cfg := config.NewDefaultCGRConfig()
// cfgIdx := 0
// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil)
// if err != nil {
// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
// }
// expected := "open : no such file or directory"
// err2 := rdr.(*JSONFileER).processFile("", "")
// if err2 == nil || err2.Error() != expected {
// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2)
// }
// }
func TestFileJSONProcessEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
cfg.ERsCfg().Readers[0].ProcessedPath = ""
fltrs := &engine.FilterS{}
filePath := "/tmp/TestFileJSONProcessEvent/"
if err := os.MkdirAll(filePath, 0777); err != nil {
t.Error(err)
}
rdr.Config().RunDelay = time.Duration(1)
result := rdr.Serve()
if !reflect.DeepEqual(result, nil) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
file, err := os.Create(path.Join(filePath, "file1.json"))
if err != nil {
t.Error(err)
}
fcTemp := map[string]interface{}{
"2": "tor_test",
"3": "originid_test",
"4": "requestType_test",
"6": "tenant_test",
"7": "category_test",
"8": "account_test",
"9": "subject_test",
"10": "destination_test",
"11": "setupTime_test",
"12": "answerTime_test",
"13": "usage_test",
}
rcv, err := json.Marshal(fcTemp)
if err != nil {
t.Error(err)
}
file.Write([]byte(rcv))
file.Close()
eR := &JSONFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
expEvent := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.AccountField: "account_test",
utils.AnswerTime: "answerTime_test",
utils.Category: "category_test",
utils.Destination: "destination_test",
utils.OriginID: "originid_test",
utils.RequestType: "requestType_test",
utils.SetupTime: "setupTime_test",
utils.Subject: "subject_test",
utils.Tenant: "tenant_test",
utils.ToR: "tor_test",
utils.Usage: "usage_test",
},
APIOpts: map[string]interface{}{},
}
// expEvent := &utils.CGREvent{}
eR.conReqs <- struct{}{}
fname := "file1.json"
if err := eR.processFile(filePath, fname); err != nil {
t.Error(err)
}
select {
case data := <-eR.rdrEvents:
expEvent.ID = data.cgrEvent.ID
expEvent.Time = data.cgrEvent.Time
if !reflect.DeepEqual(data.cgrEvent, expEvent) {
t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
}
case <-time.After(50 * time.Millisecond):
t.Error("Time limit exceeded")
}
if err := os.RemoveAll(filePath); err != nil {
t.Error(err)
}
}
func TestFileJSONServeTimeDefaultChanExit(t *testing.T) {
func TestFileJSONProcessEventReadError(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
rdrExit := make(chan struct{}, 1)
rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, rdrExit)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
fltrs := &engine.FilterS{}
filePath := "/tmp/TestFileJSONProcessEvent/"
fname := "file2.json"
eR := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
rdrExit <- struct{}{}
rdr.Config().RunDelay = time.Duration(1)
result := rdr.Serve()
if !reflect.DeepEqual(result, nil) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
eR.conReqs <- struct{}{}
errExpect := "open /tmp/TestFileJSONProcessEvent/file2.json: no such file or directory"
if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestFileJSONProcessFile(t *testing.T) {
func TestFileJSON(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
fltrs := &engine.FilterS{}
eR := &JSONFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
expected := "open : no such file or directory"
err2 := rdr.(*JSONFileER).processFile("", "")
if err2 == nil || err2.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2)
eR.conReqs <- struct{}{}
filePath := "/tmp/ErsJSON/out/"
err := os.MkdirAll(filePath, 0777)
if err != nil {
t.Error(err)
}
for i := 1; i < 4; i++ {
if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.json", i))); err != nil {
t.Error(err)
}
}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
os.Create(path.Join(filePath, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
}
*/
func TestFileJSONExit(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &JSONFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
eR.rdrExit <- struct{}{}
}

View File

@@ -96,6 +96,23 @@ func (rdr *SQLEventReader) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
func (rdr *SQLEventReader) openDB(dialect gorm.Dialector) (err error) {
var db *gorm.DB
if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil {
return
}
var sqlDB *sql.DB
if sqlDB, err = db.DB(); err != nil {
return
}
sqlDB.SetMaxOpenConns(10)
if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
return
}
go rdr.readLoop(db, sqlDB) // read until the connection is closed
return
}
// Serve will start the gorutines needed to watch the sql topic
func (rdr *SQLEventReader) Serve() (err error) {
var dialect gorm.Dialector
@@ -107,22 +124,7 @@ func (rdr *SQLEventReader) Serve() (err error) {
default:
return fmt.Errorf("db type <%s> not supported", rdr.connType)
}
var db *gorm.DB
if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil {
return
}
var sqlDB *sql.DB
if sqlDB, err = db.DB(); err != nil {
return
}
sqlDB.SetMaxOpenConns(10)
if err = sqlDB.Ping(); err != nil {
return
}
if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
return
}
go rdr.readLoop(db, sqlDB) // read until the connection is closed
err = rdr.openDB(dialect)
return
}

View File

@@ -31,7 +31,9 @@ import (
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
)
var (
@@ -688,3 +690,72 @@ func TestErsSqlPostCDRS(t *testing.T) {
}
logger.Default = tmp
}
// type mockDialector interface {
// Name() string
// Initialize(*gorm.DB) error
// Migrator(db *gorm.DB) gorm.Migrator
// DataTypeOf(*schema.Field) string
// DefaultValueOf(*schema.Field) clause.Expression
// BindVarTo(writer clause.Writer, stmt *Statement, v interface{})
// QuoteTo(clause.Writer, string)
// Explain(sql string, vars ...interface{}) string
// }
type mockDialect struct{}
func (mockDialect) Name() string { return "" }
func (mockDialect) Initialize(db *gorm.DB) error { return nil }
func (mockDialect) Migrator(db *gorm.DB) gorm.Migrator { return nil }
func (mockDialect) DataTypeOf(*schema.Field) string { return "" }
func (mockDialect) DefaultValueOf(*schema.Field) clause.Expression { return nil }
func (mockDialect) BindVarTo(writer clause.Writer, stmt *gorm.Statement, v interface{}) { return }
func (mockDialect) QuoteTo(clause.Writer, string) { return }
func (mockDialect) Explain(sql string, vars ...interface{}) string { return "" }
func TestMockOpenDB(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
sqlRdr := &SQLEventReader{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: &engine.FilterS{},
connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
connType: utils.MySQL,
tableName: "testName",
expConnString: "",
expConnType: utils.Postgres,
expTableName: "",
rdrEvents: nil,
rdrExit: nil,
rdrErr: nil,
cap: nil,
}
var mckDlct mockDialect
errExpect := "invalid db"
if err := sqlRdr.openDB(mckDlct); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestSQLServeErr1(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
sqlRdr := &SQLEventReader{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: &engine.FilterS{},
connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
connType: utils.MySQL,
tableName: "testName",
expConnString: "",
expConnType: utils.Postgres,
expTableName: "",
rdrEvents: nil,
rdrExit: nil,
rdrErr: nil,
cap: nil,
}
cfg.StorDbCfg().Password = "CGRateS.org"
sqlRdr.Config().RunDelay = time.Duration(0)
if err := sqlRdr.Serve(); err != nil {
t.Error(err)
}
}