context and efs service starting

This commit is contained in:
adi
2022-07-28 16:53:00 +03:00
committed by Dan Christian Bogos
parent 0eb61c965a
commit 107932e3d2
5 changed files with 30 additions and 18 deletions

View File

@@ -5,20 +5,21 @@
"opts": {
"kafka_conn": "unavailable_conn:9092", // the connection trough kafka
"kafka_topic": "TutorialTopic", // the topic from where the events are exported
"kafka_attempts": 5, // number of attempts of connecting
},
"kafka_attempts": 5 // number of attempts of connecting
}
},
"data_db": {
"db_type": "*internal",
"db_type": "*internal"
},
"efs": {
"enabled": true
"enabled": true,
//"efs_conns": ["*localhost"]
},
"admins": {
"enabled": true,
"ees_conns": ["*localhost"],
},
"ees_conns": ["*localhost"]
}
}

View File

@@ -76,7 +76,7 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
if err != nil {
return
}
expLogger := engine.NewExportLogger(nodeID, tnt, logLvl,
expLogger := engine.NewExportLogger(ctx, nodeID, tnt, logLvl,
expEv.connMngr, expEv.cfg)
for _, event := range expEv.Events {
var content []byte

View File

@@ -20,6 +20,7 @@ package engine
import (
"fmt"
"log"
"log/syslog"
"sync"
"time"
@@ -30,11 +31,11 @@ import (
"github.com/segmentio/kafka-go"
)
func NewLogger(loggerType, tnt, nodeID string,
func NewLogger(ctx *context.Context, loggerType, tnt, nodeID string,
connMgr *ConnManager, cfg *config.CGRConfig) (utils.LoggerInterface, error) {
switch loggerType {
case utils.MetaKafkaLog:
return NewExportLogger(nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil
return NewExportLogger(ctx, nodeID, tnt, cfg.LoggerCfg().Level, connMgr, cfg), nil
case utils.MetaStdLog, utils.MetaSysLog:
return utils.NewLogger(loggerType, nodeID, cfg.LoggerCfg().Level)
default:
@@ -47,6 +48,7 @@ type ExportLogger struct {
sync.Mutex
cfg *config.CGRConfig
connMgr *ConnManager
ctx *context.Context
LogLevel int
FldPostDir string
@@ -56,9 +58,10 @@ type ExportLogger struct {
}
// NewExportLogger will export loggers to kafka
func NewExportLogger(nodeID, tenant string, level int,
func NewExportLogger(ctx *context.Context, nodeID, tenant string, level int,
connMgr *ConnManager, cfg *config.CGRConfig) (el *ExportLogger) {
el = &ExportLogger{
ctx: ctx,
connMgr: connMgr,
cfg: cfg,
LogLevel: level,
@@ -97,7 +100,7 @@ func (el *ExportLogger) call(m string, level int) (err error) {
if content, err = utils.ToUnescapedJSON(eventExport); err != nil {
return
}
if err = el.Writer.WriteMessages(context.Background(), kafka.Message{
if err = el.Writer.WriteMessages(el.ctx, kafka.Message{
Key: []byte(utils.GenUUID()),
Value: content,
}); err != nil {
@@ -111,8 +114,9 @@ func (el *ExportLogger) call(m string, level int) (err error) {
APIOpts: el.GetMeta(),
}
var reply string
if err = el.connMgr.Call(context.Background(), el.cfg.LoggerCfg().EFsConns,
if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns,
utils.EfSv1ProcessEvent, args, &reply); err != nil {
log.Printf("err la sefprocessEvent: %v", err)
/* utils.Logger.Warning(
fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>",
utils.Logger, utils.EFs, err.Error())) */

View File

@@ -23,7 +23,6 @@ import (
"io"
"os"
"path"
"runtime"
"runtime/pprof"
"sync"
"time"
@@ -106,6 +105,7 @@ type CGREngine struct {
coreS *CoreService
cacheS *CacheService
ldrs *LoaderService
efs *ExportFailoverService
// chans (need to move this as services)
iFilterSCh chan *engine.FilterS
@@ -212,6 +212,8 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server,
iLoaderSCh, cgr.cM, cgr.anzS, cgr.srvDep)
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.server, cgr.srvDep)
cgr.srvManager.AddServices(cgr.gvS, cgr.coreS, cgr.cacheS,
cgr.ldrs, cgr.anzS, dspS, cgr.dmS,
NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server,
@@ -252,7 +254,6 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iActionSCh, cgr.anzS, cgr.srvDep),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iAccountSCh, cgr.anzS, cgr.srvDep),
NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.server, cgr.srvDep),
NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.server, cgr.srvDep),
)
return
@@ -269,6 +270,13 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
cgr.shdWg.Done()
return
}
if cgr.efs.ShouldRun() { // efs checking first beacause of loggers
cgr.shdWg.Add(1)
if err = cgr.efs.Start(ctx, shtDw); err != nil {
cgr.shdWg.Done()
return
}
}
if cgr.dmS.ShouldRun() { // Some services can run without db, ie: ERs
cgr.shdWg.Add(1)
if err = cgr.dmS.Start(ctx, shtDw); err != nil {
@@ -354,7 +362,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
}
// init syslog
if utils.Logger, err = engine.NewLogger(
if utils.Logger, err = engine.NewLogger(ctx,
utils.FirstNonEmpty(*flags.SysLogger, cgr.cfg.LoggerCfg().Type),
cgr.cfg.GeneralCfg().DefaultTenant,
cgr.cfg.GeneralCfg().NodeID,
@@ -362,7 +370,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
//utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
cgr.cfg.LazySanityCheck()
return cgr.InitServices(*flags.HttpPrfPath, cpuPrfF, *flags.MemPrfDir, memPrfStop)

View File

@@ -30,7 +30,6 @@ import (
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -52,7 +51,7 @@ type ExportFailoverService struct {
// NewExportFailoverService is the constructor for the TpeService
func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
intConnChan chan birpc.ClientConnector,
server *cores.Server, srvDep map[string]*sync.WaitGroup) servmanager.Service {
server *cores.Server, srvDep map[string]*sync.WaitGroup) *ExportFailoverService {
return &ExportFailoverService{
cfg: cfg,
server: server,