Populate custom fields out of event

This commit is contained in:
TeoV
2020-05-13 14:12:41 +03:00
committed by Dan Christian Bogos
parent 1b060548cf
commit ed6647d1f2
3 changed files with 92 additions and 71 deletions

View File

@@ -132,7 +132,9 @@ type EventExporterCfg struct {
Synchronous bool
Attempts int
FieldSep string
Fields []*FCTemplate
HeaderFields []*FCTemplate
ContentFields []*FCTemplate
TrailerFields []*FCTemplate
}
func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separator string) (err error) {
@@ -186,9 +188,22 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separ
eeC.FieldSep = *jsnEec.Field_separator
}
if jsnEec.Fields != nil {
if eeC.Fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnEec.Fields, separator); err != nil {
if fields, err := FCTemplatesFromFCTemplatesJsonCfg(*jsnEec.Fields, separator); err != nil {
return err
} else {
for _, field := range fields {
switch field.GetPathSlice()[0] {
case utils.MetaHdr:
eeC.HeaderFields = append(eeC.HeaderFields, field)
case utils.MetaExp:
eeC.ContentFields = append(eeC.HeaderFields, field)
case utils.MetaTrl:
eeC.TrailerFields = append(eeC.HeaderFields, field)
}
}
}
}
return
}
@@ -223,9 +238,17 @@ func (eeC *EventExporterCfg) Clone() (cln *EventExporterCfg) {
cln.Synchronous = eeC.Synchronous
cln.Attempts = eeC.Attempts
cln.FieldSep = eeC.FieldSep
cln.Fields = make([]*FCTemplate, len(eeC.Fields))
for idx, fld := range eeC.Fields {
cln.Fields[idx] = fld.Clone()
cln.HeaderFields = make([]*FCTemplate, len(eeC.HeaderFields))
for idx, fld := range eeC.HeaderFields {
cln.HeaderFields[idx] = fld.Clone()
}
cln.ContentFields = make([]*FCTemplate, len(eeC.ContentFields))
for idx, fld := range eeC.ContentFields {
cln.ContentFields[idx] = fld.Clone()
}
cln.TrailerFields = make([]*FCTemplate, len(eeC.TrailerFields))
for idx, fld := range eeC.TrailerFields {
cln.TrailerFields[idx] = fld.Clone()
}
return
}
@@ -248,9 +271,15 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) map[string]interfa
}
flags[key] = buf
}
fields := make([]map[string]interface{}, len(eeC.Fields))
for i, item := range eeC.Fields {
fields[i] = item.AsMapInterface(separator)
fields := make([]map[string]interface{}, 0, len(eeC.HeaderFields)+len(eeC.ContentFields)+len(eeC.TrailerFields))
for _, fld := range eeC.HeaderFields {
fields = append(fields, fld.AsMapInterface(separator))
}
for _, fld := range eeC.ContentFields {
fields = append(fields, fld.AsMapInterface(separator))
}
for _, fld := range eeC.TrailerFields {
fields = append(fields, fld.AsMapInterface(separator))
}
return map[string]interface{}{

View File

@@ -21,7 +21,6 @@ package ees
import (
"encoding/csv"
"os"
"strings"
"time"
"github.com/cgrates/cgrates/engine"
@@ -38,16 +37,13 @@ func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS)
// FileCSVee implements EventExporter interface for .csv files
type FileCSVee struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
file *os.File
csvWriter *csv.Writer
csvRecords [][]string
//hdrFields
//trlFields
//cntFields
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
file *os.File
csvWriter *csv.Writer
firstEventATime, lastEventATime time.Time
numberOfRecords int
totalDuration, totalDataUsage, totalSmsUsage,
@@ -81,7 +77,6 @@ func (fCsv *FileCSVee) ID() string {
// OnEvicted implements EventExporter, doing the cleanup before exit
func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
// verify if we need to add the trailer
fCsv.csvWriter.Flush()
fCsv.file.Close()
return
@@ -95,10 +90,7 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
navMp := config.NewNavigableMap(map[string]interface{}{
utils.MetaReq: cgrEv.Event,
})
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
if !strings.HasPrefix(cfgFld.Path, utils.MetaReq) {
continue
}
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields {
if pass, err := fCsv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters,
navMp); err != nil || !pass {
continue
@@ -106,13 +98,45 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
val, err := cfgFld.Value.ParseDataProvider(navMp, fCsv.cgrCfg.GeneralCfg().RSRSep)
if err != nil {
fCsv.negativeExports.Add(cgrEv.ID)
return
return err
}
csvRecord = append(csvRecord, val)
}
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fCsv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
if fCsv.firstEventATime.IsZero() || fCsv.firstEventATime.Before(aTime) {
fCsv.firstEventATime = aTime
}
if aTime.After(fCsv.lastEventATime) {
fCsv.lastEventATime = aTime
}
}
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
if fCsv.firstExpOrderID > oID || fCsv.firstExpOrderID == 0 {
fCsv.firstExpOrderID = oID
}
if fCsv.lastExpOrderID < oID {
fCsv.lastExpOrderID = oID
}
}
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
fCsv.totalCost += cost
}
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
switch tor {
case utils.VOICE:
fCsv.totalDuration += usage
case utils.SMS:
fCsv.totalSmsUsage += usage
case utils.MMS:
fCsv.totalMmsUsage += usage
case utils.GENERIC:
fCsv.totalGenericUsage += usage
case utils.DATA:
fCsv.totalDataUsage += usage
}
}
}
fCsv.positiveExports.Add(cgrEv.ID)
fCsv.csvWriter.Write(csvRecord)
return
@@ -157,13 +181,10 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
// Compose and cache the header
func (fCsv *FileCSVee) composeHeader() (err error) {
var csvRecord []string
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
if !strings.HasPrefix(cfgFld.Path, utils.MetaHdr) {
continue
}
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields {
val, err := cfgFld.Value.ParseValue(utils.EmptyString)
if err != nil {
return
return err
}
csvRecord = append(csvRecord, val)
}
@@ -173,46 +194,8 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
// Compose and cache the trailer
func (fCsv *FileCSVee) composeTrailer() (err error) {
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields {
if !strings.HasPrefix(cfgFld.Path, utils.MetaTrl) {
continue
}
//var outVal string
//switch cfgFld.Type {
//case utils.META_FILLER:
// out, err := cfgFld.Value.ParseValue(utils.EmptyString)
// if err != nil {
// return err
// }
// outVal = out
// cfgFld.Padding = utils.MetaRight
//case utils.META_CONSTANT:
// out, err := cfgFld.Value.ParseValue(utils.EmptyString)
// if err != nil {
// return err
// }
// outVal = out
//case utils.META_HANDLER:
// out, err := cfgFld.Value.ParseValue(utils.EmptyString)
// if err != nil {
// return err
// }
// outVal, err = cdre.metaHandler(out, cfgFld.Layout)
//default:
// return fmt.Errorf("Unsupported field type: %s", cfgFld.Type)
//}
//if err != nil {
// utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
// return err
//}
//fmtOut := outVal
//if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil {
// utils.Logger.Err(fmt.Sprintf("<CdreFw> Cannot export CDR trailer, field: %s, error: %s", cfgFld.Tag, err.Error()))
// return err
//}
//cdre.Lock()
//cdre.trailer = append(cdre.trailer, fmtOut)
//cdre.Unlock()
}
//for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields {
//
//}
return nil
}

View File

@@ -82,6 +82,15 @@ func (ev *CGREvent) FieldAsFloat64(fldName string) (f float64, err error) {
return IfaceAsFloat64(iface)
}
// FieldAsInt64 returns a field as int64 instance
func (ev *CGREvent) FieldAsInt64(fldName string) (f int64, err error) {
iface, has := ev.Event[fldName]
if !has {
return f, ErrNotFound
}
return IfaceAsInt64(iface)
}
func (ev *CGREvent) TenantID() string {
return ConcatenatedKey(ev.Tenant, ev.ID)
}