mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
RatedCDR maintaining CDR interface, Cdrc csv reader does not longer stops in case of row error, further test related fixups
This commit is contained in:
@@ -77,7 +77,7 @@ func TestCreateTables(t *testing.T) {
|
||||
mysql = d.(*engine.MySQLStorage)
|
||||
}
|
||||
for _, scriptName := range []string{engine.CREATE_CDRS_TABLES_SQL, engine.CREATE_COSTDETAILS_TABLES_SQL, engine.CREATE_MEDIATOR_TABLES_SQL, engine.CREATE_TARIFFPLAN_TABLES_SQL} {
|
||||
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *stprDbType, scriptName)); err != nil {
|
||||
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, scriptName)); err != nil {
|
||||
t.Fatal("Error on mysql creation: ", err.Error())
|
||||
return // No point in going further
|
||||
}
|
||||
|
||||
Binary file not shown.
31
cdrc/cdrc.go
31
cdrc/cdrc.go
@@ -28,6 +28,7 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/howeyc/fsnotify"
|
||||
"io/ioutil"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -107,6 +108,7 @@ func (self *Cdrc) parseFieldIndexesFromConfig() error {
|
||||
|
||||
// Takes the record out of csv and turns it into http form which can be posted
|
||||
func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
|
||||
engine.Logger.Info(fmt.Sprintf("Processing record %v", record))
|
||||
v := url.Values{}
|
||||
v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId)
|
||||
for fldName, idx := range self.fieldIndxes {
|
||||
@@ -120,7 +122,7 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
|
||||
|
||||
// One run over the CDR folder
|
||||
func (self *Cdrc) processCdrDir() error {
|
||||
engine.Logger.Info(fmt.Sprintf("Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir))
|
||||
engine.Logger.Info(fmt.Sprintf("<Cdrc> Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir))
|
||||
filesInDir, _ := ioutil.ReadDir(self.cgrCfg.CdrcCdrInDir)
|
||||
for _, file := range filesInDir {
|
||||
if err := self.processFile(path.Join(self.cgrCfg.CdrcCdrInDir, file.Name())); err != nil {
|
||||
@@ -141,11 +143,11 @@ func (self *Cdrc) trackCDRFiles() (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir))
|
||||
engine.Logger.Info(fmt.Sprintf("<Cdrc> Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir))
|
||||
for {
|
||||
select {
|
||||
case ev := <-watcher.Event:
|
||||
if ev.IsCreate() && path.Ext(ev.Name) != ".csv" {
|
||||
if ev.IsCreate() { //&& path.Ext(ev.Name) != ".csv"
|
||||
if err = self.processFile(ev.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -160,7 +162,7 @@ func (self *Cdrc) trackCDRFiles() (err error) {
|
||||
// Processe file at filePath and posts the valid cdr rows out of it
|
||||
func (self *Cdrc) processFile(filePath string) error {
|
||||
_, fn := path.Split(filePath)
|
||||
engine.Logger.Info(fmt.Sprintf("Parsing: %s", filePath))
|
||||
engine.Logger.Info(fmt.Sprintf("<Cdrc> Parsing: %s", filePath))
|
||||
file, err := os.Open(filePath)
|
||||
defer file.Close()
|
||||
if err != nil {
|
||||
@@ -168,17 +170,30 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
return err
|
||||
}
|
||||
csvReader := csv.NewReader(bufio.NewReader(file))
|
||||
for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
|
||||
for {
|
||||
record, err := csvReader.Read()
|
||||
if err != nil && err == io.EOF {
|
||||
break // End of file
|
||||
} else if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Error in csv file: %s", err.Error()))
|
||||
continue // Other csv related errors, ignore
|
||||
}
|
||||
cdrAsForm, err := self.cdrAsHttpForm(record)
|
||||
if err != nil {
|
||||
engine.Logger.Err(err.Error())
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Error in csv file: %s", err.Error()))
|
||||
continue
|
||||
}
|
||||
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.CdrcCdrs), cdrAsForm); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Failed posting CDR, error: %s", err.Error()))
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, error: %s", err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
// Finished with file, move it to processed folder
|
||||
return os.Rename(filePath, path.Join(self.cgrCfg.CdrcCdrOutDir, fn))
|
||||
newPath := path.Join(self.cgrCfg.CdrcCdrOutDir, fn)
|
||||
if err:= os.Rename(filePath, newPath); err != nil {
|
||||
engine.Logger.Err(err.Error())
|
||||
return err
|
||||
}
|
||||
engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s", fn, newPath))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ curl --data "accid=asbfdsaf&cdrhost=192.168.1.1&reqtype=rated&direction=*out&ten
|
||||
func TestCgrCdrFields(t *testing.T) {
|
||||
cfg, _ = config.NewDefaultCGRConfig()
|
||||
cgrCdr := CgrCdr{"accid": "dsafdsaf", "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", "tenant": "cgrates.org", "tor": "call",
|
||||
"account": "1001", "subject": "1001", "destination": "1002", "answer_time": "1383813746", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"}
|
||||
"account": "1001", "subject": "1001", "destination": "1002", "answer_time": "2013-11-07 08:42:26", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"}
|
||||
if cgrCdr.GetCgrId() != utils.FSCgrId("dsafdsaf") {
|
||||
t.Error("Error parsing cdr: ", cgrCdr)
|
||||
}
|
||||
|
||||
@@ -6,8 +6,11 @@ go test github.com/cgrates/cgrates/apier/v1 -local
|
||||
ap=$?
|
||||
go test github.com/cgrates/cgrates/engine -local
|
||||
en=$?
|
||||
go test github.com/cgrates/cgrates/cdrc -local
|
||||
cdrc=$?
|
||||
|
||||
|
||||
|
||||
exit $gen && $ap && $en
|
||||
|
||||
exit $gen && $ap && $en && cdrc
|
||||
|
||||
|
||||
@@ -41,3 +41,61 @@ type RatedCDR struct {
|
||||
MediationRunId string
|
||||
Cost float64
|
||||
}
|
||||
|
||||
// Methods maintaining CDR interface
|
||||
|
||||
func (ratedCdr *RatedCDR) GetCgrId() string {
|
||||
return ratedCdr.CgrId
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetAccId() string {
|
||||
return ratedCdr.AccId
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetCdrHost() string {
|
||||
return ratedCdr.CdrHost
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetCdrSource() string {
|
||||
return ratedCdr.CdrSource
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetDirection() string {
|
||||
return ratedCdr.Direction
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetSubject() string {
|
||||
return ratedCdr.Subject
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetAccount() string {
|
||||
return ratedCdr.Account
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetDestination() string {
|
||||
return ratedCdr.Destination
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetTOR() string {
|
||||
return ratedCdr.TOR
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetTenant() string {
|
||||
return ratedCdr.Tenant
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetReqType() string {
|
||||
return ratedCdr.ReqType
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetAnswerTime() (time.Time, error) {
|
||||
return ratedCdr.AnswerTime, nil
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetDuration() int64 {
|
||||
return ratedCdr.Duration
|
||||
}
|
||||
|
||||
func (ratedCdr *RatedCDR) GetExtraFields() map[string]string {
|
||||
return ratedCdr.ExtraFields
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user