diff --git a/config/eescfg.go b/config/eescfg.go index 2182516a1..a935a841e 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -227,6 +227,22 @@ type EventExporterCfg struct { trailerFields []*FCTemplate } +// NewEventExporterCfg is a constructor for the EventExporterCfg, that is needed to initialize posters that are used by the +// readers and HTTP exporter actions +func NewEventExporterCfg(ID, exportType, exportPath, failedPostsDir string, attempts int, opts *EventExporterOpts) *EventExporterCfg { + if opts == nil { + opts = new(EventExporterOpts) + } + return &EventExporterCfg{ + ID: ID, + Type: exportType, + ExportPath: exportPath, + FailedPostsDir: failedPostsDir, + Attempts: attempts, + Opts: opts, + } +} + func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg == nil { return diff --git a/ees/libactions.go b/ees/libactions.go index 5431a21c6..5c48f715d 100644 --- a/ees/libactions.go +++ b/ees/libactions.go @@ -52,12 +52,9 @@ func callURL(ub *engine.Account, a *engine.Action, _ engine.Actions, _ *engine.F if err != nil { return err } - pstr, err := NewHTTPjsonMapEE(&config.EventExporterCfg{ - ID: a.Id, - ExportPath: a.ExtraParameters, - Attempts: config.CgrConfig().GeneralCfg().PosterAttempts, - FailedPostsDir: config.CgrConfig().GeneralCfg().FailedPostsDir, - }, config.CgrConfig(), nil, nil) + eeCfg := config.NewEventExporterCfg(a.Id, "", a.ExtraParameters, config.CgrConfig().GeneralCfg().FailedPostsDir, + config.CgrConfig().GeneralCfg().PosterAttempts, nil) + pstr, err := NewHTTPjsonMapEE(eeCfg, config.CgrConfig(), nil, nil) if err != nil { return err } @@ -74,12 +71,9 @@ func callURLAsync(ub *engine.Account, a *engine.Action, _ engine.Actions, _ *eng if err != nil { return err } - pstr, err := NewHTTPjsonMapEE(&config.EventExporterCfg{ - ID: a.Id, - ExportPath: a.ExtraParameters, - Attempts: config.CgrConfig().GeneralCfg().PosterAttempts, - FailedPostsDir: config.CgrConfig().GeneralCfg().FailedPostsDir, - }, config.CgrConfig(), nil, nil) + eeCfg := config.NewEventExporterCfg(a.Id, "", a.ExtraParameters, config.CgrConfig().GeneralCfg().FailedPostsDir, + config.CgrConfig().GeneralCfg().PosterAttempts, nil) + pstr, err := NewHTTPjsonMapEE(eeCfg, config.CgrConfig(), nil, nil) if err != nil { return err } @@ -92,13 +86,9 @@ func postEvent(_ *engine.Account, a *engine.Action, _ engine.Actions, _ *engine. if err != nil { return err } - pstr, err := NewHTTPjsonMapEE(&config.EventExporterCfg{ - ID: a.Id, - ExportPath: a.ExtraParameters, - Attempts: config.CgrConfig().GeneralCfg().PosterAttempts, - FailedPostsDir: config.CgrConfig().GeneralCfg().FailedPostsDir, - Opts: &config.EventExporterOpts{}, - }, config.CgrConfig(), nil, nil) + eeCfg := config.NewEventExporterCfg(a.Id, "", a.ExtraParameters, config.CgrConfig().GeneralCfg().FailedPostsDir, + config.CgrConfig().GeneralCfg().PosterAttempts, nil) + pstr, err := NewHTTPjsonMapEE(eeCfg, config.CgrConfig(), nil, nil) if err != nil { return err } diff --git a/ees/libcdre.go b/ees/libcdre.go index 769d91f21..9edf189d4 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -160,15 +160,10 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export Format: expEv.Format, } + eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path, utils.MetaNone, + attempts, expEv.Opts) var ee EventExporter - if ee, err = NewEventExporter(&config.EventExporterCfg{ - ID: "ReplayFailedPosts", - Type: expEv.Format, - ExportPath: expEv.Path, - Opts: expEv.Opts, - Attempts: attempts, - FailedPostsDir: utils.MetaNone, - }, config.CgrConfig(), nil, nil); err != nil { + if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil { return } keyFunc := func() string { return utils.EmptyString } diff --git a/ers/amqp.go b/ers/amqp.go index a53b6485a..3d409b44a 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -251,17 +251,10 @@ func (rdr *AMQPER) close() (err error) { func (rdr *AMQPER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewAMQPee(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts, - Opts: processedOpt, - FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) + rdr.poster = ees.NewAMQPee(eeCfg, nil) } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 1f99b0e7a..cce42d3f7 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -207,17 +207,10 @@ func (rdr *AMQPv1ER) close() (err error) { func (rdr *AMQPv1ER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewAMQPv1EE(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts, - Opts: processedOpt, - FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) + rdr.poster = ees.NewAMQPv1EE(eeCfg, nil) } diff --git a/ers/kafka.go b/ers/kafka.go index f551065fd..363366977 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -202,17 +202,10 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) { func (rdr *KafkaER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewKafkaEE(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts, - Opts: processedOpt, - FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) + rdr.poster = ees.NewKafkaEE(eeCfg, nil) } diff --git a/ers/nats.go b/ers/nats.go index 7f839270e..cde6786c5 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -188,20 +188,12 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) { func (rdr *NatsER) createPoster() (err error) { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster, err = ees.NewNatsEE(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty( - rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Opts: processedOpt, - Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts, - FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir, - }, rdr.cgrCfg.GeneralCfg().NodeID, + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) + rdr.poster, err = ees.NewNatsEE(eeCfg, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout, nil) return } diff --git a/ers/s3.go b/ers/s3.go index a32c8463c..d2f80c1fd 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -194,19 +194,12 @@ func (rdr *S3ER) readLoop(scv s3Client) (err error) { func (rdr *S3ER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewS3EE(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts, - Opts: processedOpt, - FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) + rdr.poster = ees.NewS3EE(eeCfg, nil) } func (rdr *S3ER) isClosed() bool { diff --git a/ers/sqs.go b/ers/sqs.go index f5f1555a7..e34cd4fc9 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -217,19 +217,12 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) { func (rdr *SQSER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if processedOpt == nil { - if len(rdr.Config().ProcessedPath) == 0 { - return - } - processedOpt = new(config.EventExporterOpts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { + return } - rdr.poster = ees.NewSQSee(&config.EventExporterCfg{ - ID: rdr.Config().ID, - ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts, - Opts: processedOpt, - FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir, - }, nil) + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) + rdr.poster = ees.NewSQSee(eeCfg, nil) } func (rdr *SQSER) isClosed() bool {