This commit is contained in:
alin104n
2017-02-27 00:50:25 +02:00
72 changed files with 2325 additions and 1095 deletions

View File

@@ -45,6 +45,10 @@ func init() {
var err error
switch DB {
case "map":
if cgrCfg := config.CgrConfig(); cgrCfg == nil {
cgrCfg, _ = config.NewDefaultCGRConfig()
config.SetCgrConfig(cgrCfg)
}
ratingStorage, _ = NewMapStorage()
accountingStorage, _ = NewMapStorage()
case utils.MONGO:

View File

@@ -792,6 +792,11 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, httpSkipTlsCheck bool, g
}
// Part of event interface
func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) {
return nil, utils.ErrNotImplemented
}
// Used in place where we need to export the CDR based on an export template
// ExportRecord is a []string to keep it compatible with encoding/csv Writer
func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR, roundingDecs int) (expRecord []string, err error) {
@@ -812,16 +817,17 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, httpSkipTlsCh
return expRecord, nil
}
// Part of event interface
func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) {
return nil, utils.ErrNotImplemented
}
// AsExportMap converts the CDR into a map[string]string based on export template
// Used in real-time replication as well as remote exports
func (cdr *CDR) AsExportMap(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR) (expMap map[string]string, err error) {
func (cdr *CDR) AsExportMap(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR, roundingDecs int) (expMap map[string]string, err error) {
expMap = make(map[string]string)
for _, cfgFld := range exportFields {
if roundingDecs != 0 {
clnFld := new(config.CfgCdrField) // Clone so we can modify the rounding decimals without affecting the template
*clnFld = *cfgFld
clnFld.RoundingDecimals = roundingDecs
cfgFld = clnFld
}
if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs); err != nil {
return nil, err
} else {

View File

@@ -579,7 +579,7 @@ func TestCDRAsExportMap(t *testing.T) {
&config.CfgCdrField{FieldId: utils.DESTINATION, Type: utils.META_COMPOSED, Value: utils.ParseRSRFieldsMustCompile("~Destination:s/^\\+(\\d+)$/00${1}/", utils.INFIELD_SEP)},
&config.CfgCdrField{FieldId: "FieldExtra1", Type: utils.META_COMPOSED, Value: utils.ParseRSRFieldsMustCompile("field_extr1", utils.INFIELD_SEP)},
}
if cdrMp, err := cdr.AsExportMap(expFlds, false, nil); err != nil {
if cdrMp, err := cdr.AsExportMap(expFlds, false, nil, 0); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCDRMp, cdrMp) {
t.Errorf("Expecting: %+v, received: %+v", eCDRMp, cdrMp)

493
engine/cdre.go Normal file
View File

@@ -0,0 +1,493 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"path"
"strconv"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/streadway/amqp"
)
// Meta tags resolvable by CDRExporter.metaHandler inside header/trailer
// templates. Each resolves against the statistics aggregated while
// processing the exported CDRs.
const (
	META_EXPORTID      = "*export_id"       // the unique export run identifier
	META_TIMENOW       = "*time_now"        // current time, formatted with the field layout
	META_FIRSTCDRATIME = "*first_cdr_atime" // earliest AnswerTime among exported CDRs
	META_LASTCDRATIME  = "*last_cdr_atime"  // latest AnswerTime among exported CDRs
	META_NRCDRS        = "*cdrs_number"     // count of exported CDRs
	META_DURCDRS       = "*cdrs_duration"   // summed usage of voice CDRs
	META_SMSUSAGE      = "*sms_usage"       // summed usage of SMS CDRs
	META_MMSUSAGE      = "*mms_usage"       // summed usage of MMS CDRs
	META_GENERICUSAGE  = "*generic_usage"   // summed usage of generic CDRs
	META_DATAUSAGE     = "*data_usage"      // summed usage of data CDRs
	META_COSTCDRS      = "*cdrs_cost"       // summed, rounded cost of exported CDRs
	META_FORMATCOST    = "*format_cost"     // NOTE(review): declared but not referenced in this file — presumably used elsewhere
)
// NewCDRExporter constructs a CDRExporter over the given CDRs and export
// template.
//
// IMPORTANT: when cdrs is empty it returns (nil, nil) — callers must check
// the exporter for nil before using it, a nil error is not sufficient.
//
// usageMultiplyFactor is keyed by ToR (or *any); costMultiplyFactor of 0.0
// disables cost scaling. fallbackPath is the folder where failed posts are
// saved (may be *none).
func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreConfig, exportFormat, exportPath, fallbackPath, exportID string,
	synchronous bool, attempts int, fieldSeparator rune, usageMultiplyFactor utils.FieldMultiplyFactor,
	costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *utils.HTTPPoster) (*CDRExporter, error) {
	if len(cdrs) == 0 { // Nothing to export
		return nil, nil
	}
	cdre := &CDRExporter{
		cdrs:                cdrs,
		exportTemplate:      exportTemplate,
		exportFormat:        exportFormat,
		exportPath:          exportPath,
		fallbackPath:        fallbackPath,
		exportID:            exportID,
		synchronous:         synchronous,
		attempts:            attempts,
		fieldSeparator:      fieldSeparator,
		usageMultiplyFactor: usageMultiplyFactor,
		costMultiplyFactor:  costMultiplyFactor,
		roundingDecimals:    roundingDecimals,
		httpSkipTlsCheck:    httpSkipTlsCheck,
		httpPoster:          httpPoster,
		negativeExports:     make(map[string]string),
	}
	return cdre, nil
}
// CDRExporter exports a batch of CDRs either into file buffers (CSV/FWV) or
// by posting them one-by-one (HTTP/AMQP), while aggregating statistics used
// by header/trailer meta tags. The embedded RWMutex guards the buffers, the
// statistics and the positive/negative result collections.
type CDRExporter struct {
	sync.RWMutex
	cdrs                []*CDR                    // batch to export
	exportTemplate      *config.CdreConfig        // header/content/trailer field templates
	exportFormat        string                    // one of the *file_csv/*file_fwv/http/amqp metas
	exportPath          string                    // file path or remote address, depending on format
	fallbackPath        string                    // folder where we save failed CDRs
	exportID            string                    // Unique identifier or this export
	synchronous         bool                      // wait for per-CDR goroutines to finish
	attempts            int                       // post retry attempts
	fieldSeparator      rune                      // CSV separator
	usageMultiplyFactor utils.FieldMultiplyFactor // usage scaling, keyed by ToR or *any
	costMultiplyFactor  float64                   // cost scaling, 0.0 disables
	roundingDecimals    int                       // decimals used when rounding costs
	httpSkipTlsCheck    bool
	httpPoster          *utils.HTTPPoster
	header, trailer     []string   // Header and Trailer fields
	content             [][]string // Rows of cdr fields
	// statistics aggregated by processCDR, consumed by metaHandler and getters
	firstCdrATime, lastCdrATime time.Time
	numberOfRecords             int
	totalDuration, totalDataUsage, totalSmsUsage,
	totalMmsUsage, totalGenericUsage time.Duration
	totalCost                        float64
	firstExpOrderId, lastExpOrderId  int64
	positiveExports                  []string          // CGRIDs of successfully exported CDRs
	negativeExports                  map[string]string // CGRIDs of failed exports
}
// metaHandler resolves one meta tag used inside header/trailer templates
// against the exporter's aggregated statistics. arg carries the layout
// (a time format or a usage layout) where the tag needs one.
func (cdre *CDRExporter) metaHandler(tag, arg string) (string, error) {
	// All usage totals share one formatting path: wrap the total in an
	// emulated CDR of the right ToR and let it format itself.
	fmtUsage := func(tor string, total time.Duration) (string, error) {
		emulated := &CDR{ToR: tor, Usage: total}
		return emulated.FormatUsage(arg), nil
	}
	switch tag {
	case META_EXPORTID:
		return cdre.exportID, nil
	case META_TIMENOW:
		return time.Now().Format(arg), nil
	case META_FIRSTCDRATIME:
		return cdre.firstCdrATime.Format(arg), nil
	case META_LASTCDRATIME:
		return cdre.lastCdrATime.Format(arg), nil
	case META_NRCDRS:
		return strconv.Itoa(cdre.numberOfRecords), nil
	case META_DURCDRS:
		return fmtUsage(utils.VOICE, cdre.totalDuration)
	case META_SMSUSAGE:
		return fmtUsage(utils.SMS, cdre.totalSmsUsage)
	case META_MMSUSAGE:
		return fmtUsage(utils.MMS, cdre.totalMmsUsage)
	case META_GENERICUSAGE:
		return fmtUsage(utils.GENERIC, cdre.totalGenericUsage)
	case META_DATAUSAGE:
		return fmtUsage(utils.DATA, cdre.totalDataUsage)
	case META_COSTCDRS:
		return strconv.FormatFloat(utils.Round(cdre.totalCost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
	}
	return "", fmt.Errorf("Unsupported METATAG: %s", tag)
}
// composeHeader renders the configured header fields into cdre.header.
// Fields are evaluated in template order; *filler fields are always padded
// to the right.
//
// Fix: the original assigned cfgFld.Padding = "right" for fillers, mutating
// the shared template configuration (a data race when the template is used
// by concurrent exports). The padding override is now a local variable.
func (cdre *CDRExporter) composeHeader() (err error) {
	for _, cfgFld := range cdre.exportTemplate.HeaderFields {
		var outVal string
		padding := cfgFld.Padding
		switch cfgFld.Type {
		case utils.META_FILLER:
			outVal = cfgFld.Value.Id()
			padding = "right" // fillers pad right, without touching the shared template
		case utils.META_CONSTANT:
			outVal = cfgFld.Value.Id()
		case utils.META_HANDLER:
			outVal, err = cdre.metaHandler(cfgFld.Value.Id(), cfgFld.Layout)
		default:
			return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
		}
		if err != nil {
			utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR header, field %s, error: %s", cfgFld.Tag, err.Error()))
			return err
		}
		fmtOut, err := utils.FmtFieldWidth(outVal, cfgFld.Width, cfgFld.Strip, padding, cfgFld.Mandatory)
		if err != nil {
			utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR header, field %s, error: %s", cfgFld.Tag, err.Error()))
			return err
		}
		cdre.Lock()
		cdre.header = append(cdre.header, fmtOut)
		cdre.Unlock()
	}
	return nil
}
// composeTrailer renders the configured trailer fields into cdre.trailer.
// Mirrors composeHeader.
//
// Fix: the original assigned cfgFld.Padding = "right" for fillers, mutating
// the shared template configuration (a data race when the template is used
// by concurrent exports). The padding override is now a local variable.
func (cdre *CDRExporter) composeTrailer() (err error) {
	for _, cfgFld := range cdre.exportTemplate.TrailerFields {
		var outVal string
		padding := cfgFld.Padding
		switch cfgFld.Type {
		case utils.META_FILLER:
			outVal = cfgFld.Value.Id()
			padding = "right" // fillers pad right, without touching the shared template
		case utils.META_CONSTANT:
			outVal = cfgFld.Value.Id()
		case utils.META_HANDLER:
			outVal, err = cdre.metaHandler(cfgFld.Value.Id(), cfgFld.Layout)
		default:
			return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
		}
		if err != nil {
			utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
			return err
		}
		fmtOut, err := utils.FmtFieldWidth(outVal, cfgFld.Width, cfgFld.Strip, padding, cfgFld.Mandatory)
		if err != nil {
			utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
			return err
		}
		cdre.Lock()
		cdre.trailer = append(cdre.trailer, fmtOut)
		cdre.Unlock()
	}
	return nil
}
// postCdr serializes one CDR according to the export format and posts it
// over HTTP or AMQP. On posting failure the poster implementations write the
// payload to a fallback file under cdre.fallbackPath (unless it is *none).
func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
	var body interface{}
	// First switch: build the payload — raw CDR JSON, exported-map JSON, or
	// url.Values for classic HTTP POST.
	switch cdre.exportFormat {
	case utils.MetaHTTPjsonCDR, utils.MetaAMQPjsonCDR:
		jsn, err := json.Marshal(cdr)
		if err != nil {
			return err
		}
		body = jsn
	case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap:
		expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals)
		if err != nil {
			return err
		}
		jsn, err := json.Marshal(expMp)
		if err != nil {
			return err
		}
		body = jsn
	case utils.META_HTTP_POST:
		expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals)
		if err != nil {
			return err
		}
		vals := url.Values{}
		for fld, val := range expMp {
			vals.Set(fld, val)
		}
		body = vals
	default:
		err = fmt.Errorf("unsupported exportFormat: <%s>", cdre.exportFormat)
	}
	if err != nil { // only the default branch can reach here with non-nil err
		return
	}
	// compute fallbackPath
	fallbackPath := utils.META_NONE
	ffn := &utils.FallbackFileName{Module: utils.CDRPoster, Transport: cdre.exportFormat, Address: cdre.exportPath, RequestID: utils.GenUUID()}
	fallbackFileName := ffn.AsString()
	if cdre.fallbackPath != utils.META_NONE { // not none, need fallback
		fallbackPath = path.Join(cdre.fallbackPath, fallbackFileName)
	}
	// Second switch: dispatch the payload over the matching transport.
	switch cdre.exportFormat {
	case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
		_, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath)
	case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
		var amqpPoster *utils.AMQPPoster
		amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath)
		if err == nil { // error will be checked bellow
			var chn *amqp.Channel
			chn, err = amqpPoster.Post(
				nil, utils.PosterTransportContentTypes[cdre.exportFormat], body.([]byte), fallbackFileName)
			if chn != nil {
				chn.Close()
			}
		}
	}
	return
}
// processCDR renders one CDR — after applying the usage/cost multiply
// factors — either into the content buffer (file formats) or by posting it
// directly (HTTP/AMQP), then folds the CDR into the export statistics.
//
// Fix: processCDRs calls this method from multiple goroutines, but the
// statistics fields (firstCdrATime, totals, order ids, ...) were updated
// without holding the lock — a data race. The stats section now runs under
// cdre.Lock().
func (cdre *CDRExporter) processCDR(cdr *CDR) (err error) {
	if cdr.ExtraFields == nil { // Avoid assignment in nil map if not initialized
		cdr.ExtraFields = make(map[string]string)
	}
	// Usage multiply, find config based on ToR field or *any
	for _, key := range []string{cdr.ToR, utils.ANY} {
		if uM, hasIt := cdre.usageMultiplyFactor[key]; hasIt && uM != 1.0 {
			cdr.UsageMultiply(uM, cdre.roundingDecimals)
			break
		}
	}
	if cdre.costMultiplyFactor != 0.0 {
		cdr.CostMultiply(cdre.costMultiplyFactor, cdre.roundingDecimals)
	}
	switch cdre.exportFormat {
	case utils.MetaFileFWV, utils.MetaFileCSV:
		var cdrRow []string
		cdrRow, err = cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, cdre.cdrs, cdre.roundingDecimals)
		if len(cdrRow) == 0 { // No CDR data, most likely no configuration fields defined
			return
		}
		cdre.Lock()
		cdre.content = append(cdre.content, cdrRow)
		cdre.Unlock()
	default: // attempt posting CDR
		err = cdre.postCdr(cdr)
	}
	if err != nil {
		utils.Logger.Err(fmt.Sprintf("<CDRE> Cannot export CDR with CGRID: %s and runid: %s, error: %s", cdr.CGRID, cdr.RunID, err.Error()))
		return
	}
	// Done with writing content, compute stats here — under lock, since this
	// method runs concurrently for different CDRs.
	cdre.Lock()
	defer cdre.Unlock()
	if cdre.firstCdrATime.IsZero() || cdr.AnswerTime.Before(cdre.firstCdrATime) {
		cdre.firstCdrATime = cdr.AnswerTime
	}
	if cdr.AnswerTime.After(cdre.lastCdrATime) {
		cdre.lastCdrATime = cdr.AnswerTime
	}
	cdre.numberOfRecords += 1
	switch cdr.ToR { // per-ToR usage counters (duration only counted for voice)
	case utils.VOICE:
		cdre.totalDuration += cdr.Usage
	case utils.SMS:
		cdre.totalSmsUsage += cdr.Usage
	case utils.MMS:
		cdre.totalMmsUsage += cdr.Usage
	case utils.GENERIC:
		cdre.totalGenericUsage += cdr.Usage
	case utils.DATA:
		cdre.totalDataUsage += cdr.Usage
	}
	if cdr.Cost != -1 { // -1 marks an unrated CDR, keep it out of the cost total
		cdre.totalCost += cdr.Cost
		cdre.totalCost = utils.Round(cdre.totalCost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE)
	}
	if cdre.firstExpOrderId > cdr.OrderID || cdre.firstExpOrderId == 0 {
		cdre.firstExpOrderId = cdr.OrderID
	}
	if cdre.lastExpOrderId < cdr.OrderID {
		cdre.lastExpOrderId = cdr.OrderID
	}
	return nil
}
// processCDRs filters the batch and dispatches each CDR to processCDR in its
// own goroutine, recording per-CDR success/failure, then composes header and
// trailer (their meta tags read statistics built while processing content).
// For synchronous or file-based exports the goroutines are awaited, since the
// content buffer must be complete before files are written; otherwise the
// posts are fire-and-forget.
//
// Improvements: the wait condition is loop-invariant and is now computed
// once, and wg.Done is deferred so an unexpected exit in the goroutine body
// cannot hang wg.Wait.
func (cdre *CDRExporter) processCDRs() (err error) {
	var wg sync.WaitGroup
	waitExport := cdre.synchronous ||
		utils.IsSliceMember([]string{utils.MetaFileCSV, utils.MetaFileFWV}, cdre.exportFormat)
	for _, cdr := range cdre.cdrs {
		if cdr == nil || len(cdr.CGRID) == 0 { // CDR needs to exist and it's CGRID needs to be populated
			continue
		}
		passesFilters := true
		for _, cdrFltr := range cdre.exportTemplate.CDRFilter {
			if !cdrFltr.FilterPasses(cdr.FieldAsString(cdrFltr)) {
				passesFilters = false
				break
			}
		}
		if !passesFilters { // Not passes filters, ignore this CDR
			continue
		}
		if waitExport {
			wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing
		}
		go func(cdr *CDR) {
			if waitExport {
				defer wg.Done()
			}
			if err := cdre.processCDR(cdr); err != nil {
				cdre.Lock()
				cdre.negativeExports[cdr.CGRID] = err.Error()
				cdre.Unlock()
			} else {
				cdre.Lock()
				cdre.positiveExports = append(cdre.positiveExports, cdr.CGRID)
				cdre.Unlock()
			}
		}(cdr)
	}
	wg.Wait()
	// Process header and trailer after processing cdrs since the metatag functions can access stats out of built cdrs
	if cdre.exportTemplate.HeaderFields != nil {
		if err = cdre.composeHeader(); err != nil {
			return
		}
	}
	if cdre.exportTemplate.TrailerFields != nil {
		if err = cdre.composeTrailer(); err != nil {
			return
		}
	}
	return
}
// writeOut dumps header, content rows and trailer to ioWriter, terminating
// every row with a newline. The buffers are locked for the whole dump.
func (cdre *CDRExporter) writeOut(ioWriter io.Writer) error {
	cdre.Lock()
	defer cdre.Unlock()
	// emit one row's fields followed by a newline terminator
	emitRow := func(fields []string) error {
		for _, piece := range append(fields, "\n") {
			if _, wrErr := io.WriteString(ioWriter, piece); wrErr != nil {
				return wrErr
			}
		}
		return nil
	}
	if len(cdre.header) != 0 {
		if err := emitRow(cdre.header); err != nil {
			return err
		}
	}
	for _, row := range cdre.content {
		if err := emitRow(row); err != nil {
			return err
		}
	}
	if len(cdre.trailer) != 0 {
		if err := emitRow(cdre.trailer); err != nil {
			return err
		}
	}
	return nil
}
// writeCsv dumps header, content rows and trailer through the csv writer,
// using the exporter's configured field separator. The buffers are
// read-locked for the duration of the dump.
//
// Fix: csv.Writer.Flush does not return an error; the original dropped any
// buffered write error. We now surface it via csvWriter.Error().
func (cdre *CDRExporter) writeCsv(csvWriter *csv.Writer) error {
	csvWriter.Comma = cdre.fieldSeparator
	cdre.RLock()
	defer cdre.RUnlock()
	if len(cdre.header) != 0 {
		if err := csvWriter.Write(cdre.header); err != nil {
			return err
		}
	}
	for _, cdrContent := range cdre.content {
		if err := csvWriter.Write(cdrContent); err != nil {
			return err
		}
	}
	if len(cdre.trailer) != 0 {
		if err := csvWriter.Write(cdre.trailer); err != nil {
			return err
		}
	}
	csvWriter.Flush()
	return csvWriter.Error() // flush/write errors are only visible here
}
// ExportCDRs runs the export: processes all CDRs and, for file based
// formats, writes the accumulated content to cdre.exportPath. Non-file
// formats are already posted during processing.
func (cdre *CDRExporter) ExportCDRs() (err error) {
	if err = cdre.processCDRs(); err != nil {
		return
	}
	if !utils.IsSliceMember([]string{utils.MetaFileCSV, utils.MetaFileFWV}, cdre.exportFormat) {
		return // nothing left to do, CDRs were posted while processing
	}
	// files are written after processing all CDRs
	cdre.RLock()
	haveContent := len(cdre.content) != 0
	cdre.RUnlock()
	if !haveContent {
		return
	}
	fileOut, err := os.Create(cdre.exportPath)
	if err != nil {
		return err
	}
	defer fileOut.Close()
	if cdre.exportFormat == utils.MetaFileCSV {
		return cdre.writeCsv(csv.NewWriter(fileOut))
	}
	return cdre.writeOut(fileOut)
}
// FirstOrderId returns the OrderID of the first exported CDR.
// Read-locked for consistency with PositiveExports/NegativeExports, since
// asynchronous exports may still be updating stats.
func (cdre *CDRExporter) FirstOrderId() int64 {
	cdre.RLock()
	defer cdre.RUnlock()
	return cdre.firstExpOrderId
}
// LastOrderId returns the OrderID of the last exported CDR.
// Read-locked for consistency with PositiveExports/NegativeExports, since
// asynchronous exports may still be updating stats.
func (cdre *CDRExporter) LastOrderId() int64 {
	cdre.RLock()
	defer cdre.RUnlock()
	return cdre.lastExpOrderId
}
// TotalCost returns the summed, rounded cost of the exported CDRs.
// Read-locked for consistency with PositiveExports/NegativeExports, since
// asynchronous exports may still be updating stats.
func (cdre *CDRExporter) TotalCost() float64 {
	cdre.RLock()
	defer cdre.RUnlock()
	return cdre.totalCost
}
// TotalExportedCdrs returns the number of CDRs folded into the statistics.
// Read-locked for consistency with PositiveExports/NegativeExports, since
// asynchronous exports may still be updating stats.
func (cdre *CDRExporter) TotalExportedCdrs() int {
	cdre.RLock()
	defer cdre.RUnlock()
	return cdre.numberOfRecords
}
// PositiveExports returns the CGRIDs of the successfully exported CDRs.
// NOTE(review): the internal slice is returned without copying, so callers
// share storage with the exporter — confirm callers treat it as read-only.
func (cdre *CDRExporter) PositiveExports() []string {
	cdre.RLock()
	defer cdre.RUnlock()
	return cdre.positiveExports
}
// NegativeExports returns the CGRIDs of the failed exports mapped to the
// failure reason.
// NOTE(review): the internal map is returned without copying, so callers
// share storage with the exporter — confirm callers treat it as read-only.
func (cdre *CDRExporter) NegativeExports() map[string]string {
	cdre.RLock()
	defer cdre.RUnlock()
	return cdre.negativeExports
}

92
engine/cdrecsv_test.go Normal file
View File

@@ -0,0 +1,92 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"bytes"
"encoding/csv"
"strings"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
// TestCsvCdrWriter exports one CDR through the *default CSV profile with the
// standard ',' separator and verifies both the rendered CSV row and the
// aggregated TotalCost statistic.
func TestCsvCdrWriter(t *testing.T) {
	writer := &bytes.Buffer{}
	cfg, _ := config.NewDefaultCGRConfig()
	storedCdr1 := &CDR{
		CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), ToR: utils.VOICE, OriginID: "dsafdsaf", OriginHost: "192.168.1.1",
		RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
		Category: "call", Account: "1001", Subject: "1001", Destination: "1002", SetupTime: time.Unix(1383813745, 0).UTC(), AnswerTime: time.Unix(1383813746, 0).UTC(),
		Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID,
		ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01,
	}
	cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles["*default"], utils.MetaFileCSV, "", "", "firstexport",
		true, 1, ',', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil)
	if err != nil {
		t.Error("Unexpected error received: ", err)
	}
	if err = cdre.processCDRs(); err != nil {
		t.Error(err)
	}
	csvWriter := csv.NewWriter(writer)
	if err := cdre.writeCsv(csvWriter); err != nil {
		t.Error("Unexpected error: ", err)
	}
	// Expected single content row rendered from the *default profile fields
	expected := `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,*out,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,1.01000`
	result := strings.TrimSpace(writer.String())
	if result != expected {
		t.Errorf("Expected: \n%s received: \n%s.", expected, result)
	}
	if cdre.TotalCost() != 1.01 {
		t.Error("Unexpected TotalCost: ", cdre.TotalCost())
	}
}
// TestAlternativeFieldSeparator repeats the single-CDR CSV export with a '|'
// separator, verifying the separator is honoured in the rendered row.
func TestAlternativeFieldSeparator(t *testing.T) {
	writer := &bytes.Buffer{}
	cfg, _ := config.NewDefaultCGRConfig()
	storedCdr1 := &CDR{CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), ToR: utils.VOICE, OriginID: "dsafdsaf", OriginHost: "192.168.1.1",
		RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
		Category: "call", Account: "1001", Subject: "1001", Destination: "1002", SetupTime: time.Unix(1383813745, 0).UTC(), AnswerTime: time.Unix(1383813746, 0).UTC(),
		Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID,
		ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01,
	}
	cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles["*default"], utils.MetaFileCSV, "", "", "firstexport",
		true, 1, '|', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil)
	if err != nil {
		t.Error("Unexpected error received: ", err)
	}
	if err = cdre.processCDRs(); err != nil {
		t.Error(err)
	}
	csvWriter := csv.NewWriter(writer)
	if err := cdre.writeCsv(csvWriter); err != nil {
		t.Error("Unexpected error: ", err)
	}
	expected := `dbafe9c8614c785a65aabd116dd3959c3c56f7f6|*default|*voice|dsafdsaf|*rated|*out|cgrates.org|call|1001|1001|1002|2013-11-07T08:42:25Z|2013-11-07T08:42:26Z|10|1.01000`
	result := strings.TrimSpace(writer.String())
	if result != expected {
		t.Errorf("Expected: \n%s received: \n%s.", expected, result)
	}
	if cdre.TotalCost() != 1.01 {
		t.Error("Unexpected TotalCost: ", cdre.TotalCost())
	}
}

245
engine/cdrefwv_test.go Normal file
View File

@@ -0,0 +1,245 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"bytes"
"math"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
// hdrJsnCfgFlds is the fixed-width (FWV) header template fixture: record
// type, distributor code, export id, last-CDR time, creation time, version
// and padding fillers.
var hdrJsnCfgFlds = []*config.CdrFieldJsonCfg{
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TypeOfRecord"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("10"), Width: utils.IntPointer(2)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler1"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(3)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("DistributorCode"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("VOI"), Width: utils.IntPointer(3)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FileSeqNr"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_EXPORTID),
		Width: utils.IntPointer(5), Strip: utils.StringPointer("right"), Padding: utils.StringPointer("zeroleft")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("LastCdr"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_LASTCDRATIME),
		Width: utils.IntPointer(12), Layout: utils.StringPointer("020106150400")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FileCreationfTime"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_TIMENOW),
		Width: utils.IntPointer(12), Layout: utils.StringPointer("020106150400")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FileVersion"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("01"), Width: utils.IntPointer(2)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler2"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(105)},
}
// contentJsnCfgFlds is the FWV content-row template fixture describing each
// exported CDR field (account, subject, destination, times, usage, cost...)
// with its width, strip and padding rules.
var contentJsnCfgFlds = []*config.CdrFieldJsonCfg{
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TypeOfRecord"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("20"), Width: utils.IntPointer(2)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Account"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.ACCOUNT), Width: utils.IntPointer(12),
		Strip: utils.StringPointer("left"), Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Subject"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.SUBJECT), Width: utils.IntPointer(5),
		Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("CLI"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer("cli"), Width: utils.IntPointer(15),
		Strip: utils.StringPointer("xright"), Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Destination"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.DESTINATION), Width: utils.IntPointer(24),
		Strip: utils.StringPointer("xright"), Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TOR"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("02"), Width: utils.IntPointer(2)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("SubtypeTOR"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("11"), Width: utils.IntPointer(4),
		Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("SetupTime"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.SETUP_TIME), Width: utils.IntPointer(12),
		Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right"), Layout: utils.StringPointer("020106150400")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Duration"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.USAGE), Width: utils.IntPointer(6),
		Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right"), Layout: utils.StringPointer(utils.SECONDS)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("DataVolume"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(6)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TaxCode"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("1"), Width: utils.IntPointer(1)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("OperatorCode"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer("opercode"), Width: utils.IntPointer(2),
		Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("ProductId"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer("productid"), Width: utils.IntPointer(5),
		Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("NetworkId"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("3"), Width: utils.IntPointer(1)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("CallId"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.ACCID), Width: utils.IntPointer(16),
		Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(8)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(8)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TerminationCode"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer("operator;product"),
		Width: utils.IntPointer(5), Strip: utils.StringPointer("right"), Padding: utils.StringPointer("right")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Cost"), Type: utils.StringPointer(utils.META_COMPOSED), Value: utils.StringPointer(utils.COST), Width: utils.IntPointer(9),
		Padding: utils.StringPointer("zeroleft")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("DestinationPrivacy"), Type: utils.StringPointer(utils.MetaMaskedDestination), Width: utils.IntPointer(1)},
}
// trailerJsnCfgFlds is the FWV trailer template fixture: record type,
// distributor code, export id, records count, total duration, first/last CDR
// times and padding fillers.
var trailerJsnCfgFlds = []*config.CdrFieldJsonCfg{
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("TypeOfRecord"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("90"), Width: utils.IntPointer(2)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler1"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(3)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("DistributorCode"), Type: utils.StringPointer(utils.META_CONSTANT), Value: utils.StringPointer("VOI"), Width: utils.IntPointer(3)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FileSeqNr"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_EXPORTID), Width: utils.IntPointer(5),
		Strip: utils.StringPointer("right"), Padding: utils.StringPointer("zeroleft")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("NumberOfRecords"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_NRCDRS),
		Width: utils.IntPointer(6), Padding: utils.StringPointer("zeroleft")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("CdrsDuration"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_DURCDRS),
		Width: utils.IntPointer(8), Padding: utils.StringPointer("zeroleft"), Layout: utils.StringPointer(utils.SECONDS)},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("FirstCdrTime"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_FIRSTCDRATIME),
		Width: utils.IntPointer(12), Layout: utils.StringPointer("020106150400")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("LastCdrTime"), Type: utils.StringPointer(utils.META_HANDLER), Value: utils.StringPointer(META_LASTCDRATIME),
		Width: utils.IntPointer(12), Layout: utils.StringPointer("020106150400")},
	&config.CdrFieldJsonCfg{Tag: utils.StringPointer("Filler2"), Type: utils.StringPointer(utils.META_FILLER), Width: utils.IntPointer(93)},
}
// Parsed counterparts of the JSON field fixtures above; populated from the
// *JsnCfgFlds variables inside TestWriteCdr.
var hdrCfgFlds, contentCfgFlds, trailerCfgFlds []*config.CfgCdrField
// TestWriteCdr exports a single CDR through the FWV template fixtures and
// checks the total output length (header + content + trailer rows of 145
// chars each) plus the aggregated statistics and getters.
// Write one CDR and test it's results only for content buffer
func TestWriteCdr(t *testing.T) {
	var err error
	wrBuf := &bytes.Buffer{}
	cfg, _ := config.NewDefaultCGRConfig()
	// parse the JSON fixture templates into the package-level CfgCdrField vars
	if hdrCfgFlds, err = config.CfgCdrFieldsFromCdrFieldsJsonCfg(hdrJsnCfgFlds); err != nil {
		t.Error(err)
	}
	if contentCfgFlds, err = config.CfgCdrFieldsFromCdrFieldsJsonCfg(contentJsnCfgFlds); err != nil {
		t.Error(err)
	}
	if trailerCfgFlds, err = config.CfgCdrFieldsFromCdrFieldsJsonCfg(trailerJsnCfgFlds); err != nil {
		t.Error(err)
	}
	cdreCfg := &config.CdreConfig{
		ExportFormat:  utils.MetaFileFWV,
		HeaderFields:  hdrCfgFlds,
		ContentFields: contentCfgFlds,
		TrailerFields: trailerCfgFlds,
	}
	cdr := &CDR{CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()),
		ToR: utils.VOICE, OrderID: 1, OriginID: "dsafdsaf", OriginHost: "192.168.1.1",
		RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
		Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
		SetupTime:  time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC),
		AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
		Usage:      time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.34567,
		ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
	}
	cdre, err := NewCDRExporter([]*CDR{cdr}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1",
		true, 1, '|', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil)
	if err != nil {
		t.Error(err)
	}
	if err = cdre.processCDRs(); err != nil {
		t.Error(err)
	}
	eHeader := "10 VOIfwv_107111308420018011511340001 \n"
	eContentOut := "201001 1001 1002 0211 07111308420010 1 3dsafdsaf 0002.34570\n"
	eTrailer := "90 VOIfwv_100000100000010071113084200071113084200 \n"
	if err := cdre.writeOut(wrBuf); err != nil {
		t.Error(err)
	}
	allOut := wrBuf.String()
	eAllOut := eHeader + eContentOut + eTrailer
	// each rendered row (incl. newline) is 145 chars; the header's *time_now
	// field varies, so only lengths are compared, not content
	if math.Mod(float64(len(allOut)), 145) != 0 {
		t.Error("Unexpected export content length", len(allOut))
	} else if len(allOut) != len(eAllOut) {
		t.Errorf("Output does not match expected length. Have output %q, expecting: %q", allOut, eAllOut)
	}
	// Test stats
	if !cdre.firstCdrATime.Equal(cdr.AnswerTime) {
		t.Error("Unexpected firstCdrATime in stats: ", cdre.firstCdrATime)
	} else if !cdre.lastCdrATime.Equal(cdr.AnswerTime) {
		t.Error("Unexpected lastCdrATime in stats: ", cdre.lastCdrATime)
	} else if cdre.numberOfRecords != 1 {
		t.Error("Unexpected number of records in the stats: ", cdre.numberOfRecords)
	} else if cdre.totalDuration != cdr.Usage {
		t.Error("Unexpected total duration in the stats: ", cdre.totalDuration)
	} else if cdre.totalCost != utils.Round(cdr.Cost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE) {
		t.Error("Unexpected total cost in the stats: ", cdre.totalCost)
	}
	if cdre.FirstOrderId() != 1 {
		t.Error("Unexpected FirstOrderId", cdre.FirstOrderId())
	}
	if cdre.LastOrderId() != 1 {
		t.Error("Unexpected LastOrderId", cdre.LastOrderId())
	}
	if cdre.TotalCost() != utils.Round(cdr.Cost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE) {
		t.Error("Unexpected TotalCost: ", cdre.TotalCost())
	}
}
// TestWriteCdrs verifies fixed-width (FWV) export of a mixed batch of CDRs:
// a rated, a prepaid, an empty (expected to be skipped) and a postpaid
// record. It asserts the total exported length and the exporter statistics:
// first/last answer time, number of exported records, total duration/cost
// and the order-id range.
func TestWriteCdrs(t *testing.T) {
	wrBuf := &bytes.Buffer{}
	cdreCfg := &config.CdreConfig{
		ExportFormat:  utils.MetaFileFWV,
		HeaderFields:  hdrCfgFlds,
		ContentFields: contentCfgFlds,
		TrailerFields: trailerCfgFlds,
	}
	cdr1 := &CDR{CGRID: utils.Sha1("aaa1", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()),
		ToR: utils.VOICE, OrderID: 2, OriginID: "aaa1", OriginHost: "192.168.1.1", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org",
		Category: "call", Account: "1001", Subject: "1001", Destination: "1010",
		SetupTime:  time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC),
		AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
		Usage:      time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.25,
		ExtraFields: map[string]string{"productnumber": "12341", "fieldextr2": "valextr2"},
	}
	cdr2 := &CDR{CGRID: utils.Sha1("aaa2", time.Date(2013, 11, 7, 7, 42, 20, 0, time.UTC).String()),
		ToR: utils.VOICE, OrderID: 4, OriginID: "aaa2", OriginHost: "192.168.1.2", RequestType: utils.META_PREPAID, Direction: "*out", Tenant: "cgrates.org",
		Category: "call", Account: "1002", Subject: "1002", Destination: "1011",
		SetupTime:  time.Date(2013, 11, 7, 7, 42, 20, 0, time.UTC),
		AnswerTime: time.Date(2013, 11, 7, 7, 42, 26, 0, time.UTC),
		Usage:      time.Duration(5) * time.Minute, RunID: utils.DEFAULT_RUNID, Cost: 1.40001,
		ExtraFields: map[string]string{"productnumber": "12342", "fieldextr2": "valextr2"},
	}
	cdr3 := &CDR{} // empty CDR, should not be exported nor counted in stats
	cdr4 := &CDR{CGRID: utils.Sha1("aaa3", time.Date(2013, 11, 7, 9, 42, 18, 0, time.UTC).String()),
		ToR: utils.VOICE, OrderID: 3, OriginID: "aaa4", OriginHost: "192.168.1.4", RequestType: utils.META_POSTPAID, Direction: "*out", Tenant: "cgrates.org",
		Category: "call", Account: "1004", Subject: "1004", Destination: "1013",
		SetupTime:  time.Date(2013, 11, 7, 9, 42, 18, 0, time.UTC),
		AnswerTime: time.Date(2013, 11, 7, 9, 42, 26, 0, time.UTC),
		Usage:      time.Duration(20) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.34567,
		ExtraFields: map[string]string{"productnumber": "12344", "fieldextr2": "valextr2"},
	}
	cfg, err := config.NewDefaultCGRConfig()
	if err != nil { // previously discarded with _; the test cannot proceed without a config
		t.Fatal(err)
	}
	cdre, err := NewCDRExporter([]*CDR{cdr1, cdr2, cdr3, cdr4}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1",
		true, 1, ',', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil)
	if err != nil { // Fatal, not Error: continuing with a nil cdre would panic below
		t.Fatal(err)
	}
	if err = cdre.processCDRs(); err != nil {
		t.Error(err)
	}
	if err := cdre.writeOut(wrBuf); err != nil {
		t.Error(err)
	}
	if len(wrBuf.String()) != 725 {
		t.Error("Output buffer does not contain expected info. Expecting len: 725, got: ", len(wrBuf.String()))
	}
	// Test stats: cdr2 has the earliest answer time, cdr4 the latest;
	// the empty cdr3 is excluded so only 3 records are accounted.
	if !cdre.firstCdrATime.Equal(cdr2.AnswerTime) {
		t.Error("Unexpected firstCdrATime in stats: ", cdre.firstCdrATime)
	}
	if !cdre.lastCdrATime.Equal(cdr4.AnswerTime) {
		t.Error("Unexpected lastCdrATime in stats: ", cdre.lastCdrATime)
	}
	if cdre.numberOfRecords != 3 {
		t.Error("Unexpected number of records in the stats: ", cdre.numberOfRecords)
	}
	if cdre.totalDuration != time.Duration(330)*time.Second { // 10s + 5m + 20s
		t.Error("Unexpected total duration in the stats: ", cdre.totalDuration)
	}
	if cdre.totalCost != 5.99568 { // 2.25 + 1.40001 + 2.34567 after rounding
		t.Error("Unexpected total cost in the stats: ", cdre.totalCost)
	}
	if cdre.FirstOrderId() != 2 {
		t.Error("Unexpected FirstOrderId", cdre.FirstOrderId())
	}
	if cdre.LastOrderId() != 4 {
		t.Error("Unexpected LastOrderId", cdre.LastOrderId())
	}
	if cdre.TotalCost() != 5.99568 {
		t.Error("Unexpected TotalCost: ", cdre.TotalCost())
	}
}

View File

@@ -18,12 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"reflect"
"strings"
"time"
@@ -33,7 +30,6 @@ import (
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/streadway/amqp"
)
var cdrServer *CdrServer // Share the server so we can use it in http handlers
@@ -195,12 +191,11 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
var out int
go self.stats.Call("CDRStatsV1.AppendCDR", cdr, &out)
}
if len(self.cgrCfg.CDRSCdrReplication) != 0 { // Replicate raw CDR
go self.replicateCdr(cdr)
if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR
self.replicateCDRs([]*CDR{cdr})
}
if self.rals != nil && !cdr.Rated { // CDRs not rated will be processed by Rating
go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSCdrReplication) != 0)
go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSOnlineCDRExports) != 0)
}
return nil
}
@@ -284,9 +279,7 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, stats, rep
}
}
if replicate {
for _, ratedCDR := range ratedCDRs {
self.replicateCdr(ratedCDR)
}
self.replicateCDRs(ratedCDRs)
}
return nil
}
@@ -452,89 +445,22 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
return cc, nil
}
// ToDo: Add websocket support
func (self *CdrServer) replicateCdr(cdr *CDR) error {
for _, rplCfg := range self.cgrCfg.CDRSCdrReplication {
passesFilters := true
for _, cdfFltr := range rplCfg.CdrFilter {
if !cdfFltr.FilterPasses(cdr.FieldAsString(cdfFltr)) {
passesFilters = false
break
}
}
if !passesFilters { // Not passes filters, ignore this replication
func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) {
for _, exportID := range self.cgrCfg.CDRSOnlineCDRExports {
expTpl := self.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer
var cdre *CDRExporter
if cdre, err = NewCDRExporter(cdrs, expTpl, expTpl.ExportFormat, expTpl.ExportPath, self.cgrCfg.FailedPostsDir, "CDRSReplication",
expTpl.Synchronous, expTpl.Attempts, expTpl.FieldSeparator, expTpl.UsageMultiplyFactor,
expTpl.CostMultiplyFactor, self.cgrCfg.RoundingDecimals, self.cgrCfg.HttpSkipTlsVerify, self.httpPoster); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Building CDRExporter for online exports got error: <%s>", err.Error()))
continue
}
var body interface{}
var content = ""
switch rplCfg.Transport {
case utils.MetaHTTPjsonCDR, utils.MetaAMQPjsonCDR:
jsn, err := json.Marshal(cdr)
if err != nil {
return err
}
body = jsn
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap:
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
if err != nil {
return err
}
jsn, err := json.Marshal(expMp)
if err != nil {
return err
}
body = jsn
case utils.META_HTTP_POST:
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
if err != nil {
return err
}
vals := url.Values{}
for fld, val := range expMp {
vals.Set(fld, val)
}
body = vals
}
var errChan chan error
if rplCfg.Synchronous {
errChan = make(chan error)
}
go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) {
var err error
fallbackPath := utils.META_NONE
if rplCfg.FallbackFileName() != utils.META_NONE {
fallbackPath = path.Join(self.cgrCfg.FailedPostsDir, rplCfg.FallbackFileName())
}
switch rplCfg.Transport {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
_, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath)
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
var amqpPoster *utils.AMQPPoster
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir)
if err == nil { // error will be checked bellow
var chn *amqp.Channel
chn, err = amqpPoster.Post(
nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName())
if chn != nil {
chn.Close()
}
}
default:
err = fmt.Errorf("unsupported replication transport: %s", rplCfg.Transport)
}
if err != nil {
utils.Logger.Err(fmt.Sprintf(
"<CDRReplicator> Replicating CDR: %+v, transport: %s, got error: %s", cdr, rplCfg.Transport, err.Error()))
}
if rplCfg.Synchronous {
errChan <- err
}
}(body, rplCfg, content, errChan)
if rplCfg.Synchronous { // Synchronize here
<-errChan
if err = cdre.ExportCDRs(); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Replicating CDR: %+v, got error: <%s>", err.Error()))
continue
}
}
return nil
return
}
// Called by rate/re-rate API, FixMe: deprecate it once new APIer structure is operational
@@ -544,7 +470,7 @@ func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) err
return err
}
for _, cdr := range cdrs {
if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil {
if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSOnlineCDRExports) != 0); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Processing CDR %+v, got error: %s", cdr, err.Error()))
}
}
@@ -614,7 +540,7 @@ func (self *CdrServer) V1RateCDRs(attrs utils.AttrRateCDRs, reply *string) error
if attrs.SendToStatS != nil {
sendToStats = *attrs.SendToStatS
}
replicate := len(self.cgrCfg.CDRSCdrReplication) != 0
replicate := len(self.cgrCfg.CDRSOnlineCDRExports) != 0
if attrs.ReplicateCDRs != nil {
replicate = *attrs.ReplicateCDRs
}

View File

@@ -19,7 +19,6 @@ package engine
import (
"reflect"
"regexp"
"testing"
"time"
@@ -97,9 +96,7 @@ func TestSearchExtraFieldInSlice(t *testing.T) {
}
func TestSearchReplaceInExtraFields(t *testing.T) {
fsCdrCfg.CDRSExtraFields = []*utils.RSRField{&utils.RSRField{Id: "read_codec"},
&utils.RSRField{Id: "sip_user_agent", RSRules: []*utils.ReSearchReplace{&utils.ReSearchReplace{SearchRegexp: regexp.MustCompile(`([A-Za-z]*).+`), ReplaceTemplate: "$1"}}},
&utils.RSRField{Id: "write_codec"}}
fsCdrCfg.CDRSExtraFields = utils.ParseRSRFieldsMustCompile(`read_codec;~sip_user_agent:s/([A-Za-z]*).+/$1/;write_codec`, utils.INFIELD_SEP)
fsCdr, _ := NewFSCdr(body, fsCdrCfg)
extraFields := fsCdr.getExtraFields()
if len(extraFields) != 3 {
@@ -147,11 +144,11 @@ func TestDDazRSRExtraFields(t *testing.T) {
}`)
var err error
fsCdrCfg, err = config.NewCGRConfigFromJsonString(eFieldsCfg)
expCdrExtra := utils.ParseRSRFieldsMustCompile(`~effective_caller_id_number:s/(\d+)/+$1/`, utils.INFIELD_SEP)
if err != nil {
t.Error("Could not parse the config", err.Error())
} else if !reflect.DeepEqual(fsCdrCfg.CDRSExtraFields, []*utils.RSRField{&utils.RSRField{Id: "effective_caller_id_number",
RSRules: []*utils.ReSearchReplace{&utils.ReSearchReplace{SearchRegexp: regexp.MustCompile(`(\d+)`), ReplaceTemplate: "+$1"}}}}) {
t.Errorf("Unexpected value for config CdrsExtraFields: %v", fsCdrCfg.CDRSExtraFields)
} else if !reflect.DeepEqual(expCdrExtra[0], fsCdrCfg.CDRSExtraFields[0]) { // Kinda deepEqual bug since without index does not match
t.Errorf("Expecting: %+v, received: %+v", expCdrExtra, fsCdrCfg.CDRSExtraFields)
}
fsCdr, err := NewFSCdr(simpleJsonCdr, fsCdrCfg)
if err != nil {
@@ -159,6 +156,6 @@ func TestDDazRSRExtraFields(t *testing.T) {
}
extraFields := fsCdr.getExtraFields()
if extraFields["effective_caller_id_number"] != "+4986517174963" {
t.Error("Unexpected effective_caller_id_number received", extraFields["effective_caller_id_number"])
t.Errorf("Unexpected effective_caller_id_number received: %+v", extraFields["effective_caller_id_number"])
}
}

View File

@@ -59,7 +59,7 @@ type PubSub struct {
accountDb AccountingStorage
}
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
func NewPubSub(accountDb AccountingStorage, ttlVerify bool) (*PubSub, error) {
ps := &PubSub{
ttlVerify: ttlVerify,
subscribers: make(map[string]*SubscriberData),
@@ -68,10 +68,17 @@ func NewPubSub(accountDb AccountingStorage, ttlVerify bool) *PubSub {
accountDb: accountDb,
}
// load subscribers
if subs, err := accountDb.GetSubscribers(); err == nil {
if subs, err := accountDb.GetSubscribers(); err != nil {
return nil, err
} else {
ps.subscribers = subs
}
return ps
for _, sData := range ps.subscribers {
if err := sData.Filters.ParseRules(); err != nil { // Parse rules into regexp objects
utils.Logger.Err(fmt.Sprintf("<PubSub> Error <%s> when parsing rules out of subscriber data: %+v", err.Error(), sData))
}
}
return ps, nil
}
func (ps *PubSub) saveSubscriber(key string) {

View File

@@ -25,7 +25,10 @@ import (
)
func TestSubscribe(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
ps, err := NewPubSub(accountingStorage, false)
if err != nil {
t.Error(err)
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventFilter: "EventName/test",
@@ -41,7 +44,10 @@ func TestSubscribe(t *testing.T) {
}
func TestSubscribeSave(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
ps, err := NewPubSub(accountingStorage, false)
if err != nil {
t.Error(err)
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventFilter: "EventName/test",
@@ -58,7 +64,10 @@ func TestSubscribeSave(t *testing.T) {
}
func TestSubscribeNoTransport(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
ps, err := NewPubSub(accountingStorage, false)
if err != nil {
t.Error(err)
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventFilter: "EventName/test",
@@ -71,7 +80,10 @@ func TestSubscribeNoTransport(t *testing.T) {
}
func TestSubscribeNoExpire(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
ps, err := NewPubSub(accountingStorage, false)
if err != nil {
t.Error(err)
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventFilter: "EventName/test",
@@ -87,7 +99,10 @@ func TestSubscribeNoExpire(t *testing.T) {
}
func TestUnsubscribe(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
ps, err := NewPubSub(accountingStorage, false)
if err != nil {
t.Error(err)
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventFilter: "EventName/test",
@@ -110,7 +125,10 @@ func TestUnsubscribe(t *testing.T) {
}
func TestUnsubscribeSave(t *testing.T) {
ps := NewPubSub(accountingStorage, false)
ps, err := NewPubSub(accountingStorage, false)
if err != nil {
t.Error(err)
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventFilter: "EventName/test",
@@ -134,7 +152,10 @@ func TestUnsubscribeSave(t *testing.T) {
}
func TestPublishExpired(t *testing.T) {
ps := NewPubSub(accountingStorage, true)
ps, err := NewPubSub(accountingStorage, true)
if err != nil {
t.Error(err)
}
ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) {
return nil, nil
}
@@ -156,7 +177,10 @@ func TestPublishExpired(t *testing.T) {
}
func TestPublishExpiredSave(t *testing.T) {
ps := NewPubSub(accountingStorage, true)
ps, err := NewPubSub(accountingStorage, true)
if err != nil {
t.Error(err)
}
ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) {
return nil, nil
}

View File

@@ -42,6 +42,7 @@ type Storage interface {
// Interface for storage providers.
type RatingStorage interface {
Storage
Marshaler() Marshaler
HasData(string, string) (bool, error)
LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error
GetRatingPlan(string, bool, string) (*RatingPlan, error)
@@ -83,6 +84,7 @@ type RatingStorage interface {
type AccountingStorage interface {
Storage
Marshaler() Marshaler
LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error
GetAccount(string) (*Account, error)
SetAccount(*Account) error
@@ -117,6 +119,7 @@ type AccountingStorage interface {
// OnlineStorage contains methods to use for administering online data
type DataDB interface {
Storage
Marshaler() Marshaler
HasData(string, string) (bool, error)
LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error
GetRatingPlan(string, bool, string) (*RatingPlan, error)

View File

@@ -21,6 +21,7 @@ import (
"bytes"
"compress/zlib"
"errors"
"fmt"
"io/ioutil"
"strings"
"sync"
@@ -70,11 +71,13 @@ func (s storage) smembers(key string, ms Marshaler) (idMap utils.StringMap, ok b
}
func NewMapStorage() (*MapStorage, error) {
return &MapStorage{dict: make(map[string][]byte), ms: NewCodecMsgpackMarshaler(), cacheCfg: &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}}, nil
return &MapStorage{dict: make(map[string][]byte), ms: NewCodecMsgpackMarshaler(), cacheCfg: config.CgrConfig().CacheConfig}, nil
}
func NewMapStorageJson() (*MapStorage, error) {
return &MapStorage{dict: make(map[string][]byte), ms: new(JSONBufMarshaler), cacheCfg: &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}}, nil
func NewMapStorageJson() (mpStorage *MapStorage, err error) {
mpStorage, err = NewMapStorage()
mpStorage.ms = new(JSONBufMarshaler)
return
}
func (ms *MapStorage) Close() {}
@@ -86,12 +89,16 @@ func (ms *MapStorage) Flush(ignore string) error {
return nil
}
func (ms *MapStorage) Marshaler() Marshaler {
return ms.ms
}
func (ms *MapStorage) SelectDatabase(dbName string) (err error) {
return
}
func (ms *MapStorage) RebuildReverseForPrefix(prefix string) error {
// FIXME: should do transaction
// ToDo: should do transaction
keys, err := ms.GetKeysForPrefix(prefix)
if err != nil {
return err
@@ -138,7 +145,6 @@ func (ms *MapStorage) RebuildReverseForPrefix(prefix string) error {
return nil
}
// FixMe
func (ms *MapStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error {
if ms.cacheCfg == nil {
return nil
@@ -246,6 +252,119 @@ func (ms *MapStorage) PreloadCacheForPrefix(prefix string) error {
// CacheDataFromDB loads data to cache,
// prefix represents the cache prefix, IDs should be nil if all available data should be loaded
func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) (err error) {
if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX,
utils.REVERSE_DESTINATION_PREFIX,
utils.RATING_PLAN_PREFIX,
utils.RATING_PROFILE_PREFIX,
utils.ACTION_PREFIX,
utils.ACTION_PLAN_PREFIX,
utils.AccountActionPlansPrefix,
utils.ACTION_TRIGGER_PREFIX,
utils.SHARED_GROUP_PREFIX,
utils.DERIVEDCHARGERS_PREFIX,
utils.LCR_PREFIX,
utils.ALIASES_PREFIX,
utils.REVERSE_ALIASES_PREFIX,
utils.ResourceLimitsPrefix}, prefix) {
return utils.NewCGRError(utils.REDIS,
utils.MandatoryIEMissingCaps,
utils.UnsupportedCachePrefix,
fmt.Sprintf("prefix <%s> is not a supported cache prefix", prefix))
}
if IDs == nil {
keyIDs, err := ms.GetKeysForPrefix(prefix)
if err != nil {
return utils.NewCGRError(utils.REDIS,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("MapStorage error <%s> querying keys for prefix: <%s>", prefix))
}
for _, keyID := range keyIDs {
if mustBeCached { // Only consider loading ids which are already in cache
if _, hasIt := cache.Get(keyID); !hasIt {
continue
}
}
IDs = append(IDs, keyID[len(prefix):])
}
var nrItems int
switch prefix {
case utils.DESTINATION_PREFIX:
nrItems = ms.cacheCfg.Destinations.Limit
case utils.REVERSE_DESTINATION_PREFIX:
nrItems = ms.cacheCfg.ReverseDestinations.Limit
case utils.RATING_PLAN_PREFIX:
nrItems = ms.cacheCfg.RatingPlans.Limit
case utils.RATING_PROFILE_PREFIX:
nrItems = ms.cacheCfg.RatingProfiles.Limit
case utils.ACTION_PREFIX:
nrItems = ms.cacheCfg.Actions.Limit
case utils.ACTION_PLAN_PREFIX:
nrItems = ms.cacheCfg.ActionPlans.Limit
case utils.AccountActionPlansPrefix:
nrItems = ms.cacheCfg.AccountActionPlans.Limit
case utils.ACTION_TRIGGER_PREFIX:
nrItems = ms.cacheCfg.ActionTriggers.Limit
case utils.SHARED_GROUP_PREFIX:
nrItems = ms.cacheCfg.SharedGroups.Limit
case utils.DERIVEDCHARGERS_PREFIX:
nrItems = ms.cacheCfg.DerivedChargers.Limit
case utils.LCR_PREFIX:
nrItems = ms.cacheCfg.Lcr.Limit
case utils.ALIASES_PREFIX:
nrItems = ms.cacheCfg.Aliases.Limit
case utils.REVERSE_ALIASES_PREFIX:
nrItems = ms.cacheCfg.ReverseAliases.Limit
case utils.ResourceLimitsPrefix:
nrItems = ms.cacheCfg.ResourceLimits.Limit
}
if nrItems != 0 && nrItems < len(IDs) {
IDs = IDs[:nrItems]
}
}
for _, dataID := range IDs {
if mustBeCached {
if _, hasIt := cache.Get(prefix + dataID); !hasIt { // only cache if previously there
continue
}
}
switch prefix {
case utils.DESTINATION_PREFIX:
_, err = ms.GetDestination(dataID, true, utils.NonTransactional)
case utils.REVERSE_DESTINATION_PREFIX:
_, err = ms.GetReverseDestination(dataID, true, utils.NonTransactional)
case utils.RATING_PLAN_PREFIX:
_, err = ms.GetRatingPlan(dataID, true, utils.NonTransactional)
case utils.RATING_PROFILE_PREFIX:
_, err = ms.GetRatingProfile(dataID, true, utils.NonTransactional)
case utils.ACTION_PREFIX:
_, err = ms.GetActions(dataID, true, utils.NonTransactional)
case utils.ACTION_PLAN_PREFIX:
_, err = ms.GetActionPlan(dataID, true, utils.NonTransactional)
case utils.AccountActionPlansPrefix:
_, err = ms.GetAccountActionPlans(dataID, true, utils.NonTransactional)
case utils.ACTION_TRIGGER_PREFIX:
_, err = ms.GetActionTriggers(dataID, true, utils.NonTransactional)
case utils.SHARED_GROUP_PREFIX:
_, err = ms.GetSharedGroup(dataID, true, utils.NonTransactional)
case utils.DERIVEDCHARGERS_PREFIX:
_, err = ms.GetDerivedChargers(dataID, true, utils.NonTransactional)
case utils.LCR_PREFIX:
_, err = ms.GetLCR(dataID, true, utils.NonTransactional)
case utils.ALIASES_PREFIX:
_, err = ms.GetAlias(dataID, true, utils.NonTransactional)
case utils.REVERSE_ALIASES_PREFIX:
_, err = ms.GetReverseAlias(dataID, true, utils.NonTransactional)
case utils.ResourceLimitsPrefix:
_, err = ms.GetResourceLimit(dataID, true, utils.NonTransactional)
}
if err != nil {
return utils.NewCGRError(utils.REDIS,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error <%s> querying MapStorage for category: <%s>, dataID: <%s>", prefix, dataID))
}
}
return
}
@@ -339,7 +458,9 @@ func (ms *MapStorage) GetRatingProfile(key string, skipCache bool, transactionID
cCommit := cacheCommit(transactionID)
if values, ok := ms.dict[key]; ok {
rpf = new(RatingProfile)
err = ms.ms.Unmarshal(values, rpf)
if err = ms.ms.Unmarshal(values, &rpf); err != nil {
return nil, err
}
} else {
cache.Set(key, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
@@ -435,14 +556,18 @@ func (ms *MapStorage) GetDestination(key string, skipCache bool, transactionID s
}
r.Close()
dest = new(Destination)
err = ms.ms.Unmarshal(out, dest)
err = ms.ms.Unmarshal(out, &dest)
if err != nil {
cache.Set(key, dest, cCommit, transactionID)
cache.Set(key, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
} else {
}
if dest == nil {
cache.Set(key, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
cache.Set(key, dest, cCommit, transactionID)
return
}
@@ -504,16 +629,19 @@ func (ms *MapStorage) RemoveDestination(destID string, transactionID string) (er
if err != nil {
return
}
ms.mu.Lock()
delete(ms.dict, key)
ms.mu.Unlock()
cache.RemKey(key, cacheCommit(transactionID), transactionID)
for _, prefix := range d.Prefixes {
ms.mu.Lock()
ms.dict.srem(utils.REVERSE_DESTINATION_PREFIX+prefix, destID, ms.ms)
ms.mu.Unlock()
ms.GetReverseDestination(prefix, true, transactionID) // it will recache the destination
}
return
}
@@ -573,9 +701,9 @@ func (ms *MapStorage) GetActions(key string, skipCache bool, transactionID strin
ms.mu.RLock()
defer ms.mu.RUnlock()
cCommit := cacheCommit(transactionID)
key = utils.ACTION_PREFIX + key
cachekey := utils.ACTION_PREFIX + key
if !skipCache {
if x, err := cache.GetCloned(key); err != nil {
if x, err := cache.GetCloned(cachekey); err != nil {
if err.Error() != utils.ItemNotFound {
return nil, err
}
@@ -585,13 +713,13 @@ func (ms *MapStorage) GetActions(key string, skipCache bool, transactionID strin
return x.(Actions), nil
}
}
if values, ok := ms.dict[key]; ok {
if values, ok := ms.dict[cachekey]; ok {
err = ms.ms.Unmarshal(values, &as)
} else {
cache.Set(key, nil, cCommit, transactionID)
cache.Set(cachekey, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
cache.Set(key, as, cCommit, transactionID)
cache.Set(cachekey, as, cCommit, transactionID)
return
}
@@ -599,26 +727,28 @@ func (ms *MapStorage) SetActions(key string, as Actions, transactionID string) (
ms.mu.Lock()
defer ms.mu.Unlock()
cCommit := cacheCommit(transactionID)
cachekey := utils.ACTION_PREFIX + key
result, err := ms.ms.Marshal(&as)
ms.dict[utils.ACTION_PREFIX+key] = result
cache.RemKey(utils.ACTION_PREFIX+key, cCommit, transactionID)
ms.dict[cachekey] = result
cache.RemKey(cachekey, cCommit, transactionID)
return
}
func (ms *MapStorage) RemoveActions(key string, transactionID string) (err error) {
cachekey := utils.ACTION_PREFIX + key
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.ACTION_PREFIX+key)
cache.RemKey(utils.ACTION_PREFIX+key, cacheCommit(transactionID), transactionID)
delete(ms.dict, cachekey)
ms.mu.Unlock()
cache.RemKey(cachekey, cacheCommit(transactionID), transactionID)
return
}
func (ms *MapStorage) GetSharedGroup(key string, skipCache bool, transactionID string) (sg *SharedGroup, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.SHARED_GROUP_PREFIX + key
cachekey := utils.SHARED_GROUP_PREFIX + key
if !skipCache {
if x, ok := cache.Get(key); ok {
if x, ok := cache.Get(cachekey); ok {
if x != nil {
return x.(*SharedGroup), nil
}
@@ -626,13 +756,13 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool, transactionID s
}
}
cCommit := cacheCommit(transactionID)
if values, ok := ms.dict[key]; ok {
if values, ok := ms.dict[cachekey]; ok {
err = ms.ms.Unmarshal(values, &sg)
if err == nil {
cache.Set(key, sg, cCommit, transactionID)
cache.Set(cachekey, sg, cCommit, transactionID)
}
} else {
cache.Set(key, nil, cCommit, transactionID)
cache.Set(cachekey, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
return
@@ -650,12 +780,19 @@ func (ms *MapStorage) SetSharedGroup(sg *SharedGroup, transactionID string) (err
func (ms *MapStorage) GetAccount(key string) (ub *Account, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.ACCOUNT_PREFIX+key]; ok {
ub = &Account{ID: key}
err = ms.ms.Unmarshal(values, ub)
} else {
values, ok := ms.dict[utils.ACCOUNT_PREFIX+key]
if !ok {
return nil, utils.ErrNotFound
}
ub = &Account{ID: key}
err = ms.ms.Unmarshal(values, &ub)
if err != nil {
return nil, err
}
if len(values) == 0 {
return nil, utils.ErrNotFound
}
return
}
@@ -781,41 +918,40 @@ func (ms *MapStorage) RemoveUser(key string) error {
func (ms *MapStorage) GetAlias(key string, skipCache bool, transactionID string) (al *Alias, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
origKey := key
key = utils.ALIASES_PREFIX + key
cacheKey := utils.ALIASES_PREFIX + key
cCommit := cacheCommit(transactionID)
if !skipCache {
if x, ok := cache.Get(key); ok {
if x != nil {
al = &Alias{Values: x.(AliasValues)}
al.SetId(origKey)
return al, nil
if x, ok := cache.Get(cacheKey); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.(*Alias), nil
}
}
cCommit := cacheCommit(transactionID)
if values, ok := ms.dict[key]; ok {
if values, ok := ms.dict[cacheKey]; ok {
al = &Alias{Values: make(AliasValues, 0)}
al.SetId(key[len(utils.ALIASES_PREFIX):])
err = ms.ms.Unmarshal(values, &al.Values)
if err == nil {
cache.Set(key, al.Values, cCommit, transactionID)
al.SetId(key)
if err = ms.ms.Unmarshal(values, &al.Values); err != nil {
return nil, err
}
} else {
cache.Set(key, nil, cCommit, transactionID)
cache.Set(cacheKey, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
return al, nil
cache.Set(cacheKey, al, cCommit, transactionID)
return
}
func (ms *MapStorage) SetAlias(al *Alias, transactionID string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(al.Values)
if err != nil {
return err
}
key := utils.ALIASES_PREFIX + al.GetId()
ms.mu.Lock()
defer ms.mu.Unlock()
ms.dict[key] = result
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return nil
@@ -833,15 +969,15 @@ func (ms *MapStorage) GetReverseAlias(reverseID string, skipCache bool, transact
return nil, utils.ErrNotFound
}
}
var values []string
cCommit := cacheCommit(transactionID)
if idMap, ok := ms.dict.smembers(key, ms.ms); len(idMap) > 0 && ok {
values = idMap.Slice()
ids = idMap.Slice()
} else {
cache.Set(key, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
cache.Set(key, values, cCommit, transactionID)
cache.Set(key, ids, cCommit, transactionID)
return
}
@@ -1023,6 +1159,8 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error
}
func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key := utils.AccountActionPlansPrefix + acntID
if !skipCache {
if x, ok := cache.Get(key); ok {
@@ -1032,9 +1170,7 @@ func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, trans
return x.([]string), nil
}
}
ms.mu.RLock()
values, ok := ms.dict[key]
ms.mu.RUnlock()
if !ok {
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
@@ -1049,8 +1185,7 @@ func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, trans
func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) {
if !overwrite {
oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
if oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound {
return err
} else {
for _, oldAPid := range oldaPlIDs {
@@ -1060,7 +1195,6 @@ func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overw
}
}
}
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(apIDs)
@@ -1068,12 +1202,11 @@ func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overw
return err
}
ms.dict[utils.AccountActionPlansPrefix+acntID] = result
return
}
func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
key := utils.AccountActionPlansPrefix + acntID
if len(apIDs) == 0 {
delete(ms.dict, key)
@@ -1090,10 +1223,17 @@ func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err
}
i++
}
ms.mu.Lock()
defer ms.mu.Unlock()
if len(oldaPlIDs) == 0 {
delete(ms.dict, key)
return
}
var result []byte
if result, err = ms.ms.Marshal(oldaPlIDs); err != nil {
return err
}
ms.dict[key] = result
return
}
@@ -1273,7 +1413,8 @@ func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit, transactionID string)
if err != nil {
return err
}
ms.dict[utils.ResourceLimitsPrefix+rl.ID] = result
key := utils.ResourceLimitsPrefix + rl.ID
ms.dict[key] = result
return nil
}
@@ -1310,9 +1451,10 @@ func (ms *MapStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[s
return
}
func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
cacheKey := dbKey + fieldValKey
ms.mu.RLock()
defer ms.mu.RUnlock()
if x, ok := cache.Get(dbKey + fieldValKey); ok { // Attempt to find in cache first
if x, ok := cache.Get(cacheKey); ok { // Attempt to find in cache first
if x != nil {
return x.(utils.StringMap), nil
}
@@ -1321,7 +1463,7 @@ func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs ut
// Not found in cache, check in DB
values, ok := ms.dict[dbKey]
if !ok {
cache.Set(dbKey+fieldValKey, nil, true, utils.NonTransactional)
cache.Set(cacheKey, nil, true, utils.NonTransactional)
return nil, utils.ErrNotFound
}
var indexes map[string]map[string]utils.StringMap
@@ -1332,7 +1474,12 @@ func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs ut
if _, hasIt := indexes[keySplt[0]]; hasIt {
itemIDs = indexes[keySplt[0]][keySplt[1]]
}
cache.Set(dbKey+fieldValKey, itemIDs, true, utils.NonTransactional)
//Verify items
if len(itemIDs) == 0 {
cache.Set(cacheKey, nil, true, utils.NonTransactional)
return nil, utils.ErrNotFound
}
cache.Set(cacheKey, itemIDs, true, utils.NonTransactional)
return
}

View File

@@ -88,11 +88,17 @@ var (
)
func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string, cacheCfg *config.CacheConfig, loadHistorySize int) (ms *MongoStorage, err error) {
address := fmt.Sprintf("%s:%s", host, port)
if user != "" && pass != "" {
address = fmt.Sprintf("%s:%s@%s", user, pass, address)
url := host
if port != "" {
url += ":" + port
}
session, err := mgo.Dial(address)
if user != "" && pass != "" {
url = fmt.Sprintf("%s:%s@%s", user, pass, url)
}
if db != "" {
url += "/" + db
}
session, err := mgo.Dial(url)
if err != nil {
return nil, err
}
@@ -322,6 +328,15 @@ func (ms *MongoStorage) Flush(ignore string) (err error) {
return dbSession.DB(ms.db).DropDatabase()
}
// Marshaler exposes the marshaler this storage instance was configured with.
func (ms *MongoStorage) Marshaler() Marshaler {
	return ms.ms
}
// DB returns a database object backed by a copy of the stored session.
// NOTE(review): the copied session is never closed here — presumably the
// caller owns it and must Close() it when done; verify against call sites.
func (ms *MongoStorage) DB() *mgo.Database {
	return ms.session.Copy().DB(ms.db)
}
func (ms *MongoStorage) SelectDatabase(dbName string) (err error) {
ms.db = dbName
return

View File

@@ -109,6 +109,10 @@ func (rs *RedisStorage) Flush(ignore string) error {
return rs.Cmd("FLUSHDB").Err
}
// Marshaler exposes the marshaler this storage instance was configured with.
func (rs *RedisStorage) Marshaler() Marshaler {
	return rs.ms
}
// SelectDatabase switches the connection's active Redis logical database
// by issuing a SELECT command; it returns any error the command produced.
func (rs *RedisStorage) SelectDatabase(dbName string) (err error) {
	resp := rs.Cmd("SELECT", dbName)
	err = resp.Err
	return
}

View File

@@ -61,7 +61,7 @@ func (self *SQLStorage) GetKeysForPrefix(prefix string) ([]string, error) {
return nil, utils.ErrNotImplemented
}
func (ms *SQLStorage) RebuildReverseForPrefix(prefix string) error {
func (self *SQLStorage) RebuildReverseForPrefix(prefix string) error {
return utils.ErrNotImplemented
}

View File

@@ -240,7 +240,7 @@ func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) []*MigrationInf
}
func CurrentStorDBVersions() Versions {
return Versions{utils.COST_DETAILS: 2}
return Versions{utils.COST_DETAILS: 2, utils.Accounts: 2}
}
// Versions will keep trac of various item versions