Implement LoggerService + refactoring (incomplete)

This commit is contained in:
ionutboangiu
2025-01-14 19:50:47 +02:00
committed by Dan Christian Bogos
parent e7152dacf8
commit 04f746c634
10 changed files with 170 additions and 124 deletions

View File

@@ -36,7 +36,6 @@ import (
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/services"
@@ -72,10 +71,27 @@ func runCGREngine(fs []string) (err error) {
}
var cfg *config.CGRConfig
if cfg, err = services.InitConfigFromPath(context.TODO(), *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig {
if cfg, err = services.InitConfigFromPath(context.TODO(), *flags.CfgPath, *flags.NodeID,
*flags.Logger, *flags.LogLevel); err != nil || *flags.CheckConfig {
return
}
if cfg.LoggerCfg().Level >= 0 {
switch cfg.LoggerCfg().Type {
case utils.MetaSysLog:
utils.Logger, err = utils.NewSysLogger(cfg.GeneralCfg().NodeID, cfg.LoggerCfg().Level)
if err != nil {
return
}
case utils.MetaStdLog, utils.MetaKafkaLog:
// If the logger is of type *kafka, use the *stdout logger until
// LoggerService finishes startup.
utils.Logger = utils.NewStdLogger(cfg.GeneralCfg().NodeID, cfg.LoggerCfg().Level)
default:
return fmt.Errorf("unsupported logger type: %q", cfg.LoggerCfg().Type)
}
}
var cpuPrfF *os.File
if *flags.CpuPrfDir != utils.EmptyString {
cpuPath := filepath.Join(*flags.CpuPrfDir, utils.CpuPathCgr)
@@ -107,15 +123,6 @@ func runCGREngine(fs []string) (err error) {
}()
}
// init syslog
if utils.Logger, err = engine.NewLogger(context.TODO(),
utils.FirstNonEmpty(*flags.Logger, cfg.LoggerCfg().Type),
cfg.GeneralCfg().DefaultTenant,
cfg.GeneralCfg().NodeID,
nil, cfg); err != nil {
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
efs.SetFailedPostCacheTTL(cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy)
@@ -129,6 +136,7 @@ func runCGREngine(fs []string) (err error) {
cls := services.NewCommonListenerService(cfg, caps)
anzS := services.NewAnalyzerService(cfg)
cms := services.NewConnManagerService(cfg)
lgs := services.NewLoggerService(cfg, *flags.Logger)
dmS := services.NewDataDBService(cfg, *flags.SetVersions, srvDep)
sdbS := services.NewStorDBService(cfg, *flags.SetVersions)
configS := services.NewConfigService(cfg)
@@ -172,6 +180,7 @@ func runCGREngine(fs []string) (err error) {
cls,
anzS,
cms,
lgs,
dmS,
sdbS,
configS,

View File

@@ -111,7 +111,10 @@ type ReplayEventsParams struct {
// V1ReplayEvents will read the Events from gob files that were failed to be exported and try to re-export them again.
func (efS *EfS) V1ReplayEvents(ctx *context.Context, args ReplayEventsParams, reply *string) error {
// Set default directories if not provided.
// Set default tenant and directories if not provided.
if args.Tenant == "" {
args.Tenant = efS.cfg.GeneralCfg().DefaultTenant
}
if args.SourcePath == "" {
args.SourcePath = efS.cfg.EFsCfg().FailedPostsDir
}

View File

@@ -70,15 +70,24 @@ func NewExportEventsFromFile(filePath string) (*FailedExportersLog, error) {
return &expEv, nil
}
// ReplayFailedPosts tryies to post cdrs again
// ReplayFailedPosts tries to repost failed cdrs.
func (expEv *FailedExportersLog) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) error {
nodeID := utils.IfaceAsString(expEv.Opts[utils.NodeID])
logLvl, err := utils.IfaceAsInt(expEv.Opts[utils.Level])
if err != nil {
return err
}
expLogger := engine.NewExportLogger(ctx, nodeID, tnt, logLvl,
expLogger := engine.NewExportLogger(ctx, tnt,
expEv.connMngr, expEv.cfg)
// Fall back to config values even if LogLevel and NodeID are always passed to
// the opts (through the GetMeta method on the ExportLogger), just to be safe.
if v, has := expEv.Opts[utils.NodeID]; has {
expLogger.NodeID = utils.IfaceAsString(v)
}
if v, has := expEv.Opts[utils.Level]; has {
lvl, err := utils.IfaceAsInt(v)
if err != nil {
return err
}
expLogger.LogLevel = lvl
}
for _, event := range expEv.Events {
content, err := utils.ToUnescapedJSON(event)
if err != nil {

View File

@@ -20,7 +20,6 @@ package engine
import (
"fmt"
"log"
"log/syslog"
"sync"
"time"
@@ -31,24 +30,12 @@ import (
"github.com/segmentio/kafka-go"
)
func NewLogger(ctx *context.Context, loggerType, tnt, nodeID string,
connMgr *ConnManager, cfg *config.CGRConfig) (utils.LoggerInterface, error) {
switch loggerType {
case utils.MetaKafkaLog:
return NewExportLogger(ctx, nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil
case utils.MetaStdLog, utils.MetaSysLog:
return utils.NewLogger(loggerType, nodeID, cfg.LoggerCfg().Level)
default:
return nil, fmt.Errorf("unsupported logger: <%+s>", loggerType)
}
}
// Logs to kafka
type ExportLogger struct {
sync.Mutex
cfg *config.CGRConfig
connMgr *ConnManager
ctx *context.Context
efsConns []string
connMgr *ConnManager
ctx *context.Context
LogLevel int
FldPostDir string
@@ -58,15 +45,15 @@ type ExportLogger struct {
}
// NewExportLogger will export loggers to kafka
func NewExportLogger(ctx *context.Context, nodeID, tenant string, level int,
connMgr *ConnManager, cfg *config.CGRConfig) (el *ExportLogger) {
el = &ExportLogger{
func NewExportLogger(ctx *context.Context, tenant string, connMgr *ConnManager,
cfg *config.CGRConfig) *ExportLogger {
return &ExportLogger{
ctx: ctx,
efsConns: cfg.LoggerCfg().EFsConns,
connMgr: connMgr,
cfg: cfg,
LogLevel: level,
LogLevel: cfg.LoggerCfg().Level,
FldPostDir: cfg.LoggerCfg().Opts.FailedPostsDir,
NodeID: nodeID,
NodeID: cfg.GeneralCfg().NodeID,
Tenant: tenant,
Writer: &kafka.Writer{
Addr: kafka.TCP(cfg.LoggerCfg().Opts.KafkaConn),
@@ -74,7 +61,6 @@ func NewExportLogger(ctx *context.Context, nodeID, tenant string, level int,
MaxAttempts: cfg.LoggerCfg().Opts.KafkaAttempts,
},
}
return
}
func (el *ExportLogger) Close() (err error) {
@@ -115,12 +101,10 @@ func (el *ExportLogger) call(m string, level int) (err error) {
APIOpts: el.GetMeta(),
}
var reply string
if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns,
utils.EfSv1ProcessEvent, args, &reply); err != nil {
log.Printf("err la sefprocessEvent: %v", err)
/* utils.Logger.Warning(
fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>",
utils.Logger, utils.EFs, err.Error())) */
if err = el.connMgr.Call(el.ctx, el.efsConns, utils.EfSv1ProcessEvent,
args, &reply); err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<%s> failed to export log event: %v", utils.EFs, err))
}
}()
// also the content should be printed as a stdout logger type
@@ -168,7 +152,7 @@ func (el *ExportLogger) Crit(m string) (err error) {
if el.LogLevel < utils.LOGLEVEL_CRITICAL {
return nil
}
if el.call(m, utils.LOGLEVEL_CRITICAL); err != nil {
if err = el.call(m, utils.LOGLEVEL_CRITICAL); err != nil {
if err == utils.ErrLoggerChanged {
utils.NewStdLogger(el.NodeID, el.LogLevel).Crit(m)
err = nil

View File

@@ -29,34 +29,14 @@ import (
"github.com/segmentio/kafka-go"
)
func TestLoggerNewLoggerExportKafkaLog(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cM := NewConnManager(cfg)
exp := NewExportLogger(context.Background(), "123", "cgrates.org", 6, cM, cfg)
if rcv, err := NewLogger(context.Background(), utils.MetaKafkaLog, "cgrates.org", "123", cM, cfg); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
}
func TestLoggerNewLoggerDefault(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cM := NewConnManager(cfg)
experr := `unsupported logger: <invalid>`
if _, err := NewLogger(context.Background(), "invalid", "cgrates.org", "123", cM, cfg); err == nil ||
err.Error() != experr {
t.Errorf("expected: <%s>, \nreceived: <%s>", experr, err)
}
}
func TestLoggerNewExportLogger(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.LoggerCfg().Level = 7
cfg.GeneralCfg().NodeID = "123"
cM := NewConnManager(cfg)
exp := &ExportLogger{
ctx: context.Background(),
cfg: cfg,
efsConns: []string{"*internal:*efs"},
connMgr: cM,
FldPostDir: "/var/spool/cgrates/failed_posts",
LogLevel: 7,
@@ -68,18 +48,20 @@ func TestLoggerNewExportLogger(t *testing.T) {
MaxAttempts: cfg.LoggerCfg().Opts.KafkaAttempts,
},
}
if rcv := NewExportLogger(context.Background(), "123", "cgrates.org", 7, cM, cfg); !reflect.DeepEqual(rcv, exp) {
if rcv := NewExportLogger(context.Background(), "cgrates.org", cM, cfg); !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
}
func TestCloseExportLogger(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.LoggerCfg().Level = 7
cfg.GeneralCfg().NodeID = "123"
cM := NewConnManager(cfg)
el := NewExportLogger(context.Background(), "123", "cgrates.org", 7, cM, cfg)
el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg)
if el == nil {
t.Error("Export logger should'nt be empty")
t.Error("Export logger shouldn't be empty")
}
if err := el.Close(); err != nil {
@@ -88,7 +70,7 @@ func TestCloseExportLogger(t *testing.T) {
exp := &ExportLogger{
ctx: context.Background(),
connMgr: cM,
cfg: cfg,
efsConns: []string{"*internal:*efs"},
LogLevel: 7,
FldPostDir: cfg.LoggerCfg().Opts.FailedPostsDir,
NodeID: "123",
@@ -137,7 +119,7 @@ func TestExportLoggerCallErrWriter(t *testing.T) {
dm := NewDataManager(db, cfg.CacheCfg(), cM)
Cache = NewCacheS(cfg, dm, cM, nil)
el := NewExportLogger(context.Background(), "123", "cgrates.org", 7, cM, cfg)
el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg)
if err := el.call("test msg", 7); err != utils.ErrLoggerChanged || err == nil {
t.Error(err)
@@ -155,7 +137,7 @@ func TestLoggerExportEmergNil(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cM := NewConnManager(cfg)
el := NewExportLogger(context.Background(), "123", "cgrates.org", -1, cM, cfg)
el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg)
if err := el.Emerg("Emergency message"); err != nil {
t.Error(err)
@@ -544,7 +526,7 @@ func TestLoggerExportDebug(t *testing.T) {
func TestLoggerSetGetLogLevel(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cM := NewConnManager(cfg)
el := NewExportLogger(context.Background(), "123", "cgrates.org", 6, cM, cfg)
el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg)
if rcv := el.GetLogLevel(); rcv != 6 {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", 6, rcv)
}
@@ -557,7 +539,7 @@ func TestLoggerSetGetLogLevel(t *testing.T) {
func TestLoggerGetSyslog(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cM := NewConnManager(cfg)
el := NewExportLogger(context.Background(), "123", "cgrates.org", 6, cM, cfg)
el := NewExportLogger(context.Background(), "cgrates.org", cM, cfg)
if el.GetSyslog() != nil {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", nil, el.GetSyslog())
}

View File

@@ -102,7 +102,7 @@ func waitForFilterS(ctx *context.Context, fsCh chan *engine.FilterS) (filterS *e
return
}
func InitConfigFromPath(ctx *context.Context, path, nodeID string, lgLevel int) (cfg *config.CGRConfig, err error) {
func InitConfigFromPath(ctx *context.Context, path, nodeID, logType string, logLevel int) (cfg *config.CGRConfig, err error) {
// Init config
if cfg, err = config.NewCGRConfigFromPath(ctx, path); err != nil {
err = fmt.Errorf("could not parse config: <%s>", err)
@@ -126,8 +126,11 @@ func InitConfigFromPath(ctx *context.Context, path, nodeID string, lgLevel int)
if nodeID != utils.EmptyString {
cfg.GeneralCfg().NodeID = nodeID
}
if lgLevel != -1 { // Modify the log level if provided by command arguments
cfg.LoggerCfg().Level = lgLevel
if logLevel != -1 { // Modify the log level if provided by command arguments
cfg.LoggerCfg().Level = logLevel
}
if logType != utils.EmptyString {
cfg.LoggerCfg().Type = logType
}
if utils.ConcurrentReqsLimit != 0 { // used as shared variable
cfg.CoreSCfg().Caps = utils.ConcurrentReqsLimit

93
services/logger.go Normal file
View File

@@ -0,0 +1,93 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewLoggerService instantiates a new LoggerService.
func NewLoggerService(cfg *config.CGRConfig, loggerType string) *LoggerService {
return &LoggerService{
cfg: cfg,
loggerType: loggerType,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
// LoggerService implements Service interface.
type LoggerService struct {
mu sync.RWMutex
cfg *config.CGRConfig
stateDeps *StateDependencies // channel subscriptions for state changes
loggerType string
}
// Start handles the service start.
func (s *LoggerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
if s.loggerType != utils.MetaKafkaLog {
return nil
}
// TODO: check if we should also wait for EFs. Currently, in case of *kafka
// logger, we log to *stdout until initiated. We should also consider
// removing ErrLoggerChanged error cases if they turn out to be redundant
// (see engine/kafka_logger.go).
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry,
s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cm := cms.(*ConnManagerService).ConnManager()
utils.Logger = engine.NewExportLogger(context.TODO(), s.cfg.GeneralCfg().DefaultTenant, cm, s.cfg)
efs.SetFailedPostCacheTTL(s.cfg.EFsCfg().FailedPostsTTL)
return nil
}
// Reload handles the config changes.
func (s *LoggerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
return nil
}
// Shutdown stops the service.
func (s *LoggerService) Shutdown(_ *servmanager.ServiceRegistry) error {
return nil
}
// ServiceName returns the service name
func (s *LoggerService) ServiceName() string {
return utils.LoggerS
}
// ShouldRun returns if the service should be running.
func (s *LoggerService) ShouldRun() bool {
return true
}
// StateChan returns signaling channel of specific state
func (s *LoggerService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}

View File

@@ -994,6 +994,7 @@ const (
ServiceManagerS = "ServiceManager"
CommonListenerS = "CommonListenerS"
ConnManager = "ConnManager"
LoggerS = "LoggerS"
)
// Lower service names

View File

@@ -31,9 +31,9 @@ var noSysLog bool
func init() {
var err error
if Logger, err = NewLogger(MetaSysLog, EmptyString, 0); err != nil {
noSysLog = true
Logger, _ = NewLogger(MetaStdLog, EmptyString, 0)
Logger, err = NewSysLogger(EmptyString, 0)
if err != nil {
Logger = NewStdLogger(EmptyString, 0)
}
}
@@ -65,20 +65,6 @@ type LoggerInterface interface {
Write(p []byte) (n int, err error)
}
func NewLogger(loggerType, nodeID string, logLvl int) (LoggerInterface, error) {
switch loggerType {
case MetaStdLog:
return NewStdLogger(nodeID, logLvl), nil
case MetaSysLog:
if noSysLog {
return NewStdLogger(nodeID, logLvl), nil
}
return NewSysLogger(nodeID, logLvl)
default:
return nil, fmt.Errorf("unsupported logger: <%+s>", loggerType)
}
}
type SysLogger struct {
logLevel int
syslog *syslog.Writer

View File

@@ -28,30 +28,6 @@ import (
"testing"
)
func TestLoggerNewLoggerSyslogOK(t *testing.T) {
if noSysLog {
t.SkipNow()
}
exp := &SysLogger{
logLevel: 7,
}
if rcv, err := NewLogger(MetaSysLog, EmptyString, 7); err != nil {
t.Error(err)
} else {
exp.syslog = rcv.GetSyslog()
if !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
}
}
func TestLoggerNewLoggerUnsupported(t *testing.T) {
experr := `unsupported logger: <unsupported>`
if _, err := NewLogger("unsupported", EmptyString, 7); err == nil || err.Error() != experr {
t.Errorf("expected: <%s>, \nreceived: <%+v>", experr, err)
}
}
func TestLoggerSysloggerSetGetLogLevel(t *testing.T) {
if noSysLog {
t.SkipNow()