Fixups CdrExporter, derived charging should not transfer but kill the session if errors onAnswer

This commit is contained in:
DanB
2014-06-15 12:22:06 +02:00
parent ff714a1bc8
commit c973ea99e2
6 changed files with 73 additions and 63 deletions

View File

@@ -19,13 +19,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package apier
import (
"encoding/csv"
"fmt"
"github.com/cgrates/cgrates/cdre"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"os"
"path"
"strconv"
"strings"
@@ -84,7 +82,7 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E
}
exportId := strconv.FormatInt(time.Now().Unix(), 10)
if attr.ExportId != nil {
exportId = exportId
exportId = *attr.ExportId
}
fileName := fmt.Sprintf("cdre_%s.%s", exportId, cdrFormat)
if attr.ExportFileName != nil {
@@ -126,26 +124,13 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E
*reply = utils.ExportedFileCdrs{ExportedFilePath: ""}
return nil
}
fileOut, err := os.Create(filePath)
if err != nil {
return err
}
defer fileOut.Close()
cdrexp, err := cdre.NewCdrExporter(cdrs, self.LogDb, exportTemplate, exportId,
cdrexp, err := cdre.NewCdrExporter(cdrs, self.LogDb, exportTemplate, cdrFormat, exportId,
dataUsageMultiplyFactor, costMultiplyFactor, costShiftDigits, roundingDecimals, self.Config.RoundingDecimals, maskDestId, maskLen, self.Config.HttpSkipTlsVerify)
if err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
switch cdrFormat {
case utils.CDRE_FIXED_WIDTH:
if err := cdrexp.WriteOut(fileOut); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
case utils.CSV:
csvWriter := csv.NewWriter(fileOut)
if err := cdrexp.WriteCsv(csvWriter); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
if err := cdrexp.WriteToFile(filePath); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
*reply = utils.ExportedFileCdrs{ExportedFilePath: filePath, TotalRecords: len(cdrs), TotalCost: cdrexp.TotalCost(),
ExportedCgrIds: cdrexp.PositiveExports(), UnexportedCgrIds: cdrexp.NegativeExports(), FirstOrderId: cdrexp.FirstOrderId(), LastOrderId: cdrexp.LastOrderId()}

View File

@@ -26,6 +26,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"io"
"os"
"strconv"
"strings"
"time"
@@ -53,7 +54,7 @@ const (
var err error
func NewCdrExporter(cdrs []*utils.StoredCdr, logDb engine.LogStorage, exportTpl *config.CdreConfig, exportId string,
func NewCdrExporter(cdrs []*utils.StoredCdr, logDb engine.LogStorage, exportTpl *config.CdreConfig, cdrFormat, exportId string,
dataUsageMultiplyFactor, costMultiplyFactor float64, costShiftDigits, roundDecimals, cgrPrecision int, maskDestId string, maskLen int, httpSkipTlsCheck bool) (*CdrExporter, error) {
if len(cdrs) == 0 { // Nothing to export
return nil, nil
@@ -62,6 +63,7 @@ func NewCdrExporter(cdrs []*utils.StoredCdr, logDb engine.LogStorage, exportTpl
cdrs: cdrs,
logDb: logDb,
exportTemplate: exportTpl,
cdrFormat: cdrFormat,
exportId: exportId,
dataUsageMultiplyFactor: dataUsageMultiplyFactor,
costMultiplyFactor: costMultiplyFactor,
@@ -83,6 +85,7 @@ type CdrExporter struct {
cdrs []*utils.StoredCdr
logDb engine.LogStorage // Used to extract cost_details if these are requested
exportTemplate *config.CdreConfig
cdrFormat string // csv, fwv
exportId string // Unique identifier or this export
dataUsageMultiplyFactor, costMultiplyFactor float64
costShiftDigits, roundDecimals, cgrPrecision int
@@ -231,13 +234,13 @@ func (cdre *CdrExporter) composeHeader() error {
return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
}
if err != nil {
engine.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR header, error: %s", err.Error()))
engine.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR header, field %s, error: %s", cfgFld.Name, err.Error()))
return err
}
fmtOut := outVal
if cdre.exportTemplate.CdrFormat == utils.CDRE_FIXED_WIDTH {
if cdre.cdrFormat == utils.CDRE_FIXED_WIDTH {
if fmtOut, err = FmtFieldWidth(outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
engine.Logger.Err(fmt.Sprintf("<CdrExporter> Cannot export CDR header, error: %s", err.Error()))
engine.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR header, field %s, error: %s", cfgFld.Name, err.Error()))
return err
}
}
@@ -262,13 +265,13 @@ func (cdre *CdrExporter) composeTrailer() error {
return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
}
if err != nil {
engine.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, error: %s", err.Error()))
engine.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Name, err.Error()))
return err
}
fmtOut := outVal
if cdre.exportTemplate.CdrFormat == utils.CDRE_FIXED_WIDTH {
if cdre.cdrFormat == utils.CDRE_FIXED_WIDTH {
if fmtOut, err = FmtFieldWidth(outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
engine.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, error: %s", err.Error()))
engine.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Name, err.Error()))
return err
}
}
@@ -332,7 +335,7 @@ func (cdre *CdrExporter) processCdr(cdr *utils.StoredCdr) error {
return err
}
fmtOut := outVal
if cdre.exportTemplate.CdrFormat == utils.CDRE_FIXED_WIDTH {
if cdre.cdrFormat == utils.CDRE_FIXED_WIDTH {
if fmtOut, err = FmtFieldWidth(outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
engine.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR with cgrid: %s, runid: %s, fieldName: %s, fieldValue: %s, error: %s", cdr.CgrId, cdr.MediationRunId, cfgFld.Name, outVal, err.Error()))
return err
@@ -389,28 +392,8 @@ func (cdre *CdrExporter) processCdrs() error {
return nil
}
func (cdre *CdrExporter) WriteCsv(csvWriter *csv.Writer) error {
if len(cdre.header) != 0 {
if err := csvWriter.Write(cdre.header); err != nil {
return err
}
}
for _, cdrContent := range cdre.content {
if err := csvWriter.Write(cdrContent); err != nil {
return err
}
}
if len(cdre.trailer) != 0 {
if err := csvWriter.Write(cdre.trailer); err != nil {
return err
}
}
csvWriter.Flush()
return nil
}
// Write fwv content
func (cdre *CdrExporter) WriteOut(ioWriter io.Writer) error {
// Simple write method
func (cdre *CdrExporter) writeOut(ioWriter io.Writer) error {
if len(cdre.header) != 0 {
for _, fld := range append(cdre.header, "\n") {
if _, err := io.WriteString(ioWriter, fld); err != nil {
@@ -435,6 +418,50 @@ func (cdre *CdrExporter) WriteOut(ioWriter io.Writer) error {
return nil
}
// csvWriter specific method
func (cdre *CdrExporter) writeCsv(csvWriter *csv.Writer) error {
if len(cdre.header) != 0 {
if err := csvWriter.Write(cdre.header); err != nil {
return err
}
}
for _, cdrContent := range cdre.content {
if err := csvWriter.Write(cdrContent); err != nil {
return err
}
}
if len(cdre.trailer) != 0 {
if err := csvWriter.Write(cdre.trailer); err != nil {
return err
}
}
csvWriter.Flush()
return nil
}
// General method to write the content out to a file
func (cdre *CdrExporter) WriteToFile(filePath string) error {
fileOut, err := os.Create(filePath)
if err != nil {
return err
}
defer fileOut.Close()
switch cdre.cdrFormat {
case utils.CDRE_DRYRUN:
return nil
case utils.CDRE_FIXED_WIDTH:
if err := cdre.writeOut(fileOut); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
case utils.CSV:
csvWriter := csv.NewWriter(fileOut)
if err := cdre.writeCsv(csvWriter); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
}
return nil
}
// Return the first exported Cdr OrderId
func (cdre *CdrExporter) FirstOrderId() int64 {
return cdre.firstExpOrderId

View File

@@ -52,7 +52,8 @@ func TestCdreGetCombimedCdrFieldVal(t *testing.T) {
Usage: time.Duration(10) * time.Second, MediationRunId: "RETAIL1", Cost: 5.01},
}
cdre, err := NewCdrExporter(cdrs, logDb, cfg.CdreDefaultInstance, "firstexport", 0.0, 0.0, 0, 4, cfg.RoundingDecimals, "", 0, cfg.HttpSkipTlsVerify)
cdre, err := NewCdrExporter(cdrs, logDb, cfg.CdreDefaultInstance, cfg.CdreDefaultInstance.CdrFormat, "firstexport", 0.0, 0.0, 0, 4,
cfg.RoundingDecimals, "", 0, cfg.HttpSkipTlsVerify)
if err != nil {
t.Error("Unexpected error received: ", err)
}

View File

@@ -39,12 +39,12 @@ func TestCsvCdrWriter(t *testing.T) {
Usage: time.Duration(10) * time.Second, MediationRunId: utils.DEFAULT_RUNID,
ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01,
}
cdre, err := NewCdrExporter([]*utils.StoredCdr{storedCdr1}, logDb, cfg.CdreDefaultInstance, "firstexport", 0.0, 0.0, 0, 4, cfg.RoundingDecimals, "", 0, cfg.HttpSkipTlsVerify)
cdre, err := NewCdrExporter([]*utils.StoredCdr{storedCdr1}, logDb, cfg.CdreDefaultInstance, utils.CSV, "firstexport", 0.0, 0.0, 0, 4, cfg.RoundingDecimals, "", 0, cfg.HttpSkipTlsVerify)
if err != nil {
t.Error("Unexpected error received: ", err)
}
csvWriter := csv.NewWriter(writer)
if err := cdre.WriteCsv(csvWriter); err != nil {
if err := cdre.writeCsv(csvWriter); err != nil {
t.Error("Unexpected error: ", err)
}
expected := `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,rated,*out,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.0100`

View File

@@ -94,14 +94,14 @@ func TestWriteCdr(t *testing.T) {
Usage: time.Duration(10) * time.Second, MediationRunId: utils.DEFAULT_RUNID, Cost: 2.34567,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
}
cdre, err := NewCdrExporter([]*utils.StoredCdr{cdr}, logDb, exportTpl.AsCdreConfig(), "fwv_1", 0.0, 0.0, 0, 4, cfg.RoundingDecimals, "", -1, cfg.HttpSkipTlsVerify)
cdre, err := NewCdrExporter([]*utils.StoredCdr{cdr}, logDb, exportTpl.AsCdreConfig(), utils.CDRE_FIXED_WIDTH, "fwv_1", 0.0, 0.0, 0, 4, cfg.RoundingDecimals, "", -1, cfg.HttpSkipTlsVerify)
if err != nil {
t.Error(err)
}
eHeader := "10 VOI0000007111308420024031415390001 \n"
eContentOut := "201001 1001 1002 0211 07111308420010 1 3dsafdsaf 0002.34570\n"
eTrailer := "90 VOI0000000000100000010071113084260071113084200 \n"
if err := cdre.WriteOut(wrBuf); err != nil {
if err := cdre.writeOut(wrBuf); err != nil {
t.Error(err)
}
allOut := wrBuf.String()
@@ -167,12 +167,12 @@ func TestWriteCdrs(t *testing.T) {
ExtraFields: map[string]string{"productnumber": "12344", "fieldextr2": "valextr2"},
}
cfg, _ := config.NewDefaultCGRConfig()
cdre, err := NewCdrExporter([]*utils.StoredCdr{cdr1, cdr2, cdr3, cdr4}, logDb, exportTpl.AsCdreConfig(),
cdre, err := NewCdrExporter([]*utils.StoredCdr{cdr1, cdr2, cdr3, cdr4}, logDb, exportTpl.AsCdreConfig(), utils.CDRE_FIXED_WIDTH,
"fwv_1", 0.0, 0.0, 0, 4, cfg.RoundingDecimals, "", -1, cfg.HttpSkipTlsVerify)
if err != nil {
t.Error(err)
}
if err := cdre.WriteOut(wrBuf); err != nil {
if err := cdre.writeOut(wrBuf); err != nil {
t.Error(err)
}
if len(wrBuf.String()) != 725 {

View File

@@ -155,7 +155,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
engine.Logger.Err(fmt.Sprintf("OnPark: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination
return
}
@@ -211,8 +211,6 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
}
func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
//engine.Logger.Info("<SessionManager> FreeSWITCH answer.")
// Make sure cgr_type is enforced even if not set by FreeSWITCH
if ev.MissingParameter() {
sm.DisconnectSession(ev.GetUUID(), MISSING_PARAMETER)
}
@@ -223,8 +221,8 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination
engine.Logger.Err(fmt.Sprintf("OnAnswer: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.DisconnectSession(ev.GetUUID(), SYSTEM_ERROR) // Disconnect the session since we are not able to process sessions
return
}
dcs, _ = dcs.AppendDefaultRun()
@@ -246,8 +244,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination
engine.Logger.Err(fmt.Sprintf("OnHangup: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
return
}
dcs, _ = dcs.AppendDefaultRun()