diff --git a/config/config_defaults.go b/config/config_defaults.go index 8f72c7ba4..84e6f8cb8 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -535,6 +535,21 @@ const CGRATES_CFG_JSON = ` // Elasticsearch options + // "elsCloud":true, //ExportPath will be an CLoud ID deployment + // "elsApiKey": "", // Base64-encoded token for authorization; if set, overrides username/password and service token. + // "elsUsername":"", // Username for HTTP Basic Authentication. + // "elsPassword":"", // Password for HTTP Basic Authentication. + // "elsServiceToken":"" // Service token for authorization; if set, overrides username/password. + // "elsCertificateFingerPrint":"" // SHA256 hex fingerprint given by Elasticsearch on first launch. + // "elsDiscoverNodesOnStart":false, // Discover nodes when initializing the client. Default: false. + // "elsDiscoverNodesInterval":"10s", // Discover nodes periodically. Default: disabled. + // "elsEnableDebugLogger":"false", // Enable the debug logging. + // "elsLogger":"", //The logger type can either be elsJson,elsColor and elsText + // "elsCompressRequestBody":false, //Enable compression on requests + // "elsCompressRequestBodyLevel":0, //Default: gzip.DefaultCompression, 9:BestCompression,-2:HuffmanOnly,1:BestSpeed + // "elsRetryOnStatus":[], // List of status codes for retry. Default: 502, 503, 504. + // "elsMaxRetries": 0, // Default: 3. + // "elsDisableRetry": false, // Default: false. // "elsIndex": "", // ElsIndex // "elsIfPrimaryTerm": 0, // ElsIfPrimaryTerm // "elsIfSeqNo": 0, // ElsIfSeqNo diff --git a/config/eescfg.go b/config/eescfg.go index 9ef8fc0ea..507991804 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -146,59 +146,74 @@ func (eeS EEsCfg) AsMapInterface(separator string) any { } type EventExporterOpts struct { - CSVFieldSeparator *string - ElsIndex *string - ElsIfPrimaryTerm *int - ElsIfSeqNo *int - ElsOpType *string - ElsPipeline *string - ElsRouting *string - ElsTimeout *time.Duration - ElsVersion *int - ElsVersionType *string - ElsWaitForActiveShards *string - SQLMaxIdleConns *int - SQLMaxOpenConns *int - SQLConnMaxLifetime *time.Duration - MYSQLDSNParams map[string]string - SQLTableName *string - SQLDBName *string - PgSSLMode *string - KafkaTopic *string - KafkaTLS *bool - KafkaCAPath *string - KafkaSkipTLSVerify *bool - AMQPRoutingKey *string - AMQPQueueID *string - AMQPExchange *string - AMQPExchangeType *string - AMQPUsername *string - AMQPPassword *string - AWSRegion *string - AWSKey *string - AWSSecret *string - AWSToken *string - SQSQueueID *string - S3BucketID *string - S3FolderPath *string - NATSJetStream *bool - NATSSubject *string - NATSJWTFile *string - NATSSeedFile *string - NATSCertificateAuthority *string - NATSClientCertificate *string - NATSClientKey *string - NATSJetStreamMaxWait *time.Duration - RPCCodec *string - ServiceMethod *string - KeyPath *string - CertPath *string - CAPath *string - TLS *bool - ConnIDs *[]string - RPCConnTimeout *time.Duration - RPCReplyTimeout *time.Duration - RPCAPIOpts map[string]any + CSVFieldSeparator *string + ElsIndex *string + ElsDiscoverNodesOnStart *bool + ElsDiscoverNodeInterval *time.Duration + ElsCloud *bool + ElsAPIKey *string + ElsCertificateFingerprint *string + ElsServiceToken *string + ElsUsername *string // Username for HTTP Basic Authentication. + ElsPassword *string + ElsEnableDebugLogger *bool + ElsLogger *string + ElsCompressRequestBody *bool + ElsCompressRequestBodyLevel *int + ElsRetryOnStatus *[]int + ElsMaxRetries *int + ElsDisableRetry *bool + ElsIfPrimaryTerm *int + ElsIfSeqNo *int + ElsOpType *string + ElsPipeline *string + ElsRouting *string + ElsTimeout *time.Duration + ElsVersion *int + ElsVersionType *string + ElsWaitForActiveShards *string + SQLMaxIdleConns *int + SQLMaxOpenConns *int + SQLConnMaxLifetime *time.Duration + MYSQLDSNParams map[string]string + SQLTableName *string + SQLDBName *string + PgSSLMode *string + KafkaTopic *string + KafkaTLS *bool + KafkaCAPath *string + KafkaSkipTLSVerify *bool + AMQPRoutingKey *string + AMQPQueueID *string + AMQPExchange *string + AMQPExchangeType *string + AMQPUsername *string + AMQPPassword *string + AWSRegion *string + AWSKey *string + AWSSecret *string + AWSToken *string + SQSQueueID *string + S3BucketID *string + S3FolderPath *string + NATSJetStream *bool + NATSSubject *string + NATSJWTFile *string + NATSSeedFile *string + NATSCertificateAuthority *string + NATSClientCertificate *string + NATSClientKey *string + NATSJetStreamMaxWait *time.Duration + RPCCodec *string + ServiceMethod *string + KeyPath *string + CertPath *string + CAPath *string + TLS *bool + ConnIDs *[]string + RPCConnTimeout *time.Duration + RPCReplyTimeout *time.Duration + RPCAPIOpts map[string]any } // EventExporterCfg the config for a Event Exporter @@ -247,6 +262,55 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.CSVFieldSeparator != nil { eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator } + if jsnCfg.ElsCloud != nil { + eeOpts.ElsCloud = jsnCfg.ElsCloud + } + if jsnCfg.ElsAPIKey != nil { + eeOpts.ElsAPIKey = jsnCfg.ElsAPIKey + } + if jsnCfg.ElsServiceToken != nil { + eeOpts.ElsServiceToken = jsnCfg.ElsServiceToken + } + if jsnCfg.ElsCertificateFingerprint != nil { + eeOpts.ElsCertificateFingerprint = jsnCfg.ElsCertificateFingerprint + } + if jsnCfg.ElsEnableDebugLogger != nil { + eeOpts.ElsEnableDebugLogger = jsnCfg.ElsEnableDebugLogger + } + if jsnCfg.ElsLogger != nil { + eeOpts.ElsLogger = jsnCfg.ElsLogger + } + if jsnCfg.ElsCompressRequestBody != nil { + eeOpts.ElsCompressRequestBody = jsnCfg.ElsCompressRequestBody + } + if jsnCfg.ElsCompressRequestBodyLevel != nil { + eeOpts.ElsCompressRequestBodyLevel = jsnCfg.ElsCompressRequestBodyLevel + } + if jsnCfg.ElsUsername != nil { + eeOpts.ElsUsername = jsnCfg.ElsUsername + } + if jsnCfg.ElsPassword != nil { + eeOpts.ElsPassword = jsnCfg.ElsPassword + } + if jsnCfg.ElsDiscoverNodesOnStart != nil { + eeOpts.ElsDiscoverNodesOnStart = jsnCfg.ElsDiscoverNodesOnStart + } + if jsnCfg.ElsDiscoverNodesInterval != nil { + var nodesInterval time.Duration + if nodesInterval, err = utils.ParseDurationWithSecs(*jsnCfg.ElsDiscoverNodesInterval); err != nil { + return + } + eeOpts.ElsDiscoverNodeInterval = utils.DurationPointer(nodesInterval) + } + if jsnCfg.ElsRetryOnStatus != nil { + eeOpts.ElsRetryOnStatus = jsnCfg.ElsRetryOnStatus + } + if jsnCfg.ElsMaxRetries != nil { + eeOpts.ElsMaxRetries = jsnCfg.ElsMaxRetries + } + if jsnCfg.ElsDisableRetry != nil { + eeOpts.ElsDisableRetry = jsnCfg.ElsDisableRetry + } if jsnCfg.ElsIndex != nil { eeOpts.ElsIndex = jsnCfg.ElsIndex } @@ -826,6 +890,7 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any { if optsEes.CSVFieldSeparator != nil { opts[utils.CSVFieldSepOpt] = *optsEes.CSVFieldSeparator } + if optsEes.ElsIndex != nil { opts[utils.ElsIndex] = *optsEes.ElsIndex } @@ -986,59 +1051,74 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any { } type EventExporterOptsJson struct { - CSVFieldSeparator *string `json:"csvFieldSeparator"` - ElsIndex *string `json:"elsIndex"` - ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"` - ElsIfSeqNo *int `json:"elsIfSeqNo"` - ElsOpType *string `json:"elsOpType"` - ElsPipeline *string `json:"elsPipeline"` - ElsRouting *string `json:"elsRouting"` - ElsTimeout *string `json:"elsTimeout"` - ElsVersion *int `json:"elsVersion"` - ElsVersionType *string `json:"elsVersionType"` - ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"` - SQLMaxIdleConns *int `json:"sqlMaxIdleConns"` - SQLMaxOpenConns *int `json:"sqlMaxOpenConns"` - SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"` - MYSQLDSNParams map[string]string `json:"mysqlDSNParams"` - SQLTableName *string `json:"sqlTableName"` - SQLDBName *string `json:"sqlDBName"` - PgSSLMode *string `json:"pgSSLMode"` - KafkaTopic *string `json:"kafkaTopic"` - KafkaTLS *bool `json:"kafkaTLS"` - KafkaCAPath *string `json:"kafkaCAPath"` - KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` - AMQPQueueID *string `json:"amqpQueueID"` - AMQPRoutingKey *string `json:"amqpRoutingKey"` - AMQPExchange *string `json:"amqpExchange"` - AMQPExchangeType *string `json:"amqpExchangeType"` - AMQPUsername *string `json:"amqpUsername"` - AMQPPassword *string `json:"amqpPassword"` - AWSRegion *string `json:"awsRegion"` - AWSKey *string `json:"awsKey"` - AWSSecret *string `json:"awsSecret"` - AWSToken *string `json:"awsToken"` - SQSQueueID *string `json:"sqsQueueID"` - S3BucketID *string `json:"s3BucketID"` - S3FolderPath *string `json:"s3FolderPath"` - NATSJetStream *bool `json:"natsJetStream"` - NATSSubject *string `json:"natsSubject"` - NATSJWTFile *string `json:"natsJWTFile"` - NATSSeedFile *string `json:"natsSeedFile"` - NATSCertificateAuthority *string `json:"natsCertificateAuthority"` - NATSClientCertificate *string `json:"natsClientCertificate"` - NATSClientKey *string `json:"natsClientKey"` - NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` - RPCCodec *string `json:"rpcCodec"` - ServiceMethod *string `json:"serviceMethod"` - KeyPath *string `json:"keyPath"` - CertPath *string `json:"certPath"` - CAPath *string `json:"caPath"` - ConnIDs *[]string `json:"connIDs"` - TLS *bool `json:"tls"` - RPCConnTimeout *string `json:"rpcConnTimeout"` - RPCReplyTimeout *string `json:"rpcReplyTimeout"` - RPCAPIOpts map[string]any `json:"rpcAPIOpts"` + CSVFieldSeparator *string `json:"csvFieldSeparator"` + ElsCloud *bool `json:"elsCloud"` + ElsAPIKey *string `json:"elsApiKey"` + ElsServiceToken *string `json:"elsServiceToken"` + ElsCertificateFingerprint *string `json:"elsCertificateFingerPrint"` + ElsUsername *string `json:"elsUsername"` + ElsPassword *string `json:"elsPassword"` + ElsDiscoverNodesOnStart *bool `json:"elsDiscoverNodesOnStart"` + ElsDiscoverNodesInterval *string `json:"elsDiscoverNodesInterval"` + ElsEnableDebugLogger *bool `json:"elsEnableDebugLogger"` + ElsLogger *string `json:"elsLogger"` + ElsCompressRequestBody *bool `json:"elsCompressRequestBody"` + ElsCompressRequestBodyLevel *int `json:"elsCompressRequestBodyLevel"` + ElsRetryOnStatus *[]int `json:"elsRetryOnStatus"` + ElsMaxRetries *int `json:"elsMaxRetries"` + ElsDisableRetry *bool `json:"elsDisableRetry"` + ElsIndex *string `json:"elsIndex"` + ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"` + ElsIfSeqNo *int `json:"elsIfSeqNo"` + ElsOpType *string `json:"elsOpType"` + ElsPipeline *string `json:"elsPipeline"` + ElsRouting *string `json:"elsRouting"` + ElsTimeout *string `json:"elsTimeout"` + ElsVersion *int `json:"elsVersion"` + ElsVersionType *string `json:"elsVersionType"` + ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"` + SQLMaxIdleConns *int `json:"sqlMaxIdleConns"` + SQLMaxOpenConns *int `json:"sqlMaxOpenConns"` + SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"` + MYSQLDSNParams map[string]string `json:"mysqlDSNParams"` + SQLTableName *string `json:"sqlTableName"` + SQLDBName *string `json:"sqlDBName"` + PgSSLMode *string `json:"pgSSLMode"` + KafkaTopic *string `json:"kafkaTopic"` + KafkaTLS *bool `json:"kafkaTLS"` + KafkaCAPath *string `json:"kafkaCAPath"` + KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` + AMQPQueueID *string `json:"amqpQueueID"` + AMQPRoutingKey *string `json:"amqpRoutingKey"` + AMQPExchange *string `json:"amqpExchange"` + AMQPExchangeType *string `json:"amqpExchangeType"` + AMQPUsername *string `json:"amqpUsername"` + AMQPPassword *string `json:"amqpPassword"` + AWSRegion *string `json:"awsRegion"` + AWSKey *string `json:"awsKey"` + AWSSecret *string `json:"awsSecret"` + AWSToken *string `json:"awsToken"` + SQSQueueID *string `json:"sqsQueueID"` + S3BucketID *string `json:"s3BucketID"` + S3FolderPath *string `json:"s3FolderPath"` + NATSJetStream *bool `json:"natsJetStream"` + NATSSubject *string `json:"natsSubject"` + NATSJWTFile *string `json:"natsJWTFile"` + NATSSeedFile *string `json:"natsSeedFile"` + NATSCertificateAuthority *string `json:"natsCertificateAuthority"` + NATSClientCertificate *string `json:"natsClientCertificate"` + NATSClientKey *string `json:"natsClientKey"` + NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` + RPCCodec *string `json:"rpcCodec"` + ServiceMethod *string `json:"serviceMethod"` + KeyPath *string `json:"keyPath"` + CertPath *string `json:"certPath"` + CAPath *string `json:"caPath"` + ConnIDs *[]string `json:"connIDs"` + TLS *bool `json:"tls"` + RPCConnTimeout *string `json:"rpcConnTimeout"` + RPCReplyTimeout *string `json:"rpcReplyTimeout"` + RPCAPIOpts map[string]any `json:"rpcAPIOpts"` } // EventExporterJsonCfg is the configuration of a single EventExporter diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 756a0a56d..fc02ae173 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -388,6 +388,151 @@ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, ], }, + { + "id": "ElasticsearchExporterCluster", + "type": "*els", + "export_path": "http://192.168.56.22:9200;http://192.168.56.64:9200", + "attempts": 1, + "opts": { + "elsIndex": "cdrs", + "elsDiscoverNodesOnStart":true, + //"elsDiscoverNodesInterval":"10s", + // "elsLogger":"elsJson", + // "elsEnableDebugLogger":false, + // "elsCompressRequestBody":false, + // "elsCompressRequestBodyLevel":0, + // "elsRetryOnStatus":[], + // "elsMaxRetries": 0, + // "elsDisableRetry": false, + + //"elsIfPrimaryTerm": 0, + //"elsIfSeqNo": 0, + "elsOpType": "", + "elsPipeline": "", + "require_alias": "", + "elsRouting": "", + "elsTimeout": "0", + //"elsVersion": 0, + "elsVersionType": "", + "elsWaitForActiveShards": "", + }, + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + }, + { + "id": "ElasticsearchExporterCloud", + "type": "*els", + "export_path": "Deployment:dXMtY2VudHJh0YW==", + "attempts": 1, + "opts": { + "elsIndex": "cdrs", + "elsUsername":"elastic_user", + "elsPassword":"", + "elsCloud":true, + //"elsApiKey":"aZmd2UQ==", + // "elsLogger":"elsJson", + // "elsEnableDebugLogger":false, + // "elsCompressRequestBody":false, + // "elsCompressRequestBodyLevel":0, + // "elsRetryOnStatus":[], + // "elsMaxRetries": 0, + // "elsDisableRetry": false, + + //"elsIfPrimaryTerm": 0, + //"elsIfSeqNo": 0, + "elsOpType": "", + "elsPipeline": "", + "require_alias": "", + "elsRouting": "", + "elsTimeout": "0", + //"elsVersion": 0, + "elsVersionType": "", + "elsWaitForActiveShards": "", + }, + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + }, + { + "id": "ElasticsearchExporterHttps", + "type": "*els", + "export_path": "https://192.168.56.29:9200", + "attempts": 1, + "opts": { + "elsIndex": "cdrs", + "elsUsername":"elastic", + "elsPassword":"", + "caPath":"/path/to/http_ca.crt", + //"elsCertificateFingerPrint":"", + // "elsServiceToken":"" + // "elsLogger":"elsJson", + // "elsDiscoverNodesOnStart":true, + // "elsDiscoverNodesInterval":"10s", + // "elsEnableDebugLogger":false, + // "elsCompressRequestBody":true, + // "elsCompressRequestBodyLevel":0, + // "elsRetryOnStatus":[], + // "elsMaxRetries": 0, + // "elsDisableRetry": false, + + //"elsIfPrimaryTerm": 0, + //"elsIfSeqNo": 0, + "elsOpType": "", + "elsPipeline": "", + "require_alias": "", + "elsRouting": "", + "elsTimeout": "0", + //"elsVersion": 0, + "elsVersionType": "", + "elsWaitForActiveShards": "", + }, + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + }, { "id": "HTTPJsonMapExporterWithNoFields", "type": "*httpJSONMap", diff --git a/ees/ees.go b/ees/ees.go index d60c279c0..0683ffe9b 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -268,6 +268,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr * utils.MetaDC: exp.GetMetrics(), utils.MetaOpts: utils.MapStorage(ev.APIOpts), utils.MetaCfg: cfg.GetDataProvider(), + utils.MetaVars: utils.MapStorage{utils.MetaTenant: ev.Tenant}, }, utils.FirstNonEmpty(ev.Tenant, cfg.GeneralCfg().DefaultTenant), filterS, map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(ctx, exp.Cfg().ContentFields()) diff --git a/ees/elastic.go b/ees/elastic.go index 870f77dd3..7a01e25a2 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -22,16 +22,18 @@ import ( "bytes" "encoding/json" "fmt" + "os" "strings" "sync" - "github.com/elastic/go-elasticsearch/esapi" + "github.com/elastic/elastic-transport-go/v8/elastictransport" + "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - elasticsearch "github.com/elastic/go-elasticsearch" + elasticsearch "github.com/elastic/go-elasticsearch/v8" ) func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *ElasticEE, err error) { @@ -46,11 +48,12 @@ func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe * // ElasticEE implements EventExporter interface for ElasticSearch export type ElasticEE struct { - cfg *config.EventExporterCfg - eClnt *elasticsearch.Client - dc *utils.SafeMapStorage - opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap - reqs *concReq + cfg *config.EventExporterCfg + eClnt *elasticsearch.Client + dc *utils.SafeMapStorage + opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap + clntOpts elasticsearch.Config + reqs *concReq sync.RWMutex bytePreparing } @@ -83,6 +86,74 @@ func (eEe *ElasticEE) prepareOpts() (err error) { if eEe.Cfg().Opts.ElsWaitForActiveShards != nil { eEe.opts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards } + + //client opts + if eEe.Cfg().Opts.ElsCloud != nil && *eEe.Cfg().Opts.ElsCloud { + eEe.clntOpts.CloudID = eEe.Cfg().ExportPath + } else { + eEe.clntOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep) + } + if eEe.Cfg().Opts.ElsUsername != nil { + eEe.clntOpts.Username = *eEe.Cfg().Opts.ElsUsername + } + if eEe.Cfg().Opts.ElsPassword != nil { + eEe.clntOpts.Password = *eEe.Cfg().Opts.ElsPassword + } + if eEe.Cfg().Opts.ElsAPIKey != nil { + eEe.clntOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey + } + if eEe.Cfg().Opts.CAPath != nil { + var cacert []byte + cacert, err = os.ReadFile(*eEe.Cfg().Opts.CAPath) + if err != nil { + return + } + eEe.clntOpts.CACert = cacert + } + if eEe.Cfg().Opts.ElsCertificateFingerprint != nil { + eEe.clntOpts.CertificateFingerprint = *eEe.Cfg().Opts.ElsCertificateFingerprint + } + if eEe.Cfg().Opts.ElsServiceToken != nil { + eEe.clntOpts.ServiceToken = *eEe.Cfg().Opts.ElsServiceToken + } + if eEe.Cfg().Opts.ElsDiscoverNodesOnStart != nil { + eEe.clntOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.ElsDiscoverNodesOnStart + } + if eEe.Cfg().Opts.ElsDiscoverNodeInterval != nil { + eEe.clntOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.ElsDiscoverNodeInterval + } + if eEe.Cfg().Opts.ElsEnableDebugLogger != nil { + eEe.clntOpts.EnableDebugLogger = *eEe.Cfg().Opts.ElsEnableDebugLogger + } + if loggerType := eEe.Cfg().Opts.ElsLogger; loggerType != nil { + var logger elastictransport.Logger + switch *loggerType { + case utils.ElsJson: + logger = &elastictransport.JSONLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + case utils.ElsColor: + logger = &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + case utils.ElsText: + logger = &elastictransport.TextLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + default: + return + } + eEe.clntOpts.Logger = logger + } + if eEe.Cfg().Opts.ElsCompressRequestBody != nil { + eEe.clntOpts.CompressRequestBody = *eEe.Cfg().Opts.ElsCompressRequestBody + } + if eEe.Cfg().Opts.ElsRetryOnStatus != nil { + eEe.clntOpts.RetryOnStatus = *eEe.Cfg().Opts.ElsRetryOnStatus + } + if eEe.Cfg().Opts.ElsMaxRetries != nil { + eEe.clntOpts.MaxRetries = *eEe.Cfg().Opts.ElsMaxRetries + } + if eEe.Cfg().Opts.ElsDisableRetry != nil { + eEe.clntOpts.DisableRetry = *eEe.Cfg().Opts.ElsDisableRetry + } + if eEe.Cfg().Opts.ElsCompressRequestBodyLevel != nil { + eEe.clntOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.ElsCompressRequestBodyLevel + } return } @@ -91,11 +162,10 @@ func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg } func (eEe *ElasticEE) Connect() (err error) { eEe.Lock() // create the client - if eEe.eClnt == nil { - eEe.eClnt, err = elasticsearch.NewClient( - elasticsearch.Config{Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)}, - ) + if eEe.eClnt != nil { + return } + eEe.eClnt, err = elasticsearch.NewClient(eEe.clntOpts) eEe.Unlock() return } @@ -120,7 +190,6 @@ func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) (err IfPrimaryTerm: eEe.opts.IfPrimaryTerm, IfSeqNo: eEe.opts.IfSeqNo, OpType: eEe.opts.OpType, - Parent: eEe.opts.Parent, Pipeline: eEe.opts.Pipeline, Routing: eEe.opts.Routing, Timeout: eEe.opts.Timeout, diff --git a/ees/elastic_it_test.go b/ees/elastic_it_test.go index 15ad7c67b..434452cdc 100644 --- a/ees/elastic_it_test.go +++ b/ees/elastic_it_test.go @@ -34,7 +34,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/utils" - elasticsearch "github.com/elastic/go-elasticsearch" + elasticsearch "github.com/elastic/go-elasticsearch/v8" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" diff --git a/go.mod b/go.mod index dae782b68..cc5f97efa 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60 github.com/creack/pty v1.1.18 github.com/dgrijalva/jwt-go v3.2.0+incompatible - github.com/elastic/go-elasticsearch v0.0.0 + github.com/elastic/go-elasticsearch/v8 v8.8.0 github.com/ericlagergren/decimal v0.0.0-20211103172832-aca2edc11f73 github.com/fiorix/go-diameter/v4 v4.0.4 github.com/fsnotify/fsnotify v1.5.4 @@ -114,6 +114,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 // indirect github.com/google/uuid v1.3.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect diff --git a/go.sum b/go.sum index 3a2c605ef..0b8129476 100644 --- a/go.sum +++ b/go.sum @@ -183,8 +183,16 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI= +github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo= +github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= +github.com/elastic/go-elasticsearch/v8 v8.8.0 h1:yNBPlXNo6wstMG7I3KiZPbLFgA82RMryYqkh1xBMV3A= +github.com/elastic/go-elasticsearch/v8 v8.8.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE= +github.com/elastic/go-elasticsearch/v8 v8.8.1 h1:/OiP5Yex40q5eWpzFVQIS8jRE7SaEZrFkG9JbE6TXtY= +github.com/elastic/go-elasticsearch/v8 v8.8.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/utils/consts.go b/utils/consts.go index 8ad5f058b..145644a0e 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2496,6 +2496,10 @@ const ( ElsVersionLow = "elsVersion" ElsVersionType = "elsVersionType" ElsWaitForActiveShards = "elsWaitForActiveShards" + //EES ElasticSearch Logger Options + ElsJson = "elsJson" + ElsColor = "elsColor" + ElsText = "elsText" // nats NatsSubject = "natsSubject" NatsQueueID = "natsQueueID"