diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 1594dc6bf..160124d92 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -77,7 +77,7 @@ func TestCreateTables(t *testing.T) { mysql = d.(*engine.MySQLStorage) } for _, scriptName := range []string{engine.CREATE_CDRS_TABLES_SQL, engine.CREATE_COSTDETAILS_TABLES_SQL, engine.CREATE_MEDIATOR_TABLES_SQL, engine.CREATE_TARIFFPLAN_TABLES_SQL} { - if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *stprDbType, scriptName)); err != nil { + if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil { t.Fatal("Error on mysql creation: ", err.Error()) return // No point in going further } diff --git a/cdrc/.cdrc.go.swp b/cdrc/.cdrc.go.swp index d65cf57f8..e5df2cc19 100644 Binary files a/cdrc/.cdrc.go.swp and b/cdrc/.cdrc.go.swp differ diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 131146921..567655502 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/utils" "github.com/howeyc/fsnotify" "io/ioutil" + "io" "net/http" "net/url" "os" @@ -107,6 +108,7 @@ func (self *Cdrc) parseFieldIndexesFromConfig() error { // Takes the record out of csv and turns it into http form which can be posted func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { + engine.Logger.Info(fmt.Sprintf("Processing record %v", record)) v := url.Values{} v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId) for fldName, idx := range self.fieldIndxes { @@ -120,7 +122,7 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { // One run over the CDR folder func (self *Cdrc) processCdrDir() error { - engine.Logger.Info(fmt.Sprintf("Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir)) + engine.Logger.Info(fmt.Sprintf(" Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir)) filesInDir, _ := ioutil.ReadDir(self.cgrCfg.CdrcCdrInDir) for _, file := range filesInDir { if err := self.processFile(path.Join(self.cgrCfg.CdrcCdrInDir, file.Name())); err != nil { @@ -141,11 +143,11 @@ func (self *Cdrc) trackCDRFiles() (err error) { if err != nil { return } - engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir)) + engine.Logger.Info(fmt.Sprintf(" Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir)) for { select { case ev := <-watcher.Event: - if ev.IsCreate() && path.Ext(ev.Name) != ".csv" { + if ev.IsCreate() { //&& path.Ext(ev.Name) != ".csv" if err = self.processFile(ev.Name); err != nil { return err } @@ -160,7 +162,7 @@ func (self *Cdrc) trackCDRFiles() (err error) { // Processe file at filePath and posts the valid cdr rows out of it func (self *Cdrc) processFile(filePath string) error { _, fn := path.Split(filePath) - engine.Logger.Info(fmt.Sprintf("Parsing: %s", filePath)) + engine.Logger.Info(fmt.Sprintf(" Parsing: %s", filePath)) file, err := os.Open(filePath) defer file.Close() if err != nil { @@ -168,17 +170,30 @@ func (self *Cdrc) processFile(filePath string) error { return err } csvReader := csv.NewReader(bufio.NewReader(file)) - for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() { + for { + record, err := csvReader.Read() + if err != nil && err == io.EOF { + break // End of file + } else if err != nil { + engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) + continue // Other csv related errors, ignore + } cdrAsForm, err := self.cdrAsHttpForm(record) if err != nil { - engine.Logger.Err(err.Error()) + engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue } if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.CdrcCdrs), cdrAsForm); err != nil { - engine.Logger.Err(fmt.Sprintf("Failed posting CDR, error: %s", err.Error())) + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, error: %s", err.Error())) continue } } // Finished with file, move it to processed folder - return os.Rename(filePath, path.Join(self.cgrCfg.CdrcCdrOutDir, fn)) + newPath := path.Join(self.cgrCfg.CdrcCdrOutDir, fn) + if err:= os.Rename(filePath, newPath); err != nil { + engine.Logger.Err(err.Error()) + return err + } + engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s", fn, newPath)) + return nil } diff --git a/cdrs/cgrcdr_test.go b/cdrs/cgrcdr_test.go index b16a42de7..37f378671 100644 --- a/cdrs/cgrcdr_test.go +++ b/cdrs/cgrcdr_test.go @@ -32,7 +32,7 @@ curl --data "accid=asbfdsaf&cdrhost=192.168.1.1&reqtype=rated&direction=*out&ten func TestCgrCdrFields(t *testing.T) { cfg, _ = config.NewDefaultCGRConfig() cgrCdr := CgrCdr{"accid": "dsafdsaf", "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", "tenant": "cgrates.org", "tor": "call", - "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "1383813746", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"} + "account": "1001", "subject": "1001", "destination": "1002", "answer_time": "2013-11-07 08:42:26", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"} if cgrCdr.GetCgrId() != utils.FSCgrId("dsafdsaf") { t.Error("Error parsing cdr: ", cgrCdr) } diff --git a/local_test.sh b/local_test.sh index a93d87f53..77c35e2a3 100755 --- a/local_test.sh +++ b/local_test.sh @@ -6,8 +6,11 @@ go test github.com/cgrates/cgrates/apier/v1 -local ap=$? go test github.com/cgrates/cgrates/engine -local en=$? +go test github.com/cgrates/cgrates/cdrc -local +cdrc=$? -exit $gen && $ap && $en + +exit $gen && $ap && $en && cdrc diff --git a/utils/ratedcdr.go b/utils/ratedcdr.go index 19fee54fe..30146e323 100644 --- a/utils/ratedcdr.go +++ b/utils/ratedcdr.go @@ -41,3 +41,61 @@ type RatedCDR struct { MediationRunId string Cost float64 } + +// Methods maintaining CDR interface + +func (ratedCdr *RatedCDR) GetCgrId() string { + return ratedCdr.CgrId +} + +func (ratedCdr *RatedCDR) GetAccId() string { + return ratedCdr.AccId +} + +func (ratedCdr *RatedCDR) GetCdrHost() string { + return ratedCdr.CdrHost +} + +func (ratedCdr *RatedCDR) GetCdrSource() string { + return ratedCdr.CdrSource +} + +func (ratedCdr *RatedCDR) GetDirection() string { + return ratedCdr.Direction +} + +func (ratedCdr *RatedCDR) GetSubject() string { + return ratedCdr.Subject +} + +func (ratedCdr *RatedCDR) GetAccount() string { + return ratedCdr.Account +} + +func (ratedCdr *RatedCDR) GetDestination() string { + return ratedCdr.Destination +} + +func (ratedCdr *RatedCDR) GetTOR() string { + return ratedCdr.TOR +} + +func (ratedCdr *RatedCDR) GetTenant() string { + return ratedCdr.Tenant +} + +func (ratedCdr *RatedCDR) GetReqType() string { + return ratedCdr.ReqType +} + +func (ratedCdr *RatedCDR) GetAnswerTime() (time.Time, error) { + return ratedCdr.AnswerTime, nil +} + +func (ratedCdr *RatedCDR) GetDuration() int64 { + return ratedCdr.Duration +} + +func (ratedCdr *RatedCDR) GetExtraFields() map[string]string { + return ratedCdr.ExtraFields +}