mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Consider timezone and tenant from exporter first
This commit is contained in:
committed by
Dan Christian Bogos
parent
5c1cd3379e
commit
7fbab0c3a2
@@ -65,6 +65,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": true,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
@@ -90,6 +91,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV2",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": true,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
@@ -116,6 +118,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV3",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": false,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
|
||||
@@ -69,6 +69,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": true,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
@@ -94,6 +95,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV2",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": true,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
@@ -120,6 +122,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV3",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": false,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
|
||||
@@ -67,6 +67,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": true,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
@@ -81,8 +82,8 @@
|
||||
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
|
||||
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
|
||||
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" },
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string:2006-01-02T15:04:05Z}" },
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string:2006-01-02T15:04:05Z}"},
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage{*duration_seconds}"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
|
||||
],
|
||||
@@ -92,6 +93,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV2",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": true,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
@@ -107,8 +109,8 @@
|
||||
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
|
||||
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
|
||||
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" },
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string:2006-01-02T15:04:05Z}" },
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string:2006-01-02T15:04:05Z}"},
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage{*duration_seconds}"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
|
||||
],
|
||||
@@ -118,6 +120,7 @@
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testCSV3",
|
||||
"tenant": "cgrates.org",
|
||||
"timezone": "UTC",
|
||||
"synchronous": false,
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
@@ -133,8 +136,8 @@
|
||||
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
|
||||
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
|
||||
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" },
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string:2006-01-02T15:04:05Z}" },
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string:2006-01-02T15:04:05Z}"},
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage{*duration_seconds}"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
|
||||
],
|
||||
|
||||
@@ -34,7 +34,13 @@ type EventExporter interface {
|
||||
}
|
||||
|
||||
// NewEventExporter produces exporters
|
||||
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (ee EventExporter, err error) {
|
||||
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) {
|
||||
var dc utils.MapStorage
|
||||
if dc, err = newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone)); err != nil {
|
||||
return
|
||||
}
|
||||
switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type {
|
||||
case utils.MetaFileCSV:
|
||||
return NewFileCSVee(cgrCfg, cfgIdx, filterS, dc)
|
||||
|
||||
12
ees/eereq.go
12
ees/eereq.go
@@ -33,17 +33,25 @@ import (
|
||||
|
||||
// NewEventExporterRequest returns a new EventExporterRequest
|
||||
func NewEventExporterRequest(req utils.DataProvider, dc utils.MapStorage,
|
||||
tnt, timezone string, filterS *engine.FilterS) (eeR *EventExporterRequest) {
|
||||
tntTpl config.RSRParsers,
|
||||
dfltTenant, timezone string, filterS *engine.FilterS) (eeR *EventExporterRequest) {
|
||||
eeR = &EventExporterRequest{
|
||||
req: req,
|
||||
tmz: timezone,
|
||||
tnt: tnt,
|
||||
filterS: filterS,
|
||||
cnt: utils.NewOrderedNavigableMap(),
|
||||
hdr: utils.NewOrderedNavigableMap(),
|
||||
trl: utils.NewOrderedNavigableMap(),
|
||||
dc: dc,
|
||||
}
|
||||
// populate tenant
|
||||
if tntIf, err := eeR.ParseField(
|
||||
&config.FCTemplate{Type: utils.META_COMPOSED,
|
||||
Value: tntTpl}); err == nil && tntIf.(string) != "" {
|
||||
eeR.tnt = tntIf.(string)
|
||||
} else {
|
||||
eeR.tnt = dfltTenant
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
14
ees/ees.go
14
ees/ees.go
@@ -198,7 +198,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
|
||||
}
|
||||
}
|
||||
if !isCached {
|
||||
if ee, err = NewEventExporter(eeS.cfg, cfgIdx, eeS.filterS, newEEMetrics()); err != nil {
|
||||
if ee, err = NewEventExporter(eeS.cfg, cfgIdx, eeS.filterS); err != nil {
|
||||
return
|
||||
}
|
||||
if hasCache {
|
||||
@@ -265,13 +265,19 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
|
||||
return
|
||||
}
|
||||
|
||||
func newEEMetrics() utils.MapStorage {
|
||||
func newEEMetrics(location string) (utils.MapStorage, error) {
|
||||
tNow := time.Now()
|
||||
loc, err := time.LoadLocation(location)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return utils.MapStorage{
|
||||
utils.NumberOfEvents: int64(0),
|
||||
utils.PositiveExports: utils.StringSet{},
|
||||
utils.NegativeExports: utils.StringSet{},
|
||||
utils.TimeNow: time.Now(),
|
||||
}
|
||||
utils.TimeNow: time.Date(tNow.Year(), tNow.Month(), tNow.Day(),
|
||||
tNow.Hour(), tNow.Minute(), tNow.Second(), tNow.Nanosecond(), loc),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func updateEEMetrics(dc utils.MapStorage, ev engine.MapEvent, timezone string) {
|
||||
|
||||
@@ -28,7 +28,7 @@ import (
|
||||
)
|
||||
|
||||
func TestUpdateEEMetrics(t *testing.T) {
|
||||
dc := newEEMetrics()
|
||||
dc, _ := newEEMetrics(utils.EmptyString)
|
||||
tnow := time.Now()
|
||||
ev := engine.MapEvent{
|
||||
utils.AnswerTime: tnow,
|
||||
@@ -37,7 +37,7 @@ func TestUpdateEEMetrics(t *testing.T) {
|
||||
utils.ToR: utils.VOICE,
|
||||
utils.Usage: time.Second,
|
||||
}
|
||||
exp := newEEMetrics()
|
||||
exp, _ := newEEMetrics(utils.EmptyString)
|
||||
exp[utils.FirstEventATime] = tnow
|
||||
exp[utils.LastEventATime] = tnow
|
||||
exp[utils.FirstExpOrderID] = int64(1)
|
||||
|
||||
@@ -105,7 +105,10 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
eeReq := NewEventExporterRequest(req, fCsv.dc, cgrEv.Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
eeReq := NewEventExporterRequest(req, fCsv.dc, fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
|
||||
fCsv.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
|
||||
fCsv.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
fCsv.filterS)
|
||||
|
||||
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()); err != nil {
|
||||
@@ -118,7 +121,8 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
}
|
||||
csvRecord = append(csvRecord, strVal)
|
||||
}
|
||||
updateEEMetrics(fCsv.dc, cgrEv.Event, fCsv.cgrCfg.GeneralCfg().DefaultTimezone)
|
||||
updateEEMetrics(fCsv.dc, cgrEv.Event, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
|
||||
fCsv.cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
fCsv.csvWriter.Write(csvRecord)
|
||||
return
|
||||
}
|
||||
@@ -129,7 +133,10 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
|
||||
return
|
||||
}
|
||||
var csvRecord []string
|
||||
eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.GeneralCfg().DefaultTenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
|
||||
fCsv.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
|
||||
fCsv.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
fCsv.filterS)
|
||||
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()); err != nil {
|
||||
return
|
||||
@@ -150,7 +157,11 @@ func (fCsv *FileCSVee) composeTrailer() (err error) {
|
||||
return
|
||||
}
|
||||
var csvRecord []string
|
||||
eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.GeneralCfg().DefaultTenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
eeReq := NewEventExporterRequest(nil, fCsv.dc,
|
||||
fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
|
||||
fCsv.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
|
||||
fCsv.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
fCsv.filterS)
|
||||
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()); err != nil {
|
||||
return
|
||||
|
||||
@@ -95,7 +95,11 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
eeReq := NewEventExporterRequest(req, fFwv.dc, cgrEv.Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
eeReq := NewEventExporterRequest(req, fFwv.dc,
|
||||
fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant,
|
||||
fFwv.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
|
||||
fFwv.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
fFwv.filterS)
|
||||
|
||||
if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()); err != nil {
|
||||
@@ -108,7 +112,8 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
}
|
||||
records = append(records, strVal)
|
||||
}
|
||||
updateEEMetrics(fFwv.dc, cgrEv.Event, fFwv.cgrCfg.GeneralCfg().DefaultTimezone)
|
||||
updateEEMetrics(fFwv.dc, cgrEv.Event, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
|
||||
fFwv.cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
for _, record := range append(records, "\n") {
|
||||
if _, err = io.WriteString(fFwv.file, record); err != nil {
|
||||
return
|
||||
@@ -123,7 +128,11 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
|
||||
return
|
||||
}
|
||||
var records []string
|
||||
eeReq := NewEventExporterRequest(nil, fFwv.dc, fFwv.cgrCfg.GeneralCfg().DefaultTenant, fFwv.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
eeReq := NewEventExporterRequest(nil, fFwv.dc,
|
||||
fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant,
|
||||
fFwv.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
|
||||
fFwv.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
fFwv.filterS)
|
||||
if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields()); err != nil {
|
||||
return
|
||||
@@ -149,7 +158,11 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
|
||||
return
|
||||
}
|
||||
var records []string
|
||||
eeReq := NewEventExporterRequest(nil, fFwv.dc, fFwv.cgrCfg.GeneralCfg().DefaultTenant, fFwv.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
eeReq := NewEventExporterRequest(nil, fFwv.dc,
|
||||
fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant,
|
||||
fFwv.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
|
||||
fFwv.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
fFwv.filterS)
|
||||
if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields()); err != nil {
|
||||
return
|
||||
|
||||
@@ -79,7 +79,11 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
eeReq := NewEventExporterRequest(req, httpPost.dc, cgrEv.Tenant, httpPost.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
eeReq := NewEventExporterRequest(req, httpPost.dc,
|
||||
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Tenant,
|
||||
httpPost.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,
|
||||
httpPost.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
httpPost.filterS)
|
||||
|
||||
if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()); err != nil {
|
||||
|
||||
@@ -77,12 +77,17 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
eeReq := NewEventExporterRequest(req, vEe.dc, cgrEv.Tenant, vEe.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
eeReq := NewEventExporterRequest(req, vEe.dc,
|
||||
vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Tenant,
|
||||
vEe.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,
|
||||
vEe.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
vEe.filterS)
|
||||
if err = eeReq.SetFields(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].ContentFields()); err != nil {
|
||||
return
|
||||
}
|
||||
updateEEMetrics(vEe.dc, cgrEv.Event, vEe.cgrCfg.GeneralCfg().DefaultTimezone)
|
||||
updateEEMetrics(vEe.dc, cgrEv.Event, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,
|
||||
vEe.cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user