Correctly populate ConcurrentRequest from config in EventReader fixes #1932

This commit is contained in:
TeoV
2020-02-18 14:59:29 +02:00
committed by Dan Christian Bogos
parent bf9fe43237
commit 3e3fc6c457
6 changed files with 52 additions and 14 deletions

View File

@@ -43,14 +43,20 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int,
if strings.HasSuffix(srcPath, utils.Slash) {
srcPath = srcPath[:len(srcPath)-1]
}
return &CSVFileER{
csvEr := &CSVFileER{
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrDir: srcPath,
rdrEvents: rdrEvents,
rdrError: rdrErr,
rdrExit: rdrExit}, nil
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
csvEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
}
return csvEr, nil
}
// CSVFileER implements EventReader interface for .csv files

View File

@@ -42,14 +42,20 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int,
if strings.HasSuffix(srcPath, utils.Slash) {
srcPath = srcPath[:len(srcPath)-1]
}
return &FWVFileER{
fwvER := &FWVFileER{
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrDir: srcPath,
rdrEvents: rdrEvents,
rdrError: rdrErr,
rdrExit: rdrExit}, nil
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
fwvER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
}
return fwvER, nil
}
// XMLFileER implements EventReader interface for .xml files

View File

@@ -43,14 +43,20 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int,
if strings.HasSuffix(srcPath, utils.Slash) {
srcPath = srcPath[:len(srcPath)-1]
}
return &XMLFileER{
xmlER := &XMLFileER{
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrDir: srcPath,
rdrEvents: rdrEvents,
rdrError: rdrErr,
rdrExit: rdrExit}, nil
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
xmlER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
}
return xmlER, nil
}
// XMLFileER implements EventReader interface for .xml files

View File

@@ -55,10 +55,14 @@ func NewFlatstoreER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents: rdrEvents,
rdrError: rdrErr,
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs),
}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
flatER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
}
flatER.cache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().Readers[cfgIdx].PartialRecordCache, false, flatER.dumpToFile)
return flatER, err
}
// FlatstoreER implements EventReader interface for Flatstore CDR

View File

@@ -55,7 +55,8 @@ func NewPartialCSVFileER(cfg *config.CGRConfig, cfgIdx int,
rdrDir: srcPath,
rdrEvents: rdrEvents,
rdrError: rdrErr,
rdrExit: rdrExit}
rdrExit: rdrExit,
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
var function func(itmID string, value interface{})
if cfg.ERsCfg().Readers[cfgIdx].PartialCacheExpiryAction == utils.MetaDumpToFile {
@@ -63,6 +64,10 @@ func NewPartialCSVFileER(cfg *config.CGRConfig, cfgIdx int,
} else {
function = pCSVFileER.postCDR
}
var processFile struct{}
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
pCSVFileER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
}
pCSVFileER.cache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().Readers[cfgIdx].PartialRecordCache, false, function)
return pCSVFileER, nil
}

View File

@@ -51,14 +51,25 @@ func TestNewCsvReader(t *testing.T) {
if len(cfg.ERsCfg().Readers) != 2 {
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
}
expected, err := NewCSVFileER(cfg, 1, nil, nil, fltr, nil)
if err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
}
exp := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 1,
fltrS: fltr,
rdrDir: cfg.ERsCfg().Readers[1].SourcePath,
rdrEvents: nil,
rdrError: nil,
rdrExit: nil,
conReqs: nil}
var expected EventReader = exp
if rcv, err := NewEventReader(cfg, 1, nil, nil, fltr, nil); err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
} else if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
} else {
// because we use function make to init the channel when we create the EventReader reflect.DeepEqual
// says it doesn't match
rcv.(*CSVFileER).conReqs = nil
if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
}
}
}