From 04f746c6347bedc6ef04cfda5c6f27bf259eb8be Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 14 Jan 2025 19:50:47 +0200 Subject: [PATCH] Implement LoggerService + refactoring (incomplete) --- cmd/cgr-engine/cgr-engine.go | 31 +++++++----- efs/efs.go | 5 +- efs/failed_logs.go | 23 ++++++--- engine/kafka_logger.go | 44 ++++++----------- engine/kafka_logger_test.go | 44 +++++------------ services/libcgr-engine.go | 9 ++-- services/logger.go | 93 ++++++++++++++++++++++++++++++++++++ utils/consts.go | 1 + utils/logger.go | 20 ++------ utils/logger_test.go | 24 ---------- 10 files changed, 170 insertions(+), 124 deletions(-) create mode 100644 services/logger.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 90c738d75..b357dd330 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -36,7 +36,6 @@ import ( "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" - "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/services" @@ -72,10 +71,27 @@ func runCGREngine(fs []string) (err error) { } var cfg *config.CGRConfig - if cfg, err = services.InitConfigFromPath(context.TODO(), *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig { + if cfg, err = services.InitConfigFromPath(context.TODO(), *flags.CfgPath, *flags.NodeID, + *flags.Logger, *flags.LogLevel); err != nil || *flags.CheckConfig { return } + if cfg.LoggerCfg().Level >= 0 { + switch cfg.LoggerCfg().Type { + case utils.MetaSysLog: + utils.Logger, err = utils.NewSysLogger(cfg.GeneralCfg().NodeID, cfg.LoggerCfg().Level) + if err != nil { + return + } + case utils.MetaStdLog, utils.MetaKafkaLog: + // If the logger is of type *kafka, use the *stdout logger until + // LoggerService finishes startup. + utils.Logger = utils.NewStdLogger(cfg.GeneralCfg().NodeID, cfg.LoggerCfg().Level) + default: + return fmt.Errorf("unsupported logger type: %q", cfg.LoggerCfg().Type) + } + } + var cpuPrfF *os.File if *flags.CpuPrfDir != utils.EmptyString { cpuPath := filepath.Join(*flags.CpuPrfDir, utils.CpuPathCgr) @@ -107,15 +123,6 @@ func runCGREngine(fs []string) (err error) { }() } - // init syslog - if utils.Logger, err = engine.NewLogger(context.TODO(), - utils.FirstNonEmpty(*flags.Logger, cfg.LoggerCfg().Type), - cfg.GeneralCfg().DefaultTenant, - cfg.GeneralCfg().NodeID, - nil, cfg); err != nil { - return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) - } - efs.SetFailedPostCacheTTL(cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy) @@ -129,6 +136,7 @@ func runCGREngine(fs []string) (err error) { cls := services.NewCommonListenerService(cfg, caps) anzS := services.NewAnalyzerService(cfg) cms := services.NewConnManagerService(cfg) + lgs := services.NewLoggerService(cfg, *flags.Logger) dmS := services.NewDataDBService(cfg, *flags.SetVersions, srvDep) sdbS := services.NewStorDBService(cfg, *flags.SetVersions) configS := services.NewConfigService(cfg) @@ -172,6 +180,7 @@ func runCGREngine(fs []string) (err error) { cls, anzS, cms, + lgs, dmS, sdbS, configS, diff --git a/efs/efs.go b/efs/efs.go index d066c1fff..c7c67c4c8 100644 --- a/efs/efs.go +++ b/efs/efs.go @@ -111,7 +111,10 @@ type ReplayEventsParams struct { // V1ReplayEvents will read the Events from gob files that were failed to be exported and try to re-export them again. func (efS *EfS) V1ReplayEvents(ctx *context.Context, args ReplayEventsParams, reply *string) error { - // Set default directories if not provided. + // Set default tenant and directories if not provided. + if args.Tenant == "" { + args.Tenant = efS.cfg.GeneralCfg().DefaultTenant + } if args.SourcePath == "" { args.SourcePath = efS.cfg.EFsCfg().FailedPostsDir } diff --git a/efs/failed_logs.go b/efs/failed_logs.go index 938bb2337..658f2b46f 100644 --- a/efs/failed_logs.go +++ b/efs/failed_logs.go @@ -70,15 +70,24 @@ func NewExportEventsFromFile(filePath string) (*FailedExportersLog, error) { return &expEv, nil } -// ReplayFailedPosts tryies to post cdrs again +// ReplayFailedPosts tries to repost failed cdrs. func (expEv *FailedExportersLog) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) error { - nodeID := utils.IfaceAsString(expEv.Opts[utils.NodeID]) - logLvl, err := utils.IfaceAsInt(expEv.Opts[utils.Level]) - if err != nil { - return err - } - expLogger := engine.NewExportLogger(ctx, nodeID, tnt, logLvl, + expLogger := engine.NewExportLogger(ctx, tnt, expEv.connMngr, expEv.cfg) + + // Fall back to config values even if LogLevel and NodeID are always passed to + // the opts (through the GetMeta method on the ExportLogger), just to be safe. + if v, has := expEv.Opts[utils.NodeID]; has { + expLogger.NodeID = utils.IfaceAsString(v) + } + if v, has := expEv.Opts[utils.Level]; has { + lvl, err := utils.IfaceAsInt(v) + if err != nil { + return err + } + expLogger.LogLevel = lvl + } + for _, event := range expEv.Events { content, err := utils.ToUnescapedJSON(event) if err != nil { diff --git a/engine/kafka_logger.go b/engine/kafka_logger.go index bedb46b67..45285bedb 100644 --- a/engine/kafka_logger.go +++ b/engine/kafka_logger.go @@ -20,7 +20,6 @@ package engine import ( "fmt" - "log" "log/syslog" "sync" "time" @@ -31,24 +30,12 @@ import ( "github.com/segmentio/kafka-go" ) -func NewLogger(ctx *context.Context, loggerType, tnt, nodeID string, - connMgr *ConnManager, cfg *config.CGRConfig) (utils.LoggerInterface, error) { - switch loggerType { - case utils.MetaKafkaLog: - return NewExportLogger(ctx, nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil - case utils.MetaStdLog, utils.MetaSysLog: - return utils.NewLogger(loggerType, nodeID, cfg.LoggerCfg().Level) - default: - return nil, fmt.Errorf("unsupported logger: <%+s>", loggerType) - } -} - // Logs to kafka type ExportLogger struct { sync.Mutex - cfg *config.CGRConfig - connMgr *ConnManager - ctx *context.Context + efsConns []string + connMgr *ConnManager + ctx *context.Context LogLevel int FldPostDir string @@ -58,15 +45,15 @@ type ExportLogger struct { } // NewExportLogger will export loggers to kafka -func NewExportLogger(ctx *context.Context, nodeID, tenant string, level int, - connMgr *ConnManager, cfg *config.CGRConfig) (el *ExportLogger) { - el = &ExportLogger{ +func NewExportLogger(ctx *context.Context, tenant string, connMgr *ConnManager, + cfg *config.CGRConfig) *ExportLogger { + return &ExportLogger{ ctx: ctx, + efsConns: cfg.LoggerCfg().EFsConns, connMgr: connMgr, - cfg: cfg, - LogLevel: level, + LogLevel: cfg.LoggerCfg().Level, FldPostDir: cfg.LoggerCfg().Opts.FailedPostsDir, - NodeID: nodeID, + NodeID: cfg.GeneralCfg().NodeID, Tenant: tenant, Writer: &kafka.Writer{ Addr: kafka.TCP(cfg.LoggerCfg().Opts.KafkaConn), @@ -74,7 +61,6 @@ func NewExportLogger(ctx *context.Context, nodeID, tenant string, level int, MaxAttempts: cfg.LoggerCfg().Opts.KafkaAttempts, }, } - return } func (el *ExportLogger) Close() (err error) { @@ -115,12 +101,10 @@ func (el *ExportLogger) call(m string, level int) (err error) { APIOpts: el.GetMeta(), } var reply string - if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns, - utils.EfSv1ProcessEvent, args, &reply); err != nil { - log.Printf("err la sefprocessEvent: %v", err) - /* utils.Logger.Warning( - fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>", - utils.Logger, utils.EFs, err.Error())) */ + if err = el.connMgr.Call(el.ctx, el.efsConns, utils.EfSv1ProcessEvent, + args, &reply); err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> failed to export log event: %v", utils.EFs, err)) } }() // also the content should be printed as a stdout logger type @@ -168,7 +152,7 @@ func (el *ExportLogger) Crit(m string) (err error) { if el.LogLevel < utils.LOGLEVEL_CRITICAL { return nil } - if el.call(m, utils.LOGLEVEL_CRITICAL); err != nil { + if err = el.call(m, utils.LOGLEVEL_CRITICAL); err != nil { if err == utils.ErrLoggerChanged { utils.NewStdLogger(el.NodeID, el.LogLevel).Crit(m) err = nil diff --git a/engine/kafka_logger_test.go b/engine/kafka_logger_test.go index 9124ab6eb..6887fa34c 100644 --- a/engine/kafka_logger_test.go +++ b/engine/kafka_logger_test.go @@ -29,34 +29,14 @@ import ( "github.com/segmentio/kafka-go" ) -func TestLoggerNewLoggerExportKafkaLog(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cM := NewConnManager(cfg) - - exp := NewExportLogger(context.Background(), "123", "cgrates.org", 6, cM, cfg) - if rcv, err := NewLogger(context.Background(), utils.MetaKafkaLog, "cgrates.org", "123", cM, cfg); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(rcv, exp) { - t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) - } -} - -func TestLoggerNewLoggerDefault(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cM := NewConnManager(cfg) - experr := `unsupported logger: ` - if _, err := NewLogger(context.Background(), "invalid", "cgrates.org", "123", cM, cfg); err == nil || - err.Error() != experr { - t.Errorf("expected: <%s>, \nreceived: <%s>", experr, err) - } -} - func TestLoggerNewExportLogger(t *testing.T) { cfg := config.NewDefaultCGRConfig() + cfg.LoggerCfg().Level = 7 + cfg.GeneralCfg().NodeID = "123" cM := NewConnManager(cfg) exp := &ExportLogger{ ctx: context.Background(), - cfg: cfg, + efsConns: []string{"*internal:*efs"}, connMgr: cM, FldPostDir: "/var/spool/cgrates/failed_posts", LogLevel: 7, @@ -68,18 +48,20 @@ func TestLoggerNewExportLogger(t *testing.T) { MaxAttempts: cfg.LoggerCfg().Opts.KafkaAttempts, }, } - if rcv := NewExportLogger(context.Background(), "123", "cgrates.org", 7, cM, cfg); !reflect.DeepEqual(rcv, exp) { + if rcv := NewExportLogger(context.Background(), "cgrates.org", cM, cfg); !reflect.DeepEqual(rcv, exp) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) } } func TestCloseExportLogger(t *testing.T) { cfg := config.NewDefaultCGRConfig() + cfg.LoggerCfg().Level = 7 + cfg.GeneralCfg().NodeID = "123" cM := NewConnManager(cfg) - el := NewExportLogger(context.Background(), "123", "cgrates.org", 7, cM, cfg) + el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg) if el == nil { - t.Error("Export logger should'nt be empty") + t.Error("Export logger shouldn't be empty") } if err := el.Close(); err != nil { @@ -88,7 +70,7 @@ func TestCloseExportLogger(t *testing.T) { exp := &ExportLogger{ ctx: context.Background(), connMgr: cM, - cfg: cfg, + efsConns: []string{"*internal:*efs"}, LogLevel: 7, FldPostDir: cfg.LoggerCfg().Opts.FailedPostsDir, NodeID: "123", @@ -137,7 +119,7 @@ func TestExportLoggerCallErrWriter(t *testing.T) { dm := NewDataManager(db, cfg.CacheCfg(), cM) Cache = NewCacheS(cfg, dm, cM, nil) - el := NewExportLogger(context.Background(), "123", "cgrates.org", 7, cM, cfg) + el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg) if err := el.call("test msg", 7); err != utils.ErrLoggerChanged || err == nil { t.Error(err) @@ -155,7 +137,7 @@ func TestLoggerExportEmergNil(t *testing.T) { cfg := config.NewDefaultCGRConfig() cM := NewConnManager(cfg) - el := NewExportLogger(context.Background(), "123", "cgrates.org", -1, cM, cfg) + el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg) if err := el.Emerg("Emergency message"); err != nil { t.Error(err) @@ -544,7 +526,7 @@ func TestLoggerExportDebug(t *testing.T) { func TestLoggerSetGetLogLevel(t *testing.T) { cfg := config.NewDefaultCGRConfig() cM := NewConnManager(cfg) - el := NewExportLogger(context.Background(), "123", "cgrates.org", 6, cM, cfg) + el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg) if rcv := el.GetLogLevel(); rcv != 6 { t.Errorf("expected: <%+v>, \nreceived: <%+v>", 6, rcv) } @@ -557,7 +539,7 @@ func TestLoggerSetGetLogLevel(t *testing.T) { func TestLoggerGetSyslog(t *testing.T) { cfg := config.NewDefaultCGRConfig() cM := NewConnManager(cfg) - el := NewExportLogger(context.Background(), "123", "cgrates.org", 6, cM, cfg) + el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg) if el.GetSyslog() != nil { t.Errorf("expected: <%+v>, \nreceived: <%+v>", nil, el.GetSyslog()) } diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go index 73a35677b..5ecf6064c 100644 --- a/services/libcgr-engine.go +++ b/services/libcgr-engine.go @@ -102,7 +102,7 @@ func waitForFilterS(ctx *context.Context, fsCh chan *engine.FilterS) (filterS *e return } -func InitConfigFromPath(ctx *context.Context, path, nodeID string, lgLevel int) (cfg *config.CGRConfig, err error) { +func InitConfigFromPath(ctx *context.Context, path, nodeID, logType string, logLevel int) (cfg *config.CGRConfig, err error) { // Init config if cfg, err = config.NewCGRConfigFromPath(ctx, path); err != nil { err = fmt.Errorf("could not parse config: <%s>", err) @@ -126,8 +126,11 @@ func InitConfigFromPath(ctx *context.Context, path, nodeID string, lgLevel int) if nodeID != utils.EmptyString { cfg.GeneralCfg().NodeID = nodeID } - if lgLevel != -1 { // Modify the log level if provided by command arguments - cfg.LoggerCfg().Level = lgLevel + if logLevel != -1 { // Modify the log level if provided by command arguments + cfg.LoggerCfg().Level = logLevel + } + if logType != utils.EmptyString { + cfg.LoggerCfg().Type = logType } if utils.ConcurrentReqsLimit != 0 { // used as shared variable cfg.CoreSCfg().Caps = utils.ConcurrentReqsLimit diff --git a/services/logger.go b/services/logger.go new file mode 100644 index 000000000..9627007d2 --- /dev/null +++ b/services/logger.go @@ -0,0 +1,93 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/efs" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +// NewLoggerService instantiates a new LoggerService. +func NewLoggerService(cfg *config.CGRConfig, loggerType string) *LoggerService { + return &LoggerService{ + cfg: cfg, + loggerType: loggerType, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), + } +} + +// LoggerService implements Service interface. +type LoggerService struct { + mu sync.RWMutex + cfg *config.CGRConfig + stateDeps *StateDependencies // channel subscriptions for state changes + + loggerType string +} + +// Start handles the service start. +func (s *LoggerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { + if s.loggerType != utils.MetaKafkaLog { + return nil + } + // TODO: check if we should also wait for EFs. Currently, in case of *kafka + // logger, we log to *stdout until initiated. We should also consider + // removing ErrLoggerChanged error cases if they turn out to be redundant + // (see engine/kafka_logger.go). + cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, + s.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err + } + cm := cms.(*ConnManagerService).ConnManager() + utils.Logger = engine.NewExportLogger(context.TODO(), s.cfg.GeneralCfg().DefaultTenant, cm, s.cfg) + efs.SetFailedPostCacheTTL(s.cfg.EFsCfg().FailedPostsTTL) + return nil +} + +// Reload handles the config changes. +func (s *LoggerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error { + return nil +} + +// Shutdown stops the service. +func (s *LoggerService) Shutdown(_ *servmanager.ServiceRegistry) error { + return nil +} + +// ServiceName returns the service name +func (s *LoggerService) ServiceName() string { + return utils.LoggerS +} + +// ShouldRun returns if the service should be running. +func (s *LoggerService) ShouldRun() bool { + return true +} + +// StateChan returns signaling channel of specific state +func (s *LoggerService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) +} diff --git a/utils/consts.go b/utils/consts.go index 39cecf694..749ed4b52 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -994,6 +994,7 @@ const ( ServiceManagerS = "ServiceManager" CommonListenerS = "CommonListenerS" ConnManager = "ConnManager" + LoggerS = "LoggerS" ) // Lower service names diff --git a/utils/logger.go b/utils/logger.go index 132922e8a..11dba551e 100644 --- a/utils/logger.go +++ b/utils/logger.go @@ -31,9 +31,9 @@ var noSysLog bool func init() { var err error - if Logger, err = NewLogger(MetaSysLog, EmptyString, 0); err != nil { - noSysLog = true - Logger, _ = NewLogger(MetaStdLog, EmptyString, 0) + Logger, err = NewSysLogger(EmptyString, 0) + if err != nil { + Logger = NewStdLogger(EmptyString, 0) } } @@ -65,20 +65,6 @@ type LoggerInterface interface { Write(p []byte) (n int, err error) } -func NewLogger(loggerType, nodeID string, logLvl int) (LoggerInterface, error) { - switch loggerType { - case MetaStdLog: - return NewStdLogger(nodeID, logLvl), nil - case MetaSysLog: - if noSysLog { - return NewStdLogger(nodeID, logLvl), nil - } - return NewSysLogger(nodeID, logLvl) - default: - return nil, fmt.Errorf("unsupported logger: <%+s>", loggerType) - } -} - type SysLogger struct { logLevel int syslog *syslog.Writer diff --git a/utils/logger_test.go b/utils/logger_test.go index e90a908b1..f7fa7ab99 100644 --- a/utils/logger_test.go +++ b/utils/logger_test.go @@ -28,30 +28,6 @@ import ( "testing" ) -func TestLoggerNewLoggerSyslogOK(t *testing.T) { - if noSysLog { - t.SkipNow() - } - exp := &SysLogger{ - logLevel: 7, - } - if rcv, err := NewLogger(MetaSysLog, EmptyString, 7); err != nil { - t.Error(err) - } else { - exp.syslog = rcv.GetSyslog() - if !reflect.DeepEqual(rcv, exp) { - t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) - } - } -} - -func TestLoggerNewLoggerUnsupported(t *testing.T) { - experr := `unsupported logger: ` - if _, err := NewLogger("unsupported", EmptyString, 7); err == nil || err.Error() != experr { - t.Errorf("expected: <%s>, \nreceived: <%+v>", experr, err) - } -} - func TestLoggerSysloggerSetGetLogLevel(t *testing.T) { if noSysLog { t.SkipNow()