mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
Return error in case of filterS.Pass
This commit is contained in:
committed by
Dan Christian Bogos
parent
1f86b28527
commit
24b2206a2a
@@ -135,7 +135,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
|
||||
var wg sync.WaitGroup
|
||||
var withErr bool
|
||||
for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters {
|
||||
if eeCfg.Type == utils.META_NONE { // ignore *default exporter
|
||||
if eeCfg.Type == utils.META_NONE { // ignore *none type exporter
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -146,7 +146,9 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
|
||||
tnt = eeTnt
|
||||
}
|
||||
if pass, errPass := eeS.filterS.Pass(tnt,
|
||||
eeCfg.Filters, cgrDp); errPass != nil || !pass {
|
||||
eeCfg.Filters, cgrDp); errPass != nil {
|
||||
return errPass
|
||||
} else if !pass {
|
||||
continue // does not pass the filters, ignore the exporter
|
||||
}
|
||||
}
|
||||
@@ -170,10 +172,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
|
||||
var ee EventExporter
|
||||
if hasCache {
|
||||
var x interface{}
|
||||
//fmt.Println("Try to get exporter from cache ")
|
||||
//fmt.Println(eeCfg.ID)
|
||||
if x, isCached = eeCache.Get(eeCfg.ID); isCached {
|
||||
//fmt.Println("Get FROM CACHE")
|
||||
ee = x.(EventExporter)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -395,7 +395,9 @@ func (cdre *CDRExporter) processCDRs() (err error) {
|
||||
utils.MetaEC: cdr.CostDetails,
|
||||
}
|
||||
if pass, err := cdre.filterS.Pass(cdre.exportTemplate.Tenant,
|
||||
cdre.exportTemplate.Filters, cgrDp); err != nil || !pass {
|
||||
cdre.exportTemplate.Filters, cgrDp); err != nil {
|
||||
return err
|
||||
} else if !pass {
|
||||
continue // Not passes filters, ignore this CDR
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,14 +157,19 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS, nil, nil) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
agReq); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
return err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if err := agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
if err = agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
continue
|
||||
return
|
||||
}
|
||||
rdr.rdrEvents <- &erEvent{
|
||||
cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep),
|
||||
|
||||
@@ -200,15 +200,20 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS, rdr.headerDP, rdr.trailerDP) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
agReq); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
return err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if err := agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
if err = agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
rdr.offset += rdr.lineLen // increase the offset when exit
|
||||
continue
|
||||
return
|
||||
}
|
||||
rdr.offset += rdr.lineLen // increase the offset
|
||||
rdr.rdrEvents <- &erEvent{
|
||||
|
||||
@@ -150,8 +150,13 @@ func (rdr *JSONFileER) processFile(fPath, fName string) (err error) {
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS, nil, nil) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
agReq); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> ignoring due to filter error: <%s>",
|
||||
utils.ERs, absPath, err.Error()))
|
||||
return err
|
||||
} else if !pass {
|
||||
return nil
|
||||
}
|
||||
if err = agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
|
||||
@@ -148,10 +148,15 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) {
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS, nil, nil) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
agReq); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
return err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if err := agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
if err = agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
|
||||
@@ -196,14 +196,19 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) {
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS, nil, nil) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
agReq); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
return err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if err := agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
if err = agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
rdr.rdrEvents <- &erEvent{
|
||||
|
||||
@@ -176,14 +176,19 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) {
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS, nil, nil) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
agReq); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
return err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if err := agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
if err = agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
// take OriginID and OriginHost to compose CGRID
|
||||
|
||||
@@ -56,7 +56,9 @@ func (ld LoaderData) UpdateFromCSV(fileName string, record []string,
|
||||
return err
|
||||
}
|
||||
if pass, err := filterS.Pass(tenant,
|
||||
cfgFld.Filters, csvProvider); err != nil || !pass {
|
||||
cfgFld.Filters, csvProvider); err != nil {
|
||||
return err
|
||||
} else if !pass {
|
||||
continue // Not passes filters, ignore this CDR
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user