Implement EventExporterCfg constructor and use it to get rid of boilerplate code

This commit is contained in:
ionutboangiu
2023-03-06 10:49:08 -05:00
committed by Dan Christian Bogos
parent d69f08890b
commit b1b9647104
9 changed files with 58 additions and 100 deletions

View File

@@ -227,6 +227,22 @@ type EventExporterCfg struct {
trailerFields []*FCTemplate
}
// NewEventExporterCfg is a constructor for the EventExporterCfg, that is needed to initialize posters that are used by the
// readers and HTTP exporter actions
func NewEventExporterCfg(ID, exportType, exportPath, failedPostsDir string, attempts int, opts *EventExporterOpts) *EventExporterCfg {
if opts == nil {
opts = new(EventExporterOpts)
}
return &EventExporterCfg{
ID: ID,
Type: exportType,
ExportPath: exportPath,
FailedPostsDir: failedPostsDir,
Attempts: attempts,
Opts: opts,
}
}
func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) {
if jsnCfg == nil {
return

View File

@@ -52,12 +52,9 @@ func callURL(ub *engine.Account, a *engine.Action, _ engine.Actions, _ *engine.F
if err != nil {
return err
}
pstr, err := NewHTTPjsonMapEE(&config.EventExporterCfg{
ID: a.Id,
ExportPath: a.ExtraParameters,
Attempts: config.CgrConfig().GeneralCfg().PosterAttempts,
FailedPostsDir: config.CgrConfig().GeneralCfg().FailedPostsDir,
}, config.CgrConfig(), nil, nil)
eeCfg := config.NewEventExporterCfg(a.Id, "", a.ExtraParameters, config.CgrConfig().GeneralCfg().FailedPostsDir,
config.CgrConfig().GeneralCfg().PosterAttempts, nil)
pstr, err := NewHTTPjsonMapEE(eeCfg, config.CgrConfig(), nil, nil)
if err != nil {
return err
}
@@ -74,12 +71,9 @@ func callURLAsync(ub *engine.Account, a *engine.Action, _ engine.Actions, _ *eng
if err != nil {
return err
}
pstr, err := NewHTTPjsonMapEE(&config.EventExporterCfg{
ID: a.Id,
ExportPath: a.ExtraParameters,
Attempts: config.CgrConfig().GeneralCfg().PosterAttempts,
FailedPostsDir: config.CgrConfig().GeneralCfg().FailedPostsDir,
}, config.CgrConfig(), nil, nil)
eeCfg := config.NewEventExporterCfg(a.Id, "", a.ExtraParameters, config.CgrConfig().GeneralCfg().FailedPostsDir,
config.CgrConfig().GeneralCfg().PosterAttempts, nil)
pstr, err := NewHTTPjsonMapEE(eeCfg, config.CgrConfig(), nil, nil)
if err != nil {
return err
}
@@ -92,13 +86,9 @@ func postEvent(_ *engine.Account, a *engine.Action, _ engine.Actions, _ *engine.
if err != nil {
return err
}
pstr, err := NewHTTPjsonMapEE(&config.EventExporterCfg{
ID: a.Id,
ExportPath: a.ExtraParameters,
Attempts: config.CgrConfig().GeneralCfg().PosterAttempts,
FailedPostsDir: config.CgrConfig().GeneralCfg().FailedPostsDir,
Opts: &config.EventExporterOpts{},
}, config.CgrConfig(), nil, nil)
eeCfg := config.NewEventExporterCfg(a.Id, "", a.ExtraParameters, config.CgrConfig().GeneralCfg().FailedPostsDir,
config.CgrConfig().GeneralCfg().PosterAttempts, nil)
pstr, err := NewHTTPjsonMapEE(eeCfg, config.CgrConfig(), nil, nil)
if err != nil {
return err
}

View File

@@ -160,15 +160,10 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export
Format: expEv.Format,
}
eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path, utils.MetaNone,
attempts, expEv.Opts)
var ee EventExporter
if ee, err = NewEventExporter(&config.EventExporterCfg{
ID: "ReplayFailedPosts",
Type: expEv.Format,
ExportPath: expEv.Path,
Opts: expEv.Opts,
Attempts: attempts,
FailedPostsDir: utils.MetaNone,
}, config.CgrConfig(), nil, nil); err != nil {
if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return utils.EmptyString }

View File

@@ -251,17 +251,10 @@ func (rdr *AMQPER) close() (err error) {
func (rdr *AMQPER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewAMQPee(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt)
rdr.poster = ees.NewAMQPee(eeCfg, nil)
}

View File

@@ -207,17 +207,10 @@ func (rdr *AMQPv1ER) close() (err error) {
func (rdr *AMQPv1ER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewAMQPv1EE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt)
rdr.poster = ees.NewAMQPv1EE(eeCfg, nil)
}

View File

@@ -202,17 +202,10 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) {
func (rdr *KafkaER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewKafkaEE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt)
rdr.poster = ees.NewKafkaEE(eeCfg, nil)
}

View File

@@ -188,20 +188,12 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
func (rdr *NatsER) createPoster() (err error) {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster, err = ees.NewNatsEE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(
rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Opts: processedOpt,
Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts,
FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir,
}, rdr.cgrCfg.GeneralCfg().NodeID,
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt)
rdr.poster, err = ees.NewNatsEE(eeCfg, rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout, nil)
return
}

View File

@@ -194,19 +194,12 @@ func (rdr *S3ER) readLoop(scv s3Client) (err error) {
func (rdr *S3ER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewS3EE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt)
rdr.poster = ees.NewS3EE(eeCfg, nil)
}
func (rdr *S3ER) isClosed() bool {

View File

@@ -217,19 +217,12 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
func (rdr *SQSER) createPoster() {
processedOpt := getProcessOptions(rdr.Config().Opts)
if processedOpt == nil {
if len(rdr.Config().ProcessedPath) == 0 {
return
}
processedOpt = new(config.EventExporterOpts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = ees.NewSQSee(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.GeneralCfg().FailedPostsDir,
}, nil)
eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt)
rdr.poster = ees.NewSQSee(eeCfg, nil)
}
func (rdr *SQSER) isClosed() bool {