diff --git a/agents/agentreq.go b/agents/agentreq.go index 21a2ee811..d2612f247 100644 --- a/agents/agentreq.go +++ b/agents/agentreq.go @@ -38,7 +38,8 @@ func NewAgentRequest(req config.DataProvider, rply *config.NavigableMap, tntTpl config.RSRParsers, dfltTenant, timezone string, - filterS *engine.FilterS) (ar *AgentRequest) { + filterS *engine.FilterS, + header, trailer config.DataProvider) (ar *AgentRequest) { if cgrRply == nil { cgrRply = config.NewNavigableMap(nil) } @@ -53,6 +54,8 @@ func NewAgentRequest(req config.DataProvider, Reply: rply, Timezone: timezone, filterS: filterS, + Header: header, + Trailer: trailer, } // populate tenant if tntIf, err := ar.ParseField( @@ -78,6 +81,8 @@ type AgentRequest struct { Tenant, Timezone string filterS *engine.FilterS + Header config.DataProvider + Trailer config.DataProvider } // String implements engine.DataProvider @@ -107,6 +112,10 @@ func (ar *AgentRequest) FieldAsInterface(fldPath []string) (val interface{}, err val, err = ar.Reply.FieldAsInterface(fldPath[1:]) case utils.MetaCGRAReq: val, err = ar.CGRAReq.FieldAsInterface(fldPath[1:]) + case utils.MetaHdr: + val, err = ar.Header.FieldAsInterface(fldPath[1:]) + case utils.MetaTrl: + val, err = ar.Trailer.FieldAsInterface(fldPath[1:]) } if nmItems, isNMItems := val.([]*config.NMItem); isNMItems { // special handling of NMItems, take the last value out of it val = nmItems[len(nmItems)-1].Data // could be we need nil protection here diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index 02b57e345..bfb9829f9 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -41,7 +41,7 @@ func TestAgReqAsNavigableMap(t *testing.T) { data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), @@ -137,7 +137,7 @@ func TestAgReqMaxCost(t *testing.T) { data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CapMaxUsage}, "120s", false, false) @@ -182,7 +182,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) { config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "MandatoryFalse", @@ -232,7 +232,7 @@ func TestAgReqParseFieldRadius(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "MandatoryFalse", FieldId: "MandatoryFalse", Type: utils.META_COMPOSED, @@ -272,7 +272,7 @@ Host: api.cgrates.org dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "MandatoryFalse", FieldId: "MandatoryFalse", Type: utils.META_COMPOSED, @@ -344,7 +344,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "MandatoryFalse", FieldId: "MandatoryFalse", Type: utils.META_COMPOSED, @@ -372,7 +372,7 @@ func TestAgReqEmptyFilter(t *testing.T) { data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), @@ -415,7 +415,7 @@ func TestAgReqMetaExponent(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) agReq.CGRRequest.Set([]string{"Value"}, "2", false, false) agReq.CGRRequest.Set([]string{"Exponent"}, "2", false, false) @@ -441,7 +441,7 @@ func TestAgReqCGRActiveRequest(t *testing.T) { data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) // populate request, emulating the way will be done in HTTPAgent tplFlds := []*config.FCTemplate{ @@ -484,7 +484,7 @@ func TestAgReqFieldAsNone(t *testing.T) { data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false) @@ -521,7 +521,7 @@ func TestAgReqFieldAsNone2(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false) @@ -561,7 +561,7 @@ func TestAgReqAsNavigableMap2(t *testing.T) { data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false) @@ -618,7 +618,7 @@ func TestAgReqFieldAsInterface(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items), config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRAReq = config.NewNavigableMap(nil) agReq.CGRAReq.Set([]string{utils.Usage}, []*config.NMItem{{Data: 3 * time.Minute}}, false, false) @@ -684,7 +684,7 @@ func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) { } cgrRply := config.NewNavigableMap(ev2) - agReq := NewAgentRequest(nil, nil, cgrRply, rply, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, cgrRply, rply, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "Fld1", @@ -725,7 +725,7 @@ func TestAgReqSetCGRReplyWithError(t *testing.T) { } rply := config.NewNavigableMap(ev) - agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS, nil, nil) agReq.setCGRReply(nil, utils.ErrNotFound) @@ -774,7 +774,7 @@ func TestAgReqSetCGRReplyWithoutError(t *testing.T) { utils.Error: "", } - agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS, nil, nil) agReq.setCGRReply(myEv, nil) @@ -819,7 +819,7 @@ func TestAgReqParseFieldMetaCCUsage(t *testing.T) { config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "CCUsage", Filters: []string{}, @@ -897,7 +897,7 @@ func TestAgReqParseFieldMetaUsageDifference(t *testing.T) { config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "Usage", Filters: []string{}, @@ -963,7 +963,7 @@ func TestAgReqParseFieldMetaSum(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "Sum", Filters: []string{}, @@ -1007,7 +1007,7 @@ func TestAgReqParseFieldMetaDifference(t *testing.T) { config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "Diff", Filters: []string{}, @@ -1051,7 +1051,7 @@ func TestAgReqParseFieldMetaValueExponent(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "ValExp", Filters: []string{}, diff --git a/agents/diamagent.go b/agents/diamagent.go index 647ce5d75..494a985c2 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -208,7 +208,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { reqProcessor.Tenant, da.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, da.cgrCfg.GeneralCfg().DefaultTimezone), - da.filterS)) + da.filterS, nil, nil)) if lclProcessed { processed = lclProcessed } @@ -452,7 +452,7 @@ func (da *DiameterAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r config.NewNavigableMap(nil), nil, da.cgrCfg.GeneralCfg().DefaultTenant, - da.cgrCfg.GeneralCfg().DefaultTimezone, da.filterS) + da.cgrCfg.GeneralCfg().DefaultTimezone, da.filterS, nil, nil) nM, err := aReq.AsNavigableMap(da.cgrCfg.DiameterAgentCfg().Templates[da.cgrCfg.DiameterAgentCfg().ASRTemplate]) if err != nil { utils.Logger.Warning( diff --git a/agents/dnsagent.go b/agents/dnsagent.go index 12007d601..03beec96c 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -125,7 +125,7 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) { da.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(da.cgrCfg.DNSAgentCfg().Timezone, da.cgrCfg.GeneralCfg().DefaultTimezone), - da.fltrS)) + da.fltrS, nil, nil)) if lclProcessed { processed = lclProcessed } diff --git a/agents/httpagent.go b/agents/httpagent.go index f37bded13..b707c67cc 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -70,7 +70,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { reqProcessor.Tenant, ha.dfltTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), - ha.filterS) + ha.filterS, nil, nil) lclProcessed, err := ha.processRequest(reqProcessor, agReq) if err != nil { utils.Logger.Warning( diff --git a/agents/libdiam.go b/agents/libdiam.go index 80b9390e5..93064c758 100644 --- a/agents/libdiam.go +++ b/agents/libdiam.go @@ -481,7 +481,7 @@ func diamErr(m *diam.Message, resCode uint32, newDADataProvider(nil, m), reqVars, config.NewNavigableMap(nil), config.NewNavigableMap(nil), - nil, tnt, tmz, filterS) + nil, tnt, tmz, filterS, nil, nil) var rplyData *config.NavigableMap if rplyData, err = aReq.AsNavigableMap(tpl); err != nil { return diff --git a/agents/librad_test.go b/agents/librad_test.go index 61f40b2bc..2b36f7a16 100644 --- a/agents/librad_test.go +++ b/agents/librad_test.go @@ -96,7 +96,7 @@ func TestRadComposedFieldValue(t *testing.T) { if err := pkt.AddAVPWithName("Cisco-NAS-Port", "CGR1", "Cisco"); err != nil { t.Error(err) } - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil) agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false) agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false) agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false) @@ -116,7 +116,7 @@ func TestRadFieldOutVal(t *testing.T) { t.Error(err) } eOut := fmt.Sprintf("%s|flopsy|CGR1", MetaRadAcctStart) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil) agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false) agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false) agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false) @@ -137,7 +137,7 @@ func TestRadReplyAppendAttributes(t *testing.T) { &config.FCTemplate{Tag: "Acct-Session-Time", FieldId: "Acct-Session-Time", Type: utils.META_COMPOSED, Value: config.NewRSRParsersMustCompile("~*cgrep.MaxUsage{*duration_seconds}", true, utils.INFIELD_SEP)}, } - agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil) agReq.CGRReply.Set([]string{utils.CapMaxUsage}, time.Duration(time.Hour), false, false) agReq.CGRReply.Set([]string{utils.CapAttributes, "RadReply"}, "AccessAccept", false, false) agReq.CGRReply.Set([]string{utils.CapAttributes, utils.Account}, "1001", false, false) diff --git a/agents/radagent.go b/agents/radagent.go index 208730c5d..4e2018ce3 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -82,7 +82,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), - ra.filterS) + ra.filterS, nil, nil) agReq.Vars.Set([]string{MetaRadReqType}, utils.StringToInterface(MetaRadAuth), false, true) var lclProcessed bool if lclProcessed, err = ra.processRequest(reqProcessor, agReq, rpl); lclProcessed { @@ -119,7 +119,7 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), - ra.filterS) + ra.filterS, nil, nil) var lclProcessed bool if lclProcessed, err = ra.processRequest(reqProcessor, agReq, rpl); lclProcessed { processed = lclProcessed diff --git a/config/fwvdp.go b/config/fwvdp.go index 4754e4927..5d5c1cc45 100644 --- a/config/fwvdp.go +++ b/config/fwvdp.go @@ -58,7 +58,7 @@ func (fP *FWVProvider) FieldAsInterface(fldPath []string) (data interface{}, err err = nil // cancel previous err indexes := strings.Split(fwvIdx, "-") if len(indexes) != 2 { - return "", fmt.Errorf("Invalid format for index : %+v", fldPath[1]) + return "", fmt.Errorf("Invalid format for index : %+v", fldPath) } startIndex, err := strconv.Atoi(indexes[0]) if err != nil { diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index 016526c94..a5edafdfb 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -225,7 +225,7 @@ "source_path": "/tmp/fwvErs/in", "flags": ["*cdrs"], "processed_path": "/tmp/fwvErs/out", - "content_fields": [ + "fields": [ {"tag": "FileName", "field_id": "CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"right"}, {"tag": "FileSeqNr", "field_id": "FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"zeroleft"}, {"tag": "AccId1", "field_id": "AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"zeroleft"}, diff --git a/ers/filecsv.go b/ers/filecsv.go index e26e0c483..e3481fb71 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -148,7 +148,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { continue diff --git a/ers/filefwv.go b/ers/filefwv.go index 1d1a54869..49d2d2f39 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -58,7 +58,6 @@ type FWVFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - headerMap *config.NavigableMap rdrDir string rdrEvents chan *erEvent // channel to dispatch the events created to rdrError chan error @@ -68,6 +67,9 @@ type FWVFileER struct { offset int64 // Index of the next byte to process headerOffset int64 trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs + trailerLenght int64 + headerDP config.DataProvider + trailerDP config.DataProvider } func (rdr *FWVFileER) Config() *config.EventReaderCfg { @@ -137,17 +139,28 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { var hasHeader, hasTrailer bool var headerFields, trailerFields []*config.FCTemplate if rdr.offset == 0 { // First time, set the necessary offsets - rdr.preProcessFIelds(hasHeader, hasTrailer, headerFields, trailerFields) - if err := rdr.setLineLen(file, hasHeader); err != nil { + // preprocess the fields for header and trailer + for _, fld := range rdr.Config().Fields { + if strings.HasPrefix(fld.Value[0].Rules, utils.DynamicDataPrefix+utils.MetaHdr) { + hasHeader = true + headerFields = append(headerFields, fld) + } + if strings.HasPrefix(fld.Value[0].Rules, utils.DynamicDataPrefix+utils.MetaTrl) { + hasTrailer = true + trailerFields = append(trailerFields, fld) + } + } + + if err = rdr.setLineLen(file, hasHeader, hasTrailer); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot set lineLen: %s", utils.ERs, err.Error())) break } if hasTrailer { - if fi, err := file.Stat(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot get file stats: %s", utils.ERs, err.Error())) - return err - } else { - rdr.trailerOffset = fi.Size() - rdr.lineLen + + // process trailer here + if err = rdr.processTrailer(file, rowNr, evsPosted, absPath, trailerFields); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error())) + return } } if hasHeader { @@ -160,14 +173,15 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } buf := make([]byte, rdr.lineLen) - nRead, err := file.Read(buf) - if err != nil { - rdr.offset += rdr.lineLen // increase the offset when exit + if nRead, err := file.Read(buf); err != nil { + if err == io.EOF { + break + } return err - } else if nRead != len(buf) { + } else if nRead != len(buf) && int64(nRead) != rdr.trailerLenght { utils.Logger.Err(fmt.Sprintf("<%s> Could not read complete line, have instead: %s", utils.ERs, string(buf))) rdr.offset += rdr.lineLen // increase the offset when exit - break + continue } rowNr++ // increment the rowNr after checking if it's not the end of file record := string(buf) @@ -177,7 +191,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, rdr.headerDP, rdr.trailerDP) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { continue @@ -190,21 +204,12 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { rdr.offset += rdr.lineLen // increase the offset when exit continue } - //backwards compatible with CDRC - if rdr.headerMap != nil { - navMp.Merge(rdr.headerMap) - } rdr.offset += rdr.lineLen // increase the offset rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent( agReq.Tenant, utils.NestingSep), rdrCfg: rdr.Config()} evsPosted++ - if rdr.trailerOffset != 0 && rdr.offset >= rdr.trailerOffset { - if err := rdr.processTrailer(file, rowNr, evsPosted, absPath, trailerFields); err != nil && err != io.EOF { - utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error())) - } - break - } + } if rdr.Config().ProcessedPath != "" { @@ -222,21 +227,37 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } // Sets the line length based on first line, sets offset back to initial after reading -func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader bool) error { +func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader, hasTrailer bool) error { buff := bufio.NewReader(file) // in case we have header we take the length of first line and add it as headerOffset - if hasHeader { + i := 0 + lastLineSize := 0 + for { readBytes, err := buff.ReadBytes('\n') if err != nil { - return err + break } - rdr.headerOffset = int64(len(readBytes)) + if hasHeader && i == 0 { + rdr.headerOffset = int64(len(readBytes)) + i++ + continue + } + if (hasHeader && i == 1) || (!hasHeader && i == 0) { + rdr.lineLen = int64(len(readBytes)) + i++ + continue + } + lastLineSize = len(readBytes) } - readBytes, err := buff.ReadBytes('\n') - if err != nil { - return err + if hasTrailer { + if fi, err := file.Stat(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot get file stats: %s", utils.ERs, err.Error())) + return err + } else { + rdr.trailerOffset = fi.Size() - int64(lastLineSize) + rdr.trailerLenght = int64(lastLineSize) + } } - rdr.lineLen = int64(len(readBytes)) if _, err := file.Seek(0, 0); err != nil { return err @@ -245,21 +266,22 @@ func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader bool) error { } func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPath string, trailerFields []*config.FCTemplate) (err error) { - buf := make([]byte, rdr.trailerOffset) - if nRead, err := file.ReadAt(buf, rdr.trailerOffset); err != nil { + buf := make([]byte, rdr.trailerLenght) + if nRead, err := file.ReadAt(buf, rdr.trailerOffset); err != nil && err != io.EOF { return err } else if nRead != len(buf) { - return fmt.Errorf("In trailer, line len: %d, have read: %d", rdr.trailerOffset, nRead) + return fmt.Errorf("In trailer, line len: %d, have read: %d instead of: %d", rdr.trailerOffset, nRead, len(buf)) } record := string(buf) reqVars := make(map[string]interface{}) + rdr.trailerDP = config.NewFWVProvider(record) agReq := agents.NewAgentRequest( - config.NewFWVProvider(record), reqVars, + nil, reqVars, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, nil, rdr.trailerDP) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { return nil @@ -275,6 +297,8 @@ func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPat agReq.Tenant, utils.NestingSep), rdrCfg: rdr.Config()} evsPosted++ + // reset the cursor after process the trailer + _, err = file.Seek(0, 0) return } @@ -290,13 +314,14 @@ func (rdr *FWVFileER) processHeader(file *os.File, rowNr, evsPosted int, absPath func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPath string, hdrFields []*config.FCTemplate) (err error) { reqVars := make(map[string]interface{}) + rdr.headerDP = config.NewFWVProvider(record) agReq := agents.NewAgentRequest( - config.NewFWVProvider(record), reqVars, + nil, reqVars, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, rdr.headerDP, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { return nil @@ -309,7 +334,6 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa rdr.offset += rdr.lineLen // increase the offset when exit return err } - rdr.headerMap = navMp rdr.offset += rdr.headerOffset // increase the offset rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent( agReq.Tenant, utils.NestingSep), @@ -317,16 +341,3 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa evsPosted++ return } - -func (rdr *FWVFileER) preProcessFIelds(hasHeader, hasTrailer bool, headerFields, trailerFields []*config.FCTemplate) { - for _, fld := range rdr.Config().Fields { - if fld.Value[0].Rules == utils.DynamicDataPrefix+utils.MetaHdr { - hasHeader = true - headerFields = append(headerFields, fld) - } - if fld.Value[0].Rules == utils.DynamicDataPrefix+utils.MetaTrl { - hasTrailer = true - trailerFields = append(trailerFields, fld) - } - } -} diff --git a/ers/filexml.go b/ers/filexml.go index fa8e783f9..c8953f406 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -140,7 +140,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { continue diff --git a/ers/flatstore.go b/ers/flatstore.go index d5a294955..20896c7a6 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -187,7 +187,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { continue diff --git a/ers/kafka.go b/ers/kafka.go index a1c69b24a..d7ff9050e 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -165,7 +165,7 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, nil, nil) // create an AgentRequest var pass bool if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { diff --git a/ers/partial_csv.go b/ers/partial_csv.go index 2ba1fe6bc..adb26d8d3 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -163,7 +163,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { continue diff --git a/ers/sql.go b/ers/sql.go index 4f39bfe30..d898b771e 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -192,7 +192,7 @@ func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS) // create an AgentRequest + rdr.fltrS, nil, nil) // create an AgentRequest var pass bool if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass {