diff --git a/ees/ee.go b/ees/ee.go index c8411403e..fe2489679 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -71,7 +71,7 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, case utils.MetaSQSjsonMap: return NewSQSee(cfg, dc), nil case utils.MetaKafkajsonMap: - return NewKafkaEE(cfg, dc), nil + return NewKafkaEE(cfg, dc) case utils.MetaVirt: return NewVirtualEE(cfg, dc), nil case utils.MetaElastic: diff --git a/ees/kafka.go b/ees/kafka.go index ec1495f9e..70f926cda 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -15,134 +15,124 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ + package ees import ( "crypto/tls" "crypto/x509" - "net" + "errors" "os" - "sync" - "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - kafka "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go" ) // NewKafkaEE creates a kafka poster -func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE { - kfkPstr := &KafkaEE{ - cfg: cfg, - dc: dc, - topic: utils.DefaultQueueID, - reqs: newConcReq(cfg.ConcurrentRequests), +func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaEE, error) { + pstr := &KafkaEE{ + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), } + + topic := utils.DefaultQueueID if cfg.Opts.KafkaTopic != nil { - kfkPstr.topic = *cfg.Opts.KafkaTopic + topic = *cfg.Opts.KafkaTopic } + + // Configure TLS if enabled. + var tlsCfg *tls.Config if cfg.Opts.KafkaTLS != nil && *cfg.Opts.KafkaTLS { - kfkPstr.TLS = true - } - if cfg.Opts.KafkaCAPath != nil { - kfkPstr.caPath = *cfg.Opts.KafkaCAPath - } - if cfg.Opts.KafkaSkipTLSVerify != nil && *cfg.Opts.KafkaSkipTLSVerify { - kfkPstr.skipTLSVerify = true - } - return kfkPstr -} - -// KafkaEE is a kafka poster -type KafkaEE struct { - topic string // identifier of the CDR queue where we publish - TLS bool // if true, it will attempt to authenticate the server - caPath string // path to CA pem file - skipTLSVerify bool // if true, it skips certificate validation - writer *kafka.Writer - - cfg *config.EventExporterCfg - dc *utils.SafeMapStorage - reqs *concReq - sync.RWMutex // protect connection - bytePreparing -} - -func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg } - -func (pstr *KafkaEE) Connect() (err error) { - pstr.Lock() - if pstr.writer == nil { - pstr.writer = &kafka.Writer{ - Addr: kafka.TCP(pstr.Cfg().ExportPath), - Topic: pstr.topic, - MaxAttempts: pstr.Cfg().Attempts, - } - } - if pstr.TLS { - var rootCAs *x509.CertPool - if rootCAs, err = x509.SystemCertPool(); err != nil { - return + rootCAs, err := x509.SystemCertPool() + if err != nil { + return nil, err } if rootCAs == nil { rootCAs = x509.NewCertPool() } - if pstr.caPath != "" { - var ca []byte - if ca, err = os.ReadFile(pstr.caPath); err != nil { - return + + // Load additional CA certificates if a path is provided. + if cfg.Opts.KafkaCAPath != nil && *cfg.Opts.KafkaCAPath != "" { + ca, err := os.ReadFile(*cfg.Opts.KafkaCAPath) + if err != nil { + return nil, err } if !rootCAs.AppendCertsFromPEM(ca) { - return + return nil, errors.New("failed to append certificates from PEM file") } } - pstr.writer.Transport = &kafka.Transport{ - Dial: (&net.Dialer{ - Timeout: 3 * time.Second, - DualStack: true, - }).DialContext, - TLS: &tls.Config{ - RootCAs: rootCAs, - InsecureSkipVerify: pstr.skipTLSVerify, - }, + + tlsCfg = &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: cfg.Opts.KafkaSkipTLSVerify != nil && *cfg.Opts.KafkaSkipTLSVerify, } } - pstr.Unlock() - return + + pstr.writer = &kafka.Writer{ + Addr: kafka.TCP(pstr.Cfg().ExportPath), + Topic: topic, + + // Leave it to the ExportWithAttempts function + // to handle the connect attempts. + MaxAttempts: 1, + + // To handle both TLS and non-TLS connections consistently in the Close() function, + // we always specify Transport, even if empty. This allows us to call + // CloseIdleConnections on our Transport instance, avoiding the need to differentiate + // between TLS and non-TLS connections. + Transport: &kafka.Transport{ + TLS: tlsCfg, + }, + } + + if cfg.Opts.KafkaBatchSize != nil { + pstr.writer.BatchSize = *cfg.Opts.KafkaBatchSize + } + + return pstr, nil } -func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content any, extraData any) (err error) { - pstr.reqs.get() - pstr.RLock() - if pstr.writer == nil { - pstr.RUnlock() - pstr.reqs.done() - return utils.ErrDisconnected - } - kafkaKey := extraData.(string) - err = pstr.writer.WriteMessages(ctx, kafka.Message{ - Key: []byte(kafkaKey), +// KafkaEE is a kafka poster +type KafkaEE struct { + writer *kafka.Writer + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + reqs *concReq + bytePreparing +} + +func (k *KafkaEE) Cfg() *config.EventExporterCfg { return k.cfg } + +func (k *KafkaEE) Connect() error { return nil } + +func (k *KafkaEE) ExportEvent(_ *context.Context, content any, key any) error { + k.reqs.get() + defer k.reqs.done() + return k.writer.WriteMessages(context.TODO(), kafka.Message{ + Key: []byte(key.(string)), Value: content.([]byte), }) - pstr.RUnlock() - pstr.reqs.done() - return } -func (pstr *KafkaEE) Close() (err error) { - pstr.Lock() - if pstr.writer != nil { - err = pstr.writer.Close() - pstr.writer = nil +func (k *KafkaEE) Close() error { + + // Manually close idle connections to prevent them from running indefinitely + // after the Kafka writer is purged. Without this, goroutines will accumulate + // over time with each new Kafka writer. + tsp, ok := k.writer.Transport.(*kafka.Transport) + if ok { + tsp.CloseIdleConnections() } - pstr.Unlock() - return + + return k.writer.Close() } -func (pstr *KafkaEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } -func (pstr *KafkaEE) ExtraData(ev *utils.CGREvent) any { +func (k *KafkaEE) GetMetrics() *utils.SafeMapStorage { return k.dc } +func (k *KafkaEE) ExtraData(ev *utils.CGREvent) any { return utils.ConcatenatedKey( utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()), utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault), diff --git a/ees/poster_test.go b/ees/poster_test.go index 75ccd7c50..6f2cbc73c 100644 --- a/ees/poster_test.go +++ b/ees/poster_test.go @@ -24,6 +24,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/segmentio/kafka-go" ) func TestAMQPeeParseURL(t *testing.T) { @@ -57,12 +58,21 @@ func TestKafkaParseURL(t *testing.T) { KafkaTopic: utils.StringPointer("cdr_billing"), }, } - exp := &KafkaEE{ - cfg: cfg, - topic: "cdr_billing", - reqs: newConcReq(0), + want := &KafkaEE{ + cfg: cfg, + reqs: newConcReq(0), + writer: &kafka.Writer{ + Addr: kafka.TCP("127.0.0.1:9092"), + Topic: "cdr_billing", + MaxAttempts: 1, + Transport: &kafka.Transport{}, + }, } - if kfk := NewKafkaEE(cfg, nil); !reflect.DeepEqual(exp, kfk) { - t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) + got, err := NewKafkaEE(cfg, nil) + if err != nil { + t.Fatalf("NewKafkaEE() failed unexpectedly: %v", err) + } + if !reflect.DeepEqual(want, got) { + t.Errorf("NewKafkaEE() = %+v, want %+v", got, want) } }