mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
Concurent import for CDR files
This commit is contained in:
16
cdrc/cdrc.go
16
cdrc/cdrc.go
@@ -150,9 +150,11 @@ func (self *Cdrc) processCdrDir() error {
|
||||
filesInDir, _ := ioutil.ReadDir(self.cdrInDir)
|
||||
for _, file := range filesInDir {
|
||||
if self.cdrType != FS_CSV || path.Ext(file.Name()) != ".csv" {
|
||||
if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil {
|
||||
return err
|
||||
}
|
||||
go func() { //Enable async processing here
|
||||
if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -174,9 +176,11 @@ func (self *Cdrc) trackCDRFiles() (err error) {
|
||||
select {
|
||||
case ev := <-watcher.Event:
|
||||
if ev.IsCreate() && (self.cdrType != FS_CSV || path.Ext(ev.Name) != ".csv") {
|
||||
if err = self.processFile(ev.Name); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error()))
|
||||
}
|
||||
go func() { //Enable async processing here
|
||||
if err = self.processFile(ev.Name); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
case err := <-watcher.Error:
|
||||
engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error()))
|
||||
|
||||
Reference in New Issue
Block a user