From a722d3242b9a274ed78752adafe3ffb1438ff9e0 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 15 Jul 2014 19:46:53 +0200 Subject: [PATCH] Concurent import for CDR files --- cdrc/cdrc.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index e3a9ead15..5399f68e5 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -150,9 +150,11 @@ func (self *Cdrc) processCdrDir() error { filesInDir, _ := ioutil.ReadDir(self.cdrInDir) for _, file := range filesInDir { if self.cdrType != FS_CSV || path.Ext(file.Name()) != ".csv" { - if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil { - return err - } + go func() { //Enable async processing here + if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil { + engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error())) + } + }() } } return nil @@ -174,9 +176,11 @@ func (self *Cdrc) trackCDRFiles() (err error) { select { case ev := <-watcher.Event: if ev.IsCreate() && (self.cdrType != FS_CSV || path.Ext(ev.Name) != ".csv") { - if err = self.processFile(ev.Name); err != nil { - engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error())) - } + go func() { //Enable async processing here + if err = self.processFile(ev.Name); err != nil { + engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error())) + } + }() } case err := <-watcher.Error: engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error()))