From 107932e3d27fec162654033be5983069a64ebeba Mon Sep 17 00:00:00 2001 From: adi Date: Thu, 28 Jul 2022 16:53:00 +0300 Subject: [PATCH] context and efs service starting --- data/conf/samples/efs/efs_internal/cgrates.json | 13 +++++++------ efs/failed_loggs.go | 2 +- engine/kafka_logger.go | 14 +++++++++----- services/cgr-engine.go | 16 ++++++++++++---- services/efs.go | 3 +-- 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/data/conf/samples/efs/efs_internal/cgrates.json b/data/conf/samples/efs/efs_internal/cgrates.json index 5c37234ae..7a2d4f989 100644 --- a/data/conf/samples/efs/efs_internal/cgrates.json +++ b/data/conf/samples/efs/efs_internal/cgrates.json @@ -5,20 +5,21 @@ "opts": { "kafka_conn": "unavailable_conn:9092", // the connection trough kafka "kafka_topic": "TutorialTopic", // the topic from where the events are exported - "kafka_attempts": 5, // number of attempts of connecting - }, + "kafka_attempts": 5 // number of attempts of connecting + } }, "data_db": { - "db_type": "*internal", + "db_type": "*internal" }, "efs": { - "enabled": true + "enabled": true, + //"efs_conns": ["*localhost"] }, "admins": { "enabled": true, - "ees_conns": ["*localhost"], - }, + "ees_conns": ["*localhost"] + } } \ No newline at end of file diff --git a/efs/failed_loggs.go b/efs/failed_loggs.go index 28c450851..ce2e28f93 100644 --- a/efs/failed_loggs.go +++ b/efs/failed_loggs.go @@ -76,7 +76,7 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp if err != nil { return } - expLogger := engine.NewExportLogger(nodeID, tnt, logLvl, + expLogger := engine.NewExportLogger(ctx, nodeID, tnt, logLvl, expEv.connMngr, expEv.cfg) for _, event := range expEv.Events { var content []byte diff --git a/engine/kafka_logger.go b/engine/kafka_logger.go index d8990a0f8..a0ddf520b 100644 --- a/engine/kafka_logger.go +++ b/engine/kafka_logger.go @@ -20,6 +20,7 @@ package engine import ( "fmt" + "log" "log/syslog" "sync" "time" @@ -30,11 +31,11 @@ import ( "github.com/segmentio/kafka-go" ) -func NewLogger(loggerType, tnt, nodeID string, +func NewLogger(ctx *context.Context, loggerType, tnt, nodeID string, connMgr *ConnManager, cfg *config.CGRConfig) (utils.LoggerInterface, error) { switch loggerType { case utils.MetaKafkaLog: - return NewExportLogger(nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil + return NewExportLogger(ctx, nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil case utils.MetaStdLog, utils.MetaSysLog: return utils.NewLogger(loggerType, nodeID, cfg.LoggerCfg().Level) default: @@ -47,6 +48,7 @@ type ExportLogger struct { sync.Mutex cfg *config.CGRConfig connMgr *ConnManager + ctx *context.Context LogLevel int FldPostDir string @@ -56,9 +58,10 @@ type ExportLogger struct { } // NewExportLogger will export loggers to kafka -func NewExportLogger(nodeID, tenant string, level int, +func NewExportLogger(ctx *context.Context, nodeID, tenant string, level int, connMgr *ConnManager, cfg *config.CGRConfig) (el *ExportLogger) { el = &ExportLogger{ + ctx: ctx, connMgr: connMgr, cfg: cfg, LogLevel: level, @@ -97,7 +100,7 @@ func (el *ExportLogger) call(m string, level int) (err error) { if content, err = utils.ToUnescapedJSON(eventExport); err != nil { return } - if err = el.Writer.WriteMessages(context.Background(), kafka.Message{ + if err = el.Writer.WriteMessages(el.ctx, kafka.Message{ Key: []byte(utils.GenUUID()), Value: content, }); err != nil { @@ -111,8 +114,9 @@ func (el *ExportLogger) call(m string, level int) (err error) { APIOpts: el.GetMeta(), } var reply string - if err = el.connMgr.Call(context.Background(), el.cfg.LoggerCfg().EFsConns, + if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent, args, &reply); err != nil { + log.Printf("err la sefprocessEvent: %v", err) /* utils.Logger.Warning( fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>", utils.Logger, utils.EFs, err.Error())) */ diff --git a/services/cgr-engine.go b/services/cgr-engine.go index ff8e2f37a..42c1f0c62 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -23,7 +23,6 @@ import ( "io" "os" "path" - "runtime" "runtime/pprof" "sync" "time" @@ -106,6 +105,7 @@ type CGREngine struct { coreS *CoreService cacheS *CacheService ldrs *LoaderService + efs *ExportFailoverService // chans (need to move this as services) iFilterSCh chan *engine.FilterS @@ -212,6 +212,8 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, iLoaderSCh, cgr.cM, cgr.anzS, cgr.srvDep) + cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.server, cgr.srvDep) + cgr.srvManager.AddServices(cgr.gvS, cgr.coreS, cgr.cacheS, cgr.ldrs, cgr.anzS, dspS, cgr.dmS, NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, @@ -252,7 +254,6 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iActionSCh, cgr.anzS, cgr.srvDep), NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iAccountSCh, cgr.anzS, cgr.srvDep), NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.server, cgr.srvDep), - NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.server, cgr.srvDep), ) return @@ -269,6 +270,13 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu cgr.shdWg.Done() return } + if cgr.efs.ShouldRun() { // efs checking first beacause of loggers + cgr.shdWg.Add(1) + if err = cgr.efs.Start(ctx, shtDw); err != nil { + cgr.shdWg.Done() + return + } + } if cgr.dmS.ShouldRun() { // Some services can run without db, ie: ERs cgr.shdWg.Add(1) if err = cgr.dmS.Start(ctx, shtDw); err != nil { @@ -354,7 +362,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags } // init syslog - if utils.Logger, err = engine.NewLogger( + if utils.Logger, err = engine.NewLogger(ctx, utils.FirstNonEmpty(*flags.SysLogger, cgr.cfg.LoggerCfg().Type), cgr.cfg.GeneralCfg().DefaultTenant, cgr.cfg.GeneralCfg().NodeID, @@ -362,7 +370,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) } efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing - utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) + //utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) cgr.cfg.LazySanityCheck() return cgr.InitServices(*flags.HttpPrfPath, cpuPrfF, *flags.MemPrfDir, memPrfStop) diff --git a/services/efs.go b/services/efs.go index 26609c6f0..1d8ef7333 100644 --- a/services/efs.go +++ b/services/efs.go @@ -30,7 +30,6 @@ import ( "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -52,7 +51,7 @@ type ExportFailoverService struct { // NewExportFailoverService is the constructor for the TpeService func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager, intConnChan chan birpc.ClientConnector, - server *cores.Server, srvDep map[string]*sync.WaitGroup) servmanager.Service { + server *cores.Server, srvDep map[string]*sync.WaitGroup) *ExportFailoverService { return &ExportFailoverService{ cfg: cfg, server: server,