diff --git a/config/config_defaults.go b/config/config_defaults.go index b3e6f64ee..3a616033e 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -319,7 +319,53 @@ const CGRATES_CFG_JSON = ` "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited "source_path": "/var/spool/cgrates/ers/in", // read data from this path "processed_path": "/var/spool/cgrates/ers/out", // move processed data here - "opts": {}, + "opts": { + // "queueID": "cgrates_cdrs", // the queue id for AMQP, AMQPv1, SQS and S3 readers from were the events are read + // "queueIDProcessed": "", // the queue id for AMQP, AMQPv1, SQS and S3 readers were the events are sent after they are processed + + // FileCSV, FlatStore and PartialCSV + // "lazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field + + // AMQP + // "consumerTag": "cgrates", // the ID of the consumer + // "exchange": "", + // "exchangeType": "", + // "routingKey": "", + + // "exchangeProcessed": "", + // "exchangeTypeProcessed": "", + // "routingKeyProcessed": "", + + // Kafka + // "topic": "cgrates", // the topic from were the events are read + // "groupID": "cgrates", // the group that reads the events + // "maxWait": "1ms", // the maximum amount of time to wait for new data to come + + // "topicProcessed": "", // the topic were the events are sent after they are processed + + + // SQL + // "dbName": "cgrates", // the name of the database from were the events are read + // "tableName": "cdrs", // the name of the table from were the events are read + // "sslmode": "disable", // the ssl mode for postgres db + + // "dbNameProcessed": "", // the name of the database were the events are sent after they are processed + // "tableNameProcessed": "", // the name of the table were the events are sent after they are processed + // "sslmodeProcessed": "", // the ssl mode for postgres db + + // SQS and S3 + // "awsRegion": "", + // "awsKey": "", + // "awsSecret": "", + // "awsToken": "", + + // "awsRegionProcessed": "", + // "awsKeyProcessed": "", + // "awsSecretProcessed": "", + // "awsTokenProcessed": "", + // "folderPathProcessed": "", // only for S3 event posting + + }, "xml_root_path": "", // path towards one event in case of XML CDRs "tenant": "", // tenant used by import "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 7134bf05b..57279944d 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -332,7 +332,53 @@ // "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited // "source_path": "/var/spool/cgrates/ers/in", // read data from this path // "processed_path": "/var/spool/cgrates/ers/out", // move processed data here -// "opts": {}, +// "opts": { +// // "queueID": "cgrates_cdrs", // the queue id for AMQP, AMQPv1, SQS and S3 readers from were the events are read +// // "queueIDProcessed": "", // the queue id for AMQP, AMQPv1, SQS and S3 readers were the events are sent after they are processed + +// // FileCSV, FlatStore and PartialCSV +// // "lazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field + +// // AMQP +// // "consumerTag": "cgrates", // the ID of the consumer +// // "exchange": "", +// // "exchangeType": "", +// // "routingKey": "", + +// // "exchangeProcessed": "", +// // "exchangeTypeProcessed": "", +// // "routingKeyProcessed": "", + +// // Kafka +// // "topic": "cgrates", // the topic from were the events are read +// // "groupID": "cgrates", // the group that reads the events +// // "maxWait": "1ms", // the maximum amount of time to wait for new data to come + +// // "topicProcessed": "", // the topic were the events are sent after they are processed + + +// // SQL +// // "dbName": "cgrates", // the name of the database from were the events are read +// // "tableName": "cdrs", // the name of the table from were the events are read +// // "sslmode": "disable", // the ssl mode for postgres db + +// // "dbNameProcessed": "", // the name of the database were the events are sent after they are processed +// // "tableNameProcessed": "", // the name of the table were the events are sent after they are processed +// // "sslmodeProcessed": "", // the ssl mode for postgres db + +// // SQS and S3 +// // "awsRegion": "", +// // "awsKey": "", +// // "awsSecret": "", +// // "awsToken": "", + +// // "awsRegionProcessed": "", +// // "awsKeyProcessed": "", +// // "awsSecretProcessed": "", +// // "awsTokenProcessed": "", +// // "folderPathProcessed": "", // only for S3 event posting + +// }, // "xml_root_path": "", // path towards one event in case of XML CDRs // "tenant": "", // tenant used by import // "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> diff --git a/ers/filecsv.go b/ers/filecsv.go index d4484c311..a3e3365fd 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -19,7 +19,6 @@ along with this program. If not, see package ers import ( - "bufio" "encoding/csv" "fmt" "io" @@ -131,12 +130,12 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { return } defer file.Close() - csvReader := csv.NewReader(bufio.NewReader(file)) - csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength - csvReader.Comment = utils.CommentChar - csvReader.Comma = utils.CSVSep - if len(rdr.Config().FieldSep) > 0 { - csvReader.Comma = rune(rdr.Config().FieldSep[0]) + var csvReader *csv.Reader + if csvReader, err = newCSVReader(file, rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength, rdr.Config().FieldSep, rdr.Config().Opts); err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> failed creating CSV reader for <%s>, due to option parsing error: <%s>", + utils.ERs, rdr.Config().ID, err.Error())) + return } var indxAls map[string]int rowNr := 0 // This counts the rows in the file, not really number of CDRs diff --git a/ers/flatstore.go b/ers/flatstore.go index 9b2c8ee0e..5ab0c1abe 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -19,7 +19,6 @@ along with this program. If not, see package ers import ( - "bufio" "encoding/csv" "errors" "fmt" @@ -138,13 +137,13 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { return } defer file.Close() - csvReader := csv.NewReader(bufio.NewReader(file)) - csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength - csvReader.Comma = ',' - if len(rdr.Config().FieldSep) > 0 { - csvReader.Comma = rune(rdr.Config().FieldSep[0]) + var csvReader *csv.Reader + if csvReader, err = newCSVReader(file, rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength, rdr.Config().FieldSep, rdr.Config().Opts); err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> failed creating CSV reader for <%s>, due to option parsing error: <%s>", + utils.ERs, rdr.Config().ID, err.Error())) + return } - csvReader.Comment = '#' rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() diff --git a/ers/libers.go b/ers/libers.go index cfd4d6818..a321f612b 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -19,6 +19,8 @@ along with this program. If not, see package ers import ( + "encoding/csv" + "io" "strings" "github.com/cgrates/cgrates/utils" @@ -33,3 +35,17 @@ func getProcessOptions(opts map[string]interface{}) (proc map[string]interface{} } return } + +func newCSVReader(file io.Reader, rowLenght int, fieldSep string, opts map[string]interface{}) (csvReader *csv.Reader, err error) { + csvReader = csv.NewReader(file) + csvReader.FieldsPerRecord = rowLenght + csvReader.Comment = utils.CommentChar + csvReader.Comma = utils.CSVSep + if len(fieldSep) > 0 { + csvReader.Comma = rune(fieldSep[0]) + } + if val, has := opts[utils.LazyQuotes]; has { + csvReader.LazyQuotes, err = utils.IfaceAsBool(val) + } + return +} diff --git a/ers/partial_csv.go b/ers/partial_csv.go index fedfab64a..9fb9095da 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -19,7 +19,6 @@ along with this program. If not, see package ers import ( - "bufio" "encoding/csv" "fmt" "io" @@ -145,13 +144,13 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { return } defer file.Close() - csvReader := csv.NewReader(bufio.NewReader(file)) - csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength - csvReader.Comma = ',' - if len(rdr.Config().FieldSep) > 0 { - csvReader.Comma = rune(rdr.Config().FieldSep[0]) + var csvReader *csv.Reader + if csvReader, err = newCSVReader(file, rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength, rdr.Config().FieldSep, rdr.Config().Opts); err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> failed creating CSV reader for <%s>, due to option parsing error: <%s>", + utils.ERs, rdr.Config().ID, err.Error())) + return } - csvReader.Comment = '#' var indxAls map[string]int rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 diff --git a/utils/consts.go b/utils/consts.go index 54eb88267..6effaf60c 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2339,6 +2339,8 @@ const ( AMQPDefaultConsumerTag = "cgrates" AMQPConsumerTag = "consumerTag" + LazyQuotes = "lazyQuotes" + KafkaDefaultTopic = "cgrates" KafkaDefaultGroupID = "cgrates" KafkaDefaultMaxWait = time.Millisecond