From d9fa6bc4cc29fc2b5bf4d417e0043a343d7d19ec Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 29 Aug 2018 09:18:52 -0400 Subject: [PATCH] Consider Filter from Fields in cdrc --- cdrc/csv.go | 6 +- cdrc/csv_it_test.go | 86 +++++++++++++++++++ cdrc/fwv.go | 5 +- cdrc/partial_cdr.go | 5 +- cdrc/xml.go | 2 +- .../samples/csvwithfieldfilter/cgrates.json | 58 +++++++++++++ engine/cdr.go | 15 ++-- 7 files changed, 160 insertions(+), 17 deletions(-) create mode 100755 data/conf/samples/csvwithfieldfilter/cgrates.json diff --git a/cdrc/csv.go b/cdrc/csv.go index bd20cb5a3..3178342f1 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -147,8 +147,10 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con return nil, err } if pass, err := self.filterS.Pass(tenant, - cdrcCfg.Filters, csvProvider); err != nil || !pass { - continue // Not passes filters, ignore this CDR + cdrFldCfg.Filters, csvProvider); err != nil { + return nil, err + } else if !pass { + continue } } if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.dfltCdrcCfg.CdrFormat) { // Hardcode some values in case of flatstore diff --git a/cdrc/csv_it_test.go b/cdrc/csv_it_test.go index b80f8c291..eba5d4093 100644 --- a/cdrc/csv_it_test.go +++ b/cdrc/csv_it_test.go @@ -342,3 +342,89 @@ func TestCsvIT3KillEngine(t *testing.T) { t.Error(err) } } + +// Begin tests for cdrc csv with new filters +var fileContent1_4 = `accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1 +accid22;*postpaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1 +accid23;*postpaid;cgrates.org;1002;086517174963;2013-02-03 19:54:00;76;val_extra3;"";val_extra1` + +func TestCsvIT4InitConfig(t *testing.T) { + var err error + csvCfgPath = path.Join(*dataDir, "conf", "samples", "csvwithfieldfilter") + if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func TestCsvIT4InitCdrDb(t *testing.T) { + if err := engine.InitStorDb(csvCfg); err != nil { + t.Fatal(err) + } +} + +func TestCsvIT4CreateCdrDirs(t *testing.T) { + for _, cdrcProfiles := range csvCfg.CdrcProfiles { + for _, cdrcInst := range cdrcProfiles { + for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + } + } +} + +func TestCsvIT4StartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestCsvIT4RpcConn(t *testing.T) { + var err error + cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +// Scenario out of first .xml config +func TestCsvIT4HandleCdr2File(t *testing.T) { + fileName := "file1.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1_4), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/csvwithfielfilter/csvit2/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +func TestCsvIT4ProcessedFiles(t *testing.T) { + time.Sleep(time.Duration(2**waitRater) * time.Millisecond) + if outContent4, err := ioutil.ReadFile("/tmp/csvwithfielfilter/csvit2/out/file1.csv"); err != nil { + t.Error(err) + } else if fileContent1_4 != string(outContent4) { + t.Errorf("Expecting: %q, received: %q", fileContent1_4, string(outContent4)) + } +} + +func TestCsvIT4AnalyseCDRs(t *testing.T) { + var reply []*engine.ExternalCDR + if err := cdrcRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 2 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } +} + +func TestCsvIT4KillEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/cdrc/fwv.go b/cdrc/fwv.go index 491adf1b3..f881798a5 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -161,14 +161,13 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cdrcCfg *confi duMultiplyFactor = cdrcCfg.DataUsageMultiplyFactor } for _, cdrFldCfg := range cfgFields { - if len(cdrcCfg.Filters) != 0 { - + if len(cdrFldCfg.Filters) != 0 { tenant, err := cdrcCfg.Tenant.ParseValue("") if err != nil { return nil, err } if pass, err := self.filterS.Pass(tenant, - cdrcCfg.Filters, fwvProvider); err != nil || !pass { + cdrFldCfg.Filters, fwvProvider); err != nil || !pass { continue // Not passes filters, ignore this CDR } } diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index c33d01930..da43365df 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -77,7 +77,8 @@ func (prc *PartialRecordsCache) dumpPartialRecords(originID string) { csvWriter := csv.NewWriter(fileOut) csvWriter.Comma = prc.csvSep for _, cdr := range prc.partialRecords[originID].cdrs { - expRec, err := cdr.AsExportRecord(prc.partialRecords[originID].cacheDumpFields, prc.httpSkipTlsCheck, nil, prc.roundDecimals, prc.filterS) + expRec, err := cdr.AsExportRecord(prc.partialRecords[originID].cacheDumpFields, + prc.httpSkipTlsCheck, nil, prc.roundDecimals, prc.filterS) if err != nil { return nil, err } @@ -103,7 +104,7 @@ func (prc *PartialRecordsCache) postCDR(originID string) { cdr := prc.partialRecords[originID].MergeCDRs() cdr.Partial = false // force completion var reply string - if err := prc.cdrs.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil { + if err := prc.cdrs.Call(utils.CdrsV2ProcessCDR, cdr.AsCGREvent(), &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" Failed sending CDR %+v from partial cache, error: %s", cdr, err.Error())) } else if reply != utils.OK { utils.Logger.Err(fmt.Sprintf(" Received unexpected reply for CDR, %+v, reply: %s", cdr, reply)) diff --git a/cdrc/xml.go b/cdrc/xml.go index 6c1a8bc14..51e2ed801 100644 --- a/cdrc/xml.go +++ b/cdrc/xml.go @@ -168,7 +168,7 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *con return nil, err } if pass, err := xmlProc.filterS.Pass(tenant, - cdrcCfg.Filters, xmlProvider); err != nil || !pass { + cdrFldCfg.Filters, xmlProvider); err != nil || !pass { continue // Not passes filters, ignore this CDR } } diff --git a/data/conf/samples/csvwithfieldfilter/cgrates.json b/data/conf/samples/csvwithfieldfilter/cgrates.json new file mode 100755 index 000000000..14a55d991 --- /dev/null +++ b/data/conf/samples/csvwithfieldfilter/cgrates.json @@ -0,0 +1,58 @@ +{ + +// Real-time Charging System for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +// +// This file contains the default configuration hardcoded into CGRateS. +// This is what you get when you load CGRateS with an empty configuration file. +"general": { + "log_level": 7, +}, + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + + "rals": { + "enabled": true // so we can query CDRs + }, + + "cdrs": { + "enabled": true, + "rals_conns": [], // no rating support, just *raw CDR testing +}, + + + + "cdrc": [ + { + "id": "*CSVit4", // identifier of the CDRC runner + "enabled": true, // enable CDR client functionality + "field_separator": ";", + "cdr_in_dir": "/tmp/csvwithfielfilter/csvit2/in", // absolute path towards the directory where the CDRs are stored + "cdr_out_dir": "/tmp/csvwithfielfilter/csvit2/out", // absolute path towards the directory where processed CDRs will be moved + "cdr_source_id": "csvit4", // free form field, tag identifying the source of the CDRs within CDRS database + "filters":["*string:1:*postpaid"], //filter Account to be 1002 + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"id": "TOR", "field_id": "ToR", "type": "*composed", "value": "*voice", "mandatory": true}, + {"id": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~0", "mandatory": true}, + {"id": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~1", "mandatory": true}, + {"id": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true}, + {"id": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "~2", "mandatory": true}, + {"id": "Category", "field_id": "Category", "type": "*composed", "value": "call", "mandatory": true}, + {"id": "Account", "field_id": "Account", "type": "*composed", "value": "~3", "mandatory": true}, + {"id": "Subject", "field_id": "Subject", "type": "*composed", "value": "~3", "mandatory": true}, + {"id": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true}, + {"id": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "~5", "mandatory": true}, + {"id": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "~5", "mandatory": true}, + {"id": "Usage", "field_id": "Usage", "type": "*composed", "value": "~6", "mandatory": true}, + {"id": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "~6", "mandatory": true}, + {"id": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "~6", "mandatory": true}, + {"id": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "~6", "mandatory": true}, + {"id": "RandomVal", "field_id": "RandomVal", "type": "*composed", "value": "*randomValue","filters":["*string:3:1001"]}, + ], + }, +], + + +} \ No newline at end of file diff --git a/engine/cdr.go b/engine/cdr.go index 9eb644b9b..b4704620a 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -498,12 +498,6 @@ func (cdr *CDR) combimedCdrFieldVal(cfgCdrFld *config.FCTemplate, groupCDRs []*C // Extracts the value specified by cfgHdr out of cdr, used for export values func (cdr *CDR) exportFieldValue(cfgCdrFld *config.FCTemplate, filterS *FilterS) (retVal string, err error) { - if pass, err := filterS.Pass(cdr.Tenant, - cfgCdrFld.Filters, config.NewNavigableMap(cdr.AsMapStringIface())); err != nil { - return "", err - } else if !pass { - return "", utils.ErrFilterNotPassingNoCaps - } for _, rsrFld := range cfgCdrFld.Value { var cdrVal string switch cfgCdrFld.ID { @@ -597,6 +591,12 @@ func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTlsCheck bool, func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate, httpSkipTlsCheck bool, groupedCDRs []*CDR, roundingDecs int, filterS *FilterS) (expRecord []string, err error) { for _, cfgFld := range exportFields { + if pass, err := filterS.Pass(cdr.Tenant, + cfgFld.Filters, config.NewNavigableMap(cdr.AsMapStringIface())); err != nil { + return []string{}, err + } else if !pass { + continue + } if roundingDecs != 0 { clnFld := new(config.FCTemplate) // Clone so we can modify the rounding decimals without affecting the template *clnFld = *cfgFld @@ -604,9 +604,6 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate, cfgFld = clnFld } if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs, filterS); err != nil { - if err == utils.ErrFilterNotPassingNoCaps { - continue // not exporting this field value - } utils.Logger.Warning(fmt.Sprintf(" error: %s exporting field: %s, CDR: %s\n", err.Error(), utils.ToJSON(cfgFld), utils.ToJSON(cdr))) return nil, err