diff --git a/ers/amqpv1.go b/ers/amqpv1.go index b4573c1bc..d93939872 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -46,9 +46,6 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) - for i := 0; i < concReq; i++ { - rdr.cap <- struct{}{} - } } if rdr.Config().Opts.AMQPQueueID != nil { rdr.queueID = "/" + *rdr.Config().Opts.AMQPQueueID @@ -125,7 +122,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) { } for { if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap // do not try to read if the limit is reached + rdr.cap <- struct{}{} } ctx := context.Background() var msg *amqpv1.Message @@ -152,7 +149,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) { utils.ERs, err.Error())) } if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} + <-rdr.cap } }(msg) } diff --git a/ers/filecsv.go b/ers/filecsv.go index a10f1481c..6990db15f 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -50,11 +50,9 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int, partialEvents: partialEvents, rdrError: rdrErr, rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} - var processFile struct{} - for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { - csvEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs), } + return csvEr, nil } @@ -126,9 +124,9 @@ func (rdr *CSVFileER) Serve() (err error) { // processFile is called for each file in a directory and dispatches erEvents from it func (rdr *CSVFileER) processFile(fName string) (err error) { - if cap(rdr.conReqs) != 0 { // 0 goes for no limit - processFile := <-rdr.conReqs // Queue here for maxOpenFiles - defer func() { rdr.conReqs <- processFile }() + if cap(rdr.conReqs) != 0 { + rdr.conReqs <- struct{}{} + defer func() { <-rdr.conReqs }() } absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( diff --git a/ers/filefwv.go b/ers/filefwv.go index e893aad1c..035bcf87e 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -51,11 +51,9 @@ func NewFWVFileER(cfg *config.CGRConfig, cfgIdx int, partialEvents: partialEvents, rdrError: rdrErr, rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} - var processFile struct{} - for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { - fwvER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs), } + return fwvER, nil } @@ -134,9 +132,9 @@ func (rdr *FWVFileER) Serve() (err error) { // processFile is called for each file in a directory and dispatches erEvents from it func (rdr *FWVFileER) processFile(fName string) (err error) { - if cap(rdr.conReqs) != 0 { // 0 goes for no limit - processFile := <-rdr.conReqs // Queue here for maxOpenFiles - defer func() { rdr.conReqs <- processFile }() + if cap(rdr.conReqs) != 0 { + rdr.conReqs <- struct{}{} + defer func() { <-rdr.conReqs }() } absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index 7c1ad95ee..accc35893 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -291,7 +291,6 @@ func TestFileFWVProcessEvent(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} fname := "file1.fwv" errExpect := "unsupported field prefix: <> when set fields" eR.Config().Fields = []*config.FCTemplate{ @@ -356,7 +355,6 @@ func TestFileFWV(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} filePath := "/tmp/fwvErs/out" err := os.MkdirAll(filePath, 0777) if err != nil { @@ -391,7 +389,6 @@ func TestFileFWVServeDefault(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} filePath := "/tmp/fwvErs/out" err := os.MkdirAll(filePath, 0777) if err != nil { @@ -427,7 +424,6 @@ func TestFileFWVExit(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) @@ -459,7 +455,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil) }, APIOpts: map[string]any{}, } - eR.conReqs <- struct{}{} filePath := "/tmp/TestFileFWVProcessTrailer/" if err := os.MkdirAll(filePath, 0777); err != nil { t.Error(err) @@ -512,7 +507,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil) rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} filePath := "/tmp/TestFileFWVProcessTrailer/" if err := os.MkdirAll(filePath, 0777); err != nil { t.Error(err) @@ -549,7 +543,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil) rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} eR.Config().Tenant = utils.RSRParsers{ { Rules: "cgrates.org", @@ -597,7 +590,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil) rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} trailerFields := []*config.FCTemplate{ { Tag: "OriginId", @@ -629,7 +621,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil) rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} expEvent := &utils.CGREvent{ Tenant: "cgrates.org", Event: map[string]any{ @@ -678,7 +669,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil) rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} trailerFields := []*config.FCTemplate{ {}, } @@ -705,7 +695,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), nil) rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} record := "testRecord" trailerFields := []*config.FCTemplate{ { diff --git a/ers/filejson.go b/ers/filejson.go index 04111d93a..8cd4d0692 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -51,11 +51,9 @@ func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int, partialEvents: partialEvents, rdrError: rdrErr, rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} - var processFile struct{} - for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { - jsonEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs), } + return jsonEr, nil } @@ -127,9 +125,9 @@ func (rdr *JSONFileER) Serve() (err error) { // processFile is called for each file in a directory and dispatches erEvents from it func (rdr *JSONFileER) processFile(fName string) (err error) { - if cap(rdr.conReqs) != 0 { // 0 goes for no limit - processFile := <-rdr.conReqs // Queue here for maxOpenFiles - defer func() { rdr.conReqs <- processFile }() + if cap(rdr.conReqs) != 0 { + rdr.conReqs <- struct{}{} + defer func() { <-rdr.conReqs }() } absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( diff --git a/ers/filexml.go b/ers/filexml.go index 76e9ac889..aed1ce9c8 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -52,10 +52,7 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int, partialEvents: partialEvents, rdrError: rdrErr, rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} - var processFile struct{} - for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { - xmlER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs), } return xmlER, nil } @@ -148,9 +145,9 @@ func (rdr *XMLFileER) Serve() (err error) { // processFile is called for each file in a directory and dispatches erEvents from it func (rdr *XMLFileER) processFile(fName string) error { - if cap(rdr.conReqs) != 0 { // 0 goes for no limit - processFile := <-rdr.conReqs // Queue here for maxOpenFiles - defer func() { rdr.conReqs <- processFile }() + if cap(rdr.conReqs) != 0 { + rdr.conReqs <- struct{}{} + defer func() { <-rdr.conReqs }() } absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go index 1672bbe6f..6c0aa0cbb 100644 --- a/ers/filexml_it_test.go +++ b/ers/filexml_it_test.go @@ -364,16 +364,9 @@ func TestNewXMLFileER(t *testing.T) { rdrEvents: nil, rdrError: nil, rdrExit: nil, - conReqs: make(chan struct{}, 1), + conReqs: nil, } - var value struct{} - expEr.conReqs <- value eR, err := NewXMLFileER(cfg, 0, nil, nil, nil, fltrs, nil) - expConReq := make(chan struct{}, 1) - expConReq <- struct{}{} - if <-expConReq != <-eR.(*XMLFileER).conReqs { - t.Errorf("Expected %v but received %v", <-expConReq, <-eR.(*XMLFileER).conReqs) - } expEr.conReqs = nil eR.(*XMLFileER).conReqs = nil if err != nil { @@ -433,7 +426,6 @@ func TestFileXMLProcessEvent(t *testing.T) { eR.Config().Fields[0].ComputePath() - eR.conReqs <- struct{}{} fileName := "file1.xml" if err := eR.processFile(fileName); err != nil { t.Error(err) @@ -474,7 +466,6 @@ func TestFileXMLProcessEventError1(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} errExpect := "open /tmp/TestFileXMLProcessEvent/file1.xml: no such file or directory" if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) @@ -521,7 +512,6 @@ func TestFileXMLProcessEVentError2(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} eR.Config().Tenant = utils.RSRParsers{ { Rules: "test", @@ -584,7 +574,6 @@ func TestFileXMLProcessEVentError3(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} eR.Config().Fields = []*config.FCTemplate{ { @@ -638,7 +627,6 @@ func TestFileXMLProcessEventParseError(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} fileName := "file1.xml" errExpect := "XML syntax error on line 2: unexpected EOF" @@ -663,7 +651,6 @@ func TestFileXML(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} err := os.MkdirAll(eR.sourceDir, 0777) if err != nil { t.Error(err) @@ -716,7 +703,6 @@ func TestFileXMLError(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} err := os.MkdirAll(eR.sourceDir, 0777) if err != nil { t.Error(err) @@ -759,7 +745,6 @@ func TestFileXMLExit(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - eR.conReqs <- struct{}{} eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) diff --git a/ers/kafka.go b/ers/kafka.go index f00142cf1..a7de677e6 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -50,9 +50,6 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) - for i := 0; i < concReq; i++ { - rdr.cap <- struct{}{} - } } rdr.dialURL = rdr.Config().SourcePath if err := rdr.setOpts(rdr.Config().Opts); err != nil { @@ -154,7 +151,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) { } for { if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap // do not try to read if the limit is reached + rdr.cap <- struct{}{} } msg, err := r.ReadMessage(context.Background()) if err != nil { @@ -174,7 +171,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) { utils.ERs, string(msg.Key), err.Error())) } if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} + <-rdr.cap } }(msg) } diff --git a/ers/nats.go b/ers/nats.go index a398994e9..0e256eafa 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -50,9 +50,6 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) - for i := 0; i < concReq; i++ { - rdr.cap <- struct{}{} - } } if err := rdr.processOpts(); err != nil { return nil, err @@ -96,13 +93,9 @@ func (rdr *NatsER) Serve() error { return err } - // Define the message handler. Its content will get executed for every received message. handleMessage := func(msgData []byte) { - - // If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise - // allocate one resource and start processing the message. if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap + rdr.cap <- struct{}{} } go func() { handlerErr := rdr.processMessage(msgData) @@ -112,9 +105,8 @@ func (rdr *NatsER) Serve() error { utils.ERs, string(msgData), handlerErr.Error())) } - // Release the resource back to rdr.cap channel. if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} + <-rdr.cap } }() diff --git a/ers/s3.go b/ers/s3.go index 1f803ee3f..99059f154 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -51,9 +51,6 @@ func NewS3ER(cfg *config.CGRConfig, cfgIdx int, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) - for i := 0; i < concReq; i++ { - rdr.cap <- struct{}{} - } } rdr.parseOpts(rdr.Config().Opts) return rdr, nil @@ -203,8 +200,8 @@ func (rdr *S3ER) isClosed() bool { func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) { if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap // do not try to read if the limit is reached - defer func() { rdr.cap <- struct{}{} }() + rdr.cap <- struct{}{} + defer func() { <-rdr.cap }() } if rdr.isClosed() { return diff --git a/ers/sql.go b/ers/sql.go index 10b32800d..11cd6ef44 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -62,9 +62,6 @@ func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) - for i := 0; i < concReq; i++ { - rdr.cap <- struct{}{} - } } if err := rdr.setURL(rdr.Config().SourcePath, rdr.Config().Opts); err != nil { return nil, err @@ -205,7 +202,7 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { return } if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap + rdr.cap <- struct{}{} } columns := make([]any, len(colNames)) // create a list of interfaces correlating to the columns selected @@ -273,7 +270,7 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { utils.ERs, utils.ToJSON(ev), err.Error())) } if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} + <-rdr.cap } }(ev) } diff --git a/ers/sqs.go b/ers/sqs.go index b5dc09cc2..44a5da473 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -49,9 +49,6 @@ func NewSQSER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan * } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) - for i := 0; i < concReq; i++ { - rdr.cap <- struct{}{} - } } rdr.parseOpts(rdr.Config().Opts) return rdr, nil @@ -197,7 +194,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) { // scv := sqs.New(rdr.session) for !rdr.isClosed() { if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap // do not try to read if the limit is reached + rdr.cap <- struct{}{} } var msgs *sqs.ReceiveMessageOutput if msgs, err = scv.ReceiveMessage(&sqs.ReceiveMessageInput{ @@ -210,7 +207,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) { if len(msgs.Messages) != 0 { go rdr.readMsg(scv, msgs.Messages[0]) } else if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} + <-rdr.cap } } @@ -229,7 +226,7 @@ func (rdr *SQSER) isClosed() bool { func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) { if rdr.Config().ConcurrentReqs != -1 { - defer func() { rdr.cap <- struct{}{} }() + defer func() { <-rdr.cap }() } body := []byte(*msg.Body) key := *msg.MessageId diff --git a/ers/sqs_test.go b/ers/sqs_test.go index c125d7159..1658463a7 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -513,7 +513,6 @@ func TestSQSERReadLoop(t *testing.T) { queueURL: utils.StringPointer("testQueueURL"), session: nil, } - rdr.cap <- struct{}{} rdr.Config().ConcurrentReqs = 1 counter := 0 receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { @@ -558,7 +557,6 @@ func TestSQSERReadLoop2(t *testing.T) { queueURL: utils.StringPointer("testQueueURL"), session: nil, } - rdr.cap <- struct{}{} rdr.Config().ConcurrentReqs = 1 counter := 0 receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) {