diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 62dda9272..cdc3a862d 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -52,6 +52,7 @@ type ApierV1 struct { Users rpcclient.RpcClientConnection CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine ServManager *servmanager.ServiceManager // Need to have them capitalize so we can export in V2 + HTTPPoster *utils.HTTPPoster } func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error { diff --git a/apier/v1/cdre.go b/apier/v1/cdre.go index 493e529cd..a0ead4ef4 100644 --- a/apier/v1/cdre.go +++ b/apier/v1/cdre.go @@ -32,8 +32,8 @@ import ( "time" "unicode/utf8" - "github.com/cgrates/cgrates/cdre" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -90,11 +90,7 @@ func (self *ApierV1) ExportCdrsToZipString(attr utils.AttrExpFileCdrs, reply *st } // Export Cdrs to file -func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.ExportedFileCdrs) error { - var err error - - //cdreReloadStruct := <-self.Config.ConfigReloads[utils.CDRE] // Read the content of the channel, locking it - //defer func() { self.Config.ConfigReloads[utils.CDRE] <- cdreReloadStruct }() // Unlock reloads at exit +func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.ExportedFileCdrs) (err error) { exportTemplate := self.Config.CdreProfiles[utils.META_DEFAULT] if attr.ExportTemplate != nil && len(*attr.ExportTemplate) != 0 { // Export template prefered, use it var hasIt bool @@ -105,11 +101,11 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E if exportTemplate == nil { return fmt.Errorf("%s:ExportTemplate", utils.ErrMandatoryIeMissing.Error()) } - cdrFormat := exportTemplate.CdrFormat + exportFormat := exportTemplate.ExportFormat if attr.CdrFormat != nil && len(*attr.CdrFormat) != 0 { - cdrFormat = strings.ToLower(*attr.CdrFormat) + exportFormat = strings.ToLower(*attr.CdrFormat) } - if !utils.IsSliceMember(utils.CdreCdrFormats, cdrFormat) { + if !utils.IsSliceMember(utils.CDRExportFormats, exportFormat) { return fmt.Errorf("%s:%s", utils.ErrMandatoryIeMissing.Error(), "CdrFormat") } fieldSep := exportTemplate.FieldSeparator @@ -119,37 +115,34 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E return fmt.Errorf("%s:FieldSeparator:%s", utils.ErrServerError.Error(), "Invalid") } } - exportDir := exportTemplate.ExportDirectory + exportPath := exportTemplate.ExportPath if attr.ExportDir != nil && len(*attr.ExportDir) != 0 { - exportDir = *attr.ExportDir + exportPath = *attr.ExportDir } - exportId := strconv.FormatInt(time.Now().Unix(), 10) + exportID := strconv.FormatInt(time.Now().Unix(), 10) if attr.ExportId != nil && len(*attr.ExportId) != 0 { - exportId = *attr.ExportId + exportID = *attr.ExportId } - fileName := fmt.Sprintf("cdre_%s.%s", exportId, cdrFormat) + fileName := fmt.Sprintf("cdre_%s.%s", exportID, exportFormat) if attr.ExportFileName != nil && len(*attr.ExportFileName) != 0 { fileName = *attr.ExportFileName } - filePath := path.Join(exportDir, fileName) - if cdrFormat == utils.DRYRUN { + filePath := path.Join(exportPath, fileName) + if exportFormat == utils.DRYRUN { filePath = utils.DRYRUN } - dataUsageMultiplyFactor := exportTemplate.DataUsageMultiplyFactor + usageMultiplyFactor := exportTemplate.UsageMultiplyFactor if attr.DataUsageMultiplyFactor != nil && *attr.DataUsageMultiplyFactor != 0.0 { - dataUsageMultiplyFactor = *attr.DataUsageMultiplyFactor + usageMultiplyFactor[utils.DATA] = *attr.DataUsageMultiplyFactor } - smsUsageMultiplyFactor := exportTemplate.SMSUsageMultiplyFactor if attr.SmsUsageMultiplyFactor != nil && *attr.SmsUsageMultiplyFactor != 0.0 { - smsUsageMultiplyFactor = *attr.SmsUsageMultiplyFactor + usageMultiplyFactor[utils.SMS] = *attr.SmsUsageMultiplyFactor } - mmsUsageMultiplyFactor := exportTemplate.MMSUsageMultiplyFactor if attr.MmsUsageMultiplyFactor != nil && *attr.MmsUsageMultiplyFactor != 0.0 { - mmsUsageMultiplyFactor = *attr.MmsUsageMultiplyFactor + usageMultiplyFactor[utils.MMS] = *attr.MmsUsageMultiplyFactor } - genericUsageMultiplyFactor := exportTemplate.GenericUsageMultiplyFactor if attr.GenericUsageMultiplyFactor != nil && *attr.GenericUsageMultiplyFactor != 0.0 { - genericUsageMultiplyFactor = *attr.GenericUsageMultiplyFactor + usageMultiplyFactor[utils.GENERIC] = *attr.GenericUsageMultiplyFactor } costMultiplyFactor := exportTemplate.CostMultiplyFactor if attr.CostMultiplyFactor != nil && *attr.CostMultiplyFactor != 0.0 { @@ -166,18 +159,19 @@ func (self *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.E *reply = utils.ExportedFileCdrs{ExportedFilePath: ""} return nil } - cdrexp, err := cdre.NewCdrExporter(cdrs, self.CdrDb, exportTemplate, cdrFormat, fieldSep, exportId, dataUsageMultiplyFactor, smsUsageMultiplyFactor, - mmsUsageMultiplyFactor, genericUsageMultiplyFactor, costMultiplyFactor, self.Config.RoundingDecimals, self.Config.HttpSkipTlsVerify) + cdrexp, err := engine.NewCDRExporter(cdrs, exportTemplate, exportFormat, filePath, utils.META_NONE, exportID, + exportTemplate.Synchronous, exportTemplate.Attempts, fieldSep, usageMultiplyFactor, + costMultiplyFactor, self.Config.RoundingDecimals, self.Config.HttpSkipTlsVerify, self.HTTPPoster) if err != nil { return utils.NewErrServerError(err) } + if err := cdrexp.ExportCDRs(); err != nil { + return utils.NewErrServerError(err) + } if cdrexp.TotalExportedCdrs() == 0 { *reply = utils.ExportedFileCdrs{ExportedFilePath: ""} return nil } - if err := cdrexp.WriteToFile(filePath); err != nil { - return utils.NewErrServerError(err) - } *reply = utils.ExportedFileCdrs{ExportedFilePath: filePath, TotalRecords: len(cdrs), TotalCost: cdrexp.TotalCost(), FirstOrderId: cdrexp.FirstOrderId(), LastOrderId: cdrexp.LastOrderId()} if !attr.SuppressCgrIds { reply.ExportedCgrIds = cdrexp.PositiveExports() diff --git a/apier/v2/cdre.go b/apier/v2/cdre.go index 1bb81457e..3a177c3d1 100644 --- a/apier/v2/cdre.go +++ b/apier/v2/cdre.go @@ -25,7 +25,7 @@ import ( "time" "unicode/utf8" - "github.com/cgrates/cgrates/cdre" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -41,11 +41,11 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut return fmt.Errorf("%s:ExportTemplate", utils.ErrNotFound) } } - cdrFormat := exportTemplate.CdrFormat + exportFormat := exportTemplate.ExportFormat if attr.CdrFormat != nil && len(*attr.CdrFormat) != 0 { - cdrFormat = strings.ToLower(*attr.CdrFormat) + exportFormat = strings.ToLower(*attr.CdrFormat) } - if !utils.IsSliceMember(utils.CdreCdrFormats, cdrFormat) { + if !utils.IsSliceMember(utils.CDRExportFormats, exportFormat) { return utils.NewErrMandatoryIeMissing("CdrFormat") } fieldSep := exportTemplate.FieldSeparator @@ -55,37 +55,34 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut return fmt.Errorf("%s:FieldSeparator:%s", utils.ErrServerError, "Invalid") } } - eDir := exportTemplate.ExportDirectory + eDir := exportTemplate.ExportPath if attr.ExportDirectory != nil && len(*attr.ExportDirectory) != 0 { eDir = *attr.ExportDirectory } - ExportID := strconv.FormatInt(time.Now().Unix(), 10) + exportID := strconv.FormatInt(time.Now().Unix(), 10) if attr.ExportID != nil && len(*attr.ExportID) != 0 { - ExportID = *attr.ExportID + exportID = *attr.ExportID } - fileName := fmt.Sprintf("cdre_%s.%s", ExportID, cdrFormat) + fileName := fmt.Sprintf("cdre_%s.%s", exportID, exportFormat) if attr.ExportFileName != nil && len(*attr.ExportFileName) != 0 { fileName = *attr.ExportFileName } filePath := path.Join(eDir, fileName) - if cdrFormat == utils.DRYRUN { + if exportFormat == utils.DRYRUN { filePath = utils.DRYRUN } - dataUsageMultiplyFactor := exportTemplate.DataUsageMultiplyFactor + usageMultiplyFactor := exportTemplate.UsageMultiplyFactor if attr.DataUsageMultiplyFactor != nil && *attr.DataUsageMultiplyFactor != 0.0 { - dataUsageMultiplyFactor = *attr.DataUsageMultiplyFactor + usageMultiplyFactor[utils.DATA] = *attr.DataUsageMultiplyFactor } - SMSUsageMultiplyFactor := exportTemplate.SMSUsageMultiplyFactor if attr.SMSUsageMultiplyFactor != nil && *attr.SMSUsageMultiplyFactor != 0.0 { - SMSUsageMultiplyFactor = *attr.SMSUsageMultiplyFactor + usageMultiplyFactor[utils.SMS] = *attr.SMSUsageMultiplyFactor } - MMSUsageMultiplyFactor := exportTemplate.MMSUsageMultiplyFactor if attr.MMSUsageMultiplyFactor != nil && *attr.MMSUsageMultiplyFactor != 0.0 { - MMSUsageMultiplyFactor = *attr.MMSUsageMultiplyFactor + usageMultiplyFactor[utils.MMS] = *attr.MMSUsageMultiplyFactor } - genericUsageMultiplyFactor := exportTemplate.GenericUsageMultiplyFactor if attr.GenericUsageMultiplyFactor != nil && *attr.GenericUsageMultiplyFactor != 0.0 { - genericUsageMultiplyFactor = *attr.GenericUsageMultiplyFactor + usageMultiplyFactor[utils.GENERIC] = *attr.GenericUsageMultiplyFactor } costMultiplyFactor := exportTemplate.CostMultiplyFactor if attr.CostMultiplyFactor != nil && *attr.CostMultiplyFactor != 0.0 { @@ -106,18 +103,19 @@ func (self *ApierV2) ExportCdrsToFile(attr utils.AttrExportCdrsToFile, reply *ut if attr.RoundingDecimals != nil { roundingDecimals = *attr.RoundingDecimals } - cdrexp, err := cdre.NewCdrExporter(cdrs, self.CdrDb, exportTemplate, cdrFormat, fieldSep, ExportID, dataUsageMultiplyFactor, SMSUsageMultiplyFactor, - MMSUsageMultiplyFactor, genericUsageMultiplyFactor, costMultiplyFactor, roundingDecimals, self.Config.HttpSkipTlsVerify) + cdrexp, err := engine.NewCDRExporter(cdrs, exportTemplate, exportFormat, filePath, utils.META_NONE, exportID, + exportTemplate.Synchronous, exportTemplate.Attempts, fieldSep, usageMultiplyFactor, + costMultiplyFactor, roundingDecimals, self.Config.HttpSkipTlsVerify, self.HTTPPoster) if err != nil { return utils.NewErrServerError(err) } + if err := cdrexp.ExportCDRs(); err != nil { + return utils.NewErrServerError(err) + } if cdrexp.TotalExportedCdrs() == 0 { *reply = utils.ExportedFileCdrs{ExportedFilePath: ""} return nil } - if err := cdrexp.WriteToFile(filePath); err != nil { - return utils.NewErrServerError(err) - } *reply = utils.ExportedFileCdrs{ExportedFilePath: filePath, TotalRecords: len(cdrs), TotalCost: cdrexp.TotalCost(), FirstOrderId: cdrexp.FirstOrderId(), LastOrderId: cdrexp.LastOrderId()} if !attr.Verbose { reply.ExportedCgrIds = cdrexp.PositiveExports() diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 52ef90fce..58c215cb5 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -231,7 +231,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC responder := &engine.Responder{Bal: bal, ExitChan: exitChan} responder.SetTimeToLive(cfg.ResponseCacheTTL, nil) apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, - Config: cfg, Responder: responder, ServManager: serviceManager} + Config: cfg, Responder: responder, ServManager: serviceManager, HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)} if cdrStats != nil { // ToDo: Fix here properly the init of stats responder.Stats = cdrStats apierRpcV1.CdrStatsSrv = cdrStats diff --git a/config/cdreconfig.go b/config/cdreconfig.go index d36908ebb..af5aae508 100644 --- a/config/cdreconfig.go +++ b/config/cdreconfig.go @@ -25,6 +25,8 @@ import ( type CdreConfig struct { ExportFormat string ExportPath string + FallbackPath string + CDRFilter utils.RSRFields Synchronous bool Attempts int FieldSeparator rune @@ -46,6 +48,11 @@ func (self *CdreConfig) loadFromJsonCfg(jsnCfg *CdreJsonCfg) error { if jsnCfg.Export_path != nil { self.ExportPath = *jsnCfg.Export_path } + if jsnCfg.Cdr_filter != nil { + if self.CDRFilter, err = utils.ParseRSRFields(*jsnCfg.Cdr_filter, utils.INFIELD_SEP); err != nil { + return err + } + } if jsnCfg.Synchronous != nil { self.Synchronous = *jsnCfg.Synchronous } diff --git a/config/config_json_test.go b/config/config_json_test.go index 301d0de08..3b5a62c96 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -284,6 +284,7 @@ func TestDfCdreJsonCfgs(t *testing.T) { utils.META_DEFAULT: &CdreJsonCfg{ Export_format: utils.StringPointer(utils.MetaFileCSV), Export_path: utils.StringPointer("/var/spool/cgrates/cdre"), + Cdr_filter: utils.StringPointer(""), Synchronous: utils.BoolPointer(false), Attempts: utils.IntPointer(1), Field_separator: utils.StringPointer(","), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 112a50196..9800f1918 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -147,6 +147,7 @@ type CdrFieldJsonCfg struct { type CdreJsonCfg struct { Export_format *string Export_path *string + Cdr_filter *string Synchronous *bool Attempts *int Field_separator *string diff --git a/engine/cdr.go b/engine/cdr.go index 3f93fe7b3..9b0e6f8a6 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -792,6 +792,11 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, httpSkipTlsCheck bool, g } +// Part of event interface +func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) { + return nil, utils.ErrNotImplemented +} + // Used in place where we need to export the CDR based on an export template // ExportRecord is a []string to keep it compatible with encoding/csv Writer func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR, roundingDecs int) (expRecord []string, err error) { @@ -812,16 +817,17 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, httpSkipTlsCh return expRecord, nil } -// Part of event interface -func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) { - return nil, utils.ErrNotImplemented -} - // AsExportMap converts the CDR into a map[string]string based on export template // Used in real-time replication as well as remote exports -func (cdr *CDR) AsExportMap(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR) (expMap map[string]string, err error) { +func (cdr *CDR) AsExportMap(exportFields []*config.CfgCdrField, httpSkipTlsCheck bool, groupedCDRs []*CDR, roundingDecs int) (expMap map[string]string, err error) { expMap = make(map[string]string) for _, cfgFld := range exportFields { + if roundingDecs != 0 { + clnFld := new(config.CfgCdrField) // Clone so we can modify the rounding decimals without affecting the template + *clnFld = *cfgFld + clnFld.RoundingDecimals = roundingDecs + cfgFld = clnFld + } if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs); err != nil { return nil, err } else { diff --git a/engine/cdr_test.go b/engine/cdr_test.go index d78d5eaad..51dc51a2a 100644 --- a/engine/cdr_test.go +++ b/engine/cdr_test.go @@ -579,7 +579,7 @@ func TestCDRAsExportMap(t *testing.T) { &config.CfgCdrField{FieldId: utils.DESTINATION, Type: utils.META_COMPOSED, Value: utils.ParseRSRFieldsMustCompile("~Destination:s/^\\+(\\d+)$/00${1}/", utils.INFIELD_SEP)}, &config.CfgCdrField{FieldId: "FieldExtra1", Type: utils.META_COMPOSED, Value: utils.ParseRSRFieldsMustCompile("field_extr1", utils.INFIELD_SEP)}, } - if cdrMp, err := cdr.AsExportMap(expFlds, false, nil); err != nil { + if cdrMp, err := cdr.AsExportMap(expFlds, false, nil, 0); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCDRMp, cdrMp) { t.Errorf("Expecting: %+v, received: %+v", eCDRMp, cdrMp) diff --git a/engine/cdrexporter.go b/engine/cdre.go similarity index 52% rename from engine/cdrexporter.go rename to engine/cdre.go index f08445834..7e6ee13fa 100644 --- a/engine/cdrexporter.go +++ b/engine/cdre.go @@ -15,19 +15,23 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ -package cdre +package engine import ( "encoding/csv" + "encoding/json" "fmt" "io" + "net/url" "os" + "path" "strconv" + "sync" "time" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/streadway/amqp" ) const ( @@ -45,68 +49,68 @@ const ( META_FORMATCOST = "*format_cost" ) -var err error - -func NewCdrExporter(cdrs []*engine.CDR, cdrDb engine.CdrStorage, exportTpl *config.CdreConfig, cdrFormat string, fieldSeparator rune, exportId string, - dataUsageMultiplyFactor, smsUsageMultiplyFactor, mmsUsageMultiplyFactor, genericUsageMultiplyFactor, costMultiplyFactor float64, - roundingDecimals int, httpSkipTlsCheck bool) (*CdrExporter, error) { +func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreConfig, exportFormat, exportPath, fallbackPath, exportID string, + synchronous bool, attempts int, fieldSeparator rune, usageMultiplyFactor utils.FieldMultiplyFactor, + costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *utils.HTTPPoster) (*CDRExporter, error) { if len(cdrs) == 0 { // Nothing to export return nil, nil } - cdre := &CdrExporter{ - cdrs: cdrs, - cdrDb: cdrDb, - exportTemplate: exportTpl, - cdrFormat: cdrFormat, - fieldSeparator: fieldSeparator, - exportId: exportId, - dataUsageMultiplyFactor: dataUsageMultiplyFactor, - smsUsageMultiplyFactor: smsUsageMultiplyFactor, - mmsUsageMultiplyFactor: mmsUsageMultiplyFactor, - genericUsageMultiplyFactor: genericUsageMultiplyFactor, - costMultiplyFactor: costMultiplyFactor, - roundingDecimals: roundingDecimals, - httpSkipTlsCheck: httpSkipTlsCheck, - negativeExports: make(map[string]string), - } - if err := cdre.processCdrs(); err != nil { - return nil, err + cdre := &CDRExporter{ + cdrs: cdrs, + exportTemplate: exportTemplate, + exportFormat: exportFormat, + exportPath: exportPath, + fallbackPath: fallbackPath, + exportID: exportID, + synchronous: synchronous, + attempts: attempts, + fieldSeparator: fieldSeparator, + usageMultiplyFactor: usageMultiplyFactor, + costMultiplyFactor: costMultiplyFactor, + roundingDecimals: roundingDecimals, + httpSkipTlsCheck: httpSkipTlsCheck, + httpPoster: httpPoster, + negativeExports: make(map[string]string), } return cdre, nil } -type CdrExporter struct { - cdrs []*engine.CDR - cdrDb engine.CdrStorage // Used to extract cost_details if these are requested - exportTemplate *config.CdreConfig - cdrFormat string // csv, fwv - fieldSeparator rune - exportId string // Unique identifier or this export - dataUsageMultiplyFactor, - smsUsageMultiplyFactor, // Multiply the SMS usage (eg: some billing systems billing them as minutes) - mmsUsageMultiplyFactor, - genericUsageMultiplyFactor, - costMultiplyFactor float64 - roundingDecimals int - httpSkipTlsCheck bool - header, trailer []string // Header and Trailer fields - content [][]string // Rows of cdr fields +type CDRExporter struct { + cdrs []*CDR + exportTemplate *config.CdreConfig + exportFormat string + exportPath string + fallbackPath string // folder where we save failed CDRs + exportID string // Unique identifier or this export + synchronous bool + attempts int + fieldSeparator rune + usageMultiplyFactor utils.FieldMultiplyFactor + costMultiplyFactor float64 + roundingDecimals int + httpSkipTlsCheck bool + httpPoster *utils.HTTPPoster + + header, trailer []string // Header and Trailer fields + content [][]string // Rows of cdr fields + firstCdrATime, lastCdrATime time.Time numberOfRecords int totalDuration, totalDataUsage, totalSmsUsage, totalMmsUsage, totalGenericUsage time.Duration - totalCost float64 firstExpOrderId, lastExpOrderId int64 positiveExports []string // CGRIDs of successfully exported CDRs + pEMux sync.RWMutex // protect positiveExports negativeExports map[string]string // CGRIDs of failed exports + nEMux sync.RWMutex // protect negativeExports } // Handle various meta functions used in header/trailer -func (cdre *CdrExporter) metaHandler(tag, arg string) (string, error) { +func (cdre *CDRExporter) metaHandler(tag, arg string) (string, error) { switch tag { case META_EXPORTID: - return cdre.exportId, nil + return cdre.exportID, nil case META_TIMENOW: return time.Now().Format(arg), nil case META_FIRSTCDRATIME: @@ -116,19 +120,19 @@ func (cdre *CdrExporter) metaHandler(tag, arg string) (string, error) { case META_NRCDRS: return strconv.Itoa(cdre.numberOfRecords), nil case META_DURCDRS: - emulatedCdr := &engine.CDR{ToR: utils.VOICE, Usage: cdre.totalDuration} + emulatedCdr := &CDR{ToR: utils.VOICE, Usage: cdre.totalDuration} return emulatedCdr.FormatUsage(arg), nil case META_SMSUSAGE: - emulatedCdr := &engine.CDR{ToR: utils.SMS, Usage: cdre.totalSmsUsage} + emulatedCdr := &CDR{ToR: utils.SMS, Usage: cdre.totalSmsUsage} return emulatedCdr.FormatUsage(arg), nil case META_MMSUSAGE: - emulatedCdr := &engine.CDR{ToR: utils.MMS, Usage: cdre.totalMmsUsage} + emulatedCdr := &CDR{ToR: utils.MMS, Usage: cdre.totalMmsUsage} return emulatedCdr.FormatUsage(arg), nil case META_GENERICUSAGE: - emulatedCdr := &engine.CDR{ToR: utils.GENERIC, Usage: cdre.totalGenericUsage} + emulatedCdr := &CDR{ToR: utils.GENERIC, Usage: cdre.totalGenericUsage} return emulatedCdr.FormatUsage(arg), nil case META_DATAUSAGE: - emulatedCdr := &engine.CDR{ToR: utils.DATA, Usage: cdre.totalDataUsage} + emulatedCdr := &CDR{ToR: utils.DATA, Usage: cdre.totalDataUsage} return emulatedCdr.FormatUsage(arg), nil case META_COSTCDRS: return strconv.FormatFloat(utils.Round(cdre.totalCost, cdre.roundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil @@ -138,7 +142,7 @@ func (cdre *CdrExporter) metaHandler(tag, arg string) (string, error) { } // Compose and cache the header -func (cdre *CdrExporter) composeHeader() error { +func (cdre *CDRExporter) composeHeader() (err error) { for _, cfgFld := range cdre.exportTemplate.HeaderFields { var outVal string switch cfgFld.Type { @@ -167,7 +171,7 @@ func (cdre *CdrExporter) composeHeader() error { } // Compose and cache the trailer -func (cdre *CdrExporter) composeTrailer() error { +func (cdre *CDRExporter) composeTrailer() (err error) { for _, cfgFld := range cdre.exportTemplate.TrailerFields { var outVal string switch cfgFld.Type { @@ -195,35 +199,96 @@ func (cdre *CdrExporter) composeTrailer() error { return nil } +func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { + var body interface{} + switch cdre.exportFormat { + case utils.MetaHTTPjsonCDR, utils.MetaAMQPjsonCDR: + jsn, err := json.Marshal(cdr) + if err != nil { + return err + } + body = jsn + case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap: + expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals) + if err != nil { + return err + } + jsn, err := json.Marshal(expMp) + if err != nil { + return err + } + body = jsn + case utils.META_HTTP_POST: + expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals) + if err != nil { + return err + } + vals := url.Values{} + for fld, val := range expMp { + vals.Set(fld, val) + } + body = vals + default: + err = fmt.Errorf("unsupported exportFormat: <%s>", cdre.exportFormat) + } + if err != nil { + return + } + // compute fallbackPath + fallbackPath := utils.META_NONE + ffn := &utils.FallbackFileName{Module: utils.CDRPoster, Transport: cdre.exportFormat, Address: cdre.exportPath, RequestID: utils.GenUUID()} + fallbackFileName := ffn.AsString() + if cdre.fallbackPath != utils.META_NONE { // not none, need fallback + fallbackPath = path.Join(cdre.fallbackPath, fallbackFileName) + } + switch cdre.exportFormat { + case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: + _, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath) + case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: + var amqpPoster *utils.AMQPPoster + amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath) + if err == nil { // error will be checked bellow + var chn *amqp.Channel + chn, err = amqpPoster.Post( + nil, utils.PosterTransportContentTypes[cdre.exportFormat], body.([]byte), fallbackFileName) + if chn != nil { + chn.Close() + } + } + } + return +} + // Write individual cdr into content buffer, build stats -func (cdre *CdrExporter) processCdr(cdr *engine.CDR) error { - if cdr == nil || len(cdr.CGRID) == 0 { // We do not export empty CDRs - return nil - } else if cdr.ExtraFields == nil { // Avoid assignment in nil map if not initialized +func (cdre *CDRExporter) processCdr(cdr *CDR) (err error) { + if cdr.ExtraFields == nil { // Avoid assignment in nil map if not initialized cdr.ExtraFields = make(map[string]string) } - // Cost multiply - if cdre.dataUsageMultiplyFactor != 0.0 && cdr.ToR == utils.DATA { - cdr.UsageMultiply(cdre.dataUsageMultiplyFactor, cdre.roundingDecimals) - } else if cdre.smsUsageMultiplyFactor != 0 && cdr.ToR == utils.SMS { - cdr.UsageMultiply(cdre.smsUsageMultiplyFactor, cdre.roundingDecimals) - } else if cdre.mmsUsageMultiplyFactor != 0 && cdr.ToR == utils.MMS { - cdr.UsageMultiply(cdre.mmsUsageMultiplyFactor, cdre.roundingDecimals) - } else if cdre.genericUsageMultiplyFactor != 0 && cdr.ToR == utils.GENERIC { - cdr.UsageMultiply(cdre.genericUsageMultiplyFactor, cdre.roundingDecimals) + // Usage multiply, find config based on ToR field or *any + for _, key := range []string{cdr.ToR, utils.ANY} { + if uM, hasIt := cdre.usageMultiplyFactor[key]; hasIt && uM != 1.0 { + cdr.UsageMultiply(uM, cdre.roundingDecimals) + break + } } if cdre.costMultiplyFactor != 0.0 { cdr.CostMultiply(cdre.costMultiplyFactor, cdre.roundingDecimals) } - cdrRow, err := cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, cdre.cdrs, cdre.roundingDecimals) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" Cannot export CDR with CGRID: %s and runid: %s, error: %s", cdr.CGRID, cdr.RunID, err.Error())) - return err + switch cdre.exportFormat { + case utils.MetaFileFWV, utils.MetaFileCSV: + var cdrRow []string + cdrRow, err = cdr.AsExportRecord(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, cdre.cdrs, cdre.roundingDecimals) + if len(cdrRow) == 0 { // No CDR data, most likely no configuration fields defined + return + } else { + cdre.content = append(cdre.content, cdrRow) + } + default: // attempt posting CDR + err = cdre.postCdr(cdr) } - if len(cdrRow) == 0 { // No CDR data, most likely no configuration fields defined - return nil - } else { - cdre.content = append(cdre.content, cdrRow) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Cannot export CDR with CGRID: %s and runid: %s, error: %s", cdr.CGRID, cdr.RunID, err.Error())) + return } // Done with writing content, compute stats here if cdre.firstCdrATime.IsZero() || cdr.AnswerTime.Before(cdre.firstCdrATime) { @@ -262,14 +327,41 @@ func (cdre *CdrExporter) processCdr(cdr *engine.CDR) error { } // Builds header, content and trailers -func (cdre *CdrExporter) processCdrs() error { +func (cdre *CDRExporter) processCdrs() error { + var wg sync.WaitGroup for _, cdr := range cdre.cdrs { - if err := cdre.processCdr(cdr); err != nil { - cdre.negativeExports[cdr.CGRID] = err.Error() - } else { - cdre.positiveExports = append(cdre.positiveExports, cdr.CGRID) + if cdr == nil || len(cdr.CGRID) == 0 { // CDR needs to exist and it's CGRID needs to be populated + continue } + passesFilters := true + for _, cdfFltr := range cdre.exportTemplate.CDRFilter { + if !cdfFltr.FilterPasses(cdr.FieldAsString(cdfFltr)) { + passesFilters = false + break + } + } + if !passesFilters { // Not passes filters, ignore this CDR + continue + } + if cdre.synchronous { + wg.Add(1) + } + go func(cdr *CDR) { + if err := cdre.processCdr(cdr); err != nil { + cdre.nEMux.Lock() + cdre.negativeExports[cdr.CGRID] = err.Error() + cdre.nEMux.Unlock() + } else { + cdre.pEMux.Lock() + cdre.positiveExports = append(cdre.positiveExports, cdr.CGRID) + cdre.pEMux.Unlock() + } + if cdre.synchronous { + wg.Done() + } + }(cdr) } + wg.Wait() // Process header and trailer after processing cdrs since the metatag functions can access stats out of built cdrs if cdre.exportTemplate.HeaderFields != nil { if err := cdre.composeHeader(); err != nil { @@ -285,7 +377,7 @@ func (cdre *CdrExporter) processCdrs() error { } // Simple write method -func (cdre *CdrExporter) writeOut(ioWriter io.Writer) error { +func (cdre *CDRExporter) writeOut(ioWriter io.Writer) error { if len(cdre.header) != 0 { for _, fld := range append(cdre.header, "\n") { if _, err := io.WriteString(ioWriter, fld); err != nil { @@ -311,7 +403,7 @@ func (cdre *CdrExporter) writeOut(ioWriter io.Writer) error { } // csvWriter specific method -func (cdre *CdrExporter) writeCsv(csvWriter *csv.Writer) error { +func (cdre *CDRExporter) writeCsv(csvWriter *csv.Writer) error { csvWriter.Comma = cdre.fieldSeparator if len(cdre.header) != 0 { if err := csvWriter.Write(cdre.header); err != nil { @@ -332,54 +424,57 @@ func (cdre *CdrExporter) writeCsv(csvWriter *csv.Writer) error { return nil } -// General method to write the content out to a file -func (cdre *CdrExporter) WriteToFile(filePath string) error { - fileOut, err := os.Create(filePath) - if err != nil { - return err +func (cdre *CDRExporter) ExportCDRs() (err error) { + if err = cdre.processCdrs(); err != nil { + return } - defer fileOut.Close() - switch cdre.cdrFormat { - case utils.DRYRUN: - return nil - case utils.CDRE_FIXED_WIDTH: - if err := cdre.writeOut(fileOut); err != nil { - return utils.NewErrServerError(err) + switch cdre.exportFormat { + case utils.MetaFileFWV, utils.MetaFileCSV: + if len(cdre.content) == 0 { + return } - case utils.CSV: - csvWriter := csv.NewWriter(fileOut) - if err := cdre.writeCsv(csvWriter); err != nil { - return utils.NewErrServerError(err) + fileOut, err := os.Create(cdre.exportPath) + if err != nil { + return err } + defer fileOut.Close() + if cdre.exportFormat == utils.MetaFileCSV { + return cdre.writeCsv(csv.NewWriter(fileOut)) + } + return cdre.writeOut(fileOut) } - return nil + return } // Return the first exported Cdr OrderId -func (cdre *CdrExporter) FirstOrderId() int64 { +func (cdre *CDRExporter) FirstOrderId() int64 { return cdre.firstExpOrderId } // Return the last exported Cdr OrderId -func (cdre *CdrExporter) LastOrderId() int64 { +func (cdre *CDRExporter) LastOrderId() int64 { return cdre.lastExpOrderId } // Return total cost in the exported cdrs -func (cdre *CdrExporter) TotalCost() float64 { +func (cdre *CDRExporter) TotalCost() float64 { return cdre.totalCost } -func (cdre *CdrExporter) TotalExportedCdrs() int { +func (cdre *CDRExporter) TotalExportedCdrs() int { return cdre.numberOfRecords } // Return successfully exported CGRIDs -func (cdre *CdrExporter) PositiveExports() []string { +func (cdre *CDRExporter) PositiveExports() []string { + cdre.pEMux.RLock() + defer cdre.pEMux.RUnlock() return cdre.positiveExports } // Return failed exported CGRIDs together with the reason -func (cdre *CdrExporter) NegativeExports() map[string]string { +func (cdre *CDRExporter) NegativeExports() map[string]string { + cdre.nEMux.RLock() + defer cdre.nEMux.RUnlock() return cdre.negativeExports } diff --git a/engine/cdrecsv_test.go b/engine/cdrecsv_test.go index 4ebe8352b..8b9720105 100644 --- a/engine/cdrecsv_test.go +++ b/engine/cdrecsv_test.go @@ -15,7 +15,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ -package cdre +package engine import ( "bytes" @@ -25,24 +25,27 @@ import ( "time" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) func TestCsvCdrWriter(t *testing.T) { writer := &bytes.Buffer{} cfg, _ := config.NewDefaultCGRConfig() - storedCdr1 := &engine.CDR{ + storedCdr1 := &CDR{ CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), ToR: utils.VOICE, OriginID: "dsafdsaf", OriginHost: "192.168.1.1", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002", SetupTime: time.Unix(1383813745, 0).UTC(), AnswerTime: time.Unix(1383813746, 0).UTC(), Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01, } - cdre, err := NewCdrExporter([]*engine.CDR{storedCdr1}, nil, cfg.CdreProfiles["*default"], utils.CSV, ',', "firstexport", 0.0, 0.0, 0.0, 0.0, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify) + cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles["*default"], utils.MetaFileCSV, "", "", "firstexport", + true, 1, ',', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil) if err != nil { t.Error("Unexpected error received: ", err) } + if err = cdre.processCdrs(); err != nil { + t.Error(err) + } csvWriter := csv.NewWriter(writer) if err := cdre.writeCsv(csvWriter); err != nil { t.Error("Unexpected error: ", err) @@ -60,17 +63,20 @@ func TestCsvCdrWriter(t *testing.T) { func TestAlternativeFieldSeparator(t *testing.T) { writer := &bytes.Buffer{} cfg, _ := config.NewDefaultCGRConfig() - storedCdr1 := &engine.CDR{CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), ToR: utils.VOICE, OriginID: "dsafdsaf", OriginHost: "192.168.1.1", + storedCdr1 := &CDR{CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), ToR: utils.VOICE, OriginID: "dsafdsaf", OriginHost: "192.168.1.1", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002", SetupTime: time.Unix(1383813745, 0).UTC(), AnswerTime: time.Unix(1383813746, 0).UTC(), Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, ExtraFields: map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, Cost: 1.01, } - cdre, err := NewCdrExporter([]*engine.CDR{storedCdr1}, nil, cfg.CdreProfiles["*default"], utils.CSV, '|', - "firstexport", 0.0, 0.0, 0.0, 0.0, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify) + cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles["*default"], utils.MetaFileCSV, "", "", "firstexport", + true, 1, '|', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil) if err != nil { t.Error("Unexpected error received: ", err) } + if err = cdre.processCdrs(); err != nil { + t.Error(err) + } csvWriter := csv.NewWriter(writer) if err := cdre.writeCsv(csvWriter); err != nil { t.Error("Unexpected error: ", err) diff --git a/engine/cdrefwv_test.go b/engine/cdrefwv_test.go index 8389afc1d..35aa95f5d 100644 --- a/engine/cdrefwv_test.go +++ b/engine/cdrefwv_test.go @@ -15,7 +15,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ -package cdre +package engine import ( "bytes" @@ -24,7 +24,6 @@ import ( "time" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -111,12 +110,12 @@ func TestWriteCdr(t *testing.T) { t.Error(err) } cdreCfg := &config.CdreConfig{ - CdrFormat: utils.CDRE_FIXED_WIDTH, + ExportFormat: utils.MetaFileFWV, HeaderFields: hdrCfgFlds, ContentFields: contentCfgFlds, TrailerFields: trailerCfgFlds, } - cdr := &engine.CDR{CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()), + cdr := &CDR{CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()), ToR: utils.VOICE, OrderID: 1, OriginID: "dsafdsaf", OriginHost: "192.168.1.1", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002", @@ -125,11 +124,15 @@ func TestWriteCdr(t *testing.T) { Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.34567, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, } - cdre, err := NewCdrExporter([]*engine.CDR{cdr}, nil, cdreCfg, utils.CDRE_FIXED_WIDTH, ',', "fwv_1", 0.0, 0.0, 0.0, 0.0, 0.0, - cfg.RoundingDecimals, cfg.HttpSkipTlsVerify) + + cdre, err := NewCDRExporter([]*CDR{cdr}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1", + true, 1, '|', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil) if err != nil { t.Error(err) } + if err = cdre.processCdrs(); err != nil { + t.Error(err) + } eHeader := "10 VOIfwv_107111308420018011511340001 \n" eContentOut := "201001 1001 1002 0211 07111308420010 1 3dsafdsaf 0002.34570\n" eTrailer := "90 VOIfwv_100000100000010071113084200071113084200 \n" @@ -169,12 +172,12 @@ func TestWriteCdr(t *testing.T) { func TestWriteCdrs(t *testing.T) { wrBuf := &bytes.Buffer{} cdreCfg := &config.CdreConfig{ - CdrFormat: utils.CDRE_FIXED_WIDTH, + ExportFormat: utils.MetaFileFWV, HeaderFields: hdrCfgFlds, ContentFields: contentCfgFlds, TrailerFields: trailerCfgFlds, } - cdr1 := &engine.CDR{CGRID: utils.Sha1("aaa1", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()), + cdr1 := &CDR{CGRID: utils.Sha1("aaa1", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()), ToR: utils.VOICE, OrderID: 2, OriginID: "aaa1", OriginHost: "192.168.1.1", RequestType: utils.META_RATED, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1010", SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), @@ -182,7 +185,7 @@ func TestWriteCdrs(t *testing.T) { Usage: time.Duration(10) * time.Second, RunID: utils.DEFAULT_RUNID, Cost: 2.25, ExtraFields: map[string]string{"productnumber": "12341", "fieldextr2": "valextr2"}, } - cdr2 := &engine.CDR{CGRID: utils.Sha1("aaa2", time.Date(2013, 11, 7, 7, 42, 20, 0, time.UTC).String()), + cdr2 := &CDR{CGRID: utils.Sha1("aaa2", time.Date(2013, 11, 7, 7, 42, 20, 0, time.UTC).String()), ToR: utils.VOICE, OrderID: 4, OriginID: "aaa2", OriginHost: "192.168.1.2", RequestType: utils.META_PREPAID, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1002", Subject: "1002", Destination: "1011", SetupTime: time.Date(2013, 11, 7, 7, 42, 20, 0, time.UTC), @@ -190,8 +193,8 @@ func TestWriteCdrs(t *testing.T) { Usage: time.Duration(5) * time.Minute, RunID: utils.DEFAULT_RUNID, Cost: 1.40001, ExtraFields: map[string]string{"productnumber": "12342", "fieldextr2": "valextr2"}, } - cdr3 := &engine.CDR{} - cdr4 := &engine.CDR{CGRID: utils.Sha1("aaa3", time.Date(2013, 11, 7, 9, 42, 18, 0, time.UTC).String()), + cdr3 := &CDR{} + cdr4 := &CDR{CGRID: utils.Sha1("aaa3", time.Date(2013, 11, 7, 9, 42, 18, 0, time.UTC).String()), ToR: utils.VOICE, OrderID: 3, OriginID: "aaa4", OriginHost: "192.168.1.4", RequestType: utils.META_POSTPAID, Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1004", Subject: "1004", Destination: "1013", SetupTime: time.Date(2013, 11, 7, 9, 42, 18, 0, time.UTC), @@ -200,11 +203,14 @@ func TestWriteCdrs(t *testing.T) { ExtraFields: map[string]string{"productnumber": "12344", "fieldextr2": "valextr2"}, } cfg, _ := config.NewDefaultCGRConfig() - cdre, err := NewCdrExporter([]*engine.CDR{cdr1, cdr2, cdr3, cdr4}, nil, cdreCfg, utils.CDRE_FIXED_WIDTH, ',', - "fwv_1", 0.0, 0.0, 0.0, 0.0, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify) + cdre, err := NewCDRExporter([]*CDR{cdr1, cdr2, cdr3, cdr4}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1", + true, 1, ',', map[string]float64{}, 0.0, cfg.RoundingDecimals, cfg.HttpSkipTlsVerify, nil) if err != nil { t.Error(err) } + if err = cdre.processCdrs(); err != nil { + t.Error(err) + } if err := cdre.writeOut(wrBuf); err != nil { t.Error(err) } diff --git a/engine/cdrs.go b/engine/cdrs.go index 8ec54b2e0..4f828af2f 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -18,12 +18,9 @@ along with this program. If not, see package engine import ( - "encoding/json" "fmt" "io/ioutil" "net/http" - "net/url" - "path" "reflect" "strings" "time" @@ -33,7 +30,6 @@ import ( "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" - "github.com/streadway/amqp" ) var cdrServer *CdrServer // Share the server so we can use it in http handlers @@ -195,12 +191,12 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { var out int go self.stats.Call("CDRStatsV1.AppendCDR", cdr, &out) } - if len(self.cgrCfg.CDRSCdrReplication) != 0 { // Replicate raw CDR - go self.replicateCdr(cdr) + if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR + self.replicateCdr(cdr) } if self.rals != nil && !cdr.Rated { // CDRs not rated will be processed by Rating - go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSCdrReplication) != 0) + self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, self.stats != nil, len(self.cgrCfg.CDRSOnlineCDRExports) != 0) } return nil } @@ -452,89 +448,91 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) { return cc, nil } -// ToDo: Add websocket support func (self *CdrServer) replicateCdr(cdr *CDR) error { - for _, rplCfg := range self.cgrCfg.CDRSCdrReplication { - passesFilters := true - for _, cdfFltr := range rplCfg.CdrFilter { - if !cdfFltr.FilterPasses(cdr.FieldAsString(cdfFltr)) { - passesFilters = false - break - } - } - if !passesFilters { // Not passes filters, ignore this replication - continue - } - var body interface{} - var content = "" - switch rplCfg.Transport { - case utils.MetaHTTPjsonCDR, utils.MetaAMQPjsonCDR: - jsn, err := json.Marshal(cdr) - if err != nil { - return err - } - body = jsn - case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap: - expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil) - if err != nil { - return err - } - jsn, err := json.Marshal(expMp) - if err != nil { - return err - } - body = jsn - case utils.META_HTTP_POST: - expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil) - if err != nil { - return err - } - vals := url.Values{} - for fld, val := range expMp { - vals.Set(fld, val) - } - body = vals - } - var errChan chan error - if rplCfg.Synchronous { - errChan = make(chan error) - } - go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) { - var err error - fallbackPath := utils.META_NONE - if rplCfg.FallbackFileName() != utils.META_NONE { - fallbackPath = path.Join(self.cgrCfg.FailedPostsDir, rplCfg.FallbackFileName()) - } - switch rplCfg.Transport { - case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: - _, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath) - case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: - var amqpPoster *utils.AMQPPoster - amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir) - if err == nil { // error will be checked bellow - var chn *amqp.Channel - chn, err = amqpPoster.Post( - nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName()) - if chn != nil { - chn.Close() - } - } - default: - err = fmt.Errorf("unsupported replication transport: %s", rplCfg.Transport) - } - if err != nil { - utils.Logger.Err(fmt.Sprintf( - " Replicating CDR: %+v, transport: %s, got error: %s", cdr, rplCfg.Transport, err.Error())) - } - if rplCfg.Synchronous { - errChan <- err - } - }(body, rplCfg, content, errChan) - if rplCfg.Synchronous { // Synchronize here - <-errChan - } - } return nil + /* + for _, rplCfg := range self.cgrCfg.CDRSOnlineCDRExports { + passesFilters := true + for _, cdfFltr := range rplCfg.CdrFilter { + if !cdfFltr.FilterPasses(cdr.FieldAsString(cdfFltr)) { + passesFilters = false + break + } + } + if !passesFilters { // Not passes filters, ignore this replication + continue + } + var body interface{} + var content = "" + switch rplCfg.Transport { + case utils.MetaHTTPjsonCDR, utils.MetaAMQPjsonCDR: + jsn, err := json.Marshal(cdr) + if err != nil { + return err + } + body = jsn + case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap: + expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil) + if err != nil { + return err + } + jsn, err := json.Marshal(expMp) + if err != nil { + return err + } + body = jsn + case utils.META_HTTP_POST: + expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil) + if err != nil { + return err + } + vals := url.Values{} + for fld, val := range expMp { + vals.Set(fld, val) + } + body = vals + } + var errChan chan error + if rplCfg.Synchronous { + errChan = make(chan error) + } + go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) { + var err error + fallbackPath := utils.META_NONE + if rplCfg.FallbackFileName() != utils.META_NONE { + fallbackPath = path.Join(self.cgrCfg.FailedPostsDir, rplCfg.FallbackFileName()) + } + switch rplCfg.Transport { + case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: + _, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath) + case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: + var amqpPoster *utils.AMQPPoster + amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir) + if err == nil { // error will be checked bellow + var chn *amqp.Channel + chn, err = amqpPoster.Post( + nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName()) + if chn != nil { + chn.Close() + } + } + default: + err = fmt.Errorf("unsupported replication transport: %s", rplCfg.Transport) + } + if err != nil { + utils.Logger.Err(fmt.Sprintf( + " Replicating CDR: %+v, transport: %s, got error: %s", cdr, rplCfg.Transport, err.Error())) + } + if rplCfg.Synchronous { + errChan <- err + } + }(body, rplCfg, content, errChan) + if rplCfg.Synchronous { // Synchronize here + <-errChan + } + } + return nil + */ } // Called by rate/re-rate API, FixMe: deprecate it once new APIer structure is operational @@ -544,7 +542,7 @@ func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) err return err } for _, cdr := range cdrs { - if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSCdrReplication) != 0); err != nil { + if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CDRSStoreCdrs, sendToStats, len(self.cgrCfg.CDRSOnlineCDRExports) != 0); err != nil { utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error())) } } @@ -614,7 +612,7 @@ func (self *CdrServer) V1RateCDRs(attrs utils.AttrRateCDRs, reply *string) error if attrs.SendToStatS != nil { sendToStats = *attrs.SendToStatS } - replicate := len(self.cgrCfg.CDRSCdrReplication) != 0 + replicate := len(self.cgrCfg.CDRSOnlineCDRExports) != 0 if attrs.ReplicateCDRs != nil { replicate = *attrs.ReplicateCDRs } diff --git a/utils/consts.go b/utils/consts.go index c0012be2b..90a98fabb 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -18,7 +18,7 @@ along with this program. If not, see package utils var ( - CdreCdrFormats = []string{CSV, DRYRUN, CDRE_FIXED_WIDTH} + CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} PrimaryCdrFields = []string{CGRID, CDRSOURCE, CDRHOST, ACCID, TOR, REQTYPE, DIRECTION, TENANT, CATEGORY, ACCOUNT, SUBJECT, DESTINATION, SETUP_TIME, PDD, ANSWER_TIME, USAGE, SUPPLIER, DISCONNECT_CAUSE, COST, RATED, PartialField, MEDI_RUNID} GitLastLog string // If set, it will be processed as part of versioning @@ -30,6 +30,15 @@ var ( MetaAMQPjsonCDR: CONTENT_JSON, MetaAMQPjsonMap: CONTENT_JSON, } + CDREFileSuffixes = map[string]string{ + MetaHTTPjsonCDR: JSNSuffix, + MetaHTTPjsonMap: JSNSuffix, + MetaAMQPjsonCDR: JSNSuffix, + MetaAMQPjsonMap: JSNSuffix, + META_HTTP_POST: FormSuffix, + MetaFileCSV: CSVSuffix, + MetaFileFWV: FWVSuffix, + } ) const ( @@ -348,10 +357,14 @@ const ( TxtSuffix = ".txt" JSNSuffix = ".json" FormSuffix = ".form" + CSVSuffix = ".csv" + FWVSuffix = ".fwv" CONTENT_JSON = "json" CONTENT_FORM = "form" CONTENT_TEXT = "text" FileLockPrefix = "file_" ActionsPoster = "act" CDRPoster = "cdr" + MetaFileCSV = "*file_csv" + MetaFileFWV = "*file_fwv" ) diff --git a/utils/poster.go b/utils/poster.go index 36b09ceeb..8b0903829 100644 --- a/utils/poster.go +++ b/utils/poster.go @@ -97,6 +97,9 @@ type FallbackFileName struct { } func (ffn *FallbackFileName) AsString() string { + if ffn.FileSuffix == "" { // Autopopulate FileSuffix based on the transport used + ffn.FileSuffix = CDREFileSuffixes[ffn.Transport] + } return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix) }