Added *eventType to opts for process threhold

This commit is contained in:
Trial97
2020-10-08 17:35:05 +03:00
committed by Dan Christian Bogos
parent 50789d6000
commit 61d2b4e922
30 changed files with 318 additions and 169 deletions

View File

@@ -27,10 +27,10 @@ import (
)
type EventExporter interface {
ID() string // return the exporter identificator
ExportEvent(cgrEv *utils.CGREvent) (err error) // called on each event to be exported
OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate
GetMetrics() utils.MapStorage // called to get metrics
ID() string // return the exporter identificator
ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) // called on each event to be exported
OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate
GetMetrics() utils.MapStorage // called to get metrics
}
// NewEventExporter produces exporters

View File

@@ -32,9 +32,9 @@ import (
)
// NewEventExporterRequest returns a new EventExporterRequest
func NewEventExporterRequest(req utils.DataProvider, dc utils.MapStorage,
tntTpl config.RSRParsers,
dfltTenant, timezone string, filterS *engine.FilterS) (eeR *EventExporterRequest) {
func NewEventExporterRequest(req utils.DataProvider, dc, opts utils.MapStorage,
tntTpl config.RSRParsers, dfltTenant, timezone string,
filterS *engine.FilterS) (eeR *EventExporterRequest) {
eeR = &EventExporterRequest{
req: req,
tmz: timezone,
@@ -43,6 +43,7 @@ func NewEventExporterRequest(req utils.DataProvider, dc utils.MapStorage,
hdr: utils.NewOrderedNavigableMap(),
trl: utils.NewOrderedNavigableMap(),
dc: dc,
opts: opts,
}
// populate tenant
if tntIf, err := eeR.ParseField(
@@ -66,6 +67,7 @@ type EventExporterRequest struct {
hdr *utils.OrderedNavigableMap // Used in reply to access the request that was send
trl *utils.OrderedNavigableMap // Used in reply to access the request that was send
dc utils.MapStorage
opts utils.MapStorage
filterS *engine.FilterS
}
@@ -95,6 +97,8 @@ func (eeR *EventExporterRequest) FieldAsInterface(fldPath []string) (val interfa
}
case utils.MetaDC:
val, err = eeR.dc.FieldAsInterface(fldPath[1:])
case utils.MetaOpts:
val, err = eeR.opts.FieldAsInterface(fldPath[1:])
}
if err != nil {
return

View File

@@ -218,7 +218,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
utils.EventExporterS, ee.ID()))
}
go func(evict, sync bool, ee EventExporter) {
if err := ee.ExportEvent(cgrEv.CGREvent); err != nil {
if err := ee.ExportEvent(cgrEv.CGREventWithOpts); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, error: <%s>",
utils.EventExporterS, ee.ID(), err.Error()))

View File

@@ -123,7 +123,7 @@ func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) {
}
// ExportEvent implements EventExporter
func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
eEe.Lock()
defer func() {
if err != nil {
@@ -140,7 +140,7 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
valMp = cgrEv.Event
} else {
req := utils.MapStorage(cgrEv.Event)
eeReq := NewEventExporterRequest(req, eEe.dc,
eeReq := NewEventExporterRequest(req, eEe.dc, cgrEv.Opts,
eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Tenant,
eEe.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,

View File

@@ -91,7 +91,7 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
}
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
fCsv.Lock()
defer func() {
if err != nil {
@@ -109,7 +109,8 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
var csvRecord []string
req := utils.MapStorage(cgrEv.Event)
eeReq := NewEventExporterRequest(req, fCsv.dc, fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
eeReq := NewEventExporterRequest(req, fCsv.dc, cgrEv.Opts,
fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
fCsv.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
fCsv.cgrCfg.GeneralCfg().DefaultTimezone),
@@ -136,7 +137,8 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
return
}
var csvRecord []string
eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
eeReq := NewEventExporterRequest(nil, fCsv.dc, nil,
fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
fCsv.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
fCsv.cgrCfg.GeneralCfg().DefaultTimezone),
@@ -160,7 +162,7 @@ func (fCsv *FileCSVee) composeTrailer() (err error) {
return
}
var csvRecord []string
eeReq := NewEventExporterRequest(nil, fCsv.dc,
eeReq := NewEventExporterRequest(nil, fCsv.dc, nil,
fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
fCsv.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,

View File

@@ -82,7 +82,7 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) {
}
// ExportEvent implements EventExporter
func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
fFwv.Lock()
defer func() {
if err != nil {
@@ -95,7 +95,7 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int64) + 1
var records []string
req := utils.MapStorage(cgrEv.Event)
eeReq := NewEventExporterRequest(req, fFwv.dc,
eeReq := NewEventExporterRequest(req, fFwv.dc, cgrEv.Opts,
fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant,
fFwv.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
@@ -128,7 +128,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
return
}
var records []string
eeReq := NewEventExporterRequest(nil, fFwv.dc,
eeReq := NewEventExporterRequest(nil, fFwv.dc, nil,
fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant,
fFwv.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
@@ -158,7 +158,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
var records []string
eeReq := NewEventExporterRequest(nil, fFwv.dc,
eeReq := NewEventExporterRequest(nil, fFwv.dc, nil,
fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant,
fFwv.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,

View File

@@ -85,7 +85,7 @@ func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) {
}
// ExportEvent implements EventExporter
func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
pstrEE.Lock()
defer func() {
if err != nil {
@@ -102,7 +102,7 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
if len(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()) == 0 {
valMp = cgrEv.Event
} else {
eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), pstrEE.dc,
eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), pstrEE.dc, cgrEv.Opts,
pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Tenant,
pstrEE.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone,

View File

@@ -61,7 +61,7 @@ func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {
}
// ExportEvent implements EventExporter
func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
httpPost.Lock()
defer func() {
if err != nil {
@@ -80,7 +80,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
} else {
req := utils.MapStorage(cgrEv.Event)
eeReq := NewEventExporterRequest(req, httpPost.dc,
eeReq := NewEventExporterRequest(req, httpPost.dc, cgrEv.Opts,
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Tenant,
httpPost.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,

View File

@@ -60,7 +60,7 @@ func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) {
}
// ExportEvent implements EventExporter
func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) {
vEe.Lock()
defer func() {
if err != nil {
@@ -73,7 +73,7 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int64) + 1
req := utils.MapStorage(cgrEv.Event)
eeReq := NewEventExporterRequest(req, vEe.dc,
eeReq := NewEventExporterRequest(req, vEe.dc, cgrEv.Opts,
vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Tenant,
vEe.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,