diff --git a/config/config_defaults.go b/config/config_defaults.go index 52beb258e..cdc51c79a 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -541,7 +541,8 @@ const CGRATES_CFG_JSON = ` // Kafka - // "kafkaTopic": "cgrates", // the topic from where the events are exported + // "kafkaTopic": "cgrates_cdrs", // the topic from where the events are exported + // "kafkaBatchSize": 100, // limit on how many messages will be buffered before being sent // "kafkaTLS": false, // if true, it will try to authenticate the server // "kafkaCAPath": "", // path to certificate authority pem // "kafkaSkipTLSVerify: false, // if true it will skip certificate verification diff --git a/config/eescfg.go b/config/eescfg.go index e9e968eb1..cfed87491 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -243,6 +243,7 @@ type RPCOpts struct { type KafkaOpts struct { Topic *string + BatchSize *int TLS *bool CAPath *string SkipTLSVerify *bool @@ -386,6 +387,9 @@ func (kafkaOpts *KafkaOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err if jsnCfg.KafkaTopic != nil { kafkaOpts.Topic = jsnCfg.KafkaTopic } + if jsnCfg.KafkaBatchSize != nil { + kafkaOpts.BatchSize = jsnCfg.KafkaBatchSize + } if jsnCfg.KafkaTLS != nil { kafkaOpts.TLS = jsnCfg.KafkaTLS } @@ -727,6 +731,10 @@ func (kafkaOpts *KafkaOpts) Clone() *KafkaOpts { cln.Topic = new(string) *cln.Topic = *kafkaOpts.Topic } + if kafkaOpts.BatchSize != nil { + cln.BatchSize = new(int) + *cln.BatchSize = *kafkaOpts.BatchSize + } if kafkaOpts.TLS != nil { cln.TLS = new(bool) *cln.TLS = *kafkaOpts.TLS @@ -1058,6 +1066,9 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str if kafkaOpts.Topic != nil { opts[utils.KafkaTopic] = *kafkaOpts.Topic } + if kafkaOpts.BatchSize != nil { + opts[utils.KafkaBatchSize] = *kafkaOpts.BatchSize + } if kafkaOpts.TLS != nil { opts[utils.KafkaTLS] = *kafkaOpts.TLS } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 48dbae87c..1c807655c 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -322,6 +322,7 @@ type EventExporterOptsJson struct { SQLDBName *string `json:"sqlDBName"` PgSSLMode *string `json:"pgSSLMode"` KafkaTopic *string `json:"kafkaTopic"` + KafkaBatchSize *int `json:"kafkaBatchSize"` KafkaTLS *bool `json:"kafkaTLS"` KafkaCAPath *string `json:"kafkaCAPath"` KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` diff --git a/ees/kafka.go b/ees/kafka.go index 8ada5d6a6..199bfe589 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -88,6 +88,10 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaE }, } + if opts.BatchSize != nil { + pstr.writer.BatchSize = *opts.BatchSize + } + return pstr, nil } diff --git a/utils/consts.go b/utils/consts.go index b67eba2fe..c1ddb38d2 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2719,6 +2719,7 @@ const ( KafkaDefaultMaxWait = time.Millisecond KafkaTopic = "kafkaTopic" + KafkaBatchSize = "kafkaBatchSize" KafkaTLS = "kafkaTLS" KafkaCAPath = "kafkaCAPath" KafkaSkipTLSVerify = "kafkaSkipTLSVerify"