diff --git a/actions/export.go b/actions/export.go index 2d8d638d4..ef6c69b84 100644 --- a/actions/export.go +++ b/actions/export.go @@ -43,13 +43,9 @@ func newActHTTPPost(ctx *context.Context, tnt string, cgrEv *utils.CGREvent, if err != nil { return nil, err } - aL.pstrs[i], _ = ees.NewHTTPjsonMapEE(&config.EventExporterCfg{ - ID: aL.id(), - ExportPath: actD.Path, - Attempts: attempts, - FailedPostsDir: cfg.EEsCfg().GetDefaultExporter().FailedPostsDir, - Opts: &config.EventExporterOpts{}, - }, cfg, nil, nil) + eeCfg := config.NewEventExporterCfg(aL.id(), "", actD.Path, cfg.EEsCfg().GetDefaultExporter().FailedPostsDir, + attempts, nil) + aL.pstrs[i], _ = ees.NewHTTPjsonMapEE(eeCfg, cfg, nil, nil) } return } diff --git a/config/eescfg.go b/config/eescfg.go index d3651df72..0e3673924 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -224,6 +224,22 @@ type EventExporterCfg struct { trailerFields []*FCTemplate } +// NewEventExporterCfg is a constructor for the EventExporterCfg, that is needed to initialize posters that are used by the +// readers and HTTP exporter actions +func NewEventExporterCfg(ID, exportType, exportPath, failedPostsDir string, attempts int, opts *EventExporterOpts) *EventExporterCfg { + if opts == nil { + opts = new(EventExporterOpts) + } + return &EventExporterCfg{ + ID: ID, + Type: exportType, + ExportPath: exportPath, + FailedPostsDir: failedPostsDir, + Attempts: attempts, + Opts: opts, + } +} + func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg == nil { return diff --git a/ees/libcdre.go b/ees/libcdre.go index 6e7458a31..cdf51989e 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -282,15 +282,10 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) { Format: expEv.Format, } + eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path, + utils.MetaNone, attempts, expEv.Opts) var ee EventExporter - if ee, err = NewEventExporter(&config.EventExporterCfg{ - ID: "ReplayFailedPosts", - Type: expEv.Format, - ExportPath: expEv.Path, - Opts: expEv.Opts, - Attempts: attempts, - FailedPostsDir: utils.MetaNone, - }, config.CgrConfig(), nil, nil); err != nil { + if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil { return } keyFunc := func() string { return utils.EmptyString } diff --git a/efs/failed_ees.go b/efs/failed_ees.go index db2a82777..fc960ca95 100644 --- a/efs/failed_ees.go +++ b/efs/failed_ees.go @@ -256,15 +256,10 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempt Format: expEv.Format, } + eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path, utils.MetaNone, + attempts, expEv.Opts) var ee ees.EventExporter - if ee, err = ees.NewEventExporter(&config.EventExporterCfg{ - ID: "ReplayFailedPosts", - Type: expEv.Format, - ExportPath: expEv.Path, - Opts: expEv.Opts, - Attempts: attempts, - FailedPostsDir: utils.MetaNone, - }, config.CgrConfig(), nil, nil); err != nil { + if ee, err = ees.NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil { return } keyFunc := func() string { return utils.EmptyString } diff --git a/ers/amqp.go b/ers/amqp.go index 59c1e1f90..fdfb0b748 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -255,16 +255,10 @@ func (rdr *AMQPER) close() (err error) { func (rdr *AMQPER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewAMQPee(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, - Opts: processedOpt, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + "", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt) + rdr.poster = ees.NewAMQPee(eeCfg, nil) } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 27f7f8938..22eeade4b 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -212,16 +212,10 @@ func (rdr *AMQPv1ER) close() (err error) { func (rdr *AMQPv1ER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewAMQPv1EE(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, - Opts: processedOpt, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + "", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt) + rdr.poster = ees.NewAMQPv1EE(eeCfg, nil) } diff --git a/ers/kafka.go b/ers/kafka.go index 5ce7230f1..c662d1316 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -251,16 +251,10 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) { func (rdr *KafkaER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewKafkaEE(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, - Opts: processedOpt, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + "", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt) + rdr.poster = ees.NewKafkaEE(eeCfg, nil) } diff --git a/ers/libers.go b/ers/libers.go index c204552d5..b31970186 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/utils" ) +// getProcessOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) { if erOpts.AMQPExchangeProcessed != nil { if eeOpts == nil { diff --git a/ers/nats.go b/ers/nats.go index 877a1c9e3..de126dd3d 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -192,19 +192,12 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) { func (rdr *NatsER) createPoster() (err error) { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster, err = ees.NewNatsEE(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty( - rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Opts: processedOpt, - Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, - }, rdr.cgrCfg.GeneralCfg().NodeID, + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + "", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt) + rdr.poster, err = ees.NewNatsEE(eeCfg, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout, nil) return } diff --git a/ers/s3.go b/ers/s3.go index 18f4addef..e9af556f0 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -192,18 +192,12 @@ func (rdr *S3ER) readLoop() (err error) { func (rdr *S3ER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewS3EE(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, - Opts: processedOpt, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + "", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt) + rdr.poster = ees.NewS3EE(eeCfg, nil) } func (rdr *S3ER) isClosed() bool { diff --git a/ers/sqs.go b/ers/sqs.go index 49cdee236..3f3c1fc91 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -221,18 +221,12 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) { func (rdr *SQSER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewSQSee(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, - Opts: processedOpt, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + "", rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts, processedOpt) + rdr.poster = ees.NewSQSee(eeCfg, nil) } func (rdr *SQSER) isClosed() bool {