Implemention for els exporter over https && adding logger type for els exporter requests

This commit is contained in:
gezimbll
2023-06-14 06:36:13 -04:00
committed by Dan Christian Bogos
parent fa09ca2f09
commit 5eb97906be
7 changed files with 260 additions and 141 deletions

View File

@@ -507,12 +507,23 @@ const CGRATES_CFG_JSON = `
// Elasticsearch options
// "elsCloud":false, //ELS Cloud Endpoint
// "elsApiKey": "",
// "elsUsername":"",
// "elsPassword":"",
// "discoverNodesOnStart":false,
// "discoverNodesOnInterval":"100s",
// "elsCloud":true, //ExportPath will be an CLoud ID deployment
// "elsApiKey": "", // Base64-encoded token for authorization; if set, overrides username/password and service token.
// "elsUsername":"", // Username for HTTP Basic Authentication.
// "elsPassword":"", // Password for HTTP Basic Authentication.
// "elsServiceToken":"" // Service token for authorization; if set, overrides username/password.
// "elsCertificateFingerPrint":"" // SHA256 hex fingerprint given by Elasticsearch on first launch.
// "elsDiscoverNodesOnStart":false, // Discover nodes when initializing the client. Default: false.
// "elsDiscoverNodesInterval":"10s", // Discover nodes periodically. Default: disabled.
// "elsEnableDebugLogger":"false", // Enable the debug logging.
// "elsLogger":"", //The logger type can either be elsJson,elsColor and elsText
// "elsCompressRequestBody":false, //Enable compression on requests
// "elsCompressRequestBodyLevel":0, //Default: gzip.DefaultCompression, 9:BestCompression,-2:HuffmanOnly,1:BestSpeed
// "elsRetryOnStatus":[], // List of status codes for retry. Default: 502, 503, 504.
// "elsMaxRetries": 0, // Default: 3.
// "elsDisableRetry": false, // Default: false.
// "elsIndex": "", // ElsIndex
// "elsIfPrimaryTerm": 0, // ElsIfPrimaryTerm
// "elsIfSeqNo": 0, // ElsIfSeqNo

View File

@@ -154,68 +154,71 @@ func (eeS *EEsCfg) AsMapInterface(separator string) (initialMP map[string]any) {
}
type EventExporterOpts struct {
CSVFieldSeparator *string
ElsIndex *string
ElsIfPrimaryTerm *int
ElsDiscoverNodesOnStart *bool
ElsDiscoverNodeInterval *time.Duration
ElsCloud *bool
ElsAPIKey *string
ElsCACert *string
ElsCertificateFingerprint *string
ElsServiceToken *string
ElsUsername *string // Username for HTTP Basic Authentication.
ElsPassword *string
ElsEnableDebugLogger *bool
ElsCompressRequestBody *bool
ElsRetryOnStatus *[]int
ElsIfSeqNo *int
ElsOpType *string
ElsPipeline *string
ElsRouting *string
ElsTimeout *time.Duration
ElsVersion *int
ElsVersionType *string
ElsWaitForActiveShards *string
SQLMaxIdleConns *int
SQLMaxOpenConns *int
SQLConnMaxLifetime *time.Duration
MYSQLDSNParams map[string]string
SQLTableName *string
SQLDBName *string
PgSSLMode *string
KafkaTopic *string
AMQPRoutingKey *string
AMQPQueueID *string
AMQPExchange *string
AMQPExchangeType *string
AMQPUsername *string
AMQPPassword *string
AWSRegion *string
AWSKey *string
AWSSecret *string
AWSToken *string
SQSQueueID *string
S3BucketID *string
S3FolderPath *string
NATSJetStream *bool
NATSSubject *string
NATSJWTFile *string
NATSSeedFile *string
NATSCertificateAuthority *string
NATSClientCertificate *string
NATSClientKey *string
NATSJetStreamMaxWait *time.Duration
RPCCodec *string
ServiceMethod *string
KeyPath *string
CertPath *string
CAPath *string
TLS *bool
ConnIDs *[]string
RPCConnTimeout *time.Duration
RPCReplyTimeout *time.Duration
RPCAPIOpts map[string]any
CSVFieldSeparator *string
ElsIndex *string
ElsIfPrimaryTerm *int
ElsDiscoverNodesOnStart *bool
ElsDiscoverNodeInterval *time.Duration
ElsCloud *bool
ElsAPIKey *string
ElsCertificateFingerprint *string
ElsServiceToken *string
ElsUsername *string // Username for HTTP Basic Authentication.
ElsPassword *string
ElsEnableDebugLogger *bool
ElsLogger *string
ElsCompressRequestBody *bool
ElsCompressRequestBodyLevel *int
ElsRetryOnStatus *[]int
ElsMaxRetries *int
ElsDisableRetry *bool
ElsIfSeqNo *int
ElsOpType *string
ElsPipeline *string
ElsRouting *string
ElsTimeout *time.Duration
ElsVersion *int
ElsVersionType *string
ElsWaitForActiveShards *string
SQLMaxIdleConns *int
SQLMaxOpenConns *int
SQLConnMaxLifetime *time.Duration
MYSQLDSNParams map[string]string
SQLTableName *string
SQLDBName *string
PgSSLMode *string
KafkaTopic *string
AMQPRoutingKey *string
AMQPQueueID *string
AMQPExchange *string
AMQPExchangeType *string
AMQPUsername *string
AMQPPassword *string
AWSRegion *string
AWSKey *string
AWSSecret *string
AWSToken *string
SQSQueueID *string
S3BucketID *string
S3FolderPath *string
NATSJetStream *bool
NATSSubject *string
NATSJWTFile *string
NATSSeedFile *string
NATSCertificateAuthority *string
NATSClientCertificate *string
NATSClientKey *string
NATSJetStreamMaxWait *time.Duration
RPCCodec *string
ServiceMethod *string
KeyPath *string
CertPath *string
CAPath *string
TLS *bool
ConnIDs *[]string
RPCConnTimeout *time.Duration
RPCReplyTimeout *time.Duration
RPCAPIOpts map[string]any
}
// EventExporterCfg the config for a Event Exporter
@@ -268,9 +271,6 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.ElsAPIKey != nil {
eeOpts.ElsAPIKey = jsnCfg.ElsAPIKey
}
if jsnCfg.CAPath != nil {
eeOpts.CAPath = jsnCfg.CAPath
}
if jsnCfg.ElsServiceToken != nil {
eeOpts.ElsServiceToken = jsnCfg.ElsServiceToken
}
@@ -280,9 +280,15 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.ElsEnableDebugLogger != nil {
eeOpts.ElsEnableDebugLogger = jsnCfg.ElsEnableDebugLogger
}
if jsnCfg.ElsLogger != nil {
eeOpts.ElsLogger = jsnCfg.ElsLogger
}
if jsnCfg.ElsCompressRequestBody != nil {
eeOpts.ElsCompressRequestBody = jsnCfg.ElsCompressRequestBody
}
if jsnCfg.ElsCompressRequestBodyLevel != nil {
eeOpts.ElsCompressRequestBodyLevel = jsnCfg.ElsCompressRequestBodyLevel
}
if jsnCfg.ElsUsername != nil {
eeOpts.ElsUsername = jsnCfg.ElsUsername
}
@@ -302,6 +308,12 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.ElsRetryOnStatus != nil {
eeOpts.ElsRetryOnStatus = jsnCfg.ElsRetryOnStatus
}
if jsnCfg.ElsMaxRetries != nil {
eeOpts.ElsMaxRetries = jsnCfg.ElsMaxRetries
}
if jsnCfg.ElsDisableRetry != nil {
eeOpts.ElsDisableRetry = jsnCfg.ElsDisableRetry
}
if jsnCfg.ElsIndex != nil {
eeOpts.ElsIndex = jsnCfg.ElsIndex
}

View File

@@ -298,68 +298,71 @@ type EEsJsonCfg struct {
}
type EventExporterOptsJson struct {
CSVFieldSeparator *string `json:"csvFieldSeparator"`
ElsCloud *bool `json:"elsCloud"`
ElsAPIKey *string `json:"elsApiKey"`
ElsServiceToken *string `json:"elsServiceToken"`
ElsCACert *string `json:"elsCACert"`
ElsCertificateFingerprint *string `json:"elsCertificateFingerPrint"`
ElsUsername *string `json:"elsUsername"`
ElsPassword *string `json:"elsPassword"`
ElsDiscoverNodesOnStart *bool `json:"elsDiscoverNodesOnStart"`
ElsDiscoverNodesInterval *string `json:"elsDiscoverNodesInterval"`
ElsEnableDebugLogger *bool `json:"elsEnableDebugLogger"`
ElsCompressRequestBody *bool `json:"elsCompressRequestBody"`
ElsRetryOnStatus *[]int `json:"elsRetryOnStatus"`
ElsIndex *string `json:"elsIndex"`
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
ElsIfSeqNo *int `json:"elsIfSeqNo"`
ElsOpType *string `json:"elsOpType"`
ElsPipeline *string `json:"elsPipeline"`
ElsRouting *string `json:"elsRouting"`
ElsTimeout *string `json:"elsTimeout"`
ElsVersion *int `json:"elsVersion"`
ElsVersionType *string `json:"elsVersionType"`
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
MYSQLDSNParams map[string]string `json:"mysqlDSNParams"`
SQLTableName *string `json:"sqlTableName"`
SQLDBName *string `json:"sqlDBName"`
PgSSLMode *string `json:"pgSSLMode"`
KafkaTopic *string `json:"kafkaTopic"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AMQPUsername *string `json:"amqpUsername"`
AMQPPassword *string `json:"amqpPassword"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
S3BucketID *string `json:"s3BucketID"`
S3FolderPath *string `json:"s3FolderPath"`
NATSJetStream *bool `json:"natsJetStream"`
NATSSubject *string `json:"natsSubject"`
NATSJWTFile *string `json:"natsJWTFile"`
NATSSeedFile *string `json:"natsSeedFile"`
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
RPCCodec *string `json:"rpcCodec"`
ServiceMethod *string `json:"serviceMethod"`
KeyPath *string `json:"keyPath"`
CertPath *string `json:"certPath"`
CAPath *string `json:"caPath"`
ConnIDs *[]string `json:"connIDs"`
TLS *bool `json:"tls"`
RPCConnTimeout *string `json:"rpcConnTimeout"`
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
RPCAPIOpts map[string]any `json:"rpcAPIOpts"`
CSVFieldSeparator *string `json:"csvFieldSeparator"`
ElsCloud *bool `json:"elsCloud"`
ElsAPIKey *string `json:"elsApiKey"`
ElsServiceToken *string `json:"elsServiceToken"`
ElsCertificateFingerprint *string `json:"elsCertificateFingerPrint"`
ElsUsername *string `json:"elsUsername"`
ElsPassword *string `json:"elsPassword"`
ElsDiscoverNodesOnStart *bool `json:"elsDiscoverNodesOnStart"`
ElsDiscoverNodesInterval *string `json:"elsDiscoverNodesInterval"`
ElsEnableDebugLogger *bool `json:"elsEnableDebugLogger"`
ElsLogger *string `json:"elsLogger"`
ElsCompressRequestBody *bool `json:"elsCompressRequestBody"`
ElsCompressRequestBodyLevel *int `json:"elsCompressRequestBodyLevel"`
ElsRetryOnStatus *[]int `json:"elsRetryOnStatus"`
ElsMaxRetries *int `json:"elsMaxRetries"`
ElsDisableRetry *bool `json:"elsDisableRetry"`
ElsIndex *string `json:"elsIndex"`
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
ElsIfSeqNo *int `json:"elsIfSeqNo"`
ElsOpType *string `json:"elsOpType"`
ElsPipeline *string `json:"elsPipeline"`
ElsRouting *string `json:"elsRouting"`
ElsTimeout *string `json:"elsTimeout"`
ElsVersion *int `json:"elsVersion"`
ElsVersionType *string `json:"elsVersionType"`
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
MYSQLDSNParams map[string]string `json:"mysqlDSNParams"`
SQLTableName *string `json:"sqlTableName"`
SQLDBName *string `json:"sqlDBName"`
PgSSLMode *string `json:"pgSSLMode"`
KafkaTopic *string `json:"kafkaTopic"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AMQPUsername *string `json:"amqpUsername"`
AMQPPassword *string `json:"amqpPassword"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
S3BucketID *string `json:"s3BucketID"`
S3FolderPath *string `json:"s3FolderPath"`
NATSJetStream *bool `json:"natsJetStream"`
NATSSubject *string `json:"natsSubject"`
NATSJWTFile *string `json:"natsJWTFile"`
NATSSeedFile *string `json:"natsSeedFile"`
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
RPCCodec *string `json:"rpcCodec"`
ServiceMethod *string `json:"serviceMethod"`
KeyPath *string `json:"keyPath"`
CertPath *string `json:"certPath"`
CAPath *string `json:"caPath"`
ConnIDs *[]string `json:"connIDs"`
TLS *bool `json:"tls"`
RPCConnTimeout *string `json:"rpcConnTimeout"`
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
RPCAPIOpts map[string]any `json:"rpcAPIOpts"`
}
// EventExporterJsonCfg is the configuration of a single EventExporter

View File

@@ -364,9 +364,15 @@
"opts": {
"elsIndex": "cdrs",
"elsDiscoverNodesOnStart":true,
//"elsCloud":false,
// "elsUsername":"",
// "elsPassword":"",
//"elsDiscoverNodesInterval":"10s",
// "elsLogger":"elsJson",
// "elsEnableDebugLogger":false,
// "elsCompressRequestBody":false,
// "elsCompressRequestBodyLevel":0,
// "elsRetryOnStatus":[],
// "elsMaxRetries": 0,
// "elsDisableRetry": false,
//"elsIfPrimaryTerm": 0,
//"elsIfSeqNo": 0,
"elsOpType": "",
@@ -398,14 +404,72 @@
{
"id": "ElasticsearchExporterCloud",
"type": "*els",
"export_path": "Deployment:dXMtY2c0ZjI0YWN3Zg==",
"export_path": "Deployment:dXMtY2VudHJh0YW==",
"attempts": 1,
"opts": {
"elsIndex": "cdrs",
"elsUsername":"elastic_user",
"elsPassword":"rhYwyCE234",
"elsPassword":"",
"elsCloud":true,
//"elsApiKey":"aZmd2UQ==",
// "elsLogger":"elsJson",
// "elsEnableDebugLogger":false,
// "elsCompressRequestBody":false,
// "elsCompressRequestBodyLevel":0,
// "elsRetryOnStatus":[],
// "elsMaxRetries": 0,
// "elsDisableRetry": false,
//"elsIfPrimaryTerm": 0,
//"elsIfSeqNo": 0,
"elsOpType": "",
"elsPipeline": "",
"require_alias": "",
"elsRouting": "",
"elsTimeout": "0",
//"elsVersion": 0,
"elsVersionType": "",
"elsWaitForActiveShards": "",
},
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
{"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
{"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"},
{"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"},
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"},
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
],
},
{
"id": "ElasticsearchExporterHttps",
"type": "*els",
"export_path": "https://192.168.56.29:9200",
"attempts": 1,
"opts": {
"elsIndex": "cdrs",
"elsUsername":"elastic",
"elsPassword":"",
"caPath":"/path/to/http_ca.crt",
//"elsCertificateFingerPrint":"",
// "elsServiceToken":""
// "elsLogger":"elsJson",
// "elsDiscoverNodesOnStart":true,
// "elsDiscoverNodesInterval":"10s",
// "elsEnableDebugLogger":false,
// "elsCompressRequestBody":true,
// "elsCompressRequestBodyLevel":0,
// "elsRetryOnStatus":[],
// "elsMaxRetries": 0,
// "elsDisableRetry": false,
//"elsIfPrimaryTerm": 0,
//"elsIfSeqNo": 0,
"elsOpType": "",

View File

@@ -27,6 +27,7 @@ import (
"strings"
"sync"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/cgrates/cgrates/config"
@@ -100,9 +101,9 @@ func (eEe *ElasticEE) prepareOpts() (err error) {
if eEe.Cfg().Opts.ElsAPIKey != nil {
eEe.clnOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey
}
if eEe.Cfg().Opts.ElsCACert != nil {
if eEe.Cfg().Opts.CAPath != nil {
var cacert []byte
cacert, err = os.ReadFile(*eEe.Cfg().Opts.ElsCACert)
cacert, err = os.ReadFile(*eEe.Cfg().Opts.CAPath)
if err != nil {
return
}
@@ -123,12 +124,35 @@ func (eEe *ElasticEE) prepareOpts() (err error) {
if eEe.Cfg().Opts.ElsEnableDebugLogger != nil {
eEe.clnOpts.EnableDebugLogger = *eEe.Cfg().Opts.ElsEnableDebugLogger
}
if loggerType := eEe.Cfg().Opts.ElsLogger; loggerType != nil {
var logger elastictransport.Logger
switch *loggerType {
case utils.ElsJson:
logger = &elastictransport.JSONLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
case utils.ElsColor:
logger = &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
case utils.ElsText:
logger = &elastictransport.TextLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
default:
return
}
eEe.clnOpts.Logger = logger
}
if eEe.Cfg().Opts.ElsCompressRequestBody != nil {
eEe.clnOpts.CompressRequestBody = *eEe.Cfg().Opts.ElsCompressRequestBody
}
if eEe.Cfg().Opts.ElsRetryOnStatus != nil {
eEe.clnOpts.RetryOnStatus = *eEe.Cfg().Opts.ElsRetryOnStatus
}
if eEe.Cfg().Opts.ElsMaxRetries != nil {
eEe.clnOpts.MaxRetries = *eEe.Cfg().Opts.ElsMaxRetries
}
if eEe.Cfg().Opts.ElsDisableRetry != nil {
eEe.clnOpts.DisableRetry = *eEe.Cfg().Opts.ElsDisableRetry
}
if eEe.Cfg().Opts.ElsCompressRequestBodyLevel != nil {
eEe.clnOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.ElsCompressRequestBodyLevel
}
return
}

View File

@@ -295,7 +295,7 @@ func TestElasticExportEvent3(t *testing.T) {
eEe.eClnt.Transport = new(mockClient)
// errExpect := `unsupported protocol scheme ""`
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
if err := eEe.ExportEvent([]byte{}, ""); err != nil {
if err := eEe.ExportEvent([]byte{}, ""); err == nil {
t.Error(err)
}
}

View File

@@ -2629,6 +2629,11 @@ const (
ElsVersionType = "elsVersionType"
ElsWaitForActiveShards = "elsWaitForActiveShards"
//EES ElasticSearch Logger Options
ElsJson = "elsJson"
ElsColor = "elsColor"
ElsText = "elsText"
// nats
NatsSubject = "natsSubject"
NatsQueueID = "natsQueueID"