mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
INtegrated efs in logger/ees + ers changes
This commit is contained in:
@@ -281,7 +281,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr *
|
||||
}
|
||||
|
||||
func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{},
|
||||
connMnngr *engine.ConnManager, tnt string) (err error) {
|
||||
connMngr *engine.ConnManager, tnt string) (err error) {
|
||||
if exp.Cfg().FailedPostsDir != utils.MetaNone {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
@@ -294,16 +294,11 @@ func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}
|
||||
APIOpts: exp.Cfg().Opts.AsMapInterface(),
|
||||
}
|
||||
var reply string
|
||||
if err = connMnngr.Call(ctx, exp.Cfg().EFsConns,
|
||||
if err = connMngr.Call(ctx, exp.Cfg().EFsConns,
|
||||
utils.EfSv1ProcessEvent, args, &reply); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> Exporter <%s> could not be written with <%s> service because err: <%s>",
|
||||
utils.EEs, exp.Cfg().ID, utils.EFs, err.Error()))
|
||||
/*
|
||||
utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath,
|
||||
exp.Cfg().Type, utils.EEs,
|
||||
eEv, exp.Cfg().Opts.AsMapInterface())
|
||||
*/
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -111,7 +111,6 @@ func TestAttrSProcessEvent(t *testing.T) {
|
||||
connMgr := engine.NewConnManager(cfg)
|
||||
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), utils.AttributeSv1, clientConn)
|
||||
eeS := NewEventExporterS(cfg, filterS, connMgr)
|
||||
// cgrEv := &utils.CGREvent{}
|
||||
if err := eeS.attrSProcessEvent(context.TODO(), cgrEv, []string{}, utils.EmptyString); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@@ -263,6 +262,14 @@ func TestV1ProcessEvent3(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestV1ProcessEvent4(t *testing.T) {
|
||||
testMock := &testMockEvent{
|
||||
calls: map[string]func(_ *context.Context, _, _ interface{}) error{
|
||||
utils.EfSv1ProcessEvent: func(_ *context.Context, args, reply interface{}) error {
|
||||
*reply.(*string) = utils.OK
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPPost
|
||||
cfg.EEsCfg().Exporters[0].ID = "SQLExporterFull"
|
||||
@@ -270,12 +277,17 @@ func TestV1ProcessEvent4(t *testing.T) {
|
||||
newIDb := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
|
||||
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cfg, nil, newDM)
|
||||
eeS := NewEventExporterS(cfg, filterS, nil)
|
||||
connMngr := engine.NewConnManager(cfg)
|
||||
clientConn := make(chan birpc.ClientConnector, 1)
|
||||
clientConn <- testMock
|
||||
connMgr := engine.NewConnManager(cfg)
|
||||
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, clientConn)
|
||||
eeS := NewEventExporterS(cfg, filterS, connMngr)
|
||||
eeS.eesChs = map[string]*ltcache.Cache{
|
||||
utils.MetaHTTPPost: ltcache.NewCache(1,
|
||||
time.Second, false, onCacheEvicted),
|
||||
}
|
||||
newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, nil)
|
||||
newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, connMngr)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/segmentio/kafka-go"
|
||||
@@ -40,8 +41,8 @@ type FailedExportersLogg struct {
|
||||
FailedPostsDir string
|
||||
Module string
|
||||
|
||||
efsConns []string
|
||||
connMngr *engine.ConnManager
|
||||
cfg *config.CGRConfig
|
||||
}
|
||||
|
||||
// AddEvent adds one event
|
||||
@@ -75,8 +76,8 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
expLogger := utils.NewExportLogger(nodeID, tnt, logLvl,
|
||||
expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir)
|
||||
expLogger := engine.NewExportLogger(nodeID, tnt, logLvl,
|
||||
expEv.connMngr, expEv.cfg)
|
||||
for _, event := range expEv.Events {
|
||||
var content []byte
|
||||
if content, err = utils.ToUnescapedJSON(event); err != nil {
|
||||
@@ -88,7 +89,7 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
|
||||
}); err != nil {
|
||||
var reply string
|
||||
// if there are any errors in kafka, we will post in FailedPostDirectory
|
||||
if err = expEv.connMngr.Call(ctx, expEv.efsConns, utils.EfSv1ProcessEvent,
|
||||
if err = expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent,
|
||||
&utils.ArgsFailedPosts{
|
||||
Tenant: tnt,
|
||||
Path: expLogger.Writer.Addr.String(),
|
||||
@@ -98,11 +99,6 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
|
||||
APIOpts: expLogger.GetMeta(),
|
||||
}, &reply); err != nil {
|
||||
return err
|
||||
/*
|
||||
utils.AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), utils.
|
||||
MetaKafkaLog, utils.Kafka,
|
||||
event, expLogger.GetMeta())
|
||||
*/
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPos
|
||||
connMngr: efs.connMgr,
|
||||
}
|
||||
case utils.Kafka:
|
||||
expEv.efsConns = efs.cfg.LoggerCfg().EFsConns
|
||||
expEv.cfg = efs.cfg
|
||||
expEv.connMngr = efs.connMgr
|
||||
failPoster = expEv
|
||||
}
|
||||
|
||||
268
engine/kafka_logger.go
Normal file
268
engine/kafka_logger.go
Normal file
@@ -0,0 +1,268 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/syslog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
func NewLogger(loggerType, tnt, nodeID string,
|
||||
connMgr *ConnManager, cfg *config.CGRConfig) (utils.LoggerInterface, error) {
|
||||
switch loggerType {
|
||||
case utils.MetaKafkaLog:
|
||||
return NewExportLogger(nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil
|
||||
case utils.MetaStdLog, utils.MetaSysLog:
|
||||
return utils.NewLogger(loggerType, nodeID, cfg.LoggerCfg().Level)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported logger: <%+s>", loggerType)
|
||||
}
|
||||
}
|
||||
|
||||
// Logs to kafka
|
||||
type ExportLogger struct {
|
||||
sync.Mutex
|
||||
cfg *config.CGRConfig
|
||||
connMgr *ConnManager
|
||||
|
||||
LogLevel int
|
||||
FldPostDir string
|
||||
Writer *kafka.Writer
|
||||
NodeID string
|
||||
Tenant string
|
||||
}
|
||||
|
||||
// NewExportLogger will export loggers to kafka
|
||||
func NewExportLogger(nodeID, tenant string, level int,
|
||||
connMgr *ConnManager, cfg *config.CGRConfig) (el *ExportLogger) {
|
||||
el = &ExportLogger{
|
||||
connMgr: connMgr,
|
||||
cfg: cfg,
|
||||
LogLevel: level,
|
||||
FldPostDir: cfg.LoggerCfg().Opts.FailedPostsDir,
|
||||
NodeID: nodeID,
|
||||
Tenant: tenant,
|
||||
Writer: &kafka.Writer{
|
||||
Addr: kafka.TCP(cfg.LoggerCfg().Opts.KafkaConn),
|
||||
Topic: cfg.LoggerCfg().Opts.KafkaTopic,
|
||||
MaxAttempts: cfg.LoggerCfg().Opts.KafkaAttempts,
|
||||
},
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (el *ExportLogger) Close() (err error) {
|
||||
if el.Writer != nil {
|
||||
err = el.Writer.Close()
|
||||
el.Writer = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (el *ExportLogger) call(m string, level int) (err error) {
|
||||
eventExport := &utils.CGREvent{
|
||||
Tenant: el.Tenant,
|
||||
Event: map[string]interface{}{
|
||||
utils.NodeID: el.NodeID,
|
||||
utils.Message: m,
|
||||
utils.Severity: level,
|
||||
utils.Timestamp: time.Now().Format("2006-01-02 15:04:05"),
|
||||
},
|
||||
}
|
||||
// event will be exported through kafka as json format
|
||||
var content []byte
|
||||
if content, err = utils.ToUnescapedJSON(eventExport); err != nil {
|
||||
return
|
||||
}
|
||||
if err = el.Writer.WriteMessages(context.Background(), kafka.Message{
|
||||
Key: []byte(utils.GenUUID()),
|
||||
Value: content,
|
||||
}); err != nil {
|
||||
// if there are any errors in kafka, we will post in FailedPostDirectory
|
||||
args := &utils.ArgsFailedPosts{
|
||||
Tenant: el.Tenant,
|
||||
Path: el.Writer.Addr.String(),
|
||||
Event: eventExport,
|
||||
FailedDir: el.FldPostDir,
|
||||
Module: utils.Kafka,
|
||||
APIOpts: el.GetMeta(),
|
||||
}
|
||||
var reply string
|
||||
if err = el.connMgr.Call(context.Background(), el.cfg.LoggerCfg().EFsConns,
|
||||
utils.EfSv1ProcessEvent, args, &reply); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>",
|
||||
utils.Logger, utils.EFs, err.Error()))
|
||||
}
|
||||
// also the content should be printed as a stdout logger type
|
||||
return utils.ErrLoggerChanged
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (el *ExportLogger) Write(p []byte) (n int, err error) {
|
||||
n = len(p)
|
||||
err = el.call(string(p), 8)
|
||||
return
|
||||
}
|
||||
|
||||
func (sl *ExportLogger) GetSyslog() *syslog.Writer {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetLogLevel() returns the level logger number for the server
|
||||
func (el *ExportLogger) GetLogLevel() int {
|
||||
return el.LogLevel
|
||||
}
|
||||
|
||||
// SetLogLevel changes the log level
|
||||
func (el *ExportLogger) SetLogLevel(level int) {
|
||||
el.LogLevel = level
|
||||
}
|
||||
|
||||
// Alert logs to EEs with alert level
|
||||
func (el *ExportLogger) Alert(m string) (err error) {
|
||||
if el.LogLevel < utils.LOGLEVEL_ALERT {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, utils.LOGLEVEL_ALERT); err != nil {
|
||||
if err == utils.ErrLoggerChanged {
|
||||
utils.NewStdLogger(el.NodeID, el.LogLevel).Alert(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Crit logs to EEs with critical level
|
||||
func (el *ExportLogger) Crit(m string) (err error) {
|
||||
if el.LogLevel < utils.LOGLEVEL_CRITICAL {
|
||||
return nil
|
||||
}
|
||||
if el.call(m, utils.LOGLEVEL_CRITICAL); err != nil {
|
||||
if err == utils.ErrLoggerChanged {
|
||||
utils.NewStdLogger(el.NodeID, el.LogLevel).Crit(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Debug logs to EEs with debug level
|
||||
func (el *ExportLogger) Debug(m string) (err error) {
|
||||
if el.LogLevel < utils.LOGLEVEL_DEBUG {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, utils.LOGLEVEL_DEBUG); err != nil {
|
||||
if err == utils.ErrLoggerChanged {
|
||||
utils.NewStdLogger(el.NodeID, el.LogLevel).Debug(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Emerg logs to EEs with emergency level
|
||||
func (el *ExportLogger) Emerg(m string) (err error) {
|
||||
if el.LogLevel < utils.LOGLEVEL_EMERGENCY {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, utils.LOGLEVEL_EMERGENCY); err != nil {
|
||||
if err == utils.ErrLoggerChanged {
|
||||
utils.NewStdLogger(el.NodeID, el.LogLevel).Emerg(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Err logs to EEs with error level
|
||||
func (el *ExportLogger) Err(m string) (err error) {
|
||||
if el.LogLevel < utils.LOGLEVEL_ERROR {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, utils.LOGLEVEL_ERROR); err != nil {
|
||||
if err == utils.ErrLoggerChanged {
|
||||
utils.NewStdLogger(el.NodeID, el.LogLevel).Err(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Info logs to EEs with info level
|
||||
func (el *ExportLogger) Info(m string) (err error) {
|
||||
if el.LogLevel < utils.LOGLEVEL_INFO {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, utils.LOGLEVEL_INFO); err != nil {
|
||||
if err == utils.ErrLoggerChanged {
|
||||
utils.NewStdLogger(el.NodeID, el.LogLevel).Info(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Notice logs to EEs with notice level
|
||||
func (el *ExportLogger) Notice(m string) (err error) {
|
||||
if el.LogLevel < utils.LOGLEVEL_NOTICE {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, utils.LOGLEVEL_NOTICE); err != nil {
|
||||
if err == utils.ErrLoggerChanged {
|
||||
utils.NewStdLogger(el.NodeID, el.LogLevel).Notice(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Warning logs to EEs with warning level
|
||||
func (el *ExportLogger) Warning(m string) (err error) {
|
||||
if el.LogLevel < utils.LOGLEVEL_WARNING {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, utils.LOGLEVEL_WARNING); err != nil {
|
||||
if err == utils.ErrLoggerChanged {
|
||||
utils.NewStdLogger(el.NodeID, el.LogLevel).Warning(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (el *ExportLogger) GetMeta() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
utils.Tenant: el.Tenant,
|
||||
utils.NodeID: el.NodeID,
|
||||
utils.Level: el.LogLevel,
|
||||
utils.Format: el.Writer.Topic,
|
||||
utils.Conn: el.Writer.Addr.String(),
|
||||
utils.FailedPostsDir: el.FldPostDir,
|
||||
utils.Attempts: el.Writer.MaxAttempts,
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package utils
|
||||
package engine
|
||||
|
||||
/*
|
||||
func TestLoggerNewLoggerExport(t *testing.T) {
|
||||
13
ers/amqp.go
13
ers/amqp.go
@@ -35,8 +35,9 @@ import (
|
||||
// NewAMQPER return a new amqp event reader
|
||||
func NewAMQPER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
|
||||
rdr := &AMQPER{
|
||||
connMgr: connMgr,
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
fltrS: fltrS,
|
||||
@@ -60,9 +61,10 @@ func NewAMQPER(cfg *config.CGRConfig, cfgIdx int,
|
||||
// AMQPER implements EventReader interface for amqp message
|
||||
type AMQPER struct {
|
||||
// sync.RWMutex
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
dialURL string
|
||||
queueID string
|
||||
@@ -167,7 +169,8 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) {
|
||||
utils.ERs, msg.MessageId, err.Error()))
|
||||
}
|
||||
if rdr.poster != nil { // post it
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString); err != nil {
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString,
|
||||
rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, msg.MessageId, err.Error()))
|
||||
|
||||
@@ -75,7 +75,7 @@ func TestAMQPER(t *testing.T) {
|
||||
rdrExit = make(chan struct{}, 1)
|
||||
|
||||
if rdr, err = NewAMQPER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
@@ -136,7 +136,7 @@ func TestAMQPERServeError(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfgIdx := 0
|
||||
expected := "AMQP scheme must be either 'amqp://' or 'amqps://'"
|
||||
rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil, nil)
|
||||
rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
|
||||
}
|
||||
|
||||
@@ -35,8 +35,9 @@ import (
|
||||
// NewAMQPv1ER return a new amqpv1 event reader
|
||||
func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
|
||||
rdr := &AMQPv1ER{
|
||||
connMgr: connMgr,
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
fltrS: fltrS,
|
||||
@@ -61,9 +62,10 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int,
|
||||
// AMQPv1ER implements EventReader interface for amqpv1 message
|
||||
type AMQPv1ER struct {
|
||||
// sync.RWMutex
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
queueID string
|
||||
|
||||
@@ -145,7 +147,8 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
|
||||
utils.ERs, err.Error()))
|
||||
}
|
||||
if rdr.poster != nil { // post it
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, body, utils.EmptyString); err != nil {
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, body, utils.EmptyString, rdr.connMgr,
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message error: %s",
|
||||
utils.ERs, err.Error()))
|
||||
|
||||
@@ -80,7 +80,7 @@ func TestAMQPERv1(t *testing.T) {
|
||||
rdrExit = make(chan struct{}, 1)
|
||||
|
||||
if rdr, err = NewAMQPv1ER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
amqpv1Rdr := rdr.(*AMQPv1ER)
|
||||
@@ -153,7 +153,7 @@ func TestAmqpv1NewAMQPv1ER(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil)
|
||||
result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
|
||||
}
|
||||
@@ -184,7 +184,7 @@ func TestAmqpv1NewAMQPv1ER2(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil)
|
||||
result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
|
||||
}
|
||||
|
||||
@@ -164,7 +164,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) {
|
||||
var rdr EventReader
|
||||
if rdr, err = NewEventReader(erS.cfg, cfgIdx,
|
||||
erS.rdrEvents, erS.partialEvents, erS.rdrErr,
|
||||
erS.filterS, erS.stopLsn[rdrID]); err != nil {
|
||||
erS.filterS, erS.stopLsn[rdrID], erS.connMgr); err != nil {
|
||||
return
|
||||
}
|
||||
erS.rdrs[rdrID] = rdr
|
||||
|
||||
13
ers/kafka.go
13
ers/kafka.go
@@ -37,9 +37,10 @@ import (
|
||||
// NewKafkaER return a new kafka event reader
|
||||
func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
|
||||
|
||||
rdr := &KafkaER{
|
||||
connMgr: connMgr,
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
fltrS: fltrS,
|
||||
@@ -65,9 +66,10 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
|
||||
// KafkaER implements EventReader interface for kafka message
|
||||
type KafkaER struct {
|
||||
// sync.RWMutex
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
dialURL string
|
||||
topic string
|
||||
@@ -141,7 +143,8 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) {
|
||||
utils.ERs, string(msg.Key), err.Error()))
|
||||
}
|
||||
if rdr.poster != nil { // post it
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Value, string(msg.Key)); err != nil {
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Value, string(msg.Key),
|
||||
rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, string(msg.Key), err.Error()))
|
||||
|
||||
@@ -75,7 +75,7 @@ func TestKafkaER(t *testing.T) {
|
||||
rdrExit = make(chan struct{}, 1)
|
||||
|
||||
if rdr, err = NewKafkaER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w := kafka.NewWriter(kafka.WriterConfig{
|
||||
|
||||
@@ -104,7 +104,7 @@ func TestKafkaERServe(t *testing.T) {
|
||||
rdrEvents := make(chan *erEvent, 1)
|
||||
rdrExit := make(chan struct{}, 1)
|
||||
rdrErr := make(chan error, 1)
|
||||
rdr, err := NewKafkaER(cfg, 0, rdrEvents, make(chan *erEvent, 1), rdrErr, fltrS, rdrExit)
|
||||
rdr, err := NewKafkaER(cfg, 0, rdrEvents, make(chan *erEvent, 1), rdrErr, fltrS, rdrExit, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
13
ers/nats.go
13
ers/nats.go
@@ -38,8 +38,9 @@ import (
|
||||
// NewNatsER return a new amqp event reader
|
||||
func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (_ EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (_ EventReader, err error) {
|
||||
rdr := &NatsER{
|
||||
connMgr: connMgr,
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
fltrS: fltrS,
|
||||
@@ -66,9 +67,10 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
|
||||
// NatsER implements EventReader interface for amqp message
|
||||
type NatsER struct {
|
||||
// sync.RWMutex
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
rdrEvents chan *erEvent // channel to dispatch the events created to
|
||||
partialEvents chan *erEvent // channel to dispatch the partial events created to
|
||||
@@ -139,7 +141,8 @@ func (rdr *NatsER) Serve() (err error) {
|
||||
utils.ERs, string(msg.Data), err.Error()))
|
||||
}
|
||||
if rdr.poster != nil { // post it
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString); err != nil {
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString,
|
||||
rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, string(msg.Data), err.Error()))
|
||||
|
||||
@@ -75,7 +75,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
|
||||
rdrExit = make(chan struct{}, 1)
|
||||
var err error
|
||||
if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -162,7 +162,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
|
||||
|
||||
var err error
|
||||
if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ type EventReader interface {
|
||||
// NewEventReader instantiates the event reader based on configuration at index
|
||||
func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
|
||||
switch cfg.ERsCfg().Readers[cfgIdx].Type {
|
||||
default:
|
||||
err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type)
|
||||
@@ -46,21 +46,21 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
|
||||
case utils.MetaFileFWV:
|
||||
return NewFWVFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
case utils.MetaKafkajsonMap:
|
||||
return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
|
||||
case utils.MetaSQL:
|
||||
return NewSQLEventReader(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
case utils.MetaFileJSON:
|
||||
return NewJSONFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
case utils.MetaAMQPjsonMap:
|
||||
return NewAMQPER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
return NewAMQPER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
|
||||
case utils.MetaS3jsonMap:
|
||||
return NewS3ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
return NewS3ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
|
||||
case utils.MetaSQSjsonMap:
|
||||
return NewSQSER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
return NewSQSER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
|
||||
case utils.MetaAMQPV1jsonMap:
|
||||
return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
|
||||
case utils.MetaNatsjsonMap:
|
||||
return NewNatsER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
return NewNatsER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, connMgr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ func TestNewInvalidReader(t *testing.T) {
|
||||
if len(cfg.ERsCfg().Readers) != 2 {
|
||||
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
|
||||
}
|
||||
if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil); err == nil || err.Error() != "unsupported reader type: <Invalid>" {
|
||||
if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil, nil); err == nil || err.Error() != "unsupported reader type: <Invalid>" {
|
||||
t.Errorf("Expecting: <unsupported reader type: <Invalid>>, received: <%+v>", err)
|
||||
}
|
||||
}
|
||||
@@ -61,7 +61,7 @@ func TestNewCsvReader(t *testing.T) {
|
||||
rdrExit: nil,
|
||||
conReqs: nil}
|
||||
var expected EventReader = exp
|
||||
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
|
||||
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
|
||||
t.Errorf("Expecting: <nil>, received: <%+v>", err)
|
||||
} else {
|
||||
// because we use function make to init the channel when we create the EventReader reflect.DeepEqual
|
||||
@@ -84,11 +84,11 @@ func TestNewKafkaReader(t *testing.T) {
|
||||
if len(cfg.ERsCfg().Readers) != 2 {
|
||||
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
|
||||
}
|
||||
expected, err := NewKafkaER(cfg, 1, nil, nil, nil, fltr, nil)
|
||||
expected, err := NewKafkaER(cfg, 1, nil, nil, nil, fltr, nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("Expecting: <nil>, received: <%+v>", err)
|
||||
}
|
||||
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
|
||||
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
|
||||
t.Errorf("Expecting: <nil>, received: <%+v>", err)
|
||||
} else if !reflect.DeepEqual(expected, rcv) {
|
||||
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
|
||||
@@ -115,7 +115,7 @@ func TestNewSQLReader(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Expecting: <nil>, received: <%+v>", err)
|
||||
}
|
||||
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
|
||||
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
|
||||
t.Errorf("Expecting: <nil>, received: <%+v>", err)
|
||||
} else if !reflect.DeepEqual(expected, rcv) {
|
||||
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
|
||||
@@ -149,7 +149,7 @@ func TestNewFileXMLReader(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
@@ -169,7 +169,7 @@ func TestNewFileFWVReader(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(nil)
|
||||
} else {
|
||||
@@ -189,7 +189,7 @@ func TestNewJSONReader(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
@@ -219,7 +219,7 @@ func TestNewAMQPReader(t *testing.T) {
|
||||
exp.setOpts(&config.EventReaderOpts{})
|
||||
exp.createPoster()
|
||||
var expected EventReader = exp
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expected, rcv) {
|
||||
@@ -244,7 +244,7 @@ func TestNewAMQPv1Reader(t *testing.T) {
|
||||
exp.Config().Opts = &config.EventReaderOpts{}
|
||||
exp.createPoster()
|
||||
var expected EventReader = exp
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expected, rcv) {
|
||||
@@ -270,7 +270,7 @@ func TestNewS3Reader(t *testing.T) {
|
||||
exp.Config().Opts = &config.EventReaderOpts{}
|
||||
exp.createPoster()
|
||||
var expected EventReader = exp
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expected, rcv) {
|
||||
@@ -307,7 +307,7 @@ func TestNewSQSReader(t *testing.T) {
|
||||
exp.Config().Opts = &config.EventReaderOpts{}
|
||||
exp.createPoster()
|
||||
var expected EventReader = exp
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
|
||||
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
|
||||
exp.session = rcv.(*SQSER).session
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
||||
13
ers/s3.go
13
ers/s3.go
@@ -39,9 +39,10 @@ import (
|
||||
// NewS3ER return a new s3 event reader
|
||||
func NewS3ER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
|
||||
|
||||
rdr := &S3ER{
|
||||
connMgr: connMgr,
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
fltrS: fltrS,
|
||||
@@ -63,9 +64,10 @@ func NewS3ER(cfg *config.CGRConfig, cfgIdx int,
|
||||
// S3ER implements EventReader interface for s3 message
|
||||
type S3ER struct {
|
||||
// sync.RWMutex
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
rdrEvents chan *erEvent // channel to dispatch the events created to
|
||||
partialEvents chan *erEvent // channel to dispatch the partial events created to
|
||||
@@ -247,7 +249,8 @@ func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) {
|
||||
}
|
||||
|
||||
if rdr.poster != nil { // post it
|
||||
if err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg, key); err != nil {
|
||||
if err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg, key,
|
||||
rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, key, err.Error()))
|
||||
|
||||
@@ -86,7 +86,7 @@ func TestS3ER(t *testing.T) {
|
||||
rdrExit = make(chan struct{}, 1)
|
||||
|
||||
if rdr, err = NewS3ER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s3Rdr := rdr.(*S3ER)
|
||||
@@ -176,7 +176,7 @@ func TestNewS3ER(t *testing.T) {
|
||||
}
|
||||
|
||||
rdr, err := NewS3ER(cfg, 1, nil, nil,
|
||||
nil, nil, nil)
|
||||
nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -190,18 +190,11 @@ func TestNewS3ERCase2(t *testing.T) {
|
||||
expected := &S3ER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: nil,
|
||||
rdrEvents: nil,
|
||||
rdrExit: nil,
|
||||
rdrErr: nil,
|
||||
cap: nil,
|
||||
awsRegion: "",
|
||||
awsID: "",
|
||||
awsKey: "",
|
||||
awsToken: "",
|
||||
bucket: "cgrates_cdrs",
|
||||
session: nil,
|
||||
poster: nil,
|
||||
}
|
||||
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
|
||||
{
|
||||
@@ -217,7 +210,7 @@ func TestNewS3ERCase2(t *testing.T) {
|
||||
}
|
||||
|
||||
rdr, err := NewS3ER(cfg, 0, nil, nil,
|
||||
nil, nil, nil)
|
||||
nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
13
ers/sqs.go
13
ers/sqs.go
@@ -39,9 +39,10 @@ import (
|
||||
// NewSQSER return a new sqs event reader
|
||||
func NewSQSER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (er EventReader, err error) {
|
||||
|
||||
rdr := &SQSER{
|
||||
connMgr: connMgr,
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
fltrS: fltrS,
|
||||
@@ -63,9 +64,10 @@ func NewSQSER(cfg *config.CGRConfig, cfgIdx int,
|
||||
// SQSER implements EventReader interface for sqs message
|
||||
type SQSER struct {
|
||||
// sync.RWMutex
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
rdrEvents chan *erEvent // channel to dispatch the events created to
|
||||
partialEvents chan *erEvent // channel to dispatch the partial events created to
|
||||
@@ -263,7 +265,8 @@ func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) {
|
||||
}
|
||||
|
||||
if rdr.poster != nil { // post it
|
||||
if err = ees.ExportWithAttempts(context.Background(), rdr.poster, body, key); err != nil {
|
||||
if err = ees.ExportWithAttempts(context.Background(), rdr.poster, body, key,
|
||||
rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, key, err.Error()))
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestNewSQSER(t *testing.T) {
|
||||
},
|
||||
}
|
||||
rdr, err := NewSQSER(cfg, 0, nil, nil,
|
||||
nil, nil, nil)
|
||||
nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -81,7 +81,7 @@ func TestSQSERServeRunDelay0(t *testing.T) {
|
||||
},
|
||||
}
|
||||
rdr, err := NewSQSER(cfg, 0, nil, nil,
|
||||
nil, nil, nil)
|
||||
nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -107,7 +107,7 @@ func TestSQSERServe(t *testing.T) {
|
||||
},
|
||||
}
|
||||
rdr, err := NewSQSER(cfg, 0, nil, nil,
|
||||
nil, nil, nil)
|
||||
nil, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -203,7 +203,7 @@ func testCDRsPostFailoverToFile(t *testing.T) {
|
||||
fileName := file.Name()
|
||||
filePath := path.Join(cdrsPostFailCfg.EFsCfg().FailedPostsDir, fileName)
|
||||
|
||||
ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs)
|
||||
ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs, nil)
|
||||
if err != nil {
|
||||
t.Errorf("<%s> for file <%s>", err, fileName)
|
||||
continue
|
||||
|
||||
@@ -354,15 +354,11 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
|
||||
}
|
||||
|
||||
// init syslog
|
||||
if utils.Logger, err = utils.NewLogger(
|
||||
if utils.Logger, err = engine.NewLogger(
|
||||
utils.FirstNonEmpty(*flags.SysLogger, cgr.cfg.LoggerCfg().Type),
|
||||
cgr.cfg.GeneralCfg().DefaultTenant,
|
||||
cgr.cfg.GeneralCfg().NodeID,
|
||||
cgr.cfg.LoggerCfg().Level,
|
||||
cgr.cfg.LoggerCfg().Opts.KafkaAttempts,
|
||||
cgr.cfg.LoggerCfg().Opts.KafkaConn,
|
||||
cgr.cfg.LoggerCfg().Opts.KafkaTopic,
|
||||
cgr.cfg.LoggerCfg().Opts.FailedPostsDir); err != nil {
|
||||
cgr.cM, cgr.cfg); err != nil {
|
||||
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
|
||||
}
|
||||
efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
|
||||
|
||||
@@ -1,238 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"log/syslog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
// Logs to kafka
|
||||
type ExportLogger struct {
|
||||
sync.Mutex
|
||||
|
||||
LogLevel int
|
||||
FldPostDir string
|
||||
Writer *kafka.Writer
|
||||
NodeID string
|
||||
Tenant string
|
||||
}
|
||||
|
||||
// NewExportLogger will export loggers to kafka
|
||||
func NewExportLogger(nodeID, tenant string, level int,
|
||||
connOpts, connTopic string, attempts int, fldPostDir string) (el *ExportLogger) {
|
||||
el = &ExportLogger{
|
||||
LogLevel: level,
|
||||
FldPostDir: fldPostDir,
|
||||
NodeID: nodeID,
|
||||
Tenant: tenant,
|
||||
Writer: &kafka.Writer{
|
||||
Addr: kafka.TCP(connOpts),
|
||||
Topic: connTopic,
|
||||
MaxAttempts: attempts,
|
||||
},
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (el *ExportLogger) Close() (err error) {
|
||||
if el.Writer != nil {
|
||||
err = el.Writer.Close()
|
||||
el.Writer = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (el *ExportLogger) call(m string, level int) (err error) {
|
||||
eventExport := &CGREvent{
|
||||
Tenant: el.Tenant,
|
||||
Event: map[string]interface{}{
|
||||
NodeID: el.NodeID,
|
||||
Message: m,
|
||||
Severity: level,
|
||||
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
|
||||
},
|
||||
}
|
||||
// event will be exported through kafka as json format
|
||||
var content []byte
|
||||
if content, err = ToUnescapedJSON(eventExport); err != nil {
|
||||
return
|
||||
}
|
||||
if err = el.Writer.WriteMessages(context.Background(), kafka.Message{
|
||||
Key: []byte(GenUUID()),
|
||||
Value: content,
|
||||
}); err != nil {
|
||||
/*
|
||||
// if there are any errors in kafka, we will post in FailedPostDirectory
|
||||
AddFailedMessage(el.FldPostDir, el.Writer.Addr.String(), MetaKafkaLog, Kafka,
|
||||
eventExport, el.GetMeta())
|
||||
// also the content should be printed as a stdout logger type
|
||||
return ErrLoggerChanged
|
||||
*/
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (el *ExportLogger) Write(p []byte) (n int, err error) {
|
||||
n = len(p)
|
||||
err = el.call(string(p), 8)
|
||||
return
|
||||
}
|
||||
|
||||
func (sl *ExportLogger) GetSyslog() *syslog.Writer {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetLogLevel() returns the level logger number for the server
|
||||
func (el *ExportLogger) GetLogLevel() int {
|
||||
return el.LogLevel
|
||||
}
|
||||
|
||||
// SetLogLevel changes the log level
|
||||
func (el *ExportLogger) SetLogLevel(level int) {
|
||||
el.LogLevel = level
|
||||
}
|
||||
|
||||
// Alert logs to EEs with alert level
|
||||
func (el *ExportLogger) Alert(m string) (err error) {
|
||||
if el.LogLevel < LOGLEVEL_ALERT {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, LOGLEVEL_ALERT); err != nil {
|
||||
if err == ErrLoggerChanged {
|
||||
NewStdLogger(el.NodeID, el.LogLevel).Alert(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Crit logs to EEs with critical level
|
||||
func (el *ExportLogger) Crit(m string) (err error) {
|
||||
if el.LogLevel < LOGLEVEL_CRITICAL {
|
||||
return nil
|
||||
}
|
||||
if el.call(m, LOGLEVEL_CRITICAL); err != nil {
|
||||
if err == ErrLoggerChanged {
|
||||
NewStdLogger(el.NodeID, el.LogLevel).Crit(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Debug logs to EEs with debug level
|
||||
func (el *ExportLogger) Debug(m string) (err error) {
|
||||
if el.LogLevel < LOGLEVEL_DEBUG {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, LOGLEVEL_DEBUG); err != nil {
|
||||
if err == ErrLoggerChanged {
|
||||
NewStdLogger(el.NodeID, el.LogLevel).Debug(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Emerg logs to EEs with emergency level
|
||||
func (el *ExportLogger) Emerg(m string) (err error) {
|
||||
if el.LogLevel < LOGLEVEL_EMERGENCY {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, LOGLEVEL_EMERGENCY); err != nil {
|
||||
if err == ErrLoggerChanged {
|
||||
NewStdLogger(el.NodeID, el.LogLevel).Emerg(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Err logs to EEs with error level
|
||||
func (el *ExportLogger) Err(m string) (err error) {
|
||||
if el.LogLevel < LOGLEVEL_ERROR {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, LOGLEVEL_ERROR); err != nil {
|
||||
if err == ErrLoggerChanged {
|
||||
NewStdLogger(el.NodeID, el.LogLevel).Err(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Info logs to EEs with info level
|
||||
func (el *ExportLogger) Info(m string) (err error) {
|
||||
if el.LogLevel < LOGLEVEL_INFO {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, LOGLEVEL_INFO); err != nil {
|
||||
if err == ErrLoggerChanged {
|
||||
NewStdLogger(el.NodeID, el.LogLevel).Info(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Notice logs to EEs with notice level
|
||||
func (el *ExportLogger) Notice(m string) (err error) {
|
||||
if el.LogLevel < LOGLEVEL_NOTICE {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, LOGLEVEL_NOTICE); err != nil {
|
||||
if err == ErrLoggerChanged {
|
||||
NewStdLogger(el.NodeID, el.LogLevel).Notice(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Warning logs to EEs with warning level
|
||||
func (el *ExportLogger) Warning(m string) (err error) {
|
||||
if el.LogLevel < LOGLEVEL_WARNING {
|
||||
return nil
|
||||
}
|
||||
if err = el.call(m, LOGLEVEL_WARNING); err != nil {
|
||||
if err == ErrLoggerChanged {
|
||||
NewStdLogger(el.NodeID, el.LogLevel).Warning(m)
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (el *ExportLogger) GetMeta() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
Tenant: el.Tenant,
|
||||
NodeID: el.NodeID,
|
||||
Level: el.LogLevel,
|
||||
Format: el.Writer.Topic,
|
||||
Conn: el.Writer.Addr.String(),
|
||||
FailedPostsDir: el.FldPostDir,
|
||||
Attempts: el.Writer.MaxAttempts,
|
||||
}
|
||||
}
|
||||
@@ -29,8 +29,7 @@ import (
|
||||
var Logger LoggerInterface
|
||||
|
||||
func init() {
|
||||
Logger, _ = NewLogger(MetaStdLog, EmptyString, EmptyString, 0,
|
||||
0, EmptyString, EmptyString, EmptyString)
|
||||
Logger, _ = NewLogger(MetaStdLog, EmptyString, 0)
|
||||
}
|
||||
|
||||
// log severities following rfc3164
|
||||
@@ -61,11 +60,8 @@ type LoggerInterface interface {
|
||||
Write(p []byte) (n int, err error)
|
||||
}
|
||||
|
||||
func NewLogger(loggerType, tenant, nodeID string, logLvl, attempts int, connOpts,
|
||||
topicOpts, fldPostsDir string) (LoggerInterface, error) {
|
||||
func NewLogger(loggerType, nodeID string, logLvl int) (LoggerInterface, error) {
|
||||
switch loggerType {
|
||||
case MetaKafkaLog:
|
||||
return NewExportLogger(nodeID, tenant, logLvl, connOpts, topicOpts, attempts, fldPostsDir), nil
|
||||
case MetaStdLog:
|
||||
return NewStdLogger(nodeID, logLvl), nil
|
||||
case MetaSysLog:
|
||||
|
||||
@@ -35,7 +35,7 @@ func TestLoggerNewLoggerStdoutOK(t *testing.T) {
|
||||
log.New(os.Stderr, EmptyString, log.LstdFlags),
|
||||
},
|
||||
}
|
||||
if rcv, err := NewLogger(MetaStdLog, EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err != nil {
|
||||
if rcv, err := NewLogger(MetaStdLog, "1234", 7); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(rcv, exp) {
|
||||
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
|
||||
@@ -46,7 +46,7 @@ func TestLoggerNewLoggerSyslogOK(t *testing.T) {
|
||||
exp := &SysLogger{
|
||||
logLevel: 7,
|
||||
}
|
||||
if rcv, err := NewLogger(MetaSysLog, EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err != nil {
|
||||
if rcv, err := NewLogger(MetaSysLog, EmptyString, 7); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
exp.syslog = rcv.GetSyslog()
|
||||
@@ -58,7 +58,7 @@ func TestLoggerNewLoggerSyslogOK(t *testing.T) {
|
||||
|
||||
func TestLoggerNewLoggerUnsupported(t *testing.T) {
|
||||
experr := `unsupported logger: <unsupported>`
|
||||
if _, err := NewLogger("unsupported", EmptyString, "1234", 7, 0, EmptyString, EmptyString, EmptyString); err == nil || err.Error() != experr {
|
||||
if _, err := NewLogger("unsupported", EmptyString, 7); err == nil || err.Error() != experr {
|
||||
t.Errorf("expected: <%s>, \nreceived: <%+v>", experr, err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user