From c185e4645526eed370c805b96abc7eccd774bf78 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 29 Oct 2025 09:25:39 +0200 Subject: [PATCH] ees: preserve exporter attempts in failed posts --- apier/v1/apier.go | 2 +- apier/v1/apier_it_test.go | 12 +++++++----- ees/ees.go | 2 +- ees/failedposts.go | 16 ++++++++++------ ees/failedposts_test.go | 27 +++++++++++++++------------ ees/poster_it_test.go | 4 ++-- 6 files changed, 36 insertions(+), 27 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 35b2d62d0..cecdd36c1 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1391,7 +1391,7 @@ func (apierSv1 *APIerSv1) ReplayFailedPosts(ctx *context.Context, args ReplayFai failoverPath = filepath.Join(args.FailedPath, d.Name()) } - failedPosts, err := expEv.ReplayFailedPosts(apierSv1.Config.GeneralCfg().PosterAttempts) + failedPosts, err := expEv.ReplayFailedPosts() if err != nil && failoverPath != utils.MetaNone { // Write the events that failed to be replayed to the failover directory if err = failedPosts.WriteToFile(failoverPath); err != nil { diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index ebcac7a0e..d65c9af88 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -2173,9 +2173,10 @@ func testApierRemoveRatingProfilesWithoutTenant(t *testing.T) { func testApierReplayFldPosts(t *testing.T) { bev := []byte(`{"ID":"cgrates.org:1007","BalanceMap":{"*monetary":[{"Uuid":"367be35a-96ee-40a5-b609-9130661f5f12","ID":"","Value":0,"ExpirationDate":"0001-01-01T00:00:00Z","Weight":10,"DestinationIDs":{},"RatingSubject":"","Categories":{},"SharedGroups":{"SHARED_A":true},"Timings":null,"TimingIDs":{},"Disabled":false,"Factors":null,"Blocker":false}]},"UnitCounters":{"*monetary":[{"CounterType":"*event","Counters":[{"Value":0,"Filter":{"Uuid":null,"ID":"b8531413-10d5-47ad-81ad-2bc272e8f0ca","Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":{"FS_USERS":true},"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null}}]}]},"ActionTriggers":[{"ID":"STANDARD_TRIGGERS","UniqueID":"46ac7b8c-685d-4555-bf73-fa6cfbc2fa21","ThresholdType":"*min_balance","ThresholdValue":2,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":true,"LastExecutionTime":"2017-01-31T14:03:57.961651647+01:00"},{"ID":"STANDARD_TRIGGERS","UniqueID":"b8531413-10d5-47ad-81ad-2bc272e8f0ca","ThresholdType":"*max_event_counter","ThresholdValue":5,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":{"FS_USERS":true},"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"},{"ID":"STANDARD_TRIGGERS","UniqueID":"8b424186-7a31-4aef-99c5-35e12e6fed41","ThresholdType":"*max_balance","ThresholdValue":20,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"},{"ID":"STANDARD_TRIGGERS","UniqueID":"28557f3b-139c-4a27-9d17-bda1f54b7c19","ThresholdType":"*max_balance","ThresholdValue":100,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"DISABLE_AND_LOG","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"}],"AllowNegative":false,"Disabled":false}"`) ev := &ees.ExportEvents{ - Path: "http://localhost:2081", - Type: utils.MetaHTTPjsonMap, - Events: []any{&ees.HTTPPosterRequest{Body: bev, Header: http.Header{"Content-Type": []string{"application/json"}}}}, + Path: "http://localhost:2081", + Type: utils.MetaHTTPjsonMap, + Attempts: 1, + Events: []any{&ees.HTTPPosterRequest{Body: bev, Header: http.Header{"Content-Type": []string{"application/json"}}}}, } fileName := "act>*http_post|63bed4ea-615e-4096-b1f4-499f64f29b28.json" @@ -2198,13 +2199,14 @@ func testApierReplayFldPosts(t *testing.T) { if err != nil { t.Error(err) } else if !reflect.DeepEqual(ev, outEv) { - t.Errorf("Expecting: %q, received: %q", utils.ToJSON(ev), utils.ToJSON(outEv)) + t.Errorf("Expecting: %s, received: %s", utils.ToJSON(ev), utils.ToJSON(outEv)) } fileName = "cdr|ae8cc4b3-5e60-4396-b82a-64b96a72a03c.json" bev = []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`) fileInPath := path.Join(args.SourcePath, fileName) ev = &ees.ExportEvents{ - Path: "amqp://guest:guest@localhost:5672/", + Path: "amqp://guest:guest@localhost:5672/", + Attempts: 1, Opts: &config.EventExporterOpts{ AMQP: &config.AMQPOpts{ QueueID: utils.StringPointer("cgrates_cdrs")}, diff --git a/ees/ees.go b/ees/ees.go index cd737008f..6c85fbc78 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -351,7 +351,7 @@ func ExportWithAttempts(exp EventExporter, eEv any, key string) (err error) { defer func() { if err != nil { AddFailedPost(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath, - exp.Cfg().Type, eEv, exp.Cfg().Opts) + exp.Cfg().Type, exp.Cfg().Attempts, eEv, exp.Cfg().Opts) } }() } diff --git a/ees/failedposts.go b/ees/failedposts.go index 56b372710..b5ce11e7d 100644 --- a/ees/failedposts.go +++ b/ees/failedposts.go @@ -52,7 +52,8 @@ func writeFailedPosts(_ string, value any) { } } -func AddFailedPost(failedPostsDir, expPath, format string, ev any, opts *config.EventExporterOpts) { +func AddFailedPost(failedPostsDir, expPath, format string, attempts int, ev any, + opts *config.EventExporterOpts) { key := utils.ConcatenatedKey(failedPostsDir, expPath, format) // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id var amqpQueueID string @@ -93,6 +94,7 @@ func AddFailedPost(failedPostsDir, expPath, format string, ev any, opts *config. failedPost = &ExportEvents{ Path: expPath, Type: format, + Attempts: attempts, Opts: opts, failedPostsDir: failedPostsDir, } @@ -126,6 +128,7 @@ type ExportEvents struct { Path string Opts *config.EventExporterOpts Type string + Attempts int Events []any failedPostsDir string } @@ -157,9 +160,9 @@ func (expEv *ExportEvents) AddEvent(ev any) { } // ReplayFailedPosts tryies to post cdrs again -func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *ExportEvents, err error) { +func (expEv *ExportEvents) ReplayFailedPosts() (failedEvents *ExportEvents, err error) { eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Type, expEv.Path, utils.MetaNone, - attempts, expEv.Opts) + expEv.Attempts, expEv.Opts) var ee EventExporter if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil { return @@ -169,9 +172,10 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export keyFunc = utils.UUIDSha1Prefix } failedEvents = &ExportEvents{ - Path: expEv.Path, - Opts: expEv.Opts, - Type: expEv.Type, + Path: expEv.Path, + Opts: expEv.Opts, + Type: expEv.Type, + Attempts: expEv.Attempts, } for _, ev := range expEv.Events { if err = ExportWithAttempts(ee, ev, keyFunc()); err != nil { diff --git a/ees/failedposts_test.go b/ees/failedposts_test.go index efa5fcd5e..c1b98f1ad 100644 --- a/ees/failedposts_test.go +++ b/ees/failedposts_test.go @@ -39,7 +39,7 @@ func TestSetFldPostCacheTTL(t *testing.T) { func TestAddFldPost(t *testing.T) { InitFailedPostCache(5*time.Second, false) - AddFailedPost("", "path1", "format1", "1", &config.EventExporterOpts{ + AddFailedPost("", "path1", "format1", 1, "1", &config.EventExporterOpts{ AMQP: &config.AMQPOpts{}, Els: &config.ElsOpts{}, AWS: &config.AWSOpts{}, @@ -61,9 +61,10 @@ func TestAddFldPost(t *testing.T) { t.Error("Error when casting") } eOut := &ExportEvents{ - Path: "path1", - Type: "format1", - Events: []any{"1"}, + Path: "path1", + Type: "format1", + Attempts: 1, + Events: []any{"1"}, Opts: &config.EventExporterOpts{ AMQP: &config.AMQPOpts{}, Els: &config.ElsOpts{}, @@ -77,7 +78,7 @@ func TestAddFldPost(t *testing.T) { if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } - AddFailedPost("", "path1", "format1", "2", &config.EventExporterOpts{ + AddFailedPost("", "path1", "format1", 1, "2", &config.EventExporterOpts{ AMQP: &config.AMQPOpts{}, Els: &config.ElsOpts{}, AWS: &config.AWSOpts{}, @@ -86,7 +87,7 @@ func TestAddFldPost(t *testing.T) { RPC: &config.RPCOpts{}, SQL: &config.SQLOpts{}, }) - AddFailedPost("", "path2", "format2", "3", &config.EventExporterOpts{ + AddFailedPost("", "path2", "format2", 1, "3", &config.EventExporterOpts{ AWS: &config.AWSOpts{ SQSQueueID: utils.StringPointer("qID"), }, @@ -109,9 +110,10 @@ func TestAddFldPost(t *testing.T) { t.Error("Error when casting") } eOut = &ExportEvents{ - Path: "path1", - Type: "format1", - Events: []any{"1", "2"}, + Path: "path1", + Type: "format1", + Attempts: 1, + Events: []any{"1", "2"}, Opts: &config.EventExporterOpts{ AMQP: &config.AMQPOpts{}, Els: &config.ElsOpts{}, @@ -137,9 +139,10 @@ func TestAddFldPost(t *testing.T) { t.Error("Error when casting") } eOut = &ExportEvents{ - Path: "path2", - Type: "format2", - Events: []any{"3"}, + Path: "path2", + Type: "format2", + Attempts: 1, + Events: []any{"3"}, Opts: &config.EventExporterOpts{ Els: &config.ElsOpts{}, NATS: &config.NATSOpts{}, diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index 5c7905ecc..e87cc5b5e 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -88,7 +88,7 @@ func TestHttpJsonPoster(t *testing.T) { if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: jsn, Header: make(http.Header)}, ""); err == nil { t.Error("Expected error") } - AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, jsn, &config.EventExporterOpts{ + AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, 1, jsn, &config.EventExporterOpts{ AMQP: &config.AMQPOpts{}, Els: &config.ElsOpts{}, AWS: &config.AWSOpts{}, @@ -146,7 +146,7 @@ func TestHttpBytesPoster(t *testing.T) { if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: content, Header: make(http.Header)}, ""); err == nil { t.Error("Expected error") } - AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, content, &config.EventExporterOpts{ + AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, 1, content, &config.EventExporterOpts{ AMQP: &config.AMQPOpts{}, Els: &config.ElsOpts{}, AWS: &config.AWSOpts{},