diff --git a/ees/ee.go b/ees/ee.go
index e91b75957..8324d88b0 100644
--- a/ees/ee.go
+++ b/ees/ee.go
@@ -22,6 +22,7 @@ import (
"fmt"
"github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -32,10 +33,10 @@ type EventExporter interface {
}
// NewEventExporter produces exporters
-func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int) (ee EventExporter, err error) {
+func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) {
switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type {
case utils.MetaFileCSV:
- return NewFileCSVee(cgrCfg, cfgIdx)
+ return NewFileCSVee(cgrCfg, cfgIdx, filterS)
default:
return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type)
}
diff --git a/ees/ees.go b/ees/ees.go
index 27eb66fcd..e4bbb18ed 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -171,7 +171,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
}
}
if !isCached {
- if ee, err = NewEventExporter(eeS.cfg, cfgIdx); err != nil {
+ if ee, err = NewEventExporter(eeS.cfg, cfgIdx, eeS.filterS); err != nil {
return
}
if hasCache {
@@ -194,7 +194,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
if sync {
wg.Done()
}
- }(!isCached, eeCfg.Synchronous)
+ }(!hasCache, eeCfg.Synchronous)
}
wg.Wait()
if withErr {
diff --git a/ees/filecsv.go b/ees/filecsv.go
index 24e7802a6..20f7d5d86 100644
--- a/ees/filecsv.go
+++ b/ees/filecsv.go
@@ -19,25 +19,57 @@ along with this program. If not, see
package ees
import (
+ "encoding/csv"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/cgrates/cgrates/engine"
+
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
-func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int) (fCsv *FileCSVee, err error) {
- fCsv = &FileCSVee{cgrCfg: cgrCfg, cfgIdx: cfgIdx}
+func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (fCsv *FileCSVee, err error) {
+ fCsv = &FileCSVee{cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS}
err = fCsv.init()
return
}
// FileCSVee implements EventExporter interface for .csv files
type FileCSVee struct {
- id string
- cgrCfg *config.CGRConfig
- cfgIdx int // index of config instance within ERsCfg.Readers
+ id string
+ cgrCfg *config.CGRConfig
+ cfgIdx int // index of config instance within ERsCfg.Readers
+ filterS *engine.FilterS
+ file *os.File
+ csvWriter *csv.Writer
+ csvRecords [][]string
+ //hdrFields
+ //trlFields
+ //cntFields
+ firstEventATime, lastEventATime time.Time
+ numberOfRecords int
+ totalDuration, totalDataUsage, totalSmsUsage,
+ totalMmsUsage, totalGenericUsage time.Duration
+ totalCost float64
+ firstExpOrderID, lastExpOrderID int64
+ positiveExports utils.StringSet
+ negativeExports utils.StringSet
}
// init will create all the necessary dependencies, including opening the file
func (fCsv *FileCSVee) init() (err error) {
+ if fCsv.file, err = os.Create(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath); err != nil {
+ return
+ }
+ fCsv.csvWriter = csv.NewWriter(fCsv.file)
+ fCsv.csvWriter.Comma = utils.CSV_SEP
+ if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].FieldSep) > 0 {
+ fCsv.csvWriter.Comma = rune(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].FieldSep[0])
+ }
+ fCsv.positiveExports = utils.StringSet{}
+ fCsv.negativeExports = utils.StringSet{}
return
}
@@ -48,10 +80,139 @@ func (fCsv *FileCSVee) ID() string {
// OnEvicted implements EventExporter, doing the cleanup before exit
func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
+ // verify if we need to add the trailer
+
+ fCsv.csvWriter.Flush()
+ fCsv.file.Close()
return
}
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
+ // convert cgrEvent in export record
+ fCsv.numberOfRecords++
+ var csvRecord []string
+ navMp := config.NewNavigableMap(map[string]interface{}{
+ utils.MetaReq: cgrEv.Event,
+ })
+ for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
+ if !strings.HasPrefix(cfgFld.Path, utils.MetaReq) {
+ continue
+ }
+ if pass, err := fCsv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters,
+ navMp); err != nil || !pass {
+ continue
+ }
+ val, err := cfgFld.Value.ParseDataProvider(navMp, fCsv.cgrCfg.GeneralCfg().RSRSep)
+ if err != nil {
+ fCsv.negativeExports.Add(cgrEv.ID)
+ return
+ }
+ csvRecord = append(csvRecord, val)
+ }
+ if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
+ fCsv.totalCost += cost
+ }
+ fCsv.positiveExports.Add(cgrEv.ID)
+ fCsv.csvWriter.Write(csvRecord)
return
}
+
+//// Handle various meta functions used in header/trailer
+//func (fCsv *FileCSVee) metaHandler(tag, arg string) (string, error) {
+// switch tag {
+// case metaExportID:
+// return cdre.exportID, nil
+// case metaTimeNow:
+// return time.Now().Format(arg), nil
+// case metaFirstCDRAtime:
+// return cdre.firstCdrATime.Format(arg), nil
+// case metaLastCDRAtime:
+// return cdre.lastCdrATime.Format(arg), nil
+// case metaNrCDRs:
+// return strconv.Itoa(cdre.numberOfRecords), nil
+// case metaDurCDRs:
+// cdr := &CDR{ToR: utils.VOICE, Usage: cdre.totalDuration}
+// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
+// case metaSMSUsage:
+// cdr := &CDR{ToR: utils.SMS, Usage: cdre.totalDuration}
+// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
+// case metaMMSUsage:
+// cdr := &CDR{ToR: utils.MMS, Usage: cdre.totalDuration}
+// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
+// case metaGenericUsage:
+// cdr := &CDR{ToR: utils.GENERIC, Usage: cdre.totalDuration}
+// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
+// case metaDataUsage:
+// cdr := &CDR{ToR: utils.DATA, Usage: cdre.totalDuration}
+// return cdr.FieldAsString(&config.RSRParser{Rules: "~" + utils.Usage, AllFiltersMatch: true})
+// case metaCostCDRs:
+// return strconv.FormatFloat(utils.Round(cdre.totalCost,
+// globalRoundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64), nil
+// default:
+// return "", fmt.Errorf("Unsupported METATAG: %s", tag)
+// }
+//}
+
+// Compose and cache the header
+func (fCsv *FileCSVee) composeHeader() (err error) {
+ var csvRecord []string
+ for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
+ if !strings.HasPrefix(cfgFld.Path, utils.MetaHdr) {
+ continue
+ }
+ val, err := cfgFld.Value.ParseValue(utils.EmptyString)
+ if err != nil {
+ return
+ }
+ csvRecord = append(csvRecord, val)
+ }
+ fCsv.csvWriter.Write(csvRecord)
+ return nil
+}
+
+// Compose and cache the trailer
+func (fCsv *FileCSVee) composeTrailer() (err error) {
+ for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
+ if !strings.HasPrefix(cfgFld.Path, utils.MetaTrl) {
+ continue
+ }
+ //var outVal string
+ //switch cfgFld.Type {
+ //case utils.META_FILLER:
+ // out, err := cfgFld.Value.ParseValue(utils.EmptyString)
+ // if err != nil {
+ // return err
+ // }
+ // outVal = out
+ // cfgFld.Padding = utils.MetaRight
+ //case utils.META_CONSTANT:
+ // out, err := cfgFld.Value.ParseValue(utils.EmptyString)
+ // if err != nil {
+ // return err
+ // }
+ // outVal = out
+ //case utils.META_HANDLER:
+ // out, err := cfgFld.Value.ParseValue(utils.EmptyString)
+ // if err != nil {
+ // return err
+ // }
+ // outVal, err = cdre.metaHandler(out, cfgFld.Layout)
+ //default:
+ // return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
+ //}
+ //if err != nil {
+ // utils.Logger.Err(fmt.Sprintf(" Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
+ // return err
+ //}
+ //fmtOut := outVal
+ //if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
+ // utils.Logger.Err(fmt.Sprintf(" Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
+ // return err
+ //}
+ //cdre.Lock()
+ //cdre.trailer = append(cdre.trailer, fmtOut)
+ //cdre.Unlock()
+ }
+ return nil
+}