mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Merge content,header,trailer fields into fields
This commit is contained in:
committed by
Dan Christian Bogos
parent
e0b9dcb35d
commit
26ea469328
@@ -315,13 +315,11 @@ const CGRATES_CFG_JSON = `
|
||||
"source_path": "/var/spool/cgrates/cdrc/in", // read data from this path
|
||||
"processed_path": "/var/spool/cgrates/cdrc/out", // move processed data here
|
||||
"xml_root_path": "", // path towards one event in case of XML CDRs
|
||||
"source_id": "ers_csv", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"tenant": "", // tenant used by import
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
"filters": [], // limit parsing based on the filters
|
||||
"flags": [], // flags to influence the event processing
|
||||
"header_fields": [], // template of the import header fields
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "~*req.2", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.3", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.4", "mandatory": true},
|
||||
@@ -334,7 +332,6 @@ const CGRATES_CFG_JSON = `
|
||||
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "~*req.12", "mandatory": true},
|
||||
{"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "~*req.13", "mandatory": true},
|
||||
],
|
||||
"trailer_fields": [], // template of the import trailer fields
|
||||
"continue": false, // continue to the next template if executed
|
||||
},
|
||||
],
|
||||
|
||||
@@ -1638,9 +1638,7 @@ func TestDfEventReaderCfg(t *testing.T) {
|
||||
Timezone: utils.StringPointer(utils.EmptyString),
|
||||
Filters: &[]string{},
|
||||
Flags: &[]string{},
|
||||
Header_fields: &[]*FcTemplateJsonCfg{},
|
||||
Content_fields: &cdrFields,
|
||||
Trailer_fields: &[]*FcTemplateJsonCfg{},
|
||||
Fields: &cdrFields,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1627,8 +1627,7 @@ func TestCgrCdfEventReader(t *testing.T) {
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: []string{},
|
||||
Flags: utils.FlagsWithParams{},
|
||||
HeaderFields: make([]*FCTemplate, 0),
|
||||
ContentFields: []*FCTemplate{
|
||||
Fields: []*FCTemplate{
|
||||
{Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
{Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED,
|
||||
@@ -1652,7 +1651,6 @@ func TestCgrCdfEventReader(t *testing.T) {
|
||||
{Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~*req.13", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
},
|
||||
TrailerFields: make([]*FCTemplate, 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -1675,8 +1673,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) {
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: nil,
|
||||
Flags: utils.FlagsWithParams{},
|
||||
HeaderFields: make([]*FCTemplate, 0),
|
||||
ContentFields: []*FCTemplate{
|
||||
Fields: []*FCTemplate{
|
||||
{Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
{Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED,
|
||||
@@ -1700,7 +1697,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) {
|
||||
{Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~*req.13", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
},
|
||||
TrailerFields: make([]*FCTemplate, 0),
|
||||
CacheDumpFields: make([]*FCTemplate, 0),
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.dfltEvRdr, eCfg) {
|
||||
t.Errorf("received: %+v,\n expecting: %+v", utils.ToJSON(cgrCfg.dfltEvRdr), utils.ToJSON(eCfg))
|
||||
|
||||
@@ -114,9 +114,7 @@ type EventReaderCfg struct {
|
||||
FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records
|
||||
PartialRecordCache time.Duration // Duration to cache partial records when not pairing
|
||||
PartialCacheExpiryAction string
|
||||
HeaderFields []*FCTemplate
|
||||
ContentFields []*FCTemplate
|
||||
TrailerFields []*FCTemplate
|
||||
Fields []*FCTemplate
|
||||
CacheDumpFields []*FCTemplate
|
||||
}
|
||||
|
||||
@@ -178,18 +176,8 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string
|
||||
if jsnCfg.Partial_cache_expiry_action != nil {
|
||||
er.PartialCacheExpiryAction = *jsnCfg.Partial_cache_expiry_action
|
||||
}
|
||||
if jsnCfg.Header_fields != nil {
|
||||
if er.HeaderFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Header_fields, sep); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Content_fields != nil {
|
||||
if er.ContentFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Content_fields, sep); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Trailer_fields != nil {
|
||||
if er.TrailerFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Trailer_fields, sep); err != nil {
|
||||
if jsnCfg.Fields != nil {
|
||||
if er.Fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Fields, sep); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -228,17 +216,13 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) {
|
||||
}
|
||||
cln.Flags = er.Flags
|
||||
cln.FailedCallsPrefix = er.FailedCallsPrefix
|
||||
cln.HeaderFields = make([]*FCTemplate, len(er.HeaderFields))
|
||||
for idx, fld := range er.HeaderFields {
|
||||
cln.HeaderFields[idx] = fld.Clone()
|
||||
cln.Fields = make([]*FCTemplate, len(er.Fields))
|
||||
for idx, fld := range er.Fields {
|
||||
cln.Fields[idx] = fld.Clone()
|
||||
}
|
||||
cln.ContentFields = make([]*FCTemplate, len(er.ContentFields))
|
||||
for idx, fld := range er.ContentFields {
|
||||
cln.ContentFields[idx] = fld.Clone()
|
||||
}
|
||||
cln.TrailerFields = make([]*FCTemplate, len(er.TrailerFields))
|
||||
for idx, fld := range er.TrailerFields {
|
||||
cln.TrailerFields[idx] = fld.Clone()
|
||||
cln.CacheDumpFields = make([]*FCTemplate, len(er.CacheDumpFields))
|
||||
for idx, fld := range er.CacheDumpFields {
|
||||
cln.CacheDumpFields[idx] = fld.Clone()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -27,13 +27,12 @@ import (
|
||||
|
||||
func TestEventRedearClone(t *testing.T) {
|
||||
orig := &EventReaderCfg{
|
||||
ID: utils.MetaDefault,
|
||||
Type: "RandomType",
|
||||
FieldSep: ",",
|
||||
Filters: []string{"Filter1", "Filter2"},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
HeaderFields: []*FCTemplate{},
|
||||
ContentFields: []*FCTemplate{
|
||||
ID: utils.MetaDefault,
|
||||
Type: "RandomType",
|
||||
FieldSep: ",",
|
||||
Filters: []string{"Filter1", "Filter2"},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
Fields: []*FCTemplate{
|
||||
{
|
||||
Tag: "TOR",
|
||||
FieldId: "ToR",
|
||||
@@ -49,20 +48,19 @@ func TestEventRedearClone(t *testing.T) {
|
||||
Mandatory: true,
|
||||
},
|
||||
},
|
||||
TrailerFields: []*FCTemplate{},
|
||||
CacheDumpFields: make([]*FCTemplate, 0),
|
||||
}
|
||||
cloned := orig.Clone()
|
||||
if !reflect.DeepEqual(cloned, orig) {
|
||||
t.Errorf("expected: %s \n,received: %s", utils.ToJSON(orig), utils.ToJSON(cloned))
|
||||
}
|
||||
initialOrig := &EventReaderCfg{
|
||||
ID: utils.MetaDefault,
|
||||
Type: "RandomType",
|
||||
FieldSep: ",",
|
||||
Filters: []string{"Filter1", "Filter2"},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
HeaderFields: []*FCTemplate{},
|
||||
ContentFields: []*FCTemplate{
|
||||
ID: utils.MetaDefault,
|
||||
Type: "RandomType",
|
||||
FieldSep: ",",
|
||||
Filters: []string{"Filter1", "Filter2"},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
Fields: []*FCTemplate{
|
||||
{
|
||||
Tag: "TOR",
|
||||
FieldId: "ToR",
|
||||
@@ -78,10 +76,10 @@ func TestEventRedearClone(t *testing.T) {
|
||||
Mandatory: true,
|
||||
},
|
||||
},
|
||||
TrailerFields: []*FCTemplate{},
|
||||
CacheDumpFields: make([]*FCTemplate, 0),
|
||||
}
|
||||
orig.Filters = []string{"SingleFilter"}
|
||||
orig.ContentFields = []*FCTemplate{
|
||||
orig.Fields = []*FCTemplate{
|
||||
{
|
||||
Tag: "TOR",
|
||||
FieldId: "ToR",
|
||||
@@ -113,8 +111,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: []string{},
|
||||
Flags: utils.FlagsWithParams{},
|
||||
HeaderFields: make([]*FCTemplate, 0),
|
||||
ContentFields: []*FCTemplate{
|
||||
Fields: []*FCTemplate{
|
||||
{Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
{Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED,
|
||||
@@ -138,7 +135,6 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
|
||||
{Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~*req.13", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
},
|
||||
TrailerFields: make([]*FCTemplate, 0),
|
||||
},
|
||||
&EventReaderCfg{
|
||||
ID: "file_reader1",
|
||||
@@ -153,8 +149,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: nil,
|
||||
Flags: utils.FlagsWithParams{},
|
||||
HeaderFields: make([]*FCTemplate, 0),
|
||||
ContentFields: []*FCTemplate{
|
||||
Fields: []*FCTemplate{
|
||||
{Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~*req.2", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
{Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED,
|
||||
@@ -178,7 +173,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
|
||||
{Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~*req.13", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
},
|
||||
TrailerFields: make([]*FCTemplate, 0),
|
||||
CacheDumpFields: make([]*FCTemplate, 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -33,6 +33,9 @@ func NewFCTemplateFromFCTemplateJsonCfg(jsnCfg *FcTemplateJsonCfg, separator str
|
||||
if jsnCfg.Field_id != nil {
|
||||
fcTmp.FieldId = *jsnCfg.Field_id
|
||||
}
|
||||
if jsnCfg.Path != nil {
|
||||
fcTmp.Path = *jsnCfg.Path
|
||||
}
|
||||
fcTmp.Tag = fcTmp.FieldId
|
||||
if jsnCfg.Tag != nil {
|
||||
fcTmp.Tag = *jsnCfg.Tag
|
||||
@@ -98,8 +101,9 @@ func NewFCTemplateFromFCTemplateJsonCfg(jsnCfg *FcTemplateJsonCfg, separator str
|
||||
|
||||
type FCTemplate struct {
|
||||
Tag string
|
||||
Type string // Type of field
|
||||
FieldId string // Field identifier
|
||||
Type string // Type of field
|
||||
FieldId string // Field identifier
|
||||
Path string
|
||||
Filters []string // list of filter profiles
|
||||
Value RSRParsers
|
||||
Width int
|
||||
|
||||
@@ -183,9 +183,7 @@ type EventReaderJsonCfg struct {
|
||||
Failed_calls_prefix *string
|
||||
Partial_record_cache *string
|
||||
Partial_cache_expiry_action *string
|
||||
Header_fields *[]*FcTemplateJsonCfg
|
||||
Content_fields *[]*FcTemplateJsonCfg
|
||||
Trailer_fields *[]*FcTemplateJsonCfg
|
||||
Fields *[]*FcTemplateJsonCfg
|
||||
Cache_dump_fields *[]*FcTemplateJsonCfg
|
||||
}
|
||||
|
||||
@@ -524,6 +522,7 @@ type FcTemplateJsonCfg struct {
|
||||
Tag *string
|
||||
Type *string
|
||||
Field_id *string
|
||||
Path *string
|
||||
Attribute_id *string
|
||||
Filters *[]string
|
||||
Value *string
|
||||
|
||||
@@ -75,7 +75,7 @@
|
||||
"flags": ["*dryrun"],
|
||||
"source_path": "/tmp/ers/in",
|
||||
"processed_path": "/tmp/ers/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "~*req.2", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.3", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.4", "mandatory": true},
|
||||
|
||||
@@ -92,7 +92,7 @@
|
||||
"source_path": "/tmp/ers2/in",
|
||||
"flags": ["*dryrun"],
|
||||
"processed_path": "/tmp/ers2/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.1", "mandatory": true},
|
||||
@@ -117,7 +117,7 @@
|
||||
"source_path": "/tmp/init_session/in",
|
||||
"flags": ["*initiate","*accounts","*resources","*attributes","*log"],
|
||||
"processed_path": "/tmp/init_session/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
@@ -139,7 +139,7 @@
|
||||
"source_path": "/tmp/terminate_session/in",
|
||||
"flags": ["*terminate","*accounts","*resources","*log"],
|
||||
"processed_path": "/tmp/terminate_session/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*variable", "value": "~*req.3", "mandatory": true},
|
||||
@@ -154,7 +154,7 @@
|
||||
"source_path": "/tmp/cdrs/in",
|
||||
"flags": ["*cdrs","*log"],
|
||||
"processed_path": "/tmp/cdrs/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
@@ -179,7 +179,7 @@
|
||||
"processed_path": "/tmp/ers_with_filters/out",
|
||||
"flags": ["*cdrs","*log"],
|
||||
"filters":["*string:~*req.3:1002"],
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.1", "mandatory": true},
|
||||
@@ -205,7 +205,7 @@
|
||||
"flags": ["*cdrs","*log"],
|
||||
"processed_path": "/tmp/xmlErs/out",
|
||||
"xml_root_path": "broadWorksCDR.cdrData",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
|
||||
@@ -225,12 +225,10 @@
|
||||
"source_path": "/tmp/fwvErs/in",
|
||||
"flags": ["*cdrs"],
|
||||
"processed_path": "/tmp/fwvErs/out",
|
||||
"header_fields": [
|
||||
{"tag": "FileName", "field_id": "CdrFileName", "type": "*composed", "value": "~*req.95-135", "padding":"right"},
|
||||
{"tag": "FileSeqNr", "field_id": "FileSeqNr", "type": "*composed", "value": "~*req.135-141", "padding":"zeroleft"},
|
||||
{"tag": "AccId1", "field_id": "AccId1", "type": "*composed", "value": "~*req.135-141", "padding":"zeroleft"},
|
||||
],
|
||||
"content_fields": [
|
||||
"fields": [
|
||||
{"tag": "FileName", "field_id": "*cgreq.CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"right"},
|
||||
{"tag": "FileSeqNr", "field_id": "FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"zeroleft"},
|
||||
{"tag": "AccId1", "field_id": "AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"zeroleft"},
|
||||
{"tag": "Tor", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "rated", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.0-10", "padding":"right", "mandatory": true},
|
||||
@@ -248,10 +246,8 @@
|
||||
{"tag": "WholesaleAmount", "field_id": "RetailAmount", "type": "*variable", "value": "~*req.115-123", "padding":"zeroleft"},
|
||||
{"tag": "AccId1", "field_id": "AccId1", "type": "*variable", "value": "~*req.3-6", "padding":"zeroleft", "mandatory": true},
|
||||
{"tag": "AccId2", "field_id": "AccId2", "type": "*variable", "value": "~*req.14-30", "padding":"right", "mandatory": true},
|
||||
],
|
||||
"trailer_fields": [
|
||||
{"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"},
|
||||
{"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"},
|
||||
{"tag": "NrOfCdrs", "type": "*variable", "field_id":"NrOfCdrs", "value": "~*trl.142-150"},
|
||||
{"tag": "TotalDuration", "type": "*variable", "field_id":"TotalDuration", "value": "~*trl.150-162"}
|
||||
],
|
||||
},
|
||||
{
|
||||
@@ -264,7 +260,7 @@
|
||||
"processed_path": "/tmp/partErs1/out",
|
||||
"partial_record_cache": "2s",
|
||||
"partial_cache_expiry_action": "*dump_to_file",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
@@ -305,7 +301,7 @@
|
||||
"flags": ["*cdrs"],
|
||||
"partial_record_cache": "1s",
|
||||
"partial_cache_expiry_action": "*post_cdr",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
@@ -336,7 +332,7 @@
|
||||
"failed_calls_prefix": "missed_calls",
|
||||
"flags": ["*cdrs"],
|
||||
"partial_record_cache": "2s",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "Tor", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed","value":"~*req.3;~*req.1;~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true},
|
||||
|
||||
@@ -95,7 +95,7 @@
|
||||
"source_path": "/tmp/ers2/in",
|
||||
"flags": ["*dryrun"],
|
||||
"processed_path": "/tmp/ers2/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.1", "mandatory": true},
|
||||
@@ -120,7 +120,7 @@
|
||||
"source_path": "/tmp/init_session/in",
|
||||
"flags": ["*initiate","*accounts","*resources","*attributes","*log"],
|
||||
"processed_path": "/tmp/init_session/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
@@ -142,7 +142,7 @@
|
||||
"source_path": "/tmp/terminate_session/in",
|
||||
"flags": ["*terminate","*accounts","*resources","*log"],
|
||||
"processed_path": "/tmp/terminate_session/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*variable", "value": "~*req.3", "mandatory": true},
|
||||
@@ -157,7 +157,7 @@
|
||||
"source_path": "/tmp/cdrs/in",
|
||||
"flags": ["*cdrs","*log"],
|
||||
"processed_path": "/tmp/cdrs/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
@@ -182,7 +182,7 @@
|
||||
"processed_path": "/tmp/ers_with_filters/out",
|
||||
"flags": ["*cdrs","*log"],
|
||||
"filters":["*string:~*req.3:1002"],
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.1", "mandatory": true},
|
||||
@@ -208,7 +208,7 @@
|
||||
"flags": ["*cdrs","*log"],
|
||||
"processed_path": "/tmp/xmlErs/out",
|
||||
"xml_root_path": "broadWorksCDR.cdrData",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
|
||||
@@ -267,7 +267,7 @@
|
||||
"processed_path": "/tmp/partErs1/out",
|
||||
"partial_record_cache": "2s",
|
||||
"partial_cache_expiry_action": "*dump_to_file",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
@@ -308,7 +308,7 @@
|
||||
"flags": ["*cdrs"],
|
||||
"partial_record_cache": "1s",
|
||||
"partial_cache_expiry_action": "*post_cdr",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
@@ -339,7 +339,7 @@
|
||||
"failed_calls_prefix": "missed_calls",
|
||||
"flags": ["*cdrs"],
|
||||
"partial_record_cache": "2s",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "Tor", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed","value":"~*req.3;~*req.1;~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true},
|
||||
|
||||
@@ -92,7 +92,7 @@
|
||||
"source_path": "/tmp/ers2/in",
|
||||
"flags": ["*dryrun"],
|
||||
"processed_path": "/tmp/ers2/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.1", "mandatory": true},
|
||||
@@ -117,7 +117,7 @@
|
||||
"source_path": "/tmp/init_session/in",
|
||||
"flags": ["*initiate","*accounts","*resources","*attributes","*log"],
|
||||
"processed_path": "/tmp/init_session/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
@@ -139,7 +139,7 @@
|
||||
"source_path": "/tmp/terminate_session/in",
|
||||
"flags": ["*terminate","*accounts","*resources","*log"],
|
||||
"processed_path": "/tmp/terminate_session/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*variable", "value": "~*req.3", "mandatory": true},
|
||||
@@ -154,7 +154,7 @@
|
||||
"source_path": "/tmp/cdrs/in",
|
||||
"flags": ["*cdrs","*log"],
|
||||
"processed_path": "/tmp/cdrs/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
@@ -179,7 +179,7 @@
|
||||
"processed_path": "/tmp/ers_with_filters/out",
|
||||
"flags": ["*cdrs","*log"],
|
||||
"filters":["*string:~*req.3:1002"],
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.1", "mandatory": true},
|
||||
@@ -205,7 +205,7 @@
|
||||
"flags": ["*cdrs","*log"],
|
||||
"processed_path": "/tmp/xmlErs/out",
|
||||
"xml_root_path": "broadWorksCDR.cdrData",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
|
||||
@@ -264,7 +264,7 @@
|
||||
"processed_path": "/tmp/partErs1/out",
|
||||
"partial_record_cache": "2s",
|
||||
"partial_cache_expiry_action": "*dump_to_file",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
@@ -305,7 +305,7 @@
|
||||
"flags": ["*cdrs"],
|
||||
"partial_record_cache": "1s",
|
||||
"partial_cache_expiry_action": "*post_cdr",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
@@ -336,7 +336,7 @@
|
||||
"failed_calls_prefix": "missed_calls",
|
||||
"flags": ["*cdrs"],
|
||||
"partial_record_cache": "2s",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "Tor", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed","value":"~*req.3;~*req.1;~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true},
|
||||
|
||||
@@ -89,7 +89,7 @@
|
||||
"source_path": "/tmp/ers2/in",
|
||||
"flags": ["*dryrun"],
|
||||
"processed_path": "/tmp/ers2/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.1", "mandatory": true},
|
||||
@@ -114,7 +114,7 @@
|
||||
"source_path": "/tmp/init_session/in",
|
||||
"flags": ["*initiate","*accounts","*resources","*attributes","*log"],
|
||||
"processed_path": "/tmp/init_session/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
@@ -136,7 +136,7 @@
|
||||
"source_path": "/tmp/terminate_session/in",
|
||||
"flags": ["*terminate","*accounts","*resources","*log"],
|
||||
"processed_path": "/tmp/terminate_session/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*variable", "value": "~*req.3", "mandatory": true},
|
||||
@@ -151,7 +151,7 @@
|
||||
"source_path": "/tmp/cdrs/in",
|
||||
"flags": ["*cdrs","*log"],
|
||||
"processed_path": "/tmp/cdrs/out",
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
@@ -176,7 +176,7 @@
|
||||
"processed_path": "/tmp/ers_with_filters/out",
|
||||
"flags": ["*cdrs","*log"],
|
||||
"filters":["*string:~*req.3:1002"],
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~*req.0", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~*req.1", "mandatory": true},
|
||||
@@ -202,7 +202,7 @@
|
||||
"flags": ["*cdrs","*log"],
|
||||
"processed_path": "/tmp/xmlErs/out",
|
||||
"xml_root_path": "broadWorksCDR.cdrData",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*variable", "value": "~*req.broadWorksCDR.cdrData.basicModule.localCallId", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
|
||||
@@ -261,7 +261,7 @@
|
||||
"processed_path": "/tmp/partErs1/out",
|
||||
"partial_record_cache": "2s",
|
||||
"partial_cache_expiry_action": "*dump_to_file",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
@@ -302,7 +302,7 @@
|
||||
"flags": ["*cdrs"],
|
||||
"partial_record_cache": "1s",
|
||||
"partial_cache_expiry_action": "*post_cdr",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
@@ -333,7 +333,7 @@
|
||||
"failed_calls_prefix": "missed_calls",
|
||||
"flags": ["*cdrs"],
|
||||
"partial_record_cache": "2s",
|
||||
"content_fields":[
|
||||
"fields":[
|
||||
{"tag": "Tor", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed","value":"~*req.3;~*req.1;~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true},
|
||||
|
||||
@@ -153,7 +153,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
|
||||
agReq); err != nil || !pass {
|
||||
continue
|
||||
}
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields)
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().Fields)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
|
||||
@@ -95,7 +95,7 @@ func (rdr *FWVFileER) Serve() (err error) {
|
||||
}
|
||||
filesInDir, _ := ioutil.ReadDir(rdr.rdrDir)
|
||||
for _, file := range filesInDir {
|
||||
if !strings.HasSuffix(file.Name(), utils.XMLSuffix) { // hardcoded file extension for xml event reader
|
||||
if !strings.HasSuffix(file.Name(), utils.FWVSuffix) { // hardcoded file extension for xml event reader
|
||||
continue // used in order to filter the files from directory
|
||||
}
|
||||
go func(fileName string) {
|
||||
@@ -134,12 +134,15 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
|
||||
reqVars := make(map[string]interface{})
|
||||
|
||||
for {
|
||||
var hasHeader, hasTrailer bool
|
||||
var headerFields, trailerFields []*config.FCTemplate
|
||||
if rdr.offset == 0 { // First time, set the necessary offsets
|
||||
if err := rdr.setLineLen(file); err != nil {
|
||||
rdr.preProcessFIelds(hasHeader, hasTrailer, headerFields, trailerFields)
|
||||
if err := rdr.setLineLen(file, hasHeader); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot set lineLen: %s", utils.ERs, err.Error()))
|
||||
break
|
||||
}
|
||||
if len(rdr.Config().TrailerFields) != 0 {
|
||||
if hasTrailer {
|
||||
if fi, err := file.Stat(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot get file stats: %s", utils.ERs, err.Error()))
|
||||
return err
|
||||
@@ -147,8 +150,8 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
|
||||
rdr.trailerOffset = fi.Size() - rdr.lineLen
|
||||
}
|
||||
}
|
||||
if len(rdr.Config().HeaderFields) != 0 {
|
||||
if err = rdr.processHeader(file, rowNr, evsPosted, absPath); err != nil {
|
||||
if hasHeader {
|
||||
if err = rdr.processHeader(file, rowNr, evsPosted, absPath, headerFields); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error reading header: %s", utils.ERs, err.Error()))
|
||||
return
|
||||
}
|
||||
@@ -179,7 +182,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
|
||||
agReq); err != nil || !pass {
|
||||
continue
|
||||
}
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields)
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().Fields)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
@@ -197,7 +200,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
|
||||
rdrCfg: rdr.Config()}
|
||||
evsPosted++
|
||||
if rdr.trailerOffset != 0 && rdr.offset >= rdr.trailerOffset {
|
||||
if err := rdr.processTrailer(file, rowNr, evsPosted, absPath); err != nil && err != io.EOF {
|
||||
if err := rdr.processTrailer(file, rowNr, evsPosted, absPath, trailerFields); err != nil && err != io.EOF {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error()))
|
||||
}
|
||||
break
|
||||
@@ -219,10 +222,10 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
|
||||
}
|
||||
|
||||
// Sets the line length based on first line, sets offset back to initial after reading
|
||||
func (rdr *FWVFileER) setLineLen(file *os.File) error {
|
||||
func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader bool) error {
|
||||
buff := bufio.NewReader(file)
|
||||
// in case we have header we take the length of first line and add it as headerOffset
|
||||
if len(rdr.Config().HeaderFields) != 0 {
|
||||
if hasHeader {
|
||||
readBytes, err := buff.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -241,7 +244,7 @@ func (rdr *FWVFileER) setLineLen(file *os.File) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPath string) (err error) {
|
||||
func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPath string, trailerFields []*config.FCTemplate) (err error) {
|
||||
buf := make([]byte, rdr.trailerOffset)
|
||||
if nRead, err := file.ReadAt(buf, rdr.trailerOffset); err != nil {
|
||||
return err
|
||||
@@ -261,7 +264,7 @@ func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPat
|
||||
agReq); err != nil || !pass {
|
||||
return nil
|
||||
}
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().TrailerFields)
|
||||
navMp, err := agReq.AsNavigableMap(trailerFields)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
@@ -275,17 +278,17 @@ func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPat
|
||||
return
|
||||
}
|
||||
|
||||
func (rdr *FWVFileER) processHeader(file *os.File, rowNr, evsPosted int, absPath string) error {
|
||||
func (rdr *FWVFileER) processHeader(file *os.File, rowNr, evsPosted int, absPath string, hdrFields []*config.FCTemplate) error {
|
||||
buf := make([]byte, rdr.headerOffset)
|
||||
if nRead, err := file.Read(buf); err != nil {
|
||||
return err
|
||||
} else if nRead != len(buf) {
|
||||
return fmt.Errorf("In header, line len: %d, have read: %d", rdr.headerOffset, nRead)
|
||||
}
|
||||
return rdr.createHeaderMap(string(buf), rowNr, evsPosted, absPath)
|
||||
return rdr.createHeaderMap(string(buf), rowNr, evsPosted, absPath, hdrFields)
|
||||
}
|
||||
|
||||
func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPath string) (err error) {
|
||||
func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPath string, hdrFields []*config.FCTemplate) (err error) {
|
||||
reqVars := make(map[string]interface{})
|
||||
agReq := agents.NewAgentRequest(
|
||||
config.NewFWVProvider(record), reqVars,
|
||||
@@ -298,7 +301,7 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa
|
||||
agReq); err != nil || !pass {
|
||||
return nil
|
||||
}
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().HeaderFields)
|
||||
navMp, err := agReq.AsNavigableMap(hdrFields)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
@@ -314,3 +317,16 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa
|
||||
evsPosted++
|
||||
return
|
||||
}
|
||||
|
||||
func (rdr *FWVFileER) preProcessFIelds(hasHeader, hasTrailer bool, headerFields, trailerFields []*config.FCTemplate) {
|
||||
for _, fld := range rdr.Config().Fields {
|
||||
if fld.Value[0].Rules == utils.DynamicDataPrefix+utils.MetaHdr {
|
||||
hasHeader = true
|
||||
headerFields = append(headerFields, fld)
|
||||
}
|
||||
if fld.Value[0].Rules == utils.DynamicDataPrefix+utils.MetaTrl {
|
||||
hasTrailer = true
|
||||
trailerFields = append(trailerFields, fld)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,7 +145,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) {
|
||||
agReq); err != nil || !pass {
|
||||
continue
|
||||
}
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields)
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().Fields)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
|
||||
@@ -175,9 +175,9 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) {
|
||||
}
|
||||
|
||||
// build Usage from contentFields based on record lenght
|
||||
for i, cntFld := range rdr.Config().ContentFields {
|
||||
for i, cntFld := range rdr.Config().Fields {
|
||||
if cntFld.FieldId == utils.Usage {
|
||||
rdr.Config().ContentFields[i].Value = config.NewRSRParsersMustCompile("~*req."+strconv.Itoa(len(record)-1), true, utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us
|
||||
rdr.Config().Fields[i].Value = config.NewRSRParsersMustCompile("~*req."+strconv.Itoa(len(record)-1), true, utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us
|
||||
}
|
||||
}
|
||||
rowNr++ // increment the rowNr after checking if it's not the end of file
|
||||
@@ -192,7 +192,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) {
|
||||
agReq); err != nil || !pass {
|
||||
continue
|
||||
}
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields)
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().Fields)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
|
||||
@@ -172,7 +172,7 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) {
|
||||
return
|
||||
}
|
||||
var navMp *config.NavigableMap
|
||||
if navMp, err = agReq.AsNavigableMap(rdr.Config().ContentFields); err != nil {
|
||||
if navMp, err = agReq.AsNavigableMap(rdr.Config().Fields); err != nil {
|
||||
return
|
||||
}
|
||||
rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent(
|
||||
|
||||
@@ -168,7 +168,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) {
|
||||
agReq); err != nil || !pass {
|
||||
continue
|
||||
}
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields)
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().Fields)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
|
||||
@@ -199,7 +199,7 @@ func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error
|
||||
return
|
||||
}
|
||||
var navMp *config.NavigableMap
|
||||
if navMp, err = agReq.AsNavigableMap(rdr.Config().ContentFields); err != nil {
|
||||
if navMp, err = agReq.AsNavigableMap(rdr.Config().Fields); err != nil {
|
||||
return
|
||||
}
|
||||
rdr.rdrEvents <- &erEvent{
|
||||
|
||||
@@ -515,6 +515,8 @@ const (
|
||||
MetaReq = "*req"
|
||||
MetaVars = "*vars"
|
||||
MetaRep = "*rep"
|
||||
MetaHdr = "*hdr"
|
||||
MetaTrl = "*trl"
|
||||
CGROriginHost = "cgr_originhost"
|
||||
MetaInitiate = "*initiate"
|
||||
MetaUpdate = "*update"
|
||||
|
||||
Reference in New Issue
Block a user