mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-19 22:28:45 +05:00
Merge branch 'master' of https://github.com/cgrates/cgrates
This commit is contained in:
@@ -590,7 +590,7 @@ func removeAccountAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Ac
|
||||
if err = ratingStorage.RemAccountActionPlans(accID, nil); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err = ratingStorage.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil {
|
||||
if err = ratingStorage.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil && err.Error() != utils.ErrNotFound.Error() {
|
||||
return 0, err
|
||||
}
|
||||
return 0, nil
|
||||
|
||||
@@ -792,6 +792,11 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, httpSkipTlsCheck bool, g
|
||||
|
||||
}
|
||||
|
||||
// Part of event interface
|
||||
func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Used in place where we need to export the CDR based on an export template
|
||||
// ExportRecord is a []string to keep it compatible with encoding/csv Writer
|
||||
func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR, roundingDecs int) (expRecord []string, err error) {
|
||||
@@ -812,16 +817,17 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, httpSkipTlsCh
|
||||
return expRecord, nil
|
||||
}
|
||||
|
||||
// Part of event interface
|
||||
func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// AsExportMap converts the CDR into a map[string]string based on export template
|
||||
// Used in real-time replication as well as remote exports
|
||||
func (cdr *CDR) AsExportMap(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR) (expMap map[string]string, err error) {
|
||||
func (cdr *CDR) AsExportMap(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR, roundingDecs int) (expMap map[string]string, err error) {
|
||||
expMap = make(map[string]string)
|
||||
for _, cfgFld := range exportFields {
|
||||
if roundingDecs != 0 {
|
||||
clnFld := new(config.CfgCdrField) // Clone so we can modify the rounding decimals without affecting the template
|
||||
*clnFld = *cfgFld
|
||||
clnFld.RoundingDecimals = roundingDecs
|
||||
cfgFld = clnFld
|
||||
}
|
||||
if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
|
||||
@@ -579,7 +579,7 @@ func TestCDRAsExportMap(t *testing.T) {
|
||||
&config.CfgCdrField{FieldId: utils.DESTINATION, Type: utils.META_COMPOSED, Value: utils.ParseRSRFieldsMustCompile("~Destination:s/^\\+(\\d+)$/00${1}/", utils.INFIELD_SEP)},
|
||||
&config.CfgCdrField{FieldId: "FieldExtra1", Type: utils.META_COMPOSED, Value: utils.ParseRSRFieldsMustCompile("field_extr1", utils.INFIELD_SEP)},
|
||||
}
|
||||
if cdrMp, err := cdr.AsExportMap(expFlds, false, nil); err != nil {
|
||||
if cdrMp, err := cdr.AsExportMap(expFlds, false, nil, 0); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eCDRMp, cdrMp) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eCDRMp, cdrMp)
|
||||
|
||||
493
engine/cdre.go
Normal file
493
engine/cdre.go
Normal file
@@ -0,0 +1,493 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package engine
|
||||
|
||||
import (
|
||||
"encoding/csv"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
const (
|
||||
META_EXPORTID = "*export_id"
|
||||
META_TIMENOW = "*time_now"
|
||||
META_FIRSTCDRATIME = "*first_cdr_atime"
|
||||
META_LASTCDRATIME = "*last_cdr_atime"
|
||||
META_NRCDRS = "*cdrs_number"
|
||||
META_DURCDRS = "*cdrs_duration"
|
||||
META_SMSUSAGE = "*sms_usage"
|
||||
META_MMSUSAGE = "*mms_usage"
|
||||
META_GENERICUSAGE = "*generic_usage"
|
||||
META_DATAUSAGE = "*data_usage"
|
||||
META_COSTCDRS = "*cdrs_cost"
|
||||
META_FORMATCOST = "*format_cost"
|
||||
)
|
||||
|
||||
func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreConfig, exportFormat, exportPath, fallbackPath, exportID string,
|
||||
synchronous bool, attempts int, fieldSeparator rune, usageMultiplyFactor utils.FieldMultiplyFactor,
|
||||
costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *utils.HTTPPoster) (*CDRExporter, error) {
|
||||
if len(cdrs) == 0 { // Nothing to export
|
||||
return nil, nil
|
||||
}
|
||||
cdre := &CDRExporter{
|
||||
cdrs: cdrs,
|
||||
exportTemplate: exportTemplate,
|
||||
exportFormat: exportFormat,
|
||||
exportPath: exportPath,
|
||||
fallbackPath: fallbackPath,
|
||||
exportID: exportID,
|
||||
synchronous: synchronous,
|
||||
attempts: attempts,
|
||||
fieldSeparator: fieldSeparator,
|
||||
usageMultiplyFactor: usageMultiplyFactor,
|
||||
costMultiplyFactor: costMultiplyFactor,
|
||||
roundingDecimals: roundingDecimals,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck,
|
||||
httpPoster: httpPoster,
|
||||
negativeExports: make(map[string]string),
|
||||
}
|
||||
return cdre, nil
|
||||
}
|
||||
|
||||
type CDRExporter struct {
|
||||
sync.RWMutex
|
||||
cdrs []*CDR
|
||||
exportTemplate *config.CdreConfig
|
||||
exportFormat string
|
||||
exportPath string
|
||||
fallbackPath string // folder where we save failed CDRs
|
||||
exportID string // Unique identifier or this export
|
||||
synchronous bool
|
||||
attempts int
|
||||
fieldSeparator rune
|
||||
usageMultiplyFactor utils.FieldMultiplyFactor
|
||||
costMultiplyFactor float64
|
||||
roundingDecimals int
|
||||
httpSkipTlsCheck bool
|
||||
httpPoster *utils.HTTPPoster
|
||||
|
||||
header, trailer []string // Header and Trailer fields
|
||||
content [][]string // Rows of cdr fields
|
||||
|
||||
firstCdrATime, lastCdrATime time.Time
|
||||
numberOfRecords int
|
||||
totalDuration, totalDataUsage, totalSmsUsage,
|
||||
totalMmsUsage, totalGenericUsage time.Duration
|
||||
totalCost float64
|
||||
firstExpOrderId, lastExpOrderId int64
|
||||
positiveExports []string // CGRIDs of successfully exported CDRs
|
||||
negativeExports map[string]string // CGRIDs of failed exports
|
||||
}
|
||||
|
||||
// Handle various meta functions used in header/trailer
|
||||
func (cdre *CDRExporter) metaHandler(tag, arg string) (string, error) {
|
||||
switch tag {
|
||||
case META_EXPORTID:
|
||||
return cdre.exportID, nil
|
||||
case META_TIMENOW:
|
||||
return time.Now().Format(arg), nil
|
||||
case META_FIRSTCDRATIME:
|
||||
return cdre.firstCdrATime.Format(arg), nil
|
||||
case META_LASTCDRATIME:
|
||||
return cdre.lastCdrATime.Format(arg), nil
|
||||
case META_NRCDRS:
|
||||
return strconv.Itoa(cdre.numberOfRecords), nil
|
||||
case META_DURCDRS:
|
||||
emulatedCdr := &CDR{ToR: utils.VOICE, Usage: cdre.totalDuration}
|
||||
return emulatedCdr.FormatUsage(arg), nil
|
||||
case META_SMSUSAGE:
|
||||
emulatedCdr := &CDR{ToR: utils.SMS, Usage: cdre.totalSmsUsage}
|
||||
return emulatedCdr.FormatUsage(arg), nil
|
||||
case META_MMSUSAGE:
|
||||
emulatedCdr := &CDR{ToR: utils.MMS, Usage: cdre.totalMmsUsage}
|
||||
return emulatedCdr.FormatUsage(arg), nil
|
||||
case META_GENERICUSAGE:
|
||||
emulatedCdr := &CDR{ToR: utils.GENERIC, Usage: cdre.totalGenericUsage}
|
||||
return emulatedCdr.FormatUsage(arg), nil
|
||||
case META_DATAUSAGE:
|
||||
emulatedCdr := &CDR{ToR: utils.DATA, Usage: cdre.totalDataUsage}
|
||||
return emulatedCdr.FormatUsage(arg), nil
|
||||
case META_COSTCDRS:
|
||||
return strconv.FormatFloat(utils.Round(cdre.totalCost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
|
||||
default:
|
||||
return "", fmt.Errorf("Unsupported METATAG: %s", tag)
|
||||
}
|
||||
}
|
||||
|
||||
// Compose and cache the header
|
||||
func (cdre *CDRExporter) composeHeader() (err error) {
|
||||
for _, cfgFld := range cdre.exportTemplate.HeaderFields {
|
||||
var outVal string
|
||||
switch cfgFld.Type {
|
||||
case utils.META_FILLER:
|
||||
outVal = cfgFld.Value.Id()
|
||||
cfgFld.Padding = "right"
|
||||
case utils.META_CONSTANT:
|
||||
outVal = cfgFld.Value.Id()
|
||||
case utils.META_HANDLER:
|
||||
outVal, err = cdre.metaHandler(cfgFld.Value.Id(), cfgFld.Layout)
|
||||
default:
|
||||
return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR header, field %s, error: %s", cfgFld.Tag, err.Error()))
|
||||
return err
|
||||
}
|
||||
fmtOut := outVal
|
||||
if fmtOut, err = utils.FmtFieldWidth(outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR header, field %s, error: %s", cfgFld.Tag, err.Error()))
|
||||
return err
|
||||
}
|
||||
cdre.Lock()
|
||||
cdre.header = append(cdre.header, fmtOut)
|
||||
cdre.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compose and cache the trailer
|
||||
func (cdre *CDRExporter) composeTrailer() (err error) {
|
||||
for _, cfgFld := range cdre.exportTemplate.TrailerFields {
|
||||
var outVal string
|
||||
switch cfgFld.Type {
|
||||
case utils.META_FILLER:
|
||||
outVal = cfgFld.Value.Id()
|
||||
cfgFld.Padding = "right"
|
||||
case utils.META_CONSTANT:
|
||||
outVal = cfgFld.Value.Id()
|
||||
case utils.META_HANDLER:
|
||||
outVal, err = cdre.metaHandler(cfgFld.Value.Id(), cfgFld.Layout)
|
||||
default:
|
||||
return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
|
||||
return err
|
||||
}
|
||||
fmtOut := outVal
|
||||
if fmtOut, err = utils.FmtFieldWidth(outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
|
||||
return err
|
||||
}
|
||||
cdre.Lock()
|
||||
cdre.trailer = append(cdre.trailer, fmtOut)
|
||||
cdre.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
var body interface{}
|
||||
switch cdre.exportFormat {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaAMQPjsonCDR:
|
||||
jsn, err := json.Marshal(cdr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body = jsn
|
||||
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap:
|
||||
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
jsn, err := json.Marshal(expMp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body = jsn
|
||||
case utils.META_HTTP_POST:
|
||||
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vals := url.Values{}
|
||||
for fld, val := range expMp {
|
||||
vals.Set(fld, val)
|
||||
}
|
||||
body = vals
|
||||
default:
|
||||
err = fmt.Errorf("unsupported exportFormat: <%s>", cdre.exportFormat)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// compute fallbackPath
|
||||
fallbackPath := utils.META_NONE
|
||||
ffn := &utils.FallbackFileName{Module: utils.CDRPoster, Transport: cdre.exportFormat, Address: cdre.exportPath, RequestID: utils.GenUUID()}
|
||||
fallbackFileName := ffn.AsString()
|
||||
if cdre.fallbackPath != utils.META_NONE { // not none, need fallback
|
||||
fallbackPath = path.Join(cdre.fallbackPath, fallbackFileName)
|
||||
}
|
||||
switch cdre.exportFormat {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
_, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath)
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
var amqpPoster *utils.AMQPPoster
|
||||
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath)
|
||||
if err == nil { // error will be checked bellow
|
||||
var chn *amqp.Channel
|
||||
chn, err = amqpPoster.Post(
|
||||
nil, utils.PosterTransportContentTypes[cdre.exportFormat], body.([]byte), fallbackFileName)
|
||||
if chn != nil {
|
||||
chn.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Write individual cdr into content buffer, build stats
|
||||
func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
|
||||
if cdr.ExtraFields == nil { // Avoid assignment in nil map if not initialized
|
||||
cdr.ExtraFields = make(map[string]string)
|
||||
}
|
||||
// Usage multiply, find config based on ToR field or *any
|
||||
for _, key := range []string{cdr.ToR, utils.ANY} {
|
||||
if uM, hasIt := cdre.usageMultiplyFactor[key]; hasIt && uM != 1.0 {
|
||||
cdr.UsageMultiply(uM, cdre.roundingDecimals)
|
||||
break
|
||||
}
|
||||
}
|
||||
if cdre.costMultiplyFactor != 0.0 {
|
||||
cdr.CostMultiply(cdre.costMultiplyFactor, cdre.roundingDecimals)
|
||||
}
|
||||
switch cdre.exportFormat {
|
||||
case utils.MetaFileFWV, utils.MetaFileCSV:
|
||||
var cdrRow []string
|
||||
cdrRow, err = cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, cdre.cdrs, cdre.roundingDecimals)
|
||||
if len(cdrRow) == 0 { // No CDR data, most likely no configuration fields defined
|
||||
return
|
||||
} else {
|
||||
cdre.Lock()
|
||||
cdre.content = append(cdre.content, cdrRow)
|
||||
cdre.Unlock()
|
||||
}
|
||||
default: // attempt posting CDR
|
||||
err = cdre.postCdr(cdr)
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRE> Cannot export CDR with CGRID: %s and runid: %s, error: %s", cdr.CGRID, cdr.RunID, err.Error()))
|
||||
return
|
||||
}
|
||||
// Done with writing content, compute stats here
|
||||
if cdre.firstCdrATime.IsZero() || cdr.AnswerTime.Before(cdre.firstCdrATime) {
|
||||
cdre.firstCdrATime = cdr.AnswerTime
|
||||
}
|
||||
if cdr.AnswerTime.After(cdre.lastCdrATime) {
|
||||
cdre.lastCdrATime = cdr.AnswerTime
|
||||
}
|
||||
cdre.numberOfRecords += 1
|
||||
if cdr.ToR == utils.VOICE { // Only count duration for non data cdrs
|
||||
cdre.totalDuration += cdr.Usage
|
||||
}
|
||||
if cdr.ToR == utils.SMS { // Count usage for SMS
|
||||
cdre.totalSmsUsage += cdr.Usage
|
||||
}
|
||||
if cdr.ToR == utils.MMS { // Count usage for MMS
|
||||
cdre.totalMmsUsage += cdr.Usage
|
||||
}
|
||||
if cdr.ToR == utils.GENERIC { // Count usage for GENERIC
|
||||
cdre.totalGenericUsage += cdr.Usage
|
||||
}
|
||||
if cdr.ToR == utils.DATA { // Count usage for DATA
|
||||
cdre.totalDataUsage += cdr.Usage
|
||||
}
|
||||
if cdr.Cost != -1 {
|
||||
cdre.totalCost += cdr.Cost
|
||||
cdre.totalCost = utils.Round(cdre.totalCost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE)
|
||||
}
|
||||
if cdre.firstExpOrderId > cdr.OrderID || cdre.firstExpOrderId == 0 {
|
||||
cdre.firstExpOrderId = cdr.OrderID
|
||||
}
|
||||
if cdre.lastExpOrderId < cdr.OrderID {
|
||||
cdre.lastExpOrderId = cdr.OrderID
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Builds header, content and trailers
|
||||
func (cdre *CDRExporter) processCDRs() (err error) {
|
||||
var wg sync.WaitGroup
|
||||
for _, cdr := range cdre.cdrs {
|
||||
if cdr == nil || len(cdr.CGRID) == 0 { // CDR needs to exist and it's CGRID needs to be populated
|
||||
continue
|
||||
}
|
||||
passesFilters := true
|
||||
for _, cdrFltr := range cdre.exportTemplate.CDRFilter {
|
||||
if !cdrFltr.FilterPasses(cdr.FieldAsString(cdrFltr)) {
|
||||
passesFilters = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !passesFilters { // Not passes filters, ignore this CDR
|
||||
continue
|
||||
}
|
||||
if cdre.synchronous ||
|
||||
utils.IsSliceMember([]string{utils.MetaFileCSV, utils.MetaFileFWV}, cdre.exportFormat) {
|
||||
wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing
|
||||
}
|
||||
go func(cdr *CDR) {
|
||||
if err := cdre.processCDR(cdr); err != nil {
|
||||
cdre.Lock()
|
||||
cdre.negativeExports[cdr.CGRID] = err.Error()
|
||||
cdre.Unlock()
|
||||
} else {
|
||||
cdre.Lock()
|
||||
cdre.positiveExports = append(cdre.positiveExports, cdr.CGRID)
|
||||
cdre.Unlock()
|
||||
}
|
||||
if cdre.synchronous ||
|
||||
utils.IsSliceMember([]string{utils.MetaFileCSV, utils.MetaFileFWV}, cdre.exportFormat) {
|
||||
wg.Done()
|
||||
}
|
||||
}(cdr)
|
||||
}
|
||||
wg.Wait()
|
||||
// Process header and trailer after processing cdrs since the metatag functions can access stats out of built cdrs
|
||||
if cdre.exportTemplate.HeaderFields != nil {
|
||||
if err = cdre.composeHeader(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if cdre.exportTemplate.TrailerFields != nil {
|
||||
if err = cdre.composeTrailer(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Simple write method
|
||||
func (cdre *CDRExporter) writeOut(ioWriter io.Writer) error {
|
||||
cdre.Lock()
|
||||
defer cdre.Unlock()
|
||||
if len(cdre.header) != 0 {
|
||||
for _, fld := range append(cdre.header, "\n") {
|
||||
if _, err := io.WriteString(ioWriter, fld); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, cdrContent := range cdre.content {
|
||||
for _, cdrFld := range append(cdrContent, "\n") {
|
||||
if _, err := io.WriteString(ioWriter, cdrFld); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(cdre.trailer) != 0 {
|
||||
for _, fld := range append(cdre.trailer, "\n") {
|
||||
if _, err := io.WriteString(ioWriter, fld); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// csvWriter specific method
|
||||
func (cdre *CDRExporter) writeCsv(csvWriter *csv.Writer) error {
|
||||
csvWriter.Comma = cdre.fieldSeparator
|
||||
cdre.RLock()
|
||||
defer cdre.RUnlock()
|
||||
if len(cdre.header) != 0 {
|
||||
if err := csvWriter.Write(cdre.header); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, cdrContent := range cdre.content {
|
||||
if err := csvWriter.Write(cdrContent); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(cdre.trailer) != 0 {
|
||||
if err := csvWriter.Write(cdre.trailer); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
csvWriter.Flush()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cdre *CDRExporter) ExportCDRs() (err error) {
|
||||
if err = cdre.processCDRs(); err != nil {
|
||||
return
|
||||
}
|
||||
if utils.IsSliceMember([]string{utils.MetaFileCSV, utils.MetaFileFWV}, cdre.exportFormat) { // files are written after processing all CDRs
|
||||
cdre.RLock()
|
||||
contLen := len(cdre.content)
|
||||
cdre.RUnlock()
|
||||
if contLen == 0 {
|
||||
return
|
||||
}
|
||||
fileOut, err := os.Create(cdre.exportPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer fileOut.Close()
|
||||
if cdre.exportFormat == utils.MetaFileCSV {
|
||||
return cdre.writeCsv(csv.NewWriter(fileOut))
|
||||
}
|
||||
return cdre.writeOut(fileOut)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Return the first exported Cdr OrderId
|
||||
func (cdre *CDRExporter) FirstOrderId() int64 {
|
||||
return cdre.firstExpOrderId
|
||||
}
|
||||
|
||||
// Return the last exported Cdr OrderId
|
||||
func (cdre *CDRExporter) LastOrderId() int64 {
|
||||
return cdre.lastExpOrderId
|
||||
}
|
||||
|
||||
// Return total cost in the exported cdrs
|
||||
func (cdre *CDRExporter) TotalCost() float64 {
|
||||
return cdre.totalCost
|
||||
}
|
||||
|
||||
func (cdre *CDRExporter) TotalExportedCdrs() int {
|
||||
return cdre.numberOfRecords
|
||||
}
|
||||
|
||||
// Return successfully exported CGRIDs
|
||||
func (cdre *CDRExporter) PositiveExports() []string {
|
||||
cdre.RLock()
|
||||
defer cdre.RUnlock()
|
||||
return cdre.positiveExports
|
||||
}
|
||||
|
||||
// Return failed exported CGRIDs together with the reason
|
||||
func (cdre *CDRExporter) NegativeExports() map[string]string {
|
||||
cdre.RLock()
|
||||
defer cdre.RUnlock()
|
||||
return cdre.negativeExports
|
||||
}
|
||||
92
engine/cdrecsv_test.go
Normal file
92
engine/cdrecsv_test.go
Normal file
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package engine
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/csv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestCsvCdrWriter(t *testing.T) {
|
||||
writer := &bytes.Buffer{}
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
storedCdr1 := &CDR{
|
||||
CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), ToR: utils.VOICE, OriginID: "dsafdsaf", OriginHost: "192.168.1.1",
|
||||
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
|
||||
Category: "call", Account: "1001", Subject: "1001", Destination: "1002", SetupTime: time.Unix(1383813745, 0).UTC(), AnswerTime: time.Unix(1383813746, 0).UTC(),
|
||||
Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID,
|
||||
ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01,
|
||||
}
|
||||
cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles["*default"], utils.MetaFileCSV, "", "", "firstexport",
|
||||
true, 1, ',', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil)
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
if err = cdre.processCDRs(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
csvWriter := csv.NewWriter(writer)
|
||||
if err := cdre.writeCsv(csvWriter); err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
}
|
||||
expected := `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,*out,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,1.01000`
|
||||
result := strings.TrimSpace(writer.String())
|
||||
if result != expected {
|
||||
t.Errorf("Expected: \n%s received: \n%s.", expected, result)
|
||||
}
|
||||
if cdre.TotalCost() != 1.01 {
|
||||
t.Error("Unexpected TotalCost: ", cdre.TotalCost())
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlternativeFieldSeparator(t *testing.T) {
|
||||
writer := &bytes.Buffer{}
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
storedCdr1 := &CDR{CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), ToR: utils.VOICE, OriginID: "dsafdsaf", OriginHost: "192.168.1.1",
|
||||
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
|
||||
Category: "call", Account: "1001", Subject: "1001", Destination: "1002", SetupTime: time.Unix(1383813745, 0).UTC(), AnswerTime: time.Unix(1383813746, 0).UTC(),
|
||||
Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID,
|
||||
ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01,
|
||||
}
|
||||
cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles["*default"], utils.MetaFileCSV, "", "", "firstexport",
|
||||
true, 1, '|', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil)
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
if err = cdre.processCDRs(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
csvWriter := csv.NewWriter(writer)
|
||||
if err := cdre.writeCsv(csvWriter); err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
}
|
||||
expected := `dbafe9c8614c785a65aabd116dd3959c3c56f7f6|*default|*voice|dsafdsaf|*rated|*out|cgrates.org|call|1001|1001|1002|2013-11-07T08:42:25Z|2013-11-07T08:42:26Z|10|1.01000`
|
||||
result := strings.TrimSpace(writer.String())
|
||||
if result != expected {
|
||||
t.Errorf("Expected: \n%s received: \n%s.", expected, result)
|
||||
}
|
||||
if cdre.TotalCost() != 1.01 {
|
||||
t.Error("Unexpected TotalCost: ", cdre.TotalCost())
|
||||
}
|
||||
}
|
||||
245
engine/cdrefwv_test.go
Normal file
245
engine/cdrefwv_test.go
Normal file
@@ -0,0 +1,245 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package engine
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var hdrJsnCfgFlds = []*config.CdrFieldJsonCfg{
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TypeOfRecord"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("10"), Width: utils.IntPointer(2)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler1"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(3)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("DistributorCode"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("VOI"), Width: utils.IntPointer(3)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FileSeqNr"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_EXPORTID),
|
||||
Width: utils.IntPointer(5), Strip: utils.StringPointer("right"), Padding: utils.StringPointer("zeroleft")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("LastCdr"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_LASTCDRATIME),
|
||||
Width: utils.IntPointer(12), Layout: utils.StringPointer("020106150400")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FileCreationfTime"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_TIMENOW),
|
||||
Width: utils.IntPointer(12), Layout: utils.StringPointer("020106150400")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FileVersion"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("01"), Width: utils.IntPointer(2)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler2"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(105)},
|
||||
}
|
||||
|
||||
var contentJsnCfgFlds = []*config.CdrFieldJsonCfg{
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TypeOfRecord"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("20"), Width: utils.IntPointer(2)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Account"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.ACCOUNT), Width: utils.IntPointer(12),
|
||||
Strip: utils.StringPointer("left"), Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Subject"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.SUBJECT), Width: utils.IntPointer(5),
|
||||
Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("CLI"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer("cli"), Width: utils.IntPointer(15),
|
||||
Strip: utils.StringPointer("xright"), Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Destination"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.DESTINATION), Width: utils.IntPointer(24),
|
||||
Strip: utils.StringPointer("xright"), Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TOR"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("02"), Width: utils.IntPointer(2)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("SubtypeTOR"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("11"), Width: utils.IntPointer(4),
|
||||
Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("SetupTime"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.SETUP_TIME), Width: utils.IntPointer(12),
|
||||
Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right"), Layout: utils.StringPointer("020106150400")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Duration"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.USAGE), Width: utils.IntPointer(6),
|
||||
Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right"), Layout: utils.StringPointer(utils.SECONDS)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("DataVolume"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(6)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TaxCode"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("1"), Width: utils.IntPointer(1)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("OperatorCode"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer("opercode"), Width: utils.IntPointer(2),
|
||||
Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("ProductId"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer("productid"), Width: utils.IntPointer(5),
|
||||
Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("NetworkId"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("3"), Width: utils.IntPointer(1)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("CallId"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.ACCID), Width: utils.IntPointer(16),
|
||||
Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(8)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(8)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TerminationCode"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer("operator;product"),
|
||||
Width: utils.IntPointer(5), Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Cost"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.COST), Width: utils.IntPointer(9),
|
||||
Padding: utils.StringPointer("zeroleft")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("DestinationPrivacy"), Type: utils.StringPointer(utils.MetaMaskedDestination), Width: utils.IntPointer(1)},
|
||||
}
|
||||
|
||||
var trailerJsnCfgFlds = []*config.CdrFieldJsonCfg{
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TypeOfRecord"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("90"), Width: utils.IntPointer(2)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler1"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(3)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("DistributorCode"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("VOI"), Width: utils.IntPointer(3)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FileSeqNr"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_EXPORTID), Width: utils.IntPointer(5),
|
||||
Strip: utils.StringPointer("right"), Padding: utils.StringPointer("zeroleft")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("NumberOfRecords"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_NRCDRS),
|
||||
Width: utils.IntPointer(6), Padding: utils.StringPointer("zeroleft")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("CdrsDuration"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_DURCDRS),
|
||||
Width: utils.IntPointer(8), Padding: utils.StringPointer("zeroleft"), Layout: utils.StringPointer(utils.SECONDS)},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FirstCdrTime"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_FIRSTCDRATIME),
|
||||
Width: utils.IntPointer(12), Layout: utils.StringPointer("020106150400")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("LastCdrTime"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_LASTCDRATIME),
|
||||
Width: utils.IntPointer(12), Layout: utils.StringPointer("020106150400")},
|
||||
&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler2"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(93)},
|
||||
}
|
||||
|
||||
var hdrCfgFlds, contentCfgFlds, trailerCfgFlds []*config.CfgCdrField
|
||||
|
||||
// Write one CDR and test it's results only for content buffer
|
||||
func TestWriteCdr(t *testing.T) {
|
||||
var err error
|
||||
wrBuf := &bytes.Buffer{}
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
if hdrCfgFlds, err = config.CfgCdrFieldsFromCdrFieldsJsonCfg(hdrJsnCfgFlds); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if contentCfgFlds, err = config.CfgCdrFieldsFromCdrFieldsJsonCfg(contentJsnCfgFlds); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if trailerCfgFlds, err = config.CfgCdrFieldsFromCdrFieldsJsonCfg(trailerJsnCfgFlds); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
cdreCfg := &config.CdreConfig{
|
||||
ExportFormat: utils.MetaFileFWV,
|
||||
HeaderFields: hdrCfgFlds,
|
||||
ContentFields: contentCfgFlds,
|
||||
TrailerFields: trailerCfgFlds,
|
||||
}
|
||||
cdr := &CDR{CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()),
|
||||
ToR: utils.VOICE, OrderID: 1, OriginID: "dsafdsaf", OriginHost: "192.168.1.1",
|
||||
RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
|
||||
Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC),
|
||||
AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
|
||||
Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.34567,
|
||||
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
|
||||
}
|
||||
|
||||
cdre, err := NewCDRExporter([]*CDR{cdr}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1",
|
||||
true, 1, '|', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err = cdre.processCDRs(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
eHeader := "10 VOIfwv_107111308420018011511340001 \n"
|
||||
eContentOut := "201001 1001 1002 0211 07111308420010 1 3dsafdsaf 0002.34570\n"
|
||||
eTrailer := "90 VOIfwv_100000100000010071113084200071113084200 \n"
|
||||
if err := cdre.writeOut(wrBuf); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
allOut := wrBuf.String()
|
||||
eAllOut := eHeader + eContentOut + eTrailer
|
||||
if math.Mod(float64(len(allOut)), 145) != 0 {
|
||||
t.Error("Unexpected export content length", len(allOut))
|
||||
} else if len(allOut) != len(eAllOut) {
|
||||
t.Errorf("Output does not match expected length. Have output %q, expecting: %q", allOut, eAllOut)
|
||||
}
|
||||
// Test stats
|
||||
if !cdre.firstCdrATime.Equal(cdr.AnswerTime) {
|
||||
t.Error("Unexpected firstCdrATime in stats: ", cdre.firstCdrATime)
|
||||
} else if !cdre.lastCdrATime.Equal(cdr.AnswerTime) {
|
||||
t.Error("Unexpected lastCdrATime in stats: ", cdre.lastCdrATime)
|
||||
} else if cdre.numberOfRecords != 1 {
|
||||
t.Error("Unexpected number of records in the stats: ", cdre.numberOfRecords)
|
||||
} else if cdre.totalDuration != cdr.Usage {
|
||||
t.Error("Unexpected total duration in the stats: ", cdre.totalDuration)
|
||||
} else if cdre.totalCost != utils.Round(cdr.Cost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE) {
|
||||
t.Error("Unexpected total cost in the stats: ", cdre.totalCost)
|
||||
}
|
||||
if cdre.FirstOrderId() != 1 {
|
||||
t.Error("Unexpected FirstOrderId", cdre.FirstOrderId())
|
||||
}
|
||||
if cdre.LastOrderId() != 1 {
|
||||
t.Error("Unexpected LastOrderId", cdre.LastOrderId())
|
||||
}
|
||||
if cdre.TotalCost() != utils.Round(cdr.Cost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE) {
|
||||
t.Error("Unexpected TotalCost: ", cdre.TotalCost())
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteCdrs(t *testing.T) {
|
||||
wrBuf := &bytes.Buffer{}
|
||||
cdreCfg := &config.CdreConfig{
|
||||
ExportFormat: utils.MetaFileFWV,
|
||||
HeaderFields: hdrCfgFlds,
|
||||
ContentFields: contentCfgFlds,
|
||||
TrailerFields: trailerCfgFlds,
|
||||
}
|
||||
cdr1 := &CDR{CGRID: utils.Sha1("aaa1", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()),
|
||||
ToR: utils.VOICE, OrderID: 2, OriginID: "aaa1", OriginHost: "192.168.1.1", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
|
||||
Category: "call", Account: "1001", Subject: "1001", Destination: "1010",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC),
|
||||
AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
|
||||
Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.25,
|
||||
ExtraFields: map[string]string{"productnumber": "12341", "fieldextr2": "valextr2"},
|
||||
}
|
||||
cdr2 := &CDR{CGRID: utils.Sha1("aaa2", time.Date(2013, 11, 7, 7, 42, 20, 0, time.UTC).String()),
|
||||
ToR: utils.VOICE, OrderID: 4, OriginID: "aaa2", OriginHost: "192.168.1.2", RequestType: utils.META_PREPAID, Direction: "*out", Tenant: "cgrates.org",
|
||||
Category: "call", Account: "1002", Subject: "1002", Destination: "1011",
|
||||
SetupTime: time.Date(2013, 11, 7, 7, 42, 20, 0, time.UTC),
|
||||
AnswerTime: time.Date(2013, 11, 7, 7, 42, 26, 0, time.UTC),
|
||||
Usage: time.Duration(5) * time.Minute, RunID: utils.DEFAULT_RUNID, Cost: 1.40001,
|
||||
ExtraFields: map[string]string{"productnumber": "12342", "fieldextr2": "valextr2"},
|
||||
}
|
||||
cdr3 := &CDR{}
|
||||
cdr4 := &CDR{CGRID: utils.Sha1("aaa3", time.Date(2013, 11, 7, 9, 42, 18, 0, time.UTC).String()),
|
||||
ToR: utils.VOICE, OrderID: 3, OriginID: "aaa4", OriginHost: "192.168.1.4", RequestType: utils.META_POSTPAID, Direction: "*out", Tenant: "cgrates.org",
|
||||
Category: "call", Account: "1004", Subject: "1004", Destination: "1013",
|
||||
SetupTime: time.Date(2013, 11, 7, 9, 42, 18, 0, time.UTC),
|
||||
AnswerTime: time.Date(2013, 11, 7, 9, 42, 26, 0, time.UTC),
|
||||
Usage: time.Duration(20) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.34567,
|
||||
ExtraFields: map[string]string{"productnumber": "12344", "fieldextr2": "valextr2"},
|
||||
}
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
cdre, err := NewCDRExporter([]*CDR{cdr1, cdr2, cdr3, cdr4}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1",
|
||||
true, 1, ',', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err = cdre.processCDRs(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := cdre.writeOut(wrBuf); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if len(wrBuf.String()) != 725 {
|
||||
t.Error("Output buffer does not contain expected info. Expecting len: 725, got: ", len(wrBuf.String()))
|
||||
}
|
||||
// Test stats
|
||||
if !cdre.firstCdrATime.Equal(cdr2.AnswerTime) {
|
||||
t.Error("Unexpected firstCdrATime in stats: ", cdre.firstCdrATime)
|
||||
}
|
||||
if !cdre.lastCdrATime.Equal(cdr4.AnswerTime) {
|
||||
t.Error("Unexpected lastCdrATime in stats: ", cdre.lastCdrATime)
|
||||
}
|
||||
if cdre.numberOfRecords != 3 {
|
||||
t.Error("Unexpected number of records in the stats: ", cdre.numberOfRecords)
|
||||
}
|
||||
if cdre.totalDuration != time.Duration(330)*time.Second {
|
||||
t.Error("Unexpected total duration in the stats: ", cdre.totalDuration)
|
||||
}
|
||||
if cdre.totalCost != 5.99568 {
|
||||
t.Error("Unexpected total cost in the stats: ", cdre.totalCost)
|
||||
}
|
||||
if cdre.FirstOrderId() != 2 {
|
||||
t.Error("Unexpected FirstOrderId", cdre.FirstOrderId())
|
||||
}
|
||||
if cdre.LastOrderId() != 4 {
|
||||
t.Error("Unexpected LastOrderId", cdre.LastOrderId())
|
||||
}
|
||||
if cdre.TotalCost() != 5.99568 {
|
||||
t.Error("Unexpected TotalCost: ", cdre.TotalCost())
|
||||
}
|
||||
}
|
||||
110
engine/cdrs.go
110
engine/cdrs.go
@@ -18,12 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -33,7 +30,6 @@ import (
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
var cdrServer *CdrServer // Share the server so we can use it in http handlers
|
||||
@@ -195,12 +191,11 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
|
||||
var out int
|
||||
go self.stats.Call("CDRStatsV1.AppendCDR", cdr, &out)
|
||||
}
|
||||
if len(self.cgrCfg.CDRSCdrReplication) != 0 { // Replicate raw CDR
|
||||
go self.replicateCdr(cdr)
|
||||
if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR
|
||||
self.replicateCDRs([]*CDR{cdr})
|
||||
}
|
||||
|
||||
if self.rals != nil && !cdr.Rated { // CDRs not rated will be processed by Rating
|
||||
go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSCdrReplication) != 0)
|
||||
go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSOnlineCDRExports) != 0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -284,9 +279,7 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, stats, rep
|
||||
}
|
||||
}
|
||||
if replicate {
|
||||
for _, ratedCDR := range ratedCDRs {
|
||||
self.replicateCdr(ratedCDR)
|
||||
}
|
||||
self.replicateCDRs(ratedCDRs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -452,89 +445,22 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
// ToDo: Add websocket support
|
||||
func (self *CdrServer) replicateCdr(cdr *CDR) error {
|
||||
for _, rplCfg := range self.cgrCfg.CDRSCdrReplication {
|
||||
passesFilters := true
|
||||
for _, cdfFltr := range rplCfg.CdrFilter {
|
||||
if !cdfFltr.FilterPasses(cdr.FieldAsString(cdfFltr)) {
|
||||
passesFilters = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !passesFilters { // Not passes filters, ignore this replication
|
||||
func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) {
|
||||
for _, exportID := range self.cgrCfg.CDRSOnlineCDRExports {
|
||||
expTpl := self.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer
|
||||
var cdre *CDRExporter
|
||||
if cdre, err = NewCDRExporter(cdrs, expTpl, expTpl.ExportFormat, expTpl.ExportPath, self.cgrCfg.FailedPostsDir, "CDRSReplication",
|
||||
expTpl.Synchronous, expTpl.Attempts, expTpl.FieldSeparator, expTpl.UsageMultiplyFactor,
|
||||
expTpl.CostMultiplyFactor, self.cgrCfg.RoundingDecimals, self.cgrCfg.HttpSkipTlsVerify, self.httpPoster); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Building CDRExporter for online exports got error: <%s>", err.Error()))
|
||||
continue
|
||||
}
|
||||
var body interface{}
|
||||
var content = ""
|
||||
switch rplCfg.Transport {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaAMQPjsonCDR:
|
||||
jsn, err := json.Marshal(cdr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body = jsn
|
||||
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap:
|
||||
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
jsn, err := json.Marshal(expMp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body = jsn
|
||||
case utils.META_HTTP_POST:
|
||||
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vals := url.Values{}
|
||||
for fld, val := range expMp {
|
||||
vals.Set(fld, val)
|
||||
}
|
||||
body = vals
|
||||
}
|
||||
var errChan chan error
|
||||
if rplCfg.Synchronous {
|
||||
errChan = make(chan error)
|
||||
}
|
||||
go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) {
|
||||
var err error
|
||||
fallbackPath := utils.META_NONE
|
||||
if rplCfg.FallbackFileName() != utils.META_NONE {
|
||||
fallbackPath = path.Join(self.cgrCfg.FailedPostsDir, rplCfg.FallbackFileName())
|
||||
}
|
||||
switch rplCfg.Transport {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
_, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath)
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
var amqpPoster *utils.AMQPPoster
|
||||
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir)
|
||||
if err == nil { // error will be checked bellow
|
||||
var chn *amqp.Channel
|
||||
chn, err = amqpPoster.Post(
|
||||
nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName())
|
||||
if chn != nil {
|
||||
chn.Close()
|
||||
}
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("unsupported replication transport: %s", rplCfg.Transport)
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf(
|
||||
"<CDRReplicator> Replicating CDR: %+v, transport: %s, got error: %s", cdr, rplCfg.Transport, err.Error()))
|
||||
}
|
||||
if rplCfg.Synchronous {
|
||||
errChan <- err
|
||||
}
|
||||
}(body, rplCfg, content, errChan)
|
||||
if rplCfg.Synchronous { // Synchronize here
|
||||
<-errChan
|
||||
if err = cdre.ExportCDRs(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Replicating CDR: %+v, got error: <%s>", err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// Called by rate/re-rate API, FixMe: deprecate it once new APIer structure is operational
|
||||
@@ -544,7 +470,7 @@ func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) err
|
||||
return err
|
||||
}
|
||||
for _, cdr := range cdrs {
|
||||
if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil {
|
||||
if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSOnlineCDRExports) != 0); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Processing CDR %+v, got error: %s", cdr, err.Error()))
|
||||
}
|
||||
}
|
||||
@@ -614,7 +540,7 @@ func (self *CdrServer) V1RateCDRs(attrs utils.AttrRateCDRs, reply *string) error
|
||||
if attrs.SendToStatS != nil {
|
||||
sendToStats = *attrs.SendToStatS
|
||||
}
|
||||
replicate := len(self.cgrCfg.CDRSCdrReplication) != 0
|
||||
replicate := len(self.cgrCfg.CDRSOnlineCDRExports) != 0
|
||||
if attrs.ReplicateCDRs != nil {
|
||||
replicate = *attrs.ReplicateCDRs
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ type Storage interface {
|
||||
// Interface for storage providers.
|
||||
type RatingStorage interface {
|
||||
Storage
|
||||
Marshaler() Marshaler
|
||||
HasData(string, string) (bool, error)
|
||||
LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error
|
||||
GetRatingPlan(string, bool, string) (*RatingPlan, error)
|
||||
@@ -83,6 +84,7 @@ type RatingStorage interface {
|
||||
|
||||
type AccountingStorage interface {
|
||||
Storage
|
||||
Marshaler() Marshaler
|
||||
LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error
|
||||
GetAccount(string) (*Account, error)
|
||||
SetAccount(*Account) error
|
||||
@@ -117,6 +119,7 @@ type AccountingStorage interface {
|
||||
// OnlineStorage contains methods to use for administering online data
|
||||
type DataDB interface {
|
||||
Storage
|
||||
Marshaler() Marshaler
|
||||
HasData(string, string) (bool, error)
|
||||
LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error
|
||||
GetRatingPlan(string, bool, string) (*RatingPlan, error)
|
||||
|
||||
@@ -87,6 +87,10 @@ func (ms *MapStorage) Flush(ignore string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) Marshaler() Marshaler {
|
||||
return ms.ms
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SelectDatabase(dbName string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -322,6 +322,15 @@ func (ms *MongoStorage) Flush(ignore string) (err error) {
|
||||
return dbSession.DB(ms.db).DropDatabase()
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) Marshaler() Marshaler {
|
||||
return ms.ms
|
||||
}
|
||||
|
||||
// CloneSession returns a clone of the existing session so we can perform queries from outside of engine package
|
||||
func (ms *MongoStorage) CloneSession() *mgo.Session {
|
||||
return ms.session.Copy()
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SelectDatabase(dbName string) (err error) {
|
||||
ms.db = dbName
|
||||
return
|
||||
|
||||
@@ -109,6 +109,10 @@ func (rs *RedisStorage) Flush(ignore string) error {
|
||||
return rs.Cmd("FLUSHDB").Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) Marshaler() Marshaler {
|
||||
return rs.ms
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SelectDatabase(dbName string) (err error) {
|
||||
return rs.Cmd("SELECT", dbName).Err
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ func (self *SQLStorage) GetKeysForPrefix(prefix string) ([]string, error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (ms *SQLStorage) RebuildReverseForPrefix(prefix string) error {
|
||||
func (self *SQLStorage) RebuildReverseForPrefix(prefix string) error {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user