From c22f43c788c9a64010295fff0f54ba2ef93d052d Mon Sep 17 00:00:00 2001 From: adi Date: Tue, 28 Jun 2022 16:56:10 +0300 Subject: [PATCH] Failed posts and failover for export loggers --- config/apis.go | 6 + config/config_defaults.go | 11 +- config/logger.go | 36 ++++-- data/conf/samples/tutinternal/cgrates.json | 10 +- data/tariffplans/oldtutorial/Attributes.csv | 3 +- data/tariffplans/testit/Accounts.csv | 2 - engine/logger.go | 128 +++++++++++++------- engine/logger_test.go | 30 ++--- services/cgr-engine.go | 2 +- utils/consts.go | 4 +- utils/coreutils.go | 10 ++ utils/errors.go | 1 + utils/failover_export.go | 70 +++++++++++ 13 files changed, 229 insertions(+), 84 deletions(-) create mode 100644 utils/failover_export.go diff --git a/config/apis.go b/config/apis.go index 830afb93a..c1f959405 100644 --- a/config/apis.go +++ b/config/apis.go @@ -350,6 +350,12 @@ func storeDiffSection(ctx *context.Context, section string, db ConfigDB, v1, v2 return } return db.SetSection(ctx, section, diffGeneralJsonCfg(jsn, v1.GeneralCfg(), v2.GeneralCfg())) + case LoggerJSON: + jsn := new(LoggerJsonCfg) + if err = db.GetSection(ctx, section, jsn); err != nil { + return + } + return db.SetSection(ctx, section, diffLoggerJsonCfg(jsn, v1.LoggerCfg(), v2.LoggerCfg())) case RPCConnsJSON: jsn := make(RPCConnsJson) if err = db.GetSection(ctx, section, &jsn); err != nil { diff --git a/config/config_defaults.go b/config/config_defaults.go index 4fdf596ba..f44092e28 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -70,12 +70,13 @@ const CGRATES_CFG_JSON = ` }, "logger": { - "type": "*syslog", // controls the destination of logs <*syslog|*stdout|*kafka> - "level": 6, // system level precision for floats + "type": "*syslog", // controls the destination of logs <*syslog|*stdout|*kafka> + "level": 6, // system level precision for floats "opts": { - "*kafka_conn": "", // the connection trough kafka - "*kafka_topic": "", // the topic from where the events are exported - "*attempts": 1, // number of attempts of connecting + "kafka_conn": "", // the connection trough kafka + "kafka_topic": "", // the topic from where the events are exported + "attempts": 1, // number of attempts of connecting + "failed_posts_dir": "/var/spool/cgrates/failed_posts" // path where fail logs are exported }, }, diff --git a/config/logger.go b/config/logger.go index e0a7d29d0..61c58f72d 100644 --- a/config/logger.go +++ b/config/logger.go @@ -40,7 +40,7 @@ func (loggCfg *LoggerCfg) Load(ctx *context.Context, jsnCfg ConfigDB, _ *CGRConf // loadFromJSONCfg loads Logger config from JsonCfg func (loggCfg *LoggerCfg) loadFromJSONCfg(jsnLoggerCfg *LoggerJsonCfg) (err error) { - if loggCfg == nil { + if jsnLoggerCfg == nil { return nil } if jsnLoggerCfg.Type != nil && *jsnLoggerCfg.Type != utils.EmptyString { @@ -65,9 +65,10 @@ func (loggCfg *LoggerCfg) AsMapInterface(string) interface{} { } type LoggerOptsCfg struct { - KafkaConn string `json:"*kakfa_conn"` - KafkaTopic string `json:"*kakfa_topic"` - Attempts int `json:"*attempts"` + KafkaConn string + KafkaTopic string + Attempts int + FailedPostsDir string } func (LoggerCfg) SName() string { return LoggerJSON } @@ -96,14 +97,18 @@ func (loggOpts *LoggerOptsCfg) loadFromJSONCfg(jsnCfg *LoggerOptsJson) { if jsnCfg.Attempts != nil { loggOpts.Attempts = *jsnCfg.Attempts } + if jsnCfg.Failed_posts_dir != nil { + loggOpts.FailedPostsDir = *jsnCfg.Failed_posts_dir + } } // AsMapInterface returns the config of logger OPTS as a map[string]interface{} func (loggOpts *LoggerOptsCfg) AsMapInterface() interface{} { return map[string]interface{}{ - utils.KafkaConnCfg: loggOpts.KafkaConn, - utils.KafkaTopicCfg: loggOpts.KafkaTopic, - utils.AttemptsCfg: loggOpts.Attempts, + utils.KafkaConnCfg: loggOpts.KafkaConn, + utils.KafkaTopicCfg: loggOpts.KafkaTopic, + utils.AttemptsCfg: loggOpts.Attempts, + utils.FailedPostsDirCfg: loggOpts.FailedPostsDir, } } @@ -113,9 +118,10 @@ func (loggerOpts *LoggerOptsCfg) Clone() *LoggerOptsCfg { return nil } return &LoggerOptsCfg{ - KafkaConn: loggerOpts.KafkaConn, - KafkaTopic: loggerOpts.KafkaTopic, - Attempts: loggerOpts.Attempts, + KafkaConn: loggerOpts.KafkaConn, + KafkaTopic: loggerOpts.KafkaTopic, + Attempts: loggerOpts.Attempts, + FailedPostsDir: loggerOpts.FailedPostsDir, } } @@ -126,9 +132,10 @@ type LoggerJsonCfg struct { } type LoggerOptsJson struct { - Kafka_conn *string - Kafka_topic *string - Attempts *int + Kafka_conn *string `json:"kafka_conn"` + Kafka_topic *string `json:"kafka_topic"` + Attempts *int `json:"attempts"` + Failed_posts_dir *string `json:"failed_posts_dir"` } func diffLoggerJsonCfg(d *LoggerJsonCfg, v1, v2 *LoggerCfg) *LoggerJsonCfg { @@ -158,5 +165,8 @@ func diffLoggerOptsJsonCfg(d *LoggerOptsJson, v1, v2 *LoggerOptsCfg) *LoggerOpts if v1.Attempts != v2.Attempts { d.Attempts = utils.IntPointer(v2.Attempts) } + if v1.FailedPostsDir != v2.FailedPostsDir { + d.Failed_posts_dir = utils.StringPointer(v2.FailedPostsDir) + } return d } diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index da5da7341..9920fa6a7 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -1,10 +1,18 @@ { "general": { - "log_level": 7, "reply_timeout": "50s" }, +"logger": { + "type": "*kafka", // controls the destination of logs <*syslog|*stdout|*kafka> + "level": 6, // system level precision for floats + "opts": { + "kafka_conn": "192.168.55.44:9092", // the connection trough kafka + "kafka_topic": "TutorialTopic", // the topic from where the events are exported + "attempts": 1, // number of attempts of connecting + }, +}, "listen": { "rpc_json": ":2012", diff --git a/data/tariffplans/oldtutorial/Attributes.csv b/data/tariffplans/oldtutorial/Attributes.csv index 920a2ef09..ca520370b 100644 --- a/data/tariffplans/oldtutorial/Attributes.csv +++ b/data/tariffplans/oldtutorial/Attributes.csv @@ -1,4 +1,5 @@ #Tenant,ID,FilterIDs,Weights,Blockers,AttributeFilterIDs,AttributeBlockers,Path,Type,Value cgrates.org,ATTR_1,*string:~*opts.*context:*sessions|*cdrs;*string:~*req.Account:1007,;10,;false,,,*req.Account,*constant,1001 cgrates.org,ATTR_1,,,,,,*req.Subject,*constant,1001 -cgrates.org,ATTR_PASS,*string:~*opts.*context:*sessions;*string:~*req.Account:1001,;10,;false,,,*req.PasswordFromAttributes,*constant,CGRateSPassword1 \ No newline at end of file +cgrates.org,ATTR_PASS,*string:~*opts.*context:*sessions;*string:~*req.Account:1001,;10,;false,,,*req.PasswordFromAttributes,*constant,CGRateSPassword1 + diff --git a/data/tariffplans/testit/Accounts.csv b/data/tariffplans/testit/Accounts.csv index f3fb0d81c..432f775a9 100644 --- a/data/tariffplans/testit/Accounts.csv +++ b/data/tariffplans/testit/Accounts.csv @@ -1,7 +1,5 @@ #Tenant,ID,FilterIDs,Weights,Blockers,Opts,BalanceID,BalanceFilterIDs,BalanceWeights,BalanceBlockers,BalanceType,BalanceUnits,BalanceUnitFactors,BalanceOpts,BalanceCostIncrements,BalanceAttributeIDs,BalanceRateProfileIDs,ThresholdIDs cgrates.org,ACC_PRF_1,,;20,,,MonetaryBalance,,;10,,*monetary,14,fltr1&fltr2;100;fltr3;200,,fltr1&fltr2;1.3;2.3;3.3,attr1;attr2,,*none cgrates.org,1001,,,,,VoiceBalance,,;10,,*voice,1h,,,,,, - -#Tenant,ID,FilterIDs,Weights,Blockers,Opts,BalanceID,BalanceFilterIDs,BalanceWeights,BalanceBlockers,BalanceType,BalanceUnits,BalanceUnitFactors,BalanceOpts,BalanceCostIncrements,BalanceAttributeIDs,BalanceRateProfileIDs,ThresholdIDs cgrates.org,1001,,;20,,,MonetaryBalance,,;10,,*monetary,14,fltr1&fltr2;100;fltr3;200,,fltr1&fltr2;1.3;2.3;3.3,attr1;attr2,,*none cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;false,*voice,1h,,,,,, \ No newline at end of file diff --git a/engine/logger.go b/engine/logger.go index 92231f8f1..54b052faf 100644 --- a/engine/logger.go +++ b/engine/logger.go @@ -19,9 +19,6 @@ along with this program. If not, see package engine import ( - "bytes" - "encoding/json" - "fmt" "log/syslog" "time" @@ -31,26 +28,26 @@ import ( "github.com/segmentio/kafka-go" ) -func NewLogger(loggerType, tenant, nodeID string, loggCfg *config.LoggerCfg, ctx *context.Context) (utils.LoggerInterface, error) { +func NewLogger(loggerType, tenant, nodeID string, loggCfg *config.LoggerCfg) (utils.LoggerInterface, error) { switch loggerType { case utils.MetaKafka: - return NewExportLogger(nodeID, tenant, loggCfg.Level, loggCfg.Opts, ctx), nil + return NewExportLogger(nodeID, tenant, loggCfg.Level, loggCfg.Opts), nil default: return utils.NewLogger(loggerType, nodeID, loggCfg.Level) } } -// Logs to EEs +// Logs to kafka type ExportLogger struct { logLevel int + fPost *utils.FailoverPoster loggOpts *config.LoggerOptsCfg writer *kafka.Writer - ctx *context.Context nodeID string tenant string } -func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCfg, ctx *context.Context) (el *ExportLogger) { +func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCfg) (el *ExportLogger) { el = &ExportLogger{ logLevel: level, loggOpts: opts, @@ -61,7 +58,6 @@ func NewExportLogger(nodeID, tenant string, level int, opts *config.LoggerOptsCf Topic: opts.KafkaTopic, MaxAttempts: opts.Attempts, }, - ctx: ctx, } return } @@ -86,27 +82,23 @@ func (el *ExportLogger) call(m string, level int) (err error) { } // event will be exported through kafka as json format var content []byte - if content, err = getContent(eventExport); err != nil { - return err - } - fmt.Println("content: %v", string(content)) - if err = el.writer.WriteMessages(el.ctx, kafka.Message{ - Key: []byte("KafkaExport" + utils.PipeSep + el.loggOpts.KafkaTopic), - Value: content, - }); err != nil { - - } - return -} - -func getContent(event *utils.CGREvent) (content []byte, err error) { - buf := &bytes.Buffer{} - enc := json.NewEncoder(buf) - enc.SetEscapeHTML(false) - if err = enc.Encode(event); err != nil { + if content, err = utils.ToUnescapedJSON(eventExport); err != nil { return } - return buf.Bytes(), err + if err = el.writer.WriteMessages(context.Background(), kafka.Message{ + Key: []byte(utils.GenUUID()), + Value: content, + }); err != nil { + // if there are any errors in kafka, we will post in FailedPostDirectory + el.fPost = utils.NewFailoverPoster() + if err = el.fPost.AddMessage(el.loggOpts.FailedPostsDir, + el.loggOpts.KafkaConn, el.loggOpts.KafkaTopic, eventExport); err != nil { + return + } + // also the content should be printed as a stdout logger type + return utils.ErrLoggerChanged + } + return } func (el *ExportLogger) Write(p []byte) (n int, err error) { @@ -130,65 +122,113 @@ func (el *ExportLogger) SetLogLevel(level int) { } // Alert logs to EEs with alert level -func (el *ExportLogger) Alert(m string) error { +func (el *ExportLogger) Alert(m string) (err error) { if el.logLevel < utils.LOGLEVEL_ALERT { return nil } - return el.call(m, utils.LOGLEVEL_ALERT) + if err = el.call(m, utils.LOGLEVEL_ALERT); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.nodeID, el.logLevel).Alert(m) + err = nil + } + } + return //el.call(m, utils.LOGLEVEL_ALERT) } // Crit logs to EEs with critical level -func (el *ExportLogger) Crit(m string) error { +func (el *ExportLogger) Crit(m string) (err error) { if el.logLevel < utils.LOGLEVEL_CRITICAL { return nil } - return el.call(m, utils.LOGLEVEL_CRITICAL) + if el.call(m, utils.LOGLEVEL_CRITICAL); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.nodeID, el.logLevel).Crit(m) + err = nil + } + } + return // el.call(m, utils.LOGLEVEL_CRITICAL) } // Debug logs to EEs with debug level -func (el *ExportLogger) Debug(m string) error { +func (el *ExportLogger) Debug(m string) (err error) { if el.logLevel < utils.LOGLEVEL_DEBUG { return nil } - return el.call(m, utils.LOGLEVEL_DEBUG) + if err = el.call(m, utils.LOGLEVEL_DEBUG); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.nodeID, el.logLevel).Debug(m) + err = nil + } + } + return // el.call(m, utils.LOGLEVEL_DEBUG) } // Emerg logs to EEs with emergency level -func (el *ExportLogger) Emerg(m string) error { +func (el *ExportLogger) Emerg(m string) (err error) { if el.logLevel < utils.LOGLEVEL_EMERGENCY { return nil } - return el.call(m, utils.LOGLEVEL_EMERGENCY) + if err = el.call(m, utils.LOGLEVEL_EMERGENCY); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.nodeID, el.logLevel).Emerg(m) + err = nil + } + } + return // el.call(m, utils.LOGLEVEL_EMERGENCY) } // Err logs to EEs with error level -func (el *ExportLogger) Err(m string) error { +func (el *ExportLogger) Err(m string) (err error) { if el.logLevel < utils.LOGLEVEL_ERROR { return nil } - return el.call(m, utils.LOGLEVEL_ERROR) + if err = el.call(m, utils.LOGLEVEL_ERROR); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.nodeID, el.logLevel).Err(m) + err = nil + } + } + return // el.call(m, utils.LOGLEVEL_ERROR) } // Info logs to EEs with info level -func (el *ExportLogger) Info(m string) error { +func (el *ExportLogger) Info(m string) (err error) { if el.logLevel < utils.LOGLEVEL_INFO { return nil } - return el.call(m, utils.LOGLEVEL_INFO) + if err = el.call(m, utils.LOGLEVEL_INFO); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.nodeID, el.logLevel).Info(m) + err = nil + } + } + return // el.call(m, utils.LOGLEVEL_INFO) } // Notice logs to EEs with notice level -func (el *ExportLogger) Notice(m string) error { +func (el *ExportLogger) Notice(m string) (err error) { if el.logLevel < utils.LOGLEVEL_NOTICE { return nil } - return el.call(m, utils.LOGLEVEL_NOTICE) + if err = el.call(m, utils.LOGLEVEL_NOTICE); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.nodeID, el.logLevel).Notice(m) + err = nil + } + } + return // el.call(m, utils.LOGLEVEL_NOTICE) } // Warning logs to EEs with warning level -func (el *ExportLogger) Warning(m string) error { +func (el *ExportLogger) Warning(m string) (err error) { if el.logLevel < utils.LOGLEVEL_WARNING { return nil } - return el.call(m, utils.LOGLEVEL_WARNING) + if err = el.call(m, utils.LOGLEVEL_WARNING); err != nil { + if err == utils.ErrLoggerChanged { + utils.NewStdLogger(el.nodeID, el.logLevel).Warning(m) + err = nil + } + } + return // el.call(m, utils.LOGLEVEL_WARNING) } diff --git a/engine/logger_test.go b/engine/logger_test.go index c5d27e194..fbf542202 100644 --- a/engine/logger_test.go +++ b/engine/logger_test.go @@ -33,7 +33,7 @@ import ( func TestLoggerNewLoggerExport(t *testing.T) { cfg := config.NewDefaultCGRConfig() exp := &ExportLogger{ - logLevel: 7, + logLevel: 6, nodeID: "123", tenant: "cgrates.org", loggOpts: cfg.LoggerCfg().Opts, @@ -43,7 +43,7 @@ func TestLoggerNewLoggerExport(t *testing.T) { MaxAttempts: cfg.LoggerCfg().Opts.Attempts, }, } - if rcv, err := NewLogger(utils.MetaEEs, "cgrates.org", "123", nil, nil); err != nil { + if rcv, err := NewLogger(utils.MetaKafka, "cgrates.org", "123", cfg.LoggerCfg()); err != nil { t.Error(err) } else if !reflect.DeepEqual(rcv.(*ExportLogger), exp) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) @@ -52,7 +52,7 @@ func TestLoggerNewLoggerExport(t *testing.T) { func TestLoggerNewLoggerDefault(t *testing.T) { experr := `unsupported logger: ` - if _, err := NewLogger("invalid", "cgrates.org", "123", nil, nil); err == nil || + if _, err := NewLogger("invalid", "cgrates.org", "123", nil); err == nil || err.Error() != experr { t.Errorf("expected: <%s>, \nreceived: <%s>", experr, err) } @@ -71,7 +71,7 @@ func TestLoggerNewExportLogger(t *testing.T) { MaxAttempts: cfg.LoggerCfg().Opts.Attempts, }, } - if rcv := NewExportLogger("123", "cgrates.org", 7, cfg.LoggerCfg().Opts, nil); !reflect.DeepEqual(rcv, exp) { + if rcv := NewExportLogger("123", "cgrates.org", 7, cfg.LoggerCfg().Opts); !reflect.DeepEqual(rcv, exp) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) } } @@ -112,7 +112,7 @@ func TestLoggerExportEmerg(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", -1, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", -1, nil) if err := el.Emerg("Emergency message"); err != nil { t.Error(err) @@ -159,7 +159,7 @@ func TestLoggerExportAlert(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", 0, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", 0, nil) if err := el.Alert("Alert message"); err != nil { t.Error(err) @@ -206,7 +206,7 @@ func TestLoggerExportCrit(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", 1, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", 1, nil) if err := el.Crit("Critical message"); err != nil { t.Error(err) @@ -253,7 +253,7 @@ func TestLoggerExportErr(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", 2, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", 2, nil) if err := el.Err("Error message"); err != nil { t.Error(err) @@ -300,7 +300,7 @@ func TestLoggerExportWarning(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", 3, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", 3, nil) if err := el.Warning("Warning message"); err != nil { t.Error(err) @@ -347,7 +347,7 @@ func TestLoggerExportNotice(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", 4, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", 4, nil) if err := el.Notice("Notice message"); err != nil { t.Error(err) @@ -394,7 +394,7 @@ func TestLoggerExportInfo(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", 5, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", 5, nil) if err := el.Info("Info message"); err != nil { t.Error(err) @@ -441,7 +441,7 @@ func TestLoggerExportDebug(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", 6, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", 6, nil) if err := el.Debug("Debug message"); err != nil { t.Error(err) @@ -453,7 +453,7 @@ func TestLoggerExportDebug(t *testing.T) { } func TestLoggerSetGetLogLevel(t *testing.T) { - el := NewExportLogger("123", "cgrates.org", 6, nil, nil) + el := NewExportLogger("123", "cgrates.org", 6, nil) if rcv := el.GetLogLevel(); rcv != 6 { t.Errorf("expected: <%+v>, \nreceived: <%+v>", 6, rcv) } @@ -464,7 +464,7 @@ func TestLoggerSetGetLogLevel(t *testing.T) { } func TestLoggerGetSyslog(t *testing.T) { - el := NewExportLogger("123", "cgrates.org", 6, nil, nil) + el := NewExportLogger("123", "cgrates.org", 6, nil) if el.GetSyslog() != nil { t.Errorf("expected: <%+v>, \nreceived: <%+v>", nil, el.GetSyslog()) } @@ -506,7 +506,7 @@ func TestLoggerExportWrite(t *testing.T) { rpcInternal <- ccM cM.AddInternalConn(eesConn, utils.EeSv1, rpcInternal) - el := NewExportLogger("123", "cgrates.org", 8, cM, []string{eesConn}) + el := NewExportLogger("123", "cgrates.org", 8, nil) if _, err := el.Write([]byte("message")); err != nil { t.Error(err) diff --git a/services/cgr-engine.go b/services/cgr-engine.go index f127f6fba..a551bdb5b 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -350,7 +350,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags // init syslog if utils.Logger, err = engine.NewLogger(utils.FirstNonEmpty(*flags.SysLogger, cgr.cfg.LoggerCfg().Type), - cgr.cfg.GeneralCfg().DefaultTenant, cgr.cfg.GeneralCfg().NodeID, cgr.cfg.LoggerCfg(), ctx); err != nil { + cgr.cfg.GeneralCfg().DefaultTenant, cgr.cfg.GeneralCfg().NodeID, cgr.cfg.LoggerCfg()); err != nil { return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) } utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) diff --git a/utils/consts.go b/utils/consts.go index 03c0e226a..42d721c6a 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1731,8 +1731,8 @@ const ( const ( LevelCfg = "level" - KafkaConnCfg = "*kafka_conn" - KafkaTopicCfg = "*kafka_topic" + KafkaConnCfg = "kafka_conn" + KafkaTopicCfg = "kafka_topic" ) const ( diff --git a/utils/coreutils.go b/utils/coreutils.go index e39fb6a9f..e683ddc8f 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -486,6 +486,16 @@ func ToJSON(v interface{}) string { return string(b) } +func ToUnescapedJSON(value interface{}) (bts []byte, err error) { + buf := &bytes.Buffer{} + enc := json.NewEncoder(buf) + enc.SetEscapeHTML(false) + if err = enc.Encode(value); err != nil { + return + } + return buf.Bytes(), err +} + // Used as generic function logic for various fields // Attributes diff --git a/utils/errors.go b/utils/errors.go index a1b0235a9..27eb69044 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -65,6 +65,7 @@ var ( DispatcherErrorPrefix = "DISPATCHER_ERROR" RateSErrPrfx = "RATES_ERROR" AccountSErrPrfx = "ACCOUNTS_ERROR" + ErrLoggerChanged = errors.New("LOGGER_CHANGED") ErrUnsupportedFormat = errors.New("UNSUPPORTED_FORMAT") ErrNoDatabaseConn = errors.New("NO_DATABASE_CONNECTION") ErrMaxIncrementsExceeded = errors.New("MAX_INCREMENTS_EXCEEDED") diff --git a/utils/failover_export.go b/utils/failover_export.go new file mode 100644 index 000000000..c1bdc8e73 --- /dev/null +++ b/utils/failover_export.go @@ -0,0 +1,70 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package utils + +import ( + "encoding/gob" + "fmt" + "os" + "path/filepath" +) + +type FailoverPoster struct { + MessageProvider +} + +func NewFailoverPoster( /*addMsg *MessageProvider*/ ) *FailoverPoster { + return new(FailoverPoster) +} + +func (fldPst *FailoverPoster) AddMessage(failedPostDir, kafkaConn, + kafkaTopic string, content interface{}) (err error) { + filePath := filepath.Join(failedPostDir, kafkaTopic+PipeSep+MetaKafka+GOBSuffix) + var fileOut *os.File + if _, err = os.Stat(filePath); os.IsNotExist(err) { + fileOut, err = os.Create(filePath) + if err != nil { + return fmt.Errorf(fmt.Sprintf(" failed to write logs to file <%s> because <%s>", filePath, err)) + } + } else { + fileOut, err = os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0755) + if err != nil { + return err + } + } + enc := gob.NewEncoder(fileOut) + if err = enc.Encode(content); err != nil { + return err + } + fileOut.Close() + return +} + +type MessageProvider interface { + GetContent() string + GetMeta() string +} + +func (fldPst *FailoverPoster) GetContent() string { + return EmptyString +} + +func (fldPst *FailoverPoster) GetMeta() string { + return EmptyString +}