Make elasticsearch refresh opt configurable

This commit is contained in:
ionutboangiu
2024-11-27 18:41:17 +02:00
committed by Dan Christian Bogos
parent f1cdf920df
commit 8d7628410e
8 changed files with 42 additions and 12 deletions

View File

@@ -541,6 +541,7 @@ const CGRATES_CFG_JSON = `
// "elsIndex": "", // ElsIndex
// "elsRefresh": "false", // controls when changes become visible <true|false|wait_for>
// "elsOpType": "", // ElsOpType
// "elsPipeline": "", // ElsPipeline
// "elsRouting": "", // ElsRouting
@@ -549,9 +550,9 @@ const CGRATES_CFG_JSON = `
// SQL
// "sqlMaxIdleConns": 0, // SQLMaxIdleConns
// "sqlMaxIdleConns": 0, // SQLMaxIdleConns
// "sqlMaxOpenConns": 0, // SQLMaxOpenConns
// "sqlConnMaxLifetime": "0", // SQLConnMaxLifetime
// "sqlConnMaxLifetime": "0", // SQLConnMaxLifetime
// "mysqlDSNParams": {}, // DSN params

View File

@@ -163,7 +163,16 @@ func (eeS *EEsCfg) exporterIDs() []string {
}
type ElsOpts struct {
Index *string
// index request opts
Index *string
Refresh *string
OpType *string
Pipeline *string
Routing *string
Timeout *time.Duration
WaitForActiveShards *string
// elasticsearch client opts
DiscoverNodesOnStart *bool
DiscoverNodeInterval *time.Duration
Cloud *bool
@@ -179,11 +188,6 @@ type ElsOpts struct {
RetryOnStatus *[]int
MaxRetries *int
DisableRetry *bool
OpType *string
Pipeline *string
Routing *string
Timeout *time.Duration
WaitForActiveShards *string
}
type SQLOpts struct {
@@ -347,6 +351,9 @@ func (elsOpts *ElsOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err erro
if jsnCfg.ElsIndex != nil {
elsOpts.Index = jsnCfg.ElsIndex
}
if jsnCfg.ElsRefresh != nil {
elsOpts.Refresh = jsnCfg.ElsRefresh
}
if jsnCfg.ElsOpType != nil {
elsOpts.OpType = jsnCfg.ElsOpType
}
@@ -672,6 +679,10 @@ func (elsOpts *ElsOpts) Clone() *ElsOpts {
cln.Index = new(string)
*cln.Index = *elsOpts.Index
}
if elsOpts.Refresh != nil {
cln.Refresh = new(string)
*cln.Refresh = *elsOpts.Refresh
}
if elsOpts.OpType != nil {
cln.OpType = new(string)
*cln.OpType = *elsOpts.OpType
@@ -981,6 +992,9 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
if elsOpts.Index != nil {
opts[utils.ElsIndex] = *elsOpts.Index
}
if elsOpts.Refresh != nil {
opts[utils.ElsRefresh] = *elsOpts.Refresh
}
if elsOpts.OpType != nil {
opts[utils.ElsOpType] = *elsOpts.OpType
}

View File

@@ -46,6 +46,7 @@ func TestEESClone(t *testing.T) {
"allowNativePasswords": "true",
},
"elsIndex":"test",
"elsRefresh":"true",
"elsOpType":"test2",
"elsPipeline":"test3",
"elsRouting":"test4",
@@ -240,6 +241,7 @@ func TestEESClone(t *testing.T) {
},
Els: &ElsOpts{
Index: utils.StringPointer("test"),
Refresh: utils.StringPointer("true"),
OpType: utils.StringPointer("test2"),
Pipeline: utils.StringPointer("test3"),
Routing: utils.StringPointer("test4"),
@@ -368,6 +370,7 @@ func TestEventExporterOptsloadFromJsonCfg(t *testing.T) {
eventExporterOptsJSON := &EventExporterOptsJson{
ElsIndex: utils.StringPointer("test"),
ElsRefresh: utils.StringPointer("true"),
ElsOpType: utils.StringPointer("test2"),
ElsPipeline: utils.StringPointer("test3"),
ElsRouting: utils.StringPointer("test4"),
@@ -394,6 +397,7 @@ func TestEventExporterOptsloadFromJsonCfg(t *testing.T) {
expected := &EventExporterOpts{
Els: &ElsOpts{
Index: utils.StringPointer("test"),
Refresh: utils.StringPointer("true"),
OpType: utils.StringPointer("test2"),
Pipeline: utils.StringPointer("test3"),
Routing: utils.StringPointer("test4"),
@@ -882,6 +886,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
"export_path": "/tmp/testCSV",
"opts": {
"elsIndex":"test",
"elsRefresh": "true",
"kafkaTopic": "test",
"elsOpType":"test2",
"elsPipeline":"test3",
@@ -965,6 +970,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
utils.OptsCfg: map[string]any{
utils.KafkaTopic: "test",
utils.ElsIndex: "test",
utils.ElsRefresh: "true",
utils.ElsOpType: "test2",
utils.ElsPipeline: "test3",
utils.ElsRouting: "test4",
@@ -1120,6 +1126,7 @@ func TestEEsCfgloadFromJSONCfg(t *testing.T) {
ElsMaxRetries: &nm,
ElsDisableRetry: &bl,
ElsIndex: &str,
ElsRefresh: &str,
ElsOpType: &str,
ElsPipeline: &str,
ElsRouting: &str,
@@ -1167,6 +1174,7 @@ func TestEEsCfgloadFromJSONCfg(t *testing.T) {
}
exp := &ElsOpts{
Index: &str,
Refresh: &str,
DiscoverNodesOnStart: &bl,
DiscoverNodeInterval: &tm,
Cloud: &bl,

View File

@@ -317,6 +317,7 @@ type EventExporterOptsJson struct {
ElsMaxRetries *int `json:"elsMaxRetries"`
ElsDisableRetry *bool `json:"elsDisableRetry"`
ElsIndex *string `json:"elsIndex"`
ElsRefresh *string `json:"elsRefresh"`
ElsOpType *string `json:"elsOpType"`
ElsPipeline *string `json:"elsPipeline"`
ElsRouting *string `json:"elsRouting"`

View File

@@ -512,6 +512,7 @@
// // "elsIndex": "", // ElsIndex
// // "elsRefresh": "", // ElsRefresh
// // "elsOpType": "", // ElsOpType
// // "elsPipeline": "", // ElsPipeline
// // "elsRouting": "", // ElsRouting

View File

@@ -16,7 +16,8 @@
"synchronous": true,
"failed_posts_dir": "*none",
"opts": {
"elsIndex": "cdrs_basic"
"elsIndex": "cdrs_basic",
"elsRefresh": "true"
}
},
{
@@ -26,7 +27,8 @@
"synchronous": true,
"failed_posts_dir": "*none",
"opts": {
"elsIndex": "cdrs_fields"
"elsIndex": "cdrs_fields",
"elsRefresh": "true"
},
"fields": [
{ "tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID" },

View File

@@ -172,9 +172,11 @@ func (e *ElasticEE) ExportEvent(event any, key string) error {
}
req := e.client.Index(indexName).
Id(key).
Request(event).
Refresh(refresh.True)
Request(event)
if opts.Refresh != nil {
req.Refresh(refresh.Refresh{Name: *opts.Refresh})
}
if opts.OpType != nil {
req.OpType(optype.OpType{Name: *opts.OpType})
}

View File

@@ -2856,6 +2856,7 @@ const (
// EEs Elasticsearch options
ElsIndex = "elsIndex"
ElsRefresh = "elsRefresh"
ElsOpType = "elsOpType"
ElsPipeline = "elsPipeline"
ElsRouting = "elsRouting"