Revise APIerSv1.ReplayFailedPosts API + tests

- renamed parameter type: ArgsReplyFailedPosts -> ReplayFailedPostsParams
- renamed param fields:
  - FailedRequestsInDir -> SourcePath
  - FailedRequestsOutDir -> FailedPath
- changed param fields types from *string to string
- used the SourcePath and FailedPath params directly instead of creating separate variables
- used filepath.WalkDir instead of reading the directory and looping over the entries
- used slices.ContainsFunc to check if the file belongs to any module (if 1+ is specified)
- used filepath.Join instead of path.Join
- used the path provided by WalkFunc instead of building the file paths ourselves
- made error returns more descriptive
- added logs for directories/files that are skipped
- paths that cannot be accessed are skipped after logging the error
This commit is contained in:
ionutboangiu
2024-06-05 20:45:31 +03:00
committed by Dan Christian Bogos
parent 89f97d45e1
commit b7dacfe8a6
10 changed files with 286 additions and 82 deletions

View File

@@ -22,8 +22,10 @@ import (
"encoding/csv"
"errors"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"slices"
"strconv"
"strings"
@@ -1416,57 +1418,62 @@ func (apierSv1 *APIerSv1) RemoveActions(ctx *context.Context, attr *AttrRemoveAc
return nil
}
type ArgsReplyFailedPosts struct {
FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
Modules []string // list of modules for which replay the requests, nil for all
// ReplayFailedPostsParams contains parameters for replaying failed posts.
type ReplayFailedPostsParams struct {
SourcePath string // path for events to be replayed
FailedPath string // path for events that failed to replay, *none to discard, defaults to SourceDir if empty
Modules []string // list of modules to replay requests for, nil for all
}
// ReplayFailedPosts will repost failed requests found in the FailedRequestsInDir
func (apierSv1 *APIerSv1) ReplayFailedPosts(ctx *context.Context, args *ArgsReplyFailedPosts, reply *string) (err error) {
failedReqsInDir := apierSv1.Config.GeneralCfg().FailedPostsDir
if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != "" {
failedReqsInDir = *args.FailedRequestsInDir
func (apierSv1 *APIerSv1) ReplayFailedPosts(ctx *context.Context, args ReplayFailedPostsParams, reply *string) error {
// Set default directories if not provided.
if args.SourcePath == "" {
args.SourcePath = apierSv1.Config.GeneralCfg().FailedPostsDir
}
failedReqsOutDir := failedReqsInDir
if args.FailedRequestsOutDir != nil && *args.FailedRequestsOutDir != "" {
failedReqsOutDir = *args.FailedRequestsOutDir
if args.FailedPath == "" {
args.FailedPath = args.SourcePath
}
filesInDir, _ := os.ReadDir(failedReqsInDir)
if len(filesInDir) == 0 {
return utils.ErrNotFound
}
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
if len(args.Modules) != 0 {
var allowedModule bool
for _, mod := range args.Modules {
if strings.HasPrefix(file.Name(), mod) {
allowedModule = true
break
}
}
if !allowedModule {
continue // this file is not to be processed due to Modules ACL
}
}
filePath := path.Join(failedReqsInDir, file.Name())
expEv, err := ees.NewExportEventsFromFile(filePath)
if err := filepath.WalkDir(args.SourcePath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return utils.NewErrServerError(err)
utils.Logger.Warning(fmt.Sprintf("<ReplayFailedPosts> failed to access path %s: %v", path, err))
return nil // skip paths that cause an error
}
if d.IsDir() {
return nil // skip directories
}
// Skip files not belonging to the specified modules.
if len(args.Modules) != 0 && !slices.ContainsFunc(args.Modules, func(mod string) bool {
return strings.HasPrefix(d.Name(), mod)
}) {
utils.Logger.Info(fmt.Sprintf("<ReplayFailedPosts> skipping file %s: not found within specified modules", d.Name()))
return nil
}
expEv, err := ees.NewExportEventsFromFile(path)
if err != nil {
return fmt.Errorf("failed to init ExportEvents from %s: %v", path, err)
}
// Determine the failover path.
failoverPath := utils.MetaNone
if failedReqsOutDir != utils.MetaNone {
failoverPath = path.Join(failedReqsOutDir, file.Name())
if args.FailedPath != utils.MetaNone {
failoverPath = filepath.Join(args.FailedPath, d.Name())
}
failedPosts, err := expEv.ReplayFailedPosts(apierSv1.Config.GeneralCfg().PosterAttempts)
if err != nil && failedReqsOutDir != utils.MetaNone { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves
if err != nil && failoverPath != utils.MetaNone {
// Write the events that failed to be replayed to the failover directory
if err = failedPosts.WriteToFile(failoverPath); err != nil {
return utils.NewErrServerError(err)
return fmt.Errorf("failed to write the events that failed to be replayed to %s: %v", path, err)
}
}
return nil
}); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil

View File

@@ -2170,34 +2170,26 @@ func testApierReplayFldPosts(t *testing.T) {
bev := []byte(`{"ID":"cgrates.org:1007","BalanceMap":{"*monetary":[{"Uuid":"367be35a-96ee-40a5-b609-9130661f5f12","ID":"","Value":0,"ExpirationDate":"0001-01-01T00:00:00Z","Weight":10,"DestinationIDs":{},"RatingSubject":"","Categories":{},"SharedGroups":{"SHARED_A":true},"Timings":null,"TimingIDs":{},"Disabled":false,"Factors":null,"Blocker":false}]},"UnitCounters":{"*monetary":[{"CounterType":"*event","Counters":[{"Value":0,"Filter":{"Uuid":null,"ID":"b8531413-10d5-47ad-81ad-2bc272e8f0ca","Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":{"FS_USERS":true},"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null}}]}]},"ActionTriggers":[{"ID":"STANDARD_TRIGGERS","UniqueID":"46ac7b8c-685d-4555-bf73-fa6cfbc2fa21","ThresholdType":"*min_balance","ThresholdValue":2,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":true,"LastExecutionTime":"2017-01-31T14:03:57.961651647+01:00"},{"ID":"STANDARD_TRIGGERS","UniqueID":"b8531413-10d5-47ad-81ad-2bc272e8f0ca","ThresholdType":"*max_event_counter","ThresholdValue":5,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":{"FS_USERS":true},"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"},{"ID":"STANDARD_TRIGGERS","UniqueID":"8b424186-7a31-4aef-99c5-35e12e6fed41","ThresholdType":"*max_balance","ThresholdValue":20,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"LOG_WARNING","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"},{"ID":"STANDARD_TRIGGERS","UniqueID":"28557f3b-139c-4a27-9d17-bda1f54b7c19","ThresholdType":"*max_balance","ThresholdValue":100,"Recurrent":false,"MinSleep":0,"ExpirationDate":"0001-01-01T00:00:00Z","ActivationDate":"0001-01-01T00:00:00Z","Balance":{"Uuid":null,"ID":null,"Type":"*monetary","Value":null,"ExpirationDate":null,"Weight":null,"DestinationIDs":null,"RatingSubject":null,"Categories":null,"SharedGroups":null,"TimingIDs":null,"Timings":null,"Disabled":null,"Factors":null,"Blocker":null},"Weight":10,"ActionsID":"DISABLE_AND_LOG","MinQueuedItems":0,"Executed":false,"LastExecutionTime":"0001-01-01T00:00:00Z"}],"AllowNegative":false,"Disabled":false}"`)
ev := &ees.ExportEvents{
Path: "http://localhost:2081",
Format: utils.MetaHTTPjsonMap,
Type: utils.MetaHTTPjsonMap,
Events: []any{&ees.HTTPPosterRequest{Body: bev, Header: http.Header{"Content-Type": []string{"application/json"}}}},
}
fileName := "act>*http_post|63bed4ea-615e-4096-b1f4-499f64f29b28.json"
args := ArgsReplyFailedPosts{
FailedRequestsInDir: utils.StringPointer("/tmp/TestsAPIerSv1/in"),
FailedRequestsOutDir: utils.StringPointer("/tmp/TestsAPIerSv1/out"),
args := ReplayFailedPostsParams{
SourcePath: t.TempDir(),
FailedPath: t.TempDir(),
}
for _, dir := range []string{*args.FailedRequestsInDir, *args.FailedRequestsOutDir} {
if err := os.RemoveAll(dir); err != nil {
t.Errorf("Error %s removing folder: %s", err, dir)
}
if err := os.MkdirAll(dir, 0755); err != nil {
t.Errorf("Error %s creating folder: %s", err, dir)
}
}
err := ev.WriteToFile(path.Join(*args.FailedRequestsInDir, fileName))
err := ev.WriteToFile(path.Join(args.SourcePath, fileName))
if err != nil {
t.Error(err)
}
var reply string
if err := rater.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, &args, &reply); err != nil {
if err := rater.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, args, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Unexpected reply: ", reply)
}
outPath := path.Join(*args.FailedRequestsOutDir, fileName)
outPath := path.Join(args.FailedPath, fileName)
outEv, err := ees.NewExportEventsFromFile(outPath)
if err != nil {
t.Error(err)
@@ -2206,21 +2198,21 @@ func testApierReplayFldPosts(t *testing.T) {
}
fileName = "cdr|ae8cc4b3-5e60-4396-b82a-64b96a72a03c.json"
bev = []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)
fileInPath := path.Join(*args.FailedRequestsInDir, fileName)
fileInPath := path.Join(args.SourcePath, fileName)
ev = &ees.ExportEvents{
Path: "amqp://guest:guest@localhost:5672/",
Opts: &config.EventExporterOpts{
AMQP: &config.AMQPOpts{
QueueID: utils.StringPointer("cgrates_cdrs")},
},
Format: utils.MetaAMQPjsonMap,
Type: utils.MetaAMQPjsonMap,
Events: []any{bev},
}
err = ev.WriteToFile(path.Join(*args.FailedRequestsInDir, fileName))
err = ev.WriteToFile(path.Join(args.SourcePath, fileName))
if err != nil {
t.Error(err)
}
if err := rater.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, &args, &reply); err != nil {
if err := rater.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, args, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Unexpected reply: ", reply)
@@ -2228,7 +2220,7 @@ func testApierReplayFldPosts(t *testing.T) {
if _, err := os.Stat(fileInPath); !os.IsNotExist(err) {
t.Error("InFile still exists")
}
if _, err := os.Stat(path.Join(*args.FailedRequestsOutDir, fileName)); !os.IsNotExist(err) {
if _, err := os.Stat(path.Join(args.FailedPath, fileName)); !os.IsNotExist(err) {
t.Error("OutFile created")
}
// connect to RabbitMQ server and check if the content was posted there
@@ -2262,7 +2254,7 @@ func testApierReplayFldPosts(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Error("No message received from RabbitMQ")
}
for _, dir := range []string{*args.FailedRequestsInDir, *args.FailedRequestsOutDir} {
for _, dir := range []string{args.SourcePath, args.FailedPath} {
if err := os.RemoveAll(dir); err != nil {
t.Errorf("Error %s removing folder: %s", err, dir)
}

View File

@@ -96,7 +96,7 @@ func AddFailedPost(failedPostsDir, expPath, format string, ev any, opts *config.
if failedPost == nil {
failedPost = &ExportEvents{
Path: expPath,
Format: format,
Type: format,
Opts: opts,
failedPostsDir: failedPostsDir,
}
@@ -129,7 +129,7 @@ type ExportEvents struct {
lk sync.RWMutex
Path string
Opts *config.EventExporterOpts
Format string
Type string
Events []any
failedPostsDir string
}
@@ -162,32 +162,34 @@ func (expEv *ExportEvents) AddEvent(ev any) {
// ReplayFailedPosts tryies to post cdrs again
func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *ExportEvents, err error) {
failedEvents = &ExportEvents{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
}
eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path, utils.MetaNone,
eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Type, expEv.Path, utils.MetaNone,
attempts, expEv.Opts)
var ee EventExporter
if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return utils.EmptyString }
if expEv.Format == utils.MetaKafkajsonMap || expEv.Format == utils.MetaS3jsonMap {
if expEv.Type == utils.MetaKafkajsonMap || expEv.Type == utils.MetaS3jsonMap {
keyFunc = utils.UUIDSha1Prefix
}
failedEvents = &ExportEvents{
Path: expEv.Path,
Opts: expEv.Opts,
Type: expEv.Type,
}
for _, ev := range expEv.Events {
if err = ExportWithAttempts(ee, ev, keyFunc()); err != nil {
failedEvents.AddEvent(ev)
}
}
ee.Close()
if len(failedEvents.Events) > 0 {
err = utils.ErrPartiallyExecuted
} else {
failedEvents = nil
switch len(failedEvents.Events) {
case 0: // none failed to be replayed
return nil, nil
case len(expEv.Events): // all failed, return last encountered error
return failedEvents, err
default:
return failedEvents, utils.ErrPartiallyExecuted
}
return
}

View File

@@ -81,7 +81,7 @@ func TestWriteToFile(t *testing.T) {
exportEvent = &ExportEvents{
Events: []any{"something1", "something2"},
Path: "path",
Format: "test",
Type: "test",
}
filePath = "/tmp/engine/libcdre_test/writeToFile2.txt"
if err := exportEvent.WriteToFile(filePath); err != nil {

View File

@@ -62,7 +62,7 @@ func TestAddFldPost(t *testing.T) {
}
eOut := &ExportEvents{
Path: "path1",
Format: "format1",
Type: "format1",
Events: []any{"1"},
Opts: &config.EventExporterOpts{
AMQP: &config.AMQPOpts{},
@@ -110,7 +110,7 @@ func TestAddFldPost(t *testing.T) {
}
eOut = &ExportEvents{
Path: "path1",
Format: "format1",
Type: "format1",
Events: []any{"1", "2"},
Opts: &config.EventExporterOpts{
AMQP: &config.AMQPOpts{},
@@ -138,7 +138,7 @@ func TestAddFldPost(t *testing.T) {
}
eOut = &ExportEvents{
Path: "path2",
Format: "format2",
Type: "format2",
Events: []any{"3"},
Opts: &config.EventExporterOpts{
Els: &config.ElsOpts{},

View File

@@ -376,7 +376,7 @@ func testCDRsExpFileFailover(t *testing.T) {
t.Error("Expected at least one event")
continue
}
rcvFormats.Add(ev.Format)
rcvFormats.Add(ev.Type)
if err := checkContent(ev, []any{[]byte(utils.ToJSON(cdrsExpEvExp))}); err != nil {
t.Errorf("For file <%s> and event <%s> received %s", filePath, utils.ToJSON(ev), err)
}

View File

@@ -474,8 +474,8 @@ func testCDRsOnExpFileFailover(t *testing.T) {
t.Error("Expected at least one event")
continue
}
if ev.Format != utils.MetaHTTPPost {
t.Errorf("Expected %s to be only failed exporter,received <%s>", utils.MetaHTTPPost, ev.Format)
if ev.Type != utils.MetaHTTPPost {
t.Errorf("Expected %s to be only failed exporter,received <%s>", utils.MetaHTTPPost, ev.Type)
}
if err := checkContent(ev, httpContent); err != nil {
t.Errorf("For file <%s> and event <%s> received %s", filePath, utils.ToJSON(ev), err)

View File

@@ -203,8 +203,8 @@ func testCDRsPostFailoverToFile(t *testing.T) {
t.Error("Expected at least one event")
continue
}
if ev.Format != utils.MetaS3jsonMap {
t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.Format)
if ev.Type != utils.MetaS3jsonMap {
t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.Type)
}
if len(ev.Events) != 3 {
t.Errorf("Expected all the events to be saved in the same file, ony %v saved in this file.", len(ev.Events))

View File

@@ -21,13 +21,19 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package general_tests
import (
"fmt"
"os/exec"
"testing"
"time"
"github.com/cgrates/birpc/context"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/nats-io/nats.go"
)
// TestEEsExportEventChanges tests if the event that's about to be exported can be changed from one exporter to
@@ -326,3 +332,200 @@ func TestEEsExportEventChanges(t *testing.T) {
}
})
}
// TestEEsReplayFailedPosts tests the implementation of the APIerSv1.ReplayFailedPosts.
func TestEEsReplayFailedPosts(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("unsupported dbtype value")
}
failedDir := t.TempDir()
content := fmt.Sprintf(`{
"general": {
"log_level": 7,
"failed_posts_ttl": "3ms",
"poster_attempts": 1
},
"data_db": {
"db_type": "*internal"
},
"stor_db": {
"db_type": "*internal"
},
"apiers": {
"enabled": true
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "nats_exporter",
"type": "*nats_json_map",
"flags": ["*log"],
"export_path": "nats://localhost:4222",
"failed_posts_dir": "%s",
"attempts": 1,
"synchronous": true,
"opts": {
"natsSubject": "processed_cdrs",
},
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
]
}
]
}
}`, failedDir)
testEnv := TestEnvironment{
Name: "TestEEsReplayFailedPosts",
ConfigJSON: content,
// LogBuffer: &bytes.Buffer{},
}
// defer fmt.Println(testEnv.LogBuffer)
client, _ := testEnv.Setup(t, 0)
// helper to sort slices
less := func(a, b string) bool { return a < b }
// amount of events to export/replay
count := 5
t.Run("successful nats export", func(t *testing.T) {
cmd := exec.Command("nats-server")
if err := cmd.Start(); err != nil {
t.Fatalf("failed to start nats-server: %v", err)
}
time.Sleep(50 * time.Millisecond)
defer cmd.Process.Kill()
nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second))
if err != nil {
t.Fatalf("failed to connect to nats-server: %v", err)
}
defer nc.Drain()
ch := make(chan *nats.Msg, count)
sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch)
if err != nil {
t.Fatalf("failed to subscribe to nats queue: %v", err)
}
var reply map[string]map[string]any
for i := range count {
if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, &engine.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "test",
Event: map[string]any{
utils.CGRID: i,
},
},
}, &reply); err != nil {
t.Errorf("EeSv1.ProcessEvent returned unexpected err: %v", err)
}
}
time.Sleep(1 * time.Millisecond) // wait for the channel to receive the replayed exports
want := make([]string, 0, count)
for i := range count {
want = append(want, fmt.Sprintf(`{"CGRID":"%d"}`, i))
}
if err := sub.Unsubscribe(); err != nil {
t.Errorf("failed to unsubscribe from nats subject: %v", err)
}
close(ch)
got := make([]string, 0, count)
for elem := range ch {
got = append(got, string(elem.Data))
}
if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" {
t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff)
}
})
t.Run("replay failed nats export", func(t *testing.T) {
var exportReply map[string]map[string]any
for i := range count {
err := client.Call(context.Background(), utils.EeSv1ProcessEvent,
&engine.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "test",
Event: map[string]any{
utils.CGRID: i,
},
},
}, &exportReply)
if err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Errorf("EeSv1.ProcessEvent err = %v, want %v", err, utils.ErrPartiallyExecuted)
}
}
time.Sleep(5 * time.Millisecond)
replayFailedDir := t.TempDir()
var replayReply string
if err := client.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, v1.ReplayFailedPostsParams{
SourcePath: failedDir,
FailedPath: replayFailedDir,
Modules: []string{"test", "EEs"},
}, &replayReply); err != nil {
t.Errorf("APIerSv1.ReplayFailedPosts returned unexpected err: %v", err)
}
cmd := exec.Command("nats-server")
if err := cmd.Start(); err != nil {
t.Fatalf("failed to start nats-server: %v", err)
}
time.Sleep(50 * time.Millisecond)
defer cmd.Process.Kill()
nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second))
if err != nil {
t.Fatalf("failed to connect to nats-server: %v", err)
}
defer nc.Drain()
ch := make(chan *nats.Msg, count)
sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch)
if err != nil {
t.Fatalf("failed to subscribe to nats queue: %v", err)
}
if err := client.Call(context.Background(), utils.APIerSv1ReplayFailedPosts, v1.ReplayFailedPostsParams{
SourcePath: replayFailedDir,
FailedPath: utils.MetaNone,
Modules: []string{"test", "EEs"},
}, &replayReply); err != nil {
t.Errorf("APIerSv1.ReplayFailedPosts returned unexpected err: %v", err)
}
time.Sleep(time.Millisecond) // wait for the channel to receive the replayed exports
want := make([]string, 0, count)
for i := range count {
want = append(want, fmt.Sprintf(`{"CGRID":"%d"}`, i))
}
if err := sub.Unsubscribe(); err != nil {
t.Errorf("failed to unsubscribe from nats subject: %v", err)
}
close(ch)
got := make([]string, 0, count)
for elem := range ch {
got = append(got, string(elem.Data))
}
if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" {
t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff)
}
})
}

View File

@@ -136,7 +136,7 @@ func testPosterReadFolder(format string) (expEv *ees.ExportEvents, err error) {
if err != nil {
return
}
if expEv.Format == format {
if expEv.Type == format {
return
}
}