From a448fb2c7a7f31eb3077107e22e3bc9d02584bd7 Mon Sep 17 00:00:00 2001 From: adi Date: Mon, 25 Jul 2022 16:53:01 +0300 Subject: [PATCH] INtegrated efs in logger/ees + ers changes --- ees/ees.go | 9 +- ees/ees_test.go | 18 +- efs/failed_loggs.go | 14 +- efs/libefs.go | 2 +- engine/kafka_logger.go | 268 ++++++++++++++++++++ {utils => engine}/kafka_logger_test.go | 2 +- ers/amqp.go | 13 +- ers/amqp_it_test.go | 4 +- ers/amqpv1.go | 13 +- ers/amqpv1_it_test.go | 6 +- ers/ers.go | 2 +- ers/kafka.go | 13 +- ers/kafka_it_test.go | 2 +- ers/kafka_test.go | 2 +- ers/nats.go | 13 +- ers/nats_it_test.go | 4 +- ers/reader.go | 14 +- ers/readers_test.go | 24 +- ers/s3.go | 13 +- ers/s3_it_test.go | 13 +- ers/sqs.go | 13 +- ers/sqs_test.go | 6 +- general_tests/cdrs_post_failover_it_test.go | 2 +- services/cgr-engine.go | 8 +- utils/kafka_logger.go | 238 ----------------- utils/logger.go | 8 +- utils/logger_test.go | 6 +- 27 files changed, 383 insertions(+), 347 deletions(-) create mode 100644 engine/kafka_logger.go rename {utils => engine}/kafka_logger_test.go (99%) delete mode 100644 utils/kafka_logger.go diff --git a/ees/ees.go b/ees/ees.go index 8b37b6c7d..62a53bba0 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -281,7 +281,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr * } func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{}, - connMnngr *engine.ConnManager, tnt string) (err error) { + connMngr *engine.ConnManager, tnt string) (err error) { if exp.Cfg().FailedPostsDir != utils.MetaNone { defer func() { if err != nil { @@ -294,16 +294,11 @@ func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{} APIOpts: exp.Cfg().Opts.AsMapInterface(), } var reply string - if err = connMnngr.Call(ctx, exp.Cfg().EFsConns, + if err = connMngr.Call(ctx, exp.Cfg().EFsConns, utils.EfSv1ProcessEvent, args, &reply); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> Exporter <%s> could not be written with <%s> service because err: <%s>", utils.EEs, exp.Cfg().ID, utils.EFs, err.Error())) - /* - utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath, - exp.Cfg().Type, utils.EEs, - eEv, exp.Cfg().Opts.AsMapInterface()) - */ } } }() diff --git a/ees/ees_test.go b/ees/ees_test.go index e77d41a03..effcf18fe 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -111,7 +111,6 @@ func TestAttrSProcessEvent(t *testing.T) { connMgr := engine.NewConnManager(cfg) connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), utils.AttributeSv1, clientConn) eeS := NewEventExporterS(cfg, filterS, connMgr) - // cgrEv := &utils.CGREvent{} if err := eeS.attrSProcessEvent(context.TODO(), cgrEv, []string{}, utils.EmptyString); err != nil { t.Error(err) } @@ -263,6 +262,14 @@ func TestV1ProcessEvent3(t *testing.T) { } func TestV1ProcessEvent4(t *testing.T) { + testMock := &testMockEvent{ + calls: map[string]func(_ *context.Context, _, _ interface{}) error{ + utils.EfSv1ProcessEvent: func(_ *context.Context, args, reply interface{}) error { + *reply.(*string) = utils.OK + return nil + }, + }, + } cfg := config.NewDefaultCGRConfig() cfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPPost cfg.EEsCfg().Exporters[0].ID = "SQLExporterFull" @@ -270,12 +277,17 @@ func TestV1ProcessEvent4(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + connMngr := engine.NewConnManager(cfg) + clientConn := make(chan birpc.ClientConnector, 1) + clientConn <- testMock + connMgr := engine.NewConnManager(cfg) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, clientConn) + eeS := NewEventExporterS(cfg, filterS, connMngr) eeS.eesChs = map[string]*ltcache.Cache{ utils.MetaHTTPPost: ltcache.NewCache(1, time.Second, false, onCacheEvicted), } - newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, nil) + newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, connMngr) if err != nil { t.Error(err) } diff --git a/efs/failed_loggs.go b/efs/failed_loggs.go index d0bd74b25..28c450851 100644 --- a/efs/failed_loggs.go +++ b/efs/failed_loggs.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/segmentio/kafka-go" @@ -40,8 +41,8 @@ type FailedExportersLogg struct { FailedPostsDir string Module string - efsConns []string connMngr *engine.ConnManager + cfg *config.CGRConfig } // AddEvent adds one event @@ -75,8 +76,8 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp if err != nil { return } - expLogger := utils.NewExportLogger(nodeID, tnt, logLvl, - expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir) + expLogger := engine.NewExportLogger(nodeID, tnt, logLvl, + expEv.connMngr, expEv.cfg) for _, event := range expEv.Events { var content []byte if content, err = utils.ToUnescapedJSON(event); err != nil { @@ -88,7 +89,7 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp }); err != nil { var reply string // if there are any errors in kafka, we will post in FailedPostDirectory - if err = expEv.connMngr.Call(ctx, expEv.efsConns, utils.EfSv1ProcessEvent, + if err = expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent, &utils.ArgsFailedPosts{ Tenant: tnt, Path: expLogger.Writer.Addr.String(), @@ -98,11 +99,6 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp APIOpts: expLogger.GetMeta(), }, &reply); err != nil { return err - /* - utils.AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), utils. - MetaKafkaLog, utils.Kafka, - event, expLogger.GetMeta()) - */ } return nil } diff --git a/efs/libefs.go b/efs/libefs.go index f883d537c..2e00f8b7e 100644 --- a/efs/libefs.go +++ b/efs/libefs.go @@ -116,7 +116,7 @@ func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPos connMngr: efs.connMgr, } case utils.Kafka: - expEv.efsConns = efs.cfg.LoggerCfg().EFsConns + expEv.cfg = efs.cfg expEv.connMngr = efs.connMgr failPoster = expEv } diff --git a/engine/kafka_logger.go b/engine/kafka_logger.go new file mode 100644 index 000000000..e8f5388fe --- /dev/null +++ b/engine/kafka_logger.go @@ -0,0 +1,268 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + "log/syslog" + "sync" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/segmentio/kafka-go" +) + +func NewLogger(loggerType, tnt, nodeID string, + connMgr *ConnManager, cfg *config.CGRConfig) (utils.LoggerInterface, error) { + switch loggerType { + case utils.MetaKafkaLog: + return NewExportLogger(nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil + case utils.MetaStdLog, utils.MetaSysLog: + return utils.NewLogger(loggerType, nodeID, cfg.LoggerCfg().Level) + default: + return nil, fmt.Errorf("unsupported logger: <%+s>", loggerType) + } +} + +// Logs to kafka +type ExportLogger struct { + sync.Mutex + cfg *config.CGRConfig + connMgr *ConnManager + + LogLevel int + FldPostDir string + Writer *kafka.Writer + NodeID string + Tenant string +} + +// NewExportLogger will export loggers to kafka +func NewExportLogger(nodeID, tenant string, level int, + connMgr *ConnManager, cfg *config.CGRConfig) (el *ExportLogger) { + el = &ExportLogger{ + connMgr: connMgr, + cfg: cfg, + LogLevel: level, + FldPostDir: cfg.LoggerCfg().Opts.FailedPostsDir, + NodeID: nodeID, + Tenant: tenant, + Writer: &kafka.Writer{ + Addr: kafka.TCP(cfg.LoggerCfg().Opts.KafkaConn), + Topic: cfg.LoggerCfg().Opts.KafkaTopic, + MaxAttempts: cfg.LoggerCfg().Opts.KafkaAttempts, + }, + } + return +} + +func (el *ExportLogger) Close() (err error) { + if el.Writer != nil { + err = el.Writer.Close() + el.Writer = nil + } + return +} + +func (el *ExportLogger) call(m string, level int) (err error) { + eventExport := &utils.CGREvent{ + Tenant: el.Tenant, + Event: map[string]interface{}{ + utils.NodeID: el.NodeID, + utils.Message: m, + utils.Severity: level, + utils.Timestamp: time.Now().Format("2006-01-02 15:04:05"), + }, + } + // event will be exported through kafka as json format + var content []byte + if content, err = utils.ToUnescapedJSON(eventExport); err != nil { + return + } + if err = el.Writer.WriteMessages(context.Background(), kafka.Message{ + Key: []byte(utils.GenUUID()), + Value: content, + }); err != nil { + // if there are any errors in kafka, we will post in FailedPostDirectory + args := &utils.ArgsFailedPosts{ + Tenant: el.Tenant, + Path: el.Writer.Addr.String(), + Event: eventExport, + FailedDir: el.FldPostDir, + Module: utils.Kafka, + APIOpts: el.GetMeta(), + } + var reply string + if err = el.connMgr.Call(context.Background(), el.cfg.LoggerCfg().EFsConns, + utils.EfSv1ProcessEvent, args, &reply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>", + utils.Logger, utils.EFs, err.Error())) + } + // also the content should be printed as a stdout logger type + return utils.ErrLoggerChanged + } + return +} + +func (el *ExportLogger) Write(p []byte) (n int, err error) { + n = len(p) + err = el.call(string(p), 8) + return +} + +func (sl *ExportLogger) GetSyslog() *syslog.Writer { + return nil +} + +// GetLogLevel() returns the level logger number for the server +func (el *ExportLogger) GetLogLevel() int { + return el.LogLevel +} + +// SetLogLevel changes the log level +func (el *ExportLogger) SetLogLevel(level int) { + el.LogLevel = level +} + +// Alert logs to EEs with alert level +func (el *ExportLogger) Alert(m string) (err error) { + if el.LogLevel < utils.LOGLEVEL_ALERT { + return nil + } + if err = el.call(m, utils.LOGLEVEL_ALERT); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.NodeID, el.LogLevel).Alert(m) + err = nil + } + } + return +} + +// Crit logs to EEs with critical level +func (el *ExportLogger) Crit(m string) (err error) { + if el.LogLevel < utils.LOGLEVEL_CRITICAL { + return nil + } + if el.call(m, utils.LOGLEVEL_CRITICAL); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.NodeID, el.LogLevel).Crit(m) + err = nil + } + } + return +} + +// Debug logs to EEs with debug level +func (el *ExportLogger) Debug(m string) (err error) { + if el.LogLevel < utils.LOGLEVEL_DEBUG { + return nil + } + if err = el.call(m, utils.LOGLEVEL_DEBUG); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.NodeID, el.LogLevel).Debug(m) + err = nil + } + } + return +} + +// Emerg logs to EEs with emergency level +func (el *ExportLogger) Emerg(m string) (err error) { + if el.LogLevel < utils.LOGLEVEL_EMERGENCY { + return nil + } + if err = el.call(m, utils.LOGLEVEL_EMERGENCY); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.NodeID, el.LogLevel).Emerg(m) + err = nil + } + } + return +} + +// Err logs to EEs with error level +func (el *ExportLogger) Err(m string) (err error) { + if el.LogLevel < utils.LOGLEVEL_ERROR { + return nil + } + if err = el.call(m, utils.LOGLEVEL_ERROR); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.NodeID, el.LogLevel).Err(m) + err = nil + } + } + return +} + +// Info logs to EEs with info level +func (el *ExportLogger) Info(m string) (err error) { + if el.LogLevel < utils.LOGLEVEL_INFO { + return nil + } + if err = el.call(m, utils.LOGLEVEL_INFO); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.NodeID, el.LogLevel).Info(m) + err = nil + } + } + return +} + +// Notice logs to EEs with notice level +func (el *ExportLogger) Notice(m string) (err error) { + if el.LogLevel < utils.LOGLEVEL_NOTICE { + return nil + } + if err = el.call(m, utils.LOGLEVEL_NOTICE); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.NodeID, el.LogLevel).Notice(m) + err = nil + } + } + return +} + +// Warning logs to EEs with warning level +func (el *ExportLogger) Warning(m string) (err error) { + if el.LogLevel < utils.LOGLEVEL_WARNING { + return nil + } + if err = el.call(m, utils.LOGLEVEL_WARNING); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.NodeID, el.LogLevel).Warning(m) + err = nil + } + } + return +} + +func (el *ExportLogger) GetMeta() map[string]interface{} { + return map[string]interface{}{ + utils.Tenant: el.Tenant, + utils.NodeID: el.NodeID, + utils.Level: el.LogLevel, + utils.Format: el.Writer.Topic, + utils.Conn: el.Writer.Addr.String(), + utils.FailedPostsDir: el.FldPostDir, + utils.Attempts: el.Writer.MaxAttempts, + } +} diff --git a/utils/kafka_logger_test.go b/engine/kafka_logger_test.go similarity index 99% rename from utils/kafka_logger_test.go rename to engine/kafka_logger_test.go index 7557ca2ae..e6a8a1379 100644 --- a/utils/kafka_logger_test.go +++ b/engine/kafka_logger_test.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package utils +package engine /* func TestLoggerNewLoggerExport(t *testing.T) { diff --git a/ers/amqp.go b/ers/amqp.go index 171d87092..f0283bfeb 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -35,8 +35,9 @@ import ( // NewAMQPER return a new amqp event reader func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) { rdr := &AMQPER{ + connMgr: connMgr, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, @@ -60,9 +61,10 @@ func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, // AMQPER implements EventReader interface for amqp message type AMQPER struct { // sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + connMgr *engine.ConnManager dialURL string queueID string @@ -167,7 +169,8 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { utils.ERs, msg.MessageId, err.Error())) } if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString); err != nil { + if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString, + rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) diff --git a/ers/amqp_it_test.go b/ers/amqp_it_test.go index ce44b46c9..dcaf3c10a 100644 --- a/ers/amqp_it_test.go +++ b/ers/amqp_it_test.go @@ -75,7 +75,7 @@ func TestAMQPER(t *testing.T) { rdrExit = make(chan struct{}, 1) if rdr, err = NewAMQPER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { + rdrErr, new(engine.FilterS), rdrExit, nil); err != nil { t.Fatal(err) } connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/") @@ -136,7 +136,7 @@ func TestAMQPERServeError(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfgIdx := 0 expected := "AMQP scheme must be either 'amqp://' or 'amqps://'" - rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil, nil) + rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 8b614be79..f70969ee7 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -35,8 +35,9 @@ import ( // NewAMQPv1ER return a new amqpv1 event reader func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) { rdr := &AMQPv1ER{ + connMgr: connMgr, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, @@ -61,9 +62,10 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int, // AMQPv1ER implements EventReader interface for amqpv1 message type AMQPv1ER struct { // sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + connMgr *engine.ConnManager queueID string @@ -145,7 +147,8 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) { utils.ERs, err.Error())) } if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(context.Background(), rdr.poster, body, utils.EmptyString); err != nil { + if err := ees.ExportWithAttempts(context.Background(), rdr.poster, body, utils.EmptyString, rdr.connMgr, + rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message error: %s", utils.ERs, err.Error())) diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go index fe9d97d19..f23e86367 100644 --- a/ers/amqpv1_it_test.go +++ b/ers/amqpv1_it_test.go @@ -80,7 +80,7 @@ func TestAMQPERv1(t *testing.T) { rdrExit = make(chan struct{}, 1) if rdr, err = NewAMQPv1ER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { + rdrErr, new(engine.FilterS), rdrExit, nil); err != nil { t.Fatal(err) } amqpv1Rdr := rdr.(*AMQPv1ER) @@ -153,7 +153,7 @@ func TestAmqpv1NewAMQPv1ER(t *testing.T) { }, } - result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil) + result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -184,7 +184,7 @@ func TestAmqpv1NewAMQPv1ER2(t *testing.T) { }, } - result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil) + result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } diff --git a/ers/ers.go b/ers/ers.go index 6ac0f9ba6..a93d74497 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -164,7 +164,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) { var rdr EventReader if rdr, err = NewEventReader(erS.cfg, cfgIdx, erS.rdrEvents, erS.partialEvents, erS.rdrErr, - erS.filterS, erS.stopLsn[rdrID]); err != nil { + erS.filterS, erS.stopLsn[rdrID], erS.connMgr); err != nil { return } erS.rdrs[rdrID] = rdr diff --git a/ers/kafka.go b/ers/kafka.go index ccef5724a..c578a05a0 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -37,9 +37,10 @@ import ( // NewKafkaER return a new kafka event reader func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) { rdr := &KafkaER{ + connMgr: connMgr, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, @@ -65,9 +66,10 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, // KafkaER implements EventReader interface for kafka message type KafkaER struct { // sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + connMgr *engine.ConnManager dialURL string topic string @@ -141,7 +143,8 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) { utils.ERs, string(msg.Key), err.Error())) } if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Value, string(msg.Key)); err != nil { + if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Value, string(msg.Key), + rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, string(msg.Key), err.Error())) diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index 3c20768dd..c0e7c1414 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -75,7 +75,7 @@ func TestKafkaER(t *testing.T) { rdrExit = make(chan struct{}, 1) if rdr, err = NewKafkaER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { + rdrErr, new(engine.FilterS), rdrExit, nil); err != nil { t.Fatal(err) } w := kafka.NewWriter(kafka.WriterConfig{ diff --git a/ers/kafka_test.go b/ers/kafka_test.go index dc8ee5d1f..5af31ad4e 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -104,7 +104,7 @@ func TestKafkaERServe(t *testing.T) { rdrEvents := make(chan *erEvent, 1) rdrExit := make(chan struct{}, 1) rdrErr := make(chan error, 1) - rdr, err := NewKafkaER(cfg, 0, rdrEvents, make(chan *erEvent, 1), rdrErr, fltrS, rdrExit) + rdr, err := NewKafkaER(cfg, 0, rdrEvents, make(chan *erEvent, 1), rdrErr, fltrS, rdrExit, nil) if err != nil { t.Error(err) } diff --git a/ers/nats.go b/ers/nats.go index 9d4b79909..877a1c9e3 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -38,8 +38,9 @@ import ( // NewNatsER return a new amqp event reader func NewNatsER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (_ EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (_ EventReader, err error) { rdr := &NatsER{ + connMgr: connMgr, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, @@ -66,9 +67,10 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int, // NatsER implements EventReader interface for amqp message type NatsER struct { // sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + connMgr *engine.ConnManager rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to @@ -139,7 +141,8 @@ func (rdr *NatsER) Serve() (err error) { utils.ERs, string(msg.Data), err.Error())) } if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString); err != nil { + if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString, + rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, string(msg.Data), err.Error())) diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index afc54ed04..257e842e4 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -75,7 +75,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { rdrExit = make(chan struct{}, 1) var err error if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { + rdrErr, new(engine.FilterS), rdrExit, nil); err != nil { t.Fatal(err) } @@ -162,7 +162,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { var err error if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { + rdrErr, new(engine.FilterS), rdrExit, nil); err != nil { t.Fatal(err) } diff --git a/ers/reader.go b/ers/reader.go index 8d57521ca..c2817572e 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -35,7 +35,7 @@ type EventReader interface { // NewEventReader instantiates the event reader based on configuration at index func NewEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) { switch cfg.ERsCfg().Readers[cfgIdx].Type { default: err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type) @@ -46,21 +46,21 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int, case utils.MetaFileFWV: return NewFWVFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaKafkajsonMap: - return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) + return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr) case utils.MetaSQL: return NewSQLEventReader(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaFileJSON: return NewJSONFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaAMQPjsonMap: - return NewAMQPER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) + return NewAMQPER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr) case utils.MetaS3jsonMap: - return NewS3ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) + return NewS3ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr) case utils.MetaSQSjsonMap: - return NewSQSER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) + return NewSQSER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr) case utils.MetaAMQPV1jsonMap: - return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) + return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr) case utils.MetaNatsjsonMap: - return NewNatsER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) + return NewNatsER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr) } return } diff --git a/ers/readers_test.go b/ers/readers_test.go index f1a7bab9e..331742b98 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -36,7 +36,7 @@ func TestNewInvalidReader(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil); err == nil || err.Error() != "unsupported reader type: " { + if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil, nil); err == nil || err.Error() != "unsupported reader type: " { t.Errorf("Expecting: >, received: <%+v>", err) } } @@ -61,7 +61,7 @@ func TestNewCsvReader(t *testing.T) { rdrExit: nil, conReqs: nil} var expected EventReader = exp - if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else { // because we use function make to init the channel when we create the EventReader reflect.DeepEqual @@ -84,11 +84,11 @@ func TestNewKafkaReader(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - expected, err := NewKafkaER(cfg, 1, nil, nil, nil, fltr, nil) + expected, err := NewKafkaER(cfg, 1, nil, nil, nil, fltr, nil, nil) if err != nil { t.Errorf("Expecting: , received: <%+v>", err) } - if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv) @@ -115,7 +115,7 @@ func TestNewSQLReader(t *testing.T) { if err != nil { t.Errorf("Expecting: , received: <%+v>", err) } - if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv) @@ -149,7 +149,7 @@ func TestNewFileXMLReader(t *testing.T) { if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else { @@ -169,7 +169,7 @@ func TestNewFileFWVReader(t *testing.T) { if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(nil) } else { @@ -189,7 +189,7 @@ func TestNewJSONReader(t *testing.T) { if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else { @@ -219,7 +219,7 @@ func TestNewAMQPReader(t *testing.T) { exp.setOpts(&config.EventReaderOpts{}) exp.createPoster() var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, rcv) { @@ -244,7 +244,7 @@ func TestNewAMQPv1Reader(t *testing.T) { exp.Config().Opts = &config.EventReaderOpts{} exp.createPoster() var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, rcv) { @@ -270,7 +270,7 @@ func TestNewS3Reader(t *testing.T) { exp.Config().Opts = &config.EventReaderOpts{} exp.createPoster() var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, rcv) { @@ -307,7 +307,7 @@ func TestNewSQSReader(t *testing.T) { exp.Config().Opts = &config.EventReaderOpts{} exp.createPoster() var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) exp.session = rcv.(*SQSER).session if err != nil { t.Error(err) diff --git a/ers/s3.go b/ers/s3.go index b84c86f4d..18f4addef 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -39,9 +39,10 @@ import ( // NewS3ER return a new s3 event reader func NewS3ER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) { rdr := &S3ER{ + connMgr: connMgr, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, @@ -63,9 +64,10 @@ func NewS3ER(cfg *config.CGRConfig, cfgIdx int, // S3ER implements EventReader interface for s3 message type S3ER struct { // sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + connMgr *engine.ConnManager rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to @@ -247,7 +249,8 @@ func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) { } if rdr.poster != nil { // post it - if err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg, key); err != nil { + if err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg, key, + rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, key, err.Error())) diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go index 1b2bee7e9..dcde99e2f 100644 --- a/ers/s3_it_test.go +++ b/ers/s3_it_test.go @@ -86,7 +86,7 @@ func TestS3ER(t *testing.T) { rdrExit = make(chan struct{}, 1) if rdr, err = NewS3ER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { + rdrErr, new(engine.FilterS), rdrExit, nil); err != nil { t.Fatal(err) } s3Rdr := rdr.(*S3ER) @@ -176,7 +176,7 @@ func TestNewS3ER(t *testing.T) { } rdr, err := NewS3ER(cfg, 1, nil, nil, - nil, nil, nil) + nil, nil, nil, nil) if err != nil { t.Fatal(err) } @@ -190,18 +190,11 @@ func TestNewS3ERCase2(t *testing.T) { expected := &S3ER{ cgrCfg: cfg, cfgIdx: 0, - fltrS: nil, - rdrEvents: nil, - rdrExit: nil, - rdrErr: nil, - cap: nil, awsRegion: "", awsID: "", awsKey: "", awsToken: "", bucket: "cgrates_cdrs", - session: nil, - poster: nil, } cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { @@ -217,7 +210,7 @@ func TestNewS3ERCase2(t *testing.T) { } rdr, err := NewS3ER(cfg, 0, nil, nil, - nil, nil, nil) + nil, nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/ers/sqs.go b/ers/sqs.go index 44376c068..108137e71 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -39,9 +39,10 @@ import ( // NewSQSER return a new sqs event reader func NewSQSER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) { rdr := &SQSER{ + connMgr: connMgr, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, @@ -63,9 +64,10 @@ func NewSQSER(cfg *config.CGRConfig, cfgIdx int, // SQSER implements EventReader interface for sqs message type SQSER struct { // sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + connMgr *engine.ConnManager rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to @@ -263,7 +265,8 @@ func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) { } if rdr.poster != nil { // post it - if err = ees.ExportWithAttempts(context.Background(), rdr.poster, body, key); err != nil { + if err = ees.ExportWithAttempts(context.Background(), rdr.poster, body, key, + rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, key, err.Error())) diff --git a/ers/sqs_test.go b/ers/sqs_test.go index 01332c54f..197f2d350 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -53,7 +53,7 @@ func TestNewSQSER(t *testing.T) { }, } rdr, err := NewSQSER(cfg, 0, nil, nil, - nil, nil, nil) + nil, nil, nil, nil) if err != nil { t.Fatal(err) } @@ -81,7 +81,7 @@ func TestSQSERServeRunDelay0(t *testing.T) { }, } rdr, err := NewSQSER(cfg, 0, nil, nil, - nil, nil, nil) + nil, nil, nil, nil) if err != nil { t.Fatal(err) } @@ -107,7 +107,7 @@ func TestSQSERServe(t *testing.T) { }, } rdr, err := NewSQSER(cfg, 0, nil, nil, - nil, nil, nil) + nil, nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/general_tests/cdrs_post_failover_it_test.go b/general_tests/cdrs_post_failover_it_test.go index e563b74a4..cbce1154b 100644 --- a/general_tests/cdrs_post_failover_it_test.go +++ b/general_tests/cdrs_post_failover_it_test.go @@ -203,7 +203,7 @@ func testCDRsPostFailoverToFile(t *testing.T) { fileName := file.Name() filePath := path.Join(cdrsPostFailCfg.EFsCfg().FailedPostsDir, fileName) - ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs) + ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs, nil) if err != nil { t.Errorf("<%s> for file <%s>", err, fileName) continue diff --git a/services/cgr-engine.go b/services/cgr-engine.go index ebd57f3dd..ff8e2f37a 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -354,15 +354,11 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags } // init syslog - if utils.Logger, err = utils.NewLogger( + if utils.Logger, err = engine.NewLogger( utils.FirstNonEmpty(*flags.SysLogger, cgr.cfg.LoggerCfg().Type), cgr.cfg.GeneralCfg().DefaultTenant, cgr.cfg.GeneralCfg().NodeID, - cgr.cfg.LoggerCfg().Level, - cgr.cfg.LoggerCfg().Opts.KafkaAttempts, - cgr.cfg.LoggerCfg().Opts.KafkaConn, - cgr.cfg.LoggerCfg().Opts.KafkaTopic, - cgr.cfg.LoggerCfg().Opts.FailedPostsDir); err != nil { + cgr.cM, cgr.cfg); err != nil { return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) } efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing diff --git a/utils/kafka_logger.go b/utils/kafka_logger.go deleted file mode 100644 index 7b44bc805..000000000 --- a/utils/kafka_logger.go +++ /dev/null @@ -1,238 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package utils - -import ( - "log/syslog" - "sync" - "time" - - "github.com/cgrates/birpc/context" - "github.com/segmentio/kafka-go" -) - -// Logs to kafka -type ExportLogger struct { - sync.Mutex - - LogLevel int - FldPostDir string - Writer *kafka.Writer - NodeID string - Tenant string -} - -// NewExportLogger will export loggers to kafka -func NewExportLogger(nodeID, tenant string, level int, - connOpts, connTopic string, attempts int, fldPostDir string) (el *ExportLogger) { - el = &ExportLogger{ - LogLevel: level, - FldPostDir: fldPostDir, - NodeID: nodeID, - Tenant: tenant, - Writer: &kafka.Writer{ - Addr: kafka.TCP(connOpts), - Topic: connTopic, - MaxAttempts: attempts, - }, - } - return -} - -func (el *ExportLogger) Close() (err error) { - if el.Writer != nil { - err = el.Writer.Close() - el.Writer = nil - } - return -} - -func (el *ExportLogger) call(m string, level int) (err error) { - eventExport := &CGREvent{ - Tenant: el.Tenant, - Event: map[string]interface{}{ - NodeID: el.NodeID, - Message: m, - Severity: level, - Timestamp: time.Now().Format("2006-01-02 15:04:05"), - }, - } - // event will be exported through kafka as json format - var content []byte - if content, err = ToUnescapedJSON(eventExport); err != nil { - return - } - if err = el.Writer.WriteMessages(context.Background(), kafka.Message{ - Key: []byte(GenUUID()), - Value: content, - }); err != nil { - /* - // if there are any errors in kafka, we will post in FailedPostDirectory - AddFailedMessage(el.FldPostDir, el.Writer.Addr.String(), MetaKafkaLog, Kafka, - eventExport, el.GetMeta()) - // also the content should be printed as a stdout logger type - return ErrLoggerChanged - */ - } - return -} - -func (el *ExportLogger) Write(p []byte) (n int, err error) { - n = len(p) - err = el.call(string(p), 8) - return -} - -func (sl *ExportLogger) GetSyslog() *syslog.Writer { - return nil -} - -// GetLogLevel() returns the level logger number for the server -func (el *ExportLogger) GetLogLevel() int { - return el.LogLevel -} - -// SetLogLevel changes the log level -func (el *ExportLogger) SetLogLevel(level int) { - el.LogLevel = level -} - -// Alert logs to EEs with alert level -func (el *ExportLogger) Alert(m string) (err error) { - if el.LogLevel < LOGLEVEL_ALERT { - return nil - } - if err = el.call(m, LOGLEVEL_ALERT); err != nil { - if err == ErrLoggerChanged { - NewStdLogger(el.NodeID, el.LogLevel).Alert(m) - err = nil - } - } - return -} - -// Crit logs to EEs with critical level -func (el *ExportLogger) Crit(m string) (err error) { - if el.LogLevel < LOGLEVEL_CRITICAL { - return nil - } - if el.call(m, LOGLEVEL_CRITICAL); err != nil { - if err == ErrLoggerChanged { - NewStdLogger(el.NodeID, el.LogLevel).Crit(m) - err = nil - } - } - return -} - -// Debug logs to EEs with debug level -func (el *ExportLogger) Debug(m string) (err error) { - if el.LogLevel < LOGLEVEL_DEBUG { - return nil - } - if err = el.call(m, LOGLEVEL_DEBUG); err != nil { - if err == ErrLoggerChanged { - NewStdLogger(el.NodeID, el.LogLevel).Debug(m) - err = nil - } - } - return -} - -// Emerg logs to EEs with emergency level -func (el *ExportLogger) Emerg(m string) (err error) { - if el.LogLevel < LOGLEVEL_EMERGENCY { - return nil - } - if err = el.call(m, LOGLEVEL_EMERGENCY); err != nil { - if err == ErrLoggerChanged { - NewStdLogger(el.NodeID, el.LogLevel).Emerg(m) - err = nil - } - } - return -} - -// Err logs to EEs with error level -func (el *ExportLogger) Err(m string) (err error) { - if el.LogLevel < LOGLEVEL_ERROR { - return nil - } - if err = el.call(m, LOGLEVEL_ERROR); err != nil { - if err == ErrLoggerChanged { - NewStdLogger(el.NodeID, el.LogLevel).Err(m) - err = nil - } - } - return -} - -// Info logs to EEs with info level -func (el *ExportLogger) Info(m string) (err error) { - if el.LogLevel < LOGLEVEL_INFO { - return nil - } - if err = el.call(m, LOGLEVEL_INFO); err != nil { - if err == ErrLoggerChanged { - NewStdLogger(el.NodeID, el.LogLevel).Info(m) - err = nil - } - } - return -} - -// Notice logs to EEs with notice level -func (el *ExportLogger) Notice(m string) (err error) { - if el.LogLevel < LOGLEVEL_NOTICE { - return nil - } - if err = el.call(m, LOGLEVEL_NOTICE); err != nil { - if err == ErrLoggerChanged { - NewStdLogger(el.NodeID, el.LogLevel).Notice(m) - err = nil - } - } - return -} - -// Warning logs to EEs with warning level -func (el *ExportLogger) Warning(m string) (err error) { - if el.LogLevel < LOGLEVEL_WARNING { - return nil - } - if err = el.call(m, LOGLEVEL_WARNING); err != nil { - if err == ErrLoggerChanged { - NewStdLogger(el.NodeID, el.LogLevel).Warning(m) - err = nil - } - } - return -} - -func (el *ExportLogger) GetMeta() map[string]interface{} { - return map[string]interface{}{ - Tenant: el.Tenant, - NodeID: el.NodeID, - Level: el.LogLevel, - Format: el.Writer.Topic, - Conn: el.Writer.Addr.String(), - FailedPostsDir: el.FldPostDir, - Attempts: el.Writer.MaxAttempts, - } -} diff --git a/utils/logger.go b/utils/logger.go index 3ee36dfc3..afe9dbe73 100644 --- a/utils/logger.go +++ b/utils/logger.go @@ -29,8 +29,7 @@ import ( var Logger LoggerInterface func init() { - Logger, _ = NewLogger(MetaStdLog, EmptyString, EmptyString, 0, - 0, EmptyString, EmptyString, EmptyString) + Logger, _ = NewLogger(MetaStdLog, EmptyString, 0) } // log severities following rfc3164 @@ -61,11 +60,8 @@ type LoggerInterface interface { Write(p []byte) (n int, err error) } -func NewLogger(loggerType, tenant, nodeID string, logLvl, attempts int, connOpts, - topicOpts, fldPostsDir string) (LoggerInterface, error) { +func NewLogger(loggerType, nodeID string, logLvl int) (LoggerInterface, error) { switch loggerType { - case MetaKafkaLog: - return NewExportLogger(nodeID, tenant, logLvl, connOpts, topicOpts, attempts, fldPostsDir), nil case MetaStdLog: return NewStdLogger(nodeID, logLvl), nil case MetaSysLog: diff --git a/utils/logger_test.go b/utils/logger_test.go index 25b874ae0..4a1134bcf 100644 --- a/utils/logger_test.go +++ b/utils/logger_test.go @@ -35,7 +35,7 @@ func TestLoggerNewLoggerStdoutOK(t *testing.T) { log.New(os.Stderr, EmptyString, log.LstdFlags), }, } - if rcv, err := NewLogger(MetaStdLog, EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err != nil { + if rcv, err := NewLogger(MetaStdLog, "1234", 7); err != nil { t.Error(err) } else if !reflect.DeepEqual(rcv, exp) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) @@ -46,7 +46,7 @@ func TestLoggerNewLoggerSyslogOK(t *testing.T) { exp := &SysLogger{ logLevel: 7, } - if rcv, err := NewLogger(MetaSysLog, EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err != nil { + if rcv, err := NewLogger(MetaSysLog, EmptyString, 7); err != nil { t.Error(err) } else { exp.syslog = rcv.GetSyslog() @@ -58,7 +58,7 @@ func TestLoggerNewLoggerSyslogOK(t *testing.T) { func TestLoggerNewLoggerUnsupported(t *testing.T) { experr := `unsupported logger: ` - if _, err := NewLogger("unsupported", EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err == nil || err.Error() != experr { + if _, err := NewLogger("unsupported", EmptyString, 7); err == nil || err.Error() != experr { t.Errorf("expected: <%s>, \nreceived: <%+v>", experr, err) } }