diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go index 735b9e122..d20f5ac10 100644 --- a/ees/filecsv_test.go +++ b/ees/filecsv_test.go @@ -23,7 +23,9 @@ import ( "encoding/csv" "io" "reflect" + "sync" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -340,3 +342,76 @@ func TestFileCsvOnEvictedClose(t *testing.T) { cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ComputeFields() fCsv.OnEvicted("test", "test") } + +type mockCsv struct { + wg *sync.WaitGroup +} + +func (mc *mockCsv) Close() error { return nil } +func (mc *mockCsv) Write(s []byte) (n int, err error) { + time.Sleep(3 * time.Second) + mc.wg.Done() + return 0, nil +} + +func TestFileCSVSync(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*file_csv" + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + + var wg1 = &sync.WaitGroup{} + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + mckCsv := &mockCsv{ + wg: wg1, + } + exp := &FileCSVee{ + id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, + cfgIdx: cfgIdx, + filterS: new(engine.FilterS), + file: mckCsv, + csvWriter: csv.NewWriter(mckCsv), + dc: dc, + reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + } + + for i := 0; i < 3; i++ { + go func() { + exp.ExportEvent(cgrEvent) + exp.csvWriter.Flush() + }() + } + // exp.ExportEvent(cgrEvent) + + select { + case <-test: + return + case <-time.After(4 * time.Second): + t.Error("Can't asynchronously export events") + } +} diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go index 321eb300d..b22091278 100644 --- a/ees/filefwv_test.go +++ b/ees/filefwv_test.go @@ -23,7 +23,9 @@ import ( "encoding/csv" "io" "reflect" + "sync" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -455,3 +457,124 @@ func TestFileFwvOnEvictedClose(t *testing.T) { cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ComputeFields() fFwv.OnEvicted("test", "test") } + +func TestFileFWVSync(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*file_fwv" + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + + var wg1 = &sync.WaitGroup{} + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + mckCsv := &mockCsv{ + wg: wg1, + } + exp := &FileFWVee{ + id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, + cfgIdx: cfgIdx, + filterS: new(engine.FilterS), + file: mckCsv, + dc: dc, + reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + } + + for i := 0; i < 3; i++ { + go func() { + exp.ExportEvent(cgrEvent) + }() + } + // exp.ExportEvent(cgrEvent) + + select { + case <-test: + return + case <-time.After(4 * time.Second): + t.Error("Can't asynchronously export events") + } +} + +func TestFileFWVSyncLimit(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*file_fwv" + cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1 + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + + var wg1 = &sync.WaitGroup{} + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + mckCsv := &mockCsv{ + wg: wg1, + } + exp := &FileFWVee{ + id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, + cfgIdx: cfgIdx, + filterS: new(engine.FilterS), + file: mckCsv, + dc: dc, + reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + } + + for i := 0; i < 3; i++ { + go func() { + exp.ExportEvent(cgrEvent) + }() + } + // exp.ExportEvent(cgrEvent) + + select { + case <-test: + t.Error("Should not have been possible to asynchronously export events") + case <-time.After(4 * time.Second): + return + } +}