Implement EventExporterCfg constructor and get rid of boilerplate code

This commit is contained in:
ionutboangiu
2023-03-07 10:40:32 -05:00
committed by Dan Christian Bogos
parent 58745047ae
commit 8620b23231
11 changed files with 56 additions and 90 deletions

View File

@@ -43,13 +43,9 @@ func newActHTTPPost(ctx *context.Context, tnt string, cgrEv *utils.CGREvent,
if err != nil {
return nil, err
}
aL.pstrs[i], _ = ees.NewHTTPjsonMapEE(&config.EventExporterCfg{
ID: aL.id(),
ExportPath: actD.Path,
Attempts: attempts,
FailedPostsDir: cfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
Opts: &config.EventExporterOpts{},
}, cfg, nil, nil)
eeCfg := config.NewEventExporterCfg(aL.id(), "", actD.Path, cfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
attempts, nil)
aL.pstrs[i], _ = ees.NewHTTPjsonMapEE(eeCfg, cfg, nil, nil)
}
return
}

View File

@@ -224,6 +224,22 @@ type EventExporterCfg struct {
trailerFields []*FCTemplate
}
// NewEventExporterCfg is a constructor for the EventExporterCfg, that is needed to initialize posters that are used by the
// readers and HTTP exporter actions
func NewEventExporterCfg(ID, exportType, exportPath, failedPostsDir string, attempts int, opts *EventExporterOpts) *EventExporterCfg {
if opts == nil {
opts = new(EventExporterOpts)
}
return &EventExporterCfg{
ID: ID,
Type: exportType,
ExportPath: exportPath,
FailedPostsDir: failedPostsDir,
Attempts: attempts,
Opts: opts,
}
}
func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) {
if jsnCfg == nil {
return

View File

@@ -282,15 +282,10 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) {
Format: expEv.Format,
}
eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path,
utils.MetaNone, attempts, expEv.Opts)
var ee EventExporter
if ee, err = NewEventExporter(&config.EventExporterCfg{
ID: "ReplayFailedPosts",
Type: expEv.Format,
ExportPath: expEv.Path,
Opts: expEv.Opts,
Attempts: attempts,
FailedPostsDir: utils.MetaNone,
}, config.CgrConfig(), nil, nil); err != nil {
if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return utils.EmptyString }

View File

@@ -256,15 +256,10 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempt
Format: expEv.Format,
}
eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path, utils.MetaNone,
attempts, expEv.Opts)
var ee ees.EventExporter
if ee, err = ees.NewEventExporter(&config.EventExporterCfg{
ID: "ReplayFailedPosts",
Type: expEv.Format,
ExportPath: expEv.Path,
Opts: expEv.Opts,
Attempts: attempts,
FailedPostsDir: utils.MetaNone,
}, config.CgrConfig(), nil, nil); err != nil {
if ee, err = ees.NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return utils.EmptyString }

View File

@@ -255,16 +255,10 @@ func (rdr *AMQPER) close() (err error) {
func (rdr *AMQPER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewAMQPee(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
"", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt)
rdr.poster = ees.NewAMQPee(eeCfg, nil)
}

View File

@@ -212,16 +212,10 @@ func (rdr *AMQPv1ER) close() (err error) {
func (rdr *AMQPv1ER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewAMQPv1EE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
"", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt)
rdr.poster = ees.NewAMQPv1EE(eeCfg, nil)
}

View File

@@ -251,16 +251,10 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) {
func (rdr *KafkaER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewKafkaEE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
"", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt)
rdr.poster = ees.NewKafkaEE(eeCfg, nil)
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
// getProcessOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts
func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) {
if erOpts.AMQPExchangeProcessed != nil {
if eeOpts == nil {

View File

@@ -192,19 +192,12 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
func (rdr *NatsER) createPoster() (err error) {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster, err = ees.NewNatsEE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(
rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Opts: processedOpt,
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
}, rdr.cgrCfg.GeneralCfg().NodeID,
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
"", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt)
rdr.poster, err = ees.NewNatsEE(eeCfg, rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout, nil)
return
}

View File

@@ -192,18 +192,12 @@ func (rdr *S3ER) readLoop() (err error) {
func (rdr *S3ER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewS3EE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
"", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt)
rdr.poster = ees.NewS3EE(eeCfg, nil)
}
func (rdr *S3ER) isClosed() bool {

View File

@@ -221,18 +221,12 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
func (rdr *SQSER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewSQSee(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
"", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt)
rdr.poster = ees.NewSQSee(eeCfg, nil)
}
func (rdr *SQSER) isClosed() bool {