mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 02:26:26 +05:00
Consider Filter from Fields in cdrc
This commit is contained in:
committed by
Dan Christian Bogos
parent
8cce0ce7f0
commit
d9fa6bc4cc
@@ -147,8 +147,10 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con
|
||||
return nil, err
|
||||
}
|
||||
if pass, err := self.filterS.Pass(tenant,
|
||||
cdrcCfg.Filters, csvProvider); err != nil || !pass {
|
||||
continue // Not passes filters, ignore this CDR
|
||||
cdrFldCfg.Filters, csvProvider); err != nil {
|
||||
return nil, err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.dfltCdrcCfg.CdrFormat) { // Hardcode some values in case of flatstore
|
||||
|
||||
@@ -342,3 +342,89 @@ func TestCsvIT3KillEngine(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Begin tests for cdrc csv with new filters
|
||||
var fileContent1_4 = `accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1
|
||||
accid22;*postpaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1
|
||||
accid23;*postpaid;cgrates.org;1002;086517174963;2013-02-03 19:54:00;76;val_extra3;"";val_extra1`
|
||||
|
||||
func TestCsvIT4InitConfig(t *testing.T) {
|
||||
var err error
|
||||
csvCfgPath = path.Join(*dataDir, "conf", "samples", "csvwithfieldfilter")
|
||||
if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil {
|
||||
t.Fatal("Got config error: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// InitDb so we can rely on count
|
||||
func TestCsvIT4InitCdrDb(t *testing.T) {
|
||||
if err := engine.InitStorDb(csvCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCsvIT4CreateCdrDirs(t *testing.T) {
|
||||
for _, cdrcProfiles := range csvCfg.CdrcProfiles {
|
||||
for _, cdrcInst := range cdrcProfiles {
|
||||
for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
t.Fatal("Error creating folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCsvIT4StartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Connect rpc client to rater
|
||||
func TestCsvIT4RpcConn(t *testing.T) {
|
||||
var err error
|
||||
cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to rater: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Scenario out of first .xml config
|
||||
func TestCsvIT4HandleCdr2File(t *testing.T) {
|
||||
fileName := "file1.csv"
|
||||
tmpFilePath := path.Join("/tmp", fileName)
|
||||
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1_4), 0644); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
if err := os.Rename(tmpFilePath, path.Join("/tmp/csvwithfielfilter/csvit2/in", fileName)); err != nil {
|
||||
t.Fatal("Error moving file to processing directory: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCsvIT4ProcessedFiles(t *testing.T) {
|
||||
time.Sleep(time.Duration(2**waitRater) * time.Millisecond)
|
||||
if outContent4, err := ioutil.ReadFile("/tmp/csvwithfielfilter/csvit2/out/file1.csv"); err != nil {
|
||||
t.Error(err)
|
||||
} else if fileContent1_4 != string(outContent4) {
|
||||
t.Errorf("Expecting: %q, received: %q", fileContent1_4, string(outContent4))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCsvIT4AnalyseCDRs(t *testing.T) {
|
||||
var reply []*engine.ExternalCDR
|
||||
if err := cdrcRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if len(reply) != 2 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(reply))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCsvIT4KillEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(*waitRater); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,14 +161,13 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cdrcCfg *confi
|
||||
duMultiplyFactor = cdrcCfg.DataUsageMultiplyFactor
|
||||
}
|
||||
for _, cdrFldCfg := range cfgFields {
|
||||
if len(cdrcCfg.Filters) != 0 {
|
||||
|
||||
if len(cdrFldCfg.Filters) != 0 {
|
||||
tenant, err := cdrcCfg.Tenant.ParseValue("")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if pass, err := self.filterS.Pass(tenant,
|
||||
cdrcCfg.Filters, fwvProvider); err != nil || !pass {
|
||||
cdrFldCfg.Filters, fwvProvider); err != nil || !pass {
|
||||
continue // Not passes filters, ignore this CDR
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,7 +77,8 @@ func (prc *PartialRecordsCache) dumpPartialRecords(originID string) {
|
||||
csvWriter := csv.NewWriter(fileOut)
|
||||
csvWriter.Comma = prc.csvSep
|
||||
for _, cdr := range prc.partialRecords[originID].cdrs {
|
||||
expRec, err := cdr.AsExportRecord(prc.partialRecords[originID].cacheDumpFields, prc.httpSkipTlsCheck, nil, prc.roundDecimals, prc.filterS)
|
||||
expRec, err := cdr.AsExportRecord(prc.partialRecords[originID].cacheDumpFields,
|
||||
prc.httpSkipTlsCheck, nil, prc.roundDecimals, prc.filterS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -103,7 +104,7 @@ func (prc *PartialRecordsCache) postCDR(originID string) {
|
||||
cdr := prc.partialRecords[originID].MergeCDRs()
|
||||
cdr.Partial = false // force completion
|
||||
var reply string
|
||||
if err := prc.cdrs.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil {
|
||||
if err := prc.cdrs.Call(utils.CdrsV2ProcessCDR, cdr.AsCGREvent(), &reply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<Cdrc> Failed sending CDR %+v from partial cache, error: %s", cdr, err.Error()))
|
||||
} else if reply != utils.OK {
|
||||
utils.Logger.Err(fmt.Sprintf("<Cdrc> Received unexpected reply for CDR, %+v, reply: %s", cdr, reply))
|
||||
|
||||
@@ -168,7 +168,7 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *con
|
||||
return nil, err
|
||||
}
|
||||
if pass, err := xmlProc.filterS.Pass(tenant,
|
||||
cdrcCfg.Filters, xmlProvider); err != nil || !pass {
|
||||
cdrFldCfg.Filters, xmlProvider); err != nil || !pass {
|
||||
continue // Not passes filters, ignore this CDR
|
||||
}
|
||||
}
|
||||
|
||||
58
data/conf/samples/csvwithfieldfilter/cgrates.json
Executable file
58
data/conf/samples/csvwithfieldfilter/cgrates.json
Executable file
@@ -0,0 +1,58 @@
|
||||
{
|
||||
|
||||
// Real-time Charging System for Telecom & ISP environments
|
||||
// Copyright (C) ITsysCOM GmbH
|
||||
//
|
||||
// This file contains the default configuration hardcoded into CGRateS.
|
||||
// This is what you get when you load CGRateS with an empty configuration file.
|
||||
"general": {
|
||||
"log_level": 7,
|
||||
},
|
||||
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"db_password": "CGRateS.org", // password to use when connecting to stordb
|
||||
},
|
||||
|
||||
"rals": {
|
||||
"enabled": true // so we can query CDRs
|
||||
},
|
||||
|
||||
"cdrs": {
|
||||
"enabled": true,
|
||||
"rals_conns": [], // no rating support, just *raw CDR testing
|
||||
},
|
||||
|
||||
|
||||
|
||||
"cdrc": [
|
||||
{
|
||||
"id": "*CSVit4", // identifier of the CDRC runner
|
||||
"enabled": true, // enable CDR client functionality
|
||||
"field_separator": ";",
|
||||
"cdr_in_dir": "/tmp/csvwithfielfilter/csvit2/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/tmp/csvwithfielfilter/csvit2/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_source_id": "csvit4", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"filters":["*string:1:*postpaid"], //filter Account to be 1002
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"id": "TOR", "field_id": "ToR", "type": "*composed", "value": "*voice", "mandatory": true},
|
||||
{"id": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "~0", "mandatory": true},
|
||||
{"id": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "~1", "mandatory": true},
|
||||
{"id": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true},
|
||||
{"id": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "~2", "mandatory": true},
|
||||
{"id": "Category", "field_id": "Category", "type": "*composed", "value": "call", "mandatory": true},
|
||||
{"id": "Account", "field_id": "Account", "type": "*composed", "value": "~3", "mandatory": true},
|
||||
{"id": "Subject", "field_id": "Subject", "type": "*composed", "value": "~3", "mandatory": true},
|
||||
{"id": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true},
|
||||
{"id": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "~5", "mandatory": true},
|
||||
{"id": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "~5", "mandatory": true},
|
||||
{"id": "Usage", "field_id": "Usage", "type": "*composed", "value": "~6", "mandatory": true},
|
||||
{"id": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "~6", "mandatory": true},
|
||||
{"id": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "~6", "mandatory": true},
|
||||
{"id": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "~6", "mandatory": true},
|
||||
{"id": "RandomVal", "field_id": "RandomVal", "type": "*composed", "value": "*randomValue","filters":["*string:3:1001"]},
|
||||
],
|
||||
},
|
||||
],
|
||||
|
||||
|
||||
}
|
||||
@@ -498,12 +498,6 @@ func (cdr *CDR) combimedCdrFieldVal(cfgCdrFld *config.FCTemplate, groupCDRs []*C
|
||||
|
||||
// Extracts the value specified by cfgHdr out of cdr, used for export values
|
||||
func (cdr *CDR) exportFieldValue(cfgCdrFld *config.FCTemplate, filterS *FilterS) (retVal string, err error) {
|
||||
if pass, err := filterS.Pass(cdr.Tenant,
|
||||
cfgCdrFld.Filters, config.NewNavigableMap(cdr.AsMapStringIface())); err != nil {
|
||||
return "", err
|
||||
} else if !pass {
|
||||
return "", utils.ErrFilterNotPassingNoCaps
|
||||
}
|
||||
for _, rsrFld := range cfgCdrFld.Value {
|
||||
var cdrVal string
|
||||
switch cfgCdrFld.ID {
|
||||
@@ -597,6 +591,12 @@ func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTlsCheck bool,
|
||||
func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate,
|
||||
httpSkipTlsCheck bool, groupedCDRs []*CDR, roundingDecs int, filterS *FilterS) (expRecord []string, err error) {
|
||||
for _, cfgFld := range exportFields {
|
||||
if pass, err := filterS.Pass(cdr.Tenant,
|
||||
cfgFld.Filters, config.NewNavigableMap(cdr.AsMapStringIface())); err != nil {
|
||||
return []string{}, err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if roundingDecs != 0 {
|
||||
clnFld := new(config.FCTemplate) // Clone so we can modify the rounding decimals without affecting the template
|
||||
*clnFld = *cfgFld
|
||||
@@ -604,9 +604,6 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate,
|
||||
cfgFld = clnFld
|
||||
}
|
||||
if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs, filterS); err != nil {
|
||||
if err == utils.ErrFilterNotPassingNoCaps {
|
||||
continue // not exporting this field value
|
||||
}
|
||||
utils.Logger.Warning(fmt.Sprintf("<CDR> error: %s exporting field: %s, CDR: %s\n",
|
||||
err.Error(), utils.ToJSON(cfgFld), utils.ToJSON(cdr)))
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user