From 3fd5237e14cc251b00d703e8b43d50a94cf40be6 Mon Sep 17 00:00:00 2001 From: nickolasdaniel Date: Mon, 12 Apr 2021 14:48:12 +0300 Subject: [PATCH] Coverage tests for ers --- ers/filecsv_it_test.go | 22 +++- ers/filefwv_it_test.go | 133 ++++++++++++++++++++++++ ers/filejson_it_test.go | 221 ++++++++++++++++++++++++++++++++++------ ers/sql.go | 34 ++++--- ers/sql_it_test.go | 71 +++++++++++++ 5 files changed, 432 insertions(+), 49 deletions(-) diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 7f8d17955..be1acced2 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -458,7 +458,7 @@ func TestFileCSVProcessEventError(t *testing.T) { } } -func TestFileCSV(t *testing.T) { +func TestFileCSVDirErr(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} eR := &CSVFileER{ @@ -477,6 +477,21 @@ func TestFileCSV(t *testing.T) { if err := eR.Serve(); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } +} +func TestFileCSV(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &CSVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ers/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} filePath := "/tmp/ers/out/" err := os.MkdirAll(filePath, 0777) if err != nil { @@ -491,6 +506,11 @@ func TestFileCSV(t *testing.T) { if err := eR.Serve(); err != nil { t.Error(err) } + os.Create(path.Join(filePath, "file1.txt")) + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) + } } func TestFileCSVExit(t *testing.T) { diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index b68dfd6fd..166a67780 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -21,6 +21,7 @@ along with this program. If not, see package ers import ( + "fmt" "net/rpc" "os" "path" @@ -268,3 +269,135 @@ func TestFWVFileConfig(t *testing.T) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) } } + +func TestFileFWVProcessEvent(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFileFWVProcessEvent/" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, "file1.fwv")) + if err != nil { + t.Error(err) + } + file.Write([]byte("test,test2")) + file.Close() + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + fname := "file1.fwv" + errExpect := "unsupported field prefix: <> when set fields" + eR.Config().Fields = []*config.FCTemplate{ + { + Value: config.RSRParsers{ + { + Rules: "~*hdr", + }, + }, + Type: utils.MetaRemove, + // Path: utils.MetaVars, + }, + } + eR.Config().Fields[0].ComputePath() + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +func TestFileFWVServeErrTimeDuration0(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfgIdx := 0 + rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil) + if err != nil { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + } + rdr.Config().RunDelay = time.Duration(0) + result := rdr.Serve() + if !reflect.DeepEqual(result, nil) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) + } +} + +func TestFileFWVServeErrTimeDurationNeg1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfgIdx := 0 + rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil) + if err != nil { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + } + rdr.Config().RunDelay = time.Duration(-1) + expected := "no such file or directory" + err = rdr.Serve() + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} + +func TestFileFWV(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + filePath := "/tmp/fwvErs/out" + err := os.MkdirAll(filePath, 0777) + if err != nil { + t.Error(err) + } + for i := 1; i < 4; i++ { + if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.fwv", i))); err != nil { + t.Error(err) + } + } + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) + } + os.Create(path.Join(filePath, "file1.txt")) + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) + } +} + +func TestFileFWVExit(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &FWVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/fwvErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) + } + eR.rdrExit <- struct{}{} +} diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index 049918550..88b7ac1e5 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -21,9 +21,12 @@ along with this program. If not, see package ers import ( + "encoding/json" + "fmt" "net/rpc" "os" "path" + "reflect" "testing" "time" @@ -226,7 +229,6 @@ func testJSONKillEngine(t *testing.T) { } } -/* func TestFileJSONServeErrTimeDuration0(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfgIdx := 0 @@ -241,7 +243,6 @@ func TestFileJSONServeErrTimeDuration0(t *testing.T) { } } - func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfgIdx := 0 @@ -257,47 +258,203 @@ func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) { } } -func TestFileJSONServeTimeDefault(t *testing.T) { +// func TestFileJSONServeTimeDefault(t *testing.T) { +// cfg := config.NewDefaultCGRConfig() +// cfgIdx := 0 +// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) +// if err != nil { +// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) +// } +// rdr.Config().RunDelay = time.Duration(1) +// result := rdr.Serve() +// if !reflect.DeepEqual(result, nil) { +// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) +// } +// } + +// func TestFileJSONServeTimeDefaultChanExit(t *testing.T) { +// cfg := config.NewDefaultCGRConfig() +// cfgIdx := 0 +// rdrExit := make(chan struct{}, 1) +// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, rdrExit) +// if err != nil { +// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) +// } +// rdrExit <- struct{}{} +// rdr.Config().RunDelay = time.Duration(1) +// result := rdr.Serve() +// if !reflect.DeepEqual(result, nil) { +// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) +// } +// } + +// func TestFileJSONProcessFile(t *testing.T) { +// cfg := config.NewDefaultCGRConfig() +// cfgIdx := 0 +// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) +// if err != nil { +// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) +// } +// expected := "open : no such file or directory" +// err2 := rdr.(*JSONFileER).processFile("", "") +// if err2 == nil || err2.Error() != expected { +// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2) +// } +// } + +func TestFileJSONProcessEvent(t *testing.T) { cfg := config.NewDefaultCGRConfig() - cfgIdx := 0 - rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) - if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFileJSONProcessEvent/" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) } - rdr.Config().RunDelay = time.Duration(1) - result := rdr.Serve() - if !reflect.DeepEqual(result, nil) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) + file, err := os.Create(path.Join(filePath, "file1.json")) + if err != nil { + t.Error(err) + } + fcTemp := map[string]interface{}{ + "2": "tor_test", + "3": "originid_test", + "4": "requestType_test", + "6": "tenant_test", + "7": "category_test", + "8": "account_test", + "9": "subject_test", + "10": "destination_test", + "11": "setupTime_test", + "12": "answerTime_test", + "13": "usage_test", + } + rcv, err := json.Marshal(fcTemp) + if err != nil { + t.Error(err) + } + file.Write([]byte(rcv)) + file.Close() + eR := &JSONFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ErsJSON/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.AccountField: "account_test", + utils.AnswerTime: "answerTime_test", + utils.Category: "category_test", + utils.Destination: "destination_test", + utils.OriginID: "originid_test", + utils.RequestType: "requestType_test", + utils.SetupTime: "setupTime_test", + utils.Subject: "subject_test", + utils.Tenant: "tenant_test", + utils.ToR: "tor_test", + utils.Usage: "usage_test", + }, + APIOpts: map[string]interface{}{}, + } + // expEvent := &utils.CGREvent{} + eR.conReqs <- struct{}{} + fname := "file1.json" + if err := eR.processFile(filePath, fname); err != nil { + t.Error(err) + } + select { + case data := <-eR.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) } } -func TestFileJSONServeTimeDefaultChanExit(t *testing.T) { +func TestFileJSONProcessEventReadError(t *testing.T) { cfg := config.NewDefaultCGRConfig() - cfgIdx := 0 - rdrExit := make(chan struct{}, 1) - rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, rdrExit) - if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFileJSONProcessEvent/" + fname := "file2.json" + eR := &CSVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ErsJSON/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), } - rdrExit <- struct{}{} - rdr.Config().RunDelay = time.Duration(1) - result := rdr.Serve() - if !reflect.DeepEqual(result, nil) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result) + eR.conReqs <- struct{}{} + errExpect := "open /tmp/TestFileJSONProcessEvent/file2.json: no such file or directory" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) } } -func TestFileJSONProcessFile(t *testing.T) { +func TestFileJSON(t *testing.T) { cfg := config.NewDefaultCGRConfig() - cfgIdx := 0 - rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) - if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + fltrs := &engine.FilterS{} + eR := &JSONFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ErsJSON/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), } - expected := "open : no such file or directory" - err2 := rdr.(*JSONFileER).processFile("", "") - if err2 == nil || err2.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2) + eR.conReqs <- struct{}{} + filePath := "/tmp/ErsJSON/out/" + err := os.MkdirAll(filePath, 0777) + if err != nil { + t.Error(err) + } + for i := 1; i < 4; i++ { + if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.json", i))); err != nil { + t.Error(err) + } + } + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) + } + os.Create(path.Join(filePath, "file1.txt")) + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) } } -*/ + +func TestFileJSONExit(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &JSONFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ErsJSON/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) + } + eR.rdrExit <- struct{}{} +} diff --git a/ers/sql.go b/ers/sql.go index f00db6b7e..3a3ff80ff 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -96,6 +96,23 @@ func (rdr *SQLEventReader) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } +func (rdr *SQLEventReader) openDB(dialect gorm.Dialector) (err error) { + var db *gorm.DB + if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil { + return + } + var sqlDB *sql.DB + if sqlDB, err = db.DB(); err != nil { + return + } + sqlDB.SetMaxOpenConns(10) + if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API + return + } + go rdr.readLoop(db, sqlDB) // read until the connection is closed + return +} + // Serve will start the gorutines needed to watch the sql topic func (rdr *SQLEventReader) Serve() (err error) { var dialect gorm.Dialector @@ -107,22 +124,7 @@ func (rdr *SQLEventReader) Serve() (err error) { default: return fmt.Errorf("db type <%s> not supported", rdr.connType) } - var db *gorm.DB - if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil { - return - } - var sqlDB *sql.DB - if sqlDB, err = db.DB(); err != nil { - return - } - sqlDB.SetMaxOpenConns(10) - if err = sqlDB.Ping(); err != nil { - return - } - if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API - return - } - go rdr.readLoop(db, sqlDB) // read until the connection is closed + err = rdr.openDB(dialect) return } diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index 2dd04192e..3b399fb45 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -31,7 +31,9 @@ import ( "github.com/cgrates/cgrates/utils" "gorm.io/driver/mysql" "gorm.io/gorm" + "gorm.io/gorm/clause" "gorm.io/gorm/logger" + "gorm.io/gorm/schema" ) var ( @@ -688,3 +690,72 @@ func TestErsSqlPostCDRS(t *testing.T) { } logger.Default = tmp } + +// type mockDialector interface { +// Name() string +// Initialize(*gorm.DB) error +// Migrator(db *gorm.DB) gorm.Migrator +// DataTypeOf(*schema.Field) string +// DefaultValueOf(*schema.Field) clause.Expression +// BindVarTo(writer clause.Writer, stmt *Statement, v interface{}) +// QuoteTo(clause.Writer, string) +// Explain(sql string, vars ...interface{}) string +// } +type mockDialect struct{} + +func (mockDialect) Name() string { return "" } +func (mockDialect) Initialize(db *gorm.DB) error { return nil } +func (mockDialect) Migrator(db *gorm.DB) gorm.Migrator { return nil } +func (mockDialect) DataTypeOf(*schema.Field) string { return "" } +func (mockDialect) DefaultValueOf(*schema.Field) clause.Expression { return nil } +func (mockDialect) BindVarTo(writer clause.Writer, stmt *gorm.Statement, v interface{}) { return } +func (mockDialect) QuoteTo(clause.Writer, string) { return } +func (mockDialect) Explain(sql string, vars ...interface{}) string { return "" } + +func TestMockOpenDB(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + sqlRdr := &SQLEventReader{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: &engine.FilterS{}, + connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", + connType: utils.MySQL, + tableName: "testName", + expConnString: "", + expConnType: utils.Postgres, + expTableName: "", + rdrEvents: nil, + rdrExit: nil, + rdrErr: nil, + cap: nil, + } + var mckDlct mockDialect + errExpect := "invalid db" + if err := sqlRdr.openDB(mckDlct); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestSQLServeErr1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + sqlRdr := &SQLEventReader{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: &engine.FilterS{}, + connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", + connType: utils.MySQL, + tableName: "testName", + expConnString: "", + expConnType: utils.Postgres, + expTableName: "", + rdrEvents: nil, + rdrExit: nil, + rdrErr: nil, + cap: nil, + } + cfg.StorDbCfg().Password = "CGRateS.org" + sqlRdr.Config().RunDelay = time.Duration(0) + if err := sqlRdr.Serve(); err != nil { + t.Error(err) + } +}