Finish implementation for ees *file_csv format

This commit is contained in:
TeoV
2020-06-01 16:55:36 +03:00
committed by Dan Christian Bogos
parent 50511c0eab
commit 31387e2fe2
14 changed files with 612 additions and 291 deletions

View File

@@ -23,7 +23,6 @@ import (
"fmt"
"os"
"path"
"strconv"
"sync"
"time"
@@ -33,9 +32,11 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (fCsv *FileCSVee, err error) {
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (fCsv *FileCSVee, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS}
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = fCsv.init()
return
}
@@ -49,15 +50,7 @@ type FileCSVee struct {
file *os.File
csvWriter *csv.Writer
sync.RWMutex
firstEventATime, lastEventATime time.Time
numberOfEvents int
totalDuration, totalDataUsage, totalSmsUsage,
totalMmsUsage, totalGenericUsage time.Duration
totalCost float64
firstExpOrderID, lastExpOrderID int64
positiveExports utils.StringSet
negativeExports utils.StringSet
dc utils.MapStorage
}
// init will create all the necessary dependencies, including opening the file
@@ -72,8 +65,6 @@ func (fCsv *FileCSVee) init() (err error) {
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].FieldSep) > 0 {
fCsv.csvWriter.Comma = rune(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].FieldSep[0])
}
fCsv.positiveExports = utils.StringSet{}
fCsv.negativeExports = utils.StringSet{}
return fCsv.composeHeader()
}
@@ -101,60 +92,64 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fCsv.Lock()
defer fCsv.Unlock()
fCsv.numberOfEvents++
fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int) + 1
var csvRecord []string
navMp := utils.MapStorage{utils.MetaReq: cgrEv.Event}
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields() {
if pass, err := fCsv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters,
navMp); err != nil || !pass {
continue
req := utils.MapStorage{}
for k, v := range cgrEv.Event {
req[k] = v
}
eeReq := NewEventExporterRequest(req, fCsv.dc, cgrEv.Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone,
fCsv.filterS)
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()); err != nil {
fCsv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
return
}
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
var strVal string
if strVal, err = eeReq.cnt.FieldAsString(el.Value.Slice()); err != nil {
return
}
val, err := cfgFld.Value.ParseDataProvider(navMp, utils.NestingSep)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, cfgFld.Value.GetRule())
}
fCsv.negativeExports.Add(cgrEv.ID)
return err
}
csvRecord = append(csvRecord, val)
csvRecord = append(csvRecord, strVal)
}
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fCsv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
if fCsv.firstEventATime.IsZero() || fCsv.firstEventATime.Before(aTime) {
fCsv.firstEventATime = aTime
if fCsv.dc[utils.FirstEventATime].(time.Time).IsZero() || fCsv.dc[utils.FirstEventATime].(time.Time).Before(aTime) {
fCsv.dc[utils.FirstEventATime] = aTime
}
if aTime.After(fCsv.lastEventATime) {
fCsv.lastEventATime = aTime
if aTime.After(fCsv.dc[utils.LastEventATime].(time.Time)) {
fCsv.dc[utils.LastEventATime] = aTime
}
}
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
if fCsv.firstExpOrderID > oID || fCsv.firstExpOrderID == 0 {
fCsv.firstExpOrderID = oID
if fCsv.dc[utils.FirstExpOrderID].(int64) > oID || fCsv.dc[utils.FirstExpOrderID].(int64) == 0 {
fCsv.dc[utils.FirstExpOrderID] = oID
}
if fCsv.lastExpOrderID < oID {
fCsv.lastExpOrderID = oID
if fCsv.dc[utils.LastExpOrderID].(int64) < oID {
fCsv.dc[utils.LastExpOrderID] = oID
}
}
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
fCsv.totalCost += cost
fCsv.dc[utils.TotalCost] = fCsv.dc[utils.TotalCost].(float64) + cost
}
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
switch tor {
case utils.VOICE:
fCsv.totalDuration += usage
fCsv.dc[utils.TotalDuration] = fCsv.dc[utils.TotalDuration].(time.Duration) + usage
case utils.SMS:
fCsv.totalSmsUsage += usage
fCsv.dc[utils.TotalSMSUsage] = fCsv.dc[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MMS:
fCsv.totalMmsUsage += usage
fCsv.dc[utils.TotalMMSUsage] = fCsv.dc[utils.TotalMMSUsage].(time.Duration) + usage
case utils.GENERIC:
fCsv.totalGenericUsage += usage
fCsv.dc[utils.TotalGenericUsage] = fCsv.dc[utils.TotalGenericUsage].(time.Duration) + usage
case utils.DATA:
fCsv.totalDataUsage += usage
fCsv.dc[utils.TotalDataUsage] = fCsv.dc[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
fCsv.positiveExports.Add(cgrEv.ID)
fCsv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
fCsv.csvWriter.Write(csvRecord)
return
}
@@ -165,30 +160,17 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
return
}
var csvRecord []string
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields() {
var outVal string
switch cfgFld.Type {
case utils.META_CONSTANT:
outVal, err = cfgFld.Value.ParseValue(utils.EmptyString)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, cfgFld.Value.GetRule())
}
return err
}
case utils.MetaExportID:
outVal = fCsv.id
case utils.MetaTimeNow:
outVal = time.Now().String()
default:
return fmt.Errorf("unsupported type in header for field: <%+v>", utils.ToJSON(cfgFld))
eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.GeneralCfg().DefaultTenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone,
fCsv.filterS)
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()); err != nil {
return
}
for el := eeReq.hdr.GetFirstElement(); el != nil; el = el.Next() {
var strVal string
if strVal, err = eeReq.hdr.FieldAsString(el.Value.Slice()); err != nil {
return
}
fmtOut := outVal
if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width,
cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
return err
}
csvRecord = append(csvRecord, fmtOut)
csvRecord = append(csvRecord, strVal)
}
return fCsv.csvWriter.Write(csvRecord)
}
@@ -199,57 +181,17 @@ func (fCsv *FileCSVee) composeTrailer() (err error) {
return
}
var csvRecord []string
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields() {
var val string
switch cfgFld.Type {
case utils.META_CONSTANT:
val, err = cfgFld.Value.ParseValue(utils.EmptyString)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, cfgFld.Value.GetRule())
}
return err
}
case utils.MetaExportID:
val = fCsv.id
case utils.MetaTimeNow:
val = time.Now().String()
case utils.MetaFirstEventATime:
val = fCsv.firstEventATime.Format(cfgFld.Layout)
case utils.MetaLastEventATime:
val = fCsv.lastEventATime.Format(cfgFld.Layout)
case utils.MetaEventNumber:
val = strconv.Itoa(fCsv.numberOfEvents)
case utils.MetaEventCost:
rounding := fCsv.cgrCfg.GeneralCfg().RoundingDecimals
if cfgFld.RoundingDecimals != nil {
rounding = *cfgFld.RoundingDecimals
}
val = strconv.FormatFloat(utils.Round(fCsv.totalCost,
rounding, utils.ROUNDING_MIDDLE), 'f', -1, 64)
case utils.MetaVoiceUsage:
val = fCsv.totalDuration.String()
case utils.MetaDataUsage:
val = strconv.Itoa(int(fCsv.totalDataUsage.Nanoseconds()))
case utils.MetaSMSUsage:
val = strconv.Itoa(int(fCsv.totalSmsUsage.Nanoseconds()))
case utils.MetaMMSUsage:
val = strconv.Itoa(int(fCsv.totalMmsUsage.Nanoseconds()))
case utils.MetaGenericUsage:
val = strconv.Itoa(int(fCsv.totalGenericUsage.Nanoseconds()))
case utils.MetaNegativeExports:
val = strconv.Itoa(len(fCsv.negativeExports.AsSlice()))
case utils.MetaPositiveExports:
val = strconv.Itoa(len(fCsv.positiveExports.AsSlice()))
default:
return fmt.Errorf("unsupported type in trailer for field: <%+v>", utils.ToJSON(cfgFld))
eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.GeneralCfg().DefaultTenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone,
fCsv.filterS)
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()); err != nil {
return
}
for el := eeReq.trl.GetFirstElement(); el != nil; el = el.Next() {
var strVal string
if strVal, err = eeReq.trl.FieldAsString(el.Value.Slice()); err != nil {
return
}
fmtOut := val
if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, val, cfgFld.Width,
cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
return err
}
csvRecord = append(csvRecord, fmtOut)
csvRecord = append(csvRecord, strVal)
}
return fCsv.csvWriter.Write(csvRecord)
}