Add kafkaBatchSize configuration field

Avoids the default behaviour of waiting up to 1 second for
a batch to fill with 100 messages before it is sent.

Useful when the Kafka exporter is not cached, as every export
would otherwise incur that delay. Setting kafkaBatchSize to 1
prevents this.
Author: ionutboangiu
Date: 2024-10-23 18:04:09 +03:00
Committed by: Dan Christian Bogos
Parent: a6529e7879
Commit: c69d5afc56
4 changed files with 31 additions and 0 deletions


@@ -451,6 +451,7 @@ const CGRATES_CFG_JSON = `
// Kafka
// "kafkaTopic": "cgrates", // the topic from were the events are read
// "kafkaBatchSize": 100, // limit on how many messages will be buffered before being sent
// "kafkaGroupID": "cgrates", // the group that reads the events
// "kafkaMaxWait": "1ms", // the maximum amount of time to wait for new data to come
// "kafkaTLS": false, // if set to true it will try to authenticate the server


@@ -189,6 +189,7 @@ type EventExporterOpts struct {
SQLDBName *string
PgSSLMode *string
KafkaTopic *string
KafkaBatchSize *int
KafkaTLS *bool
KafkaCAPath *string
KafkaSkipTLSVerify *bool
@@ -383,6 +384,9 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.KafkaTopic != nil {
eeOpts.KafkaTopic = jsnCfg.KafkaTopic
}
if jsnCfg.KafkaBatchSize != nil {
eeOpts.KafkaBatchSize = jsnCfg.KafkaBatchSize
}
if jsnCfg.KafkaTLS != nil {
eeOpts.KafkaTLS = jsnCfg.KafkaTLS
}
@@ -675,6 +679,10 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
cln.KafkaTopic = new(string)
*cln.KafkaTopic = *eeOpts.KafkaTopic
}
if eeOpts.KafkaBatchSize != nil {
cln.KafkaBatchSize = new(int)
*cln.KafkaBatchSize = *eeOpts.KafkaBatchSize
}
if eeOpts.KafkaTLS != nil {
cln.KafkaTLS = new(bool)
*cln.KafkaTLS = *eeOpts.KafkaTLS
@@ -954,6 +962,9 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any {
if optsEes.KafkaTopic != nil {
opts[utils.KafkaTopic] = *optsEes.KafkaTopic
}
if optsEes.KafkaBatchSize != nil {
opts[utils.KafkaBatchSize] = *optsEes.KafkaBatchSize
}
if optsEes.KafkaTLS != nil {
opts[utils.KafkaTLS] = *optsEes.KafkaTLS
}
@@ -1094,6 +1105,7 @@ type EventExporterOptsJson struct {
SQLDBName *string `json:"sqlDBName"`
PgSSLMode *string `json:"pgSSLMode"`
KafkaTopic *string `json:"kafkaTopic"`
KafkaBatchSize *int `json:"kafkaBatchSize"`
KafkaTLS *bool `json:"kafkaTLS"`
KafkaCAPath *string `json:"kafkaCAPath"`
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
@@ -1305,6 +1317,14 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte
} else {
d.KafkaTopic = nil
}
if v2.KafkaBatchSize != nil {
if v1.KafkaBatchSize == nil ||
*v1.KafkaBatchSize != *v2.KafkaBatchSize {
d.KafkaBatchSize = v2.KafkaBatchSize
}
} else {
d.KafkaBatchSize = nil
}
if v2.KafkaTLS != nil {
if v1.KafkaTLS == nil ||
*v1.KafkaTLS != *v2.KafkaTLS {
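
The writer-side wiring is not part of the hunks shown above. Below is a minimal sketch of how an exporter could consume the parsed KafkaBatchSize, assuming it publishes through github.com/segmentio/kafka-go, whose Writer flushes a batch once it holds BatchSize messages (default 100) or once its batch timeout elapses (default 1s), which is the delay described in the commit message. The newKafkaWriter helper and its parameters are hypothetical.

package main

import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

// newKafkaWriter is a hypothetical helper: batchSize would come from the
// parsed EventExporterOpts and is nil when the option is not configured,
// in which case the library default of 100 applies.
func newKafkaWriter(addr, topic string, batchSize *int) *kafka.Writer {
	w := &kafka.Writer{
		Addr:  kafka.TCP(addr),
		Topic: topic,
	}
	if batchSize != nil {
		// With a batch size of 1, every WriteMessages call is flushed
		// immediately instead of waiting up to the batch timeout.
		w.BatchSize = *batchSize
	}
	return w
}

func main() {
	one := 1
	w := newKafkaWriter("localhost:9092", "cgrates", &one)
	defer w.Close()
	_ = w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte(`{"EventType":"example"}`)})
}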


@@ -1337,6 +1337,7 @@ func TestEventExporterOptsClone(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaBatchSize: utils.IntPointer(50),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
@@ -1391,6 +1392,7 @@ func TestEventExporterOptsClone(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaBatchSize: utils.IntPointer(50),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
@@ -1453,6 +1455,7 @@ func TestLoadFromJSONCfg(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaBatchSize: utils.IntPointer(50),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
@@ -1506,6 +1509,7 @@ func TestLoadFromJSONCfg(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaBatchSize: utils.IntPointer(50),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
@@ -1666,6 +1670,7 @@ func TestEEsAsMapInterface(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaBatchSize: utils.IntPointer(50),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
@@ -1726,6 +1731,7 @@ func TestEEsAsMapInterface(t *testing.T) {
"elsVersionType": "version_type",
"elsWaitForActiveShards": "wfas",
"kafkaTopic": "topic1",
utils.KafkaBatchSize: 50,
"kafkaCAPath": "kafkaCAPath",
"kafkaSkipTLSVerify": false,
"keyPath": "/path/to/key",
@@ -1803,6 +1809,7 @@ func TestEescfgNewEventExporterCfg(t *testing.T) {
SQLDBName: &str,
PgSSLMode: &str,
KafkaTopic: &str,
KafkaBatchSize: &nm,
KafkaTLS: &bl,
KafkaCAPath: &str,
KafkaSkipTLSVerify: &bl,
@@ -1909,6 +1916,7 @@ func TestEescfgloadFromJSONCfg(t *testing.T) {
SQLDBName: &str,
PgSSLMode: &str,
KafkaTopic: &str,
KafkaBatchSize: &nm,
KafkaTLS: &bl,
KafkaCAPath: &str,
KafkaSkipTLSVerify: &bl,
@@ -1985,6 +1993,7 @@ func TestEescfgloadFromJSONCfg(t *testing.T) {
SQLDBName: &str,
PgSSLMode: &str,
KafkaTopic: &str,
KafkaBatchSize: &nm,
KafkaTLS: &bl,
KafkaCAPath: &str,
KafkaSkipTLSVerify: &bl,


@@ -2599,6 +2599,7 @@ const (
KafkaDefaultMaxWait = time.Millisecond
KafkaTopic = "kafkaTopic"
KafkaBatchSize = "kafkaBatchSize"
KafkaTLS = "kafkaTLS"
KafkaCAPath = "kafkaCAPath"
KafkaSkipTLSVerify = "kafkaSkipTLSVerify"