From 02ae2cce79ba9f06106c71bd52c922ed7ae1cb6a Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Sun, 30 Jun 2024 10:01:01 +0300 Subject: [PATCH] Revise Kafka exporter implementation - added extra error return parameter to constructor - failing to parse PEM certificates returns an error (previously nil return) - moved Connect logic inside the constructor* - removed unnecessary fields from poster structure. Use the configuration fields directly instead. - removed mutex from poster structure (kafka writer is thread-safe) - removed nil writer check. Message is exported directly. - shortened receiver name (https://google.github.io/styleguide/go/decisions#receiver-names) *The Kafka Transport is not a connection but a configuration for the connection created during the first export. The connection and its related goroutines stay running until manually closed. --- ees/ee.go | 2 +- ees/kafka.go | 159 ++++++++++++++++++++------------------------- ees/poster_test.go | 22 +++++-- 3 files changed, 88 insertions(+), 95 deletions(-) diff --git a/ees/ee.go b/ees/ee.go index 520fbe2ce..e746a3d1b 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -69,7 +69,7 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi case utils.MetaSQSjsonMap: return NewSQSee(cfg, dc), nil case utils.MetaKafkajsonMap: - return NewKafkaEE(cfg, dc), nil + return NewKafkaEE(cfg, dc) case utils.MetaVirt: return NewVirtualEE(cfg, dc), nil case utils.MetaElastic: diff --git a/ees/kafka.go b/ees/kafka.go index 607c91b96..8ada5d6a6 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -21,10 +21,8 @@ import ( "context" "crypto/tls" "crypto/x509" - "net" + "errors" "os" - "sync" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -32,115 +30,100 @@ import ( ) // NewKafkaEE creates a kafka poster -func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE { - kfkPstr := &KafkaEE{ - cfg: cfg, - dc: dc, - topic: utils.DefaultQueueID, - reqs: newConcReq(cfg.ConcurrentRequests), +func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaEE, error) { + pstr := &KafkaEE{ + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), } - if cfg.Opts.Kafka.Topic != nil { - kfkPstr.topic = *cfg.Opts.Kafka.Topic - } - if cfg.Opts.Kafka.TLS != nil && *cfg.Opts.Kafka.TLS { - kfkPstr.tls = true - } - if cfg.Opts.Kafka.CAPath != nil { - kfkPstr.caPath = *cfg.Opts.Kafka.CAPath - } - if cfg.Opts.Kafka.SkipTLSVerify != nil && *cfg.Opts.Kafka.SkipTLSVerify { - kfkPstr.skipTLSVerify = true - } - return kfkPstr -} -// KafkaEE is a kafka poster -type KafkaEE struct { - topic string // identifier of the CDR queue where we publish - tls bool // if true, it will attempt to authenticate the server - caPath string // path to CA pem file - skipTLSVerify bool // if true, it skips certificate verification - writer *kafka.Writer - - cfg *config.EventExporterCfg - dc *utils.SafeMapStorage - reqs *concReq - sync.RWMutex // protect connection - bytePreparing -} - -func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg } - -func (pstr *KafkaEE) Connect() (_ error) { - pstr.Lock() - defer pstr.Unlock() - if pstr.writer == nil { - pstr.writer = &kafka.Writer{ - Addr: kafka.TCP(pstr.Cfg().ExportPath), - Topic: pstr.topic, - - // Leave it to the ExportWithAttempts function - // to handle the connect attempts. - MaxAttempts: 1, - } + opts := cfg.Opts.Kafka + topic := utils.DefaultQueueID + if opts.Topic != nil { + topic = *opts.Topic } - if pstr.tls { + + // Configure TLS if enabled. + var tlsCfg *tls.Config + if opts.TLS != nil && *opts.TLS { rootCAs, err := x509.SystemCertPool() if err != nil { - return + return nil, err } if rootCAs == nil { rootCAs = x509.NewCertPool() } - if pstr.caPath != "" { - ca, err := os.ReadFile(pstr.caPath) + + // Load additional CA certificates if a path is provided. + if opts.CAPath != nil && *opts.CAPath != "" { + ca, err := os.ReadFile(*opts.CAPath) if err != nil { - return + return nil, err } if !rootCAs.AppendCertsFromPEM(ca) { - return + return nil, errors.New("failed to append certificates from PEM file") } } - pstr.writer.Transport = &kafka.Transport{ - Dial: (&net.Dialer{ - Timeout: 3 * time.Second, - DualStack: true, - }).DialContext, - TLS: &tls.Config{ - RootCAs: rootCAs, - InsecureSkipVerify: pstr.skipTLSVerify, - }, + + tlsCfg = &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: opts.SkipTLSVerify != nil && *opts.SkipTLSVerify, } } - return + pstr.writer = &kafka.Writer{ + Addr: kafka.TCP(pstr.Cfg().ExportPath), + Topic: topic, + + // Leave it to the ExportWithAttempts function + // to handle the connect attempts. + MaxAttempts: 1, + + // To handle both TLS and non-TLS connections consistently in the Close() function, + // we always specify Transport, even if empty. This allows us to call + // CloseIdleConnections on our Transport instance, avoiding the need to differentiate + // between TLS and non-TLS connections. + Transport: &kafka.Transport{ + TLS: tlsCfg, + }, + } + + return pstr, nil } -func (pstr *KafkaEE) ExportEvent(content any, key string) (err error) { - pstr.reqs.get() - pstr.RLock() - if pstr.writer == nil { - pstr.RUnlock() - pstr.reqs.done() - return utils.ErrDisconnected - } - err = pstr.writer.WriteMessages(context.Background(), kafka.Message{ +// KafkaEE is a kafka poster +type KafkaEE struct { + writer *kafka.Writer + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + reqs *concReq + bytePreparing +} + +func (k *KafkaEE) Cfg() *config.EventExporterCfg { return k.cfg } + +func (k *KafkaEE) Connect() error { return nil } + +func (k *KafkaEE) ExportEvent(content any, key string) (err error) { + k.reqs.get() + defer k.reqs.done() + return k.writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(key), Value: content.([]byte), }) - pstr.RUnlock() - pstr.reqs.done() - return } -func (pstr *KafkaEE) Close() (err error) { - pstr.Lock() - if pstr.writer != nil { - err = pstr.writer.Close() - pstr.writer = nil +func (k *KafkaEE) Close() error { + + // Manually close idle connections to prevent them from running indefinitely + // after the Kafka writer is purged. Without this, goroutines will accumulate + // over time with each new Kafka writer. + tsp, ok := k.writer.Transport.(*kafka.Transport) + if ok { + tsp.CloseIdleConnections() } - pstr.Unlock() - return + + return k.writer.Close() } -func (pstr *KafkaEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (k *KafkaEE) GetMetrics() *utils.SafeMapStorage { return k.dc } diff --git a/ees/poster_test.go b/ees/poster_test.go index 4578d4740..42b222d09 100644 --- a/ees/poster_test.go +++ b/ees/poster_test.go @@ -24,6 +24,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + kafka "github.com/segmentio/kafka-go" ) func TestAMQPeeParseURL(t *testing.T) { @@ -60,12 +61,21 @@ func TestKafkaParseURL(t *testing.T) { }, }, } - exp := &KafkaEE{ - cfg: cfg, - topic: "cdr_billing", - reqs: newConcReq(0), + want := &KafkaEE{ + cfg: cfg, + reqs: newConcReq(0), + writer: &kafka.Writer{ + Addr: kafka.TCP("127.0.0.1:9092"), + Topic: "cdr_billing", + MaxAttempts: 1, + Transport: &kafka.Transport{}, + }, } - if kfk := NewKafkaEE(cfg, nil); !reflect.DeepEqual(exp, kfk) { - t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) + got, err := NewKafkaEE(cfg, nil) + if err != nil { + t.Fatalf("NewKafkaEE() failed unexpectedly: %v", err) + } + if !reflect.DeepEqual(want, got) { + t.Errorf("NewKafkaEE() = %+v, want %+v", got, want) } }