This commit is contained in:
TeoV
2018-09-17 06:49:49 -04:00
committed by Dan Christian Bogos
parent e13585a2ad
commit 4bcd7c9028
9 changed files with 64 additions and 29 deletions

View File

@@ -102,21 +102,21 @@ func (self *CsvRecordsProcessor) processFlatstoreRecord(record []string) ([]stri
// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, error) {
csvProvider := newCsvProvider(record)
recordCdrs := make([]*engine.CDR, 0) // More CDRs based on the number of filters and field templates
for _, cdrcCfg := range self.cdrcCfgs { // cdrFields coming from more templates will produce individual storCdr records
tenant, err := cdrcCfg.Tenant.ParseValue("") // each profile of cdrc can have different tenant
if err != nil {
return nil, err
}
// Make sure filters are matching
if len(cdrcCfg.Filters) != 0 {
csvProvider := newCsvProvider(record)
tenant, err := cdrcCfg.Tenant.ParseValue("")
if err != nil {
return nil, err
}
if pass, err := self.filterS.Pass(tenant,
cdrcCfg.Filters, csvProvider); err != nil || !pass {
continue // Not passes filters, ignore this CDR
}
}
storedCdr, err := self.recordToStoredCdr(record, cdrcCfg)
storedCdr, err := self.recordToStoredCdr(record, cdrcCfg, tenant)
if err != nil {
return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
} else if self.dfltCdrcCfg.CdrFormat == utils.PartialCSV {
@@ -135,17 +135,13 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
}
// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS
func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) {
func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *config.CdrcConfig, tenant string) (*engine.CDR, error) {
storedCdr := &engine.CDR{OriginHost: "0.0.0.0", Source: cdrcCfg.CdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
var err error
csvProvider := newCsvProvider(record) // used for filterS and for RSRParsers
var lazyHttpFields []*config.FCTemplate
for _, cdrFldCfg := range cdrcCfg.ContentFields {
if len(cdrFldCfg.Filters) != 0 {
tenant, err := cdrcCfg.Tenant.ParseValue("")
if err != nil {
return nil, err
}
if pass, err := self.filterS.Pass(tenant,
cdrFldCfg.Filters, csvProvider); err != nil {
return nil, err

View File

@@ -36,14 +36,14 @@ func TestCsvRecordToCDR(t *testing.T) {
Value: config.NewRSRParsersMustCompile("*default", true)})
csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: []*config.CdrcConfig{cdrcConfig}}
cdrRow := []string{"firstField", "secondField"}
_, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig)
_, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig, "cgrates.org")
if err == nil {
t.Error("Failed to corectly detect missing fields from record")
}
cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org",
"call", "1001", "1001", "+4986517174963", "2013-02-03 19:50:00", "2013-02-03 19:54:00",
"62s", "supplier1", "172.16.1.1", "NORMAL_DISCONNECT"}
rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig)
rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig, "cgrates.org")
if err != nil {
t.Error("Failed to parse CDR in rated cdr", err)
}
@@ -84,7 +84,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) {
csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: []*config.CdrcConfig{cdrcConfig}}
csvProcessor.cdrcCfgs[0].DataUsageMultiplyFactor = 0
cdrRow := []string{"*data", "1"}
rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig)
rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig, "cgrates.org")
if err != nil {
t.Error("Failed to parse CDR in rated cdr", err)
}
@@ -112,7 +112,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) {
Cost: -1,
}
if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow,
cdrcConfig); !reflect.DeepEqual(expectedCdr, rtCdr) {
cdrcConfig, "cgrates.org"); !reflect.DeepEqual(expectedCdr, rtCdr) {
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
}
cdrRow = []string{"*voice", "1s"}
@@ -126,7 +126,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) {
Cost: -1,
}
if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow,
cdrcConfig); !reflect.DeepEqual(expectedCdr, rtCdr) {
cdrcConfig, "cgrates.org"); !reflect.DeepEqual(expectedCdr, rtCdr) {
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
}
}

View File

@@ -192,7 +192,6 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cdrcCfg *confi
if err := storedCdr.ParseFieldValue(cdrFldCfg.FieldId, fieldVal, self.timezone); err != nil {
return nil, err
}
}
if storedCdr.CGRID == "" && storedCdr.OriginID != "" && cfgKey != "*header" {
storedCdr.CGRID = utils.Sha1(storedCdr.OriginID, storedCdr.SetupTime.UTC().String())

View File

@@ -115,18 +115,18 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err
cdrXML := xmlProc.cdrXmlElmts[xmlProc.procItems]
xmlProc.procItems += 1
for _, cdrcCfg := range xmlProc.cdrcCfgs {
tenant, err := cdrcCfg.Tenant.ParseValue("")
if err != nil {
return nil, err
}
if len(cdrcCfg.Filters) != 0 {
xmlProvider := newXmlProvider(cdrXML, xmlProc.cdrPath)
tenant, err := cdrcCfg.Tenant.ParseValue("")
if err != nil {
return nil, err
}
if pass, err := xmlProc.filterS.Pass(tenant,
cdrcCfg.Filters, xmlProvider); err != nil || !pass {
continue // Not passes filters, ignore this CDR
}
}
if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg); err != nil {
if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg, tenant); err != nil {
return nil, fmt.Errorf("<CDRC> Failed converting to CDR, error: %s", err.Error())
} else {
cdrs = append(cdrs, cdr)
@@ -138,7 +138,7 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err
return cdrs, nil
}
func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *etree.Element, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) {
func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *etree.Element, cdrcCfg *config.CdrcConfig, tenant string) (*engine.CDR, error) {
cdr := &engine.CDR{OriginHost: "0.0.0.0", Source: cdrcCfg.CdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
var lazyHttpFields []*config.FCTemplate
var err error
@@ -146,10 +146,6 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity *etree.Element, cdrcCf
xmlProvider := newXmlProvider(xmlEntity, xmlProc.cdrPath)
for _, cdrFldCfg := range cdrcCfg.ContentFields {
if len(cdrFldCfg.Filters) != 0 {
tenant, err := cdrcCfg.Tenant.ParseValue("")
if err != nil {
return nil, err
}
if pass, err := xmlProc.filterS.Pass(tenant,
cdrFldCfg.Filters, xmlProvider); err != nil || !pass {
continue // Not passes filters, ignore this CDR

View File

@@ -18,6 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
// import (
// "github.com/cgrates/cgrates/engine"
// "github.com/cgrates/cgrates/utils"
// )
func NewFCTemplateFromFCTemplateJsonCfg(jsnCfg *FcTemplateJsonCfg) *FCTemplate {
fcTmp := new(FCTemplate)
if jsnCfg.Tag != nil {
@@ -116,3 +121,20 @@ func FCTemplatesFromFCTemapltesJsonCfg(jsnCfgFlds []*FcTemplateJsonCfg) []*FCTem
}
return retFields
}
// type FCTemplates []*FCTemplate
// func (tmps *FCTemplates) AsNavigableMap(tenant, timezone string, dp DataProvider, filterS *engine.FilterS) (*NavigableMap, error) {
// nM := NewNavigableMap(nil)
// for _, tmp := range tmps {
// if len(tmp.Filters) != 0 {
// if pass, err := filterS.Pass(tenant,
// tmp.Filters, dp); err != nil || !pass {
// continue // Not passes filters, ignore this CDR
// }
// }
// switch tmp.Type {
// case utils.META_COMPOSED:
// }
// }
// }

View File

@@ -78,13 +78,14 @@
"cdrs": {
"enabled": true,
//"online_cdr_exports":["TemplateWithFilter"],
},
"cdre": {
"TemplateWithFilter": {
"export_format": "*file_csv",
"export_path": "/tmp",
"export_path": "/tmp/",
"filters" :["*string:Source:test2"],
"content_fields": [
{"tag": "CGRID", "type": "*composed", "value": "~CGRID"},

View File

@@ -206,6 +206,8 @@ func (cdr *CDR) ParseFieldValue(fieldId, fieldVal, timezone string) error {
if cdr.OrderID, err = strconv.ParseInt(fieldVal, 10, 64); err != nil {
return err
}
case utils.OriginHost: // overwrite if originHost is given from template
cdr.OriginHost = fieldVal
case utils.ToR:
cdr.ToR += fieldVal
case utils.RunID:

View File

@@ -26,6 +26,7 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"sync"
"time"
@@ -479,7 +480,24 @@ func (cdre *CDRExporter) ExportCDRs() (err error) {
if contLen == 0 {
return
}
fileOut, err := os.Create(cdre.exportPath)
var expFormat string
switch cdre.exportFormat {
case utils.MetaFileFWV:
expFormat = "fwv"
case utils.MetaFileCSV:
expFormat = "csv"
default:
expFormat = cdre.exportFormat
}
utils.Logger.Debug(fmt.Sprintf("CDRS : %+v", cdre.cdrs))
expPath := cdre.exportPath
if len(filepath.Ext(expPath)) == 0 { // verify extension from exportPath (if have extension is file else is directory)
fileName := fmt.Sprintf("cdre_%s.%s", utils.UUIDSha1Prefix(), expFormat)
expPath = path.Join(expPath, fileName)
}
utils.Logger.Debug(fmt.Sprintf("expPath : %+v", expPath))
utils.Logger.Debug(fmt.Sprintf("-------------------------------------------------------------"))
fileOut, err := os.Create(expPath)
if err != nil {
return err
}

View File

@@ -487,6 +487,7 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
return cc, nil
}
// replicateCDRs used by online exports
func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) {
for _, exportID := range self.cgrCfg.CDRSOnlineCDRExports {
expTpl := self.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer