mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
ers: remove redundant concurrency channel init
This commit is contained in:
committed by
Dan Christian Bogos
parent
fd08eed5d4
commit
8037076ded
@@ -46,9 +46,6 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int,
|
||||
}
|
||||
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
|
||||
rdr.cap = make(chan struct{}, concReq)
|
||||
for i := 0; i < concReq; i++ {
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}
|
||||
if rdr.Config().Opts.AMQPQueueID != nil {
|
||||
rdr.queueID = "/" + *rdr.Config().Opts.AMQPQueueID
|
||||
@@ -125,7 +122,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
|
||||
}
|
||||
for {
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap // do not try to read if the limit is reached
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
ctx := context.Background()
|
||||
var msg *amqpv1.Message
|
||||
@@ -152,7 +149,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
|
||||
utils.ERs, err.Error()))
|
||||
}
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
rdr.cap <- struct{}{}
|
||||
<-rdr.cap
|
||||
}
|
||||
}(msg)
|
||||
}
|
||||
|
||||
@@ -50,11 +50,9 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int,
|
||||
partialEvents: partialEvents,
|
||||
rdrError: rdrErr,
|
||||
rdrExit: rdrExit,
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
|
||||
var processFile struct{}
|
||||
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
|
||||
csvEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
|
||||
}
|
||||
|
||||
return csvEr, nil
|
||||
}
|
||||
|
||||
@@ -126,9 +124,9 @@ func (rdr *CSVFileER) Serve() (err error) {
|
||||
|
||||
// processFile is called for each file in a directory and dispatches erEvents from it
|
||||
func (rdr *CSVFileER) processFile(fName string) (err error) {
|
||||
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
|
||||
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
|
||||
defer func() { rdr.conReqs <- processFile }()
|
||||
if cap(rdr.conReqs) != 0 {
|
||||
rdr.conReqs <- struct{}{}
|
||||
defer func() { <-rdr.conReqs }()
|
||||
}
|
||||
absPath := path.Join(rdr.sourceDir, fName)
|
||||
utils.Logger.Info(
|
||||
|
||||
@@ -51,11 +51,9 @@ func NewFWVFileER(cfg *config.CGRConfig, cfgIdx int,
|
||||
partialEvents: partialEvents,
|
||||
rdrError: rdrErr,
|
||||
rdrExit: rdrExit,
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
|
||||
var processFile struct{}
|
||||
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
|
||||
fwvER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
|
||||
}
|
||||
|
||||
return fwvER, nil
|
||||
}
|
||||
|
||||
@@ -134,9 +132,9 @@ func (rdr *FWVFileER) Serve() (err error) {
|
||||
|
||||
// processFile is called for each file in a directory and dispatches erEvents from it
|
||||
func (rdr *FWVFileER) processFile(fName string) (err error) {
|
||||
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
|
||||
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
|
||||
defer func() { rdr.conReqs <- processFile }()
|
||||
if cap(rdr.conReqs) != 0 {
|
||||
rdr.conReqs <- struct{}{}
|
||||
defer func() { <-rdr.conReqs }()
|
||||
}
|
||||
absPath := path.Join(rdr.sourceDir, fName)
|
||||
utils.Logger.Info(
|
||||
|
||||
@@ -291,7 +291,6 @@ func TestFileFWVProcessEvent(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
fname := "file1.fwv"
|
||||
errExpect := "unsupported field prefix: <> when set fields"
|
||||
eR.Config().Fields = []*config.FCTemplate{
|
||||
@@ -356,7 +355,6 @@ func TestFileFWV(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
filePath := "/tmp/fwvErs/out"
|
||||
err := os.MkdirAll(filePath, 0777)
|
||||
if err != nil {
|
||||
@@ -391,7 +389,6 @@ func TestFileFWVServeDefault(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
filePath := "/tmp/fwvErs/out"
|
||||
err := os.MkdirAll(filePath, 0777)
|
||||
if err != nil {
|
||||
@@ -427,7 +424,6 @@ func TestFileFWVExit(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
eR.Config().RunDelay = 1 * time.Millisecond
|
||||
if err := eR.Serve(); err != nil {
|
||||
t.Error(err)
|
||||
@@ -459,7 +455,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil)
|
||||
},
|
||||
APIOpts: map[string]any{},
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
filePath := "/tmp/TestFileFWVProcessTrailer/"
|
||||
if err := os.MkdirAll(filePath, 0777); err != nil {
|
||||
t.Error(err)
|
||||
@@ -512,7 +507,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil)
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
filePath := "/tmp/TestFileFWVProcessTrailer/"
|
||||
if err := os.MkdirAll(filePath, 0777); err != nil {
|
||||
t.Error(err)
|
||||
@@ -549,7 +543,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil)
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
eR.Config().Tenant = utils.RSRParsers{
|
||||
{
|
||||
Rules: "cgrates.org",
|
||||
@@ -597,7 +590,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil)
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{
|
||||
Tag: "OriginId",
|
||||
@@ -629,7 +621,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil)
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
expEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]any{
|
||||
@@ -678,7 +669,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil)
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{},
|
||||
}
|
||||
@@ -705,7 +695,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil)
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
record := "testRecord"
|
||||
trailerFields := []*config.FCTemplate{
|
||||
{
|
||||
|
||||
@@ -51,11 +51,9 @@ func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int,
|
||||
partialEvents: partialEvents,
|
||||
rdrError: rdrErr,
|
||||
rdrExit: rdrExit,
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
|
||||
var processFile struct{}
|
||||
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
|
||||
jsonEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
|
||||
}
|
||||
|
||||
return jsonEr, nil
|
||||
}
|
||||
|
||||
@@ -127,9 +125,9 @@ func (rdr *JSONFileER) Serve() (err error) {
|
||||
|
||||
// processFile is called for each file in a directory and dispatches erEvents from it
|
||||
func (rdr *JSONFileER) processFile(fName string) (err error) {
|
||||
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
|
||||
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
|
||||
defer func() { rdr.conReqs <- processFile }()
|
||||
if cap(rdr.conReqs) != 0 {
|
||||
rdr.conReqs <- struct{}{}
|
||||
defer func() { <-rdr.conReqs }()
|
||||
}
|
||||
absPath := path.Join(rdr.sourceDir, fName)
|
||||
utils.Logger.Info(
|
||||
|
||||
@@ -52,10 +52,7 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int,
|
||||
partialEvents: partialEvents,
|
||||
rdrError: rdrErr,
|
||||
rdrExit: rdrExit,
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
|
||||
var processFile struct{}
|
||||
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
|
||||
xmlER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
|
||||
}
|
||||
return xmlER, nil
|
||||
}
|
||||
@@ -148,9 +145,9 @@ func (rdr *XMLFileER) Serve() (err error) {
|
||||
|
||||
// processFile is called for each file in a directory and dispatches erEvents from it
|
||||
func (rdr *XMLFileER) processFile(fName string) error {
|
||||
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
|
||||
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
|
||||
defer func() { rdr.conReqs <- processFile }()
|
||||
if cap(rdr.conReqs) != 0 {
|
||||
rdr.conReqs <- struct{}{}
|
||||
defer func() { <-rdr.conReqs }()
|
||||
}
|
||||
absPath := path.Join(rdr.sourceDir, fName)
|
||||
utils.Logger.Info(
|
||||
|
||||
@@ -364,16 +364,9 @@ func TestNewXMLFileER(t *testing.T) {
|
||||
rdrEvents: nil,
|
||||
rdrError: nil,
|
||||
rdrExit: nil,
|
||||
conReqs: make(chan struct{}, 1),
|
||||
conReqs: nil,
|
||||
}
|
||||
var value struct{}
|
||||
expEr.conReqs <- value
|
||||
eR, err := NewXMLFileER(cfg, 0, nil, nil, nil, fltrs, nil)
|
||||
expConReq := make(chan struct{}, 1)
|
||||
expConReq <- struct{}{}
|
||||
if <-expConReq != <-eR.(*XMLFileER).conReqs {
|
||||
t.Errorf("Expected %v but received %v", <-expConReq, <-eR.(*XMLFileER).conReqs)
|
||||
}
|
||||
expEr.conReqs = nil
|
||||
eR.(*XMLFileER).conReqs = nil
|
||||
if err != nil {
|
||||
@@ -433,7 +426,6 @@ func TestFileXMLProcessEvent(t *testing.T) {
|
||||
|
||||
eR.Config().Fields[0].ComputePath()
|
||||
|
||||
eR.conReqs <- struct{}{}
|
||||
fileName := "file1.xml"
|
||||
if err := eR.processFile(fileName); err != nil {
|
||||
t.Error(err)
|
||||
@@ -474,7 +466,6 @@ func TestFileXMLProcessEventError1(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
errExpect := "open /tmp/TestFileXMLProcessEvent/file1.xml: no such file or directory"
|
||||
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
@@ -521,7 +512,6 @@ func TestFileXMLProcessEVentError2(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
eR.Config().Tenant = utils.RSRParsers{
|
||||
{
|
||||
Rules: "test",
|
||||
@@ -584,7 +574,6 @@ func TestFileXMLProcessEVentError3(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
|
||||
eR.Config().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -638,7 +627,6 @@ func TestFileXMLProcessEventParseError(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
|
||||
fileName := "file1.xml"
|
||||
errExpect := "XML syntax error on line 2: unexpected EOF"
|
||||
@@ -663,7 +651,6 @@ func TestFileXML(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
err := os.MkdirAll(eR.sourceDir, 0777)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@@ -716,7 +703,6 @@ func TestFileXMLError(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
err := os.MkdirAll(eR.sourceDir, 0777)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@@ -759,7 +745,6 @@ func TestFileXMLExit(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
eR.Config().RunDelay = 1 * time.Millisecond
|
||||
if err := eR.Serve(); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -50,9 +50,6 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan
|
||||
}
|
||||
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
|
||||
rdr.cap = make(chan struct{}, concReq)
|
||||
for i := 0; i < concReq; i++ {
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}
|
||||
rdr.dialURL = rdr.Config().SourcePath
|
||||
if err := rdr.setOpts(rdr.Config().Opts); err != nil {
|
||||
@@ -154,7 +151,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) {
|
||||
}
|
||||
for {
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap // do not try to read if the limit is reached
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
msg, err := r.ReadMessage(context.Background())
|
||||
if err != nil {
|
||||
@@ -174,7 +171,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) {
|
||||
utils.ERs, string(msg.Key), err.Error()))
|
||||
}
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
rdr.cap <- struct{}{}
|
||||
<-rdr.cap
|
||||
}
|
||||
}(msg)
|
||||
}
|
||||
|
||||
12
ers/nats.go
12
ers/nats.go
@@ -50,9 +50,6 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
|
||||
}
|
||||
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
|
||||
rdr.cap = make(chan struct{}, concReq)
|
||||
for i := 0; i < concReq; i++ {
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}
|
||||
if err := rdr.processOpts(); err != nil {
|
||||
return nil, err
|
||||
@@ -96,13 +93,9 @@ func (rdr *NatsER) Serve() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Define the message handler. Its content will get executed for every received message.
|
||||
handleMessage := func(msgData []byte) {
|
||||
|
||||
// If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise
|
||||
// allocate one resource and start processing the message.
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
go func() {
|
||||
handlerErr := rdr.processMessage(msgData)
|
||||
@@ -112,9 +105,8 @@ func (rdr *NatsER) Serve() error {
|
||||
utils.ERs, string(msgData), handlerErr.Error()))
|
||||
}
|
||||
|
||||
// Release the resource back to rdr.cap channel.
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
rdr.cap <- struct{}{}
|
||||
<-rdr.cap
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
@@ -51,9 +51,6 @@ func NewS3ER(cfg *config.CGRConfig, cfgIdx int,
|
||||
}
|
||||
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
|
||||
rdr.cap = make(chan struct{}, concReq)
|
||||
for i := 0; i < concReq; i++ {
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}
|
||||
rdr.parseOpts(rdr.Config().Opts)
|
||||
return rdr, nil
|
||||
@@ -203,8 +200,8 @@ func (rdr *S3ER) isClosed() bool {
|
||||
|
||||
func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) {
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap // do not try to read if the limit is reached
|
||||
defer func() { rdr.cap <- struct{}{} }()
|
||||
rdr.cap <- struct{}{}
|
||||
defer func() { <-rdr.cap }()
|
||||
}
|
||||
if rdr.isClosed() {
|
||||
return
|
||||
|
||||
@@ -62,9 +62,6 @@ func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int,
|
||||
}
|
||||
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
|
||||
rdr.cap = make(chan struct{}, concReq)
|
||||
for i := 0; i < concReq; i++ {
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}
|
||||
if err := rdr.setURL(rdr.Config().SourcePath, rdr.Config().Opts); err != nil {
|
||||
return nil, err
|
||||
@@ -205,7 +202,7 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
|
||||
return
|
||||
}
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
|
||||
columns := make([]any, len(colNames)) // create a list of interfaces correlating to the columns selected
|
||||
@@ -273,7 +270,7 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
|
||||
utils.ERs, utils.ToJSON(ev), err.Error()))
|
||||
}
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
rdr.cap <- struct{}{}
|
||||
<-rdr.cap
|
||||
}
|
||||
}(ev)
|
||||
}
|
||||
|
||||
@@ -49,9 +49,6 @@ func NewSQSER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *
|
||||
}
|
||||
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
|
||||
rdr.cap = make(chan struct{}, concReq)
|
||||
for i := 0; i < concReq; i++ {
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}
|
||||
rdr.parseOpts(rdr.Config().Opts)
|
||||
return rdr, nil
|
||||
@@ -197,7 +194,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
|
||||
// scv := sqs.New(rdr.session)
|
||||
for !rdr.isClosed() {
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap // do not try to read if the limit is reached
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
var msgs *sqs.ReceiveMessageOutput
|
||||
if msgs, err = scv.ReceiveMessage(&sqs.ReceiveMessageInput{
|
||||
@@ -210,7 +207,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
|
||||
if len(msgs.Messages) != 0 {
|
||||
go rdr.readMsg(scv, msgs.Messages[0])
|
||||
} else if rdr.Config().ConcurrentReqs != -1 {
|
||||
rdr.cap <- struct{}{}
|
||||
<-rdr.cap
|
||||
}
|
||||
|
||||
}
|
||||
@@ -229,7 +226,7 @@ func (rdr *SQSER) isClosed() bool {
|
||||
|
||||
func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) {
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
defer func() { rdr.cap <- struct{}{} }()
|
||||
defer func() { <-rdr.cap }()
|
||||
}
|
||||
body := []byte(*msg.Body)
|
||||
key := *msg.MessageId
|
||||
|
||||
@@ -513,7 +513,6 @@ func TestSQSERReadLoop(t *testing.T) {
|
||||
queueURL: utils.StringPointer("testQueueURL"),
|
||||
session: nil,
|
||||
}
|
||||
rdr.cap <- struct{}{}
|
||||
rdr.Config().ConcurrentReqs = 1
|
||||
counter := 0
|
||||
receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) {
|
||||
@@ -558,7 +557,6 @@ func TestSQSERReadLoop2(t *testing.T) {
|
||||
queueURL: utils.StringPointer("testQueueURL"),
|
||||
session: nil,
|
||||
}
|
||||
rdr.cap <- struct{}{}
|
||||
rdr.Config().ConcurrentReqs = 1
|
||||
counter := 0
|
||||
receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) {
|
||||
|
||||
Reference in New Issue
Block a user