From 3188b94df8b8ec0822acd7b5395eb9ef10dc1248 Mon Sep 17 00:00:00 2001 From: andronache Date: Thu, 11 Mar 2021 17:02:50 +0200 Subject: [PATCH] Integration test and cover tests in ers --- ers/sql_it_test.go | 270 +++++++++++++++++++++++++++++++++++++++++---- ers/sql_test.go | 76 +++++++++++++ 2 files changed, 323 insertions(+), 23 deletions(-) diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index 7665057cf..2d377d3db 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -51,6 +51,20 @@ var ( testSQLStop, } + sqlTests2 = []func(t *testing.T){ + testSQLInitConfig2, + testSQLInitDBs2, + testSQLInitCdrDb2, + testSQLInitDB2, + testSQLReader3, + testSQLEmptyTable2, + testSQLPoster2, + + testSQLAddData2, + testSQLReader4, + + testSQLStop2, + } cdr = &engine.CDR{ CGRID: "CGRID", RunID: "RunID", @@ -219,7 +233,6 @@ func testSQLReader(t *testing.T) { t.Fatal(err) } sqlER.Serve() - select { case err = <-rdrErr: t.Error(err) @@ -337,27 +350,238 @@ func TestSQLReaderServeBadTypeErr(t *testing.T) { } } -func TestSQLPostCDR(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - rdr := &SQLEventReader{ - cgrCfg: cfg, - cfgIdx: 1, - fltrS: &engine.FilterS{}, - connString: "testString", - connType: "testType", - tableName: "testName", - expConnString: "testExpConnString", - expConnType: "testExpConnType", - expTableName: "testExpTableName", - rdrEvents: nil, - rdrExit: nil, - rdrErr: nil, - cap: nil, - } - in := make([]interface{}, 2) - err := rdr.postCDR(in) - expected := "db type not supported" - if err == nil || err.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) +func TestSQL2(t *testing.T) { + // sqlCfgPath = path.Join(*dataDir, "conf", "samples", "ers_reload", "disabled") + for _, test := range sqlTests2 { + t.Run("TestSQL", test) } } + +func testSQLInitConfig2(t *testing.T) { + var err error + if sqlCfg, err = config.NewCGRConfigFromJSONStringWithDefaults(`{ + "stor_db": { + "db_password": "CGRateS.org", + }, + "ers": { // EventReaderService + "enabled": true, // starts the EventReader service: + "readers": [ + { + "id": "mysql", // identifier of the EventReader profile + "type": "*sql", // reader type <*file_csv> + "run_delay": "1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together + "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path + "opts": { + "dbName":"cgrates2", + "dbNameProcessed":"cgrates2", + "tableNameProcessed":"cdrs2", + }, + "processed_path": "", // move processed data here + "tenant": "cgrates.org", // tenant used by import + "filters": [], // limit parsing based on the filters + "flags": [], // flags to influence the event processing + "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "path": "*cgreq.CGRID"}, + ], + }, + ], + }, + }`); err != nil { + t.Fatal(err) + } + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, sqlCfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) +} + +func testSQLInitCdrDb2(t *testing.T) { + if err := engine.InitStorDb(sqlCfg); err != nil { + t.Fatal(err) + } +} + +func testSQLInitDBs2(t *testing.T) { + var err error + var db2 *gorm.DB + if db2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")), + &gorm.Config{ + AllowGlobalUpdate: true, + Logger: logger.Default.LogMode(logger.Silent), + }); err != nil { + t.Fatal(err) + } + + if err = db2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil { + t.Fatal(err) + } +} +func testSQLInitDB2(t *testing.T) { + cdr.CGRID = utils.UUIDSha1Prefix() + var err error + if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")), + &gorm.Config{ + AllowGlobalUpdate: true, + Logger: logger.Default.LogMode(logger.Silent), + }); err != nil { + t.Fatal(err) + } + tx := db.Begin() + if !tx.Migrator().HasTable("cdrs") { + if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil { + tx.Rollback() + t.Fatal(err) + } + } + if !tx.Migrator().HasTable("cdrs2") { + if err = tx.Migrator().CreateTable(new(testModelSql)); err != nil { + tx.Rollback() + t.Fatal(err) + } + } + tx.Commit() + tx = db.Begin() + tx = tx.Table(utils.CDRsTBL) + cdrSql := cdr.AsCDRsql() + cdrSql.CreatedAt = time.Now() + saved := tx.Save(cdrSql) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + tx.Commit() + time.Sleep(10 * time.Millisecond) + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 1 { + t.Fatal("Expected table to have only one result ", result) + } +} + +func testSQLAddData2(t *testing.T) { + tx := db.Begin() + tx = tx.Table(utils.CDRsTBL) + cdrSql := cdr.AsCDRsql() + cdrSql.CreatedAt = time.Now() + saved := tx.Save(cdrSql) + if saved.Error != nil { + tx.Rollback() + t.Fatal(saved.Error) + } + tx.Commit() + time.Sleep(10 * time.Millisecond) +} +func testSQLReader3(t *testing.T) { + rdrEvents = make(chan *erEvent, 1) + rdrErr = make(chan error, 1) + rdrExit = make(chan struct{}, 1) + sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, rdrErr, new(engine.FilterS), rdrExit) + if err != nil { + t.Fatal(err) + } + sqlER.Serve() + + select { + case err = <-rdrErr: + t.Error(err) + case ev := <-rdrEvents: + if ev.rdrCfg.ID != "mysql" { + t.Errorf("Expected 'mysql' received `%s`", ev.rdrCfg.ID) + } + expected := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: ev.cgrEvent.ID, + Time: ev.cgrEvent.Time, + Event: map[string]interface{}{ + "CGRID": cdr.CGRID, + }, + Opts: map[string]interface{}{}, + } + if !reflect.DeepEqual(ev.cgrEvent, expected) { + t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) + } + case <-time.After(time.Second): + t.Fatal("Timeout") + } +} + +func testSQLEmptyTable2(t *testing.T) { + time.Sleep(10 * time.Millisecond) + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 0 { + t.Fatal("Expected empty table ", result) + } +} + +func testSQLReader4(t *testing.T) { + select { + case err := <-rdrErr: + t.Error(err) + case ev := <-rdrEvents: + if ev.rdrCfg.ID != "mysql" { + t.Errorf("Expected 'mysql' received `%s`", ev.rdrCfg.ID) + } + expected := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: ev.cgrEvent.ID, + Time: ev.cgrEvent.Time, + Event: map[string]interface{}{ + "CGRID": cdr.CGRID, + }, + Opts: map[string]interface{}{}, + } + if !reflect.DeepEqual(ev.cgrEvent, expected) { + t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) + } + case <-time.After(time.Second): + t.Fatal("Timeout") + } +} + +func testSQLPoster2(t *testing.T) { + rows, err := db.Table("cdrs2").Select("*").Rows() + if err != nil { + t.Fatal(err) + } + colNames, err := rows.Columns() + if err != nil { + t.Fatal(err) + } + for rows.Next() { + columns := make([]interface{}, len(colNames)) + columnPointers := make([]interface{}, len(colNames)) + for i := range columns { + columnPointers[i] = &columns[i] + } + if err = rows.Scan(columnPointers...); err != nil { + t.Fatal(err) + } + msg := make(map[string]interface{}) + for i, colName := range colNames { + msg[colName] = columns[i] + } + db.Table("cdrs2").Delete(msg) + if cgrid := utils.IfaceAsString(msg["cgrid"]); cgrid != cdr.CGRID { + t.Errorf("Expected: %s ,receieved: %s", cgrid, cdr.CGRID) + } + } +} + +func testSQLStop2(t *testing.T) { + close(rdrExit) + if err := db.Migrator().DropTable("cdrs2"); err != nil { + t.Fatal(err) + } + if err := db.Migrator().DropTable("cdrs"); err != nil { + t.Fatal(err) + } + if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil { + t.Fatal(err) + } + if db2, err := db.DB(); err != nil { + t.Fatal(err) + } else if err = db2.Close(); err != nil { + t.Fatal(err) + } + +} diff --git a/ers/sql_test.go b/ers/sql_test.go index 63e4b6a44..ecb5e401a 100644 --- a/ers/sql_test.go +++ b/ers/sql_test.go @@ -20,6 +20,12 @@ package ers import ( "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + + "github.com/cgrates/cgrates/utils" + "gorm.io/gorm/logger" ) func TestSQLSetURL(t *testing.T) { @@ -139,3 +145,73 @@ func TestSQLSetURL(t *testing.T) { t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err) } } + +func TestSQLPostCDR(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + rdr := &SQLEventReader{ + cgrCfg: cfg, + cfgIdx: 1, + fltrS: &engine.FilterS{}, + connString: "testString", + connType: "testType", + tableName: "testName", + expConnString: "testExpConnString", + expConnType: "testExpConnType", + expTableName: "testExpTableName", + rdrEvents: nil, + rdrExit: nil, + rdrErr: nil, + cap: nil, + } + in := make([]interface{}, 2) + err := rdr.postCDR(in) + expected := "db type not supported" + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} + +func TestSQLReaderServePostgresErr(t *testing.T) { + tmp := logger.Default + logger.Default = logger.Default.LogMode(logger.Silent) + rdr := &SQLEventReader{ + connType: utils.Postgres, + connString: "host=127.0.0.1 port=9999 dbname=cdrs user=cgrates password=CGRateS.org sslmode=disabled", + } + expected := "cannot parse `host=127.0.0.1 port=9999 dbname=cdrs user=cgrates password=xxxxx sslmode=disabled`: failed to configure TLS (sslmode is invalid)" + err := rdr.Serve() + if err == nil || err.Error() != expected { + t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", expected, err) + } + logger.Default = tmp +} + +func TestSQLReaderServeBadType(t *testing.T) { + tmp := logger.Default + logger.Default = logger.Default.LogMode(logger.Silent) + rdr := &SQLEventReader{ + connType: utils.Postgres, + connString: "host=127.0.0.1 port=9999 dbname=cdrs user=cgrates password=CGRateS.org sslmode=disabled", + } + expected := "cannot parse `host=127.0.0.1 port=9999 dbname=cdrs user=cgrates password=xxxxx sslmode=disabled`: failed to configure TLS (sslmode is invalid)" + err := rdr.Serve() + if err == nil || err.Error() != expected { + t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", expected, err) + } + logger.Default = tmp +} + +func TestSQLReaderServeBadType2(t *testing.T) { + tmp := logger.Default + logger.Default = logger.Default.LogMode(logger.Silent) + rdr := &SQLEventReader{ + connType: utils.MySQL, + connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", + } + expected := "Error 1049: Unknown database 'cgrates2'" + err := rdr.Serve() + if err == nil || err.Error() != expected { + t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", expected, err) + } + logger.Default = tmp +}