diff --git a/config/config_defaults.go b/config/config_defaults.go index db58cb22a..5ffcf0f99 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -451,6 +451,7 @@ const CGRATES_CFG_JSON = ` // Kafka // "kafkaTopic": "cgrates", // the topic from were the events are read + // "kafkaBatchSize": 100, // limit on how many messages will be buffered before being sent // "kafkaGroupID": "cgrates", // the group that reads the events // "kafkaMaxWait": "1ms", // the maximum amount of time to wait for new data to come // "kafkaTLS": false, // if set to true it will try to authenticate the server diff --git a/config/eescfg.go b/config/eescfg.go index 8605b8783..a06ad539b 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -189,6 +189,7 @@ type EventExporterOpts struct { SQLDBName *string PgSSLMode *string KafkaTopic *string + KafkaBatchSize *int KafkaTLS *bool KafkaCAPath *string KafkaSkipTLSVerify *bool @@ -383,6 +384,9 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.KafkaTopic != nil { eeOpts.KafkaTopic = jsnCfg.KafkaTopic } + if jsnCfg.KafkaBatchSize != nil { + eeOpts.KafkaBatchSize = jsnCfg.KafkaBatchSize + } if jsnCfg.KafkaTLS != nil { eeOpts.KafkaTLS = jsnCfg.KafkaTLS } @@ -675,6 +679,10 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { cln.KafkaTopic = new(string) *cln.KafkaTopic = *eeOpts.KafkaTopic } + if eeOpts.KafkaBatchSize != nil { + cln.KafkaBatchSize = new(int) + *cln.KafkaBatchSize = *eeOpts.KafkaBatchSize + } if eeOpts.KafkaTLS != nil { cln.KafkaTLS = new(bool) *cln.KafkaTLS = *eeOpts.KafkaTLS @@ -954,6 +962,9 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any { if optsEes.KafkaTopic != nil { opts[utils.KafkaTopic] = *optsEes.KafkaTopic } + if optsEes.KafkaBatchSize != nil { + opts[utils.KafkaBatchSize] = *optsEes.KafkaBatchSize + } if optsEes.KafkaTLS != nil { opts[utils.KafkaTLS] = *optsEes.KafkaTLS } @@ -1094,6 +1105,7 @@ type EventExporterOptsJson struct { SQLDBName *string `json:"sqlDBName"` PgSSLMode *string `json:"pgSSLMode"` KafkaTopic *string `json:"kafkaTopic"` + KafkaBatchSize *int `json:"kafkaBatchSize"` KafkaTLS *bool `json:"kafkaTLS"` KafkaCAPath *string `json:"kafkaCAPath"` KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` @@ -1305,6 +1317,14 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte } else { d.KafkaTopic = nil } + if v2.KafkaBatchSize != nil { + if v1.KafkaBatchSize == nil || + *v1.KafkaBatchSize != *v2.KafkaBatchSize { + d.KafkaBatchSize = v2.KafkaBatchSize + } + } else { + d.KafkaBatchSize = nil + } if v2.KafkaTLS != nil { if v1.KafkaTLS == nil || *v1.KafkaTLS != *v2.KafkaTLS { diff --git a/config/eescfg_test.go b/config/eescfg_test.go index dde2ca3be..8ebcff9f8 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -1337,6 +1337,7 @@ func TestEventExporterOptsClone(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaBatchSize: utils.IntPointer(50), KafkaCAPath: utils.StringPointer("kafkaCAPath"), KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), @@ -1391,6 +1392,7 @@ func TestEventExporterOptsClone(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaBatchSize: utils.IntPointer(50), KafkaCAPath: utils.StringPointer("kafkaCAPath"), KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), @@ -1453,6 +1455,7 @@ func TestLoadFromJSONCfg(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaBatchSize: utils.IntPointer(50), KafkaCAPath: utils.StringPointer("kafkaCAPath"), KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), @@ -1506,6 +1509,7 @@ func TestLoadFromJSONCfg(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaBatchSize: utils.IntPointer(50), KafkaCAPath: utils.StringPointer("kafkaCAPath"), KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), @@ -1666,6 +1670,7 @@ func TestEEsAsMapInterface(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaBatchSize: utils.IntPointer(50), KafkaCAPath: utils.StringPointer("kafkaCAPath"), KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), @@ -1726,6 +1731,7 @@ func TestEEsAsMapInterface(t *testing.T) { "elsVersionType": "version_type", "elsWaitForActiveShards": "wfas", "kafkaTopic": "topic1", + utils.KafkaBatchSize: 50, "kafkaCAPath": "kafkaCAPath", "kafkaSkipTLSVerify": false, "keyPath": "/path/to/key", @@ -1803,6 +1809,7 @@ func TestEescfgNewEventExporterCfg(t *testing.T) { SQLDBName: &str, PgSSLMode: &str, KafkaTopic: &str, + KafkaBatchSize: &nm, KafkaTLS: &bl, KafkaCAPath: &str, KafkaSkipTLSVerify: &bl, @@ -1909,6 +1916,7 @@ func TestEescfgloadFromJSONCfg(t *testing.T) { SQLDBName: &str, PgSSLMode: &str, KafkaTopic: &str, + KafkaBatchSize: &nm, KafkaTLS: &bl, KafkaCAPath: &str, KafkaSkipTLSVerify: &bl, @@ -1985,6 +1993,7 @@ func TestEescfgloadFromJSONCfg(t *testing.T) { SQLDBName: &str, PgSSLMode: &str, KafkaTopic: &str, + KafkaBatchSize: &nm, KafkaTLS: &bl, KafkaCAPath: &str, KafkaSkipTLSVerify: &bl, diff --git a/utils/consts.go b/utils/consts.go index 263f85513..b17b10c3b 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2599,6 +2599,7 @@ const ( KafkaDefaultMaxWait = time.Millisecond KafkaTopic = "kafkaTopic" + KafkaBatchSize = "kafkaBatchSize" KafkaTLS = "kafkaTLS" KafkaCAPath = "kafkaCAPath" KafkaSkipTLSVerify = "kafkaSkipTLSVerify"