diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go index 4b3f13766..b44c14d4b 100644 --- a/ees/filecsv_test.go +++ b/ees/filecsv_test.go @@ -349,7 +349,6 @@ type mockCsv struct { func (mc *mockCsv) Close() error { return nil } func (mc *mockCsv) Write(s []byte) (n int, err error) { - // fmt.Println(string(s)) time.Sleep(3 * time.Second) mc.wg.Done() return 0, nil @@ -407,7 +406,6 @@ func TestFileCSVSync(t *testing.T) { exp.csvWriter.Flush() }() } - // exp.ExportEvent(cgrEvent) select { case <-test: @@ -416,66 +414,3 @@ func TestFileCSVSync(t *testing.T) { t.Error("Can't asynchronously export events") } } - -// func TestFileCSVSyncLimit(t *testing.T) { -// //Create new exporter -// cgrCfg := config.NewDefaultCGRConfig() -// var cfgIdx int -// cfgIdx = 0 - -// cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*file_csv" -// cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1 -// dc, err := newEEMetrics(utils.FirstNonEmpty( -// cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, -// cgrCfg.GeneralCfg().DefaultTimezone)) -// if err != nil { -// t.Error(err) -// } - -// //Create an event -// cgrEvent := &utils.CGREvent{ -// Tenant: "cgrates.org", -// Event: map[string]interface{}{ -// "Account": "1001", -// "Destination": "1002", -// }, -// } - -// var wg1 = &sync.WaitGroup{} - -// wg1.Add(3) - -// test := make(chan struct{}) -// go func() { -// wg1.Wait() -// close(test) -// }() -// mckCsv := &mockCsv{ -// wg: wg1, -// } -// exp := &FileCSVee{ -// id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, -// cgrCfg: cgrCfg, -// cfgIdx: cfgIdx, -// filterS: new(engine.FilterS), -// file: mckCsv, -// csvWriter: csv.NewWriter(mckCsv), -// dc: dc, -// reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), -// } - -// for i := 0; i < 3; i++ { -// go func() { -// exp.ExportEvent(cgrEvent) -// exp.csvWriter.Flush() -// }() -// } -// // exp.ExportEvent(cgrEvent) - -// select { -// case <-test: -// t.Error("Should not have been possible to asynchronously export events") -// case <-time.After(4 * time.Second): -// return -// } -// } diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go index 321eb300d..5a7fd2fee 100644 --- a/ees/filefwv_test.go +++ b/ees/filefwv_test.go @@ -23,7 +23,9 @@ import ( "encoding/csv" "io" "reflect" + "sync" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -128,6 +130,64 @@ func TestFileFwvComposeHeader(t *testing.T) { } } +func TestFileFWVSyncLimit(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = utils.MetaFileFWV + cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1 + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + + var wg1 = &sync.WaitGroup{} + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + mckCsv := &mockCsv{ + wg: wg1, + } + exp := &FileFWVee{ + id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, + cfgIdx: cfgIdx, + filterS: new(engine.FilterS), + file: mckCsv, + dc: dc, + reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + } + + for i := 0; i < 3; i++ { + go exp.ExportEvent(cgrEvent) + } + + select { + case <-test: + t.Error("Should not have been possible to asynchronously export events") + case <-time.After(4 * time.Second): + return + } +} + func TestFileFwvComposeTrailer(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() newIDb := engine.NewInternalDB(nil, nil, true) @@ -455,3 +515,60 @@ func TestFileFwvOnEvictedClose(t *testing.T) { cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ComputeFields() fFwv.OnEvicted("test", "test") } + +func TestFileFWVSync(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = utils.MetaFileFWV + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + + var wg1 = &sync.WaitGroup{} + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + mckCsv := &mockCsv{ + wg: wg1, + } + exp := &FileFWVee{ + id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, + cfgIdx: cfgIdx, + filterS: new(engine.FilterS), + file: mckCsv, + dc: dc, + reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + } + + for i := 0; i < 3; i++ { + go exp.ExportEvent(cgrEvent) + } + + select { + case <-test: + return + case <-time.After(4 * time.Second): + t.Error("Can't asynchronously export events") + } +} diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go index 96934e952..6d309e5be 100644 --- a/ees/httpjsonmap_test.go +++ b/ees/httpjsonmap_test.go @@ -392,7 +392,7 @@ func TestHttpJsonMapSync(t *testing.T) { }() ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - // fmt.Println("2") + time.Sleep(3 * time.Second) wg1.Done() })) @@ -409,7 +409,6 @@ func TestHttpJsonMapSync(t *testing.T) { for i := 0; i < 3; i++ { go exp.ExportEvent(cgrEvent) } - // exp.ExportEvent(cgrEvent) select { case <-test: @@ -453,7 +452,6 @@ func TestHttpJsonMapSyncLimit(t *testing.T) { }() ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - // fmt.Println("2") time.Sleep(3 * time.Second) wg1.Done() })) @@ -470,7 +468,6 @@ func TestHttpJsonMapSyncLimit(t *testing.T) { for i := 0; i < 3; i++ { go exp.ExportEvent(cgrEvent) } - // exp.ExportEvent(cgrEvent) select { case <-test: diff --git a/ees/httppost_test.go b/ees/httppost_test.go index 376736412..4294af2cf 100644 --- a/ees/httppost_test.go +++ b/ees/httppost_test.go @@ -337,7 +337,6 @@ func TestHttpPostSync(t *testing.T) { }() ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - // fmt.Println("2") time.Sleep(3 * time.Second) wg1.Done() })) @@ -354,7 +353,6 @@ func TestHttpPostSync(t *testing.T) { for i := 0; i < 3; i++ { go exp.ExportEvent(cgrEvent) } - // exp.ExportEvent(cgrEvent) select { case <-test: @@ -398,7 +396,6 @@ func TestHttpPostSyncLimit(t *testing.T) { }() ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - // fmt.Println("2") time.Sleep(3 * time.Second) wg1.Done() })) @@ -415,7 +412,6 @@ func TestHttpPostSyncLimit(t *testing.T) { for i := 0; i < 3; i++ { go exp.ExportEvent(cgrEvent) } - // exp.ExportEvent(cgrEvent) select { case <-test: