diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index e8eb325ae..c07a7a9e0 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -388,28 +388,46 @@ "attempts": 1, }, { - "id": "SQLExporter", + "id": "SQLExporterFull", "type": "*sql", "tenant": "cgrates.org", + "export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306", "attempts": 1, "opts": { - "user": "cgrates", - "password": "CGRateS.org", - "host": "127.0.0.1", - "port": "3306", - "name": "exportedDatabase", - "tableName": "expTable", + "dbName": "exportedDatabase", // if dbName is not present "cgrates" will be used as default + "tableName": "expTable", // tableName is mandatory in opts for sql exporter + "sslmode": "disable", "maxIdleConns": "10", "maxOpenConns": "100", "maxConnLifetime": "0", }, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, - {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, - {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, - {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + "fields":[ // in case that the path is *exp.*row user must complete all the fields one to one with his sql schema in the correct order + {"tag": "CGRID", "path": "*exp.*row", "type": "*group", "value": "~*req.CGRID"}, + {"tag": "AnswerTime", "path": "*exp.*row", "type": "*group", "value": "~*req.AnswerTime"}, + {"tag": "Usage", "path": "*exp.*row", "type": "*group", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.*row", "type": "*group", "value": "~*req.Cost{*round:4}"}, ] }, + { + "id": "SQLExporterPartial", + "type": "*sql", + "tenant": "cgrates.org", + "export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "attempts": 1, + "opts": { + "dbName": "exportedDatabase", + "tableName": "expTable", + "sslmode": "disable", + "maxIdleConns": "10", + "maxOpenConns": "100", + "maxConnLifetime": "0", + }, + "fields":[ // the path constains *exp.columnName + {"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "AnswerTime", "path": "*exp.answer_time", "type": "*variable", "value": "~*req.AnswerTime"}, + {"tag": "Cost", "path": "*exp.cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ] + } ] }, diff --git a/ees/eereq.go b/ees/eereq.go index 273731a99..0141215b8 100644 --- a/ees/eereq.go +++ b/ees/eereq.go @@ -172,6 +172,8 @@ func (eeR *EventExporterRequest) SetFields(tplFlds []*config.FCTemplate) (err er switch tplFld.Type { case utils.META_COMPOSED: err = utils.ComposeNavMapVal(eeR, fullPath, nMItm) + case utils.MetaGroup: // in case of *group type simply append to valSet + err = utils.AppendNavMapVal(eeR, fullPath, nMItm) default: _, err = eeR.Set(fullPath, &utils.NMSlice{nMItm}) } @@ -188,6 +190,7 @@ func (eeR *EventExporterRequest) SetFields(tplFlds []*config.FCTemplate) (err er // Set implements utils.NMInterface func (eeR *EventExporterRequest) Set(fullPath *utils.FullPath, nm utils.NMInterface) (added bool, err error) { + switch fullPath.PathItems[0].Field { default: return false, fmt.Errorf("unsupported field prefix: <%s> when set field", fullPath.PathItems[0].Field) @@ -231,7 +234,7 @@ func (eeR *EventExporterRequest) ParseField( case utils.MetaRemoteHost: out = eeR.RemoteHost().String() isString = true - case utils.MetaVariable, utils.META_COMPOSED: + case utils.MetaVariable, utils.META_COMPOSED, utils.MetaGroup: out, err = cfgFld.Value.ParseDataProvider(eeR) isString = true case utils.META_USAGE_DIFFERENCE: diff --git a/ees/sql.go b/ees/sql.go index eb10fe710..44b344e8d 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -20,6 +20,7 @@ package ees import ( "fmt" + "net/url" "strings" "sync" @@ -35,19 +36,19 @@ func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, sqlEe = &SQLEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} - // take the connection parameters from opts - connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", - utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLUser]), - utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLPassword]), - utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLHost]), - utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLPort]), - utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLName])) - db, err := gorm.Open("mysql", connectString) - if err != nil { - return nil, err + var u *url.URL + if u, err = url.Parse(strings.TrimPrefix(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, utils.Meta)); err != nil { + return } - if err = db.DB().Ping(); err != nil { - return nil, err + password, _ := u.User.Password() + + dbname := utils.SQLDefaultDBName + if vals, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLDBName]; has { + dbname = utils.IfaceAsString(vals) + } + ssl := utils.SQLDefaultSSLMode + if vals, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLSSLMode]; has { + ssl = utils.IfaceAsString(vals) } // tableName is mandatory in opts if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLTableName]; !has { @@ -56,6 +57,25 @@ func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, sqlEe.tableName = utils.IfaceAsString(iface) } + var connString string + switch u.Scheme { + case utils.MYSQL: + connString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", + u.User.Username(), password, u.Hostname(), u.Port(), dbname) + case utils.POSTGRES: + connString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl) + default: + return nil, fmt.Errorf("unknown db_type %s", u.Scheme) + } + + db, err := gorm.Open(u.Scheme, connString) + if err != nil { + return nil, err + } + if err = db.DB().Ping(); err != nil { + return nil, err + } + if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxIdleConns]; has { val, err := utils.IfaceAsTInt64(iface) if err != nil { @@ -120,7 +140,7 @@ func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { sqlEe.dc[utils.NumberOfEvents] = sqlEe.dc[utils.NumberOfEvents].(int64) + 1 var vals []interface{} - + var colNames []string req := utils.MapStorage(cgrEv.Event) eeReq := NewEventExporterRequest(req, sqlEe.dc, cgrEv.Opts, sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Tenant, @@ -131,11 +151,16 @@ func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { if err = eeReq.SetFields(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ContentFields()); err != nil { return } + for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { var iface interface{} if iface, err = eeReq.cnt.FieldAsInterface(el.Value.Slice()); err != nil { return } + pathWithoutIndex := utils.GetPathWithoutIndex(el.Value.String()) + if pathWithoutIndex != utils.MetaRow { + colNames = append(colNames, pathWithoutIndex) + } vals = append(vals, iface) } @@ -143,24 +168,16 @@ func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { for i := range vals { sqlValues[i] = "?" } - utils.Logger.Debug(fmt.Sprintf("Test??")) - utils.Logger.Debug(fmt.Sprintf("%+v", sqlValues)) - utils.Logger.Debug(fmt.Sprintf("%+v", vals)) - sqlStatement := fmt.Sprintf("INSERT INTO %s VALUES (%s); ", sqlEe.tableName, strings.Join(sqlValues, ",")) - utils.Logger.Debug(fmt.Sprintf("%+v", sqlStatement)) - tx := sqlEe.db.Begin() - utils.Logger.Debug(fmt.Sprintf("TestDaca ajunge aici ???? ")) - res, err := tx.DB().Exec(sqlStatement, vals...) - utils.Logger.Debug(fmt.Sprintf("ALO %+v", res)) - utils.Logger.Debug(fmt.Sprintf("ALO2 %+v", err)) - if err != nil { - utils.Logger.Debug(fmt.Sprintf("%+v", err)) - tx.Rollback() - return err + + var sqlQuery string + if len(colNames) != len(vals) { + sqlQuery = fmt.Sprintf("INSERT INTO %s VALUES (%s); ", sqlEe.tableName, strings.Join(sqlValues, ",")) + } else { + colNamesStr := "(" + strings.Join(colNames, ", ") + ")" + sqlQuery = fmt.Sprintf("INSERT INTO %s %s VALUES (%s); ", sqlEe.tableName, colNamesStr, strings.Join(sqlValues, ",")) } - tx.Commit() - defer tx.Close() + sqlEe.db.Table(sqlEe.tableName).Exec(sqlQuery, vals...) updateEEMetrics(sqlEe.dc, cgrEv.Event, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone, sqlEe.cgrCfg.GeneralCfg().DefaultTimezone)) diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index cd56d8ff0..18ccab564 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -48,11 +48,12 @@ var ( testSqlEeCreateTable, testSqlEeLoadConfig, testSqlEeResetDataDB, - testSqlEeResetStorDb, testSqlEeStartEngine, testSqlEeRPCConn, - testSqlEeExportEvent, + testSqlEeExportEventFull, testSqlEeVerifyExportedEvent, + testSqlEeExportEventPartial, + testSqlEeVerifyExportedEvent2, testStopCgrEngine, testCleanDirectory, } @@ -67,14 +68,10 @@ func TestSqlEeExport(t *testing.T) { // create a struct serve as model for *sql exporter type testModelSql struct { - ID int64 Cgrid string AnswerTime time.Time Usage int64 Cost float64 - CreatedAt time.Time - UpdatedAt time.Time - DeletedAt *time.Time } func (_ *testModelSql) TableName() string { @@ -88,22 +85,29 @@ func (nopLogger) Print(values ...interface{}) {} func testSqlEeCreateTable(t *testing.T) { var err error - if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "exportedDatabase")); err != nil { + if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "cgrates")); err != nil { t.Fatal(err) } db2.SetLogger(new(nopLogger)) - if _, err = db2.DB().Exec(`CREATE DATABASE IF NOT EXISTS exportedDatabase;`); err != nil { t.Fatal(err) } + if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "exportedDatabase")); err != nil { + t.Fatal(err) + } tx := db2.Begin() - if !tx.HasTable("expTable") { - tx = tx.CreateTable(new(testModelSql)) + if tx.HasTable("expTable") { + tx = tx.DropTable(new(testModelSql)) if err = tx.Error; err != nil { tx.Rollback() t.Fatal(err) } } + tx = tx.CreateTable(new(testModelSql)) + if err = tx.Error; err != nil { + tx.Rollback() + t.Fatal(err) + } tx.Commit() } @@ -121,12 +125,6 @@ func testSqlEeResetDataDB(t *testing.T) { } } -func testSqlEeResetStorDb(t *testing.T) { - if err := engine.InitStorDb(sqlEeCfg); err != nil { - t.Fatal(err) - } -} - func testSqlEeStartEngine(t *testing.T) { if _, err := engine.StopStartEngine(sqlEeCfgPath, *waitRater); err != nil { t.Fatal(err) @@ -141,9 +139,9 @@ func testSqlEeRPCConn(t *testing.T) { } } -func testSqlEeExportEvent(t *testing.T) { +func testSqlEeExportEventFull(t *testing.T) { eventVoice := &utils.CGREventWithEeIDs{ - EeIDs: []string{"SQLExporter"}, + EeIDs: []string{"SQLExporterFull"}, CGREventWithOpts: &utils.CGREventWithOpts{ CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", @@ -179,6 +177,44 @@ func testSqlEeExportEvent(t *testing.T) { time.Sleep(10 * time.Millisecond) } +func testSqlEeExportEventPartial(t *testing.T) { + eventVoice := &utils.CGREventWithEeIDs{ + EeIDs: []string{"SQLExporterPartial"}, + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("asd", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.VOICE, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 123, + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + }, + } + + var reply map[string]utils.MapStorage + if err := sqlEeRpc.Call(utils.EeSv1ProcessEvent, eventVoice, &reply); err != nil { + t.Error(err) + } + time.Sleep(10 * time.Millisecond) +} + func testSqlEeVerifyExportedEvent(t *testing.T) { var result int64 db2.Table("expTable").Count(&result) @@ -186,3 +222,11 @@ func testSqlEeVerifyExportedEvent(t *testing.T) { t.Fatal("Expected table to have only one result ", result) } } + +func testSqlEeVerifyExportedEvent2(t *testing.T) { + var result int64 + db2.Table("expTable").Count(&result) + if result != 2 { + t.Fatal("Expected table to have only one result ", result) + } +} diff --git a/utils/consts.go b/utils/consts.go index 52a51436e..ccb04dda3 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -930,6 +930,7 @@ const ( Time = "Time" TargetIDs = "TargetIDs" TargetType = "TargetType" + MetaRow = "*row" ) // Migrator Action