From 8d7628410e9ffe916d20c4ea178d0d5cba4bfdd2 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 27 Nov 2024 18:41:17 +0200 Subject: [PATCH] Make elasticsearch refresh opt configurable --- config/config_defaults.go | 5 +++-- config/eescfg.go | 26 +++++++++++++++++----- config/eescfg_test.go | 8 +++++++ config/libconfig_json.go | 1 + data/conf/cgrates/cgrates.json | 1 + data/conf/samples/ees_elastic/cgrates.json | 6 +++-- ees/elastic.go | 6 +++-- utils/consts.go | 1 + 8 files changed, 42 insertions(+), 12 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 8d7961349..a2fc026ca 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -541,6 +541,7 @@ const CGRATES_CFG_JSON = ` // "elsIndex": "", // ElsIndex + // "elsRefresh": "false", // controls when changes become visible // "elsOpType": "", // ElsOpType // "elsPipeline": "", // ElsPipeline // "elsRouting": "", // ElsRouting @@ -549,9 +550,9 @@ const CGRATES_CFG_JSON = ` // SQL - // "sqlMaxIdleConns": 0, // SQLMaxIdleConns + // "sqlMaxIdleConns": 0, // SQLMaxIdleConns // "sqlMaxOpenConns": 0, // SQLMaxOpenConns - // "sqlConnMaxLifetime": "0", // SQLConnMaxLifetime + // "sqlConnMaxLifetime": "0", // SQLConnMaxLifetime // "mysqlDSNParams": {}, // DSN params diff --git a/config/eescfg.go b/config/eescfg.go index 6a9de5599..f57145f3c 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -163,7 +163,16 @@ func (eeS *EEsCfg) exporterIDs() []string { } type ElsOpts struct { - Index *string + // index request opts + Index *string + Refresh *string + OpType *string + Pipeline *string + Routing *string + Timeout *time.Duration + WaitForActiveShards *string + + // elasticsearch client opts DiscoverNodesOnStart *bool DiscoverNodeInterval *time.Duration Cloud *bool @@ -179,11 +188,6 @@ type ElsOpts struct { RetryOnStatus *[]int MaxRetries *int DisableRetry *bool - OpType *string - Pipeline *string - Routing *string - Timeout *time.Duration - WaitForActiveShards *string } type SQLOpts struct { @@ -347,6 +351,9 @@ func (elsOpts *ElsOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err erro if jsnCfg.ElsIndex != nil { elsOpts.Index = jsnCfg.ElsIndex } + if jsnCfg.ElsRefresh != nil { + elsOpts.Refresh = jsnCfg.ElsRefresh + } if jsnCfg.ElsOpType != nil { elsOpts.OpType = jsnCfg.ElsOpType } @@ -672,6 +679,10 @@ func (elsOpts *ElsOpts) Clone() *ElsOpts { cln.Index = new(string) *cln.Index = *elsOpts.Index } + if elsOpts.Refresh != nil { + cln.Refresh = new(string) + *cln.Refresh = *elsOpts.Refresh + } if elsOpts.OpType != nil { cln.OpType = new(string) *cln.OpType = *elsOpts.OpType @@ -981,6 +992,9 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str if elsOpts.Index != nil { opts[utils.ElsIndex] = *elsOpts.Index } + if elsOpts.Refresh != nil { + opts[utils.ElsRefresh] = *elsOpts.Refresh + } if elsOpts.OpType != nil { opts[utils.ElsOpType] = *elsOpts.OpType } diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 6cad489cb..2d345cedd 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -46,6 +46,7 @@ func TestEESClone(t *testing.T) { "allowNativePasswords": "true", }, "elsIndex":"test", + "elsRefresh":"true", "elsOpType":"test2", "elsPipeline":"test3", "elsRouting":"test4", @@ -240,6 +241,7 @@ func TestEESClone(t *testing.T) { }, Els: &ElsOpts{ Index: utils.StringPointer("test"), + Refresh: utils.StringPointer("true"), OpType: utils.StringPointer("test2"), Pipeline: utils.StringPointer("test3"), Routing: utils.StringPointer("test4"), @@ -368,6 +370,7 @@ func TestEventExporterOptsloadFromJsonCfg(t *testing.T) { eventExporterOptsJSON := &EventExporterOptsJson{ ElsIndex: utils.StringPointer("test"), + ElsRefresh: utils.StringPointer("true"), ElsOpType: utils.StringPointer("test2"), ElsPipeline: utils.StringPointer("test3"), ElsRouting: utils.StringPointer("test4"), @@ -394,6 +397,7 @@ func TestEventExporterOptsloadFromJsonCfg(t *testing.T) { expected := &EventExporterOpts{ Els: &ElsOpts{ Index: utils.StringPointer("test"), + Refresh: utils.StringPointer("true"), OpType: utils.StringPointer("test2"), Pipeline: utils.StringPointer("test3"), Routing: utils.StringPointer("test4"), @@ -882,6 +886,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) { "export_path": "/tmp/testCSV", "opts": { "elsIndex":"test", + "elsRefresh": "true", "kafkaTopic": "test", "elsOpType":"test2", "elsPipeline":"test3", @@ -965,6 +970,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) { utils.OptsCfg: map[string]any{ utils.KafkaTopic: "test", utils.ElsIndex: "test", + utils.ElsRefresh: "true", utils.ElsOpType: "test2", utils.ElsPipeline: "test3", utils.ElsRouting: "test4", @@ -1120,6 +1126,7 @@ func TestEEsCfgloadFromJSONCfg(t *testing.T) { ElsMaxRetries: &nm, ElsDisableRetry: &bl, ElsIndex: &str, + ElsRefresh: &str, ElsOpType: &str, ElsPipeline: &str, ElsRouting: &str, @@ -1167,6 +1174,7 @@ func TestEEsCfgloadFromJSONCfg(t *testing.T) { } exp := &ElsOpts{ Index: &str, + Refresh: &str, DiscoverNodesOnStart: &bl, DiscoverNodeInterval: &tm, Cloud: &bl, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 80c217709..21629a778 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -317,6 +317,7 @@ type EventExporterOptsJson struct { ElsMaxRetries *int `json:"elsMaxRetries"` ElsDisableRetry *bool `json:"elsDisableRetry"` ElsIndex *string `json:"elsIndex"` + ElsRefresh *string `json:"elsRefresh"` ElsOpType *string `json:"elsOpType"` ElsPipeline *string `json:"elsPipeline"` ElsRouting *string `json:"elsRouting"` diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 2371321c7..9bdeed0bd 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -512,6 +512,7 @@ // // "elsIndex": "", // ElsIndex +// // "elsRefresh": "", // ElsRefresh // // "elsOpType": "", // ElsOpType // // "elsPipeline": "", // ElsPipeline // // "elsRouting": "", // ElsRouting diff --git a/data/conf/samples/ees_elastic/cgrates.json b/data/conf/samples/ees_elastic/cgrates.json index c60687e87..ee8cf1e1b 100644 --- a/data/conf/samples/ees_elastic/cgrates.json +++ b/data/conf/samples/ees_elastic/cgrates.json @@ -16,7 +16,8 @@ "synchronous": true, "failed_posts_dir": "*none", "opts": { - "elsIndex": "cdrs_basic" + "elsIndex": "cdrs_basic", + "elsRefresh": "true" } }, { @@ -26,7 +27,8 @@ "synchronous": true, "failed_posts_dir": "*none", "opts": { - "elsIndex": "cdrs_fields" + "elsIndex": "cdrs_fields", + "elsRefresh": "true" }, "fields": [ { "tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID" }, diff --git a/ees/elastic.go b/ees/elastic.go index a2c959866..a2560bb53 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -172,9 +172,11 @@ func (e *ElasticEE) ExportEvent(event any, key string) error { } req := e.client.Index(indexName). Id(key). - Request(event). - Refresh(refresh.True) + Request(event) + if opts.Refresh != nil { + req.Refresh(refresh.Refresh{Name: *opts.Refresh}) + } if opts.OpType != nil { req.OpType(optype.OpType{Name: *opts.OpType}) } diff --git a/utils/consts.go b/utils/consts.go index d5a2f3179..8700c57c5 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2856,6 +2856,7 @@ const ( // EEs Elasticsearch options ElsIndex = "elsIndex" + ElsRefresh = "elsRefresh" ElsOpType = "elsOpType" ElsPipeline = "elsPipeline" ElsRouting = "elsRouting"