From fa09ca2f097dfbd0a74eb300c4b924ae126b25ec Mon Sep 17 00:00:00 2001 From: gezimbll Date: Tue, 13 Jun 2023 11:00:50 -0400 Subject: [PATCH] Added exporter config for els cloud --- config/eescfg.go | 136 +++++++++++++++++------------ config/libconfig_json.go | 118 +++++++++++++------------ data/conf/samples/ees/cgrates.json | 39 +++++++++ ees/elastic.go | 24 +++++ 4 files changed, 205 insertions(+), 112 deletions(-) diff --git a/config/eescfg.go b/config/eescfg.go index db208caa8..8a210b8da 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -154,62 +154,68 @@ func (eeS *EEsCfg) AsMapInterface(separator string) (initialMP map[string]any) { } type EventExporterOpts struct { - CSVFieldSeparator *string - ElsIndex *string - ElsIfPrimaryTerm *int - ElsDiscoverNodesOnStart *bool - ElsDiscoverNodeInterval *time.Duration - ElsCloud *bool - ElsAPIKey *string - ElsUsername *string // Username for HTTP Basic Authentication. - ElsPassword *string - ElsIfSeqNo *int - ElsOpType *string - ElsPipeline *string - ElsRouting *string - ElsTimeout *time.Duration - ElsVersion *int - ElsVersionType *string - ElsWaitForActiveShards *string - SQLMaxIdleConns *int - SQLMaxOpenConns *int - SQLConnMaxLifetime *time.Duration - MYSQLDSNParams map[string]string - SQLTableName *string - SQLDBName *string - PgSSLMode *string - KafkaTopic *string - AMQPRoutingKey *string - AMQPQueueID *string - AMQPExchange *string - AMQPExchangeType *string - AMQPUsername *string - AMQPPassword *string - AWSRegion *string - AWSKey *string - AWSSecret *string - AWSToken *string - SQSQueueID *string - S3BucketID *string - S3FolderPath *string - NATSJetStream *bool - NATSSubject *string - NATSJWTFile *string - NATSSeedFile *string - NATSCertificateAuthority *string - NATSClientCertificate *string - NATSClientKey *string - NATSJetStreamMaxWait *time.Duration - RPCCodec *string - ServiceMethod *string - KeyPath *string - CertPath *string - CAPath *string - TLS *bool - ConnIDs *[]string - RPCConnTimeout *time.Duration - RPCReplyTimeout *time.Duration - RPCAPIOpts map[string]any + CSVFieldSeparator *string + ElsIndex *string + ElsIfPrimaryTerm *int + ElsDiscoverNodesOnStart *bool + ElsDiscoverNodeInterval *time.Duration + ElsCloud *bool + ElsAPIKey *string + ElsCACert *string + ElsCertificateFingerprint *string + ElsServiceToken *string + ElsUsername *string // Username for HTTP Basic Authentication. + ElsPassword *string + ElsEnableDebugLogger *bool + ElsCompressRequestBody *bool + ElsRetryOnStatus *[]int + ElsIfSeqNo *int + ElsOpType *string + ElsPipeline *string + ElsRouting *string + ElsTimeout *time.Duration + ElsVersion *int + ElsVersionType *string + ElsWaitForActiveShards *string + SQLMaxIdleConns *int + SQLMaxOpenConns *int + SQLConnMaxLifetime *time.Duration + MYSQLDSNParams map[string]string + SQLTableName *string + SQLDBName *string + PgSSLMode *string + KafkaTopic *string + AMQPRoutingKey *string + AMQPQueueID *string + AMQPExchange *string + AMQPExchangeType *string + AMQPUsername *string + AMQPPassword *string + AWSRegion *string + AWSKey *string + AWSSecret *string + AWSToken *string + SQSQueueID *string + S3BucketID *string + S3FolderPath *string + NATSJetStream *bool + NATSSubject *string + NATSJWTFile *string + NATSSeedFile *string + NATSCertificateAuthority *string + NATSClientCertificate *string + NATSClientKey *string + NATSJetStreamMaxWait *time.Duration + RPCCodec *string + ServiceMethod *string + KeyPath *string + CertPath *string + CAPath *string + TLS *bool + ConnIDs *[]string + RPCConnTimeout *time.Duration + RPCReplyTimeout *time.Duration + RPCAPIOpts map[string]any } // EventExporterCfg the config for a Event Exporter @@ -262,6 +268,21 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.ElsAPIKey != nil { eeOpts.ElsAPIKey = jsnCfg.ElsAPIKey } + if jsnCfg.CAPath != nil { + eeOpts.CAPath = jsnCfg.CAPath + } + if jsnCfg.ElsServiceToken != nil { + eeOpts.ElsServiceToken = jsnCfg.ElsServiceToken + } + if jsnCfg.ElsCertificateFingerprint != nil { + eeOpts.ElsCertificateFingerprint = jsnCfg.ElsCertificateFingerprint + } + if jsnCfg.ElsEnableDebugLogger != nil { + eeOpts.ElsEnableDebugLogger = jsnCfg.ElsEnableDebugLogger + } + if jsnCfg.ElsCompressRequestBody != nil { + eeOpts.ElsCompressRequestBody = jsnCfg.ElsCompressRequestBody + } if jsnCfg.ElsUsername != nil { eeOpts.ElsUsername = jsnCfg.ElsUsername } @@ -278,6 +299,9 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) } eeOpts.ElsDiscoverNodeInterval = utils.DurationPointer(nodesInterval) } + if jsnCfg.ElsRetryOnStatus != nil { + eeOpts.ElsRetryOnStatus = jsnCfg.ElsRetryOnStatus + } if jsnCfg.ElsIndex != nil { eeOpts.ElsIndex = jsnCfg.ElsIndex } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index b78ef00f8..6e46128bf 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -298,62 +298,68 @@ type EEsJsonCfg struct { } type EventExporterOptsJson struct { - CSVFieldSeparator *string `json:"csvFieldSeparator"` - ElsCloud *bool `json:"elsCloud"` - ElsAPIKey *string `json:"elsApiKey"` - ElsUsername *string `json:"elsUsername"` - ElsPassword *string `json:"elsPassword"` - ElsDiscoverNodesOnStart *bool `json:"elsDiscoverNodesOnStart"` - ElsDiscoverNodesInterval *string `json:"elsDiscoverNodesInterval"` - ElsIndex *string `json:"elsIndex"` - ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"` - ElsIfSeqNo *int `json:"elsIfSeqNo"` - ElsOpType *string `json:"elsOpType"` - ElsPipeline *string `json:"elsPipeline"` - ElsRouting *string `json:"elsRouting"` - ElsTimeout *string `json:"elsTimeout"` - ElsVersion *int `json:"elsVersion"` - ElsVersionType *string `json:"elsVersionType"` - ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"` - SQLMaxIdleConns *int `json:"sqlMaxIdleConns"` - SQLMaxOpenConns *int `json:"sqlMaxOpenConns"` - SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"` - MYSQLDSNParams map[string]string `json:"mysqlDSNParams"` - SQLTableName *string `json:"sqlTableName"` - SQLDBName *string `json:"sqlDBName"` - PgSSLMode *string `json:"pgSSLMode"` - KafkaTopic *string `json:"kafkaTopic"` - AMQPQueueID *string `json:"amqpQueueID"` - AMQPRoutingKey *string `json:"amqpRoutingKey"` - AMQPExchange *string `json:"amqpExchange"` - AMQPExchangeType *string `json:"amqpExchangeType"` - AMQPUsername *string `json:"amqpUsername"` - AMQPPassword *string `json:"amqpPassword"` - AWSRegion *string `json:"awsRegion"` - AWSKey *string `json:"awsKey"` - AWSSecret *string `json:"awsSecret"` - AWSToken *string `json:"awsToken"` - SQSQueueID *string `json:"sqsQueueID"` - S3BucketID *string `json:"s3BucketID"` - S3FolderPath *string `json:"s3FolderPath"` - NATSJetStream *bool `json:"natsJetStream"` - NATSSubject *string `json:"natsSubject"` - NATSJWTFile *string `json:"natsJWTFile"` - NATSSeedFile *string `json:"natsSeedFile"` - NATSCertificateAuthority *string `json:"natsCertificateAuthority"` - NATSClientCertificate *string `json:"natsClientCertificate"` - NATSClientKey *string `json:"natsClientKey"` - NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` - RPCCodec *string `json:"rpcCodec"` - ServiceMethod *string `json:"serviceMethod"` - KeyPath *string `json:"keyPath"` - CertPath *string `json:"certPath"` - CAPath *string `json:"caPath"` - ConnIDs *[]string `json:"connIDs"` - TLS *bool `json:"tls"` - RPCConnTimeout *string `json:"rpcConnTimeout"` - RPCReplyTimeout *string `json:"rpcReplyTimeout"` - RPCAPIOpts map[string]any `json:"rpcAPIOpts"` + CSVFieldSeparator *string `json:"csvFieldSeparator"` + ElsCloud *bool `json:"elsCloud"` + ElsAPIKey *string `json:"elsApiKey"` + ElsServiceToken *string `json:"elsServiceToken"` + ElsCACert *string `json:"elsCACert"` + ElsCertificateFingerprint *string `json:"elsCertificateFingerPrint"` + ElsUsername *string `json:"elsUsername"` + ElsPassword *string `json:"elsPassword"` + ElsDiscoverNodesOnStart *bool `json:"elsDiscoverNodesOnStart"` + ElsDiscoverNodesInterval *string `json:"elsDiscoverNodesInterval"` + ElsEnableDebugLogger *bool `json:"elsEnableDebugLogger"` + ElsCompressRequestBody *bool `json:"elsCompressRequestBody"` + ElsRetryOnStatus *[]int `json:"elsRetryOnStatus"` + ElsIndex *string `json:"elsIndex"` + ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"` + ElsIfSeqNo *int `json:"elsIfSeqNo"` + ElsOpType *string `json:"elsOpType"` + ElsPipeline *string `json:"elsPipeline"` + ElsRouting *string `json:"elsRouting"` + ElsTimeout *string `json:"elsTimeout"` + ElsVersion *int `json:"elsVersion"` + ElsVersionType *string `json:"elsVersionType"` + ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"` + SQLMaxIdleConns *int `json:"sqlMaxIdleConns"` + SQLMaxOpenConns *int `json:"sqlMaxOpenConns"` + SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"` + MYSQLDSNParams map[string]string `json:"mysqlDSNParams"` + SQLTableName *string `json:"sqlTableName"` + SQLDBName *string `json:"sqlDBName"` + PgSSLMode *string `json:"pgSSLMode"` + KafkaTopic *string `json:"kafkaTopic"` + AMQPQueueID *string `json:"amqpQueueID"` + AMQPRoutingKey *string `json:"amqpRoutingKey"` + AMQPExchange *string `json:"amqpExchange"` + AMQPExchangeType *string `json:"amqpExchangeType"` + AMQPUsername *string `json:"amqpUsername"` + AMQPPassword *string `json:"amqpPassword"` + AWSRegion *string `json:"awsRegion"` + AWSKey *string `json:"awsKey"` + AWSSecret *string `json:"awsSecret"` + AWSToken *string `json:"awsToken"` + SQSQueueID *string `json:"sqsQueueID"` + S3BucketID *string `json:"s3BucketID"` + S3FolderPath *string `json:"s3FolderPath"` + NATSJetStream *bool `json:"natsJetStream"` + NATSSubject *string `json:"natsSubject"` + NATSJWTFile *string `json:"natsJWTFile"` + NATSSeedFile *string `json:"natsSeedFile"` + NATSCertificateAuthority *string `json:"natsCertificateAuthority"` + NATSClientCertificate *string `json:"natsClientCertificate"` + NATSClientKey *string `json:"natsClientKey"` + NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` + RPCCodec *string `json:"rpcCodec"` + ServiceMethod *string `json:"serviceMethod"` + KeyPath *string `json:"keyPath"` + CertPath *string `json:"certPath"` + CAPath *string `json:"caPath"` + ConnIDs *[]string `json:"connIDs"` + TLS *bool `json:"tls"` + RPCConnTimeout *string `json:"rpcConnTimeout"` + RPCReplyTimeout *string `json:"rpcReplyTimeout"` + RPCAPIOpts map[string]any `json:"rpcAPIOpts"` } // EventExporterJsonCfg is the configuration of a single EventExporter diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 67e06d885..e7ceaf842 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -395,6 +395,45 @@ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, ], }, + { + "id": "ElasticsearchExporterCloud", + "type": "*els", + "export_path": "Deployment:dXMtY2c0ZjI0YWN3Zg==", + "attempts": 1, + "opts": { + "elsIndex": "cdrs", + "elsUsername":"elastic_user", + "elsPassword":"rhYwyCE234", + "elsCloud":true, + //"elsApiKey":"aZmd2UQ==", + //"elsIfPrimaryTerm": 0, + //"elsIfSeqNo": 0, + "elsOpType": "", + "elsPipeline": "", + "require_alias": "", + "elsRouting": "", + "elsTimeout": "0", + //"elsVersion": 0, + "elsVersionType": "", + "elsWaitForActiveShards": "", + }, + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + }, { "id": "HTTPJsonMapExporterWithNoFields", "type": "*http_json_map", diff --git a/ees/elastic.go b/ees/elastic.go index 370dc494b..cd994c63f 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -23,6 +23,7 @@ import ( "context" "encoding/json" "fmt" + "os" "strings" "sync" @@ -99,12 +100,35 @@ func (eEe *ElasticEE) prepareOpts() (err error) { if eEe.Cfg().Opts.ElsAPIKey != nil { eEe.clnOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey } + if eEe.Cfg().Opts.ElsCACert != nil { + var cacert []byte + cacert, err = os.ReadFile(*eEe.Cfg().Opts.ElsCACert) + if err != nil { + return + } + eEe.clnOpts.CACert = cacert + } + if eEe.Cfg().Opts.ElsCertificateFingerprint != nil { + eEe.clnOpts.CertificateFingerprint = *eEe.Cfg().Opts.ElsCertificateFingerprint + } + if eEe.Cfg().Opts.ElsServiceToken != nil { + eEe.clnOpts.ServiceToken = *eEe.Cfg().Opts.ElsServiceToken + } if eEe.Cfg().Opts.ElsDiscoverNodesOnStart != nil { eEe.clnOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.ElsDiscoverNodesOnStart } if eEe.Cfg().Opts.ElsDiscoverNodeInterval != nil { eEe.clnOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.ElsDiscoverNodeInterval } + if eEe.Cfg().Opts.ElsEnableDebugLogger != nil { + eEe.clnOpts.EnableDebugLogger = *eEe.Cfg().Opts.ElsEnableDebugLogger + } + if eEe.Cfg().Opts.ElsCompressRequestBody != nil { + eEe.clnOpts.CompressRequestBody = *eEe.Cfg().Opts.ElsCompressRequestBody + } + if eEe.Cfg().Opts.ElsRetryOnStatus != nil { + eEe.clnOpts.RetryOnStatus = *eEe.Cfg().Opts.ElsRetryOnStatus + } return }