Add kafkaBatchSize configuration field

Avoids the default 1 second delay when the batch doesn't
reach 100 messages within that time.

Useful when the Kafka exporter is not cached, as it would
otherwise encounter that delay. Setting BatchSize to 1
prevents this.
This commit is contained in:
ionutboangiu
2024-06-30 10:01:10 +03:00
committed by Dan Christian Bogos
parent 9c94fbfe58
commit 11b96de00a
5 changed files with 19 additions and 1 deletions

View File

@@ -541,7 +541,8 @@ const CGRATES_CFG_JSON = `
// Kafka
// "kafkaTopic": "cgrates", // the topic from where the events are exported
// "kafkaTopic": "cgrates_cdrs", // the topic from where the events are exported
// "kafkaBatchSize": 100, // limit on how many messages will be buffered before being sent
// "kafkaTLS": false, // if true, it will try to authenticate the server
// "kafkaCAPath": "", // path to certificate authority pem
// "kafkaSkipTLSVerify: false, // if true it will skip certificate verification

View File

@@ -243,6 +243,7 @@ type RPCOpts struct {
type KafkaOpts struct {
Topic *string
BatchSize *int
TLS *bool
CAPath *string
SkipTLSVerify *bool
@@ -386,6 +387,9 @@ func (kafkaOpts *KafkaOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err
if jsnCfg.KafkaTopic != nil {
kafkaOpts.Topic = jsnCfg.KafkaTopic
}
if jsnCfg.KafkaBatchSize != nil {
kafkaOpts.BatchSize = jsnCfg.KafkaBatchSize
}
if jsnCfg.KafkaTLS != nil {
kafkaOpts.TLS = jsnCfg.KafkaTLS
}
@@ -727,6 +731,10 @@ func (kafkaOpts *KafkaOpts) Clone() *KafkaOpts {
cln.Topic = new(string)
*cln.Topic = *kafkaOpts.Topic
}
if kafkaOpts.BatchSize != nil {
cln.BatchSize = new(int)
*cln.BatchSize = *kafkaOpts.BatchSize
}
if kafkaOpts.TLS != nil {
cln.TLS = new(bool)
*cln.TLS = *kafkaOpts.TLS
@@ -1058,6 +1066,9 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
if kafkaOpts.Topic != nil {
opts[utils.KafkaTopic] = *kafkaOpts.Topic
}
if kafkaOpts.BatchSize != nil {
opts[utils.KafkaBatchSize] = *kafkaOpts.BatchSize
}
if kafkaOpts.TLS != nil {
opts[utils.KafkaTLS] = *kafkaOpts.TLS
}

View File

@@ -322,6 +322,7 @@ type EventExporterOptsJson struct {
SQLDBName *string `json:"sqlDBName"`
PgSSLMode *string `json:"pgSSLMode"`
KafkaTopic *string `json:"kafkaTopic"`
KafkaBatchSize *int `json:"kafkaBatchSize"`
KafkaTLS *bool `json:"kafkaTLS"`
KafkaCAPath *string `json:"kafkaCAPath"`
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`

View File

@@ -88,6 +88,10 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaE
},
}
if opts.BatchSize != nil {
pstr.writer.BatchSize = *opts.BatchSize
}
return pstr, nil
}

View File

@@ -2719,6 +2719,7 @@ const (
KafkaDefaultMaxWait = time.Millisecond
KafkaTopic = "kafkaTopic"
KafkaBatchSize = "kafkaBatchSize"
KafkaTLS = "kafkaTLS"
KafkaCAPath = "kafkaCAPath"
KafkaSkipTLSVerify = "kafkaSkipTLSVerify"