Updated *sql Event Reader

This commit is contained in:
Trial97
2019-12-17 11:43:12 +02:00
parent 7c8e3810b2
commit 656fb8be00
2 changed files with 148 additions and 44 deletions

View File

@@ -37,7 +37,7 @@ import (
const (
dbName = "db_name"
tableName = "db_name"
tableName = "table_name"
sslMode = "sslmode"
defaultSSLMode = "disable"
defaultDBName = "cgrates"
@@ -98,9 +98,8 @@ func (rdr *SQLEventReader) Config() *config.EventReaderCfg {
// Serve will start the gorutines needed to watch the kafka topic
func (rdr *SQLEventReader) Serve() (err error) {
// connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", user, password, host, port, name)
var db *gorm.DB
if db, err = gorm.Open(strings.TrimPrefix(rdr.connType, utils.Meta), rdr.connString); err != nil {
if db, err = gorm.Open(rdr.connType, rdr.connString); err != nil {
return
}
if err = db.DB().Ping(); err != nil {
@@ -165,9 +164,13 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB) {
fmt.Sprintf("<%s> processing message %s error: %s",
utils.ERs, utils.ToJSON(msg), err.Error()))
}
db.Delete(msg) // to ensure we don't read it again
db = db.Delete(msg) // to ensure we don't read it again
if rdr.Config().ProcessedPath != utils.EmptyString {
// post it
if err = rdr.postCDR(columns); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> posting message %s error: %s",
utils.ERs, utils.ToJSON(msg), err.Error()))
}
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
@@ -207,21 +210,14 @@ func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error
}
func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
// *dbtype:user:password@host:port?options
split := strings.SplitN(inURL, utils.InInFieldSep, 2)
if len(split) != 2 {
return utils.NewErrMandatoryIeMissing("db_type")
}
rdr.connType = split[0]
inURL = split[1]
//outhpath if no meta is op[tions only
inURL = strings.TrimPrefix(inURL, utils.Meta)
var u *url.URL
if u, err = url.Parse(inURL); err != nil {
return
}
password, _ := u.User.Password()
qry := u.Query()
rdr.connType = u.Scheme
dbname := defaultDBName
if vals, has := qry[dbName]; has && len(vals) != 0 {
@@ -236,12 +232,11 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
if vals, has := qry[tableName]; has && len(vals) != 0 {
rdr.tableName = vals[0]
}
switch rdr.connType {
case utils.MetaMySQL:
case utils.MYSQL:
rdr.connString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
u.User.Username(), password, u.Hostname(), u.Port(), dbname)
case utils.MetaPostgres:
case utils.POSTGRES:
rdr.connString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl)
default:
return fmt.Errorf("unknown db_type %s", rdr.connType)
@@ -263,23 +258,17 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
return
}
} else {
split := strings.SplitN(inURL, utils.ConcatenatedKey(), 2)
if len(split) != 2 {
return utils.NewErrMandatoryIeMissing("db_type")
}
rdr.expConnType = split[0]
inURL = split[1]
//outhpath if no meta is op[tions only
var outURL *url.URL
if outURL, err = url.Parse(inURL); err != nil {
outURL = strings.TrimPrefix(outURL, utils.Meta)
var oURL *url.URL
if oURL, err = url.Parse(inURL); err != nil {
return
}
outPassword, _ = outURL.User.Password()
outUser = outURL.User.Username()
outHost = outURL.Hostname()
outPort = outURL.Port()
oqry = outURL.Query()
rdr.expConnType = oURL.Scheme
outPassword, _ = oURL.User.Password()
outUser = oURL.User.Username()
outHost = oURL.Hostname()
outPort = oURL.Port()
oqry = oURL.Query()
}
outDBname = defaultDBName
@@ -291,19 +280,45 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
outSSL = vals[0]
}
rdr.expTableName = utils.CDRsTBL
if vals, has := qry[tableName]; has && len(vals) != 0 {
if vals, has := oqry[tableName]; has && len(vals) != 0 {
rdr.expTableName = vals[0]
}
switch rdr.connType {
case utils.MetaMySQL:
switch rdr.expConnType {
case utils.MYSQL:
rdr.expConnString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
outUser, outPassword, outHost, outPort, outDBname)
case utils.MetaPostgres:
case utils.POSTGRES:
rdr.expConnString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s",
outHost, outPort, outDBname, outUser, outPassword, outSSL)
default:
return fmt.Errorf("unknown db type")
return fmt.Errorf("unknown db_type")
}
return
}
func (rdr *SQLEventReader) postCDR(in []interface{}) (err error) {
sqlValues := make([]string, len(in))
for i := range in {
sqlValues[i] = "?"
}
sqlStatement := fmt.Sprintf("INSERT INTO %s VALUES (%s); ", rdr.expTableName, strings.Join(sqlValues, ","))
var db *gorm.DB
if db, err = gorm.Open(rdr.expConnType, rdr.expConnString); err != nil {
return
}
if err = db.DB().Ping(); err != nil {
return
}
tx := db.Begin()
_, err = db.DB().Exec(sqlStatement, in...)
if err != nil {
tx.Rollback()
if strings.Contains(err.Error(), "1062") || strings.Contains(err.Error(), "duplicate key") { // returns 1062/pq when key is duplicated
return utils.ErrExists
}
return
}
tx.Commit()
return
}

View File

@@ -41,8 +41,11 @@ var (
testSQLInitDB,
testSQLReader,
testSQLEmptyTable,
testSQLPoster,
testSQLInitDB,
testSQLReader2,
testSQLStop,
}
cdr = &engine.CDR{
@@ -50,7 +53,7 @@ var (
RunID: "RunID",
}
db *gorm.DB
dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'"
dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'"
)
func TestSQL(t *testing.T) {
@@ -74,8 +77,8 @@ func testSQLInitConfig(t *testing.T) {
"type": "*sql", // reader type <*file_csv>
"run_delay": 1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "*mysql:cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path
// "processed_path": "/var/spool/cgrates/cdrc/out", // move processed data here
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2", // read data from this path
"processed_path": "db_name=cgrates2&table_name=cdrs2", // move processed data here
"tenant": "cgrates.org", // tenant used by import
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
@@ -95,9 +98,47 @@ func testSQLInitConfig(t *testing.T) {
}
func testSQLInitCdrDb(t *testing.T) {
rdrsql := sqlCfg.StorDbCfg().Clone()
if err := engine.InitStorDb(sqlCfg); err != nil {
t.Fatal(err)
}
sqlCfg.StorDbCfg().Name = "cgrates2"
if err := engine.InitStorDb(sqlCfg); err != nil {
t.Fatal(err)
}
*sqlCfg.StorDbCfg() = *rdrsql
}
type testModelSql struct {
ID int64
Cgrid string
RunID string
OriginHost string
Source string
OriginID string
TOR string
RequestType string
Tenant string
Category string
Account string
Subject string
Destination string
SetupTime time.Time
AnswerTime time.Time
Usage int64
ExtraFields string
CostSource string
Cost float64
CostDetails string
ExtraInfo string
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt *time.Time
}
func (_ *testModelSql) TableName() string {
return "cdrs2"
}
func testSQLInitDB(t *testing.T) {
@@ -107,8 +148,10 @@ func testSQLInitDB(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !db.HasTable("cdrs2") {
db = db.CreateTable(&testModelSql{})
}
db = db.Table(utils.CDRsTBL)
tx := db.Begin()
cdrSql := cdr.AsCDRsql()
cdrSql.CreatedAt = time.Now()
@@ -154,17 +197,33 @@ func testSQLReader(t *testing.T) {
}
func testSQLEmptyTable(t *testing.T) {
time.Sleep(10 * time.Millisecond)
rows, err := db.Table(utils.CDRsTBL).Select("*").Rows()
if err != nil {
t.Error(err)
t.Fatal(err)
}
colNames, err := rows.Columns()
if err != nil {
t.Fatal(err)
}
for rows.Next() {
t.Fatal("Expected empty table")
columns := make([]interface{}, len(colNames))
columnPointers := make([]interface{}, len(colNames))
for i := range columns {
columnPointers[i] = &columns[i]
}
if err = rows.Scan(columnPointers...); err != nil {
t.Fatal(err)
}
msg := make(map[string]interface{})
for i, colName := range colNames {
msg[colName] = columns[i]
}
t.Fatal("Expected empty table ", utils.ToJSON(msg))
}
}
func testSQLReader2(t *testing.T) {
select {
case err := <-rdrErr:
t.Error(err)
@@ -188,8 +247,38 @@ func testSQLReader2(t *testing.T) {
}
}
func testSQLPoster(t *testing.T) {
rows, err := db.Table("cdrs2").Select("*").Rows()
if err != nil {
t.Fatal(err)
}
colNames, err := rows.Columns()
if err != nil {
t.Fatal(err)
}
for rows.Next() {
columns := make([]interface{}, len(colNames))
columnPointers := make([]interface{}, len(colNames))
for i := range columns {
columnPointers[i] = &columns[i]
}
if err = rows.Scan(columnPointers...); err != nil {
t.Fatal(err)
}
msg := make(map[string]interface{})
for i, colName := range colNames {
msg[colName] = columns[i]
}
db.Table("cdrs2").Delete(msg)
if cgrid := utils.IfaceAsString(msg["cgrid"]); cgrid != cdr.CGRID {
t.Errorf("Expected: %s ,receieved: %s", cgrid, cdr.CGRID)
}
}
}
func testSQLStop(t *testing.T) {
rdrExit <- struct{}{}
db = db.DropTable("cdrs2")
if err := db.Close(); err != nil {
t.Error(err)
}