From 07159d8d6ad1323cac33df59a51ee66817af02cb Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 17 Dec 2020 19:34:58 +0200 Subject: [PATCH] Add SQL Exporter --- config/config.go | 2 +- config/configsanity.go | 4 + data/conf/samples/ees/cgrates.json | 25 +++- ees/ee.go | 2 + ees/sql.go | 172 ++++++++++++++++++++++++++ ees/sql_it_test.go | 188 +++++++++++++++++++++++++++++ utils/consts.go | 10 ++ 7 files changed, 401 insertions(+), 2 deletions(-) create mode 100644 ees/sql.go create mode 100644 ees/sql_it_test.go diff --git a/config/config.go b/config/config.go index 8b382e909..c1c0f7df7 100644 --- a/config/config.go +++ b/config/config.go @@ -345,7 +345,7 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV, utils.MetaHTTPPost, utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, - utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt}) + utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt, utils.MetaSQL}) // LazySanityCheck used after check config sanity to display warnings related to the config func (cfg *CGRConfig) LazySanityCheck() { diff --git a/config/configsanity.go b/config/configsanity.go index bb00dac87..73cf9348b 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -602,6 +602,10 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID) } } + case utils.MetaSQL: + if len(exp.ContentFields()) == 0 { + return fmt.Errorf("<%s> empty content fields for exporter with ID: %s", utils.EEs, exp.ID) + } } for _, field := range exp.Fields { if field.Type != utils.META_NONE && field.Path == utils.EmptyString { diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 2242c3b45..e8eb325ae 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -386,7 +386,30 @@ "flags": ["*attributes"], "attribute_context": "customContext", "attempts": 1, - } + }, + { + "id": "SQLExporter", + "type": "*sql", + "tenant": "cgrates.org", + "attempts": 1, + "opts": { + "user": "cgrates", + "password": "CGRateS.org", + "host": "127.0.0.1", + "port": "3306", + "name": "exportedDatabase", + "tableName": "expTable", + "maxIdleConns": "10", + "maxOpenConns": "100", + "maxConnLifetime": "0", + }, + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ] + }, ] }, diff --git a/ees/ee.go b/ees/ee.go index 162a566d8..d4b94839f 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -56,6 +56,8 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt return NewVirtualExporter(cgrCfg, cfgIdx, filterS, dc) case utils.MetaElastic: return NewElasticExporter(cgrCfg, cfgIdx, filterS, dc) + case utils.MetaSQL: + return NewSQLEe(cgrCfg, cfgIdx, filterS, dc) default: return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type) } diff --git a/ees/sql.go b/ees/sql.go new file mode 100644 index 000000000..eb10fe710 --- /dev/null +++ b/ees/sql.go @@ -0,0 +1,172 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "fmt" + "strings" + "sync" + + "github.com/jinzhu/gorm" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, + dc utils.MapStorage) (sqlEe *SQLEe, err error) { + sqlEe = &SQLEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} + + // take the connection parameters from opts + connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", + utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLUser]), + utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLPassword]), + utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLHost]), + utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLPort]), + utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLName])) + db, err := gorm.Open("mysql", connectString) + if err != nil { + return nil, err + } + if err = db.DB().Ping(); err != nil { + return nil, err + } + // tableName is mandatory in opts + if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLTableName]; !has { + return nil, utils.NewErrMandatoryIeMissing(utils.SQLTableName) + } else { + sqlEe.tableName = utils.IfaceAsString(iface) + } + + if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxIdleConns]; has { + val, err := utils.IfaceAsTInt64(iface) + if err != nil { + return nil, err + } + db.DB().SetMaxIdleConns(int(val)) + } + if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxOpenConns]; has { + val, err := utils.IfaceAsTInt64(iface) + if err != nil { + return nil, err + } + db.DB().SetMaxOpenConns(int(val)) + } + if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxConnLifetime]; has { + val, err := utils.IfaceAsDuration(iface) + if err != nil { + return nil, err + } + db.DB().SetConnMaxLifetime(val) + } + + sqlEe.db = db + return +} + +// SQLEe implements EventExporter interface for SQL +type SQLEe struct { + id string + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + filterS *engine.FilterS + db *gorm.DB + + tableName string + + sync.RWMutex + dc utils.MapStorage +} + +// ID returns the identificator of this exporter +func (sqlEe *SQLEe) ID() string { + return sqlEe.id +} + +// OnEvicted implements EventExporter, doing the cleanup before exit +func (sqlEe *SQLEe) OnEvicted(_ string, _ interface{}) { + return +} + +// ExportEvent implements EventExporter +func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) { + sqlEe.Lock() + defer func() { + if err != nil { + sqlEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + } else { + sqlEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + } + sqlEe.Unlock() + }() + sqlEe.dc[utils.NumberOfEvents] = sqlEe.dc[utils.NumberOfEvents].(int64) + 1 + + var vals []interface{} + + req := utils.MapStorage(cgrEv.Event) + eeReq := NewEventExporterRequest(req, sqlEe.dc, cgrEv.Opts, + sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Tenant, + sqlEe.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone, + sqlEe.cgrCfg.GeneralCfg().DefaultTimezone), + sqlEe.filterS) + if err = eeReq.SetFields(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ContentFields()); err != nil { + return + } + for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { + var iface interface{} + if iface, err = eeReq.cnt.FieldAsInterface(el.Value.Slice()); err != nil { + return + } + vals = append(vals, iface) + } + + sqlValues := make([]string, len(vals)) + for i := range vals { + sqlValues[i] = "?" + } + utils.Logger.Debug(fmt.Sprintf("Test??")) + utils.Logger.Debug(fmt.Sprintf("%+v", sqlValues)) + utils.Logger.Debug(fmt.Sprintf("%+v", vals)) + sqlStatement := fmt.Sprintf("INSERT INTO %s VALUES (%s); ", sqlEe.tableName, strings.Join(sqlValues, ",")) + utils.Logger.Debug(fmt.Sprintf("%+v", sqlStatement)) + tx := sqlEe.db.Begin() + utils.Logger.Debug(fmt.Sprintf("TestDaca ajunge aici ???? ")) + res, err := tx.DB().Exec(sqlStatement, vals...) + utils.Logger.Debug(fmt.Sprintf("ALO %+v", res)) + utils.Logger.Debug(fmt.Sprintf("ALO2 %+v", err)) + if err != nil { + utils.Logger.Debug(fmt.Sprintf("%+v", err)) + tx.Rollback() + return err + } + + tx.Commit() + defer tx.Close() + updateEEMetrics(sqlEe.dc, cgrEv.Event, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone, + sqlEe.cgrCfg.GeneralCfg().DefaultTimezone)) + + return +} + +func (sqlEe *SQLEe) GetMetrics() utils.MapStorage { + return sqlEe.dc.Clone() +} diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go new file mode 100644 index 000000000..cd56d8ff0 --- /dev/null +++ b/ees/sql_it_test.go @@ -0,0 +1,188 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "fmt" + "net/rpc" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/utils" + + "github.com/jinzhu/gorm" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" +) + +var ( + sqlEeConfigDir string + sqlEeCfgPath string + sqlEeCfg *config.CGRConfig + sqlEeRpc *rpc.Client + db2 *gorm.DB + dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'" + + sTestsSqlEe = []func(t *testing.T){ + testCreateDirectory, + testSqlEeCreateTable, + testSqlEeLoadConfig, + testSqlEeResetDataDB, + testSqlEeResetStorDb, + testSqlEeStartEngine, + testSqlEeRPCConn, + testSqlEeExportEvent, + testSqlEeVerifyExportedEvent, + testStopCgrEngine, + testCleanDirectory, + } +) + +func TestSqlEeExport(t *testing.T) { + sqlEeConfigDir = "ees" + for _, stest := range sTestsSqlEe { + t.Run(sqlEeConfigDir, stest) + } +} + +// create a struct serve as model for *sql exporter +type testModelSql struct { + ID int64 + Cgrid string + AnswerTime time.Time + Usage int64 + Cost float64 + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time +} + +func (_ *testModelSql) TableName() string { + return "expTable" +} + +type nopLogger struct{} + +func (nopLogger) Print(values ...interface{}) {} + +func testSqlEeCreateTable(t *testing.T) { + var err error + + if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "exportedDatabase")); err != nil { + t.Fatal(err) + } + db2.SetLogger(new(nopLogger)) + + if _, err = db2.DB().Exec(`CREATE DATABASE IF NOT EXISTS exportedDatabase;`); err != nil { + t.Fatal(err) + } + tx := db2.Begin() + if !tx.HasTable("expTable") { + tx = tx.CreateTable(new(testModelSql)) + if err = tx.Error; err != nil { + tx.Rollback() + t.Fatal(err) + } + } + tx.Commit() +} + +func testSqlEeLoadConfig(t *testing.T) { + var err error + sqlEeCfgPath = path.Join(*dataDir, "conf", "samples", sqlEeConfigDir) + if sqlEeCfg, err = config.NewCGRConfigFromPath(sqlEeCfgPath); err != nil { + t.Error(err) + } +} + +func testSqlEeResetDataDB(t *testing.T) { + if err := engine.InitDataDb(sqlEeCfg); err != nil { + t.Fatal(err) + } +} + +func testSqlEeResetStorDb(t *testing.T) { + if err := engine.InitStorDb(sqlEeCfg); err != nil { + t.Fatal(err) + } +} + +func testSqlEeStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(sqlEeCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testSqlEeRPCConn(t *testing.T) { + var err error + sqlEeRpc, err = newRPCClient(sqlEeCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testSqlEeExportEvent(t *testing.T) { + eventVoice := &utils.CGREventWithEeIDs{ + EeIDs: []string{"SQLExporter"}, + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.VOICE, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + }, + } + + var reply map[string]utils.MapStorage + if err := sqlEeRpc.Call(utils.EeSv1ProcessEvent, eventVoice, &reply); err != nil { + t.Error(err) + } + time.Sleep(10 * time.Millisecond) +} + +func testSqlEeVerifyExportedEvent(t *testing.T) { + var result int64 + db2.Table("expTable").Count(&result) + if result != 1 { + t.Fatal("Expected table to have only one result ", result) + } +} diff --git a/utils/consts.go b/utils/consts.go index 4c5dec124..52a51436e 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -2504,6 +2504,16 @@ const ( ElsVersionLow = "version" ElsVersionType = "version_type" ElsWaitForActiveShards = "wait_for_active_shards" + // SQLEe options + SQLUser = "user" + SQLPassword = "password" + SQLHost = "host" + SQLPort = "port" + SQLName = "name" + SQLMaxIdleConns = "maxIdleConns" + SQLMaxOpenConns = "maxOpenConns" + SQLMaxConnLifetime = "maxConnLifetime" + // Others OptsContext = "*context" Subsys = "*subsys"