From 9b9c7bb9141bc751f3ffb64b84f23f274a71d770 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 27 Jan 2025 19:58:43 +0200 Subject: [PATCH] Make elasticsearch refresh opt configurable --- config/config_defaults.go | 1 + config/eescfg.go | 125 +++++++++++++++++++++++--------------- config/eescfg_test.go | 7 +++ efs/failed_ees.go | 3 + utils/consts.go | 1 + 5 files changed, 87 insertions(+), 50 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index d0b3e848f..9ad7e12cc 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -556,6 +556,7 @@ const CGRATES_CFG_JSON = ` // "elsMaxRetries": 0, // Default: 3. // "elsDisableRetry": false, // Default: false. // "elsIndex": "", // ElsIndex + // "elsRefresh": "false", // controls when changes become visible // "elsOpType": "", // ElsOpType // "elsPipeline": "", // ElsPipeline // "elsRouting": "", // ElsRouting diff --git a/config/eescfg.go b/config/eescfg.go index 15ca152c1..e2216bdb2 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -157,8 +157,18 @@ func (eeS *EEsCfg) exporterIDs() []string { } type EventExporterOpts struct { - CSVFieldSeparator *string - ElsIndex *string + CSVFieldSeparator *string + + // elasticsearch index request opts + ElsIndex *string + ElsRefresh *string + ElsOpType *string + ElsPipeline *string + ElsRouting *string + ElsTimeout *time.Duration + ElsWaitForActiveShards *string + + // elasticsearch client opts ElsDiscoverNodesOnStart *bool ElsDiscoverNodeInterval *time.Duration ElsCloud *bool @@ -174,54 +184,50 @@ type EventExporterOpts struct { ElsRetryOnStatus *[]int ElsMaxRetries *int ElsDisableRetry *bool - ElsOpType *string - ElsPipeline *string - ElsRouting *string - ElsTimeout *time.Duration - ElsWaitForActiveShards *string - SQLMaxIdleConns *int - SQLMaxOpenConns *int - SQLConnMaxLifetime *time.Duration - MYSQLDSNParams map[string]string - SQLTableName *string - SQLDBName *string - PgSSLMode *string - KafkaTopic *string - KafkaBatchSize *int - KafkaTLS *bool - KafkaCAPath *string - KafkaSkipTLSVerify *bool - AMQPRoutingKey *string - AMQPQueueID *string - AMQPExchange *string - AMQPExchangeType *string - AMQPUsername *string - AMQPPassword *string - AWSRegion *string - AWSKey *string - AWSSecret *string - AWSToken *string - SQSQueueID *string - S3BucketID *string - S3FolderPath *string - NATSJetStream *bool - NATSSubject *string - NATSJWTFile *string - NATSSeedFile *string - NATSCertificateAuthority *string - NATSClientCertificate *string - NATSClientKey *string - NATSJetStreamMaxWait *time.Duration - RPCCodec *string - ServiceMethod *string - KeyPath *string - CertPath *string - CAPath *string - TLS *bool - ConnIDs *[]string - RPCConnTimeout *time.Duration - RPCReplyTimeout *time.Duration - RPCAPIOpts map[string]any + + SQLMaxIdleConns *int + SQLMaxOpenConns *int + SQLConnMaxLifetime *time.Duration + MYSQLDSNParams map[string]string + SQLTableName *string + SQLDBName *string + PgSSLMode *string + KafkaTopic *string + KafkaBatchSize *int + KafkaTLS *bool + KafkaCAPath *string + KafkaSkipTLSVerify *bool + AMQPRoutingKey *string + AMQPQueueID *string + AMQPExchange *string + AMQPExchangeType *string + AMQPUsername *string + AMQPPassword *string + AWSRegion *string + AWSKey *string + AWSSecret *string + AWSToken *string + SQSQueueID *string + S3BucketID *string + S3FolderPath *string + NATSJetStream *bool + NATSSubject *string + NATSJWTFile *string + NATSSeedFile *string + NATSCertificateAuthority *string + NATSClientCertificate *string + NATSClientKey *string + NATSJetStreamMaxWait *time.Duration + RPCCodec *string + ServiceMethod *string + KeyPath *string + CertPath *string + CAPath *string + TLS *bool + ConnIDs *[]string + RPCConnTimeout *time.Duration + RPCReplyTimeout *time.Duration + RPCAPIOpts map[string]any } // EventExporterCfg the config for a Event Exporter @@ -322,6 +328,9 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.ElsIndex != nil { eeOpts.ElsIndex = jsnCfg.ElsIndex } + if jsnCfg.ElsRefresh != nil { + eeOpts.ElsRefresh = jsnCfg.ElsRefresh + } if jsnCfg.ElsOpType != nil { eeOpts.ElsOpType = jsnCfg.ElsOpType } @@ -598,6 +607,10 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { cln.ElsIndex = new(string) *cln.ElsIndex = *eeOpts.ElsIndex } + if eeOpts.ElsRefresh != nil { + cln.ElsRefresh = new(string) + *cln.ElsRefresh = *eeOpts.ElsRefresh + } if eeOpts.ElsOpType != nil { cln.ElsOpType = new(string) *cln.ElsOpType = *eeOpts.ElsOpType @@ -881,6 +894,9 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any { if optsEes.ElsIndex != nil { opts[utils.ElsIndex] = *optsEes.ElsIndex } + if optsEes.ElsRefresh != nil { + opts[utils.ElsRefresh] = *optsEes.ElsRefresh + } if optsEes.ElsOpType != nil { opts[utils.ElsOpType] = *optsEes.ElsOpType } @@ -1046,6 +1062,7 @@ type EventExporterOptsJson struct { ElsMaxRetries *int `json:"elsMaxRetries"` ElsDisableRetry *bool `json:"elsDisableRetry"` ElsIndex *string `json:"elsIndex"` + ElsRefresh *string `json:"elsRefresh"` ElsOpType *string `json:"elsOpType"` ElsPipeline *string `json:"elsPipeline"` ElsRouting *string `json:"elsRouting"` @@ -1136,6 +1153,14 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte } else { d.ElsIndex = nil } + if v2.ElsRefresh != nil { + if v1.ElsRefresh == nil || + *v1.ElsRefresh != *v2.ElsRefresh { + d.ElsRefresh = v2.ElsRefresh + } + } else { + d.ElsRefresh = nil + } if v2.ElsOpType != nil { if v1.ElsOpType == nil || *v1.ElsOpType != *v2.ElsOpType { diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 921eea06b..a3cbd35da 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -1189,6 +1189,7 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) { v2 := &EventExporterOpts{ CSVFieldSeparator: utils.StringPointer(","), ElsIndex: utils.StringPointer("idx1"), + ElsRefresh: utils.StringPointer("true"), ElsOpType: utils.StringPointer("op_type"), ElsPipeline: utils.StringPointer("pipeline"), ElsRouting: utils.StringPointer("routing"), @@ -1313,6 +1314,7 @@ func TestEventExporterOptsClone(t *testing.T) { eeOpts := &EventExporterOpts{ CSVFieldSeparator: utils.StringPointer(","), ElsIndex: utils.StringPointer("idx1"), + ElsRefresh: utils.StringPointer("true"), ElsOpType: utils.StringPointer("op_type"), ElsPipeline: utils.StringPointer("pipeline"), ElsRouting: utils.StringPointer("routing"), @@ -1364,6 +1366,7 @@ func TestEventExporterOptsClone(t *testing.T) { exp := &EventExporterOpts{ CSVFieldSeparator: utils.StringPointer(","), ElsIndex: utils.StringPointer("idx1"), + ElsRefresh: utils.StringPointer("true"), ElsOpType: utils.StringPointer("op_type"), ElsPipeline: utils.StringPointer("pipeline"), ElsRouting: utils.StringPointer("routing"), @@ -1473,6 +1476,7 @@ func TestLoadFromJSONCfg(t *testing.T) { exp := &EventExporterOpts{ CSVFieldSeparator: utils.StringPointer(","), ElsIndex: utils.StringPointer("idx1"), + ElsRefresh: utils.StringPointer("true"), ElsOpType: utils.StringPointer("op_type"), ElsPipeline: utils.StringPointer("pipeline"), ElsRouting: utils.StringPointer("routing"), @@ -1626,6 +1630,7 @@ func TestEEsAsMapInterface(t *testing.T) { Opts: &EventExporterOpts{ CSVFieldSeparator: utils.StringPointer(","), ElsIndex: utils.StringPointer("idx1"), + ElsRefresh: utils.StringPointer("true"), ElsOpType: utils.StringPointer("op_type"), ElsPipeline: utils.StringPointer("pipeline"), ElsRouting: utils.StringPointer("routing"), @@ -1741,6 +1746,7 @@ func TestEescfgNewEventExporterCfg(t *testing.T) { eeo := &EventExporterOpts{ CSVFieldSeparator: &str, ElsIndex: &str, + ElsRefresh: &str, ElsDiscoverNodesOnStart: &bl, ElsDiscoverNodeInterval: &tm, ElsCloud: &bl, @@ -1917,6 +1923,7 @@ func TestEescfgloadFromJSONCfg(t *testing.T) { exp := &EventExporterOpts{ CSVFieldSeparator: &str, ElsIndex: &str, + ElsRefresh: &str, ElsDiscoverNodesOnStart: &bl, ElsDiscoverNodeInterval: &tm, ElsCloud: &bl, diff --git a/efs/failed_ees.go b/efs/failed_ees.go index 888c365ad..163a5b639 100644 --- a/efs/failed_ees.go +++ b/efs/failed_ees.go @@ -52,6 +52,9 @@ func AsOptsEESConfig(opts map[string]any) (*config.EventExporterOpts, error) { if _, has := opts[utils.ElsIndex]; has { optsCfg.ElsIndex = utils.StringPointer(utils.IfaceAsString(utils.ElsIndex)) } + if _, has := opts[utils.ElsRefresh]; has { + optsCfg.ElsRefresh = utils.StringPointer(utils.IfaceAsString(utils.ElsRefresh)) + } if _, has := opts[utils.ElsOpType]; has { optsCfg.ElsOpType = utils.StringPointer(utils.IfaceAsString(utils.ElsOpType)) } diff --git a/utils/consts.go b/utils/consts.go index 89b7c913d..4d8a67eaa 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2632,6 +2632,7 @@ const ( // EEs Elasticsearch options ElsIndex = "elsIndex" + ElsRefresh = "elsRefresh" ElsOpType = "elsOpType" ElsPipeline = "elsPipeline" ElsRouting = "elsRouting"