mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated go-elasticsearch client
This commit is contained in:
committed by
Dan Christian Bogos
parent
2e96f02be4
commit
fcc9fc109b
@@ -535,6 +535,21 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
|
||||
// Elasticsearch options
|
||||
// "elsCloud":true, //ExportPath will be an CLoud ID deployment
|
||||
// "elsApiKey": "", // Base64-encoded token for authorization; if set, overrides username/password and service token.
|
||||
// "elsUsername":"", // Username for HTTP Basic Authentication.
|
||||
// "elsPassword":"", // Password for HTTP Basic Authentication.
|
||||
// "elsServiceToken":"" // Service token for authorization; if set, overrides username/password.
|
||||
// "elsCertificateFingerPrint":"" // SHA256 hex fingerprint given by Elasticsearch on first launch.
|
||||
// "elsDiscoverNodesOnStart":false, // Discover nodes when initializing the client. Default: false.
|
||||
// "elsDiscoverNodesInterval":"10s", // Discover nodes periodically. Default: disabled.
|
||||
// "elsEnableDebugLogger":"false", // Enable the debug logging.
|
||||
// "elsLogger":"", //The logger type can either be elsJson,elsColor and elsText
|
||||
// "elsCompressRequestBody":false, //Enable compression on requests
|
||||
// "elsCompressRequestBodyLevel":0, //Default: gzip.DefaultCompression, 9:BestCompression,-2:HuffmanOnly,1:BestSpeed
|
||||
// "elsRetryOnStatus":[], // List of status codes for retry. Default: 502, 503, 504.
|
||||
// "elsMaxRetries": 0, // Default: 3.
|
||||
// "elsDisableRetry": false, // Default: false.
|
||||
// "elsIndex": "", // ElsIndex
|
||||
// "elsIfPrimaryTerm": 0, // ElsIfPrimaryTerm
|
||||
// "elsIfSeqNo": 0, // ElsIfSeqNo
|
||||
|
||||
292
config/eescfg.go
292
config/eescfg.go
@@ -146,59 +146,74 @@ func (eeS EEsCfg) AsMapInterface(separator string) any {
|
||||
}
|
||||
|
||||
type EventExporterOpts struct {
|
||||
CSVFieldSeparator *string
|
||||
ElsIndex *string
|
||||
ElsIfPrimaryTerm *int
|
||||
ElsIfSeqNo *int
|
||||
ElsOpType *string
|
||||
ElsPipeline *string
|
||||
ElsRouting *string
|
||||
ElsTimeout *time.Duration
|
||||
ElsVersion *int
|
||||
ElsVersionType *string
|
||||
ElsWaitForActiveShards *string
|
||||
SQLMaxIdleConns *int
|
||||
SQLMaxOpenConns *int
|
||||
SQLConnMaxLifetime *time.Duration
|
||||
MYSQLDSNParams map[string]string
|
||||
SQLTableName *string
|
||||
SQLDBName *string
|
||||
PgSSLMode *string
|
||||
KafkaTopic *string
|
||||
KafkaTLS *bool
|
||||
KafkaCAPath *string
|
||||
KafkaSkipTLSVerify *bool
|
||||
AMQPRoutingKey *string
|
||||
AMQPQueueID *string
|
||||
AMQPExchange *string
|
||||
AMQPExchangeType *string
|
||||
AMQPUsername *string
|
||||
AMQPPassword *string
|
||||
AWSRegion *string
|
||||
AWSKey *string
|
||||
AWSSecret *string
|
||||
AWSToken *string
|
||||
SQSQueueID *string
|
||||
S3BucketID *string
|
||||
S3FolderPath *string
|
||||
NATSJetStream *bool
|
||||
NATSSubject *string
|
||||
NATSJWTFile *string
|
||||
NATSSeedFile *string
|
||||
NATSCertificateAuthority *string
|
||||
NATSClientCertificate *string
|
||||
NATSClientKey *string
|
||||
NATSJetStreamMaxWait *time.Duration
|
||||
RPCCodec *string
|
||||
ServiceMethod *string
|
||||
KeyPath *string
|
||||
CertPath *string
|
||||
CAPath *string
|
||||
TLS *bool
|
||||
ConnIDs *[]string
|
||||
RPCConnTimeout *time.Duration
|
||||
RPCReplyTimeout *time.Duration
|
||||
RPCAPIOpts map[string]any
|
||||
CSVFieldSeparator *string
|
||||
ElsIndex *string
|
||||
ElsDiscoverNodesOnStart *bool
|
||||
ElsDiscoverNodeInterval *time.Duration
|
||||
ElsCloud *bool
|
||||
ElsAPIKey *string
|
||||
ElsCertificateFingerprint *string
|
||||
ElsServiceToken *string
|
||||
ElsUsername *string // Username for HTTP Basic Authentication.
|
||||
ElsPassword *string
|
||||
ElsEnableDebugLogger *bool
|
||||
ElsLogger *string
|
||||
ElsCompressRequestBody *bool
|
||||
ElsCompressRequestBodyLevel *int
|
||||
ElsRetryOnStatus *[]int
|
||||
ElsMaxRetries *int
|
||||
ElsDisableRetry *bool
|
||||
ElsIfPrimaryTerm *int
|
||||
ElsIfSeqNo *int
|
||||
ElsOpType *string
|
||||
ElsPipeline *string
|
||||
ElsRouting *string
|
||||
ElsTimeout *time.Duration
|
||||
ElsVersion *int
|
||||
ElsVersionType *string
|
||||
ElsWaitForActiveShards *string
|
||||
SQLMaxIdleConns *int
|
||||
SQLMaxOpenConns *int
|
||||
SQLConnMaxLifetime *time.Duration
|
||||
MYSQLDSNParams map[string]string
|
||||
SQLTableName *string
|
||||
SQLDBName *string
|
||||
PgSSLMode *string
|
||||
KafkaTopic *string
|
||||
KafkaTLS *bool
|
||||
KafkaCAPath *string
|
||||
KafkaSkipTLSVerify *bool
|
||||
AMQPRoutingKey *string
|
||||
AMQPQueueID *string
|
||||
AMQPExchange *string
|
||||
AMQPExchangeType *string
|
||||
AMQPUsername *string
|
||||
AMQPPassword *string
|
||||
AWSRegion *string
|
||||
AWSKey *string
|
||||
AWSSecret *string
|
||||
AWSToken *string
|
||||
SQSQueueID *string
|
||||
S3BucketID *string
|
||||
S3FolderPath *string
|
||||
NATSJetStream *bool
|
||||
NATSSubject *string
|
||||
NATSJWTFile *string
|
||||
NATSSeedFile *string
|
||||
NATSCertificateAuthority *string
|
||||
NATSClientCertificate *string
|
||||
NATSClientKey *string
|
||||
NATSJetStreamMaxWait *time.Duration
|
||||
RPCCodec *string
|
||||
ServiceMethod *string
|
||||
KeyPath *string
|
||||
CertPath *string
|
||||
CAPath *string
|
||||
TLS *bool
|
||||
ConnIDs *[]string
|
||||
RPCConnTimeout *time.Duration
|
||||
RPCReplyTimeout *time.Duration
|
||||
RPCAPIOpts map[string]any
|
||||
}
|
||||
|
||||
// EventExporterCfg the config for a Event Exporter
|
||||
@@ -247,6 +262,55 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
|
||||
if jsnCfg.CSVFieldSeparator != nil {
|
||||
eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator
|
||||
}
|
||||
if jsnCfg.ElsCloud != nil {
|
||||
eeOpts.ElsCloud = jsnCfg.ElsCloud
|
||||
}
|
||||
if jsnCfg.ElsAPIKey != nil {
|
||||
eeOpts.ElsAPIKey = jsnCfg.ElsAPIKey
|
||||
}
|
||||
if jsnCfg.ElsServiceToken != nil {
|
||||
eeOpts.ElsServiceToken = jsnCfg.ElsServiceToken
|
||||
}
|
||||
if jsnCfg.ElsCertificateFingerprint != nil {
|
||||
eeOpts.ElsCertificateFingerprint = jsnCfg.ElsCertificateFingerprint
|
||||
}
|
||||
if jsnCfg.ElsEnableDebugLogger != nil {
|
||||
eeOpts.ElsEnableDebugLogger = jsnCfg.ElsEnableDebugLogger
|
||||
}
|
||||
if jsnCfg.ElsLogger != nil {
|
||||
eeOpts.ElsLogger = jsnCfg.ElsLogger
|
||||
}
|
||||
if jsnCfg.ElsCompressRequestBody != nil {
|
||||
eeOpts.ElsCompressRequestBody = jsnCfg.ElsCompressRequestBody
|
||||
}
|
||||
if jsnCfg.ElsCompressRequestBodyLevel != nil {
|
||||
eeOpts.ElsCompressRequestBodyLevel = jsnCfg.ElsCompressRequestBodyLevel
|
||||
}
|
||||
if jsnCfg.ElsUsername != nil {
|
||||
eeOpts.ElsUsername = jsnCfg.ElsUsername
|
||||
}
|
||||
if jsnCfg.ElsPassword != nil {
|
||||
eeOpts.ElsPassword = jsnCfg.ElsPassword
|
||||
}
|
||||
if jsnCfg.ElsDiscoverNodesOnStart != nil {
|
||||
eeOpts.ElsDiscoverNodesOnStart = jsnCfg.ElsDiscoverNodesOnStart
|
||||
}
|
||||
if jsnCfg.ElsDiscoverNodesInterval != nil {
|
||||
var nodesInterval time.Duration
|
||||
if nodesInterval, err = utils.ParseDurationWithSecs(*jsnCfg.ElsDiscoverNodesInterval); err != nil {
|
||||
return
|
||||
}
|
||||
eeOpts.ElsDiscoverNodeInterval = utils.DurationPointer(nodesInterval)
|
||||
}
|
||||
if jsnCfg.ElsRetryOnStatus != nil {
|
||||
eeOpts.ElsRetryOnStatus = jsnCfg.ElsRetryOnStatus
|
||||
}
|
||||
if jsnCfg.ElsMaxRetries != nil {
|
||||
eeOpts.ElsMaxRetries = jsnCfg.ElsMaxRetries
|
||||
}
|
||||
if jsnCfg.ElsDisableRetry != nil {
|
||||
eeOpts.ElsDisableRetry = jsnCfg.ElsDisableRetry
|
||||
}
|
||||
if jsnCfg.ElsIndex != nil {
|
||||
eeOpts.ElsIndex = jsnCfg.ElsIndex
|
||||
}
|
||||
@@ -826,6 +890,7 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any {
|
||||
if optsEes.CSVFieldSeparator != nil {
|
||||
opts[utils.CSVFieldSepOpt] = *optsEes.CSVFieldSeparator
|
||||
}
|
||||
|
||||
if optsEes.ElsIndex != nil {
|
||||
opts[utils.ElsIndex] = *optsEes.ElsIndex
|
||||
}
|
||||
@@ -986,59 +1051,74 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any {
|
||||
}
|
||||
|
||||
type EventExporterOptsJson struct {
|
||||
CSVFieldSeparator *string `json:"csvFieldSeparator"`
|
||||
ElsIndex *string `json:"elsIndex"`
|
||||
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
|
||||
ElsIfSeqNo *int `json:"elsIfSeqNo"`
|
||||
ElsOpType *string `json:"elsOpType"`
|
||||
ElsPipeline *string `json:"elsPipeline"`
|
||||
ElsRouting *string `json:"elsRouting"`
|
||||
ElsTimeout *string `json:"elsTimeout"`
|
||||
ElsVersion *int `json:"elsVersion"`
|
||||
ElsVersionType *string `json:"elsVersionType"`
|
||||
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
|
||||
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
|
||||
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
|
||||
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
|
||||
MYSQLDSNParams map[string]string `json:"mysqlDSNParams"`
|
||||
SQLTableName *string `json:"sqlTableName"`
|
||||
SQLDBName *string `json:"sqlDBName"`
|
||||
PgSSLMode *string `json:"pgSSLMode"`
|
||||
KafkaTopic *string `json:"kafkaTopic"`
|
||||
KafkaTLS *bool `json:"kafkaTLS"`
|
||||
KafkaCAPath *string `json:"kafkaCAPath"`
|
||||
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
|
||||
AMQPQueueID *string `json:"amqpQueueID"`
|
||||
AMQPRoutingKey *string `json:"amqpRoutingKey"`
|
||||
AMQPExchange *string `json:"amqpExchange"`
|
||||
AMQPExchangeType *string `json:"amqpExchangeType"`
|
||||
AMQPUsername *string `json:"amqpUsername"`
|
||||
AMQPPassword *string `json:"amqpPassword"`
|
||||
AWSRegion *string `json:"awsRegion"`
|
||||
AWSKey *string `json:"awsKey"`
|
||||
AWSSecret *string `json:"awsSecret"`
|
||||
AWSToken *string `json:"awsToken"`
|
||||
SQSQueueID *string `json:"sqsQueueID"`
|
||||
S3BucketID *string `json:"s3BucketID"`
|
||||
S3FolderPath *string `json:"s3FolderPath"`
|
||||
NATSJetStream *bool `json:"natsJetStream"`
|
||||
NATSSubject *string `json:"natsSubject"`
|
||||
NATSJWTFile *string `json:"natsJWTFile"`
|
||||
NATSSeedFile *string `json:"natsSeedFile"`
|
||||
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
|
||||
NATSClientCertificate *string `json:"natsClientCertificate"`
|
||||
NATSClientKey *string `json:"natsClientKey"`
|
||||
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
|
||||
RPCCodec *string `json:"rpcCodec"`
|
||||
ServiceMethod *string `json:"serviceMethod"`
|
||||
KeyPath *string `json:"keyPath"`
|
||||
CertPath *string `json:"certPath"`
|
||||
CAPath *string `json:"caPath"`
|
||||
ConnIDs *[]string `json:"connIDs"`
|
||||
TLS *bool `json:"tls"`
|
||||
RPCConnTimeout *string `json:"rpcConnTimeout"`
|
||||
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
|
||||
RPCAPIOpts map[string]any `json:"rpcAPIOpts"`
|
||||
CSVFieldSeparator *string `json:"csvFieldSeparator"`
|
||||
ElsCloud *bool `json:"elsCloud"`
|
||||
ElsAPIKey *string `json:"elsApiKey"`
|
||||
ElsServiceToken *string `json:"elsServiceToken"`
|
||||
ElsCertificateFingerprint *string `json:"elsCertificateFingerPrint"`
|
||||
ElsUsername *string `json:"elsUsername"`
|
||||
ElsPassword *string `json:"elsPassword"`
|
||||
ElsDiscoverNodesOnStart *bool `json:"elsDiscoverNodesOnStart"`
|
||||
ElsDiscoverNodesInterval *string `json:"elsDiscoverNodesInterval"`
|
||||
ElsEnableDebugLogger *bool `json:"elsEnableDebugLogger"`
|
||||
ElsLogger *string `json:"elsLogger"`
|
||||
ElsCompressRequestBody *bool `json:"elsCompressRequestBody"`
|
||||
ElsCompressRequestBodyLevel *int `json:"elsCompressRequestBodyLevel"`
|
||||
ElsRetryOnStatus *[]int `json:"elsRetryOnStatus"`
|
||||
ElsMaxRetries *int `json:"elsMaxRetries"`
|
||||
ElsDisableRetry *bool `json:"elsDisableRetry"`
|
||||
ElsIndex *string `json:"elsIndex"`
|
||||
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
|
||||
ElsIfSeqNo *int `json:"elsIfSeqNo"`
|
||||
ElsOpType *string `json:"elsOpType"`
|
||||
ElsPipeline *string `json:"elsPipeline"`
|
||||
ElsRouting *string `json:"elsRouting"`
|
||||
ElsTimeout *string `json:"elsTimeout"`
|
||||
ElsVersion *int `json:"elsVersion"`
|
||||
ElsVersionType *string `json:"elsVersionType"`
|
||||
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
|
||||
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
|
||||
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
|
||||
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
|
||||
MYSQLDSNParams map[string]string `json:"mysqlDSNParams"`
|
||||
SQLTableName *string `json:"sqlTableName"`
|
||||
SQLDBName *string `json:"sqlDBName"`
|
||||
PgSSLMode *string `json:"pgSSLMode"`
|
||||
KafkaTopic *string `json:"kafkaTopic"`
|
||||
KafkaTLS *bool `json:"kafkaTLS"`
|
||||
KafkaCAPath *string `json:"kafkaCAPath"`
|
||||
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
|
||||
AMQPQueueID *string `json:"amqpQueueID"`
|
||||
AMQPRoutingKey *string `json:"amqpRoutingKey"`
|
||||
AMQPExchange *string `json:"amqpExchange"`
|
||||
AMQPExchangeType *string `json:"amqpExchangeType"`
|
||||
AMQPUsername *string `json:"amqpUsername"`
|
||||
AMQPPassword *string `json:"amqpPassword"`
|
||||
AWSRegion *string `json:"awsRegion"`
|
||||
AWSKey *string `json:"awsKey"`
|
||||
AWSSecret *string `json:"awsSecret"`
|
||||
AWSToken *string `json:"awsToken"`
|
||||
SQSQueueID *string `json:"sqsQueueID"`
|
||||
S3BucketID *string `json:"s3BucketID"`
|
||||
S3FolderPath *string `json:"s3FolderPath"`
|
||||
NATSJetStream *bool `json:"natsJetStream"`
|
||||
NATSSubject *string `json:"natsSubject"`
|
||||
NATSJWTFile *string `json:"natsJWTFile"`
|
||||
NATSSeedFile *string `json:"natsSeedFile"`
|
||||
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
|
||||
NATSClientCertificate *string `json:"natsClientCertificate"`
|
||||
NATSClientKey *string `json:"natsClientKey"`
|
||||
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
|
||||
RPCCodec *string `json:"rpcCodec"`
|
||||
ServiceMethod *string `json:"serviceMethod"`
|
||||
KeyPath *string `json:"keyPath"`
|
||||
CertPath *string `json:"certPath"`
|
||||
CAPath *string `json:"caPath"`
|
||||
ConnIDs *[]string `json:"connIDs"`
|
||||
TLS *bool `json:"tls"`
|
||||
RPCConnTimeout *string `json:"rpcConnTimeout"`
|
||||
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
|
||||
RPCAPIOpts map[string]any `json:"rpcAPIOpts"`
|
||||
}
|
||||
|
||||
// EventExporterJsonCfg is the configuration of a single EventExporter
|
||||
|
||||
@@ -388,6 +388,151 @@
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "ElasticsearchExporterCluster",
|
||||
"type": "*els",
|
||||
"export_path": "http://192.168.56.22:9200;http://192.168.56.64:9200",
|
||||
"attempts": 1,
|
||||
"opts": {
|
||||
"elsIndex": "cdrs",
|
||||
"elsDiscoverNodesOnStart":true,
|
||||
//"elsDiscoverNodesInterval":"10s",
|
||||
// "elsLogger":"elsJson",
|
||||
// "elsEnableDebugLogger":false,
|
||||
// "elsCompressRequestBody":false,
|
||||
// "elsCompressRequestBodyLevel":0,
|
||||
// "elsRetryOnStatus":[],
|
||||
// "elsMaxRetries": 0,
|
||||
// "elsDisableRetry": false,
|
||||
|
||||
//"elsIfPrimaryTerm": 0,
|
||||
//"elsIfSeqNo": 0,
|
||||
"elsOpType": "",
|
||||
"elsPipeline": "",
|
||||
"require_alias": "",
|
||||
"elsRouting": "",
|
||||
"elsTimeout": "0",
|
||||
//"elsVersion": 0,
|
||||
"elsVersionType": "",
|
||||
"elsWaitForActiveShards": "",
|
||||
},
|
||||
"fields":[
|
||||
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
|
||||
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
|
||||
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
|
||||
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
|
||||
{"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
|
||||
{"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"},
|
||||
{"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
|
||||
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
|
||||
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
|
||||
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
|
||||
{"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"},
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"},
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "ElasticsearchExporterCloud",
|
||||
"type": "*els",
|
||||
"export_path": "Deployment:dXMtY2VudHJh0YW==",
|
||||
"attempts": 1,
|
||||
"opts": {
|
||||
"elsIndex": "cdrs",
|
||||
"elsUsername":"elastic_user",
|
||||
"elsPassword":"",
|
||||
"elsCloud":true,
|
||||
//"elsApiKey":"aZmd2UQ==",
|
||||
// "elsLogger":"elsJson",
|
||||
// "elsEnableDebugLogger":false,
|
||||
// "elsCompressRequestBody":false,
|
||||
// "elsCompressRequestBodyLevel":0,
|
||||
// "elsRetryOnStatus":[],
|
||||
// "elsMaxRetries": 0,
|
||||
// "elsDisableRetry": false,
|
||||
|
||||
//"elsIfPrimaryTerm": 0,
|
||||
//"elsIfSeqNo": 0,
|
||||
"elsOpType": "",
|
||||
"elsPipeline": "",
|
||||
"require_alias": "",
|
||||
"elsRouting": "",
|
||||
"elsTimeout": "0",
|
||||
//"elsVersion": 0,
|
||||
"elsVersionType": "",
|
||||
"elsWaitForActiveShards": "",
|
||||
},
|
||||
"fields":[
|
||||
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
|
||||
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
|
||||
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
|
||||
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
|
||||
{"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
|
||||
{"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"},
|
||||
{"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
|
||||
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
|
||||
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
|
||||
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
|
||||
{"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"},
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"},
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "ElasticsearchExporterHttps",
|
||||
"type": "*els",
|
||||
"export_path": "https://192.168.56.29:9200",
|
||||
"attempts": 1,
|
||||
"opts": {
|
||||
"elsIndex": "cdrs",
|
||||
"elsUsername":"elastic",
|
||||
"elsPassword":"",
|
||||
"caPath":"/path/to/http_ca.crt",
|
||||
//"elsCertificateFingerPrint":"",
|
||||
// "elsServiceToken":""
|
||||
// "elsLogger":"elsJson",
|
||||
// "elsDiscoverNodesOnStart":true,
|
||||
// "elsDiscoverNodesInterval":"10s",
|
||||
// "elsEnableDebugLogger":false,
|
||||
// "elsCompressRequestBody":true,
|
||||
// "elsCompressRequestBodyLevel":0,
|
||||
// "elsRetryOnStatus":[],
|
||||
// "elsMaxRetries": 0,
|
||||
// "elsDisableRetry": false,
|
||||
|
||||
//"elsIfPrimaryTerm": 0,
|
||||
//"elsIfSeqNo": 0,
|
||||
"elsOpType": "",
|
||||
"elsPipeline": "",
|
||||
"require_alias": "",
|
||||
"elsRouting": "",
|
||||
"elsTimeout": "0",
|
||||
//"elsVersion": 0,
|
||||
"elsVersionType": "",
|
||||
"elsWaitForActiveShards": "",
|
||||
},
|
||||
"fields":[
|
||||
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
|
||||
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
|
||||
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
|
||||
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
|
||||
{"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
|
||||
{"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"},
|
||||
{"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
|
||||
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
|
||||
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
|
||||
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
|
||||
{"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"},
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"},
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "HTTPJsonMapExporterWithNoFields",
|
||||
"type": "*httpJSONMap",
|
||||
|
||||
@@ -268,6 +268,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr *
|
||||
utils.MetaDC: exp.GetMetrics(),
|
||||
utils.MetaOpts: utils.MapStorage(ev.APIOpts),
|
||||
utils.MetaCfg: cfg.GetDataProvider(),
|
||||
utils.MetaVars: utils.MapStorage{utils.MetaTenant: ev.Tenant},
|
||||
}, utils.FirstNonEmpty(ev.Tenant, cfg.GeneralCfg().DefaultTenant),
|
||||
filterS,
|
||||
map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(ctx, exp.Cfg().ContentFields())
|
||||
|
||||
@@ -22,16 +22,18 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/esapi"
|
||||
"github.com/elastic/elastic-transport-go/v8/elastictransport"
|
||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
elasticsearch "github.com/elastic/go-elasticsearch"
|
||||
elasticsearch "github.com/elastic/go-elasticsearch/v8"
|
||||
)
|
||||
|
||||
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *ElasticEE, err error) {
|
||||
@@ -46,11 +48,12 @@ func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *
|
||||
|
||||
// ElasticEE implements EventExporter interface for ElasticSearch export
|
||||
type ElasticEE struct {
|
||||
cfg *config.EventExporterCfg
|
||||
eClnt *elasticsearch.Client
|
||||
dc *utils.SafeMapStorage
|
||||
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
|
||||
reqs *concReq
|
||||
cfg *config.EventExporterCfg
|
||||
eClnt *elasticsearch.Client
|
||||
dc *utils.SafeMapStorage
|
||||
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
|
||||
clntOpts elasticsearch.Config
|
||||
reqs *concReq
|
||||
sync.RWMutex
|
||||
bytePreparing
|
||||
}
|
||||
@@ -83,6 +86,74 @@ func (eEe *ElasticEE) prepareOpts() (err error) {
|
||||
if eEe.Cfg().Opts.ElsWaitForActiveShards != nil {
|
||||
eEe.opts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards
|
||||
}
|
||||
|
||||
//client opts
|
||||
if eEe.Cfg().Opts.ElsCloud != nil && *eEe.Cfg().Opts.ElsCloud {
|
||||
eEe.clntOpts.CloudID = eEe.Cfg().ExportPath
|
||||
} else {
|
||||
eEe.clntOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsUsername != nil {
|
||||
eEe.clntOpts.Username = *eEe.Cfg().Opts.ElsUsername
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsPassword != nil {
|
||||
eEe.clntOpts.Password = *eEe.Cfg().Opts.ElsPassword
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsAPIKey != nil {
|
||||
eEe.clntOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey
|
||||
}
|
||||
if eEe.Cfg().Opts.CAPath != nil {
|
||||
var cacert []byte
|
||||
cacert, err = os.ReadFile(*eEe.Cfg().Opts.CAPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
eEe.clntOpts.CACert = cacert
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsCertificateFingerprint != nil {
|
||||
eEe.clntOpts.CertificateFingerprint = *eEe.Cfg().Opts.ElsCertificateFingerprint
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsServiceToken != nil {
|
||||
eEe.clntOpts.ServiceToken = *eEe.Cfg().Opts.ElsServiceToken
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsDiscoverNodesOnStart != nil {
|
||||
eEe.clntOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.ElsDiscoverNodesOnStart
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsDiscoverNodeInterval != nil {
|
||||
eEe.clntOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.ElsDiscoverNodeInterval
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsEnableDebugLogger != nil {
|
||||
eEe.clntOpts.EnableDebugLogger = *eEe.Cfg().Opts.ElsEnableDebugLogger
|
||||
}
|
||||
if loggerType := eEe.Cfg().Opts.ElsLogger; loggerType != nil {
|
||||
var logger elastictransport.Logger
|
||||
switch *loggerType {
|
||||
case utils.ElsJson:
|
||||
logger = &elastictransport.JSONLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
|
||||
case utils.ElsColor:
|
||||
logger = &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
|
||||
case utils.ElsText:
|
||||
logger = &elastictransport.TextLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
|
||||
default:
|
||||
return
|
||||
}
|
||||
eEe.clntOpts.Logger = logger
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsCompressRequestBody != nil {
|
||||
eEe.clntOpts.CompressRequestBody = *eEe.Cfg().Opts.ElsCompressRequestBody
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsRetryOnStatus != nil {
|
||||
eEe.clntOpts.RetryOnStatus = *eEe.Cfg().Opts.ElsRetryOnStatus
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsMaxRetries != nil {
|
||||
eEe.clntOpts.MaxRetries = *eEe.Cfg().Opts.ElsMaxRetries
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsDisableRetry != nil {
|
||||
eEe.clntOpts.DisableRetry = *eEe.Cfg().Opts.ElsDisableRetry
|
||||
}
|
||||
if eEe.Cfg().Opts.ElsCompressRequestBodyLevel != nil {
|
||||
eEe.clntOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.ElsCompressRequestBodyLevel
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -91,11 +162,10 @@ func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg }
|
||||
func (eEe *ElasticEE) Connect() (err error) {
|
||||
eEe.Lock()
|
||||
// create the client
|
||||
if eEe.eClnt == nil {
|
||||
eEe.eClnt, err = elasticsearch.NewClient(
|
||||
elasticsearch.Config{Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)},
|
||||
)
|
||||
if eEe.eClnt != nil {
|
||||
return
|
||||
}
|
||||
eEe.eClnt, err = elasticsearch.NewClient(eEe.clntOpts)
|
||||
eEe.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -120,7 +190,6 @@ func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) (err
|
||||
IfPrimaryTerm: eEe.opts.IfPrimaryTerm,
|
||||
IfSeqNo: eEe.opts.IfSeqNo,
|
||||
OpType: eEe.opts.OpType,
|
||||
Parent: eEe.opts.Parent,
|
||||
Pipeline: eEe.opts.Pipeline,
|
||||
Routing: eEe.opts.Routing,
|
||||
Timeout: eEe.opts.Timeout,
|
||||
|
||||
@@ -34,7 +34,7 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
elasticsearch "github.com/elastic/go-elasticsearch"
|
||||
elasticsearch "github.com/elastic/go-elasticsearch/v8"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
|
||||
3
go.mod
3
go.mod
@@ -25,7 +25,7 @@ require (
|
||||
github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60
|
||||
github.com/creack/pty v1.1.18
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/elastic/go-elasticsearch v0.0.0
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.0
|
||||
github.com/ericlagergren/decimal v0.0.0-20211103172832-aca2edc11f73
|
||||
github.com/fiorix/go-diameter/v4 v4.0.4
|
||||
github.com/fsnotify/fsnotify v1.5.4
|
||||
@@ -114,6 +114,7 @@ require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bits-and-blooms/bitset v1.2.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
|
||||
8
go.sum
8
go.sum
@@ -183,8 +183,16 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
|
||||
github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA=
|
||||
github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.0 h1:yNBPlXNo6wstMG7I3KiZPbLFgA82RMryYqkh1xBMV3A=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.1 h1:/OiP5Yex40q5eWpzFVQIS8jRE7SaEZrFkG9JbE6TXtY=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
|
||||
@@ -2496,6 +2496,10 @@ const (
|
||||
ElsVersionLow = "elsVersion"
|
||||
ElsVersionType = "elsVersionType"
|
||||
ElsWaitForActiveShards = "elsWaitForActiveShards"
|
||||
//EES ElasticSearch Logger Options
|
||||
ElsJson = "elsJson"
|
||||
ElsColor = "elsColor"
|
||||
ElsText = "elsText"
|
||||
// nats
|
||||
NatsSubject = "natsSubject"
|
||||
NatsQueueID = "natsQueueID"
|
||||
|
||||
Reference in New Issue
Block a user