Start updating csv exporter

This commit is contained in:
TeoV
2020-05-12 17:15:20 +03:00
committed by Dan Christian Bogos
parent 465848a5dd
commit 1b060548cf
3 changed files with 171 additions and 9 deletions

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -32,10 +33,10 @@ type EventExporter interface {
}
// NewEventExporter produces exporters
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int) (ee EventExporter, err error) {
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) {
switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type {
case utils.MetaFileCSV:
return NewFileCSVee(cgrCfg, cfgIdx)
return NewFileCSVee(cgrCfg, cfgIdx, filterS)
default:
return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type)
}

View File

@@ -171,7 +171,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
}
}
if !isCached {
if ee, err = NewEventExporter(eeS.cfg, cfgIdx); err != nil {
if ee, err = NewEventExporter(eeS.cfg, cfgIdx, eeS.filterS); err != nil {
return
}
if hasCache {
@@ -194,7 +194,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
if sync {
wg.Done()
}
}(!isCached, eeCfg.Synchronous)
}(!hasCache, eeCfg.Synchronous)
}
wg.Wait()
if withErr {

View File

@@ -19,25 +19,57 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"encoding/csv"
"os"
"strings"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{cgrCfg: cgrCfg, cfgIdx: cfgIdx}
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS}
err = fCsv.init()
return
}
// FileCSVee implements EventExporter interface for .csv files
type FileCSVee struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
file *os.File
csvWriter *csv.Writer
csvRecords [][]string
//hdrFields
//trlFields
//cntFields
firstEventATime, lastEventATime time.Time
numberOfRecords int
totalDuration, totalDataUsage, totalSmsUsage,
totalMmsUsage, totalGenericUsage time.Duration
totalCost float64
firstExpOrderID, lastExpOrderID int64
positiveExports utils.StringSet
negativeExports utils.StringSet
}
// init will create all the necessary dependencies, including opening the file
func (fCsv *FileCSVee) init() (err error) {
if fCsv.file, err = os.Create(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath); err != nil {
return
}
fCsv.csvWriter = csv.NewWriter(fCsv.file)
fCsv.csvWriter.Comma = utils.CSV_SEP
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].FieldSep) > 0 {
fCsv.csvWriter.Comma = rune(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].FieldSep[0])
}
fCsv.positiveExports = utils.StringSet{}
fCsv.negativeExports = utils.StringSet{}
return
}
@@ -48,10 +80,139 @@ func (fCsv *FileCSVee) ID() string {
// OnEvicted implements EventExporter, doing the cleanup before exit
func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
// verify if we need to add the trailer
fCsv.csvWriter.Flush()
fCsv.file.Close()
return
}
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
// convert cgrEvent in export record
fCsv.numberOfRecords++
var csvRecord []string
navMp := config.NewNavigableMap(map[string]interface{}{
utils.MetaReq: cgrEv.Event,
})
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
if !strings.HasPrefix(cfgFld.Path, utils.MetaReq) {
continue
}
if pass, err := fCsv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters,
navMp); err != nil || !pass {
continue
}
val, err := cfgFld.Value.ParseDataProvider(navMp, fCsv.cgrCfg.GeneralCfg().RSRSep)
if err != nil {
fCsv.negativeExports.Add(cgrEv.ID)
return
}
csvRecord = append(csvRecord, val)
}
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
fCsv.totalCost += cost
}
fCsv.positiveExports.Add(cgrEv.ID)
fCsv.csvWriter.Write(csvRecord)
return
}
//// Handle various meta functions used in header/trailer
//func (fCsv *FileCSVee) metaHandler(tag, arg string) (string, error) {
// switch tag {
// case metaExportID:
// return cdre.exportID, nil
// case metaTimeNow:
// return time.Now().Format(arg), nil
// case metaFirstCDRAtime:
// return cdre.firstCdrATime.Format(arg), nil
// case metaLastCDRAtime:
// return cdre.lastCdrATime.Format(arg), nil
// case metaNrCDRs:
// return strconv.Itoa(cdre.numberOfRecords), nil
// case metaDurCDRs:
// cdr := &CDR{ToR: utils.VOICE, Usage: cdre.totalDuration}
// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
// case metaSMSUsage:
// cdr := &CDR{ToR: utils.SMS, Usage: cdre.totalDuration}
// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
// case metaMMSUsage:
// cdr := &CDR{ToR: utils.MMS, Usage: cdre.totalDuration}
// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
// case metaGenericUsage:
// cdr := &CDR{ToR: utils.GENERIC, Usage: cdre.totalDuration}
// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
// case metaDataUsage:
// cdr := &CDR{ToR: utils.DATA, Usage: cdre.totalDuration}
// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
// case metaCostCDRs:
// return strconv.FormatFloat(utils.Round(cdre.totalCost,
// globalRoundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
// default:
// return "", fmt.Errorf("Unsupported METATAG: %s", tag)
// }
//}
// Compose and cache the header
func (fCsv *FileCSVee) composeHeader() (err error) {
var csvRecord []string
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
if !strings.HasPrefix(cfgFld.Path, utils.MetaHdr) {
continue
}
val, err := cfgFld.Value.ParseValue(utils.EmptyString)
if err != nil {
return
}
csvRecord = append(csvRecord, val)
}
fCsv.csvWriter.Write(csvRecord)
return nil
}
// Compose and cache the trailer
func (fCsv *FileCSVee) composeTrailer() (err error) {
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
if !strings.HasPrefix(cfgFld.Path, utils.MetaTrl) {
continue
}
//var outVal string
//switch cfgFld.Type {
//case utils.META_FILLER:
// out, err := cfgFld.Value.ParseValue(utils.EmptyString)
// if err != nil {
// return err
// }
// outVal = out
// cfgFld.Padding = utils.MetaRight
//case utils.META_CONSTANT:
// out, err := cfgFld.Value.ParseValue(utils.EmptyString)
// if err != nil {
// return err
// }
// outVal = out
//case utils.META_HANDLER:
// out, err := cfgFld.Value.ParseValue(utils.EmptyString)
// if err != nil {
// return err
// }
// outVal, err = cdre.metaHandler(out, cfgFld.Layout)
//default:
// return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
//}
//if err != nil {
// utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
// return err
//}
//fmtOut := outVal
//if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
// utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
// return err
//}
//cdre.Lock()
//cdre.trailer = append(cdre.trailer, fmtOut)
//cdre.Unlock()
}
return nil
}