Add integration test for *sql exporter

This commit is contained in:
TeoV
2020-12-18 18:55:50 +02:00
committed by Dan Christian Bogos
parent 07159d8d6a
commit d955fbc959
5 changed files with 143 additions and 60 deletions

View File

@@ -388,28 +388,46 @@
"attempts": 1,
},
{
"id": "SQLExporter",
"id": "SQLExporterFull",
"type": "*sql",
"tenant": "cgrates.org",
"export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"attempts": 1,
"opts": {
"user": "cgrates",
"password": "CGRateS.org",
"host": "127.0.0.1",
"port": "3306",
"name": "exportedDatabase",
"tableName": "expTable",
"dbName": "exportedDatabase", // if dbName is not present "cgrates" will be used as default
"tableName": "expTable", // tableName is mandatory in opts for sql exporter
"sslmode": "disable",
"maxIdleConns": "10",
"maxOpenConns": "100",
"maxConnLifetime": "0",
},
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"},
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
"fields":[ // in case that the path is *exp.*row user must complete all the fields one to one with his sql schema in the correct order
{"tag": "CGRID", "path": "*exp.*row", "type": "*group", "value": "~*req.CGRID"},
{"tag": "AnswerTime", "path": "*exp.*row", "type": "*group", "value": "~*req.AnswerTime"},
{"tag": "Usage", "path": "*exp.*row", "type": "*group", "value": "~*req.Usage"},
{"tag": "Cost", "path": "*exp.*row", "type": "*group", "value": "~*req.Cost{*round:4}"},
]
},
{
"id": "SQLExporterPartial",
"type": "*sql",
"tenant": "cgrates.org",
"export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"attempts": 1,
"opts": {
"dbName": "exportedDatabase",
"tableName": "expTable",
"sslmode": "disable",
"maxIdleConns": "10",
"maxOpenConns": "100",
"maxConnLifetime": "0",
},
"fields":[ // the path constains *exp.columnName
{"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "AnswerTime", "path": "*exp.answer_time", "type": "*variable", "value": "~*req.AnswerTime"},
{"tag": "Cost", "path": "*exp.cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
]
}
]
},

View File

@@ -172,6 +172,8 @@ func (eeR *EventExporterRequest) SetFields(tplFlds []*config.FCTemplate) (err er
switch tplFld.Type {
case utils.META_COMPOSED:
err = utils.ComposeNavMapVal(eeR, fullPath, nMItm)
case utils.MetaGroup: // in case of *group type simply append to valSet
err = utils.AppendNavMapVal(eeR, fullPath, nMItm)
default:
_, err = eeR.Set(fullPath, &utils.NMSlice{nMItm})
}
@@ -188,6 +190,7 @@ func (eeR *EventExporterRequest) SetFields(tplFlds []*config.FCTemplate) (err er
// Set implements utils.NMInterface
func (eeR *EventExporterRequest) Set(fullPath *utils.FullPath, nm utils.NMInterface) (added bool, err error) {
switch fullPath.PathItems[0].Field {
default:
return false, fmt.Errorf("unsupported field prefix: <%s> when set field", fullPath.PathItems[0].Field)
@@ -231,7 +234,7 @@ func (eeR *EventExporterRequest) ParseField(
case utils.MetaRemoteHost:
out = eeR.RemoteHost().String()
isString = true
case utils.MetaVariable, utils.META_COMPOSED:
case utils.MetaVariable, utils.META_COMPOSED, utils.MetaGroup:
out, err = cfgFld.Value.ParseDataProvider(eeR)
isString = true
case utils.META_USAGE_DIFFERENCE:

View File

@@ -20,6 +20,7 @@ package ees
import (
"fmt"
"net/url"
"strings"
"sync"
@@ -35,19 +36,19 @@ func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
sqlEe = &SQLEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
// take the connection parameters from opts
connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLUser]),
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLPassword]),
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLHost]),
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLPort]),
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLName]))
db, err := gorm.Open("mysql", connectString)
if err != nil {
return nil, err
var u *url.URL
if u, err = url.Parse(strings.TrimPrefix(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, utils.Meta)); err != nil {
return
}
if err = db.DB().Ping(); err != nil {
return nil, err
password, _ := u.User.Password()
dbname := utils.SQLDefaultDBName
if vals, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLDBName]; has {
dbname = utils.IfaceAsString(vals)
}
ssl := utils.SQLDefaultSSLMode
if vals, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLSSLMode]; has {
ssl = utils.IfaceAsString(vals)
}
// tableName is mandatory in opts
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLTableName]; !has {
@@ -56,6 +57,25 @@ func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
sqlEe.tableName = utils.IfaceAsString(iface)
}
var connString string
switch u.Scheme {
case utils.MYSQL:
connString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
u.User.Username(), password, u.Hostname(), u.Port(), dbname)
case utils.POSTGRES:
connString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl)
default:
return nil, fmt.Errorf("unknown db_type %s", u.Scheme)
}
db, err := gorm.Open(u.Scheme, connString)
if err != nil {
return nil, err
}
if err = db.DB().Ping(); err != nil {
return nil, err
}
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxIdleConns]; has {
val, err := utils.IfaceAsTInt64(iface)
if err != nil {
@@ -120,7 +140,7 @@ func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
sqlEe.dc[utils.NumberOfEvents] = sqlEe.dc[utils.NumberOfEvents].(int64) + 1
var vals []interface{}
var colNames []string
req := utils.MapStorage(cgrEv.Event)
eeReq := NewEventExporterRequest(req, sqlEe.dc, cgrEv.Opts,
sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Tenant,
@@ -131,11 +151,16 @@ func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
if err = eeReq.SetFields(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ContentFields()); err != nil {
return
}
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
var iface interface{}
if iface, err = eeReq.cnt.FieldAsInterface(el.Value.Slice()); err != nil {
return
}
pathWithoutIndex := utils.GetPathWithoutIndex(el.Value.String())
if pathWithoutIndex != utils.MetaRow {
colNames = append(colNames, pathWithoutIndex)
}
vals = append(vals, iface)
}
@@ -143,24 +168,16 @@ func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
for i := range vals {
sqlValues[i] = "?"
}
utils.Logger.Debug(fmt.Sprintf("Test??"))
utils.Logger.Debug(fmt.Sprintf("%+v", sqlValues))
utils.Logger.Debug(fmt.Sprintf("%+v", vals))
sqlStatement := fmt.Sprintf("INSERT INTO %s VALUES (%s); ", sqlEe.tableName, strings.Join(sqlValues, ","))
utils.Logger.Debug(fmt.Sprintf("%+v", sqlStatement))
tx := sqlEe.db.Begin()
utils.Logger.Debug(fmt.Sprintf("TestDaca ajunge aici ???? "))
res, err := tx.DB().Exec(sqlStatement, vals...)
utils.Logger.Debug(fmt.Sprintf("ALO %+v", res))
utils.Logger.Debug(fmt.Sprintf("ALO2 %+v", err))
if err != nil {
utils.Logger.Debug(fmt.Sprintf("%+v", err))
tx.Rollback()
return err
var sqlQuery string
if len(colNames) != len(vals) {
sqlQuery = fmt.Sprintf("INSERT INTO %s VALUES (%s); ", sqlEe.tableName, strings.Join(sqlValues, ","))
} else {
colNamesStr := "(" + strings.Join(colNames, ", ") + ")"
sqlQuery = fmt.Sprintf("INSERT INTO %s %s VALUES (%s); ", sqlEe.tableName, colNamesStr, strings.Join(sqlValues, ","))
}
tx.Commit()
defer tx.Close()
sqlEe.db.Table(sqlEe.tableName).Exec(sqlQuery, vals...)
updateEEMetrics(sqlEe.dc, cgrEv.Event, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
sqlEe.cgrCfg.GeneralCfg().DefaultTimezone))

View File

@@ -48,11 +48,12 @@ var (
testSqlEeCreateTable,
testSqlEeLoadConfig,
testSqlEeResetDataDB,
testSqlEeResetStorDb,
testSqlEeStartEngine,
testSqlEeRPCConn,
testSqlEeExportEvent,
testSqlEeExportEventFull,
testSqlEeVerifyExportedEvent,
testSqlEeExportEventPartial,
testSqlEeVerifyExportedEvent2,
testStopCgrEngine,
testCleanDirectory,
}
@@ -67,14 +68,10 @@ func TestSqlEeExport(t *testing.T) {
// create a struct serve as model for *sql exporter
type testModelSql struct {
ID int64
Cgrid string
AnswerTime time.Time
Usage int64
Cost float64
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt *time.Time
}
func (_ *testModelSql) TableName() string {
@@ -88,22 +85,29 @@ func (nopLogger) Print(values ...interface{}) {}
func testSqlEeCreateTable(t *testing.T) {
var err error
if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "exportedDatabase")); err != nil {
if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "cgrates")); err != nil {
t.Fatal(err)
}
db2.SetLogger(new(nopLogger))
if _, err = db2.DB().Exec(`CREATE DATABASE IF NOT EXISTS exportedDatabase;`); err != nil {
t.Fatal(err)
}
if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "exportedDatabase")); err != nil {
t.Fatal(err)
}
tx := db2.Begin()
if !tx.HasTable("expTable") {
tx = tx.CreateTable(new(testModelSql))
if tx.HasTable("expTable") {
tx = tx.DropTable(new(testModelSql))
if err = tx.Error; err != nil {
tx.Rollback()
t.Fatal(err)
}
}
tx = tx.CreateTable(new(testModelSql))
if err = tx.Error; err != nil {
tx.Rollback()
t.Fatal(err)
}
tx.Commit()
}
@@ -121,12 +125,6 @@ func testSqlEeResetDataDB(t *testing.T) {
}
}
func testSqlEeResetStorDb(t *testing.T) {
if err := engine.InitStorDb(sqlEeCfg); err != nil {
t.Fatal(err)
}
}
func testSqlEeStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(sqlEeCfgPath, *waitRater); err != nil {
t.Fatal(err)
@@ -141,9 +139,9 @@ func testSqlEeRPCConn(t *testing.T) {
}
}
func testSqlEeExportEvent(t *testing.T) {
func testSqlEeExportEventFull(t *testing.T) {
eventVoice := &utils.CGREventWithEeIDs{
EeIDs: []string{"SQLExporter"},
EeIDs: []string{"SQLExporterFull"},
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
@@ -179,6 +177,44 @@ func testSqlEeExportEvent(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}
func testSqlEeExportEventPartial(t *testing.T) {
eventVoice := &utils.CGREventWithEeIDs{
EeIDs: []string{"SQLExporterPartial"},
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "voiceEvent",
Time: utils.TimePointer(time.Now()),
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("asd", time.Unix(1383813745, 0).UTC().String()),
utils.ToR: utils.VOICE,
utils.OriginID: "dsafdsaf",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.META_RATED,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: 10 * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 123,
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},
},
},
},
}
var reply map[string]utils.MapStorage
if err := sqlEeRpc.Call(utils.EeSv1ProcessEvent, eventVoice, &reply); err != nil {
t.Error(err)
}
time.Sleep(10 * time.Millisecond)
}
func testSqlEeVerifyExportedEvent(t *testing.T) {
var result int64
db2.Table("expTable").Count(&result)
@@ -186,3 +222,11 @@ func testSqlEeVerifyExportedEvent(t *testing.T) {
t.Fatal("Expected table to have only one result ", result)
}
}
func testSqlEeVerifyExportedEvent2(t *testing.T) {
var result int64
db2.Table("expTable").Count(&result)
if result != 2 {
t.Fatal("Expected table to have only one result ", result)
}
}

View File

@@ -930,6 +930,7 @@ const (
Time = "Time"
TargetIDs = "TargetIDs"
TargetType = "TargetType"
MetaRow = "*row"
)
// Migrator Action