Centralized replication/exports using field templates, CDRExporter with less parameters via API - moved to field templates

This commit is contained in:
DanB
2016-10-14 18:35:07 +02:00
parent 3c5c41d542
commit b7445e5e99
18 changed files with 129 additions and 211 deletions

View File

@@ -155,22 +155,6 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E
if attr.CostMultiplyFactor != nil && *attr.CostMultiplyFactor != 0.0 {
costMultiplyFactor = *attr.CostMultiplyFactor
}
costShiftDigits := exportTemplate.CostShiftDigits
if attr.CostShiftDigits != nil {
costShiftDigits = *attr.CostShiftDigits
}
roundingDecimals := exportTemplate.CostRoundingDecimals
if attr.RoundDecimals != nil {
roundingDecimals = *attr.RoundDecimals
}
maskDestId := exportTemplate.MaskDestinationID
if attr.MaskDestinationId != nil && len(*attr.MaskDestinationId) != 0 {
maskDestId = *attr.MaskDestinationId
}
maskLen := exportTemplate.MaskLength
if attr.MaskLength != nil {
maskLen = *attr.MaskLength
}
cdrsFltr, err := attr.AsCDRsFilter(self.Config.DefaultTimezone)
if err != nil {
return utils.NewErrServerError(err)
@@ -182,7 +166,8 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E
*reply = utils.ExportedFileCdrs{ExportedFilePath: ""}
return nil
}
cdrexp, err := cdre.NewCdrExporter(cdrs, self.CdrDb, exportTemplate, cdrFormat, fieldSep, exportId, dataUsageMultiplyFactor, smsUsageMultiplyFactor, mmsUsageMultiplyFactor, genericUsageMultiplyFactor, costMultiplyFactor, costShiftDigits, roundingDecimals, self.Config.RoundingDecimals, maskDestId, maskLen, self.Config.HttpSkipTlsVerify, self.Config.DefaultTimezone)
cdrexp, err := cdre.NewCdrExporter(cdrs, self.CdrDb, exportTemplate, cdrFormat, fieldSep, exportId, dataUsageMultiplyFactor, smsUsageMultiplyFactor,
mmsUsageMultiplyFactor, genericUsageMultiplyFactor, costMultiplyFactor, self.Config.RoundingDecimals, self.Config.HttpSkipTlsVerify)
if err != nil {
return utils.NewErrServerError(err)
}

View File

@@ -18,8 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"fmt"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"path"
@@ -116,7 +114,6 @@ func TestCDRStatsLclPostCdrs(t *testing.T) {
if !*testLocal {
return
}
httpClient := new(http.Client)
storedCdrs := []*engine.CDR{
&engine.CDR{CGRID: utils.Sha1("dsafdsafa", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsafa",
OriginHost: "192.168.1.1", Source: "test",
@@ -147,9 +144,10 @@ func TestCDRStatsLclPostCdrs(t *testing.T) {
Usage: time.Duration(0) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
}
for _, storedCdr := range storedCdrs {
if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cdr_http", "127.0.0.1:2080"), storedCdr.AsHttpForm()); err != nil {
t.Error(err.Error())
for _, cdr := range storedCdrs {
var reply string
if err := cdrstRpc.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil {
t.Error(err)
}
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)

View File

@@ -91,22 +91,6 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut
if attr.CostMultiplyFactor != nil && *attr.CostMultiplyFactor != 0.0 {
costMultiplyFactor = *attr.CostMultiplyFactor
}
costShiftDigits := exportTemplate.CostShiftDigits
if attr.CostShiftDigits != nil {
costShiftDigits = *attr.CostShiftDigits
}
roundingDecimals := exportTemplate.CostRoundingDecimals
if attr.RoundDecimals != nil {
roundingDecimals = *attr.RoundDecimals
}
maskDestId := exportTemplate.MaskDestinationID
if attr.MaskDestinationID != nil && len(*attr.MaskDestinationID) != 0 {
maskDestId = *attr.MaskDestinationID
}
maskLen := exportTemplate.MaskLength
if attr.MaskLength != nil {
maskLen = *attr.MaskLength
}
cdrsFltr, err := attr.RPCCDRsFilter.AsCDRsFilter(self.Config.DefaultTimezone)
if err != nil {
return utils.NewErrServerError(err)
@@ -118,7 +102,8 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut
*reply = utils.ExportedFileCdrs{ExportedFilePath: ""}
return nil
}
cdrexp, err := cdre.NewCdrExporter(cdrs, self.CdrDb, exportTemplate, cdrFormat, fieldSep, ExportID, dataUsageMultiplyFactor, SMSUsageMultiplyFactor, MMSUsageMultiplyFactor, genericUsageMultiplyFactor, costMultiplyFactor, costShiftDigits, roundingDecimals, self.Config.RoundingDecimals, maskDestId, maskLen, self.Config.HttpSkipTlsVerify, self.Config.DefaultTimezone)
cdrexp, err := cdre.NewCdrExporter(cdrs, self.CdrDb, exportTemplate, cdrFormat, fieldSep, ExportID, dataUsageMultiplyFactor, SMSUsageMultiplyFactor,
MMSUsageMultiplyFactor, genericUsageMultiplyFactor, costMultiplyFactor, self.Config.RoundingDecimals, self.Config.HttpSkipTlsVerify)
if err != nil {
return utils.NewErrServerError(err)
}

View File

@@ -68,7 +68,7 @@ func (prc *PartialRecordsCache) dumpPartialRecords(originID string) {
csvWriter := csv.NewWriter(fileOut)
csvWriter.Comma = prc.csvSep
for _, cdr := range prc.partialRecords[originID].cdrs {
expRec, err := cdr.AsExportRecord(prc.partialRecords[originID].cacheDumpFields, 0, prc.roundDecimals, prc.timezone, prc.httpSkipTlsCheck, 0, "", nil)
expRec, err := cdr.AsExportRecord(prc.partialRecords[originID].cacheDumpFields, prc.httpSkipTlsCheck, nil)
if err != nil {
return nil, err
}

View File

@@ -49,7 +49,7 @@ var err error
func NewCdrExporter(cdrs []*engine.CDR, cdrDb engine.CdrStorage, exportTpl *config.CdreConfig, cdrFormat string, fieldSeparator rune, exportId string,
dataUsageMultiplyFactor, smsUsageMultiplyFactor, mmsUsageMultiplyFactor, genericUsageMultiplyFactor, costMultiplyFactor float64,
costShiftDigits, roundDecimals, cgrPrecision int, maskDestId string, maskLen int, httpSkipTlsCheck bool, timezone string) (*CdrExporter, error) {
cgrPrecision int, httpSkipTlsCheck bool) (*CdrExporter, error) {
if len(cdrs) == 0 { // Nothing to export
return nil, nil
}
@@ -63,13 +63,8 @@ func NewCdrExporter(cdrs []*engine.CDR, cdrDb engine.CdrStorage, exportTpl *conf
dataUsageMultiplyFactor: dataUsageMultiplyFactor,
mmsUsageMultiplyFactor: mmsUsageMultiplyFactor,
costMultiplyFactor: costMultiplyFactor,
costShiftDigits: costShiftDigits,
roundDecimals: roundDecimals,
cgrPrecision: cgrPrecision,
maskDestId: maskDestId,
httpSkipTlsCheck: httpSkipTlsCheck,
timezone: timezone,
maskLen: maskLen,
negativeExports: make(map[string]string),
}
if err := cdre.processCdrs(); err != nil {
@@ -90,16 +85,14 @@ type CdrExporter struct {
mmsUsageMultiplyFactor,
genericUsageMultiplyFactor,
costMultiplyFactor float64
costShiftDigits, roundDecimals, cgrPrecision int
maskDestId string
maskLen int
httpSkipTlsCheck bool
timezone string
header, trailer []string // Header and Trailer fields
content [][]string // Rows of cdr fields
firstCdrATime, lastCdrATime time.Time
numberOfRecords int
totalDuration, totalDataUsage, totalSmsUsage, totalMmsUsage, totalGenericUsage time.Duration
cgrPrecision int
httpSkipTlsCheck bool
header, trailer []string // Header and Trailer fields
content [][]string // Rows of cdr fields
firstCdrATime, lastCdrATime time.Time
numberOfRecords int
totalDuration, totalDataUsage, totalSmsUsage,
totalMmsUsage, totalGenericUsage time.Duration
totalCost float64
firstExpOrderId, lastExpOrderId int64
@@ -136,7 +129,7 @@ func (cdre *CdrExporter) metaHandler(tag, arg string) (string, error) {
emulatedCdr := &engine.CDR{ToR: utils.DATA, Usage: cdre.totalDataUsage}
return emulatedCdr.FormatUsage(arg), nil
case META_COSTCDRS:
return strconv.FormatFloat(utils.Round(cdre.totalCost, cdre.roundDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
return strconv.FormatFloat(utils.Round(cdre.totalCost, cdre.cgrPrecision, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
default:
return "", fmt.Errorf("Unsupported METATAG: %s", tag)
}
@@ -220,7 +213,7 @@ func (cdre *CdrExporter) processCdr(cdr *engine.CDR) error {
if cdre.costMultiplyFactor != 0.0 {
cdr.CostMultiply(cdre.costMultiplyFactor, cdre.cgrPrecision)
}
cdrRow, err := cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.costShiftDigits, cdre.roundDecimals, cdre.timezone, cdre.httpSkipTlsCheck, cdre.maskLen, cdre.maskDestId, cdre.cdrs)
cdrRow, err := cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, cdre.cdrs)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR with CGRID: %s and runid: %s, error: %s", cdr.CGRID, cdr.RunID, err.Error()))
return err
@@ -255,7 +248,7 @@ func (cdre *CdrExporter) processCdr(cdr *engine.CDR) error {
}
if cdr.Cost != -1 {
cdre.totalCost += cdr.Cost
cdre.totalCost = utils.Round(cdre.totalCost, cdre.roundDecimals, utils.ROUNDING_MIDDLE)
cdre.totalCost = utils.Round(cdre.totalCost, cdre.cgrPrecision, utils.ROUNDING_MIDDLE)
}
if cdre.firstExpOrderId > cdr.OrderID || cdre.firstExpOrderId == 0 {
cdre.firstExpOrderId = cdr.OrderID

View File

@@ -39,8 +39,7 @@ func TestCsvCdrWriter(t *testing.T) {
Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID,
ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01,
}
cdre, err := NewCdrExporter([]*engine.CDR{storedCdr1}, nil, cfg.CdreProfiles["*default"], utils.CSV, ',', "firstexport", 0.0, 0.0, 0.0, 0.0, 0.0, 0, 4,
cfg.RoundingDecimals, "", 0, cfg.HttpSkipTlsVerify, "")
cdre, err := NewCdrExporter([]*engine.CDR{storedCdr1}, nil, cfg.CdreProfiles["*default"], utils.CSV, ',', "firstexport", 0.0, 0.0, 0.0, 0.0, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify)
if err != nil {
t.Error("Unexpected error received: ", err)
}
@@ -68,7 +67,7 @@ func TestAlternativeFieldSeparator(t *testing.T) {
ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01,
}
cdre, err := NewCdrExporter([]*engine.CDR{storedCdr1}, nil, cfg.CdreProfiles["*default"], utils.CSV, '|',
"firstexport", 0.0, 0.0, 0.0, 0.0, 0.0, 0, 4, cfg.RoundingDecimals, "", 0, cfg.HttpSkipTlsVerify, "")
"firstexport", 0.0, 0.0, 0.0, 0.0, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify)
if err != nil {
t.Error("Unexpected error received: ", err)
}

View File

@@ -125,8 +125,8 @@ func TestWriteCdr(t *testing.T) {
Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.34567,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
}
cdre, err := NewCdrExporter([]*engine.CDR{cdr}, nil, cdreCfg, utils.CDRE_FIXED_WIDTH, ',', "fwv_1", 0.0, 0.0, 0.0, 0.0, 0.0, 0, 4,
cfg.RoundingDecimals, "", -1, cfg.HttpSkipTlsVerify, "")
cdre, err := NewCdrExporter([]*engine.CDR{cdr}, nil, cdreCfg, utils.CDRE_FIXED_WIDTH, ',', "fwv_1", 0.0, 0.0, 0.0, 0.0, 0.0,
cfg.RoundingDecimals, cfg.HttpSkipTlsVerify)
if err != nil {
t.Error(err)
}
@@ -152,7 +152,7 @@ func TestWriteCdr(t *testing.T) {
t.Error("Unexpected number of records in the stats: ", cdre.numberOfRecords)
} else if cdre.totalDuration != cdr.Usage {
t.Error("Unexpected total duration in the stats: ", cdre.totalDuration)
} else if cdre.totalCost != utils.Round(cdr.Cost, cdre.roundDecimals, utils.ROUNDING_MIDDLE) {
} else if cdre.totalCost != utils.Round(cdr.Cost, cdre.cgrPrecision, utils.ROUNDING_MIDDLE) {
t.Error("Unexpected total cost in the stats: ", cdre.totalCost)
}
if cdre.FirstOrderId() != 1 {
@@ -161,7 +161,7 @@ func TestWriteCdr(t *testing.T) {
if cdre.LastOrderId() != 1 {
t.Error("Unexpected LastOrderId", cdre.LastOrderId())
}
if cdre.TotalCost() != utils.Round(cdr.Cost, cdre.roundDecimals, utils.ROUNDING_MIDDLE) {
if cdre.TotalCost() != utils.Round(cdr.Cost, cdre.cgrPrecision, utils.ROUNDING_MIDDLE) {
t.Error("Unexpected TotalCost: ", cdre.TotalCost())
}
}
@@ -201,7 +201,7 @@ func TestWriteCdrs(t *testing.T) {
}
cfg, _ := config.NewDefaultCGRConfig()
cdre, err := NewCdrExporter([]*engine.CDR{cdr1, cdr2, cdr3, cdr4}, nil, cdreCfg, utils.CDRE_FIXED_WIDTH, ',',
"fwv_1", 0.0, 0.0, 0.0, 0.0, 0.0, 0, 4, cfg.RoundingDecimals, "", -1, cfg.HttpSkipTlsVerify, "")
"fwv_1", 0.0, 0.0, 0.0, 0.0, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify)
if err != nil {
t.Error(err)
}
@@ -224,7 +224,7 @@ func TestWriteCdrs(t *testing.T) {
if cdre.totalDuration != time.Duration(330)*time.Second {
t.Error("Unexpected total duration in the stats: ", cdre.totalDuration)
}
if cdre.totalCost != 5.9957 {
if cdre.totalCost != 5.99568 {
t.Error("Unexpected total cost in the stats: ", cdre.totalCost)
}
if cdre.FirstOrderId() != 2 {
@@ -233,7 +233,7 @@ func TestWriteCdrs(t *testing.T) {
if cdre.LastOrderId() != 4 {
t.Error("Unexpected LastOrderId", cdre.LastOrderId())
}
if cdre.TotalCost() != 5.9957 {
if cdre.TotalCost() != 5.99568 {
t.Error("Unexpected TotalCost: ", cdre.TotalCost())
}
}

View File

@@ -26,10 +26,6 @@ type CdreConfig struct {
MMSUsageMultiplyFactor float64
GenericUsageMultiplyFactor float64
CostMultiplyFactor float64
CostRoundingDecimals int
CostShiftDigits int
MaskDestinationID string
MaskLength int
ExportDirectory string
HeaderFields []*CfgCdrField
ContentFields []*CfgCdrField
@@ -63,18 +59,6 @@ func (self *CdreConfig) loadFromJsonCfg(jsnCfg *CdreJsonCfg) error {
if jsnCfg.Cost_multiply_factor != nil {
self.CostMultiplyFactor = *jsnCfg.Cost_multiply_factor
}
if jsnCfg.Cost_rounding_decimals != nil {
self.CostRoundingDecimals = *jsnCfg.Cost_rounding_decimals
}
if jsnCfg.Cost_shift_digits != nil {
self.CostShiftDigits = *jsnCfg.Cost_shift_digits
}
if jsnCfg.Mask_destination_id != nil {
self.MaskDestinationID = *jsnCfg.Mask_destination_id
}
if jsnCfg.Mask_length != nil {
self.MaskLength = *jsnCfg.Mask_length
}
if jsnCfg.Export_directory != nil {
self.ExportDirectory = *jsnCfg.Export_directory
}
@@ -106,10 +90,6 @@ func (self *CdreConfig) Clone() *CdreConfig {
clnCdre.MMSUsageMultiplyFactor = self.MMSUsageMultiplyFactor
clnCdre.GenericUsageMultiplyFactor = self.GenericUsageMultiplyFactor
clnCdre.CostMultiplyFactor = self.CostMultiplyFactor
clnCdre.CostRoundingDecimals = self.CostRoundingDecimals
clnCdre.CostShiftDigits = self.CostShiftDigits
clnCdre.MaskDestinationID = self.MaskDestinationID
clnCdre.MaskLength = self.MaskLength
clnCdre.ExportDirectory = self.ExportDirectory
clnCdre.HeaderFields = make([]*CfgCdrField, len(self.HeaderFields))
for idx, fld := range self.HeaderFields {

View File

@@ -43,10 +43,6 @@ func TestCdreCfgClone(t *testing.T) {
FieldSeparator: rune(','),
DataUsageMultiplyFactor: 1.0,
CostMultiplyFactor: 1.0,
CostRoundingDecimals: -1,
CostShiftDigits: 0,
MaskDestinationID: "MASKED_DESTINATIONS",
MaskLength: 0,
ExportDirectory: "/var/spool/cgrates/cdre",
ContentFields: initContentFlds,
}
@@ -65,10 +61,6 @@ func TestCdreCfgClone(t *testing.T) {
FieldSeparator: rune(','),
DataUsageMultiplyFactor: 1.0,
CostMultiplyFactor: 1.0,
CostRoundingDecimals: -1,
CostShiftDigits: 0,
MaskDestinationID: "MASKED_DESTINATIONS",
MaskLength: 0,
ExportDirectory: "/var/spool/cgrates/cdre",
HeaderFields: emptyFields,
ContentFields: eClnContentFlds,
@@ -86,10 +78,6 @@ func TestCdreCfgClone(t *testing.T) {
if !reflect.DeepEqual(eClnCdreCfg, clnCdreCfg) { // MOdifying a field after clone should not affect cloned instance
t.Errorf("Cloned result: %+v", clnCdreCfg)
}
clnCdreCfg.CostShiftDigits = 2
if initCdreCfg.CostShiftDigits != 0 {
t.Error("Unexpected CostShiftDigits: ", initCdreCfg.CostShiftDigits)
}
clnCdreCfg.ContentFields[0].FieldId = "destination"
if initCdreCfg.ContentFields[0].FieldId != "cgrid" {
t.Error("Unexpected change of FieldId: ", initCdreCfg.ContentFields[0].FieldId)

View File

@@ -64,22 +64,42 @@ func NewCfgCdrFieldFromCdrFieldJsonCfg(jsnCfgFld *CdrFieldJsonCfg) (*CfgCdrField
if jsnCfgFld.Mandatory != nil {
cfgFld.Mandatory = *jsnCfgFld.Mandatory
}
if jsnCfgFld.Cost_shift_digits != nil {
cfgFld.CostShiftDigits = *jsnCfgFld.Cost_shift_digits
}
if jsnCfgFld.Rounding_decimals != nil {
cfgFld.RoundingDecimals = *jsnCfgFld.Rounding_decimals
}
if jsnCfgFld.Timezone != nil {
cfgFld.Timezone = *jsnCfgFld.Timezone
}
if jsnCfgFld.Mask_length != nil {
cfgFld.MaskLen = *jsnCfgFld.Mask_length
}
if jsnCfgFld.Mask_destinationd_id != nil {
cfgFld.MaskDestID = *jsnCfgFld.Mask_destinationd_id
}
return cfgFld, nil
}
type CfgCdrField struct {
Tag string // Identifier for the administrator
Type string // Type of field
FieldId string // Field identifier
HandlerId string
Value utils.RSRFields
Append bool
FieldFilter utils.RSRFields
Width int
Strip string
Padding string
Layout string
Mandatory bool
Tag string // Identifier for the administrator
Type string // Type of field
FieldId string // Field identifier
HandlerId string
Value utils.RSRFields
Append bool
FieldFilter utils.RSRFields
Width int
Strip string
Padding string
Layout string
Mandatory bool
CostShiftDigits int // Used in exports
RoundingDecimals int
Timezone string
MaskLen int
MaskDestID string
}
func CfgCdrFieldsFromCdrFieldsJsonCfg(jsnCfgFldss []*CdrFieldJsonCfg) ([]*CfgCdrField, error) {

View File

@@ -243,11 +243,7 @@ const CGRATES_CFG_JSON = `
"sms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from SMS unit to call duration in some billing systems)
"mms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from MMS unit to call duration in some billing systems)
"generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems)
"cost_multiply_factor": 1, // multiply cost before export, eg: add VAT
"cost_rounding_decimals": -1, // rounding decimals for Cost values. -1 to disable rounding
"cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents)
"mask_destination_id": "MASKED_DESTINATIONS", // destination id containing called addresses to be masked on export
"mask_length": 0, // length of the destination suffix to be masked
"cost_multiply_factor": 1, // multiply cost before export, eg: add VAT
"export_directory": "/var/spool/cgrates/cdre", // path where the exported CDRs will be placed
"header_fields": [], // template of the exported header fields
"content_fields": [ // template of the exported content fields
@@ -265,7 +261,7 @@ const CGRATES_CFG_JSON = `
{"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag":"Usage", "type": "*composed", "value": "Usage"},
{"tag":"Cost", "type": "*composed", "value": "Cost"},
{"tag":"Cost", "type": "*composed", "value": "Cost", "rounding_decimals": 4},
],
"trailer_fields": [], // template of the exported trailer fields
},

View File

@@ -255,8 +255,9 @@ func TestDfCdreJsonCfgs(t *testing.T) {
Type: utils.StringPointer(utils.META_COMPOSED),
Value: utils.StringPointer(utils.USAGE)},
&CdrFieldJsonCfg{Tag: utils.StringPointer("Cost"),
Type: utils.StringPointer(utils.META_COMPOSED),
Value: utils.StringPointer(utils.COST)},
Type: utils.StringPointer(utils.META_COMPOSED),
Value: utils.StringPointer(utils.COST),
Rounding_decimals: utils.IntPointer(4)},
}
eCfg := map[string]*CdreJsonCfg{
utils.META_DEFAULT: &CdreJsonCfg{
@@ -267,10 +268,6 @@ func TestDfCdreJsonCfgs(t *testing.T) {
Mms_usage_multiply_factor: utils.Float64Pointer(1.0),
Generic_usage_multiply_factor: utils.Float64Pointer(1.0),
Cost_multiply_factor: utils.Float64Pointer(1.0),
Cost_rounding_decimals: utils.IntPointer(-1),
Cost_shift_digits: utils.IntPointer(0),
Mask_destination_id: utils.StringPointer("MASKED_DESTINATIONS"),
Mask_length: utils.IntPointer(0),
Export_directory: utils.StringPointer("/var/spool/cgrates/cdre"),
Header_fields: &eFields,
Content_fields: &eContentFlds,

View File

@@ -114,18 +114,23 @@ type CdrStatsJsonCfg struct {
// One cdr field config, used in cdre and cdrc
type CdrFieldJsonCfg struct {
Tag *string
Type *string
Field_id *string
Handler_id *string
Value *string
Append *bool
Width *int
Strip *string
Padding *string
Layout *string
Field_filter *string
Mandatory *bool
Tag *string
Type *string
Field_id *string
Handler_id *string
Value *string
Append *bool
Width *int
Strip *string
Padding *string
Layout *string
Field_filter *string
Mandatory *bool
Cost_shift_digits *int
Rounding_decimals *int
Timezone *string
Mask_destinationd_id *string
Mask_length *int
}
// Cdre config section
@@ -137,10 +142,6 @@ type CdreJsonCfg struct {
Mms_usage_multiply_factor *float64
Generic_usage_multiply_factor *float64
Cost_multiply_factor *float64
Cost_rounding_decimals *int
Cost_shift_digits *int
Mask_destination_id *string
Mask_length *int
Export_directory *string
Header_fields *[]*CdrFieldJsonCfg
Content_fields *[]*CdrFieldJsonCfg

View File

@@ -87,9 +87,6 @@ func TestMfCdreExport1Instance(t *testing.T) {
if mfCgrCfg.CdreProfiles[prfl].DataUsageMultiplyFactor != 1.0 {
t.Error("Export1 instance has DataUsageMultiplyFormat: ", mfCgrCfg.CdreProfiles[prfl].DataUsageMultiplyFactor)
}
if mfCgrCfg.CdreProfiles[prfl].CostRoundingDecimals != 3.0 {
t.Error("Export1 instance has cdrFormat: ", mfCgrCfg.CdreProfiles[prfl].CostRoundingDecimals)
}
if len(mfCgrCfg.CdreProfiles[prfl].HeaderFields) != 2 {
t.Error("Export1 instance has number of header fields: ", len(mfCgrCfg.CdreProfiles[prfl].HeaderFields))
}

View File

@@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"math"
"net/url"
"strconv"
"strings"
"time"
@@ -282,35 +281,6 @@ func (cdr *CDR) Clone() *CDR {
return &clnedCDR
}
// Ability to send the CgrCdr remotely to another CDR server, we do not include rating variables for now
func (cdr *CDR) AsHttpForm() url.Values {
v := url.Values{}
for fld, val := range cdr.ExtraFields {
v.Set(fld, val)
}
v.Set(utils.TOR, cdr.ToR)
v.Set(utils.ACCID, cdr.OriginID)
v.Set(utils.CDRHOST, cdr.OriginHost)
v.Set(utils.CDRSOURCE, cdr.Source)
v.Set(utils.REQTYPE, cdr.RequestType)
v.Set(utils.DIRECTION, cdr.Direction)
v.Set(utils.TENANT, cdr.Tenant)
v.Set(utils.CATEGORY, cdr.Category)
v.Set(utils.ACCOUNT, cdr.Account)
v.Set(utils.SUBJECT, cdr.Subject)
v.Set(utils.DESTINATION, cdr.Destination)
v.Set(utils.SETUP_TIME, cdr.SetupTime.Format(time.RFC3339))
v.Set(utils.PDD, cdr.FieldAsString(&utils.RSRField{Id: utils.PDD}))
v.Set(utils.ANSWER_TIME, cdr.AnswerTime.Format(time.RFC3339))
v.Set(utils.USAGE, cdr.FormatUsage(utils.SECONDS))
v.Set(utils.SUPPLIER, cdr.Supplier)
v.Set(utils.DISCONNECT_CAUSE, cdr.DisconnectCause)
if cdr.CostDetails != nil {
v.Set(utils.COST_DETAILS, cdr.CostDetailsJson())
}
return v
}
// Used in mediation, primaryMandatory marks whether missing field out of request represents error or can be ignored
func (cdr *CDR) ForkCdr(runId string, RequestTypeFld, directionFld, tenantFld, categFld, accountFld, subjectFld, destFld, setupTimeFld, PDDFld,
answerTimeFld, durationFld, supplierFld, disconnectCauseFld, ratedFld, costFld *utils.RSRField,
@@ -718,7 +688,7 @@ func (cdr *CDR) combimedCdrFieldVal(cfgCdrFld *config.CfgCdrField, groupCDRs []*
}
// Extracts the value specified by cfgHdr out of cdr, used for export values
func (cdr *CDR) exportFieldValue(cfgCdrFld *config.CfgCdrField, costShiftDigits, roundDecimals int, layout string, maskLen int, maskDestID string) (string, error) {
func (cdr *CDR) exportFieldValue(cfgCdrFld *config.CfgCdrField) (string, error) {
passesFilters := true
for _, cdfFltr := range cfgCdrFld.FieldFilter {
if !cdfFltr.FilterPasses(cdr.FieldAsString(cdfFltr)) {
@@ -741,17 +711,17 @@ func (cdr *CDR) exportFieldValue(cfgCdrFld *config.CfgCdrField, costShiftDigits,
cdrVal = string(jsonVal)
}
case utils.COST:
cdrVal = cdr.FormatCost(costShiftDigits, roundDecimals)
cdrVal = cdr.FormatCost(cfgCdrFld.CostShiftDigits, cfgCdrFld.RoundingDecimals)
case utils.USAGE:
cdrVal = cdr.FormatUsage(layout)
cdrVal = cdr.FormatUsage(cfgCdrFld.Layout)
case utils.SETUP_TIME:
cdrVal = cdr.SetupTime.Format(layout)
cdrVal = cdr.SetupTime.Format(cfgCdrFld.Layout)
case utils.ANSWER_TIME: // Format time based on layout
cdrVal = cdr.AnswerTime.Format(layout)
cdrVal = cdr.AnswerTime.Format(cfgCdrFld.Layout)
case utils.DESTINATION:
cdrVal = cdr.FieldAsString(rsrFld)
if maskLen != -1 && len(maskDestID) != 0 && CachedDestHasPrefix(maskDestID, cdrVal) {
cdrVal = utils.MaskSuffix(cdrVal, maskLen)
if cfgCdrFld.MaskLen != -1 && len(cfgCdrFld.MaskDestID) != 0 && CachedDestHasPrefix(cfgCdrFld.MaskDestID, cdrVal) {
cdrVal = utils.MaskSuffix(cdrVal, cfgCdrFld.MaskLen)
}
default:
cdrVal = cdr.FieldAsString(rsrFld)
@@ -761,8 +731,7 @@ func (cdr *CDR) exportFieldValue(cfgCdrFld *config.CfgCdrField, costShiftDigits,
return retVal, nil
}
func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, costShiftDigits, roundDecimals int, timezone string,
httpSkipTlsCheck bool, maskLen int, maskDestID string, groupedCDRs []*CDR) (fmtOut string, err error) {
func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR) (fmtOut string, err error) {
layout := cfgFld.Layout
if layout == "" {
layout = time.RFC3339
@@ -775,11 +744,11 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, costShiftDigits, roundDe
case utils.META_CONSTANT:
outVal = cfgFld.Value.Id()
case utils.MetaDateTime: // Convert the requested field value into datetime with layout
rawVal, err := cdr.exportFieldValue(cfgFld, costShiftDigits, roundDecimals, layout, maskLen, maskDestID)
rawVal, err := cdr.exportFieldValue(cfgFld)
if err != nil {
return "", err
}
if dtFld, err := utils.ParseTimeDetectLayout(rawVal, timezone); err != nil { // Only one rule makes sense here
if dtFld, err := utils.ParseTimeDetectLayout(rawVal, cfgFld.Timezone); err != nil { // Only one rule makes sense here
return "", err
} else {
outVal = dtFld.Format(layout)
@@ -802,9 +771,9 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, costShiftDigits, roundDe
case utils.META_COMBIMED:
outVal, err = cdr.combimedCdrFieldVal(cfgFld, groupedCDRs)
case utils.META_COMPOSED:
outVal, err = cdr.exportFieldValue(cfgFld, costShiftDigits, roundDecimals, layout, maskLen, maskDestID)
outVal, err = cdr.exportFieldValue(cfgFld)
case utils.MetaMaskedDestination:
if len(maskDestID) != 0 && CachedDestHasPrefix(maskDestID, cdr.Destination) {
if len(cfgFld.MaskDestID) != 0 && CachedDestHasPrefix(cfgFld.MaskDestID, cdr.Destination) {
outVal = "1"
} else {
outVal = "0"
@@ -819,10 +788,10 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, costShiftDigits, roundDe
// Used in place where we need to export the CDR based on an export template
// ExportRecord is a []string to keep it compatible with encoding/csv Writer
func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, costShiftDigits, roundDecimals int, timezone string, httpSkipTlsCheck bool, maskLen int, maskDestID string, groupedCDRs []*CDR) (expRecord []string, err error) {
func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR) (expRecord []string, err error) {
expRecord = make([]string, len(exportFields))
for idx, cfgFld := range exportFields {
if fmtOut, err := cdr.formatField(cfgFld, costShiftDigits, roundDecimals, timezone, httpSkipTlsCheck, maskLen, maskDestID, groupedCDRs); err != nil {
if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs); err != nil {
return nil, err
} else {
expRecord[idx] += fmtOut
@@ -838,8 +807,16 @@ func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) {
// AsExportMap converts the CDR into a map[string]string based on export template
// Used in real-time replication as well as remote exports
func (cdr *CDR) AsExportMap(exportFields []*config.CfgCdrField, costShiftDigits, roundDecimals int, timezone string, httpSkipTlsCheck bool, maskLen int, maskDestID string, groupedCDRs []*CDR) (map[string]string, error) {
return nil, nil
func (cdr *CDR) AsExportMap(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR) (expMap map[string]string, err error) {
expMap = make(map[string]string)
for _, cfgFld := range exportFields {
if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs); err != nil {
return nil, err
} else {
expMap[cfgFld.FieldId] += fmtOut
}
}
return
}
type ExternalCDR struct {

View File

@@ -200,6 +200,7 @@ func TestFormatUsage(t *testing.T) {
}
}
/*
func TestCDRAsHttpForm(t *testing.T) {
storCdr := CDR{CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE, OriginID: "dsafdsaf",
OriginHost: "192.168.1.1", Source: utils.UNIT_TEST, RequestType: utils.META_RATED, Direction: "*out",
@@ -261,6 +262,7 @@ func TestCDRAsHttpForm(t *testing.T) {
t.Errorf("Expected: %s, received: %s", "valextr2", cdrForm.Get("fieldextr2"))
}
}
*/
func TestCDRForkCdr(t *testing.T) {
storCdr := CDR{CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, ToR: utils.VOICE,
@@ -501,36 +503,36 @@ func TestCDRAsExportRecord(t *testing.T) {
ExtraFields: map[string]string{"stop_time": "2014-06-11 19:19:00 +0000 UTC", "fieldextr2": "valextr2"}}
val, _ := utils.ParseRSRFields(utils.DESTINATION, utils.INFIELD_SEP)
cfgCdrFld := &config.CfgCdrField{Tag: "destination", Type: utils.META_COMPOSED, FieldId: utils.DESTINATION, Value: val}
if expRecord, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, 0, 0, "UTC", false, 0, "", nil); err != nil {
cfgCdrFld := &config.CfgCdrField{Tag: "destination", Type: utils.META_COMPOSED, FieldId: utils.DESTINATION, Value: val, Timezone: "UTC"}
if expRecord, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, false, nil); err != nil {
t.Error(err)
} else if expRecord[0] != cdr.Destination {
t.Errorf("Expecting: %s, received: %s", cdr.Destination, expRecord[0])
}
fltr, _ := utils.ParseRSRFields("Tenant(itsyscom.com)", utils.INFIELD_SEP)
cfgCdrFld = &config.CfgCdrField{Tag: "destination", Type: utils.META_COMPOSED, FieldId: utils.DESTINATION, Value: val, FieldFilter: fltr}
if _, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, 0, 0, "UTC", false, 0, "", nil); err == nil {
cfgCdrFld = &config.CfgCdrField{Tag: "destination", Type: utils.META_COMPOSED, FieldId: utils.DESTINATION, Value: val, FieldFilter: fltr, Timezone: "UTC"}
if _, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, false, nil); err == nil {
t.Error("Failed to use filter")
}
// Test MetaDateTime
val, _ = utils.ParseRSRFields("stop_time", utils.INFIELD_SEP)
layout := "2006-01-02 15:04:05"
cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: utils.MetaDateTime, FieldId: "stop_time", Value: val, Layout: layout}
if expRecord, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, 0, 0, "UTC", false, 0, "", nil); err != nil {
cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: utils.MetaDateTime, FieldId: "stop_time", Value: val, Layout: layout, Timezone: "UTC"}
if expRecord, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, false, nil); err != nil {
t.Error(err)
} else if expRecord[0] != "2014-06-11 19:19:00" {
t.Error("Expecting: 2014-06-11 19:19:00, got: ", expRecord[0])
}
// Test filter
fltr, _ = utils.ParseRSRFields("Tenant(itsyscom.com)", utils.INFIELD_SEP)
cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: utils.MetaDateTime, FieldId: "stop_time", Value: val, FieldFilter: fltr, Layout: layout}
if _, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, 0, 0, "UTC", false, 0, "", nil); err == nil {
cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: utils.MetaDateTime, FieldId: "stop_time", Value: val, FieldFilter: fltr, Layout: layout, Timezone: "UTC"}
if _, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, false, nil); err == nil {
t.Error("Received empty error", err)
}
val, _ = utils.ParseRSRFields("fieldextr2", utils.INFIELD_SEP)
cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: utils.MetaDateTime, FieldId: "stop_time", Value: val, Layout: layout}
cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: utils.MetaDateTime, FieldId: "stop_time", Value: val, Layout: layout, Timezone: "UTC"}
// Test time parse error
if _, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, 0, 0, "UTC", false, 0, "", nil); err == nil {
if _, err := cdr.AsExportRecord([]*config.CfgCdrField{cfgCdrFld}, false, nil); err == nil {
t.Error("Should give error here, got none.")
}
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"reflect"
"strings"
@@ -462,7 +463,14 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
switch rplCfg.Transport {
case utils.META_HTTP_POST:
content = utils.CONTENT_FORM
body = cdr.AsHttpForm()
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
if err != nil {
return err
}
body := url.Values{}
for fld, val := range expMp {
body.Set(fld, val)
}
case utils.META_HTTP_JSON:
content = utils.CONTENT_JSON
jsn, err := json.Marshal(cdr)

View File

@@ -594,10 +594,6 @@ type AttrExpFileCdrs struct {
MmsUsageMultiplyFactor *float64 // Multiply mms usage before export (eg: convert from MMS unit to call duration for some billing systems)
GenericUsageMultiplyFactor *float64 // Multiply generic usage before export (eg: convert from GENERIC unit to call duration for some billing systems)
CostMultiplyFactor *float64 // Multiply the cost before export, eg: apply VAT
CostShiftDigits *int // If defined it will shift cost digits before applying rouding (eg: convert from Eur->cents), -1 to use general config ones
RoundDecimals *int // Overwrite configured roundDecimals with this dynamically, -1 to use general config ones
MaskDestinationId *string // Overwrite configured MaskDestId
MaskLength *int // Overwrite configured MaskLength, -1 to use general config ones
CgrIds []string // If provided, it will filter based on the cgrids present in list
MediationRunIds []string // If provided, it will filter on mediation runid
TORs []string // If provided, filter on TypeOfRecord
@@ -1087,10 +1083,6 @@ type AttrExportCdrsToFile struct {
MMSUsageMultiplyFactor *float64 // Multiply mms usage before export (eg: convert from MMS unit to call duration for some billing systems)
GenericUsageMultiplyFactor *float64 // Multiply generic usage before export (eg: convert from GENERIC unit to call duration for some billing systems)
CostMultiplyFactor *float64 // Multiply the cost before export, eg: apply VAT
CostShiftDigits *int // If defined it will shift cost digits before applying rouding (eg: convert from Eur->cents), -1 to use general config ones
RoundDecimals *int // Overwrite configured roundDecimals with this dynamically, -1 to use general config ones
MaskDestinationID *string // Overwrite configured MaskDestId
MaskLength *int // Overwrite configured MaskLength, -1 to use general config ones
Verbose bool // Disable CgrIds reporting in reply/ExportedCgrIds and reply/UnexportedCgrIds
RPCCDRsFilter // Inherit the CDR filter attributes
}