diff --git a/config/apis.go b/config/apis.go
index 830afb93a..c1f959405 100644
--- a/config/apis.go
+++ b/config/apis.go
@@ -350,6 +350,12 @@ func storeDiffSection(ctx *context.Context, section string, db ConfigDB, v1, v2
return
}
return db.SetSection(ctx, section, diffGeneralJsonCfg(jsn, v1.GeneralCfg(), v2.GeneralCfg()))
+ case LoggerJSON:
+ jsn := new(LoggerJsonCfg)
+ if err = db.GetSection(ctx, section, jsn); err != nil {
+ return
+ }
+ return db.SetSection(ctx, section, diffLoggerJsonCfg(jsn, v1.LoggerCfg(), v2.LoggerCfg()))
case RPCConnsJSON:
jsn := make(RPCConnsJson)
if err = db.GetSection(ctx, section, &jsn); err != nil {
diff --git a/config/config_defaults.go b/config/config_defaults.go
index 4fdf596ba..f44092e28 100644
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -70,12 +70,13 @@ const CGRATES_CFG_JSON = `
},
"logger": {
- "type": "*syslog", // controls the destination of logs <*syslog|*stdout|*kafka>
- "level": 6, // system level precision for floats
+ "type": "*syslog", // controls the destination of logs <*syslog|*stdout|*kafka>
+ "level": 6, // system level precision for floats
"opts": {
- "*kafka_conn": "", // the connection trough kafka
- "*kafka_topic": "", // the topic from where the events are exported
- "*attempts": 1, // number of attempts of connecting
+ "kafka_conn": "", // the connection trough kafka
+ "kafka_topic": "", // the topic from where the events are exported
+ "attempts": 1, // number of attempts of connecting
+ "failed_posts_dir": "/var/spool/cgrates/failed_posts" // path where fail logs are exported
},
},
diff --git a/config/logger.go b/config/logger.go
index e0a7d29d0..61c58f72d 100644
--- a/config/logger.go
+++ b/config/logger.go
@@ -40,7 +40,7 @@ func (loggCfg *LoggerCfg) Load(ctx *context.Context, jsnCfg ConfigDB, _ *CGRConf
// loadFromJSONCfg loads Logger config from JsonCfg
func (loggCfg *LoggerCfg) loadFromJSONCfg(jsnLoggerCfg *LoggerJsonCfg) (err error) {
- if loggCfg == nil {
+ if jsnLoggerCfg == nil {
return nil
}
if jsnLoggerCfg.Type != nil && *jsnLoggerCfg.Type != utils.EmptyString {
@@ -65,9 +65,10 @@ func (loggCfg *LoggerCfg) AsMapInterface(string) interface{} {
}
type LoggerOptsCfg struct {
- KafkaConn string `json:"*kakfa_conn"`
- KafkaTopic string `json:"*kakfa_topic"`
- Attempts int `json:"*attempts"`
+ KafkaConn string
+ KafkaTopic string
+ Attempts int
+ FailedPostsDir string
}
func (LoggerCfg) SName() string { return LoggerJSON }
@@ -96,14 +97,18 @@ func (loggOpts *LoggerOptsCfg) loadFromJSONCfg(jsnCfg *LoggerOptsJson) {
if jsnCfg.Attempts != nil {
loggOpts.Attempts = *jsnCfg.Attempts
}
+ if jsnCfg.Failed_posts_dir != nil {
+ loggOpts.FailedPostsDir = *jsnCfg.Failed_posts_dir
+ }
}
// AsMapInterface returns the config of logger OPTS as a map[string]interface{}
func (loggOpts *LoggerOptsCfg) AsMapInterface() interface{} {
return map[string]interface{}{
- utils.KafkaConnCfg: loggOpts.KafkaConn,
- utils.KafkaTopicCfg: loggOpts.KafkaTopic,
- utils.AttemptsCfg: loggOpts.Attempts,
+ utils.KafkaConnCfg: loggOpts.KafkaConn,
+ utils.KafkaTopicCfg: loggOpts.KafkaTopic,
+ utils.AttemptsCfg: loggOpts.Attempts,
+ utils.FailedPostsDirCfg: loggOpts.FailedPostsDir,
}
}
@@ -113,9 +118,10 @@ func (loggerOpts *LoggerOptsCfg) Clone() *LoggerOptsCfg {
return nil
}
return &LoggerOptsCfg{
- KafkaConn: loggerOpts.KafkaConn,
- KafkaTopic: loggerOpts.KafkaTopic,
- Attempts: loggerOpts.Attempts,
+ KafkaConn: loggerOpts.KafkaConn,
+ KafkaTopic: loggerOpts.KafkaTopic,
+ Attempts: loggerOpts.Attempts,
+ FailedPostsDir: loggerOpts.FailedPostsDir,
}
}
@@ -126,9 +132,10 @@ type LoggerJsonCfg struct {
}
type LoggerOptsJson struct {
- Kafka_conn *string
- Kafka_topic *string
- Attempts *int
+ Kafka_conn *string `json:"kafka_conn"`
+ Kafka_topic *string `json:"kafka_topic"`
+ Attempts *int `json:"attempts"`
+ Failed_posts_dir *string `json:"failed_posts_dir"`
}
func diffLoggerJsonCfg(d *LoggerJsonCfg, v1, v2 *LoggerCfg) *LoggerJsonCfg {
@@ -158,5 +165,8 @@ func diffLoggerOptsJsonCfg(d *LoggerOptsJson, v1, v2 *LoggerOptsCfg) *LoggerOpts
if v1.Attempts != v2.Attempts {
d.Attempts = utils.IntPointer(v2.Attempts)
}
+ if v1.FailedPostsDir != v2.FailedPostsDir {
+ d.Failed_posts_dir = utils.StringPointer(v2.FailedPostsDir)
+ }
return d
}
diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json
index da5da7341..9920fa6a7 100644
--- a/data/conf/samples/tutinternal/cgrates.json
+++ b/data/conf/samples/tutinternal/cgrates.json
@@ -1,10 +1,18 @@
{
"general": {
- "log_level": 7,
"reply_timeout": "50s"
},
+"logger": {
+ "type": "*kafka", // controls the destination of logs <*syslog|*stdout|*kafka>
+ "level": 6, // system level precision for floats
+ "opts": {
+ "kafka_conn": "192.168.55.44:9092", // the connection trough kafka
+ "kafka_topic": "TutorialTopic", // the topic from where the events are exported
+ "attempts": 1, // number of attempts of connecting
+ },
+},
"listen": {
"rpc_json": ":2012",
diff --git a/data/tariffplans/oldtutorial/Attributes.csv b/data/tariffplans/oldtutorial/Attributes.csv
index 920a2ef09..ca520370b 100644
--- a/data/tariffplans/oldtutorial/Attributes.csv
+++ b/data/tariffplans/oldtutorial/Attributes.csv
@@ -1,4 +1,5 @@
#Tenant,ID,FilterIDs,Weights,Blockers,AttributeFilterIDs,AttributeBlockers,Path,Type,Value
cgrates.org,ATTR_1,*string:~*opts.*context:*sessions|*cdrs;*string:~*req.Account:1007,;10,;false,,,*req.Account,*constant,1001
cgrates.org,ATTR_1,,,,,,*req.Subject,*constant,1001
-cgrates.org,ATTR_PASS,*string:~*opts.*context:*sessions;*string:~*req.Account:1001,;10,;false,,,*req.PasswordFromAttributes,*constant,CGRateSPassword1
\ No newline at end of file
+cgrates.org,ATTR_PASS,*string:~*opts.*context:*sessions;*string:~*req.Account:1001,;10,;false,,,*req.PasswordFromAttributes,*constant,CGRateSPassword1
+
diff --git a/data/tariffplans/testit/Accounts.csv b/data/tariffplans/testit/Accounts.csv
index f3fb0d81c..432f775a9 100644
--- a/data/tariffplans/testit/Accounts.csv
+++ b/data/tariffplans/testit/Accounts.csv
@@ -1,7 +1,5 @@
#Tenant,ID,FilterIDs,Weights,Blockers,Opts,BalanceID,BalanceFilterIDs,BalanceWeights,BalanceBlockers,BalanceType,BalanceUnits,BalanceUnitFactors,BalanceOpts,BalanceCostIncrements,BalanceAttributeIDs,BalanceRateProfileIDs,ThresholdIDs
cgrates.org,ACC_PRF_1,,;20,,,MonetaryBalance,,;10,,*monetary,14,fltr1&fltr2;100;fltr3;200,,fltr1&fltr2;1.3;2.3;3.3,attr1;attr2,,*none
cgrates.org,1001,,,,,VoiceBalance,,;10,,*voice,1h,,,,,,
-
-#Tenant,ID,FilterIDs,Weights,Blockers,Opts,BalanceID,BalanceFilterIDs,BalanceWeights,BalanceBlockers,BalanceType,BalanceUnits,BalanceUnitFactors,BalanceOpts,BalanceCostIncrements,BalanceAttributeIDs,BalanceRateProfileIDs,ThresholdIDs
cgrates.org,1001,,;20,,,MonetaryBalance,,;10,,*monetary,14,fltr1&fltr2;100;fltr3;200,,fltr1&fltr2;1.3;2.3;3.3,attr1;attr2,,*none
cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;false,*voice,1h,,,,,,
\ No newline at end of file
diff --git a/engine/logger.go b/engine/logger.go
index 92231f8f1..54b052faf 100644
--- a/engine/logger.go
+++ b/engine/logger.go
@@ -19,9 +19,6 @@ along with this program. If not, see
package engine
import (
- "bytes"
- "encoding/json"
- "fmt"
"log/syslog"
"time"
@@ -31,26 +28,26 @@ import (
"github.com/segmentio/kafka-go"
)
-func NewLogger(loggerType, tenant, nodeID string, loggCfg *config.LoggerCfg, ctx *context.Context) (utils.LoggerInterface, error) {
+func NewLogger(loggerType, tenant, nodeID string, loggCfg *config.LoggerCfg) (utils.LoggerInterface, error) {
switch loggerType {
case utils.MetaKafka:
- return NewExportLogger(nodeID, tenant, loggCfg.Level, loggCfg.Opts, ctx), nil
+ return NewExportLogger(nodeID, tenant, loggCfg.Level, loggCfg.Opts), nil
default:
return utils.NewLogger(loggerType, nodeID, loggCfg.Level)
}
}
-// Logs to EEs
+// Logs to kafka
type ExportLogger struct {
logLevel int
+ fPost *utils.FailoverPoster
loggOpts *config.LoggerOptsCfg
writer *kafka.Writer
- ctx *context.Context
nodeID string
tenant string
}
-func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCfg, ctx *context.Context) (el *ExportLogger) {
+func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCfg) (el *ExportLogger) {
el = &ExportLogger{
logLevel: level,
loggOpts: opts,
@@ -61,7 +58,6 @@ func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCf
Topic: opts.KafkaTopic,
MaxAttempts: opts.Attempts,
},
- ctx: ctx,
}
return
}
@@ -86,27 +82,23 @@ func (el *ExportLogger) call(m string, level int) (err error) {
}
// event will be exported through kafka as json format
var content []byte
- if content, err = getContent(eventExport); err != nil {
- return err
- }
- fmt.Println("content: %v", string(content))
- if err = el.writer.WriteMessages(el.ctx, kafka.Message{
- Key: []byte("KafkaExport" + utils.PipeSep + el.loggOpts.KafkaTopic),
- Value: content,
- }); err != nil {
-
- }
- return
-}
-
-func getContent(event *utils.CGREvent) (content []byte, err error) {
- buf := &bytes.Buffer{}
- enc := json.NewEncoder(buf)
- enc.SetEscapeHTML(false)
- if err = enc.Encode(event); err != nil {
+ if content, err = utils.ToUnescapedJSON(eventExport); err != nil {
return
}
- return buf.Bytes(), err
+ if err = el.writer.WriteMessages(context.Background(), kafka.Message{
+ Key: []byte(utils.GenUUID()),
+ Value: content,
+ }); err != nil {
+ // if there are any errors in kafka, we will post in FailedPostDirectory
+ el.fPost = utils.NewFailoverPoster()
+ if err = el.fPost.AddMessage(el.loggOpts.FailedPostsDir,
+ el.loggOpts.KafkaConn, el.loggOpts.KafkaTopic, eventExport); err != nil {
+ return
+ }
+ // also the content should be printed as a stdout logger type
+ return utils.ErrLoggerChanged
+ }
+ return
}
func (el *ExportLogger) Write(p []byte) (n int, err error) {
@@ -130,65 +122,113 @@ func (el *ExportLogger) SetLogLevel(level int) {
}
// Alert logs to EEs with alert level
-func (el *ExportLogger) Alert(m string) error {
+func (el *ExportLogger) Alert(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_ALERT {
return nil
}
- return el.call(m, utils.LOGLEVEL_ALERT)
+ if err = el.call(m, utils.LOGLEVEL_ALERT); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.nodeID, el.logLevel).Alert(m)
+ err = nil
+ }
+ }
+ return //el.call(m, utils.LOGLEVEL_ALERT)
}
// Crit logs to EEs with critical level
-func (el *ExportLogger) Crit(m string) error {
+func (el *ExportLogger) Crit(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_CRITICAL {
return nil
}
- return el.call(m, utils.LOGLEVEL_CRITICAL)
+ if el.call(m, utils.LOGLEVEL_CRITICAL); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.nodeID, el.logLevel).Crit(m)
+ err = nil
+ }
+ }
+ return // el.call(m, utils.LOGLEVEL_CRITICAL)
}
// Debug logs to EEs with debug level
-func (el *ExportLogger) Debug(m string) error {
+func (el *ExportLogger) Debug(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_DEBUG {
return nil
}
- return el.call(m, utils.LOGLEVEL_DEBUG)
+ if err = el.call(m, utils.LOGLEVEL_DEBUG); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.nodeID, el.logLevel).Debug(m)
+ err = nil
+ }
+ }
+ return // el.call(m, utils.LOGLEVEL_DEBUG)
}
// Emerg logs to EEs with emergency level
-func (el *ExportLogger) Emerg(m string) error {
+func (el *ExportLogger) Emerg(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_EMERGENCY {
return nil
}
- return el.call(m, utils.LOGLEVEL_EMERGENCY)
+ if err = el.call(m, utils.LOGLEVEL_EMERGENCY); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.nodeID, el.logLevel).Emerg(m)
+ err = nil
+ }
+ }
+ return // el.call(m, utils.LOGLEVEL_EMERGENCY)
}
// Err logs to EEs with error level
-func (el *ExportLogger) Err(m string) error {
+func (el *ExportLogger) Err(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_ERROR {
return nil
}
- return el.call(m, utils.LOGLEVEL_ERROR)
+ if err = el.call(m, utils.LOGLEVEL_ERROR); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.nodeID, el.logLevel).Err(m)
+ err = nil
+ }
+ }
+ return // el.call(m, utils.LOGLEVEL_ERROR)
}
// Info logs to EEs with info level
-func (el *ExportLogger) Info(m string) error {
+func (el *ExportLogger) Info(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_INFO {
return nil
}
- return el.call(m, utils.LOGLEVEL_INFO)
+ if err = el.call(m, utils.LOGLEVEL_INFO); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.nodeID, el.logLevel).Info(m)
+ err = nil
+ }
+ }
+ return // el.call(m, utils.LOGLEVEL_INFO)
}
// Notice logs to EEs with notice level
-func (el *ExportLogger) Notice(m string) error {
+func (el *ExportLogger) Notice(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_NOTICE {
return nil
}
- return el.call(m, utils.LOGLEVEL_NOTICE)
+ if err = el.call(m, utils.LOGLEVEL_NOTICE); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.nodeID, el.logLevel).Notice(m)
+ err = nil
+ }
+ }
+ return // el.call(m, utils.LOGLEVEL_NOTICE)
}
// Warning logs to EEs with warning level
-func (el *ExportLogger) Warning(m string) error {
+func (el *ExportLogger) Warning(m string) (err error) {
if el.logLevel < utils.LOGLEVEL_WARNING {
return nil
}
- return el.call(m, utils.LOGLEVEL_WARNING)
+ if err = el.call(m, utils.LOGLEVEL_WARNING); err != nil {
+ if err == utils.ErrLoggerChanged {
+ utils.NewStdLogger(el.nodeID, el.logLevel).Warning(m)
+ err = nil
+ }
+ }
+ return // el.call(m, utils.LOGLEVEL_WARNING)
}
diff --git a/engine/logger_test.go b/engine/logger_test.go
index c5d27e194..fbf542202 100644
--- a/engine/logger_test.go
+++ b/engine/logger_test.go
@@ -33,7 +33,7 @@ import (
func TestLoggerNewLoggerExport(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
exp := &ExportLogger{
- logLevel: 7,
+ logLevel: 6,
nodeID: "123",
tenant: "cgrates.org",
loggOpts: cfg.LoggerCfg().Opts,
@@ -43,7 +43,7 @@ func TestLoggerNewLoggerExport(t *testing.T) {
MaxAttempts: cfg.LoggerCfg().Opts.Attempts,
},
}
- if rcv, err := NewLogger(utils.MetaEEs, "cgrates.org", "123", nil, nil); err != nil {
+ if rcv, err := NewLogger(utils.MetaKafka, "cgrates.org", "123", cfg.LoggerCfg()); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rcv.(*ExportLogger), exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
@@ -52,7 +52,7 @@ func TestLoggerNewLoggerExport(t *testing.T) {
func TestLoggerNewLoggerDefault(t *testing.T) {
experr := `unsupported logger: `
- if _, err := NewLogger("invalid", "cgrates.org", "123", nil, nil); err == nil ||
+ if _, err := NewLogger("invalid", "cgrates.org", "123", nil); err == nil ||
err.Error() != experr {
t.Errorf("expected: <%s>, \nreceived: <%s>", experr, err)
}
@@ -71,7 +71,7 @@ func TestLoggerNewExportLogger(t *testing.T) {
MaxAttempts: cfg.LoggerCfg().Opts.Attempts,
},
}
- if rcv := NewExportLogger("123", "cgrates.org", 7, cfg.LoggerCfg().Opts, nil); !reflect.DeepEqual(rcv, exp) {
+ if rcv := NewExportLogger("123", "cgrates.org", 7, cfg.LoggerCfg().Opts); !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
}
@@ -112,7 +112,7 @@ func TestLoggerExportEmerg(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", -1, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", -1, nil)
if err := el.Emerg("Emergency message"); err != nil {
t.Error(err)
@@ -159,7 +159,7 @@ func TestLoggerExportAlert(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", 0, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", 0, nil)
if err := el.Alert("Alert message"); err != nil {
t.Error(err)
@@ -206,7 +206,7 @@ func TestLoggerExportCrit(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", 1, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", 1, nil)
if err := el.Crit("Critical message"); err != nil {
t.Error(err)
@@ -253,7 +253,7 @@ func TestLoggerExportErr(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", 2, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", 2, nil)
if err := el.Err("Error message"); err != nil {
t.Error(err)
@@ -300,7 +300,7 @@ func TestLoggerExportWarning(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", 3, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", 3, nil)
if err := el.Warning("Warning message"); err != nil {
t.Error(err)
@@ -347,7 +347,7 @@ func TestLoggerExportNotice(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", 4, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", 4, nil)
if err := el.Notice("Notice message"); err != nil {
t.Error(err)
@@ -394,7 +394,7 @@ func TestLoggerExportInfo(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", 5, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", 5, nil)
if err := el.Info("Info message"); err != nil {
t.Error(err)
@@ -441,7 +441,7 @@ func TestLoggerExportDebug(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", 6, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", 6, nil)
if err := el.Debug("Debug message"); err != nil {
t.Error(err)
@@ -453,7 +453,7 @@ func TestLoggerExportDebug(t *testing.T) {
}
func TestLoggerSetGetLogLevel(t *testing.T) {
- el := NewExportLogger("123", "cgrates.org", 6, nil, nil)
+ el := NewExportLogger("123", "cgrates.org", 6, nil)
if rcv := el.GetLogLevel(); rcv != 6 {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", 6, rcv)
}
@@ -464,7 +464,7 @@ func TestLoggerSetGetLogLevel(t *testing.T) {
}
func TestLoggerGetSyslog(t *testing.T) {
- el := NewExportLogger("123", "cgrates.org", 6, nil, nil)
+ el := NewExportLogger("123", "cgrates.org", 6, nil)
if el.GetSyslog() != nil {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", nil, el.GetSyslog())
}
@@ -506,7 +506,7 @@ func TestLoggerExportWrite(t *testing.T) {
rpcInternal <- ccM
cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal)
- el := NewExportLogger("123", "cgrates.org", 8, cM, []string{eesConn})
+ el := NewExportLogger("123", "cgrates.org", 8, nil)
if _, err := el.Write([]byte("message")); err != nil {
t.Error(err)
diff --git a/services/cgr-engine.go b/services/cgr-engine.go
index f127f6fba..a551bdb5b 100644
--- a/services/cgr-engine.go
+++ b/services/cgr-engine.go
@@ -350,7 +350,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
// init syslog
if utils.Logger, err = engine.NewLogger(utils.FirstNonEmpty(*flags.SysLogger, cgr.cfg.LoggerCfg().Type),
- cgr.cfg.GeneralCfg().DefaultTenant, cgr.cfg.GeneralCfg().NodeID, cgr.cfg.LoggerCfg(), ctx); err != nil {
+ cgr.cfg.GeneralCfg().DefaultTenant, cgr.cfg.GeneralCfg().NodeID, cgr.cfg.LoggerCfg()); err != nil {
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version()))
diff --git a/utils/consts.go b/utils/consts.go
index 03c0e226a..42d721c6a 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -1731,8 +1731,8 @@ const (
const (
LevelCfg = "level"
- KafkaConnCfg = "*kafka_conn"
- KafkaTopicCfg = "*kafka_topic"
+ KafkaConnCfg = "kafka_conn"
+ KafkaTopicCfg = "kafka_topic"
)
const (
diff --git a/utils/coreutils.go b/utils/coreutils.go
index e39fb6a9f..e683ddc8f 100644
--- a/utils/coreutils.go
+++ b/utils/coreutils.go
@@ -486,6 +486,16 @@ func ToJSON(v interface{}) string {
return string(b)
}
+func ToUnescapedJSON(value interface{}) (bts []byte, err error) {
+ buf := &bytes.Buffer{}
+ enc := json.NewEncoder(buf)
+ enc.SetEscapeHTML(false)
+ if err = enc.Encode(value); err != nil {
+ return
+ }
+ return buf.Bytes(), err
+}
+
// Used as generic function logic for various fields
// Attributes
diff --git a/utils/errors.go b/utils/errors.go
index a1b0235a9..27eb69044 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -65,6 +65,7 @@ var (
DispatcherErrorPrefix = "DISPATCHER_ERROR"
RateSErrPrfx = "RATES_ERROR"
AccountSErrPrfx = "ACCOUNTS_ERROR"
+ ErrLoggerChanged = errors.New("LOGGER_CHANGED")
ErrUnsupportedFormat = errors.New("UNSUPPORTED_FORMAT")
ErrNoDatabaseConn = errors.New("NO_DATABASE_CONNECTION")
ErrMaxIncrementsExceeded = errors.New("MAX_INCREMENTS_EXCEEDED")
diff --git a/utils/failover_export.go b/utils/failover_export.go
new file mode 100644
index 000000000..c1bdc8e73
--- /dev/null
+++ b/utils/failover_export.go
@@ -0,0 +1,70 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package utils
+
+import (
+ "encoding/gob"
+ "fmt"
+ "os"
+ "path/filepath"
+)
+
+type FailoverPoster struct {
+ MessageProvider
+}
+
+func NewFailoverPoster( /*addMsg *MessageProvider*/ ) *FailoverPoster {
+ return new(FailoverPoster)
+}
+
+func (fldPst *FailoverPoster) AddMessage(failedPostDir, kafkaConn,
+ kafkaTopic string, content interface{}) (err error) {
+ filePath := filepath.Join(failedPostDir, kafkaTopic+PipeSep+MetaKafka+GOBSuffix)
+ var fileOut *os.File
+ if _, err = os.Stat(filePath); os.IsNotExist(err) {
+ fileOut, err = os.Create(filePath)
+ if err != nil {
+ return fmt.Errorf(fmt.Sprintf(" failed to write logs to file <%s> because <%s>", filePath, err))
+ }
+ } else {
+ fileOut, err = os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0755)
+ if err != nil {
+ return err
+ }
+ }
+ enc := gob.NewEncoder(fileOut)
+ if err = enc.Encode(content); err != nil {
+ return err
+ }
+ fileOut.Close()
+ return
+}
+
+type MessageProvider interface {
+ GetContent() string
+ GetMeta() string
+}
+
+func (fldPst *FailoverPoster) GetContent() string {
+ return EmptyString
+}
+
+func (fldPst *FailoverPoster) GetMeta() string {
+ return EmptyString
+}