Added lazyQuotes option for csv reader

This commit is contained in:
Trial97
2021-04-26 11:26:19 +03:00
committed by Dan Christian Bogos
parent b451279779
commit 13cd8e8a99
7 changed files with 130 additions and 23 deletions

View File

@@ -368,7 +368,53 @@ const CGRATES_CFG_JSON = `
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "/var/spool/cgrates/ers/in", // read data from this path
"processed_path": "/var/spool/cgrates/ers/out", // move processed data here
"opts": {},
"opts": {
// "queueID": "cgrates_cdrs", // the queue id for AMQP, AMQPv1, SQS and S3 readers from were the events are read
// "queueIDProcessed": "", // the queue id for AMQP, AMQPv1, SQS and S3 readers were the events are sent after they are processed
// FileCSV, FlatStore and PartialCSV
// "lazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field
// AMQP
// "consumerTag": "cgrates", // the ID of the consumer
// "exchange": "",
// "exchangeType": "",
// "routingKey": "",
// "exchangeProcessed": "",
// "exchangeTypeProcessed": "",
// "routingKeyProcessed": "",
// Kafka
// "topic": "cgrates", // the topic from were the events are read
// "groupID": "cgrates", // the group that reads the events
// "maxWait": "1ms", // the maximum amount of time to wait for new data to come
// "topicProcessed": "", // the topic were the events are sent after they are processed
// SQL
// "dbName": "cgrates", // the name of the database from were the events are read
// "tableName": "cdrs", // the name of the table from were the events are read
// "sslmode": "disable", // the ssl mode for postgres db
// "dbNameProcessed": "", // the name of the database were the events are sent after they are processed
// "tableNameProcessed": "", // the name of the table were the events are sent after they are processed
// "sslmodeProcessed": "", // the ssl mode for postgres db
// SQS and S3
// "awsRegion": "",
// "awsKey": "",
// "awsSecret": "",
// "awsToken": "",
// "awsRegionProcessed": "",
// "awsKeyProcessed": "",
// "awsSecretProcessed": "",
// "awsTokenProcessed": "",
// "folderPathProcessed": "", // only for S3 event posting
},
"xml_root_path": "", // path towards one event in case of XML CDRs
"tenant": "", // tenant used by import
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>

View File

@@ -347,7 +347,53 @@
// "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
// "source_path": "/var/spool/cgrates/ers/in", // read data from this path
// "processed_path": "/var/spool/cgrates/ers/out", // move processed data here
// "opts": {},
// "opts": {
// // "queueID": "cgrates_cdrs", // the queue id for AMQP, AMQPv1, SQS and S3 readers from were the events are read
// // "queueIDProcessed": "", // the queue id for AMQP, AMQPv1, SQS and S3 readers were the events are sent after they are processed
// // FileCSV, FlatStore and PartialCSV
// // "lazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field
// // AMQP
// // "consumerTag": "cgrates", // the ID of the consumer
// // "exchange": "",
// // "exchangeType": "",
// // "routingKey": "",
// // "exchangeProcessed": "",
// // "exchangeTypeProcessed": "",
// // "routingKeyProcessed": "",
// // Kafka
// // "topic": "cgrates", // the topic from were the events are read
// // "groupID": "cgrates", // the group that reads the events
// // "maxWait": "1ms", // the maximum amount of time to wait for new data to come
// // "topicProcessed": "", // the topic were the events are sent after they are processed
// // SQL
// // "dbName": "cgrates", // the name of the database from were the events are read
// // "tableName": "cdrs", // the name of the table from were the events are read
// // "sslmode": "disable", // the ssl mode for postgres db
// // "dbNameProcessed": "", // the name of the database were the events are sent after they are processed
// // "tableNameProcessed": "", // the name of the table were the events are sent after they are processed
// // "sslmodeProcessed": "", // the ssl mode for postgres db
// // SQS and S3
// // "awsRegion": "",
// // "awsKey": "",
// // "awsSecret": "",
// // "awsToken": "",
// // "awsRegionProcessed": "",
// // "awsKeyProcessed": "",
// // "awsSecretProcessed": "",
// // "awsTokenProcessed": "",
// // "folderPathProcessed": "", // only for S3 event posting
// },
// "xml_root_path": "", // path towards one event in case of XML CDRs
// "tenant": "", // tenant used by import
// "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"bufio"
"encoding/csv"
"fmt"
"io"
@@ -132,12 +131,12 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
return
}
defer file.Close()
csvReader := csv.NewReader(bufio.NewReader(file))
csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength
csvReader.Comment = utils.CommentChar
csvReader.Comma = utils.CSVSep
if len(rdr.Config().FieldSep) > 0 {
csvReader.Comma = rune(rdr.Config().FieldSep[0])
var csvReader *csv.Reader
if csvReader, err = newCSVReader(file, rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength, rdr.Config().FieldSep, rdr.Config().Opts); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> failed creating CSV reader for <%s>, due to option parsing error: <%s>",
utils.ERs, rdr.Config().ID, err.Error()))
return
}
var indxAls map[string]int
rowNr := 0 // This counts the rows in the file, not really number of CDRs

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"bufio"
"encoding/csv"
"errors"
"fmt"
@@ -139,13 +138,13 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) {
return
}
defer file.Close()
csvReader := csv.NewReader(bufio.NewReader(file))
csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength
csvReader.Comma = ','
if len(rdr.Config().FieldSep) > 0 {
csvReader.Comma = rune(rdr.Config().FieldSep[0])
var csvReader *csv.Reader
if csvReader, err = newCSVReader(file, rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength, rdr.Config().FieldSep, rdr.Config().Opts); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> failed creating CSV reader for <%s>, due to option parsing error: <%s>",
utils.ERs, rdr.Config().ID, err.Error()))
return
}
csvReader.Comment = '#'
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0
timeStart := time.Now()

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"encoding/csv"
"io"
"strings"
"github.com/cgrates/cgrates/utils"
@@ -33,3 +35,17 @@ func getProcessOptions(opts map[string]interface{}) (proc map[string]interface{}
}
return
}
func newCSVReader(file io.Reader, rowLenght int, fieldSep string, opts map[string]interface{}) (csvReader *csv.Reader, err error) {
csvReader = csv.NewReader(file)
csvReader.FieldsPerRecord = rowLenght
csvReader.Comment = utils.CommentChar
csvReader.Comma = utils.CSVSep
if len(fieldSep) > 0 {
csvReader.Comma = rune(fieldSep[0])
}
if val, has := opts[utils.LazyQuotes]; has {
csvReader.LazyQuotes, err = utils.IfaceAsBool(val)
}
return
}

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"bufio"
"encoding/csv"
"fmt"
"io"
@@ -144,13 +143,13 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) {
return
}
defer file.Close()
csvReader := csv.NewReader(bufio.NewReader(file))
csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength
csvReader.Comma = ','
if len(rdr.Config().FieldSep) > 0 {
csvReader.Comma = rune(rdr.Config().FieldSep[0])
var csvReader *csv.Reader
if csvReader, err = newCSVReader(file, rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength, rdr.Config().FieldSep, rdr.Config().Opts); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> failed creating CSV reader for <%s>, due to option parsing error: <%s>",
utils.ERs, rdr.Config().ID, err.Error()))
return
}
csvReader.Comment = '#'
var indxAls map[string]int
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0

View File

@@ -2556,6 +2556,8 @@ const (
AMQPDefaultConsumerTag = "cgrates"
AMQPConsumerTag = "consumerTag"
LazyQuotes = "lazyQuotes"
KafkaDefaultTopic = "cgrates"
KafkaDefaultGroupID = "cgrates"
KafkaDefaultMaxWait = time.Millisecond