From 336e755d531ddc873d58143bc011cd835e967690 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 29 May 2020 13:46:10 +0300 Subject: [PATCH] Added support to reference CSV fields by the column name. Fixes #2194 --- config/config_defaults.go | 1 + config/config_it_test.go | 60 ++++++----- config/config_json_test.go | 31 +++--- config/config_test.go | 52 ++++----- config/erscfg.go | 6 ++ config/erscfg_test.go | 114 ++++++++++---------- config/libconfig_json.go | 1 + config/slicedp.go | 32 ++++-- data/conf/samples/ers_internal/cgrates.json | 18 ++-- data/conf/samples/ers_mongo/cgrates.json | 18 ++-- data/conf/samples/ers_mysql/cgrates.json | 18 ++-- data/conf/samples/ers_postgres/cgrates.json | 18 ++-- ers/filecsv.go | 16 ++- ers/filecsv_it_test.go | 4 +- ers/flatstore.go | 2 +- ers/partial_csv.go | 13 ++- packages/debian/changelog | 2 + utils/consts.go | 1 + 18 files changed, 234 insertions(+), 173 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 1d8c5b849..d0eec4654 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -316,6 +316,7 @@ const CGRATES_CFG_JSON = ` "type": "*none", // reader type <*file_csv> "row_length" : 0, // Number of fields from csv file "field_separator": ",", // separator used in case of csv files + "header_define_character": ":", // the starting character for header definition used in case of CSV files "run_delay": "0", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited "source_path": "/var/spool/cgrates/cdrc/in", // read data from this path diff --git a/config/config_it_test.go b/config/config_it_test.go index e81567ee8..d65fd8866 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -458,32 +458,34 @@ func testCGRConfigReloadERs(t *testing.T) { SessionSConns: []string{utils.MetaLocalHost}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.META_NONE, - RowLength: 0, - FieldSep: ",", - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/cdrc/in", - ProcessedPath: "/var/spool/cgrates/cdrc/out", - Filters: []string{}, - Flags: flagsDefault, - Fields: content, - CacheDumpFields: []*FCTemplate{}, - XmlRootPath: utils.HierarchyPath{utils.EmptyString}, + ID: utils.MetaDefault, + Type: utils.META_NONE, + RowLength: 0, + FieldSep: ",", + HeaderDefineChar: ":", + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/cdrc/in", + ProcessedPath: "/var/spool/cgrates/cdrc/out", + Filters: []string{}, + Flags: flagsDefault, + Fields: content, + CacheDumpFields: []*FCTemplate{}, + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, }, { - ID: "file_reader1", - Type: utils.MetaFileCSV, - FieldSep: ",", - RunDelay: -1, - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - Flags: flags, - Fields: content, - CacheDumpFields: []*FCTemplate{}, - XmlRootPath: utils.HierarchyPath{utils.EmptyString}, + ID: "file_reader1", + Type: utils.MetaFileCSV, + FieldSep: ",", + HeaderDefineChar: ":", + RunDelay: -1, + ConcurrentReqs: 1024, + SourcePath: "/tmp/ers/in", + ProcessedPath: "/tmp/ers/out", + Flags: flags, + Fields: content, + CacheDumpFields: []*FCTemplate{}, + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, }, }, } @@ -851,6 +853,7 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) { "ConcurrentReqs": 1024, "Fields": content, "FieldSep": ",", + "HeaderDefineChar": ":", "Filters": []interface{}{}, "Flags": map[string]interface{}{}, "FailedCallsPrefix": "", @@ -865,10 +868,11 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) { "XmlRootPath": []interface{}{utils.EmptyString}, }, map[string]interface{}{ - "CacheDumpFields": []interface{}{}, - "ConcurrentReqs": 1024, - "FieldSep": ",", - "Filters": nil, + "CacheDumpFields": []interface{}{}, + "ConcurrentReqs": 1024, + "FieldSep": ",", + "HeaderDefineChar": ":", + "Filters": nil, "Flags": map[string]interface{}{ "*dryrun": []interface{}{}, }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 95419efa6..9438e9103 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1847,21 +1847,22 @@ func TestDfEventReaderCfg(t *testing.T) { Sessions_conns: &[]string{utils.MetaInternal}, Readers: &[]*EventReaderJsonCfg{ { - Id: utils.StringPointer(utils.MetaDefault), - Type: utils.StringPointer(utils.META_NONE), - Row_length: utils.IntPointer(0), - Field_separator: utils.StringPointer(","), - Run_delay: utils.StringPointer("0"), - Concurrent_requests: utils.IntPointer(1024), - Source_path: utils.StringPointer("/var/spool/cgrates/cdrc/in"), - Processed_path: utils.StringPointer("/var/spool/cgrates/cdrc/out"), - Xml_root_path: utils.StringPointer(utils.EmptyString), - Tenant: utils.StringPointer(utils.EmptyString), - Timezone: utils.StringPointer(utils.EmptyString), - Filters: &[]string{}, - Flags: &[]string{}, - Fields: &cdrFields, - Cache_dump_fields: &[]*FcTemplateJsonCfg{}, + Id: utils.StringPointer(utils.MetaDefault), + Type: utils.StringPointer(utils.META_NONE), + Row_length: utils.IntPointer(0), + Field_separator: utils.StringPointer(","), + Header_define_character: utils.StringPointer(":"), + Run_delay: utils.StringPointer("0"), + Concurrent_requests: utils.IntPointer(1024), + Source_path: utils.StringPointer("/var/spool/cgrates/cdrc/in"), + Processed_path: utils.StringPointer("/var/spool/cgrates/cdrc/out"), + Xml_root_path: utils.StringPointer(utils.EmptyString), + Tenant: utils.StringPointer(utils.EmptyString), + Timezone: utils.StringPointer(utils.EmptyString), + Filters: &[]string{}, + Flags: &[]string{}, + Fields: &cdrFields, + Cache_dump_fields: &[]*FcTemplateJsonCfg{}, }, }, } diff --git a/config/config_test.go b/config/config_test.go index 4d01d43e7..54e0ddd21 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1809,19 +1809,20 @@ func TestCgrCdfEventReader(t *testing.T) { Enabled: false, SessionSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)}, Readers: []*EventReaderCfg{ - &EventReaderCfg{ - ID: utils.MetaDefault, - Type: utils.META_NONE, - FieldSep: ",", - RunDelay: time.Duration(0), - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/cdrc/in", - ProcessedPath: "/var/spool/cgrates/cdrc/out", - XmlRootPath: utils.HierarchyPath{utils.EmptyString}, - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + { + ID: utils.MetaDefault, + Type: utils.META_NONE, + FieldSep: ",", + HeaderDefineChar: ":", + RunDelay: time.Duration(0), + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/cdrc/in", + ProcessedPath: "/var/spool/cgrates/cdrc/out", + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true, Layout: time.RFC3339}, @@ -2105,18 +2106,19 @@ func TestCgrCdfEventExporter(t *testing.T) { func TestCgrCfgEventReaderDefault(t *testing.T) { eCfg := &EventReaderCfg{ - ID: utils.MetaDefault, - Type: utils.META_NONE, - FieldSep: ",", - RunDelay: time.Duration(0), - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/cdrc/in", - ProcessedPath: "/var/spool/cgrates/cdrc/out", - XmlRootPath: utils.HierarchyPath{utils.EmptyString}, - Tenant: nil, - Timezone: utils.EmptyString, - Filters: nil, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.META_NONE, + FieldSep: ",", + HeaderDefineChar: ":", + RunDelay: time.Duration(0), + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/cdrc/in", + ProcessedPath: "/var/spool/cgrates/cdrc/out", + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, + Tenant: nil, + Timezone: utils.EmptyString, + Filters: nil, + Flags: utils.FlagsWithParams{}, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true, Layout: time.RFC3339}, diff --git a/config/erscfg.go b/config/erscfg.go index 9e44284f6..70f55c50d 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -116,6 +116,7 @@ type EventReaderCfg struct { Type string RowLength int FieldSep string + HeaderDefineChar string RunDelay time.Duration ConcurrentReqs int SourcePath string @@ -148,6 +149,9 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string if jsnCfg.Field_separator != nil { er.FieldSep = *jsnCfg.Field_separator } + if jsnCfg.Header_define_character != nil { + er.HeaderDefineChar = *jsnCfg.Header_define_character + } if jsnCfg.Run_delay != nil { if er.RunDelay, err = utils.ParseDurationWithNanosecs(*jsnCfg.Run_delay); err != nil { return @@ -214,6 +218,7 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) { cln.ID = er.ID cln.Type = er.Type cln.FieldSep = er.FieldSep + cln.HeaderDefineChar = er.HeaderDefineChar cln.RunDelay = er.RunDelay cln.ConcurrentReqs = er.ConcurrentReqs cln.SourcePath = er.SourcePath @@ -295,6 +300,7 @@ func (er *EventReaderCfg) AsMapInterface(separator string) map[string]interface{ utils.TypeCfg: er.Type, utils.RowLengthCfg: er.RowLength, utils.FieldSepCfg: er.FieldSep, + utils.HeaderDefCharCfg: er.HeaderDefineChar, utils.RunDelayCfg: runDelay, utils.ConcurrentReqsCfg: er.ConcurrentReqs, utils.SourcePathCfg: er.SourcePath, diff --git a/config/erscfg_test.go b/config/erscfg_test.go index 9de44a7b3..87fc1455e 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -105,18 +105,19 @@ func TestEventReaderLoadFromJSON(t *testing.T) { SessionSConns: []string{"conn1", "conn3"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.META_NONE, - FieldSep: ",", - RunDelay: time.Duration(0), - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/cdrc/in", - ProcessedPath: "/var/spool/cgrates/cdrc/out", - XmlRootPath: utils.HierarchyPath{utils.EmptyString}, - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.META_NONE, + FieldSep: ",", + HeaderDefineChar: ":", + RunDelay: time.Duration(0), + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/cdrc/in", + ProcessedPath: "/var/spool/cgrates/cdrc/out", + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true, Layout: time.RFC3339}, @@ -144,18 +145,19 @@ func TestEventReaderLoadFromJSON(t *testing.T) { CacheDumpFields: make([]*FCTemplate, 0), }, { - ID: "file_reader1", - Type: utils.MetaFileCSV, - FieldSep: ",", - RunDelay: time.Duration(-1), - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - XmlRootPath: utils.HierarchyPath{utils.EmptyString}, - Tenant: nil, - Timezone: utils.EmptyString, - Filters: nil, - Flags: utils.FlagsWithParams{}, + ID: "file_reader1", + Type: utils.MetaFileCSV, + FieldSep: ",", + HeaderDefineChar: ":", + RunDelay: time.Duration(-1), + ConcurrentReqs: 1024, + SourcePath: "/tmp/ers/in", + ProcessedPath: "/tmp/ers/out", + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, + Tenant: nil, + Timezone: utils.EmptyString, + Filters: nil, + Flags: utils.FlagsWithParams{}, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true, Layout: time.RFC3339}, @@ -241,18 +243,19 @@ func TestEventReaderSameID(t *testing.T) { SessionSConns: []string{"conn1"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.META_NONE, - FieldSep: ",", - RunDelay: time.Duration(0), - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/cdrc/in", - ProcessedPath: "/var/spool/cgrates/cdrc/out", - XmlRootPath: utils.HierarchyPath{utils.EmptyString}, - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.META_NONE, + FieldSep: ",", + HeaderDefineChar: ":", + RunDelay: time.Duration(0), + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/cdrc/in", + ProcessedPath: "/var/spool/cgrates/cdrc/out", + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true, Layout: time.RFC3339}, @@ -280,19 +283,20 @@ func TestEventReaderSameID(t *testing.T) { CacheDumpFields: make([]*FCTemplate, 0), }, { - ID: "file_reader1", - Type: utils.MetaFileCSV, - RowLength: 5, - FieldSep: ",", - RunDelay: time.Duration(-1), - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - XmlRootPath: utils.HierarchyPath{utils.EmptyString}, - Tenant: nil, - Timezone: utils.EmptyString, - Filters: nil, - Flags: utils.FlagsWithParams{}, + ID: "file_reader1", + Type: utils.MetaFileCSV, + RowLength: 5, + FieldSep: ",", + HeaderDefineChar: ":", + RunDelay: time.Duration(-1), + ConcurrentReqs: 1024, + SourcePath: "/tmp/ers/in", + ProcessedPath: "/tmp/ers/out", + XmlRootPath: utils.HierarchyPath{utils.EmptyString}, + Tenant: nil, + Timezone: utils.EmptyString, + Filters: nil, + Flags: utils.FlagsWithParams{}, Fields: []*FCTemplate{ {Tag: "CustomTag2", Path: "CustomPath2", Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("CustomValue2", true, utils.INFIELD_SEP), Mandatory: true, Layout: time.RFC3339}, @@ -386,6 +390,7 @@ func TestERsCfgAsMapInterface(t *testing.T) { "db_type": "*none", "failed_calls_prefix": "", "field_separator": ",", + utils.HeaderDefCharCfg: ":", "fields": []map[string]interface{}{ {"mandatory": true, "path": "*cgreq.ToR", "tag": "ToR", "type": "*variable", "value": "~*req.2"}, {"mandatory": true, "path": "*cgreq.OriginID", "tag": "OriginID", "type": "*variable", "value": "~*req.3"}, @@ -401,11 +406,12 @@ func TestERsCfgAsMapInterface(t *testing.T) { }, }, { - "cache_dump_fields": []map[string]interface{}{}, - "concurrent_requests": 1024, - "db_type": "*file_csv", - "failed_calls_prefix": "", - "field_separator": ",", + "cache_dump_fields": []map[string]interface{}{}, + "concurrent_requests": 1024, + "db_type": "*file_csv", + "failed_calls_prefix": "", + "field_separator": ",", + utils.HeaderDefCharCfg: ":", "fields": []map[string]interface{}{ {"mandatory": true, "path": "*cgreq.ToR", "tag": "ToR", "type": "*variable", "value": "~*req.2"}, {"mandatory": true, "path": "*cgreq.OriginID", "tag": "OriginID", "type": "*variable", "value": "~*req.3"}, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index ba668a25e..df2c0f3f8 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -179,6 +179,7 @@ type EventReaderJsonCfg struct { Type *string Row_length *int Field_separator *string + Header_define_character *string Run_delay *string Concurrent_requests *int Source_path *string diff --git a/config/slicedp.go b/config/slicedp.go index ee7a69567..c39b982fb 100644 --- a/config/slicedp.go +++ b/config/slicedp.go @@ -27,15 +27,19 @@ import ( ) // NewSliceDP constructs a utils.DataProvider -func NewSliceDP(record []string) (dP utils.DataProvider) { - dP = &SliceDP{req: record, cache: utils.MapStorage{}} - return +func NewSliceDP(record []string, headers map[string]int) (dP utils.DataProvider) { + return &SliceDP{ + req: record, + cache: utils.MapStorage{}, + hdrs: headers, + } } // SliceDP implements engine.utils.DataProvider so we can pass it to filters type SliceDP struct { req []string cache utils.MapStorage + hdrs map[string]int } // String is part of engine.utils.DataProvider interface @@ -57,14 +61,14 @@ func (cP *SliceDP) FieldAsInterface(fldPath []string) (data interface{}, err err err != utils.ErrNotFound { // item found in cache return } - err = nil // cancel previous err - if cfgFieldIdx, err := strconv.Atoi(idx); err != nil { + var cfgFieldIdx int + if cfgFieldIdx, err = cP.getIndex(idx); err != nil { return nil, fmt.Errorf("Ignoring record: %v with error : %+v", cP.req, err) - } else if len(cP.req) <= cfgFieldIdx { - return nil, utils.ErrNotFound - } else { - data = cP.req[cfgFieldIdx] } + if len(cP.req) <= cfgFieldIdx { + return nil, utils.ErrNotFound + } + data = cP.req[cfgFieldIdx] cP.cache.Set(fldPath, data) return } @@ -83,3 +87,13 @@ func (cP *SliceDP) FieldAsString(fldPath []string) (data string, err error) { func (cP *SliceDP) RemoteHost() net.Addr { return utils.LocalAddr() } + +func (cP *SliceDP) getIndex(idx string) (fieldIdx int, err error) { + if cP.hdrs != nil { + var has bool + if fieldIdx, has = cP.hdrs[idx]; has { + return + } + } + return strconv.Atoi(idx) +} diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index 13011d79b..e357dd53f 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -120,17 +120,17 @@ "flags": ["*initiate","*accounts","*resources","*attributes","*log"], "processed_path": "/tmp/init_session/out", "fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.3", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.4", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.5", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.7", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}, ], }, { diff --git a/data/conf/samples/ers_mongo/cgrates.json b/data/conf/samples/ers_mongo/cgrates.json index 46899652c..28d20009f 100644 --- a/data/conf/samples/ers_mongo/cgrates.json +++ b/data/conf/samples/ers_mongo/cgrates.json @@ -121,17 +121,17 @@ "flags": ["*initiate","*accounts","*resources","*attributes","*log"], "processed_path": "/tmp/init_session/out", "fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.3", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.4", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.5", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.7", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}, ], }, { diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index b386722ab..ba0f86c32 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -118,17 +118,17 @@ "flags": ["*initiate","*accounts","*resources","*attributes","*log"], "processed_path": "/tmp/init_session/out", "fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.3", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.4", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.5", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.7", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}, ], }, { diff --git a/data/conf/samples/ers_postgres/cgrates.json b/data/conf/samples/ers_postgres/cgrates.json index fa33f3548..ba7f0b2c9 100644 --- a/data/conf/samples/ers_postgres/cgrates.json +++ b/data/conf/samples/ers_postgres/cgrates.json @@ -115,17 +115,17 @@ "flags": ["*initiate","*accounts","*resources","*attributes","*log"], "processed_path": "/tmp/init_session/out", "fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.3", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.4", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.5", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.7", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}, ], }, { diff --git a/ers/filecsv.go b/ers/filecsv.go index 862b10200..206535520 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -131,11 +131,12 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { defer file.Close() csvReader := csv.NewReader(bufio.NewReader(file)) csvReader.FieldsPerRecord = rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].RowLength + csvReader.Comment = utils.COMMENT_CHAR csvReader.Comma = utils.CSV_SEP if len(rdr.Config().FieldSep) > 0 { csvReader.Comma = rune(rdr.Config().FieldSep[0]) } - csvReader.Comment = '#' + var headers map[string]int rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() @@ -148,9 +149,20 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { } return } + if rowNr == 0 && len(record) > 0 && + strings.HasPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) { + record[0] = strings.TrimPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) + // map the templates + headers = make(map[string]int) + for i, hdr := range record { + headers[hdr] = i + } + continue + } rowNr++ // increment the rowNr after checking if it's not the end of file + agReq := agents.NewAgentRequest( - config.NewSliceDP(record), reqVars, + config.NewSliceDP(record, headers), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 026e334af..8de769ed2 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -48,7 +48,8 @@ accid22;*postpaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_e #accid1;*pseudoprepaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;12;val_extra3;"";val_extra1 accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"";val_extra1` - fileContent3 = `cgrates.org,*voice,SessionFromCsv,*prepaid,1001,ANY2CNT,1002,2018-01-07 17:00:00 +0000 UTC,2018-01-07 17:00:10 +0000 UTC,5m + fileContent3 = `:Tenant,ToR,OriginID,RequestType,Account,Subject,Destination,SetupTime,AnswerTime,Usage +cgrates.org,*voice,SessionFromCsv,*prepaid,1001,ANY2CNT,1002,2018-01-07 17:00:00 +0000 UTC,2018-01-07 17:00:10 +0000 UTC,5m ` fileContentForFilter = `accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1 @@ -79,7 +80,6 @@ accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;" ) func TestCsvReadFile(t *testing.T) { - switch *dbType { case utils.MetaInternal: csvCfgDIR = "ers_internal" diff --git a/ers/flatstore.go b/ers/flatstore.go index c935eac8f..a299056f2 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -189,7 +189,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { } rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( - config.NewSliceDP(record), reqVars, + config.NewSliceDP(record, nil), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/partial_csv.go b/ers/partial_csv.go index 802c7fddf..d89a27e6b 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -155,6 +155,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { csvReader.Comma = rune(rdr.Config().FieldSep[0]) } csvReader.Comment = '#' + var headers map[string]int rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() @@ -167,9 +168,19 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { } return } + if rowNr == 0 && len(record) > 0 && + strings.HasPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) { + record[0] = strings.TrimPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) + // map the templates + headers = make(map[string]int) + for i, hdr := range record { + headers[hdr] = i + } + continue + } rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( - config.NewSliceDP(record), reqVars, + config.NewSliceDP(record, headers), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/packages/debian/changelog b/packages/debian/changelog index ed10e5ad1..d9736f07b 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -63,6 +63,8 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [AgentS] Added *constant: prefix to do not proccess the value with RSRParsers * [AgentS] Added DynamicDataProvider to AgentRequest + * [Server] Corectly log the server listen error + * [ERs] Added support to reference CSV fields by the column name -- DanB Wed, 19 Feb 2020 13:25:52 +0200 diff --git a/utils/consts.go b/utils/consts.go index 5dd2d0498..9c903c643 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -2036,6 +2036,7 @@ const ( IDCfg = "id" RowLengthCfg = "row_length" FieldSepCfg = "field_separator" + HeaderDefCharCfg = "header_define_character" RunDelayCfg = "run_delay" SourcePathCfg = "source_path" ProcessedPathCfg = "processed_path"