mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added DSN opts for exporter sql
This commit is contained in:
committed by
Dan Christian Bogos
parent
b2e84bfdf0
commit
89a9cd8598
@@ -523,6 +523,7 @@ const CGRATES_CFG_JSON = `
|
||||
// "sqlMaxIdleConns": 0, // SQLMaxIdleConns
|
||||
// "sqlMaxOpenConns": 0, // SQLMaxOpenConns
|
||||
// "sqlConnMaxLifetime": "0", // SQLConnMaxLifetime
|
||||
// "sqlDSNParams": {}, // DSN params
|
||||
|
||||
|
||||
// "sqlTableName":"cdrs", // the name of the table from where the events are exported
|
||||
|
||||
112
config/eescfg.go
112
config/eescfg.go
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
@@ -159,6 +160,7 @@ type EventExporterOpts struct {
|
||||
SQLMaxIdleConns *int
|
||||
SQLMaxOpenConns *int
|
||||
SQLConnMaxLifetime *time.Duration
|
||||
SQLDSNParams map[string]string
|
||||
SQLTableName *string
|
||||
SQLDBName *string
|
||||
SSLMode *string
|
||||
@@ -268,6 +270,10 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
|
||||
}
|
||||
eeOpts.SQLConnMaxLifetime = utils.DurationPointer(sqlConnMaxLifetime)
|
||||
}
|
||||
if jsnCfg.SQLDSNParams != nil {
|
||||
eeOpts.SQLDSNParams = make(map[string]string)
|
||||
eeOpts.SQLDSNParams = jsnCfg.SQLDSNParams
|
||||
}
|
||||
if jsnCfg.SQLTableName != nil {
|
||||
eeOpts.SQLTableName = jsnCfg.SQLTableName
|
||||
}
|
||||
@@ -514,6 +520,9 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
|
||||
if eeOpts.SQLConnMaxLifetime != nil {
|
||||
cln.SQLConnMaxLifetime = utils.DurationPointer(*eeOpts.SQLConnMaxLifetime)
|
||||
}
|
||||
if eeOpts.SQLDSNParams != nil {
|
||||
cln.SQLDSNParams = eeOpts.SQLDSNParams
|
||||
}
|
||||
if eeOpts.SQLTableName != nil {
|
||||
cln.SQLTableName = utils.StringPointer(*eeOpts.SQLTableName)
|
||||
}
|
||||
@@ -697,6 +706,9 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
|
||||
if eeC.Opts.SQLConnMaxLifetime != nil {
|
||||
opts[utils.SQLConnMaxLifetime] = eeC.Opts.SQLConnMaxLifetime.String()
|
||||
}
|
||||
if eeC.Opts.SQLDSNParams != nil {
|
||||
opts[utils.SQLDSNParams] = eeC.Opts.SQLDSNParams
|
||||
}
|
||||
if eeC.Opts.SQLTableName != nil {
|
||||
opts[utils.SQLTableNameOpt] = *eeC.Opts.SQLTableName
|
||||
}
|
||||
@@ -821,52 +833,53 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
|
||||
}
|
||||
|
||||
type EventExporterOptsJson struct {
|
||||
CSVFieldSeparator *string `json:"csvFieldSeparator"`
|
||||
ElsIndex *string `json:"elsIndex"`
|
||||
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
|
||||
ElsIfSeqNo *int `json:"elsIfSeqNo"`
|
||||
ElsOpType *string `json:"elsOpType"`
|
||||
ElsPipeline *string `json:"elsPipeline"`
|
||||
ElsRouting *string `json:"elsRouting"`
|
||||
ElsTimeout *string `json:"elsTimeout"`
|
||||
ElsVersion *int `json:"elsVersion"`
|
||||
ElsVersionType *string `json:"elsVersionType"`
|
||||
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
|
||||
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
|
||||
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
|
||||
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
|
||||
SQLTableName *string `json:"sqlTableName"`
|
||||
SQLDBName *string `json:"sqlDBName"`
|
||||
SSLMode *string `json:"sslMode"`
|
||||
KafkaTopic *string `json:"kafkaTopic"`
|
||||
AMQPQueueID *string `json:"amqpQueueID"`
|
||||
AMQPRoutingKey *string `json:"amqpRoutingKey"`
|
||||
AMQPExchange *string `json:"amqpExchange"`
|
||||
AMQPExchangeType *string `json:"amqpExchangeType"`
|
||||
AWSRegion *string `json:"awsRegion"`
|
||||
AWSKey *string `json:"awsKey"`
|
||||
AWSSecret *string `json:"awsSecret"`
|
||||
AWSToken *string `json:"awsToken"`
|
||||
SQSQueueID *string `json:"sqsQueueID"`
|
||||
S3BucketID *string `json:"s3BucketID"`
|
||||
S3FolderPath *string `json:"s3FolderPath"`
|
||||
NATSJetStream *bool `json:"natsJetStream"`
|
||||
NATSSubject *string `json:"natsSubject"`
|
||||
NATSJWTFile *string `json:"natsJWTFile"`
|
||||
NATSSeedFile *string `json:"natsSeedFile"`
|
||||
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
|
||||
NATSClientCertificate *string `json:"natsClientCertificate"`
|
||||
NATSClientKey *string `json:"natsClientKey"`
|
||||
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
|
||||
RPCCodec *string `json:"rpcCodec"`
|
||||
ServiceMethod *string `json:"serviceMethod"`
|
||||
KeyPath *string `json:"keyPath"`
|
||||
CertPath *string `json:"certPath"`
|
||||
CAPath *string `json:"caPath"`
|
||||
ConnIDs *[]string `json:"connIDs"`
|
||||
TLS *bool `json:"tls"`
|
||||
RPCConnTimeout *string `json:"rpcConnTimeout"`
|
||||
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
|
||||
CSVFieldSeparator *string `json:"csvFieldSeparator"`
|
||||
ElsIndex *string `json:"elsIndex"`
|
||||
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
|
||||
ElsIfSeqNo *int `json:"elsIfSeqNo"`
|
||||
ElsOpType *string `json:"elsOpType"`
|
||||
ElsPipeline *string `json:"elsPipeline"`
|
||||
ElsRouting *string `json:"elsRouting"`
|
||||
ElsTimeout *string `json:"elsTimeout"`
|
||||
ElsVersion *int `json:"elsVersion"`
|
||||
ElsVersionType *string `json:"elsVersionType"`
|
||||
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
|
||||
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
|
||||
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
|
||||
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
|
||||
SQLDSNParams map[string]string `json:"sqlDSNParams"`
|
||||
SQLTableName *string `json:"sqlTableName"`
|
||||
SQLDBName *string `json:"sqlDBName"`
|
||||
SSLMode *string `json:"sslMode"`
|
||||
KafkaTopic *string `json:"kafkaTopic"`
|
||||
AMQPQueueID *string `json:"amqpQueueID"`
|
||||
AMQPRoutingKey *string `json:"amqpRoutingKey"`
|
||||
AMQPExchange *string `json:"amqpExchange"`
|
||||
AMQPExchangeType *string `json:"amqpExchangeType"`
|
||||
AWSRegion *string `json:"awsRegion"`
|
||||
AWSKey *string `json:"awsKey"`
|
||||
AWSSecret *string `json:"awsSecret"`
|
||||
AWSToken *string `json:"awsToken"`
|
||||
SQSQueueID *string `json:"sqsQueueID"`
|
||||
S3BucketID *string `json:"s3BucketID"`
|
||||
S3FolderPath *string `json:"s3FolderPath"`
|
||||
NATSJetStream *bool `json:"natsJetStream"`
|
||||
NATSSubject *string `json:"natsSubject"`
|
||||
NATSJWTFile *string `json:"natsJWTFile"`
|
||||
NATSSeedFile *string `json:"natsSeedFile"`
|
||||
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
|
||||
NATSClientCertificate *string `json:"natsClientCertificate"`
|
||||
NATSClientKey *string `json:"natsClientKey"`
|
||||
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
|
||||
RPCCodec *string `json:"rpcCodec"`
|
||||
ServiceMethod *string `json:"serviceMethod"`
|
||||
KeyPath *string `json:"keyPath"`
|
||||
CertPath *string `json:"certPath"`
|
||||
CAPath *string `json:"caPath"`
|
||||
ConnIDs *[]string `json:"connIDs"`
|
||||
TLS *bool `json:"tls"`
|
||||
RPCConnTimeout *string `json:"rpcConnTimeout"`
|
||||
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
|
||||
}
|
||||
|
||||
// EventExporterJsonCfg is the configuration of a single EventExporter
|
||||
@@ -1003,6 +1016,13 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte
|
||||
} else {
|
||||
d.SQLConnMaxLifetime = nil
|
||||
}
|
||||
if v2.SQLDSNParams != nil {
|
||||
if v1.SQLDSNParams == nil || !reflect.DeepEqual(v1.SQLDSNParams, v2.SQLDSNParams) {
|
||||
d.SQLDSNParams = v2.SQLDSNParams
|
||||
}
|
||||
} else {
|
||||
d.SQLDSNParams = nil
|
||||
}
|
||||
if v2.SQLTableName != nil {
|
||||
if v1.SQLTableName == nil ||
|
||||
*v1.SQLTableName != *v2.SQLTableName {
|
||||
|
||||
@@ -608,6 +608,10 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
|
||||
"export_path": "/tmp/testCSV",
|
||||
"opts": {
|
||||
"awsSecret": "test",
|
||||
"sqlDSNParams": {
|
||||
"allowOldPasswords": "true",
|
||||
"allowNativePasswords": "true",
|
||||
},
|
||||
},
|
||||
"timezone": "UTC",
|
||||
"filters": [],
|
||||
@@ -642,6 +646,10 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
|
||||
utils.ExportPathCfg: "/tmp/testCSV",
|
||||
utils.OptsCfg: map[string]interface{}{
|
||||
utils.AWSSecret: "test",
|
||||
utils.SQLDSNParams: map[string]string{
|
||||
"allowOldPasswords": "true",
|
||||
"allowNativePasswords": "true",
|
||||
},
|
||||
},
|
||||
utils.TimezoneCfg: "UTC",
|
||||
utils.FiltersCfg: []string{},
|
||||
@@ -679,7 +687,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
|
||||
if !reflect.DeepEqual(rcv[utils.ExportersCfg].([]map[string]interface{})[1],
|
||||
eMap[utils.ExportersCfg].([]map[string]interface{})[0]) {
|
||||
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(eMap[utils.ExportersCfg].([]map[string]interface{})[0]),
|
||||
utils.ToJSON(rcv[utils.ExportersCfg].([]map[string]interface{})[0]))
|
||||
utils.ToJSON(rcv[utils.ExportersCfg].([]map[string]interface{})[1]))
|
||||
}
|
||||
rcv[utils.ExportersCfg] = nil
|
||||
eMap[utils.ExportersCfg] = nil
|
||||
|
||||
@@ -437,6 +437,10 @@
|
||||
"sqlMaxIdleConns": 10,
|
||||
"sqlMaxOpenConns": 100,
|
||||
"sqlConnMaxLifetime": "0",
|
||||
"sqlDSNParams": {
|
||||
"allowOldPasswords": "true",
|
||||
"allowNativePasswords": "true",
|
||||
},
|
||||
},
|
||||
"fields":[ // in case that the path is *exp.*row user must complete all the fields one to one with his sql schema in the correct order
|
||||
{"tag": "CGRID", "path": "*exp.*row", "type": "*group", "value": "~*req.CGRID"},
|
||||
@@ -457,6 +461,7 @@
|
||||
"sqlMaxIdleConns": 10,
|
||||
"sqlMaxOpenConns": 100,
|
||||
"sqlConnMaxLifetime": "0",
|
||||
|
||||
},
|
||||
"fields":[ // the path constains *exp.columnName
|
||||
{"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.CGRID"},
|
||||
|
||||
14
ees/sql.go
14
ees/sql.go
@@ -90,7 +90,7 @@ func (sqlEe *SQLEe) initDialector() (err error) {
|
||||
switch u.Scheme {
|
||||
case utils.MySQL:
|
||||
sqlEe.dialect = mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
|
||||
u.User.Username(), password, u.Hostname(), u.Port(), dbname))
|
||||
u.User.Username(), password, u.Hostname(), u.Port(), dbname) + appendToMysqlDSNOpts(sqlEe.Cfg().Opts))
|
||||
case utils.Postgres:
|
||||
sqlEe.dialect = postgres.Open(fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl))
|
||||
default:
|
||||
@@ -99,6 +99,18 @@ func (sqlEe *SQLEe) initDialector() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func appendToMysqlDSNOpts(opts *config.EventExporterOpts) string {
|
||||
if opts.SQLDSNParams != nil {
|
||||
var dsn string
|
||||
for key, val := range opts.SQLDSNParams {
|
||||
dsn = dsn + "&" + key + "=" + val
|
||||
}
|
||||
utils.Logger.Debug("dsn: " + dsn)
|
||||
return dsn
|
||||
}
|
||||
return utils.EmptyString
|
||||
}
|
||||
|
||||
func openDB(dialect gorm.Dialector, opts *config.EventExporterOpts) (db *gorm.DB, sqlDB *sql.DB, err error) {
|
||||
if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil {
|
||||
return
|
||||
|
||||
@@ -2439,6 +2439,7 @@ const (
|
||||
|
||||
SQLMaxOpenConns = "sqlMaxOpenConns"
|
||||
SQLConnMaxLifetime = "sqlConnMaxLifetime"
|
||||
SQLDSNParams = "sqlDSNParams"
|
||||
|
||||
// fileCSV
|
||||
CSVRowLengthOpt = "csvRowLength"
|
||||
|
||||
Reference in New Issue
Block a user