Updated CDRExporter Attribute Context

This commit is contained in:
Trial97
2020-01-06 10:37:53 +02:00
parent 4c337ee467
commit 2d413dac26

View File

@@ -51,9 +51,10 @@ const (
META_FORMATCOST = "*format_cost"
)
// NewCDRExporter returns a new CDRExporter
func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, exportPath, fallbackPath, exportID string,
synchronous bool, attempts int, fieldSeparator rune,
httpSkipTlsCheck bool, httpPoster *HTTPPoster, attrsConns []string, filterS *FilterS) (*CDRExporter, error) {
httpSkipTLSCheck bool, httpPoster *HTTPPoster, attrsConns []string, filterS *FilterS) (*CDRExporter, error) {
if len(cdrs) == 0 { // Nothing to export
return nil, nil
}
@@ -67,7 +68,7 @@ func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, e
synchronous: synchronous,
attempts: attempts,
fieldSeparator: fieldSeparator,
httpSkipTlsCheck: httpSkipTlsCheck,
httpSkipTLSCheck: httpSkipTLSCheck,
httpPoster: httpPoster,
negativeExports: make(map[string]string),
attrsConns: attrsConns,
@@ -76,6 +77,7 @@ func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, e
return cdre, nil
}
// CDRExporter used to export the CDRs
type CDRExporter struct {
sync.RWMutex
cdrs []*CDR
@@ -87,7 +89,7 @@ type CDRExporter struct {
synchronous bool
attempts int
fieldSeparator rune
httpSkipTlsCheck bool
httpSkipTLSCheck bool
httpPoster *HTTPPoster
header, trailer []string // Header and Trailer fields
@@ -98,7 +100,7 @@ type CDRExporter struct {
totalDuration, totalDataUsage, totalSmsUsage,
totalMmsUsage, totalGenericUsage time.Duration
totalCost float64
firstExpOrderId, lastExpOrderId int64
firstExpOrderID, lastExpOrderID int64
positiveExports []string // CGRIDs of successfully exported CDRs
negativeExports map[string]string // CGRIDs of failed exports
@@ -244,7 +246,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
}
body = jsn
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap:
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.filterS)
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTLSCheck, nil, cdre.filterS)
if err != nil {
return err
}
@@ -254,7 +256,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
}
body = jsn
case utils.META_HTTP_POST:
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.filterS)
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTLSCheck, nil, cdre.filterS)
if err != nil {
return err
}
@@ -304,9 +306,12 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
if len(cdre.attrsConns) == 0 {
return errors.New("no connection to AttributeS")
}
cdrEv := cdr.AsCGREvent()
args := &AttrArgsProcessEvent{
Context: utils.StringPointer(cdre.exportTemplate.AttributeSContext),
CGREvent: cdr.AsCGREvent(),
Context: utils.StringPointer(utils.FirstNonEmpty(
utils.IfaceAsString(cdrEv.Event[utils.Context]),
cdre.exportTemplate.AttributeSContext)),
CGREvent: cdrEv,
}
var evReply AttrSProcessEventReply
if err = connMgr.Call(cdre.attrsConns, nil,
@@ -324,14 +329,13 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
switch cdre.exportFormat {
case utils.MetaFileFWV, utils.MetaFileCSV:
var cdrRow []string
cdrRow, err = cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, cdre.cdrs, cdre.filterS)
cdrRow, err = cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTLSCheck, cdre.cdrs, cdre.filterS)
if len(cdrRow) == 0 && err == nil { // No CDR data, most likely no configuration fields defined
return
} else {
cdre.Lock()
cdre.content = append(cdre.content, cdrRow)
cdre.Unlock()
}
cdre.Lock()
cdre.content = append(cdre.content, cdrRow)
cdre.Unlock()
default: // attempt posting CDR
err = cdre.postCdr(cdr)
}
@@ -348,7 +352,7 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
if cdr.AnswerTime.After(cdre.lastCdrATime) {
cdre.lastCdrATime = cdr.AnswerTime
}
cdre.numberOfRecords += 1
cdre.numberOfRecords++
if cdr.ToR == utils.VOICE { // Only count duration for non data cdrs
cdre.totalDuration += cdr.Usage
}
@@ -368,11 +372,11 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
cdre.totalCost += cdr.Cost
cdre.totalCost = utils.Round(cdre.totalCost, globalRoundingDecimals, utils.ROUNDING_MIDDLE)
}
if cdre.firstExpOrderId > cdr.OrderID || cdre.firstExpOrderId == 0 {
cdre.firstExpOrderId = cdr.OrderID
if cdre.firstExpOrderID > cdr.OrderID || cdre.firstExpOrderID == 0 {
cdre.firstExpOrderID = cdr.OrderID
}
if cdre.lastExpOrderId < cdr.OrderID {
cdre.lastExpOrderId = cdr.OrderID
if cdre.lastExpOrderID < cdr.OrderID {
cdre.lastExpOrderID = cdr.OrderID
}
return nil
}
@@ -482,6 +486,7 @@ func (cdre *CDRExporter) writeCsv(csvWriter *csv.Writer) error {
return nil
}
// ExportCDRs exports the given CDRs
func (cdre *CDRExporter) ExportCDRs() (err error) {
if err = cdre.processCDRs(); err != nil {
return
@@ -520,33 +525,34 @@ func (cdre *CDRExporter) ExportCDRs() (err error) {
return
}
// Return the first exported Cdr OrderId
// FirstOrderId returns the first exported Cdr OrderId
func (cdre *CDRExporter) FirstOrderId() int64 {
return cdre.firstExpOrderId
return cdre.firstExpOrderID
}
// Return the last exported Cdr OrderId
// LastOrderId return the last exported Cdr OrderId
func (cdre *CDRExporter) LastOrderId() int64 {
return cdre.lastExpOrderId
return cdre.lastExpOrderID
}
// Return total cost in the exported cdrs
// TotalCost returns the total cost in the exported cdrs
func (cdre *CDRExporter) TotalCost() float64 {
return cdre.totalCost
}
// TotalExportedCdrs returns the number of exported CDRs
func (cdre *CDRExporter) TotalExportedCdrs() int {
return cdre.numberOfRecords
}
// Return successfully exported CGRIDs
// PositiveExports returns the successfully exported CGRIDs
func (cdre *CDRExporter) PositiveExports() []string {
cdre.RLock()
defer cdre.RUnlock()
return cdre.positiveExports
}
// Return failed exported CGRIDs together with the reason
// NegativeExports returns the failed exported CGRIDs together with the reason
func (cdre *CDRExporter) NegativeExports() map[string]string {
cdre.RLock()
defer cdre.RUnlock()