Merge pull request #1842 from Trial97/master

Updated CDRExporter Attribute Context
This commit is contained in:
Dan Christian Bogos
2020-01-06 11:56:28 +01:00
committed by GitHub
2 changed files with 54 additions and 26 deletions

View File

@@ -51,9 +51,10 @@ const (
META_FORMATCOST = "*format_cost"
)
// NewCDRExporter returns a new CDRExporter
func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, exportPath, fallbackPath, exportID string,
synchronous bool, attempts int, fieldSeparator rune,
httpSkipTlsCheck bool, httpPoster *HTTPPoster, attrsConns []string, filterS *FilterS) (*CDRExporter, error) {
httpSkipTLSCheck bool, httpPoster *HTTPPoster, attrsConns []string, filterS *FilterS) (*CDRExporter, error) {
if len(cdrs) == 0 { // Nothing to export
return nil, nil
}
@@ -67,7 +68,7 @@ func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, e
synchronous: synchronous,
attempts: attempts,
fieldSeparator: fieldSeparator,
httpSkipTlsCheck: httpSkipTlsCheck,
httpSkipTLSCheck: httpSkipTLSCheck,
httpPoster: httpPoster,
negativeExports: make(map[string]string),
attrsConns: attrsConns,
@@ -76,6 +77,7 @@ func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, e
return cdre, nil
}
// CDRExporter used to export the CDRs
type CDRExporter struct {
sync.RWMutex
cdrs []*CDR
@@ -87,7 +89,7 @@ type CDRExporter struct {
synchronous bool
attempts int
fieldSeparator rune
httpSkipTlsCheck bool
httpSkipTLSCheck bool
httpPoster *HTTPPoster
header, trailer []string // Header and Trailer fields
@@ -98,7 +100,7 @@ type CDRExporter struct {
totalDuration, totalDataUsage, totalSmsUsage,
totalMmsUsage, totalGenericUsage time.Duration
totalCost float64
firstExpOrderId, lastExpOrderId int64
firstExpOrderID, lastExpOrderID int64
positiveExports []string // CGRIDs of successfully exported CDRs
negativeExports map[string]string // CGRIDs of failed exports
@@ -244,7 +246,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
}
body = jsn
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap:
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.filterS)
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTLSCheck, nil, cdre.filterS)
if err != nil {
return err
}
@@ -254,7 +256,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
}
body = jsn
case utils.META_HTTP_POST:
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.filterS)
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTLSCheck, nil, cdre.filterS)
if err != nil {
return err
}
@@ -304,9 +306,12 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
if len(cdre.attrsConns) == 0 {
return errors.New("no connection to AttributeS")
}
cdrEv := cdr.AsCGREvent()
args := &AttrArgsProcessEvent{
Context: utils.StringPointer(cdre.exportTemplate.AttributeSContext),
CGREvent: cdr.AsCGREvent(),
Context: utils.StringPointer(utils.FirstNonEmpty(
utils.IfaceAsString(cdrEv.Event[utils.Context]),
cdre.exportTemplate.AttributeSContext)),
CGREvent: cdrEv,
}
var evReply AttrSProcessEventReply
if err = connMgr.Call(cdre.attrsConns, nil,
@@ -324,14 +329,13 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
switch cdre.exportFormat {
case utils.MetaFileFWV, utils.MetaFileCSV:
var cdrRow []string
cdrRow, err = cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, cdre.cdrs, cdre.filterS)
cdrRow, err = cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTLSCheck, cdre.cdrs, cdre.filterS)
if len(cdrRow) == 0 && err == nil { // No CDR data, most likely no configuration fields defined
return
} else {
cdre.Lock()
cdre.content = append(cdre.content, cdrRow)
cdre.Unlock()
}
cdre.Lock()
cdre.content = append(cdre.content, cdrRow)
cdre.Unlock()
default: // attempt posting CDR
err = cdre.postCdr(cdr)
}
@@ -348,7 +352,7 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
if cdr.AnswerTime.After(cdre.lastCdrATime) {
cdre.lastCdrATime = cdr.AnswerTime
}
cdre.numberOfRecords += 1
cdre.numberOfRecords++
if cdr.ToR == utils.VOICE { // Only count duration for non data cdrs
cdre.totalDuration += cdr.Usage
}
@@ -368,11 +372,11 @@ func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
cdre.totalCost += cdr.Cost
cdre.totalCost = utils.Round(cdre.totalCost, globalRoundingDecimals, utils.ROUNDING_MIDDLE)
}
if cdre.firstExpOrderId > cdr.OrderID || cdre.firstExpOrderId == 0 {
cdre.firstExpOrderId = cdr.OrderID
if cdre.firstExpOrderID > cdr.OrderID || cdre.firstExpOrderID == 0 {
cdre.firstExpOrderID = cdr.OrderID
}
if cdre.lastExpOrderId < cdr.OrderID {
cdre.lastExpOrderId = cdr.OrderID
if cdre.lastExpOrderID < cdr.OrderID {
cdre.lastExpOrderID = cdr.OrderID
}
return nil
}
@@ -482,6 +486,7 @@ func (cdre *CDRExporter) writeCsv(csvWriter *csv.Writer) error {
return nil
}
// ExportCDRs exports the given CDRs
func (cdre *CDRExporter) ExportCDRs() (err error) {
if err = cdre.processCDRs(); err != nil {
return
@@ -520,33 +525,34 @@ func (cdre *CDRExporter) ExportCDRs() (err error) {
return
}
// Return the first exported Cdr OrderId
// FirstOrderId returns the first exported Cdr OrderId
func (cdre *CDRExporter) FirstOrderId() int64 {
return cdre.firstExpOrderId
return cdre.firstExpOrderID
}
// Return the last exported Cdr OrderId
// LastOrderId return the last exported Cdr OrderId
func (cdre *CDRExporter) LastOrderId() int64 {
return cdre.lastExpOrderId
return cdre.lastExpOrderID
}
// Return total cost in the exported cdrs
// TotalCost returns the total cost in the exported cdrs
func (cdre *CDRExporter) TotalCost() float64 {
return cdre.totalCost
}
// TotalExportedCdrs returns the number of exported CDRs
func (cdre *CDRExporter) TotalExportedCdrs() int {
return cdre.numberOfRecords
}
// Return successfully exported CGRIDs
// PositiveExports returns the successfully exported CGRIDs
func (cdre *CDRExporter) PositiveExports() []string {
cdre.RLock()
defer cdre.RUnlock()
return cdre.positiveExports
}
// Return failed exported CGRIDs together with the reason
// NegativeExports returns the failed exported CGRIDs together with the reason
func (cdre *CDRExporter) NegativeExports() map[string]string {
cdre.RLock()
defer cdre.RUnlock()

View File

@@ -58,7 +58,6 @@ func TestNewCsvReader(t *testing.T) {
if rcv, err := NewEventReader(cfg, 1, nil, nil, fltr, nil); err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
} else if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
}
}
@@ -84,3 +83,26 @@ func TestNewKafkaReader(t *testing.T) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
}
}
func TestNewSQLReader(t *testing.T) {
cfg, _ := config.NewDefaultCGRConfig()
fltr := &engine.FilterS{}
reader := cfg.ERsCfg().Readers[0]
reader.Type = utils.MetaSQL
reader.ID = "file_reader"
reader.ConcurrentReqs = -1
reader.SourcePath = "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2"
cfg.ERsCfg().Readers = append(cfg.ERsCfg().Readers, reader)
if len(cfg.ERsCfg().Readers) != 2 {
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
}
expected, err := NewSQLEventReader(cfg, 1, nil, nil, fltr, nil)
if err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
}
if rcv, err := NewEventReader(cfg, 1, nil, nil, fltr, nil); err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
} else if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
}
}