changed conReqs to not populate buffer at start

This commit is contained in:
gezimbll
2024-04-17 05:41:34 -04:00
committed by Dan Christian Bogos
parent afed0dd805
commit 5f941b1f9d
16 changed files with 33 additions and 79 deletions

View File

@@ -46,9 +46,6 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int,
}
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
rdr.cap = make(chan struct{}, concReq)
for i := 0; i < concReq; i++ {
rdr.cap <- struct{}{}
}
}
if amqOpts := rdr.Config().Opts.AMQP; amqOpts != nil {
@@ -120,7 +117,7 @@ func (rdr *AMQPv1ER) Serve() (err error) {
func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
for {
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap // do not try to read if the limit is reached
rdr.cap <- struct{}{} // do not try to read if the limit is reached
}
ctx := context.Background()
var msg *amqpv1.Message
@@ -147,7 +144,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
utils.ERs, err.Error()))
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
<-rdr.cap
}
}(msg)
}

View File

@@ -49,11 +49,9 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int,
partialEvents: partialEvents,
rdrError: rdrErr,
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
csvEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
}
return csvEr, nil
}
@@ -113,8 +111,8 @@ func (rdr *CSVFileER) Serve() (err error) {
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *CSVFileER) processFile(fName string) (err error) {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
rdr.conReqs <- struct{}{} // Queue here for maxOpenFiles
defer func() { <-rdr.conReqs }()
}
absPath := path.Join(rdr.sourceDir, fName)
utils.Logger.Info(

View File

@@ -418,7 +418,6 @@ func TestFileCSVProcessEvent(t *testing.T) {
},
APIOpts: map[string]any{},
}
eR.conReqs <- struct{}{}
eR.Config().Fields = []*config.FCTemplate{
{
@@ -538,7 +537,6 @@ func TestFileCSVProcessEventError(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
errExpect := "open /tmp/TestFileCSVProcessEvent/file1.csv: no such file or directory"
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
@@ -571,7 +569,6 @@ func TestFileCSVProcessEventError2(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
eR.Config().Fields = []*config.FCTemplate{
{},
@@ -615,7 +612,6 @@ func TestFileCSVProcessEventError3(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
//
eR.Config().Filters = []string{"Filter1"}

View File

@@ -50,11 +50,9 @@ func NewFWVFileER(cfg *config.CGRConfig, cfgIdx int,
partialEvents: partialEvents,
rdrError: rdrErr,
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
fwvER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
}
return fwvER, nil
}
@@ -122,8 +120,8 @@ func (rdr *FWVFileER) Serve() (err error) {
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *FWVFileER) processFile(fName string) (err error) {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
rdr.conReqs <- struct{}{} // Queue here for maxOpenFiles
defer func() { <-rdr.conReqs }()
}
absPath := path.Join(rdr.sourceDir, fName)
utils.Logger.Info(

View File

@@ -291,7 +291,6 @@ func TestFileFWVProcessEvent(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
fname := "file1.fwv"
errExpect := "unsupported field prefix: <> when set fields"
eR.Config().Fields = []*config.FCTemplate{

View File

@@ -51,11 +51,9 @@ func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int,
partialEvents: partialEvents,
rdrError: rdrErr,
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
jsonEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
}
return jsonEr, nil
}
@@ -116,8 +114,8 @@ func (rdr *JSONFileER) Serve() (err error) {
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *JSONFileER) processFile(fName string) (err error) {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
rdr.conReqs <- struct{}{} // Queue here for maxOpenFiles
defer func() { <-rdr.conReqs }()
}
absPath := path.Join(rdr.sourceDir, fName)
utils.Logger.Info(

View File

@@ -362,7 +362,6 @@ func TestFileJSONProcessEvent(t *testing.T) {
APIOpts: map[string]any{},
}
// expEvent := &utils.CGREvent{}
eR.conReqs <- struct{}{}
fname := "file1.json"
if err := eR.processFile(fname); err != nil {
t.Error(err)
@@ -397,7 +396,6 @@ func TestFileJSONProcessEventReadError(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
errExpect := "open /tmp/TestFileJSONProcessEvent/file2.json: no such file or directory"
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
@@ -445,7 +443,6 @@ func TestFileJSONProcessEventError2(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
eR.Config().Fields = []*config.FCTemplate{
{},
@@ -504,7 +501,6 @@ func TestFileJSONProcessEventError3(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
fname := "file1.json"
//

View File

@@ -51,10 +51,7 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int,
partialEvents: partialEvents,
rdrError: rdrErr,
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
xmlER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
}
return xmlER, nil
}
@@ -134,8 +131,8 @@ func (rdr *XMLFileER) Serve() (err error) {
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *XMLFileER) processFile(fName string) error {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
rdr.conReqs <- struct{}{} // Queue here for maxOpenFiles
defer func() { <-rdr.conReqs }()
}
absPath := path.Join(rdr.sourceDir, fName)
utils.Logger.Info(

View File

@@ -355,14 +355,9 @@ func TestNewXMLFileER(t *testing.T) {
rdrExit: nil,
conReqs: make(chan struct{}, 1),
}
var value struct{}
expEr.conReqs <- value
eR, err := NewXMLFileER(cfg, 0, nil, nil, nil, fltrs, nil)
expConReq := make(chan struct{}, 1)
expConReq <- struct{}{}
if <-expConReq != <-eR.(*XMLFileER).conReqs {
t.Errorf("Expected %v but received %v", <-expConReq, <-eR.(*XMLFileER).conReqs)
}
expEr.conReqs = nil
eR.(*XMLFileER).conReqs = nil
if err != nil {
@@ -422,7 +417,6 @@ func TestFileXMLProcessEvent(t *testing.T) {
eR.Config().Fields[0].ComputePath()
eR.conReqs <- struct{}{}
fileName := "file1.xml"
if err := eR.processFile(fileName); err != nil {
t.Error(err)
@@ -464,7 +458,6 @@ func TestFileXMLProcessEventError1(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
errExpect := "open /tmp/TestFileXMLProcessEvent/file1.xml: no such file or directory"
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
@@ -510,7 +503,7 @@ func TestFileXMLProcessEVentError2(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
eR.Config().Tenant = config.RSRParsers{
{
Rules: "test",
@@ -560,7 +553,6 @@ func TestFileXMLProcessEventParseError(t *testing.T) {
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
fileName := "file1.xml"
errExpect := "XML syntax error on line 2: unexpected EOF"

View File

@@ -52,9 +52,6 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
}
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
rdr.cap = make(chan struct{}, concReq)
for i := 0; i < concReq; i++ {
rdr.cap <- struct{}{}
}
}
rdr.dialURL = rdr.Config().SourcePath
if err := rdr.setOpts(rdr.Config().Opts); err != nil {
@@ -146,7 +143,7 @@ func (rdr *KafkaER) Serve() (err error) {
func (rdr *KafkaER) readLoop(r *kafka.Reader) {
for {
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap // do not try to read if the limit is reached
rdr.cap <- struct{}{} // do not try to read if the limit is reached
}
msg, err := r.ReadMessage(context.Background())
if err != nil {
@@ -166,7 +163,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) {
utils.ERs, string(msg.Key), err.Error()))
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
<-rdr.cap
}
}(msg)
}

View File

@@ -50,9 +50,6 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
}
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
rdr.cap = make(chan struct{}, concReq)
for i := 0; i < concReq; i++ {
rdr.cap <- struct{}{}
}
}
if err := rdr.processOpts(); err != nil {
return nil, err
@@ -102,7 +99,7 @@ func (rdr *NatsER) Serve() error {
// If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise
// allocate one resource and start processing the message.
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap
rdr.cap <- struct{}{}
}
go func() {
handlerErr := rdr.processMessage(msgData)
@@ -114,7 +111,7 @@ func (rdr *NatsER) Serve() error {
// Release the resource back to rdr.cap channel.
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
<-rdr.cap
}
}()

View File

@@ -50,9 +50,6 @@ func NewS3ER(cfg *config.CGRConfig, cfgIdx int,
}
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
rdr.cap = make(chan struct{}, concReq)
for i := 0; i < concReq; i++ {
rdr.cap <- struct{}{}
}
}
rdr.parseOpts(rdr.Config().Opts)
return rdr, nil
@@ -201,8 +198,8 @@ func (rdr *S3ER) isClosed() bool {
func (rdr *S3ER) readMsg(scv s3Client, key string) (err error) {
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap // do not try to read if the limit is reached
defer func() { rdr.cap <- struct{}{} }()
rdr.cap <- struct{}{} // do not try to read if the limit is reached
defer func() { <-rdr.cap }()
}
if rdr.isClosed() {
return

View File

@@ -440,7 +440,7 @@ func TestS3ERReadMsgError1(t *testing.T) {
GetObjectF: getObject,
DeleteObjectF: deleteObject,
}
rdr.cap <- struct{}{}
errExp := "NOT_FOUND:ToR"
if err := rdr.readMsg(scv, "AWSKey"); err == nil || err.Error() != errExp {
t.Errorf("Expected %v but received %v", errExp, err)
@@ -466,7 +466,7 @@ func TestS3ERReadMsgError2(t *testing.T) {
}
rdr.Config().ConcurrentReqs = 1
scv := &s3ClientMock{}
rdr.cap <- struct{}{}
rdr.rdrExit <- struct{}{}
if err := rdr.readMsg(scv, "AWSKey"); err != nil {
t.Error(err)

View File

@@ -59,9 +59,6 @@ func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int,
}
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
rdr.cap = make(chan struct{}, concReq)
for i := 0; i < concReq; i++ {
rdr.cap <- struct{}{}
}
}
if err = rdr.setURL(rdr.Config().SourcePath, rdr.Config().Opts); err != nil {
return nil, err
@@ -155,7 +152,7 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
return
}
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap // do not try to read if the limit is reached
rdr.cap <- struct{}{}
}
columns := make([]any, len(colNames))
columnPointers := make([]any, len(colNames))
@@ -203,7 +200,7 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
utils.ERs, utils.ToJSON(msg), err.Error()))
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
<-rdr.cap
}
}(msg)
}

View File

@@ -50,9 +50,6 @@ func NewSQSER(cfg *config.CGRConfig, cfgIdx int,
}
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
rdr.cap = make(chan struct{}, concReq)
for i := 0; i < concReq; i++ {
rdr.cap <- struct{}{}
}
}
rdr.parseOpts(rdr.Config().Opts)
return rdr, nil
@@ -192,7 +189,7 @@ func (rdr *SQSER) getQueueURLWithClient(svc sqsClient) (err error) {
func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
for !rdr.isClosed() {
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap // do not try to read if the limit is reached
rdr.cap <- struct{}{} // do not try to read if the limit is reached
}
var msgs *sqs.ReceiveMessageOutput
if msgs, err = scv.ReceiveMessage(&sqs.ReceiveMessageInput{
@@ -205,7 +202,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
if len(msgs.Messages) != 0 {
go rdr.readMsg(scv, msgs.Messages[0])
} else if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
<-rdr.cap
}
}
@@ -224,7 +221,7 @@ func (rdr *SQSER) isClosed() bool {
func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) {
if rdr.Config().ConcurrentReqs != -1 {
defer func() { rdr.cap <- struct{}{} }()
defer func() { <-rdr.cap }()
}
body := []byte(*msg.Body)
key := *msg.MessageId

View File

@@ -518,7 +518,6 @@ func TestSQSERReadLoop(t *testing.T) {
queueURL: utils.StringPointer("testQueueURL"),
session: nil,
}
rdr.cap <- struct{}{}
rdr.Config().ConcurrentReqs = 1
counter := 0
receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) {
@@ -563,7 +562,6 @@ func TestSQSERReadLoop2(t *testing.T) {
queueURL: utils.StringPointer("testQueueURL"),
session: nil,
}
rdr.cap <- struct{}{}
rdr.Config().ConcurrentReqs = 1
counter := 0
receiveMessage := func(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) {