diff --git a/ers/sql.go b/ers/sql.go index 5e479dd6e..f6b4a88ce 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -37,7 +37,7 @@ import ( const ( dbName = "db_name" - tableName = "db_name" + tableName = "table_name" sslMode = "sslmode" defaultSSLMode = "disable" defaultDBName = "cgrates" @@ -98,9 +98,8 @@ func (rdr *SQLEventReader) Config() *config.EventReaderCfg { // Serve will start the gorutines needed to watch the kafka topic func (rdr *SQLEventReader) Serve() (err error) { - // connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", user, password, host, port, name) var db *gorm.DB - if db, err = gorm.Open(strings.TrimPrefix(rdr.connType, utils.Meta), rdr.connString); err != nil { + if db, err = gorm.Open(rdr.connType, rdr.connString); err != nil { return } if err = db.DB().Ping(); err != nil { @@ -165,9 +164,13 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB) { fmt.Sprintf("<%s> processing message %s error: %s", utils.ERs, utils.ToJSON(msg), err.Error())) } - db.Delete(msg) // to ensure we don't read it again + db = db.Delete(msg) // to ensure we don't read it again if rdr.Config().ProcessedPath != utils.EmptyString { - // post it + if err = rdr.postCDR(columns); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> posting message %s error: %s", + utils.ERs, utils.ToJSON(msg), err.Error())) + } } if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} @@ -207,21 +210,14 @@ func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error } func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) { - // *dbtype:user:password@host:port?options - split := strings.SplitN(inURL, utils.InInFieldSep, 2) - if len(split) != 2 { - return utils.NewErrMandatoryIeMissing("db_type") - } - rdr.connType = split[0] - inURL = split[1] - - //outhpath if no meta is op[tions only + inURL = strings.TrimPrefix(inURL, utils.Meta) var u *url.URL if u, err = url.Parse(inURL); err != nil { return } password, _ := u.User.Password() qry := u.Query() + rdr.connType = u.Scheme dbname := defaultDBName if vals, has := qry[dbName]; has && len(vals) != 0 { @@ -236,12 +232,11 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) { if vals, has := qry[tableName]; has && len(vals) != 0 { rdr.tableName = vals[0] } - switch rdr.connType { - case utils.MetaMySQL: + case utils.MYSQL: rdr.connString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", u.User.Username(), password, u.Hostname(), u.Port(), dbname) - case utils.MetaPostgres: + case utils.POSTGRES: rdr.connString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl) default: return fmt.Errorf("unknown db_type %s", rdr.connType) @@ -263,23 +258,17 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) { return } } else { - split := strings.SplitN(inURL, utils.ConcatenatedKey(), 2) - if len(split) != 2 { - return utils.NewErrMandatoryIeMissing("db_type") - } - rdr.expConnType = split[0] - inURL = split[1] - - //outhpath if no meta is op[tions only - var outURL *url.URL - if outURL, err = url.Parse(inURL); err != nil { + outURL = strings.TrimPrefix(outURL, utils.Meta) + var oURL *url.URL + if oURL, err = url.Parse(inURL); err != nil { return } - outPassword, _ = outURL.User.Password() - outUser = outURL.User.Username() - outHost = outURL.Hostname() - outPort = outURL.Port() - oqry = outURL.Query() + rdr.expConnType = oURL.Scheme + outPassword, _ = oURL.User.Password() + outUser = oURL.User.Username() + outHost = oURL.Hostname() + outPort = oURL.Port() + oqry = oURL.Query() } outDBname = defaultDBName @@ -291,19 +280,45 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) { outSSL = vals[0] } rdr.expTableName = utils.CDRsTBL - if vals, has := qry[tableName]; has && len(vals) != 0 { + if vals, has := oqry[tableName]; has && len(vals) != 0 { rdr.expTableName = vals[0] } - switch rdr.connType { - case utils.MetaMySQL: + switch rdr.expConnType { + case utils.MYSQL: rdr.expConnString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", outUser, outPassword, outHost, outPort, outDBname) - case utils.MetaPostgres: + case utils.POSTGRES: rdr.expConnString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", outHost, outPort, outDBname, outUser, outPassword, outSSL) default: - return fmt.Errorf("unknown db type") + return fmt.Errorf("unknown db_type") } return } + +func (rdr *SQLEventReader) postCDR(in []interface{}) (err error) { + sqlValues := make([]string, len(in)) + for i := range in { + sqlValues[i] = "?" + } + sqlStatement := fmt.Sprintf("INSERT INTO %s VALUES (%s); ", rdr.expTableName, strings.Join(sqlValues, ",")) + var db *gorm.DB + if db, err = gorm.Open(rdr.expConnType, rdr.expConnString); err != nil { + return + } + if err = db.DB().Ping(); err != nil { + return + } + tx := db.Begin() + _, err = db.DB().Exec(sqlStatement, in...) + if err != nil { + tx.Rollback() + if strings.Contains(err.Error(), "1062") || strings.Contains(err.Error(), "duplicate key") { // returns 1062/pq when key is duplicated + return utils.ErrExists + } + return + } + tx.Commit() + return +} diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index a15694c9b..b673da954 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -41,8 +41,11 @@ var ( testSQLInitDB, testSQLReader, testSQLEmptyTable, + testSQLPoster, + testSQLInitDB, testSQLReader2, + testSQLStop, } cdr = &engine.CDR{ @@ -50,7 +53,7 @@ var ( RunID: "RunID", } db *gorm.DB - dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'" + dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'" ) func TestSQL(t *testing.T) { @@ -74,8 +77,8 @@ func testSQLInitConfig(t *testing.T) { "type": "*sql", // reader type <*file_csv> "run_delay": 1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited - "source_path": "*mysql:cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path - // "processed_path": "/var/spool/cgrates/cdrc/out", // move processed data here + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2", // read data from this path + "processed_path": "db_name=cgrates2&table_name=cdrs2", // move processed data here "tenant": "cgrates.org", // tenant used by import "filters": [], // limit parsing based on the filters "flags": [], // flags to influence the event processing @@ -95,9 +98,47 @@ func testSQLInitConfig(t *testing.T) { } func testSQLInitCdrDb(t *testing.T) { + rdrsql := sqlCfg.StorDbCfg().Clone() if err := engine.InitStorDb(sqlCfg); err != nil { t.Fatal(err) } + sqlCfg.StorDbCfg().Name = "cgrates2" + if err := engine.InitStorDb(sqlCfg); err != nil { + t.Fatal(err) + } + *sqlCfg.StorDbCfg() = *rdrsql + +} + +type testModelSql struct { + ID int64 + Cgrid string + RunID string + OriginHost string + Source string + OriginID string + TOR string + RequestType string + Tenant string + Category string + Account string + Subject string + Destination string + SetupTime time.Time + AnswerTime time.Time + Usage int64 + ExtraFields string + CostSource string + Cost float64 + CostDetails string + ExtraInfo string + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time +} + +func (_ *testModelSql) TableName() string { + return "cdrs2" } func testSQLInitDB(t *testing.T) { @@ -107,8 +148,10 @@ func testSQLInitDB(t *testing.T) { if err != nil { t.Fatal(err) } + if !db.HasTable("cdrs2") { + db = db.CreateTable(&testModelSql{}) + } db = db.Table(utils.CDRsTBL) - tx := db.Begin() cdrSql := cdr.AsCDRsql() cdrSql.CreatedAt = time.Now() @@ -154,17 +197,33 @@ func testSQLReader(t *testing.T) { } func testSQLEmptyTable(t *testing.T) { + time.Sleep(10 * time.Millisecond) rows, err := db.Table(utils.CDRsTBL).Select("*").Rows() if err != nil { - t.Error(err) + t.Fatal(err) + } + colNames, err := rows.Columns() + if err != nil { + t.Fatal(err) } for rows.Next() { - t.Fatal("Expected empty table") + columns := make([]interface{}, len(colNames)) + columnPointers := make([]interface{}, len(colNames)) + for i := range columns { + columnPointers[i] = &columns[i] + } + if err = rows.Scan(columnPointers...); err != nil { + t.Fatal(err) + } + msg := make(map[string]interface{}) + for i, colName := range colNames { + msg[colName] = columns[i] + } + t.Fatal("Expected empty table ", utils.ToJSON(msg)) } } func testSQLReader2(t *testing.T) { - select { case err := <-rdrErr: t.Error(err) @@ -188,8 +247,38 @@ func testSQLReader2(t *testing.T) { } } +func testSQLPoster(t *testing.T) { + rows, err := db.Table("cdrs2").Select("*").Rows() + if err != nil { + t.Fatal(err) + } + colNames, err := rows.Columns() + if err != nil { + t.Fatal(err) + } + for rows.Next() { + columns := make([]interface{}, len(colNames)) + columnPointers := make([]interface{}, len(colNames)) + for i := range columns { + columnPointers[i] = &columns[i] + } + if err = rows.Scan(columnPointers...); err != nil { + t.Fatal(err) + } + msg := make(map[string]interface{}) + for i, colName := range colNames { + msg[colName] = columns[i] + } + db.Table("cdrs2").Delete(msg) + if cgrid := utils.IfaceAsString(msg["cgrid"]); cgrid != cdr.CGRID { + t.Errorf("Expected: %s ,receieved: %s", cgrid, cdr.CGRID) + } + } +} + func testSQLStop(t *testing.T) { rdrExit <- struct{}{} + db = db.DropTable("cdrs2") if err := db.Close(); err != nil { t.Error(err) }