diff --git a/ees/ees.go b/ees/ees.go index d8e31b868..911a2731b 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -135,7 +135,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s var wg sync.WaitGroup var withErr bool for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters { - if eeCfg.Type == utils.META_NONE { // ignore *default exporter + if eeCfg.Type == utils.META_NONE { // ignore *none type exporter continue } @@ -146,7 +146,9 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s tnt = eeTnt } if pass, errPass := eeS.filterS.Pass(tnt, - eeCfg.Filters, cgrDp); errPass != nil || !pass { + eeCfg.Filters, cgrDp); errPass != nil { + return errPass + } else if !pass { continue // does not pass the filters, ignore the exporter } } @@ -170,10 +172,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s var ee EventExporter if hasCache { var x interface{} - //fmt.Println("Try to get exporter from cache ") - //fmt.Println(eeCfg.ID) if x, isCached = eeCache.Get(eeCfg.ID); isCached { - //fmt.Println("Get FROM CACHE") ee = x.(EventExporter) } } diff --git a/engine/cdre.go b/engine/cdre.go index 51e0159df..3fc823857 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -395,7 +395,9 @@ func (cdre *CDRExporter) processCDRs() (err error) { utils.MetaEC: cdr.CostDetails, } if pass, err := cdre.filterS.Pass(cdre.exportTemplate.Tenant, - cdre.exportTemplate.Filters, cgrDp); err != nil || !pass { + cdre.exportTemplate.Filters, cgrDp); err != nil { + return err + } else if !pass { continue // Not passes filters, ignore this CDR } } diff --git a/ers/filecsv.go b/ers/filecsv.go index 0c3d2344e..862b10200 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -157,14 +157,19 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTimezone), rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil || !pass { + agReq); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>", + utils.ERs, absPath, rowNr, err.Error())) + return err + } else if !pass { continue } - if err := agReq.SetFields(rdr.Config().Fields); err != nil { + if err = agReq.SetFields(rdr.Config().Fields); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", utils.ERs, absPath, rowNr, err.Error())) - continue + return } rdr.rdrEvents <- &erEvent{ cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep), diff --git a/ers/filefwv.go b/ers/filefwv.go index 7cdbd2969..328dc4595 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -200,15 +200,20 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTimezone), rdr.fltrS, rdr.headerDP, rdr.trailerDP) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil || !pass { + agReq); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>", + utils.ERs, absPath, rowNr, err.Error())) + return err + } else if !pass { continue } - if err := agReq.SetFields(rdr.Config().Fields); err != nil { + if err = agReq.SetFields(rdr.Config().Fields); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", utils.ERs, absPath, rowNr, err.Error())) rdr.offset += rdr.lineLen // increase the offset when exit - continue + return } rdr.offset += rdr.lineLen // increase the offset rdr.rdrEvents <- &erEvent{ diff --git a/ers/filejson.go b/ers/filejson.go index ea6b8d988..c4def70dc 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -150,8 +150,13 @@ func (rdr *JSONFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTimezone), rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil || !pass { + agReq); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading file: <%s> ignoring due to filter error: <%s>", + utils.ERs, absPath, err.Error())) return err + } else if !pass { + return nil } if err = agReq.SetFields(rdr.Config().Fields); err != nil { utils.Logger.Warning( diff --git a/ers/filexml.go b/ers/filexml.go index 17c341a0d..ea440a6bf 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -148,10 +148,15 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTimezone), rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil || !pass { + agReq); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>", + utils.ERs, absPath, rowNr, err.Error())) + return err + } else if !pass { continue } - if err := agReq.SetFields(rdr.Config().Fields); err != nil { + if err = agReq.SetFields(rdr.Config().Fields); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", utils.ERs, absPath, rowNr, err.Error())) diff --git a/ers/flatstore.go b/ers/flatstore.go index 5520505ac..c935eac8f 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -196,14 +196,19 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTimezone), rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil || !pass { + agReq); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>", + utils.ERs, absPath, rowNr, err.Error())) + return err + } else if !pass { continue } - if err := agReq.SetFields(rdr.Config().Fields); err != nil { + if err = agReq.SetFields(rdr.Config().Fields); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", utils.ERs, absPath, rowNr, err.Error())) - continue + return } rdr.rdrEvents <- &erEvent{ diff --git a/ers/partial_csv.go b/ers/partial_csv.go index d83765858..802c7fddf 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -176,14 +176,19 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTimezone), rdr.fltrS, nil, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil || !pass { + agReq); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>", + utils.ERs, absPath, rowNr, err.Error())) + return err + } else if !pass { continue } - if err := agReq.SetFields(rdr.Config().Fields); err != nil { + if err = agReq.SetFields(rdr.Config().Fields); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", utils.ERs, absPath, rowNr, err.Error())) - continue + return } // take OriginID and OriginHost to compose CGRID diff --git a/loaders/libloader.go b/loaders/libloader.go index 645b4ab62..550f023d6 100644 --- a/loaders/libloader.go +++ b/loaders/libloader.go @@ -56,7 +56,9 @@ func (ld LoaderData) UpdateFromCSV(fileName string, record []string, return err } if pass, err := filterS.Pass(tenant, - cfgFld.Filters, csvProvider); err != nil || !pass { + cfgFld.Filters, csvProvider); err != nil { + return err + } else if !pass { continue // Not passes filters, ignore this CDR } }