From 16d25cc6cbae5c2342ec560326727a22dad0593f Mon Sep 17 00:00:00 2001 From: adragusin Date: Tue, 31 Mar 2020 13:51:05 +0300 Subject: [PATCH] Added a new configuration option in ers, row_length --- config/config_defaults.go | 1 + config/erscfg.go | 1 + config/erscfg_test.go | 3 +++ data/conf/cgrates/cgrates.json | 1 + data/conf/samples/ers_example/cgrates.json | 1 + data/conf/samples/ers_internal/cgrates.json | 1 + data/conf/samples/ers_mongo/cgrates.json | 1 + data/conf/samples/ers_mysql/cgrates.json | 1 + data/conf/samples/ers_postgres/cgrates.json | 1 + data/conf/samples/ers_reload/first_reload/cgrates.json | 1 + data/conf/samples/ers_reload/internal/cgrates.json | 1 + data/conf/samples/ers_reload/second_reload/cgrates.json | 1 + docs/ers.rst | 1 + ers/filecsv.go | 1 + ers/flatstore.go | 1 + ers/kafka_it_test.go | 1 + ers/partial_csv.go | 1 + ers/sql_it_test.go | 1 + 18 files changed, 20 insertions(+) diff --git a/config/config_defaults.go b/config/config_defaults.go index 7c97d53cb..41ab90da7 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -300,6 +300,7 @@ const CGRATES_CFG_JSON = ` "ers": { // EventReaderService "enabled": false, // starts the EventReader service: "sessions_conns":["*internal"], // RPC Connections IDs + "row_length" : 0, // Number of fields from csv file "readers": [ { "id": "*default", // identifier of the EventReader profile diff --git a/config/erscfg.go b/config/erscfg.go index 47f4e1821..c9e3e0313 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -27,6 +27,7 @@ import ( type ERsCfg struct { Enabled bool SessionSConns []string + RowLength int Readers []*EventReaderCfg } diff --git a/config/erscfg_test.go b/config/erscfg_test.go index 002ed4e88..2b0fe282b 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -183,6 +183,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) { "ers": { "enabled": true, "sessions_conns":["conn1","conn3"], + "row_length" : 0, "readers": [ { "id": "file_reader1", @@ -208,6 +209,7 @@ func TestEventReaderSanitization(t *testing.T) { cfgJSONStr := `{ "ers": { "enabled": true, + "row_length" : 0, "readers": [ { "id": "file_reader1", @@ -295,6 +297,7 @@ func TestEventReaderSameID(t *testing.T) { "ers": { "enabled": true, "sessions_conns":["conn1"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 95fd57b9a..76aaba9d6 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -279,6 +279,7 @@ // "ers": { // EventReaderService // "enabled": false, // starts the EventReader service: // "sessions_conns":["*internal"], // RPC Connections IDs +// "row_length" : 0, // "readers": [ // { // "id": "*default", // identifier of the EventReader profile diff --git a/data/conf/samples/ers_example/cgrates.json b/data/conf/samples/ers_example/cgrates.json index 51a504886..4f9308b71 100644 --- a/data/conf/samples/ers_example/cgrates.json +++ b/data/conf/samples/ers_example/cgrates.json @@ -67,6 +67,7 @@ "ers": { "enabled": true, "sessions_conns": ["*localhost"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index 13011d79b..5e526c296 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -77,6 +77,7 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_mongo/cgrates.json b/data/conf/samples/ers_mongo/cgrates.json index 46899652c..b3657e92e 100644 --- a/data/conf/samples/ers_mongo/cgrates.json +++ b/data/conf/samples/ers_mongo/cgrates.json @@ -78,6 +78,7 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index b386722ab..ea3764200 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -75,6 +75,7 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_postgres/cgrates.json b/data/conf/samples/ers_postgres/cgrates.json index fa33f3548..6413fd0dc 100644 --- a/data/conf/samples/ers_postgres/cgrates.json +++ b/data/conf/samples/ers_postgres/cgrates.json @@ -72,6 +72,7 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_reload/first_reload/cgrates.json b/data/conf/samples/ers_reload/first_reload/cgrates.json index a18b62153..ee40df698 100644 --- a/data/conf/samples/ers_reload/first_reload/cgrates.json +++ b/data/conf/samples/ers_reload/first_reload/cgrates.json @@ -67,6 +67,7 @@ "ers": { "enabled": true, "sessions_conns": ["*localhost"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_reload/internal/cgrates.json b/data/conf/samples/ers_reload/internal/cgrates.json index d769b65c6..c78837d09 100644 --- a/data/conf/samples/ers_reload/internal/cgrates.json +++ b/data/conf/samples/ers_reload/internal/cgrates.json @@ -67,6 +67,7 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/data/conf/samples/ers_reload/second_reload/cgrates.json b/data/conf/samples/ers_reload/second_reload/cgrates.json index aa2178b2d..6e1d1f934 100644 --- a/data/conf/samples/ers_reload/second_reload/cgrates.json +++ b/data/conf/samples/ers_reload/second_reload/cgrates.json @@ -67,6 +67,7 @@ "ers": { "enabled": true, "sessions_conns": ["*localhost"], + "row_length" : 0, "readers": [ { "id": "file_reader1", diff --git a/docs/ers.rst b/docs/ers.rst index de91319c1..a8fe88318 100644 --- a/docs/ers.rst +++ b/docs/ers.rst @@ -33,6 +33,7 @@ With explanations in the comments: "ers": { "enabled": true, // enable the service "sessions_conns": ["*internal"], // connection towards SessionS + "row_length" : 0, // Number of fields from csv file "readers": [ // list of active readers { "id": "file_reader2", // file_reader2 reader diff --git a/ers/filecsv.go b/ers/filecsv.go index 4cdd2bf11..8c08e10d3 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -130,6 +130,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { } defer file.Close() csvReader := csv.NewReader(bufio.NewReader(file)) + csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().RowLength csvReader.Comma = utils.CSV_SEP if len(rdr.Config().FieldSep) > 0 { csvReader.Comma = rune(rdr.Config().FieldSep[0]) diff --git a/ers/flatstore.go b/ers/flatstore.go index f85d824cb..75d0f904e 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -137,6 +137,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { } defer file.Close() csvReader := csv.NewReader(bufio.NewReader(file)) + csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().RowLength csvReader.Comma = ',' if len(rdr.Config().FieldSep) > 0 { csvReader.Comma = rune(rdr.Config().FieldSep[0]) diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index 67f9b1055..a5eda5f03 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -44,6 +44,7 @@ func TestKafkaER(t *testing.T) { cfg, err := config.NewCGRConfigFromJsonStringWithDefaults(`{ "ers": { // EventReaderService "enabled": true, // starts the EventReader service: + "row_length" : 0, "readers": [ { "id": "kafka", // identifier of the EventReader profile diff --git a/ers/partial_csv.go b/ers/partial_csv.go index 455fa8b30..bcd765ca9 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -144,6 +144,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { } defer file.Close() csvReader := csv.NewReader(bufio.NewReader(file)) + csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().RowLength csvReader.Comma = ',' if len(rdr.Config().FieldSep) > 0 { csvReader.Comma = rune(rdr.Config().FieldSep[0]) diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index 601d6bdb1..08ffd03c1 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -73,6 +73,7 @@ func testSQLInitConfig(t *testing.T) { }, "ers": { // EventReaderService "enabled": true, // starts the EventReader service: + "row_length" : 0, "readers": [ { "id": "mysql", // identifier of the EventReader profile