diff --git a/config/eescfg.go b/config/eescfg.go
index 99479a640..32768d3f8 100644
--- a/config/eescfg.go
+++ b/config/eescfg.go
@@ -751,6 +751,9 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]interface{} {
if optsEes.SQLMaxOpenConns != nil {
opts[utils.SQLMaxOpenConns] = *optsEes.SQLMaxOpenConns
}
+ if optsEes.MYSQLDSNParams != nil {
+ opts[utils.MYSQLDSNParams] = optsEes.MYSQLDSNParams
+ }
if optsEes.SQLConnMaxLifetime != nil {
opts[utils.SQLConnMaxLifetime] = optsEes.SQLConnMaxLifetime.String()
}
diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json
index ae037cdf4..26102844e 100644
--- a/data/conf/samples/tutinternal/cgrates.json
+++ b/data/conf/samples/tutinternal/cgrates.json
@@ -4,20 +4,22 @@
"reply_timeout": "50s"
},
-// "logger": {
-// "type": "*kafkaLog", // controls the destination of logs <*syslog|*stdout|*kafka>
-// "level": 6, // system level precision for floats
-// "opts": {
-// "kafka_conn": "localhost:9092", // the connection trough kafka
-// "kafka_topic": "TutorialTopic", // the topic from where the events are exported
-// "attempts": 1, // number of attempts of connecting
-// },
-//},
+/*
+ "logger": {
+ "type": "*kafkaLog", // controls the destination of logs <*syslog|*stdout|*kafka>
+ "level": 6, // system level precision for floats
+ "opts": {
+ "kafka_conn": "ldasdas:9092", // the connection trough kafka
+ "kafka_topic": "TutorialTopic", // the topic from where the events are exported
+ "attempts": 1, // number of attempts of connecting
+ },
+},
+*/
"logger": {
"type": "*syslog", // controls the destination of logs <*syslog|*stdout|*kafka>
"level": 6, // system level precision for floats
-},
+},
"listen": {
"rpc_json": ":2012",
diff --git a/ees/libcdre.go b/ees/libcdre.go
index 218fde038..7211bd5ae 100644
--- a/ees/libcdre.go
+++ b/ees/libcdre.go
@@ -286,7 +286,7 @@ func (expEv *FailedExportersEEs) AddEvent(ev interface{}) {
}
// ReplayFailedPosts tryies to post cdrs again
-func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (failedEvents *utils.FailedExportersLogg, err error) {
+func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) {
eesFailedEvents := &FailedExportersEEs{
Path: expEv.Path,
Opts: expEv.Opts,
diff --git a/utils/failover_export.go b/utils/failover_export.go
index 9ba2dd919..e7a9b8d34 100644
--- a/utils/failover_export.go
+++ b/utils/failover_export.go
@@ -27,7 +27,9 @@ import (
"sync"
"time"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/ltcache"
+ "github.com/segmentio/kafka-go"
)
var failedPostCache *ltcache.Cache
@@ -143,14 +145,12 @@ func (expEv *FailedExportersLogg) AddEvent(ev interface{}) {
// used only on replay failed post
func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
var fileContent []byte
- //err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
if fileContent, err = os.ReadFile(filePath); err != nil {
return nil, err
}
if err = os.Remove(filePath); err != nil {
return nil, err
}
- // }, config.CgrConfig().GeneralCfg().LockingTimeout, FileLockPrefix+filePath)
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshall it
expEv = new(FailedExportersLogg)
@@ -159,42 +159,33 @@ func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err e
}
type FailoverPoster interface {
- ReplayFailedPosts(int) (*FailedExportersLogg, error)
+ ReplayFailedPosts(int) error
}
// ReplayFailedPosts tryies to post cdrs again
-func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (failedEvents *FailedExportersLogg, err error) {
- /* failedEvents = &ExportEvents{
- Path: expEv.Path,
- Opts: expEv.Opts,
- Format: expEv.Format,
- }
-
- var ee EventExporter
- if ee, err = NewEventExporter(&config.EventExporterCfg{
- ID: "ReplayFailedPosts",
- Type: expEv.Format,
- ExportPath: expEv.Path,
- Opts: expEv.Opts,
- Attempts: attempts,
- FailedPostsDir: MetaNone,
- }, config.CgrConfig(), nil, nil); err != nil {
+func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (err error) {
+ nodeID := IfaceAsString(expEv.Opts[NodeID])
+ tnt := IfaceAsString(expEv.Opts[Tenant])
+ logLvl, err := IfaceAsInt(expEv.Opts[Level])
+ if err != nil {
return
}
- keyFunc := func() string { return EmptyString }
- if expEv.Format == MetaKafkajsonMap || expEv.Format == MetaS3jsonMap {
- keyFunc = UUIDSha1Prefix
- }
- for _, ev := range expEv.Events {
- if err = ExportWithAttempts(context.Background(), ee, ev, keyFunc()); err != nil {
- failedEvents.AddEvent(ev)
+ expLogger := NewExportLogger(nodeID, tnt, logLvl,
+ expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir)
+ for _, event := range expEv.Events {
+ var content []byte
+ if content, err = ToUnescapedJSON(event); err != nil {
+ return
+ }
+ if err = expLogger.writer.WriteMessages(context.Background(), kafka.Message{
+ Key: []byte(GenUUID()),
+ Value: content,
+ }); err != nil {
+ // if there are any errors in kafka, we will post in FailedPostDirectory
+ AddFailedMessage(expLogger.fldPostDir, expLogger.writer.Addr.String(), MetaKafkaLog, Kafka,
+ event, expLogger.GetMeta())
+ return nil
}
}
- ee.Close()
- if len(failedEvents.Events) > 0 {
- err = ErrPartiallyExecuted
- } else {
- failedEvents = nil
- } */
- return nil, nil
+ return err
}
diff --git a/utils/failover_export1.go b/utils/failover_export1.go
deleted file mode 100644
index 0257be868..000000000
--- a/utils/failover_export1.go
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package utils
-
-/*
-import (
- "encoding/gob"
- "fmt"
- "os"
- "path/filepath"
- "sync"
-)
-
-type FailoverPoster struct {
- sync.Mutex
- mp MessageProvider // e.g kafka
-}
-
-func NewFailoverPoster(dataType, tenant, nodeID, conn, format, fldPostDir string,
- logLvl, attempts int) *FailoverPoster {
- fldPst := new(FailoverPoster)
- switch dataType {
- case MetaKafkaLog:
- fldPst.mp = NewExportLogger(nodeID, tenant, logLvl, conn, format, attempts, fldPostDir)
- }
- return fldPst
-}
-
-func (fldPst *FailoverPoster) AddFailedMessage(content interface{}) (err error) {
- fldPst.Lock()
- meta := fldPst.mp.GetMeta()
- filePath := filepath.Join(meta[FailedPostsDir].(string), meta[Format].(string)+
- PipeSep+MetaKafkaLog+GOBSuffix)
- var fileOut *os.File
- if _, err = os.Stat(filePath); os.IsNotExist(err) {
- fileOut, err = os.Create(filePath)
- if err != nil {
- return fmt.Errorf(fmt.Sprintf(" failed to write logs to file <%s> because <%s>", filePath, err))
- }
- } else {
- fileOut, err = os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0755)
- if err != nil {
- return err
- }
- }
- failPoster := &FailoverPosterData{
- MetaData: meta,
- Content: content.(*CGREvent),
- }
- enc := gob.NewEncoder(fileOut)
- err = enc.Encode(failPoster)
- fileOut.Close()
- fldPst.Unlock()
- return
-}
-
-type MessageProvider interface {
- GetContent(filePath string) (string, error)
- GetMeta() map[string]interface{}
-}
-
-func NewMessageProvider(dataType string) (MessageProvider, error) {
- switch dataType {
- case MetaKafkaLog:
- return new(ExportLogger), nil
- default:
- return nil, fmt.Errorf("Invalid Message Provider type in order to read the failed posts")
- }
-}
-
-
-func (fldPst *FailoverPoster) GetContent(filePath string) (string, error) {
-
-}
-
-func (fldPst *FailoverPoster) GetMeta() string {
- return EmptyString
-}
-
-
-// FailoverPosterData will keep the data and the content of the failed post. It is used when we read from gob file to know these info
-type FailoverPosterData struct {
- MetaData map[string]interface{}
- Content *CGREvent
-} */
diff --git a/utils/kafka_logger.go b/utils/kafka_logger.go
index e48a6a680..8ba4d926a 100644
--- a/utils/kafka_logger.go
+++ b/utils/kafka_logger.go
@@ -225,6 +225,8 @@ func (el *ExportLogger) Warning(m string) (err error) {
func (el *ExportLogger) GetMeta() map[string]interface{} {
return map[string]interface{}{
+ Tenant: el.tenant,
+ NodeID: el.nodeID,
Level: el.logLevel,
Format: el.writer.Topic,
Conn: el.writer.Addr.String(),