diff --git a/ees/ees.go b/ees/ees.go
index 8b37b6c7d..62a53bba0 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -281,7 +281,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr *
}
func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{},
- connMnngr *engine.ConnManager, tnt string) (err error) {
+ connMngr *engine.ConnManager, tnt string) (err error) {
if exp.Cfg().FailedPostsDir != utils.MetaNone {
defer func() {
if err != nil {
@@ -294,16 +294,11 @@ func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}
APIOpts: exp.Cfg().Opts.AsMapInterface(),
}
var reply string
- if err = connMnngr.Call(ctx, exp.Cfg().EFsConns,
+ if err = connMngr.Call(ctx, exp.Cfg().EFsConns,
utils.EfSv1ProcessEvent, args, &reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> Exporter <%s> could not be written with <%s> service because err: <%s>",
utils.EEs, exp.Cfg().ID, utils.EFs, err.Error()))
- /*
- utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath,
- exp.Cfg().Type, utils.EEs,
- eEv, exp.Cfg().Opts.AsMapInterface())
- */
}
}
}()
diff --git a/ees/ees_test.go b/ees/ees_test.go
index e77d41a03..effcf18fe 100644
--- a/ees/ees_test.go
+++ b/ees/ees_test.go
@@ -111,7 +111,6 @@ func TestAttrSProcessEvent(t *testing.T) {
connMgr := engine.NewConnManager(cfg)
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), utils.AttributeSv1, clientConn)
eeS := NewEventExporterS(cfg, filterS, connMgr)
- // cgrEv := &utils.CGREvent{}
if err := eeS.attrSProcessEvent(context.TODO(), cgrEv, []string{}, utils.EmptyString); err != nil {
t.Error(err)
}
@@ -263,6 +262,14 @@ func TestV1ProcessEvent3(t *testing.T) {
}
func TestV1ProcessEvent4(t *testing.T) {
+ testMock := &testMockEvent{
+ calls: map[string]func(_ *context.Context, _, _ interface{}) error{
+ utils.EfSv1ProcessEvent: func(_ *context.Context, args, reply interface{}) error {
+ *reply.(*string) = utils.OK
+ return nil
+ },
+ },
+ }
cfg := config.NewDefaultCGRConfig()
cfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPPost
cfg.EEsCfg().Exporters[0].ID = "SQLExporterFull"
@@ -270,12 +277,17 @@ func TestV1ProcessEvent4(t *testing.T) {
newIDb := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
- eeS := NewEventExporterS(cfg, filterS, nil)
+ connMngr := engine.NewConnManager(cfg)
+ clientConn := make(chan birpc.ClientConnector, 1)
+ clientConn <- testMock
+ connMgr := engine.NewConnManager(cfg)
+ connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, clientConn)
+ eeS := NewEventExporterS(cfg, filterS, connMngr)
eeS.eesChs = map[string]*ltcache.Cache{
utils.MetaHTTPPost: ltcache.NewCache(1,
time.Second, false, onCacheEvicted),
}
- newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, nil)
+ newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, connMngr)
if err != nil {
t.Error(err)
}
diff --git a/efs/failed_loggs.go b/efs/failed_loggs.go
index d0bd74b25..28c450851 100644
--- a/efs/failed_loggs.go
+++ b/efs/failed_loggs.go
@@ -25,6 +25,7 @@ import (
"sync"
"github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/segmentio/kafka-go"
@@ -40,8 +41,8 @@ type FailedExportersLogg struct {
FailedPostsDir string
Module string
- efsConns []string
connMngr *engine.ConnManager
+ cfg *config.CGRConfig
}
// AddEvent adds one event
@@ -75,8 +76,8 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
if err != nil {
return
}
- expLogger := utils.NewExportLogger(nodeID, tnt, logLvl,
- expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir)
+ expLogger := engine.NewExportLogger(nodeID, tnt, logLvl,
+ expEv.connMngr, expEv.cfg)
for _, event := range expEv.Events {
var content []byte
if content, err = utils.ToUnescapedJSON(event); err != nil {
@@ -88,7 +89,7 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
}); err != nil {
var reply string
// if there are any errors in kafka, we will post in FailedPostDirectory
- if err = expEv.connMngr.Call(ctx, expEv.efsConns, utils.EfSv1ProcessEvent,
+ if err = expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent,
&utils.ArgsFailedPosts{
Tenant: tnt,
Path: expLogger.Writer.Addr.String(),
@@ -98,11 +99,6 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
APIOpts: expLogger.GetMeta(),
}, &reply); err != nil {
return err
- /*
- utils.AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), utils.
- MetaKafkaLog, utils.Kafka,
- event, expLogger.GetMeta())
- */
}
return nil
}
diff --git a/efs/libefs.go b/efs/libefs.go
index f883d537c..2e00f8b7e 100644
--- a/efs/libefs.go
+++ b/efs/libefs.go
@@ -116,7 +116,7 @@ func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPos
connMngr: efs.connMgr,
}
case utils.Kafka:
- expEv.efsConns = efs.cfg.LoggerCfg().EFsConns
+ expEv.cfg = efs.cfg
expEv.connMngr = efs.connMgr
failPoster = expEv
}
diff --git a/engine/kafka_logger.go b/engine/kafka_logger.go
new file mode 100644
index 000000000..e8f5388fe
--- /dev/null
+++ b/engine/kafka_logger.go
@@ -0,0 +1,268 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "fmt"
+ "log/syslog"
+ "sync"
+ "time"
+
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/segmentio/kafka-go"
+)
+
+func NewLogger(loggerType, tnt, nodeID string,
+ connMgr *ConnManager, cfg *config.CGRConfig) (utils.LoggerInterface, error) {
+ switch loggerType {
+ case utils.MetaKafkaLog:
+ return NewExportLogger(nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil
+ case utils.MetaStdLog, utils.MetaSysLog:
+ return utils.NewLogger(loggerType, nodeID, cfg.LoggerCfg().Level)
+ default:
+ return nil, fmt.Errorf("unsupported logger: <%+s>", loggerType)
+ }
+}
+
+// Logs to kafka
+type ExportLogger struct {
+ sync.Mutex
+ cfg *config.CGRConfig
+ connMgr *ConnManager
+
+ LogLevel int
+ FldPostDir string
+ Writer *kafka.Writer
+ NodeID string
+ Tenant string
+}
+
+// NewExportLogger will export loggers to kafka
+func NewExportLogger(nodeID, tenant string, level int,
+ connMgr *ConnManager, cfg *config.CGRConfig) (el *ExportLogger) {
+ el = &ExportLogger{
+ connMgr: connMgr,
+ cfg: cfg,
+ LogLevel: level,
+ FldPostDir: cfg.LoggerCfg().Opts.FailedPostsDir,
+ NodeID: nodeID,
+ Tenant: tenant,
+ Writer: &kafka.Writer{
+ Addr: kafka.TCP(cfg.LoggerCfg().Opts.KafkaConn),
+ Topic: cfg.LoggerCfg().Opts.KafkaTopic,
+ MaxAttempts: cfg.LoggerCfg().Opts.KafkaAttempts,
+ },
+ }
+ return
+}
+
+func (el *ExportLogger) Close() (err error) {
+ if el.Writer != nil {
+ err = el.Writer.Close()
+ el.Writer = nil
+ }
+ return
+}
+
+func (el *ExportLogger) call(m string, level int) (err error) {
+ eventExport := &utils.CGREvent{
+ Tenant: el.Tenant,
+ Event: map[string]interface{}{
+ utils.NodeID: el.NodeID,
+ utils.Message: m,
+ utils.Severity: level,
+ utils.Timestamp: time.Now().Format("2006-01-02 15:04:05"),
+ },
+ }
+ // event will be exported through kafka as json format
+ var content []byte
+ if content, err = utils.ToUnescapedJSON(eventExport); err != nil {
+ return
+ }
+ if err = el.Writer.WriteMessages(context.Background(), kafka.Message{
+ Key: []byte(utils.GenUUID()),
+ Value: content,
+ }); err != nil {
+ // if there are any errors in kafka, we will post in FailedPostDirectory
+ args := &utils.ArgsFailedPosts{
+ Tenant: el.Tenant,
+ Path: el.Writer.Addr.String(),
+ Event: eventExport,
+ FailedDir: el.FldPostDir,
+ Module: utils.Kafka,
+ APIOpts: el.GetMeta(),
+ }
+ var reply string
+ if err = el.connMgr.Call(context.Background(), el.cfg.LoggerCfg().EFsConns,
+ utils.EfSv1ProcessEvent, args, &reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>",
+ utils.Logger, utils.EFs, err.Error()))
+ }
+ // also the content should be printed as a stdout logger type
+ return utils.ErrLoggerChanged
+ }
+ return
+}
+
+func (el *ExportLogger) Write(p []byte) (n int, err error) {
+ n = len(p)
+ err = el.call(string(p), 8)
+ return
+}
+
+func (sl *ExportLogger) GetSyslog() *syslog.Writer {
+ return nil
+}
+
+// GetLogLevel() returns the level logger number for the server
+func (el *ExportLogger) GetLogLevel() int {
+ return el.LogLevel
+}
+
+// SetLogLevel changes the log level
+func (el *ExportLogger) SetLogLevel(level int) {
+ el.LogLevel = level
+}
+
+// Alert logs to EEs with alert level
+func (el *ExportLogger) Alert(m string) (err error) {
+ if el.LogLevel < utils.LOGLEVEL_ALERT {
+ return nil
+ }
+ if err = el.call(m, utils.LOGLEVEL_ALERT); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.NodeID, el.LogLevel).Alert(m)
+ err = nil
+ }
+ }
+ return
+}
+
+// Crit logs to EEs with critical level
+func (el *ExportLogger) Crit(m string) (err error) {
+ if el.LogLevel < utils.LOGLEVEL_CRITICAL {
+ return nil
+ }
+ if el.call(m, utils.LOGLEVEL_CRITICAL); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.NodeID, el.LogLevel).Crit(m)
+ err = nil
+ }
+ }
+ return
+}
+
+// Debug logs to EEs with debug level
+func (el *ExportLogger) Debug(m string) (err error) {
+ if el.LogLevel < utils.LOGLEVEL_DEBUG {
+ return nil
+ }
+ if err = el.call(m, utils.LOGLEVEL_DEBUG); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.NodeID, el.LogLevel).Debug(m)
+ err = nil
+ }
+ }
+ return
+}
+
+// Emerg logs to EEs with emergency level
+func (el *ExportLogger) Emerg(m string) (err error) {
+ if el.LogLevel < utils.LOGLEVEL_EMERGENCY {
+ return nil
+ }
+ if err = el.call(m, utils.LOGLEVEL_EMERGENCY); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.NodeID, el.LogLevel).Emerg(m)
+ err = nil
+ }
+ }
+ return
+}
+
+// Err logs to EEs with error level
+func (el *ExportLogger) Err(m string) (err error) {
+ if el.LogLevel < utils.LOGLEVEL_ERROR {
+ return nil
+ }
+ if err = el.call(m, utils.LOGLEVEL_ERROR); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.NodeID, el.LogLevel).Err(m)
+ err = nil
+ }
+ }
+ return
+}
+
+// Info logs to EEs with info level
+func (el *ExportLogger) Info(m string) (err error) {
+ if el.LogLevel < utils.LOGLEVEL_INFO {
+ return nil
+ }
+ if err = el.call(m, utils.LOGLEVEL_INFO); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.NodeID, el.LogLevel).Info(m)
+ err = nil
+ }
+ }
+ return
+}
+
+// Notice logs to EEs with notice level
+func (el *ExportLogger) Notice(m string) (err error) {
+ if el.LogLevel < utils.LOGLEVEL_NOTICE {
+ return nil
+ }
+ if err = el.call(m, utils.LOGLEVEL_NOTICE); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.NodeID, el.LogLevel).Notice(m)
+ err = nil
+ }
+ }
+ return
+}
+
+// Warning logs to EEs with warning level
+func (el *ExportLogger) Warning(m string) (err error) {
+ if el.LogLevel < utils.LOGLEVEL_WARNING {
+ return nil
+ }
+ if err = el.call(m, utils.LOGLEVEL_WARNING); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.NodeID, el.LogLevel).Warning(m)
+ err = nil
+ }
+ }
+ return
+}
+
+func (el *ExportLogger) GetMeta() map[string]interface{} {
+ return map[string]interface{}{
+ utils.Tenant: el.Tenant,
+ utils.NodeID: el.NodeID,
+ utils.Level: el.LogLevel,
+ utils.Format: el.Writer.Topic,
+ utils.Conn: el.Writer.Addr.String(),
+ utils.FailedPostsDir: el.FldPostDir,
+ utils.Attempts: el.Writer.MaxAttempts,
+ }
+}
diff --git a/utils/kafka_logger_test.go b/engine/kafka_logger_test.go
similarity index 99%
rename from utils/kafka_logger_test.go
rename to engine/kafka_logger_test.go
index 7557ca2ae..e6a8a1379 100644
--- a/utils/kafka_logger_test.go
+++ b/engine/kafka_logger_test.go
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package utils
+package engine
/*
func TestLoggerNewLoggerExport(t *testing.T) {
diff --git a/ers/amqp.go b/ers/amqp.go
index 171d87092..f0283bfeb 100644
--- a/ers/amqp.go
+++ b/ers/amqp.go
@@ -35,8 +35,9 @@ import (
// NewAMQPER return a new amqp event reader
func NewAMQPER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
- fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
+ fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
rdr := &AMQPER{
+ connMgr: connMgr,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
@@ -60,9 +61,10 @@ func NewAMQPER(cfg *config.CGRConfig, cfgIdx int,
// AMQPER implements EventReader interface for amqp message
type AMQPER struct {
// sync.RWMutex
- cgrCfg *config.CGRConfig
- cfgIdx int // index of config instance within ERsCfg.Readers
- fltrS *engine.FilterS
+ cgrCfg *config.CGRConfig
+ cfgIdx int // index of config instance within ERsCfg.Readers
+ fltrS *engine.FilterS
+ connMgr *engine.ConnManager
dialURL string
queueID string
@@ -167,7 +169,8 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) {
utils.ERs, msg.MessageId, err.Error()))
}
if rdr.poster != nil { // post it
- if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString); err != nil {
+ if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString,
+ rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, msg.MessageId, err.Error()))
diff --git a/ers/amqp_it_test.go b/ers/amqp_it_test.go
index ce44b46c9..dcaf3c10a 100644
--- a/ers/amqp_it_test.go
+++ b/ers/amqp_it_test.go
@@ -75,7 +75,7 @@ func TestAMQPER(t *testing.T) {
rdrExit = make(chan struct{}, 1)
if rdr, err = NewAMQPER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
- rdrErr, new(engine.FilterS), rdrExit); err != nil {
+ rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
t.Fatal(err)
}
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
@@ -136,7 +136,7 @@ func TestAMQPERServeError(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
expected := "AMQP scheme must be either 'amqp://' or 'amqps://'"
- rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil, nil)
+ rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
diff --git a/ers/amqpv1.go b/ers/amqpv1.go
index 8b614be79..f70969ee7 100644
--- a/ers/amqpv1.go
+++ b/ers/amqpv1.go
@@ -35,8 +35,9 @@ import (
// NewAMQPv1ER return a new amqpv1 event reader
func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
- fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
+ fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
rdr := &AMQPv1ER{
+ connMgr: connMgr,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
@@ -61,9 +62,10 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int,
// AMQPv1ER implements EventReader interface for amqpv1 message
type AMQPv1ER struct {
// sync.RWMutex
- cgrCfg *config.CGRConfig
- cfgIdx int // index of config instance within ERsCfg.Readers
- fltrS *engine.FilterS
+ cgrCfg *config.CGRConfig
+ cfgIdx int // index of config instance within ERsCfg.Readers
+ fltrS *engine.FilterS
+ connMgr *engine.ConnManager
queueID string
@@ -145,7 +147,8 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
utils.ERs, err.Error()))
}
if rdr.poster != nil { // post it
- if err := ees.ExportWithAttempts(context.Background(), rdr.poster, body, utils.EmptyString); err != nil {
+ if err := ees.ExportWithAttempts(context.Background(), rdr.poster, body, utils.EmptyString, rdr.connMgr,
+ rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message error: %s",
utils.ERs, err.Error()))
diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go
index fe9d97d19..f23e86367 100644
--- a/ers/amqpv1_it_test.go
+++ b/ers/amqpv1_it_test.go
@@ -80,7 +80,7 @@ func TestAMQPERv1(t *testing.T) {
rdrExit = make(chan struct{}, 1)
if rdr, err = NewAMQPv1ER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
- rdrErr, new(engine.FilterS), rdrExit); err != nil {
+ rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
t.Fatal(err)
}
amqpv1Rdr := rdr.(*AMQPv1ER)
@@ -153,7 +153,7 @@ func TestAmqpv1NewAMQPv1ER(t *testing.T) {
},
}
- result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil)
+ result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
@@ -184,7 +184,7 @@ func TestAmqpv1NewAMQPv1ER2(t *testing.T) {
},
}
- result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil)
+ result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil)
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
diff --git a/ers/ers.go b/ers/ers.go
index 6ac0f9ba6..a93d74497 100644
--- a/ers/ers.go
+++ b/ers/ers.go
@@ -164,7 +164,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) {
var rdr EventReader
if rdr, err = NewEventReader(erS.cfg, cfgIdx,
erS.rdrEvents, erS.partialEvents, erS.rdrErr,
- erS.filterS, erS.stopLsn[rdrID]); err != nil {
+ erS.filterS, erS.stopLsn[rdrID], erS.connMgr); err != nil {
return
}
erS.rdrs[rdrID] = rdr
diff --git a/ers/kafka.go b/ers/kafka.go
index ccef5724a..c578a05a0 100644
--- a/ers/kafka.go
+++ b/ers/kafka.go
@@ -37,9 +37,10 @@ import (
// NewKafkaER return a new kafka event reader
func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
- fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
+ fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
rdr := &KafkaER{
+ connMgr: connMgr,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
@@ -65,9 +66,10 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
// KafkaER implements EventReader interface for kafka message
type KafkaER struct {
// sync.RWMutex
- cgrCfg *config.CGRConfig
- cfgIdx int // index of config instance within ERsCfg.Readers
- fltrS *engine.FilterS
+ cgrCfg *config.CGRConfig
+ cfgIdx int // index of config instance within ERsCfg.Readers
+ fltrS *engine.FilterS
+ connMgr *engine.ConnManager
dialURL string
topic string
@@ -141,7 +143,8 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) {
utils.ERs, string(msg.Key), err.Error()))
}
if rdr.poster != nil { // post it
- if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Value, string(msg.Key)); err != nil {
+ if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Value, string(msg.Key),
+ rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, string(msg.Key), err.Error()))
diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go
index 3c20768dd..c0e7c1414 100644
--- a/ers/kafka_it_test.go
+++ b/ers/kafka_it_test.go
@@ -75,7 +75,7 @@ func TestKafkaER(t *testing.T) {
rdrExit = make(chan struct{}, 1)
if rdr, err = NewKafkaER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
- rdrErr, new(engine.FilterS), rdrExit); err != nil {
+ rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
t.Fatal(err)
}
w := kafka.NewWriter(kafka.WriterConfig{
diff --git a/ers/kafka_test.go b/ers/kafka_test.go
index dc8ee5d1f..5af31ad4e 100644
--- a/ers/kafka_test.go
+++ b/ers/kafka_test.go
@@ -104,7 +104,7 @@ func TestKafkaERServe(t *testing.T) {
rdrEvents := make(chan *erEvent, 1)
rdrExit := make(chan struct{}, 1)
rdrErr := make(chan error, 1)
- rdr, err := NewKafkaER(cfg, 0, rdrEvents, make(chan *erEvent, 1), rdrErr, fltrS, rdrExit)
+ rdr, err := NewKafkaER(cfg, 0, rdrEvents, make(chan *erEvent, 1), rdrErr, fltrS, rdrExit, nil)
if err != nil {
t.Error(err)
}
diff --git a/ers/nats.go b/ers/nats.go
index 9d4b79909..877a1c9e3 100644
--- a/ers/nats.go
+++ b/ers/nats.go
@@ -38,8 +38,9 @@ import (
// NewNatsER return a new amqp event reader
func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
- fltrS *engine.FilterS, rdrExit chan struct{}) (_ EventReader, err error) {
+ fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (_ EventReader, err error) {
rdr := &NatsER{
+ connMgr: connMgr,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
@@ -66,9 +67,10 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
// NatsER implements EventReader interface for amqp message
type NatsER struct {
// sync.RWMutex
- cgrCfg *config.CGRConfig
- cfgIdx int // index of config instance within ERsCfg.Readers
- fltrS *engine.FilterS
+ cgrCfg *config.CGRConfig
+ cfgIdx int // index of config instance within ERsCfg.Readers
+ fltrS *engine.FilterS
+ connMgr *engine.ConnManager
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
@@ -139,7 +141,8 @@ func (rdr *NatsER) Serve() (err error) {
utils.ERs, string(msg.Data), err.Error()))
}
if rdr.poster != nil { // post it
- if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString); err != nil {
+ if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString,
+ rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, string(msg.Data), err.Error()))
diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go
index afc54ed04..257e842e4 100644
--- a/ers/nats_it_test.go
+++ b/ers/nats_it_test.go
@@ -75,7 +75,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
rdrExit = make(chan struct{}, 1)
var err error
if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
- rdrErr, new(engine.FilterS), rdrExit); err != nil {
+ rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
t.Fatal(err)
}
@@ -162,7 +162,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
var err error
if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
- rdrErr, new(engine.FilterS), rdrExit); err != nil {
+ rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
t.Fatal(err)
}
diff --git a/ers/reader.go b/ers/reader.go
index 8d57521ca..c2817572e 100644
--- a/ers/reader.go
+++ b/ers/reader.go
@@ -35,7 +35,7 @@ type EventReader interface {
// NewEventReader instantiates the event reader based on configuration at index
func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
- fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
+ fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
switch cfg.ERsCfg().Readers[cfgIdx].Type {
default:
err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type)
@@ -46,21 +46,21 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
case utils.MetaFileFWV:
return NewFWVFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
case utils.MetaKafkajsonMap:
- return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
+ return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
case utils.MetaSQL:
return NewSQLEventReader(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
case utils.MetaFileJSON:
return NewJSONFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
case utils.MetaAMQPjsonMap:
- return NewAMQPER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
+ return NewAMQPER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
case utils.MetaS3jsonMap:
- return NewS3ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
+ return NewS3ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
case utils.MetaSQSjsonMap:
- return NewSQSER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
+ return NewSQSER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
case utils.MetaAMQPV1jsonMap:
- return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
+ return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
case utils.MetaNatsjsonMap:
- return NewNatsER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
+ return NewNatsER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
}
return
}
diff --git a/ers/readers_test.go b/ers/readers_test.go
index f1a7bab9e..331742b98 100644
--- a/ers/readers_test.go
+++ b/ers/readers_test.go
@@ -36,7 +36,7 @@ func TestNewInvalidReader(t *testing.T) {
if len(cfg.ERsCfg().Readers) != 2 {
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
}
- if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil); err == nil || err.Error() != "unsupported reader type: " {
+ if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil, nil); err == nil || err.Error() != "unsupported reader type: " {
t.Errorf("Expecting: >, received: <%+v>", err)
}
}
@@ -61,7 +61,7 @@ func TestNewCsvReader(t *testing.T) {
rdrExit: nil,
conReqs: nil}
var expected EventReader = exp
- if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
+ if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
t.Errorf("Expecting: , received: <%+v>", err)
} else {
// because we use function make to init the channel when we create the EventReader reflect.DeepEqual
@@ -84,11 +84,11 @@ func TestNewKafkaReader(t *testing.T) {
if len(cfg.ERsCfg().Readers) != 2 {
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
}
- expected, err := NewKafkaER(cfg, 1, nil, nil, nil, fltr, nil)
+ expected, err := NewKafkaER(cfg, 1, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Errorf("Expecting: , received: <%+v>", err)
}
- if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
+ if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
t.Errorf("Expecting: , received: <%+v>", err)
} else if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
@@ -115,7 +115,7 @@ func TestNewSQLReader(t *testing.T) {
if err != nil {
t.Errorf("Expecting: , received: <%+v>", err)
}
- if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
+ if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
t.Errorf("Expecting: , received: <%+v>", err)
} else if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
@@ -149,7 +149,7 @@ func TestNewFileXMLReader(t *testing.T) {
if err != nil {
t.Error(err)
}
- rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
+ rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else {
@@ -169,7 +169,7 @@ func TestNewFileFWVReader(t *testing.T) {
if err != nil {
t.Error(err)
}
- rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
+ rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(nil)
} else {
@@ -189,7 +189,7 @@ func TestNewJSONReader(t *testing.T) {
if err != nil {
t.Error(err)
}
- rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
+ rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else {
@@ -219,7 +219,7 @@ func TestNewAMQPReader(t *testing.T) {
exp.setOpts(&config.EventReaderOpts{})
exp.createPoster()
var expected EventReader = exp
- rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
+ rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, rcv) {
@@ -244,7 +244,7 @@ func TestNewAMQPv1Reader(t *testing.T) {
exp.Config().Opts = &config.EventReaderOpts{}
exp.createPoster()
var expected EventReader = exp
- rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
+ rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, rcv) {
@@ -270,7 +270,7 @@ func TestNewS3Reader(t *testing.T) {
exp.Config().Opts = &config.EventReaderOpts{}
exp.createPoster()
var expected EventReader = exp
- rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
+ rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, rcv) {
@@ -307,7 +307,7 @@ func TestNewSQSReader(t *testing.T) {
exp.Config().Opts = &config.EventReaderOpts{}
exp.createPoster()
var expected EventReader = exp
- rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
+ rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
exp.session = rcv.(*SQSER).session
if err != nil {
t.Error(err)
diff --git a/ers/s3.go b/ers/s3.go
index b84c86f4d..18f4addef 100644
--- a/ers/s3.go
+++ b/ers/s3.go
@@ -39,9 +39,10 @@ import (
// NewS3ER return a new s3 event reader
func NewS3ER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
- fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
+ fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
rdr := &S3ER{
+ connMgr: connMgr,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
@@ -63,9 +64,10 @@ func NewS3ER(cfg *config.CGRConfig, cfgIdx int,
// S3ER implements EventReader interface for s3 message
type S3ER struct {
// sync.RWMutex
- cgrCfg *config.CGRConfig
- cfgIdx int // index of config instance within ERsCfg.Readers
- fltrS *engine.FilterS
+ cgrCfg *config.CGRConfig
+ cfgIdx int // index of config instance within ERsCfg.Readers
+ fltrS *engine.FilterS
+ connMgr *engine.ConnManager
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
@@ -247,7 +249,8 @@ func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) {
}
if rdr.poster != nil { // post it
- if err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg, key); err != nil {
+ if err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg, key,
+ rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, key, err.Error()))
diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go
index 1b2bee7e9..dcde99e2f 100644
--- a/ers/s3_it_test.go
+++ b/ers/s3_it_test.go
@@ -86,7 +86,7 @@ func TestS3ER(t *testing.T) {
rdrExit = make(chan struct{}, 1)
if rdr, err = NewS3ER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
- rdrErr, new(engine.FilterS), rdrExit); err != nil {
+ rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
t.Fatal(err)
}
s3Rdr := rdr.(*S3ER)
@@ -176,7 +176,7 @@ func TestNewS3ER(t *testing.T) {
}
rdr, err := NewS3ER(cfg, 1, nil, nil,
- nil, nil, nil)
+ nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -190,18 +190,11 @@ func TestNewS3ERCase2(t *testing.T) {
expected := &S3ER{
cgrCfg: cfg,
cfgIdx: 0,
- fltrS: nil,
- rdrEvents: nil,
- rdrExit: nil,
- rdrErr: nil,
- cap: nil,
awsRegion: "",
awsID: "",
awsKey: "",
awsToken: "",
bucket: "cgrates_cdrs",
- session: nil,
- poster: nil,
}
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
{
@@ -217,7 +210,7 @@ func TestNewS3ERCase2(t *testing.T) {
}
rdr, err := NewS3ER(cfg, 0, nil, nil,
- nil, nil, nil)
+ nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
diff --git a/ers/sqs.go b/ers/sqs.go
index 44376c068..108137e71 100644
--- a/ers/sqs.go
+++ b/ers/sqs.go
@@ -39,9 +39,10 @@ import (
// NewSQSER return a new sqs event reader
func NewSQSER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
- fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
+ fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
rdr := &SQSER{
+ connMgr: connMgr,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
@@ -63,9 +64,10 @@ func NewSQSER(cfg *config.CGRConfig, cfgIdx int,
// SQSER implements EventReader interface for sqs message
type SQSER struct {
// sync.RWMutex
- cgrCfg *config.CGRConfig
- cfgIdx int // index of config instance within ERsCfg.Readers
- fltrS *engine.FilterS
+ cgrCfg *config.CGRConfig
+ cfgIdx int // index of config instance within ERsCfg.Readers
+ fltrS *engine.FilterS
+ connMgr *engine.ConnManager
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
@@ -263,7 +265,8 @@ func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) {
}
if rdr.poster != nil { // post it
- if err = ees.ExportWithAttempts(context.Background(), rdr.poster, body, key); err != nil {
+ if err = ees.ExportWithAttempts(context.Background(), rdr.poster, body, key,
+ rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, key, err.Error()))
diff --git a/ers/sqs_test.go b/ers/sqs_test.go
index 01332c54f..197f2d350 100644
--- a/ers/sqs_test.go
+++ b/ers/sqs_test.go
@@ -53,7 +53,7 @@ func TestNewSQSER(t *testing.T) {
},
}
rdr, err := NewSQSER(cfg, 0, nil, nil,
- nil, nil, nil)
+ nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -81,7 +81,7 @@ func TestSQSERServeRunDelay0(t *testing.T) {
},
}
rdr, err := NewSQSER(cfg, 0, nil, nil,
- nil, nil, nil)
+ nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -107,7 +107,7 @@ func TestSQSERServe(t *testing.T) {
},
}
rdr, err := NewSQSER(cfg, 0, nil, nil,
- nil, nil, nil)
+ nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
diff --git a/general_tests/cdrs_post_failover_it_test.go b/general_tests/cdrs_post_failover_it_test.go
index e563b74a4..cbce1154b 100644
--- a/general_tests/cdrs_post_failover_it_test.go
+++ b/general_tests/cdrs_post_failover_it_test.go
@@ -203,7 +203,7 @@ func testCDRsPostFailoverToFile(t *testing.T) {
fileName := file.Name()
filePath := path.Join(cdrsPostFailCfg.EFsCfg().FailedPostsDir, fileName)
- ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs)
+ ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs, nil)
if err != nil {
t.Errorf("<%s> for file <%s>", err, fileName)
continue
diff --git a/services/cgr-engine.go b/services/cgr-engine.go
index ebd57f3dd..ff8e2f37a 100644
--- a/services/cgr-engine.go
+++ b/services/cgr-engine.go
@@ -354,15 +354,11 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
}
// init syslog
- if utils.Logger, err = utils.NewLogger(
+ if utils.Logger, err = engine.NewLogger(
utils.FirstNonEmpty(*flags.SysLogger, cgr.cfg.LoggerCfg().Type),
cgr.cfg.GeneralCfg().DefaultTenant,
cgr.cfg.GeneralCfg().NodeID,
- cgr.cfg.LoggerCfg().Level,
- cgr.cfg.LoggerCfg().Opts.KafkaAttempts,
- cgr.cfg.LoggerCfg().Opts.KafkaConn,
- cgr.cfg.LoggerCfg().Opts.KafkaTopic,
- cgr.cfg.LoggerCfg().Opts.FailedPostsDir); err != nil {
+ cgr.cM, cgr.cfg); err != nil {
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
diff --git a/utils/kafka_logger.go b/utils/kafka_logger.go
deleted file mode 100644
index 7b44bc805..000000000
--- a/utils/kafka_logger.go
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package utils
-
-import (
- "log/syslog"
- "sync"
- "time"
-
- "github.com/cgrates/birpc/context"
- "github.com/segmentio/kafka-go"
-)
-
-// Logs to kafka
-type ExportLogger struct {
- sync.Mutex
-
- LogLevel int
- FldPostDir string
- Writer *kafka.Writer
- NodeID string
- Tenant string
-}
-
-// NewExportLogger will export loggers to kafka
-func NewExportLogger(nodeID, tenant string, level int,
- connOpts, connTopic string, attempts int, fldPostDir string) (el *ExportLogger) {
- el = &ExportLogger{
- LogLevel: level,
- FldPostDir: fldPostDir,
- NodeID: nodeID,
- Tenant: tenant,
- Writer: &kafka.Writer{
- Addr: kafka.TCP(connOpts),
- Topic: connTopic,
- MaxAttempts: attempts,
- },
- }
- return
-}
-
-func (el *ExportLogger) Close() (err error) {
- if el.Writer != nil {
- err = el.Writer.Close()
- el.Writer = nil
- }
- return
-}
-
-func (el *ExportLogger) call(m string, level int) (err error) {
- eventExport := &CGREvent{
- Tenant: el.Tenant,
- Event: map[string]interface{}{
- NodeID: el.NodeID,
- Message: m,
- Severity: level,
- Timestamp: time.Now().Format("2006-01-02 15:04:05"),
- },
- }
- // event will be exported through kafka as json format
- var content []byte
- if content, err = ToUnescapedJSON(eventExport); err != nil {
- return
- }
- if err = el.Writer.WriteMessages(context.Background(), kafka.Message{
- Key: []byte(GenUUID()),
- Value: content,
- }); err != nil {
- /*
- // if there are any errors in kafka, we will post in FailedPostDirectory
- AddFailedMessage(el.FldPostDir, el.Writer.Addr.String(), MetaKafkaLog, Kafka,
- eventExport, el.GetMeta())
- // also the content should be printed as a stdout logger type
- return ErrLoggerChanged
- */
- }
- return
-}
-
-func (el *ExportLogger) Write(p []byte) (n int, err error) {
- n = len(p)
- err = el.call(string(p), 8)
- return
-}
-
-func (sl *ExportLogger) GetSyslog() *syslog.Writer {
- return nil
-}
-
-// GetLogLevel() returns the level logger number for the server
-func (el *ExportLogger) GetLogLevel() int {
- return el.LogLevel
-}
-
-// SetLogLevel changes the log level
-func (el *ExportLogger) SetLogLevel(level int) {
- el.LogLevel = level
-}
-
-// Alert logs to EEs with alert level
-func (el *ExportLogger) Alert(m string) (err error) {
- if el.LogLevel < LOGLEVEL_ALERT {
- return nil
- }
- if err = el.call(m, LOGLEVEL_ALERT); err != nil {
- if err == ErrLoggerChanged {
- NewStdLogger(el.NodeID, el.LogLevel).Alert(m)
- err = nil
- }
- }
- return
-}
-
-// Crit logs to EEs with critical level
-func (el *ExportLogger) Crit(m string) (err error) {
- if el.LogLevel < LOGLEVEL_CRITICAL {
- return nil
- }
- if el.call(m, LOGLEVEL_CRITICAL); err != nil {
- if err == ErrLoggerChanged {
- NewStdLogger(el.NodeID, el.LogLevel).Crit(m)
- err = nil
- }
- }
- return
-}
-
-// Debug logs to EEs with debug level
-func (el *ExportLogger) Debug(m string) (err error) {
- if el.LogLevel < LOGLEVEL_DEBUG {
- return nil
- }
- if err = el.call(m, LOGLEVEL_DEBUG); err != nil {
- if err == ErrLoggerChanged {
- NewStdLogger(el.NodeID, el.LogLevel).Debug(m)
- err = nil
- }
- }
- return
-}
-
-// Emerg logs to EEs with emergency level
-func (el *ExportLogger) Emerg(m string) (err error) {
- if el.LogLevel < LOGLEVEL_EMERGENCY {
- return nil
- }
- if err = el.call(m, LOGLEVEL_EMERGENCY); err != nil {
- if err == ErrLoggerChanged {
- NewStdLogger(el.NodeID, el.LogLevel).Emerg(m)
- err = nil
- }
- }
- return
-}
-
-// Err logs to EEs with error level
-func (el *ExportLogger) Err(m string) (err error) {
- if el.LogLevel < LOGLEVEL_ERROR {
- return nil
- }
- if err = el.call(m, LOGLEVEL_ERROR); err != nil {
- if err == ErrLoggerChanged {
- NewStdLogger(el.NodeID, el.LogLevel).Err(m)
- err = nil
- }
- }
- return
-}
-
-// Info logs to EEs with info level
-func (el *ExportLogger) Info(m string) (err error) {
- if el.LogLevel < LOGLEVEL_INFO {
- return nil
- }
- if err = el.call(m, LOGLEVEL_INFO); err != nil {
- if err == ErrLoggerChanged {
- NewStdLogger(el.NodeID, el.LogLevel).Info(m)
- err = nil
- }
- }
- return
-}
-
-// Notice logs to EEs with notice level
-func (el *ExportLogger) Notice(m string) (err error) {
- if el.LogLevel < LOGLEVEL_NOTICE {
- return nil
- }
- if err = el.call(m, LOGLEVEL_NOTICE); err != nil {
- if err == ErrLoggerChanged {
- NewStdLogger(el.NodeID, el.LogLevel).Notice(m)
- err = nil
- }
- }
- return
-}
-
-// Warning logs to EEs with warning level
-func (el *ExportLogger) Warning(m string) (err error) {
- if el.LogLevel < LOGLEVEL_WARNING {
- return nil
- }
- if err = el.call(m, LOGLEVEL_WARNING); err != nil {
- if err == ErrLoggerChanged {
- NewStdLogger(el.NodeID, el.LogLevel).Warning(m)
- err = nil
- }
- }
- return
-}
-
-func (el *ExportLogger) GetMeta() map[string]interface{} {
- return map[string]interface{}{
- Tenant: el.Tenant,
- NodeID: el.NodeID,
- Level: el.LogLevel,
- Format: el.Writer.Topic,
- Conn: el.Writer.Addr.String(),
- FailedPostsDir: el.FldPostDir,
- Attempts: el.Writer.MaxAttempts,
- }
-}
diff --git a/utils/logger.go b/utils/logger.go
index 3ee36dfc3..afe9dbe73 100644
--- a/utils/logger.go
+++ b/utils/logger.go
@@ -29,8 +29,7 @@ import (
var Logger LoggerInterface
func init() {
- Logger, _ = NewLogger(MetaStdLog, EmptyString, EmptyString, 0,
- 0, EmptyString, EmptyString, EmptyString)
+ Logger, _ = NewLogger(MetaStdLog, EmptyString, 0)
}
// log severities following rfc3164
@@ -61,11 +60,8 @@ type LoggerInterface interface {
Write(p []byte) (n int, err error)
}
-func NewLogger(loggerType, tenant, nodeID string, logLvl, attempts int, connOpts,
- topicOpts, fldPostsDir string) (LoggerInterface, error) {
+func NewLogger(loggerType, nodeID string, logLvl int) (LoggerInterface, error) {
switch loggerType {
- case MetaKafkaLog:
- return NewExportLogger(nodeID, tenant, logLvl, connOpts, topicOpts, attempts, fldPostsDir), nil
case MetaStdLog:
return NewStdLogger(nodeID, logLvl), nil
case MetaSysLog:
diff --git a/utils/logger_test.go b/utils/logger_test.go
index 25b874ae0..4a1134bcf 100644
--- a/utils/logger_test.go
+++ b/utils/logger_test.go
@@ -35,7 +35,7 @@ func TestLoggerNewLoggerStdoutOK(t *testing.T) {
log.New(os.Stderr, EmptyString, log.LstdFlags),
},
}
- if rcv, err := NewLogger(MetaStdLog, EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err != nil {
+ if rcv, err := NewLogger(MetaStdLog, "1234", 7); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
@@ -46,7 +46,7 @@ func TestLoggerNewLoggerSyslogOK(t *testing.T) {
exp := &SysLogger{
logLevel: 7,
}
- if rcv, err := NewLogger(MetaSysLog, EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err != nil {
+ if rcv, err := NewLogger(MetaSysLog, EmptyString, 7); err != nil {
t.Error(err)
} else {
exp.syslog = rcv.GetSyslog()
@@ -58,7 +58,7 @@ func TestLoggerNewLoggerSyslogOK(t *testing.T) {
func TestLoggerNewLoggerUnsupported(t *testing.T) {
experr := `unsupported logger: `
- if _, err := NewLogger("unsupported", EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err == nil || err.Error() != experr {
+ if _, err := NewLogger("unsupported", EmptyString, 7); err == nil || err.Error() != experr {
t.Errorf("expected: <%s>, \nreceived: <%+v>", experr, err)
}
}