Merge ContentFields,HeaderFields,TrailerFields into one Files

This commit is contained in:
TeoV
2020-01-29 14:01:22 +02:00
committed by Dan Christian Bogos
parent 4f2d99c982
commit 52f0e0a86c
17 changed files with 113 additions and 93 deletions

View File

@@ -38,7 +38,8 @@ func NewAgentRequest(req config.DataProvider,
rply *config.NavigableMap,
tntTpl config.RSRParsers,
dfltTenant, timezone string,
filterS *engine.FilterS) (ar *AgentRequest) {
filterS *engine.FilterS,
header, trailer config.DataProvider) (ar *AgentRequest) {
if cgrRply == nil {
cgrRply = config.NewNavigableMap(nil)
}
@@ -53,6 +54,8 @@ func NewAgentRequest(req config.DataProvider,
Reply: rply,
Timezone: timezone,
filterS: filterS,
Header: header,
Trailer: trailer,
}
// populate tenant
if tntIf, err := ar.ParseField(
@@ -78,6 +81,8 @@ type AgentRequest struct {
Tenant,
Timezone string
filterS *engine.FilterS
Header config.DataProvider
Trailer config.DataProvider
}
// String implements engine.DataProvider
@@ -107,6 +112,10 @@ func (ar *AgentRequest) FieldAsInterface(fldPath []string) (val interface{}, err
val, err = ar.Reply.FieldAsInterface(fldPath[1:])
case utils.MetaCGRAReq:
val, err = ar.CGRAReq.FieldAsInterface(fldPath[1:])
case utils.MetaHdr:
val, err = ar.Header.FieldAsInterface(fldPath[1:])
case utils.MetaTrl:
val, err = ar.Trailer.FieldAsInterface(fldPath[1:])
}
if nmItems, isNMItems := val.([]*config.NMItem); isNMItems { // special handling of NMItems, take the last value out of it
val = nmItems[len(nmItems)-1].Data // could be we need nil protection here

View File

@@ -41,7 +41,7 @@ func TestAgReqAsNavigableMap(t *testing.T) {
data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CGRID},
utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()),
@@ -137,7 +137,7 @@ func TestAgReqMaxCost(t *testing.T) {
data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CapMaxUsage}, "120s", false, false)
@@ -182,7 +182,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) {
config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "MandatoryFalse",
@@ -232,7 +232,7 @@ func TestAgReqParseFieldRadius(t *testing.T) {
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "MandatoryFalse",
FieldId: "MandatoryFalse", Type: utils.META_COMPOSED,
@@ -272,7 +272,7 @@ Host: api.cgrates.org
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "MandatoryFalse",
FieldId: "MandatoryFalse", Type: utils.META_COMPOSED,
@@ -344,7 +344,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "MandatoryFalse",
FieldId: "MandatoryFalse", Type: utils.META_COMPOSED,
@@ -372,7 +372,7 @@ func TestAgReqEmptyFilter(t *testing.T) {
data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CGRID},
utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()),
@@ -415,7 +415,7 @@ func TestAgReqMetaExponent(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items),
config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
agReq.CGRRequest.Set([]string{"Value"}, "2", false, false)
agReq.CGRRequest.Set([]string{"Exponent"}, "2", false, false)
@@ -441,7 +441,7 @@ func TestAgReqCGRActiveRequest(t *testing.T) {
data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
// populate request, emulating the way will be done in HTTPAgent
tplFlds := []*config.FCTemplate{
@@ -484,7 +484,7 @@ func TestAgReqFieldAsNone(t *testing.T) {
data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false)
@@ -521,7 +521,7 @@ func TestAgReqFieldAsNone2(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items),
config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false)
@@ -561,7 +561,7 @@ func TestAgReqAsNavigableMap2(t *testing.T) {
data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false)
@@ -618,7 +618,7 @@ func TestAgReqFieldAsInterface(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items),
config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRAReq = config.NewNavigableMap(nil)
agReq.CGRAReq.Set([]string{utils.Usage}, []*config.NMItem{{Data: 3 * time.Minute}}, false, false)
@@ -684,7 +684,7 @@ func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) {
}
cgrRply := config.NewNavigableMap(ev2)
agReq := NewAgentRequest(nil, nil, cgrRply, rply, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, cgrRply, rply, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "Fld1",
@@ -725,7 +725,7 @@ func TestAgReqSetCGRReplyWithError(t *testing.T) {
}
rply := config.NewNavigableMap(ev)
agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS, nil, nil)
agReq.setCGRReply(nil, utils.ErrNotFound)
@@ -774,7 +774,7 @@ func TestAgReqSetCGRReplyWithoutError(t *testing.T) {
utils.Error: "",
}
agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS, nil, nil)
agReq.setCGRReply(myEv, nil)
@@ -819,7 +819,7 @@ func TestAgReqParseFieldMetaCCUsage(t *testing.T) {
config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "CCUsage", Filters: []string{},
@@ -897,7 +897,7 @@ func TestAgReqParseFieldMetaUsageDifference(t *testing.T) {
config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "Usage", Filters: []string{},
@@ -963,7 +963,7 @@ func TestAgReqParseFieldMetaSum(t *testing.T) {
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "Sum", Filters: []string{},
@@ -1007,7 +1007,7 @@ func TestAgReqParseFieldMetaDifference(t *testing.T) {
config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "Diff", Filters: []string{},
@@ -1051,7 +1051,7 @@ func TestAgReqParseFieldMetaValueExponent(t *testing.T) {
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, dm)
//pass the data provider to agent request
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
tplFlds := []*config.FCTemplate{
&config.FCTemplate{Tag: "ValExp", Filters: []string{},

View File

@@ -208,7 +208,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
reqProcessor.Tenant, da.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(reqProcessor.Timezone,
da.cgrCfg.GeneralCfg().DefaultTimezone),
da.filterS))
da.filterS, nil, nil))
if lclProcessed {
processed = lclProcessed
}
@@ -452,7 +452,7 @@ func (da *DiameterAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r
config.NewNavigableMap(nil),
nil,
da.cgrCfg.GeneralCfg().DefaultTenant,
da.cgrCfg.GeneralCfg().DefaultTimezone, da.filterS)
da.cgrCfg.GeneralCfg().DefaultTimezone, da.filterS, nil, nil)
nM, err := aReq.AsNavigableMap(da.cgrCfg.DiameterAgentCfg().Templates[da.cgrCfg.DiameterAgentCfg().ASRTemplate])
if err != nil {
utils.Logger.Warning(

View File

@@ -125,7 +125,7 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) {
da.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(da.cgrCfg.DNSAgentCfg().Timezone,
da.cgrCfg.GeneralCfg().DefaultTimezone),
da.fltrS))
da.fltrS, nil, nil))
if lclProcessed {
processed = lclProcessed
}

View File

@@ -70,7 +70,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
reqProcessor.Tenant, ha.dfltTenant,
utils.FirstNonEmpty(reqProcessor.Timezone,
config.CgrConfig().GeneralCfg().DefaultTimezone),
ha.filterS)
ha.filterS, nil, nil)
lclProcessed, err := ha.processRequest(reqProcessor, agReq)
if err != nil {
utils.Logger.Warning(

View File

@@ -481,7 +481,7 @@ func diamErr(m *diam.Message, resCode uint32,
newDADataProvider(nil, m), reqVars,
config.NewNavigableMap(nil),
config.NewNavigableMap(nil),
nil, tnt, tmz, filterS)
nil, tnt, tmz, filterS, nil, nil)
var rplyData *config.NavigableMap
if rplyData, err = aReq.AsNavigableMap(tpl); err != nil {
return

View File

@@ -96,7 +96,7 @@ func TestRadComposedFieldValue(t *testing.T) {
if err := pkt.AddAVPWithName("Cisco-NAS-Port", "CGR1", "Cisco"); err != nil {
t.Error(err)
}
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil)
agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false)
agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false)
agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false)
@@ -116,7 +116,7 @@ func TestRadFieldOutVal(t *testing.T) {
t.Error(err)
}
eOut := fmt.Sprintf("%s|flopsy|CGR1", MetaRadAcctStart)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil)
agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false)
agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false)
agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false)
@@ -137,7 +137,7 @@ func TestRadReplyAppendAttributes(t *testing.T) {
&config.FCTemplate{Tag: "Acct-Session-Time", FieldId: "Acct-Session-Time", Type: utils.META_COMPOSED,
Value: config.NewRSRParsersMustCompile("~*cgrep.MaxUsage{*duration_seconds}", true, utils.INFIELD_SEP)},
}
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil)
agReq.CGRReply.Set([]string{utils.CapMaxUsage}, time.Duration(time.Hour), false, false)
agReq.CGRReply.Set([]string{utils.CapAttributes, "RadReply"}, "AccessAccept", false, false)
agReq.CGRReply.Set([]string{utils.CapAttributes, utils.Account}, "1001", false, false)

View File

@@ -82,7 +82,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e
reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(reqProcessor.Timezone,
config.CgrConfig().GeneralCfg().DefaultTimezone),
ra.filterS)
ra.filterS, nil, nil)
agReq.Vars.Set([]string{MetaRadReqType}, utils.StringToInterface(MetaRadAuth), false, true)
var lclProcessed bool
if lclProcessed, err = ra.processRequest(reqProcessor, agReq, rpl); lclProcessed {
@@ -119,7 +119,7 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e
reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(reqProcessor.Timezone,
config.CgrConfig().GeneralCfg().DefaultTimezone),
ra.filterS)
ra.filterS, nil, nil)
var lclProcessed bool
if lclProcessed, err = ra.processRequest(reqProcessor, agReq, rpl); lclProcessed {
processed = lclProcessed

View File

@@ -58,7 +58,7 @@ func (fP *FWVProvider) FieldAsInterface(fldPath []string) (data interface{}, err
err = nil // cancel previous err
indexes := strings.Split(fwvIdx, "-")
if len(indexes) != 2 {
return "", fmt.Errorf("Invalid format for index : %+v", fldPath[1])
return "", fmt.Errorf("Invalid format for index : %+v", fldPath)
}
startIndex, err := strconv.Atoi(indexes[0])
if err != nil {

View File

@@ -225,7 +225,7 @@
"source_path": "/tmp/fwvErs/in",
"flags": ["*cdrs"],
"processed_path": "/tmp/fwvErs/out",
"content_fields": [
"fields": [
{"tag": "FileName", "field_id": "CdrFileName", "type": "*variable", "value": "~*hdr.95-135", "padding":"right"},
{"tag": "FileSeqNr", "field_id": "FileSeqNr", "type": "*variable", "value": "~*hdr.135-141", "padding":"zeroleft"},
{"tag": "AccId1", "field_id": "AccId1", "type": "*variable", "value": "~*hdr.135-141", "padding":"zeroleft"},

View File

@@ -148,7 +148,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, nil, nil) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
continue

View File

@@ -58,7 +58,6 @@ type FWVFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
headerMap *config.NavigableMap
rdrDir string
rdrEvents chan *erEvent // channel to dispatch the events created to
rdrError chan error
@@ -68,6 +67,9 @@ type FWVFileER struct {
offset int64 // Index of the next byte to process
headerOffset int64
trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs
trailerLenght int64
headerDP config.DataProvider
trailerDP config.DataProvider
}
func (rdr *FWVFileER) Config() *config.EventReaderCfg {
@@ -137,17 +139,28 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
var hasHeader, hasTrailer bool
var headerFields, trailerFields []*config.FCTemplate
if rdr.offset == 0 { // First time, set the necessary offsets
rdr.preProcessFIelds(hasHeader, hasTrailer, headerFields, trailerFields)
if err := rdr.setLineLen(file, hasHeader); err != nil {
// preprocess the fields for header and trailer
for _, fld := range rdr.Config().Fields {
if strings.HasPrefix(fld.Value[0].Rules, utils.DynamicDataPrefix+utils.MetaHdr) {
hasHeader = true
headerFields = append(headerFields, fld)
}
if strings.HasPrefix(fld.Value[0].Rules, utils.DynamicDataPrefix+utils.MetaTrl) {
hasTrailer = true
trailerFields = append(trailerFields, fld)
}
}
if err = rdr.setLineLen(file, hasHeader, hasTrailer); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot set lineLen: %s", utils.ERs, err.Error()))
break
}
if hasTrailer {
if fi, err := file.Stat(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot get file stats: %s", utils.ERs, err.Error()))
return err
} else {
rdr.trailerOffset = fi.Size() - rdr.lineLen
// process trailer here
if err = rdr.processTrailer(file, rowNr, evsPosted, absPath, trailerFields); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error()))
return
}
}
if hasHeader {
@@ -160,14 +173,15 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
buf := make([]byte, rdr.lineLen)
nRead, err := file.Read(buf)
if err != nil {
rdr.offset += rdr.lineLen // increase the offset when exit
if nRead, err := file.Read(buf); err != nil {
if err == io.EOF {
break
}
return err
} else if nRead != len(buf) {
} else if nRead != len(buf) && int64(nRead) != rdr.trailerLenght {
utils.Logger.Err(fmt.Sprintf("<%s> Could not read complete line, have instead: %s", utils.ERs, string(buf)))
rdr.offset += rdr.lineLen // increase the offset when exit
break
continue
}
rowNr++ // increment the rowNr after checking if it's not the end of file
record := string(buf)
@@ -177,7 +191,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, rdr.headerDP, rdr.trailerDP) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
continue
@@ -190,21 +204,12 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
rdr.offset += rdr.lineLen // increase the offset when exit
continue
}
//backwards compatible with CDRC
if rdr.headerMap != nil {
navMp.Merge(rdr.headerMap)
}
rdr.offset += rdr.lineLen // increase the offset
rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent(
agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config()}
evsPosted++
if rdr.trailerOffset != 0 && rdr.offset >= rdr.trailerOffset {
if err := rdr.processTrailer(file, rowNr, evsPosted, absPath, trailerFields); err != nil && err != io.EOF {
utils.Logger.Err(fmt.Sprintf("<%s> Read trailer error: %s ", utils.ERs, err.Error()))
}
break
}
}
if rdr.Config().ProcessedPath != "" {
@@ -222,21 +227,37 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
// Sets the line length based on first line, sets offset back to initial after reading
func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader bool) error {
func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader, hasTrailer bool) error {
buff := bufio.NewReader(file)
// in case we have header we take the length of first line and add it as headerOffset
if hasHeader {
i := 0
lastLineSize := 0
for {
readBytes, err := buff.ReadBytes('\n')
if err != nil {
return err
break
}
rdr.headerOffset = int64(len(readBytes))
if hasHeader && i == 0 {
rdr.headerOffset = int64(len(readBytes))
i++
continue
}
if (hasHeader && i == 1) || (!hasHeader && i == 0) {
rdr.lineLen = int64(len(readBytes))
i++
continue
}
lastLineSize = len(readBytes)
}
readBytes, err := buff.ReadBytes('\n')
if err != nil {
return err
if hasTrailer {
if fi, err := file.Stat(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Row 0, error: cannot get file stats: %s", utils.ERs, err.Error()))
return err
} else {
rdr.trailerOffset = fi.Size() - int64(lastLineSize)
rdr.trailerLenght = int64(lastLineSize)
}
}
rdr.lineLen = int64(len(readBytes))
if _, err := file.Seek(0, 0); err != nil {
return err
@@ -245,21 +266,22 @@ func (rdr *FWVFileER) setLineLen(file *os.File, hasHeader bool) error {
}
func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPath string, trailerFields []*config.FCTemplate) (err error) {
buf := make([]byte, rdr.trailerOffset)
if nRead, err := file.ReadAt(buf, rdr.trailerOffset); err != nil {
buf := make([]byte, rdr.trailerLenght)
if nRead, err := file.ReadAt(buf, rdr.trailerOffset); err != nil && err != io.EOF {
return err
} else if nRead != len(buf) {
return fmt.Errorf("In trailer, line len: %d, have read: %d", rdr.trailerOffset, nRead)
return fmt.Errorf("In trailer, line len: %d, have read: %d instead of: %d", rdr.trailerOffset, nRead, len(buf))
}
record := string(buf)
reqVars := make(map[string]interface{})
rdr.trailerDP = config.NewFWVProvider(record)
agReq := agents.NewAgentRequest(
config.NewFWVProvider(record), reqVars,
nil, reqVars,
nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, nil, rdr.trailerDP) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
return nil
@@ -275,6 +297,8 @@ func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPat
agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config()}
evsPosted++
// reset the cursor after process the trailer
_, err = file.Seek(0, 0)
return
}
@@ -290,13 +314,14 @@ func (rdr *FWVFileER) processHeader(file *os.File, rowNr, evsPosted int, absPath
func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPath string, hdrFields []*config.FCTemplate) (err error) {
reqVars := make(map[string]interface{})
rdr.headerDP = config.NewFWVProvider(record)
agReq := agents.NewAgentRequest(
config.NewFWVProvider(record), reqVars,
nil, reqVars,
nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, rdr.headerDP, nil) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
return nil
@@ -309,7 +334,6 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa
rdr.offset += rdr.lineLen // increase the offset when exit
return err
}
rdr.headerMap = navMp
rdr.offset += rdr.headerOffset // increase the offset
rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent(
agReq.Tenant, utils.NestingSep),
@@ -317,16 +341,3 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa
evsPosted++
return
}
func (rdr *FWVFileER) preProcessFIelds(hasHeader, hasTrailer bool, headerFields, trailerFields []*config.FCTemplate) {
for _, fld := range rdr.Config().Fields {
if fld.Value[0].Rules == utils.DynamicDataPrefix+utils.MetaHdr {
hasHeader = true
headerFields = append(headerFields, fld)
}
if fld.Value[0].Rules == utils.DynamicDataPrefix+utils.MetaTrl {
hasTrailer = true
trailerFields = append(trailerFields, fld)
}
}
}

View File

@@ -140,7 +140,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) {
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, nil, nil) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
continue

View File

@@ -187,7 +187,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) {
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, nil, nil) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
continue

View File

@@ -165,7 +165,7 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) {
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, nil, nil) // create an AgentRequest
var pass bool
if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {

View File

@@ -163,7 +163,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) {
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, nil, nil) // create an AgentRequest
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
continue

View File

@@ -192,7 +192,7 @@ func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
rdr.fltrS, nil, nil) // create an AgentRequest
var pass bool
if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {