mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Tested if we can send multiple concurrent requests withiut locking on HttpJSONMap, HttpPost and PosterJSONMap, also tested the limit as well
This commit is contained in:
committed by
Dan Christian Bogos
parent
c6845687fa
commit
635d875ca9
@@ -23,7 +23,9 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -347,3 +349,124 @@ func TestHttpJsonMapComposeHeader(t *testing.T) {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHttpJsonMapSync(t *testing.T) {
|
||||
//Create new exporter
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map"
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
var wg1 sync.WaitGroup
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
// fmt.Println("2")
|
||||
time.Sleep(3 * time.Second)
|
||||
wg1.Done()
|
||||
}))
|
||||
|
||||
defer ts.Close()
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL
|
||||
|
||||
exp, err := NewHTTPjsonMapEE(cgrCfg, cfgIdx, new(engine.FilterS), dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
// exp.ExportEvent(cgrEvent)
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
return
|
||||
case <-time.After(4 * time.Second):
|
||||
t.Error("Can't asynchronously export events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHttpJsonMapSyncLimit(t *testing.T) {
|
||||
//Create new exporter
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map"
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
var wg1 sync.WaitGroup
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
// fmt.Println("2")
|
||||
time.Sleep(3 * time.Second)
|
||||
wg1.Done()
|
||||
}))
|
||||
|
||||
defer ts.Close()
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL
|
||||
|
||||
exp, err := NewHTTPjsonMapEE(cgrCfg, cfgIdx, new(engine.FilterS), dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
// exp.ExportEvent(cgrEvent)
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
t.Error("Should not have been possible to asynchronously export events")
|
||||
case <-time.After(4 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,9 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -301,3 +303,126 @@ func TestHttpPostComposeHeader(t *testing.T) {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHttpPostSync(t *testing.T) {
|
||||
//Create new exporter
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_post"
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
var wg1 sync.WaitGroup
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
// fmt.Println("2")
|
||||
time.Sleep(3 * time.Second)
|
||||
wg1.Done()
|
||||
}))
|
||||
|
||||
defer ts.Close()
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL
|
||||
|
||||
exp, err := NewHTTPPostEe(cgrCfg, cfgIdx, new(engine.FilterS), dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
// exp.ExportEvent(cgrEvent)
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
return
|
||||
case <-time.After(4 * time.Second):
|
||||
t.Error("Can't asynchronously export events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHttpPostSyncLimit(t *testing.T) {
|
||||
//Create new exporter
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_post"
|
||||
|
||||
// We set the limit of events to be exported lower than the amount of events we asynchronously want to export
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
var wg1 sync.WaitGroup
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
// fmt.Println("2")
|
||||
time.Sleep(3 * time.Second)
|
||||
wg1.Done()
|
||||
}))
|
||||
|
||||
defer ts.Close()
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL
|
||||
|
||||
exp, err := NewHTTPPostEe(cgrCfg, cfgIdx, new(engine.FilterS), dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
// exp.ExportEvent(cgrEvent)
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
t.Error("Should not have been possible to asynchronously export events")
|
||||
case <-time.After(4 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,9 @@ package ees
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -356,3 +358,137 @@ func TestPosterJsonMapExportEvent3(t *testing.T) {
|
||||
}
|
||||
pstrEE.OnEvicted("test", "test")
|
||||
}
|
||||
|
||||
type mockPoster struct {
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (mp mockPoster) Post(body []byte, key string) error {
|
||||
// resp, err := http.Get(mp.url)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// defer resp.Body.Close()
|
||||
time.Sleep(3 * time.Second)
|
||||
mp.wg.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mockPoster) Close() {
|
||||
return
|
||||
}
|
||||
|
||||
func TestPosterJsonMapSync(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map"
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
|
||||
var wg1 = &sync.WaitGroup{}
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
mckPoster := mockPoster{
|
||||
wg: wg1,
|
||||
}
|
||||
exp := &PosterJSONMapEE{
|
||||
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg,
|
||||
cfgIdx: cfgIdx,
|
||||
filterS: new(engine.FilterS),
|
||||
poster: mckPoster,
|
||||
dc: dc,
|
||||
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
return
|
||||
case <-time.After(4 * time.Second):
|
||||
t.Error("Can't asynchronously export events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapSyncLimit(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map"
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
|
||||
var wg1 = &sync.WaitGroup{}
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
mckPoster := mockPoster{
|
||||
wg: wg1,
|
||||
}
|
||||
exp := &PosterJSONMapEE{
|
||||
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg,
|
||||
cfgIdx: cfgIdx,
|
||||
filterS: new(engine.FilterS),
|
||||
poster: mckPoster,
|
||||
dc: dc,
|
||||
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
t.Error("Should not have been possible to asynchronously export events")
|
||||
case <-time.After(4 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user