From 635d875ca9a614fc3de134178ce81d9815c601c0 Mon Sep 17 00:00:00 2001 From: nickolasdaniel Date: Wed, 28 Jul 2021 17:21:03 +0300 Subject: [PATCH] Tested if we can send multiple concurrent requests withiut locking on HttpJSONMap, HttpPost and PosterJSONMap, also tested the limit as well --- ees/httpjsonmap_test.go | 123 ++++++++++++++++++++++++++++++++++ ees/httppost_test.go | 125 +++++++++++++++++++++++++++++++++++ ees/posterjsonmap_test.go | 136 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 384 insertions(+) diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go index 048714177..20a9ca345 100644 --- a/ees/httpjsonmap_test.go +++ b/ees/httpjsonmap_test.go @@ -23,7 +23,9 @@ import ( "net/http" "net/http/httptest" "reflect" + "sync" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -347,3 +349,124 @@ func TestHttpJsonMapComposeHeader(t *testing.T) { t.Errorf("Expected %q but received %q", errExpect, err) } } + +func TestHttpJsonMapSync(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map" + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + var wg1 sync.WaitGroup + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + + ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + // fmt.Println("2") + time.Sleep(3 * time.Second) + wg1.Done() + })) + + defer ts.Close() + + cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL + + exp, err := NewHTTPjsonMapEE(cgrCfg, cfgIdx, new(engine.FilterS), dc) + if err != nil { + t.Error(err) + } + + for i := 0; i < 3; i++ { + go exp.ExportEvent(cgrEvent) + } + // exp.ExportEvent(cgrEvent) + + select { + case <-test: + return + case <-time.After(4 * time.Second): + t.Error("Can't asynchronously export events") + } +} + +func TestHttpJsonMapSyncLimit(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map" + cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1 + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + var wg1 sync.WaitGroup + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + + ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + // fmt.Println("2") + time.Sleep(3 * time.Second) + wg1.Done() + })) + + defer ts.Close() + + cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL + + exp, err := NewHTTPjsonMapEE(cgrCfg, cfgIdx, new(engine.FilterS), dc) + if err != nil { + t.Error(err) + } + + for i := 0; i < 3; i++ { + go exp.ExportEvent(cgrEvent) + } + // exp.ExportEvent(cgrEvent) + + select { + case <-test: + t.Error("Should not have been possible to asynchronously export events") + case <-time.After(4 * time.Second): + return + } +} diff --git a/ees/httppost_test.go b/ees/httppost_test.go index e6618a5c4..c284508b5 100644 --- a/ees/httppost_test.go +++ b/ees/httppost_test.go @@ -23,7 +23,9 @@ import ( "net/http" "net/http/httptest" "reflect" + "sync" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -301,3 +303,126 @@ func TestHttpPostComposeHeader(t *testing.T) { t.Errorf("Expected %q but received %q", errExpect, err) } } + +func TestHttpPostSync(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_post" + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + var wg1 sync.WaitGroup + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + + ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + // fmt.Println("2") + time.Sleep(3 * time.Second) + wg1.Done() + })) + + defer ts.Close() + + cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL + + exp, err := NewHTTPPostEe(cgrCfg, cfgIdx, new(engine.FilterS), dc) + if err != nil { + t.Error(err) + } + + for i := 0; i < 3; i++ { + go exp.ExportEvent(cgrEvent) + } + // exp.ExportEvent(cgrEvent) + + select { + case <-test: + return + case <-time.After(4 * time.Second): + t.Error("Can't asynchronously export events") + } +} + +func TestHttpPostSyncLimit(t *testing.T) { + //Create new exporter + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_post" + + // We set the limit of events to be exported lower than the amount of events we asynchronously want to export + cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1 + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + var wg1 sync.WaitGroup + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + + ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + // fmt.Println("2") + time.Sleep(3 * time.Second) + wg1.Done() + })) + + defer ts.Close() + + cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL + + exp, err := NewHTTPPostEe(cgrCfg, cfgIdx, new(engine.FilterS), dc) + if err != nil { + t.Error(err) + } + + for i := 0; i < 3; i++ { + go exp.ExportEvent(cgrEvent) + } + // exp.ExportEvent(cgrEvent) + + select { + case <-test: + t.Error("Should not have been possible to asynchronously export events") + case <-time.After(4 * time.Second): + return + } +} diff --git a/ees/posterjsonmap_test.go b/ees/posterjsonmap_test.go index 9050e1b9e..2f12df1ca 100644 --- a/ees/posterjsonmap_test.go +++ b/ees/posterjsonmap_test.go @@ -21,7 +21,9 @@ package ees import ( "encoding/json" "reflect" + "sync" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -356,3 +358,137 @@ func TestPosterJsonMapExportEvent3(t *testing.T) { } pstrEE.OnEvicted("test", "test") } + +type mockPoster struct { + wg *sync.WaitGroup +} + +func (mp mockPoster) Post(body []byte, key string) error { + // resp, err := http.Get(mp.url) + // if err != nil { + // return err + // } + // defer resp.Body.Close() + time.Sleep(3 * time.Second) + mp.wg.Done() + return nil +} + +func (mockPoster) Close() { + return +} + +func TestPosterJsonMapSync(t *testing.T) { + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map" + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + + var wg1 = &sync.WaitGroup{} + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + + mckPoster := mockPoster{ + wg: wg1, + } + exp := &PosterJSONMapEE{ + id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, + cfgIdx: cfgIdx, + filterS: new(engine.FilterS), + poster: mckPoster, + dc: dc, + reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + } + + for i := 0; i < 3; i++ { + go exp.ExportEvent(cgrEvent) + } + + select { + case <-test: + return + case <-time.After(4 * time.Second): + t.Error("Can't asynchronously export events") + } +} + +func TestPosterJsonMapSyncLimit(t *testing.T) { + cgrCfg := config.NewDefaultCGRConfig() + var cfgIdx int + cfgIdx = 0 + + cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map" + cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1 + dc, err := newEEMetrics(utils.FirstNonEmpty( + cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, + cgrCfg.GeneralCfg().DefaultTimezone)) + if err != nil { + t.Error(err) + } + + //Create an event + cgrEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + + var wg1 = &sync.WaitGroup{} + + wg1.Add(3) + + test := make(chan struct{}) + go func() { + wg1.Wait() + close(test) + }() + + mckPoster := mockPoster{ + wg: wg1, + } + exp := &PosterJSONMapEE{ + id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, + cfgIdx: cfgIdx, + filterS: new(engine.FilterS), + poster: mckPoster, + dc: dc, + reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + } + + for i := 0; i < 3; i++ { + go exp.ExportEvent(cgrEvent) + } + + select { + case <-test: + t.Error("Should not have been possible to asynchronously export events") + case <-time.After(4 * time.Second): + return + } +}