Add SQL Exporter

This commit is contained in:
TeoV
2020-12-17 19:34:58 +02:00
committed by Dan Christian Bogos
parent 26c04a2415
commit 07159d8d6a
7 changed files with 401 additions and 2 deletions

View File

@@ -345,7 +345,7 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV,
var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV,
utils.MetaHTTPPost, utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap,
utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt})
utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt, utils.MetaSQL})
// LazySanityCheck used after check config sanity to display warnings related to the config
func (cfg *CGRConfig) LazySanityCheck() {

View File

@@ -602,6 +602,10 @@ func (cfg *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID)
}
}
case utils.MetaSQL:
if len(exp.ContentFields()) == 0 {
return fmt.Errorf("<%s> empty content fields for exporter with ID: %s", utils.EEs, exp.ID)
}
}
for _, field := range exp.Fields {
if field.Type != utils.META_NONE && field.Path == utils.EmptyString {

View File

@@ -386,7 +386,30 @@
"flags": ["*attributes"],
"attribute_context": "customContext",
"attempts": 1,
}
},
{
"id": "SQLExporter",
"type": "*sql",
"tenant": "cgrates.org",
"attempts": 1,
"opts": {
"user": "cgrates",
"password": "CGRateS.org",
"host": "127.0.0.1",
"port": "3306",
"name": "exportedDatabase",
"tableName": "expTable",
"maxIdleConns": "10",
"maxOpenConns": "100",
"maxConnLifetime": "0",
},
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"},
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
]
},
]
},

View File

@@ -56,6 +56,8 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt
return NewVirtualExporter(cgrCfg, cfgIdx, filterS, dc)
case utils.MetaElastic:
return NewElasticExporter(cgrCfg, cfgIdx, filterS, dc)
case utils.MetaSQL:
return NewSQLEe(cgrCfg, cfgIdx, filterS, dc)
default:
return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type)
}

172
ees/sql.go Normal file
View File

@@ -0,0 +1,172 @@
/*
Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
import (
"fmt"
"strings"
"sync"
"github.com/jinzhu/gorm"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (sqlEe *SQLEe, err error) {
sqlEe = &SQLEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
// take the connection parameters from opts
connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLUser]),
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLPassword]),
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLHost]),
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLPort]),
utils.IfaceAsString(cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLName]))
db, err := gorm.Open("mysql", connectString)
if err != nil {
return nil, err
}
if err = db.DB().Ping(); err != nil {
return nil, err
}
// tableName is mandatory in opts
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLTableName]; !has {
return nil, utils.NewErrMandatoryIeMissing(utils.SQLTableName)
} else {
sqlEe.tableName = utils.IfaceAsString(iface)
}
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxIdleConns]; has {
val, err := utils.IfaceAsTInt64(iface)
if err != nil {
return nil, err
}
db.DB().SetMaxIdleConns(int(val))
}
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxOpenConns]; has {
val, err := utils.IfaceAsTInt64(iface)
if err != nil {
return nil, err
}
db.DB().SetMaxOpenConns(int(val))
}
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxConnLifetime]; has {
val, err := utils.IfaceAsDuration(iface)
if err != nil {
return nil, err
}
db.DB().SetConnMaxLifetime(val)
}
sqlEe.db = db
return
}
// SQLEe implements EventExporter interface for SQL
type SQLEe struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
db *gorm.DB
tableName string
sync.RWMutex
dc utils.MapStorage
}
// ID returns the identificator of this exporter
func (sqlEe *SQLEe) ID() string {
return sqlEe.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (sqlEe *SQLEe) OnEvicted(_ string, _ interface{}) {
return
}
// ExportEvent implements EventExporter
func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
sqlEe.Lock()
defer func() {
if err != nil {
sqlEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
sqlEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
sqlEe.Unlock()
}()
sqlEe.dc[utils.NumberOfEvents] = sqlEe.dc[utils.NumberOfEvents].(int64) + 1
var vals []interface{}
req := utils.MapStorage(cgrEv.Event)
eeReq := NewEventExporterRequest(req, sqlEe.dc, cgrEv.Opts,
sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Tenant,
sqlEe.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
sqlEe.cgrCfg.GeneralCfg().DefaultTimezone),
sqlEe.filterS)
if err = eeReq.SetFields(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ContentFields()); err != nil {
return
}
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
var iface interface{}
if iface, err = eeReq.cnt.FieldAsInterface(el.Value.Slice()); err != nil {
return
}
vals = append(vals, iface)
}
sqlValues := make([]string, len(vals))
for i := range vals {
sqlValues[i] = "?"
}
utils.Logger.Debug(fmt.Sprintf("Test??"))
utils.Logger.Debug(fmt.Sprintf("%+v", sqlValues))
utils.Logger.Debug(fmt.Sprintf("%+v", vals))
sqlStatement := fmt.Sprintf("INSERT INTO %s VALUES (%s); ", sqlEe.tableName, strings.Join(sqlValues, ","))
utils.Logger.Debug(fmt.Sprintf("%+v", sqlStatement))
tx := sqlEe.db.Begin()
utils.Logger.Debug(fmt.Sprintf("TestDaca ajunge aici ???? "))
res, err := tx.DB().Exec(sqlStatement, vals...)
utils.Logger.Debug(fmt.Sprintf("ALO %+v", res))
utils.Logger.Debug(fmt.Sprintf("ALO2 %+v", err))
if err != nil {
utils.Logger.Debug(fmt.Sprintf("%+v", err))
tx.Rollback()
return err
}
tx.Commit()
defer tx.Close()
updateEEMetrics(sqlEe.dc, cgrEv.Event, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
sqlEe.cgrCfg.GeneralCfg().DefaultTimezone))
return
}
func (sqlEe *SQLEe) GetMetrics() utils.MapStorage {
return sqlEe.dc.Clone()
}

188
ees/sql_it_test.go Normal file
View File

@@ -0,0 +1,188 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
import (
"fmt"
"net/rpc"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
"github.com/jinzhu/gorm"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var (
sqlEeConfigDir string
sqlEeCfgPath string
sqlEeCfg *config.CGRConfig
sqlEeRpc *rpc.Client
db2 *gorm.DB
dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'"
sTestsSqlEe = []func(t *testing.T){
testCreateDirectory,
testSqlEeCreateTable,
testSqlEeLoadConfig,
testSqlEeResetDataDB,
testSqlEeResetStorDb,
testSqlEeStartEngine,
testSqlEeRPCConn,
testSqlEeExportEvent,
testSqlEeVerifyExportedEvent,
testStopCgrEngine,
testCleanDirectory,
}
)
func TestSqlEeExport(t *testing.T) {
sqlEeConfigDir = "ees"
for _, stest := range sTestsSqlEe {
t.Run(sqlEeConfigDir, stest)
}
}
// create a struct serve as model for *sql exporter
type testModelSql struct {
ID int64
Cgrid string
AnswerTime time.Time
Usage int64
Cost float64
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt *time.Time
}
func (_ *testModelSql) TableName() string {
return "expTable"
}
type nopLogger struct{}
func (nopLogger) Print(values ...interface{}) {}
func testSqlEeCreateTable(t *testing.T) {
var err error
if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "exportedDatabase")); err != nil {
t.Fatal(err)
}
db2.SetLogger(new(nopLogger))
if _, err = db2.DB().Exec(`CREATE DATABASE IF NOT EXISTS exportedDatabase;`); err != nil {
t.Fatal(err)
}
tx := db2.Begin()
if !tx.HasTable("expTable") {
tx = tx.CreateTable(new(testModelSql))
if err = tx.Error; err != nil {
tx.Rollback()
t.Fatal(err)
}
}
tx.Commit()
}
func testSqlEeLoadConfig(t *testing.T) {
var err error
sqlEeCfgPath = path.Join(*dataDir, "conf", "samples", sqlEeConfigDir)
if sqlEeCfg, err = config.NewCGRConfigFromPath(sqlEeCfgPath); err != nil {
t.Error(err)
}
}
func testSqlEeResetDataDB(t *testing.T) {
if err := engine.InitDataDb(sqlEeCfg); err != nil {
t.Fatal(err)
}
}
func testSqlEeResetStorDb(t *testing.T) {
if err := engine.InitStorDb(sqlEeCfg); err != nil {
t.Fatal(err)
}
}
func testSqlEeStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(sqlEeCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testSqlEeRPCConn(t *testing.T) {
var err error
sqlEeRpc, err = newRPCClient(sqlEeCfg.ListenCfg())
if err != nil {
t.Fatal(err)
}
}
func testSqlEeExportEvent(t *testing.T) {
eventVoice := &utils.CGREventWithEeIDs{
EeIDs: []string{"SQLExporter"},
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "voiceEvent",
Time: utils.TimePointer(time.Now()),
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
utils.ToR: utils.VOICE,
utils.OriginID: "dsafdsaf",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.META_RATED,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: 10 * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 1.01,
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},
},
},
},
}
var reply map[string]utils.MapStorage
if err := sqlEeRpc.Call(utils.EeSv1ProcessEvent, eventVoice, &reply); err != nil {
t.Error(err)
}
time.Sleep(10 * time.Millisecond)
}
func testSqlEeVerifyExportedEvent(t *testing.T) {
var result int64
db2.Table("expTable").Count(&result)
if result != 1 {
t.Fatal("Expected table to have only one result ", result)
}
}

View File

@@ -2504,6 +2504,16 @@ const (
ElsVersionLow = "version"
ElsVersionType = "version_type"
ElsWaitForActiveShards = "wait_for_active_shards"
// SQLEe options
SQLUser = "user"
SQLPassword = "password"
SQLHost = "host"
SQLPort = "port"
SQLName = "name"
SQLMaxIdleConns = "maxIdleConns"
SQLMaxOpenConns = "maxOpenConns"
SQLMaxConnLifetime = "maxConnLifetime"
// Others
OptsContext = "*context"
Subsys = "*subsys"