diff --git a/apier/v1/apier.go b/apier/v1/apier.go index a34028aa2..9a5f76dd0 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -22,8 +22,10 @@ import ( "encoding/csv" "errors" "fmt" + "io/fs" "os" "path" + "path/filepath" "slices" "strconv" "strings" @@ -1416,57 +1418,62 @@ func (apierSv1 *APIerSv1) RemoveActions(ctx *context.Context, attr *AttrRemoveAc return nil } -type ArgsReplyFailedPosts struct { - FailedRequestsInDir *string // if defined it will be our source of requests to be replayed - FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded - Modules []string // list of modules for which replay the requests, nil for all +// ReplayFailedPostsParams contains parameters for replaying failed posts. +type ReplayFailedPostsParams struct { + SourcePath string // path for events to be replayed + FailedPath string // path for events that failed to replay, *none to discard, defaults to SourceDir if empty + Modules []string // list of modules to replay requests for, nil for all } // ReplayFailedPosts will repost failed requests found in the FailedRequestsInDir -func (apierSv1 *APIerSv1) ReplayFailedPosts(ctx *context.Context, args *ArgsReplyFailedPosts, reply *string) (err error) { - failedReqsInDir := apierSv1.Config.GeneralCfg().FailedPostsDir - if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != "" { - failedReqsInDir = *args.FailedRequestsInDir +func (apierSv1 *APIerSv1) ReplayFailedPosts(ctx *context.Context, args ReplayFailedPostsParams, reply *string) error { + + // Set default directories if not provided. + if args.SourcePath == "" { + args.SourcePath = apierSv1.Config.GeneralCfg().FailedPostsDir } - failedReqsOutDir := failedReqsInDir - if args.FailedRequestsOutDir != nil && *args.FailedRequestsOutDir != "" { - failedReqsOutDir = *args.FailedRequestsOutDir + if args.FailedPath == "" { + args.FailedPath = args.SourcePath } - filesInDir, _ := os.ReadDir(failedReqsInDir) - if len(filesInDir) == 0 { - return utils.ErrNotFound - } - for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config - if len(args.Modules) != 0 { - var allowedModule bool - for _, mod := range args.Modules { - if strings.HasPrefix(file.Name(), mod) { - allowedModule = true - break - } - } - if !allowedModule { - continue // this file is not to be processed due to Modules ACL - } - } - filePath := path.Join(failedReqsInDir, file.Name()) - expEv, err := ees.NewExportEventsFromFile(filePath) + + if err := filepath.WalkDir(args.SourcePath, func(path string, d fs.DirEntry, err error) error { if err != nil { - return utils.NewErrServerError(err) + utils.Logger.Warning(fmt.Sprintf(" failed to access path %s: %v", path, err)) + return nil // skip paths that cause an error + } + if d.IsDir() { + return nil // skip directories } + // Skip files not belonging to the specified modules. + if len(args.Modules) != 0 && !slices.ContainsFunc(args.Modules, func(mod string) bool { + return strings.HasPrefix(d.Name(), mod) + }) { + utils.Logger.Info(fmt.Sprintf(" skipping file %s: not found within specified modules", d.Name())) + return nil + } + + expEv, err := ees.NewExportEventsFromFile(path) + if err != nil { + return fmt.Errorf("failed to init ExportEvents from %s: %v", path, err) + } + + // Determine the failover path. failoverPath := utils.MetaNone - if failedReqsOutDir != utils.MetaNone { - failoverPath = path.Join(failedReqsOutDir, file.Name()) + if args.FailedPath != utils.MetaNone { + failoverPath = filepath.Join(args.FailedPath, d.Name()) } failedPosts, err := expEv.ReplayFailedPosts(apierSv1.Config.GeneralCfg().PosterAttempts) - if err != nil && failedReqsOutDir != utils.MetaNone { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves + if err != nil && failoverPath != utils.MetaNone { + // Write the events that failed to be replayed to the failover directory if err = failedPosts.WriteToFile(failoverPath); err != nil { - return utils.NewErrServerError(err) + return fmt.Errorf("failed to write the events that failed to be replayed to %s: %v", path, err) } } - + return nil + }); err != nil { + return utils.NewErrServerError(err) } *reply = utils.OK return nil diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index c4fc0b8c7..0bd272e1d 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -2170,34 +2170,26 @@ func testApierReplayFldPosts(t *testing.T) { bev := []byte(`{"ID":"cgrates.org:1007","BalanceMap":{"*monetary":[{"Uuid":"367be35a-96ee-40a5-b609-9130661f5f12","ID":"","Value":0,"ExpirationDate":"0001-01-01T00:00:00Z","Weight":10,"DestinationIDs":{},"RatingSubject":"","Categories":{},"SharedGroups":{"SHARED_A":true},"Timings":null,"TimingIDs":{},"Disabled":false,"Factors":null,"Blocker":false}]},"UnitCounters":{"*monetary":[{"CounterType":"*event","Counters":[{"Value":0,"Filter":{"Uuid":null,"ID":"b8531413-10d5-47ad-81ad-2bc272e8f0ca","Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":{"FS_USERS":true},"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null}}]}]},"ActionTriggers":[{"ID":"STANDARD_TRIGGERS","UniqueID":"46ac7b8c-685d-4555-bf73-fa6cfbc2fa21","ThresholdType":"*min_balance","ThresholdValue":2,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":true,"LastExecutionTime":"2017-01-31T14:03:57.961651647+01:00"},{"ID":"STANDARD_TRIGGERS","UniqueID":"b8531413-10d5-47ad-81ad-2bc272e8f0ca","ThresholdType":"*max_event_counter","ThresholdValue":5,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":{"FS_USERS":true},"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"},{"ID":"STANDARD_TRIGGERS","UniqueID":"8b424186-7a31-4aef-99c5-35e12e6fed41","ThresholdType":"*max_balance","ThresholdValue":20,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"},{"ID":"STANDARD_TRIGGERS","UniqueID":"28557f3b-139c-4a27-9d17-bda1f54b7c19","ThresholdType":"*max_balance","ThresholdValue":100,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"DISABLE_AND_LOG","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"}],"AllowNegative":false,"Disabled":false}"`) ev := &ees.ExportEvents{ Path: "http://localhost:2081", - Format: utils.MetaHTTPjsonMap, + Type: utils.MetaHTTPjsonMap, Events: []any{&ees.HTTPPosterRequest{Body: bev, Header: http.Header{"Content-Type": []string{"application/json"}}}}, } fileName := "act>*http_post|63bed4ea-615e-4096-b1f4-499f64f29b28.json" - args := ArgsReplyFailedPosts{ - FailedRequestsInDir: utils.StringPointer("/tmp/TestsAPIerSv1/in"), - FailedRequestsOutDir: utils.StringPointer("/tmp/TestsAPIerSv1/out"), + args := ReplayFailedPostsParams{ + SourcePath: t.TempDir(), + FailedPath: t.TempDir(), } - for _, dir := range []string{*args.FailedRequestsInDir, *args.FailedRequestsOutDir} { - if err := os.RemoveAll(dir); err != nil { - t.Errorf("Error %s removing folder: %s", err, dir) - } - if err := os.MkdirAll(dir, 0755); err != nil { - t.Errorf("Error %s creating folder: %s", err, dir) - } - } - err := ev.WriteToFile(path.Join(*args.FailedRequestsInDir, fileName)) + err := ev.WriteToFile(path.Join(args.SourcePath, fileName)) if err != nil { t.Error(err) } var reply string - if err := rater.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, &args, &reply); err != nil { + if err := rater.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, args, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Error("Unexpected reply: ", reply) } - outPath := path.Join(*args.FailedRequestsOutDir, fileName) + outPath := path.Join(args.FailedPath, fileName) outEv, err := ees.NewExportEventsFromFile(outPath) if err != nil { t.Error(err) @@ -2206,21 +2198,21 @@ func testApierReplayFldPosts(t *testing.T) { } fileName = "cdr|ae8cc4b3-5e60-4396-b82a-64b96a72a03c.json" bev = []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`) - fileInPath := path.Join(*args.FailedRequestsInDir, fileName) + fileInPath := path.Join(args.SourcePath, fileName) ev = &ees.ExportEvents{ Path: "amqp://guest:guest@localhost:5672/", Opts: &config.EventExporterOpts{ AMQP: &config.AMQPOpts{ QueueID: utils.StringPointer("cgrates_cdrs")}, }, - Format: utils.MetaAMQPjsonMap, + Type: utils.MetaAMQPjsonMap, Events: []any{bev}, } - err = ev.WriteToFile(path.Join(*args.FailedRequestsInDir, fileName)) + err = ev.WriteToFile(path.Join(args.SourcePath, fileName)) if err != nil { t.Error(err) } - if err := rater.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, &args, &reply); err != nil { + if err := rater.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, args, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Error("Unexpected reply: ", reply) @@ -2228,7 +2220,7 @@ func testApierReplayFldPosts(t *testing.T) { if _, err := os.Stat(fileInPath); !os.IsNotExist(err) { t.Error("InFile still exists") } - if _, err := os.Stat(path.Join(*args.FailedRequestsOutDir, fileName)); !os.IsNotExist(err) { + if _, err := os.Stat(path.Join(args.FailedPath, fileName)); !os.IsNotExist(err) { t.Error("OutFile created") } // connect to RabbitMQ server and check if the content was posted there @@ -2262,7 +2254,7 @@ func testApierReplayFldPosts(t *testing.T) { case <-time.After(100 * time.Millisecond): t.Error("No message received from RabbitMQ") } - for _, dir := range []string{*args.FailedRequestsInDir, *args.FailedRequestsOutDir} { + for _, dir := range []string{args.SourcePath, args.FailedPath} { if err := os.RemoveAll(dir); err != nil { t.Errorf("Error %s removing folder: %s", err, dir) } diff --git a/ees/libcdre.go b/ees/libcdre.go index f5f0e8b85..593dcf38a 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -96,7 +96,7 @@ func AddFailedPost(failedPostsDir, expPath, format string, ev any, opts *config. if failedPost == nil { failedPost = &ExportEvents{ Path: expPath, - Format: format, + Type: format, Opts: opts, failedPostsDir: failedPostsDir, } @@ -129,7 +129,7 @@ type ExportEvents struct { lk sync.RWMutex Path string Opts *config.EventExporterOpts - Format string + Type string Events []any failedPostsDir string } @@ -162,32 +162,34 @@ func (expEv *ExportEvents) AddEvent(ev any) { // ReplayFailedPosts tryies to post cdrs again func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *ExportEvents, err error) { - failedEvents = &ExportEvents{ - Path: expEv.Path, - Opts: expEv.Opts, - Format: expEv.Format, - } - - eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path, utils.MetaNone, + eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Type, expEv.Path, utils.MetaNone, attempts, expEv.Opts) var ee EventExporter if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil { return } keyFunc := func() string { return utils.EmptyString } - if expEv.Format == utils.MetaKafkajsonMap || expEv.Format == utils.MetaS3jsonMap { + if expEv.Type == utils.MetaKafkajsonMap || expEv.Type == utils.MetaS3jsonMap { keyFunc = utils.UUIDSha1Prefix } + failedEvents = &ExportEvents{ + Path: expEv.Path, + Opts: expEv.Opts, + Type: expEv.Type, + } for _, ev := range expEv.Events { if err = ExportWithAttempts(ee, ev, keyFunc()); err != nil { failedEvents.AddEvent(ev) } } ee.Close() - if len(failedEvents.Events) > 0 { - err = utils.ErrPartiallyExecuted - } else { - failedEvents = nil + + switch len(failedEvents.Events) { + case 0: // none failed to be replayed + return nil, nil + case len(expEv.Events): // all failed, return last encountered error + return failedEvents, err + default: + return failedEvents, utils.ErrPartiallyExecuted } - return } diff --git a/ees/libcdre_it_test.go b/ees/libcdre_it_test.go index 24032e8a4..07b24b26b 100644 --- a/ees/libcdre_it_test.go +++ b/ees/libcdre_it_test.go @@ -81,7 +81,7 @@ func TestWriteToFile(t *testing.T) { exportEvent = &ExportEvents{ Events: []any{"something1", "something2"}, Path: "path", - Format: "test", + Type: "test", } filePath = "/tmp/engine/libcdre_test/writeToFile2.txt" if err := exportEvent.WriteToFile(filePath); err != nil { diff --git a/ees/libcdre_test.go b/ees/libcdre_test.go index 93f7dd944..edc5a6acf 100644 --- a/ees/libcdre_test.go +++ b/ees/libcdre_test.go @@ -62,7 +62,7 @@ func TestAddFldPost(t *testing.T) { } eOut := &ExportEvents{ Path: "path1", - Format: "format1", + Type: "format1", Events: []any{"1"}, Opts: &config.EventExporterOpts{ AMQP: &config.AMQPOpts{}, @@ -110,7 +110,7 @@ func TestAddFldPost(t *testing.T) { } eOut = &ExportEvents{ Path: "path1", - Format: "format1", + Type: "format1", Events: []any{"1", "2"}, Opts: &config.EventExporterOpts{ AMQP: &config.AMQPOpts{}, @@ -138,7 +138,7 @@ func TestAddFldPost(t *testing.T) { } eOut = &ExportEvents{ Path: "path2", - Format: "format2", + Type: "format2", Events: []any{"3"}, Opts: &config.EventExporterOpts{ Els: &config.ElsOpts{}, diff --git a/general_tests/cdrs_exp_it_test.go b/general_tests/cdrs_exp_it_test.go index 4eba4a660..03334564f 100644 --- a/general_tests/cdrs_exp_it_test.go +++ b/general_tests/cdrs_exp_it_test.go @@ -376,7 +376,7 @@ func testCDRsExpFileFailover(t *testing.T) { t.Error("Expected at least one event") continue } - rcvFormats.Add(ev.Format) + rcvFormats.Add(ev.Type) if err := checkContent(ev, []any{[]byte(utils.ToJSON(cdrsExpEvExp))}); err != nil { t.Errorf("For file <%s> and event <%s> received %s", filePath, utils.ToJSON(ev), err) } diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 58c1d897e..e7fd4e02d 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -474,8 +474,8 @@ func testCDRsOnExpFileFailover(t *testing.T) { t.Error("Expected at least one event") continue } - if ev.Format != utils.MetaHTTPPost { - t.Errorf("Expected %s to be only failed exporter,received <%s>", utils.MetaHTTPPost, ev.Format) + if ev.Type != utils.MetaHTTPPost { + t.Errorf("Expected %s to be only failed exporter,received <%s>", utils.MetaHTTPPost, ev.Type) } if err := checkContent(ev, httpContent); err != nil { t.Errorf("For file <%s> and event <%s> received %s", filePath, utils.ToJSON(ev), err) diff --git a/general_tests/cdrs_post_failover_it_test.go b/general_tests/cdrs_post_failover_it_test.go index e15c732a4..5f682b238 100644 --- a/general_tests/cdrs_post_failover_it_test.go +++ b/general_tests/cdrs_post_failover_it_test.go @@ -203,8 +203,8 @@ func testCDRsPostFailoverToFile(t *testing.T) { t.Error("Expected at least one event") continue } - if ev.Format != utils.MetaS3jsonMap { - t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.Format) + if ev.Type != utils.MetaS3jsonMap { + t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.Type) } if len(ev.Events) != 3 { t.Errorf("Expected all the events to be saved in the same file, ony %v saved in this file.", len(ev.Events)) diff --git a/general_tests/ees_it_test.go b/general_tests/ees_it_test.go index 41eeee05d..b8ce63bb3 100644 --- a/general_tests/ees_it_test.go +++ b/general_tests/ees_it_test.go @@ -21,13 +21,19 @@ along with this program. If not, see package general_tests import ( + "fmt" + "os/exec" "testing" "time" "github.com/cgrates/birpc/context" + v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/nats-io/nats.go" ) // TestEEsExportEventChanges tests if the event that's about to be exported can be changed from one exporter to @@ -326,3 +332,200 @@ func TestEEsExportEventChanges(t *testing.T) { } }) } + +// TestEEsReplayFailedPosts tests the implementation of the APIerSv1.ReplayFailedPosts. +func TestEEsReplayFailedPosts(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + failedDir := t.TempDir() + + content := fmt.Sprintf(`{ + +"general": { + "log_level": 7, + "failed_posts_ttl": "3ms", + "poster_attempts": 1 +}, + +"data_db": { + "db_type": "*internal" +}, + +"stor_db": { + "db_type": "*internal" +}, + +"apiers": { + "enabled": true +}, + +"ees": { + "enabled": true, + "exporters": [ + { + "id": "nats_exporter", + "type": "*nats_json_map", + "flags": ["*log"], + "export_path": "nats://localhost:4222", + "failed_posts_dir": "%s", + "attempts": 1, + "synchronous": true, + "opts": { + "natsSubject": "processed_cdrs", + }, + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + ] + } + ] +} + +}`, failedDir) + + testEnv := TestEnvironment{ + Name: "TestEEsReplayFailedPosts", + ConfigJSON: content, + // LogBuffer: &bytes.Buffer{}, + } + // defer fmt.Println(testEnv.LogBuffer) + client, _ := testEnv.Setup(t, 0) + + // helper to sort slices + less := func(a, b string) bool { return a < b } + + // amount of events to export/replay + count := 5 + + t.Run("successful nats export", func(t *testing.T) { + cmd := exec.Command("nats-server") + if err := cmd.Start(); err != nil { + t.Fatalf("failed to start nats-server: %v", err) + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + + nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second)) + if err != nil { + t.Fatalf("failed to connect to nats-server: %v", err) + } + defer nc.Drain() + + ch := make(chan *nats.Msg, count) + sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch) + if err != nil { + t.Fatalf("failed to subscribe to nats queue: %v", err) + } + + var reply map[string]map[string]any + for i := range count { + if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, &engine.CGREventWithEeIDs{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "test", + Event: map[string]any{ + utils.CGRID: i, + }, + }, + }, &reply); err != nil { + t.Errorf("EeSv1.ProcessEvent returned unexpected err: %v", err) + } + } + + time.Sleep(1 * time.Millisecond) // wait for the channel to receive the replayed exports + want := make([]string, 0, count) + for i := range count { + want = append(want, fmt.Sprintf(`{"CGRID":"%d"}`, i)) + } + if err := sub.Unsubscribe(); err != nil { + t.Errorf("failed to unsubscribe from nats subject: %v", err) + } + close(ch) + got := make([]string, 0, count) + for elem := range ch { + got = append(got, string(elem.Data)) + } + if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" { + t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff) + } + }) + + t.Run("replay failed nats export", func(t *testing.T) { + var exportReply map[string]map[string]any + for i := range count { + err := client.Call(context.Background(), utils.EeSv1ProcessEvent, + &engine.CGREventWithEeIDs{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "test", + Event: map[string]any{ + utils.CGRID: i, + }, + }, + }, &exportReply) + if err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() { + t.Errorf("EeSv1.ProcessEvent err = %v, want %v", err, utils.ErrPartiallyExecuted) + } + } + + time.Sleep(5 * time.Millisecond) + replayFailedDir := t.TempDir() + var replayReply string + if err := client.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, v1.ReplayFailedPostsParams{ + SourcePath: failedDir, + FailedPath: replayFailedDir, + Modules: []string{"test", "EEs"}, + }, &replayReply); err != nil { + t.Errorf("APIerSv1.ReplayFailedPosts returned unexpected err: %v", err) + } + + cmd := exec.Command("nats-server") + if err := cmd.Start(); err != nil { + t.Fatalf("failed to start nats-server: %v", err) + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + + nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second)) + if err != nil { + t.Fatalf("failed to connect to nats-server: %v", err) + } + defer nc.Drain() + + ch := make(chan *nats.Msg, count) + sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch) + if err != nil { + t.Fatalf("failed to subscribe to nats queue: %v", err) + } + + if err := client.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, v1.ReplayFailedPostsParams{ + SourcePath: replayFailedDir, + FailedPath: utils.MetaNone, + Modules: []string{"test", "EEs"}, + }, &replayReply); err != nil { + t.Errorf("APIerSv1.ReplayFailedPosts returned unexpected err: %v", err) + } + time.Sleep(time.Millisecond) // wait for the channel to receive the replayed exports + + want := make([]string, 0, count) + for i := range count { + want = append(want, fmt.Sprintf(`{"CGRID":"%d"}`, i)) + } + if err := sub.Unsubscribe(); err != nil { + t.Errorf("failed to unsubscribe from nats subject: %v", err) + } + close(ch) + got := make([]string, 0, count) + for elem := range ch { + got = append(got, string(elem.Data)) + } + if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" { + t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff) + } + }) +} diff --git a/general_tests/poster_it_test.go b/general_tests/poster_it_test.go index 9f411a9cc..a64b9e49e 100644 --- a/general_tests/poster_it_test.go +++ b/general_tests/poster_it_test.go @@ -136,7 +136,7 @@ func testPosterReadFolder(format string) (expEv *ees.ExportEvents, err error) { if err != nil { return } - if expEv.Format == format { + if expEv.Type == format { return } }