Failed posts and failover for export loggers

This commit is contained in:
adi
2022-06-28 16:56:10 +03:00
committed by Dan Christian Bogos
parent f562de49df
commit c22f43c788
13 changed files with 229 additions and 84 deletions

View File

@@ -19,9 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"bytes"
"encoding/json"
"fmt"
"log/syslog"
"time"
@@ -31,26 +28,26 @@ import (
"github.com/segmentio/kafka-go"
)
func NewLogger(loggerType, tenant, nodeID string, loggCfg *config.LoggerCfg, ctx *context.Context) (utils.LoggerInterface, error) {
func NewLogger(loggerType, tenant, nodeID string, loggCfg *config.LoggerCfg) (utils.LoggerInterface, error) {
switch loggerType {
case utils.MetaKafka:
return NewExportLogger(nodeID, tenant, loggCfg.Level, loggCfg.Opts, ctx), nil
return NewExportLogger(nodeID, tenant, loggCfg.Level, loggCfg.Opts), nil
default:
return utils.NewLogger(loggerType, nodeID, loggCfg.Level)
}
}
// Logs to EEs
// Logs to kafka
type ExportLogger struct {
logLevel int
fPost *utils.FailoverPoster
loggOpts *config.LoggerOptsCfg
writer *kafka.Writer
ctx *context.Context
nodeID string
tenant string
}
func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCfg, ctx *context.Context) (el *ExportLogger) {
func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCfg) (el *ExportLogger) {
el = &ExportLogger{
logLevel: level,
loggOpts: opts,
@@ -61,7 +58,6 @@ func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCf
Topic: opts.KafkaTopic,
MaxAttempts: opts.Attempts,
},
ctx: ctx,
}
return
}
@@ -86,27 +82,23 @@ func (el *ExportLogger) call(m string, level int) (err error) {
}
// event will be exported through kafka as json format
var content []byte
if content, err = getContent(eventExport); err != nil {
return err
}
fmt.Println("content: %v", string(content))
if err = el.writer.WriteMessages(el.ctx, kafka.Message{
Key: []byte("KafkaExport" + utils.PipeSep + el.loggOpts.KafkaTopic),
Value: content,
}); err != nil {
}
return
}
func getContent(event *utils.CGREvent) (content []byte, err error) {
buf := &bytes.Buffer{}
enc := json.NewEncoder(buf)
enc.SetEscapeHTML(false)
if err = enc.Encode(event); err != nil {
if content, err = utils.ToUnescapedJSON(eventExport); err != nil {
return
}
return buf.Bytes(), err
if err = el.writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(utils.GenUUID()),
Value: content,
}); err != nil {
// if there are any errors in kafka, we will post in FailedPostDirectory
el.fPost = utils.NewFailoverPoster()
if err = el.fPost.AddMessage(el.loggOpts.FailedPostsDir,
el.loggOpts.KafkaConn, el.loggOpts.KafkaTopic, eventExport); err != nil {
return
}
// also the content should be printed as a stdout logger type
return utils.ErrLoggerChanged
}
return
}
func (el *ExportLogger) Write(p []byte) (n int, err error) {
@@ -130,65 +122,113 @@ func (el *ExportLogger) SetLogLevel(level int) {
}
// Alert logs to EEs with alert level
func (el *ExportLogger) Alert(m string) error {
func (el *ExportLogger) Alert(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_ALERT {
return nil
}
return el.call(m, utils.LOGLEVEL_ALERT)
if err = el.call(m, utils.LOGLEVEL_ALERT); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.nodeID, el.logLevel).Alert(m)
err = nil
}
}
return //el.call(m, utils.LOGLEVEL_ALERT)
}
// Crit logs to EEs with critical level
func (el *ExportLogger) Crit(m string) error {
func (el *ExportLogger) Crit(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_CRITICAL {
return nil
}
return el.call(m, utils.LOGLEVEL_CRITICAL)
if el.call(m, utils.LOGLEVEL_CRITICAL); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.nodeID, el.logLevel).Crit(m)
err = nil
}
}
return // el.call(m, utils.LOGLEVEL_CRITICAL)
}
// Debug logs to EEs with debug level
func (el *ExportLogger) Debug(m string) error {
func (el *ExportLogger) Debug(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_DEBUG {
return nil
}
return el.call(m, utils.LOGLEVEL_DEBUG)
if err = el.call(m, utils.LOGLEVEL_DEBUG); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.nodeID, el.logLevel).Debug(m)
err = nil
}
}
return // el.call(m, utils.LOGLEVEL_DEBUG)
}
// Emerg logs to EEs with emergency level
func (el *ExportLogger) Emerg(m string) error {
func (el *ExportLogger) Emerg(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_EMERGENCY {
return nil
}
return el.call(m, utils.LOGLEVEL_EMERGENCY)
if err = el.call(m, utils.LOGLEVEL_EMERGENCY); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.nodeID, el.logLevel).Emerg(m)
err = nil
}
}
return // el.call(m, utils.LOGLEVEL_EMERGENCY)
}
// Err logs to EEs with error level
func (el *ExportLogger) Err(m string) error {
func (el *ExportLogger) Err(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_ERROR {
return nil
}
return el.call(m, utils.LOGLEVEL_ERROR)
if err = el.call(m, utils.LOGLEVEL_ERROR); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.nodeID, el.logLevel).Err(m)
err = nil
}
}
return // el.call(m, utils.LOGLEVEL_ERROR)
}
// Info logs to EEs with info level
func (el *ExportLogger) Info(m string) error {
func (el *ExportLogger) Info(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_INFO {
return nil
}
return el.call(m, utils.LOGLEVEL_INFO)
if err = el.call(m, utils.LOGLEVEL_INFO); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.nodeID, el.logLevel).Info(m)
err = nil
}
}
return // el.call(m, utils.LOGLEVEL_INFO)
}
// Notice logs to EEs with notice level
func (el *ExportLogger) Notice(m string) error {
func (el *ExportLogger) Notice(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_NOTICE {
return nil
}
return el.call(m, utils.LOGLEVEL_NOTICE)
if err = el.call(m, utils.LOGLEVEL_NOTICE); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.nodeID, el.logLevel).Notice(m)
err = nil
}
}
return // el.call(m, utils.LOGLEVEL_NOTICE)
}
// Warning logs to EEs with warning level
func (el *ExportLogger) Warning(m string) error {
func (el *ExportLogger) Warning(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_WARNING {
return nil
}
return el.call(m, utils.LOGLEVEL_WARNING)
if err = el.call(m, utils.LOGLEVEL_WARNING); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.nodeID, el.logLevel).Warning(m)
err = nil
}
}
return // el.call(m, utils.LOGLEVEL_WARNING)
}

View File

@@ -33,7 +33,7 @@ import (
func TestLoggerNewLoggerExport(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
exp := &ExportLogger{
logLevel: 7,
logLevel: 6,
nodeID: "123",
tenant: "cgrates.org",
loggOpts: cfg.LoggerCfg().Opts,
@@ -43,7 +43,7 @@ func TestLoggerNewLoggerExport(t *testing.T) {
MaxAttempts: cfg.LoggerCfg().Opts.Attempts,
},
}
if rcv, err := NewLogger(utils.MetaEEs, "cgrates.org", "123", nil, nil); err != nil {
if rcv, err := NewLogger(utils.MetaKafka, "cgrates.org", "123", cfg.LoggerCfg()); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rcv.(*ExportLogger), exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
@@ -52,7 +52,7 @@ func TestLoggerNewLoggerExport(t *testing.T) {
func TestLoggerNewLoggerDefault(t *testing.T) {
experr := `unsupported logger: <invalid>`
if _, err := NewLogger("invalid", "cgrates.org", "123", nil, nil); err == nil ||
if _, err := NewLogger("invalid", "cgrates.org", "123", nil); err == nil ||
err.Error() != experr {
t.Errorf("expected: <%s>, \nreceived: <%s>", experr, err)
}
@@ -71,7 +71,7 @@ func TestLoggerNewExportLogger(t *testing.T) {
MaxAttempts: cfg.LoggerCfg().Opts.Attempts,
},
}
if rcv := NewExportLogger("123", "cgrates.org", 7, cfg.LoggerCfg().Opts, nil); !reflect.DeepEqual(rcv, exp) {
if rcv := NewExportLogger("123", "cgrates.org", 7, cfg.LoggerCfg().Opts); !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
}
@@ -112,7 +112,7 @@ func TestLoggerExportEmerg(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", -1, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", -1, nil)
if err := el.Emerg("Emergency message"); err != nil {
t.Error(err)
@@ -159,7 +159,7 @@ func TestLoggerExportAlert(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 0, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", 0, nil)
if err := el.Alert("Alert message"); err != nil {
t.Error(err)
@@ -206,7 +206,7 @@ func TestLoggerExportCrit(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 1, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", 1, nil)
if err := el.Crit("Critical message"); err != nil {
t.Error(err)
@@ -253,7 +253,7 @@ func TestLoggerExportErr(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 2, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", 2, nil)
if err := el.Err("Error message"); err != nil {
t.Error(err)
@@ -300,7 +300,7 @@ func TestLoggerExportWarning(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 3, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", 3, nil)
if err := el.Warning("Warning message"); err != nil {
t.Error(err)
@@ -347,7 +347,7 @@ func TestLoggerExportNotice(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 4, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", 4, nil)
if err := el.Notice("Notice message"); err != nil {
t.Error(err)
@@ -394,7 +394,7 @@ func TestLoggerExportInfo(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 5, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", 5, nil)
if err := el.Info("Info message"); err != nil {
t.Error(err)
@@ -441,7 +441,7 @@ func TestLoggerExportDebug(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 6, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", 6, nil)
if err := el.Debug("Debug message"); err != nil {
t.Error(err)
@@ -453,7 +453,7 @@ func TestLoggerExportDebug(t *testing.T) {
}
func TestLoggerSetGetLogLevel(t *testing.T) {
el := NewExportLogger("123", "cgrates.org", 6, nil, nil)
el := NewExportLogger("123", "cgrates.org", 6, nil)
if rcv := el.GetLogLevel(); rcv != 6 {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", 6, rcv)
}
@@ -464,7 +464,7 @@ func TestLoggerSetGetLogLevel(t *testing.T) {
}
func TestLoggerGetSyslog(t *testing.T) {
el := NewExportLogger("123", "cgrates.org", 6, nil, nil)
el := NewExportLogger("123", "cgrates.org", 6, nil)
if el.GetSyslog() != nil {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", nil, el.GetSyslog())
}
@@ -506,7 +506,7 @@ func TestLoggerExportWrite(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
el := NewExportLogger("123", "cgrates.org", 8, cM, []string{eesConn})
el := NewExportLogger("123", "cgrates.org", 8, nil)
if _, err := el.Write([]byte("message")); err != nil {
t.Error(err)