rename *dc DataProvider to *em (from EventMetrics)

This commit is contained in:
ionutboangiu
2025-11-04 13:52:24 +02:00
committed by Dan Christian Bogos
parent 36633ec11a
commit f2a4427d2c
34 changed files with 274 additions and 298 deletions

View File

@@ -94,7 +94,7 @@
{"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"},
{"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"},
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents"},
{"tag": "*originID", "path": "*exp.*originID", "type": "*variable", "value": "~*opts.*originID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
@@ -111,10 +111,10 @@
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}"},
],
},
{
@@ -142,7 +142,7 @@
{"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"},
{"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"},
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents"},
{"tag": "*originID", "path": "*exp.*originID", "type": "*variable", "value": "~*opts.*originID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
{"tag": "ToR", "path": "*exp.ToR", "type": "*constant", "value": ""},
@@ -162,10 +162,10 @@
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}"},
],
},
{
@@ -182,7 +182,7 @@
{"tag": "DistributorCode", "path": "*hdr.DistributorCode",
"type": "*constant", "value": "VOI","width": 3},
{"tag": "FileCreationTime", "path": "*hdr.FileCreationTime",
"type": "*variable","value":"~*dc.TimeNow{*time_string:020106150400}",
"type": "*variable","value":"~*em.TimeNow{*time_string:020106150400}",
"width": 12 },
{"tag": "FileVersion", "path": "*hdr.FileVersion", "type": "*constant",
"value": "01","width": 2},
@@ -230,13 +230,13 @@
{"tag": "DistributorCode", "path": "*trl.DistributorCode",
"type": "*constant", "value": "VOI","width": 3},
{"tag": "NumberOfRecords", "path": "*trl.NumberOfRecords",
"type": "*variable", "value": "~*dc.NumberOfEvents","width": 6,"padding": "*zeroleft"},
"type": "*variable", "value": "~*em.NumberOfEvents","width": 6,"padding": "*zeroleft"},
{"tag": "CdrsDuration", "path": "*trl.CdrsDuration", "type": "*variable",
"value": "~*dc.TotalDuration","width": 8,"padding":"*zeroleft","layout": "seconds"},
"value": "~*em.TotalDuration","width": 8,"padding":"*zeroleft","layout": "seconds"},
{"tag": "FirstCdrTime", "path": "*trl.FirstCdrTime", "type": "*variable",
"value": "~*dc.FirstEventATime{*time_string:020106150400}", "width": 12},
"value": "~*em.FirstEventATime{*time_string:020106150400}", "width": 12},
{"tag": "LastCdrTime", "path": "*hdr.LastCdrTime", "type": "*variable",
"value": "~*dc.LastEventATime{*time_string:020106150400}", "width": 12,},
"value": "~*em.LastEventATime{*time_string:020106150400}", "width": 12,},
{"tag": "Filler2", "path": "*trl.Filler2", "type": "*filler",
"width": 93}
],

View File

@@ -30,10 +30,10 @@ import (
// NewAMQPee creates a new amqp poster
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPee {
func NewAMQPee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *AMQPee {
amqp := &AMQPee{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
amqp.parseOpts(cfg.Opts)
@@ -50,7 +50,7 @@ type AMQPee struct {
postChan *amqp.Channel
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -175,6 +175,6 @@ func (pstr *AMQPee) Close() (err error) {
return
}
func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.em }
func (pstr *AMQPee) ExtraData(*utils.CGREvent) any { return nil }

View File

@@ -28,7 +28,7 @@ import (
func TestNewAMQPee(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
dc := &utils.ExporterMetrics{
em := &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
@@ -36,10 +36,10 @@ func TestNewAMQPee(t *testing.T) {
},
}
cfg.EEsCfg().ExporterCfg(utils.MetaDefault).ConcurrentRequests = 2
rcv := NewAMQPee(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
rcv := NewAMQPee(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em)
exp := &AMQPee{
cfg: cfg.EEsCfg().ExporterCfg(utils.MetaDefault),
dc: dc,
em: em,
reqs: newConcReq(cfg.EEsCfg().ExporterCfg(utils.MetaDefault).ConcurrentRequests),
}
rcv.reqs = nil
@@ -49,24 +49,3 @@ func TestNewAMQPee(t *testing.T) {
t.Errorf("Expected %v\n but received %v", exp, rcv)
}
}
// func TestAMQPExportEvent(t *testing.T) {
// cfg := config.NewDefaultCGRConfig()
// dc := &utils.SafeMapStorage{
// MapStorage: utils.MapStorage{
// utils.NumberOfEvents: int64(0),
// utils.PositiveExports: utils.StringSet{},
// utils.NegativeExports: 5,
// }}
// // cfg.EEsCfg().ExporterCfg(utils.MetaDefault).ConcurrentRequests = 2
// // cfg.EEsCfg().ExporterCfg(utils.MetaDefault).Opts = &config.EventExporterOpts{
// // }
// pstr := NewAMQPee(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
// content := "some_content"
// pstr.postChan =
// if err := pstr.ExportEvent(context.Background(), content, ""); err != nil {
// t.Error(err)
// }
// }

View File

@@ -28,10 +28,10 @@ import (
)
// NewAMQPv1EE creates a poster for amqpv1
func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPv1EE {
func NewAMQPv1EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *AMQPv1EE {
pstr := &AMQPv1EE{
cfg: cfg,
dc: dc,
em: em,
queueID: "/" + utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -54,7 +54,7 @@ type AMQPv1EE struct {
session *amqpv1.Session
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -118,6 +118,6 @@ func (pstr *AMQPv1EE) Close() (err error) {
return
}
func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.em }
func (pstr *AMQPv1EE) ExtraData(*utils.CGREvent) any { return nil }

View File

@@ -195,7 +195,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr *
expNM := utils.NewOrderedNavigableMap()
err = NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(ev.Event),
utils.MetaDC: exp.GetMetrics(),
utils.MetaEM: exp.GetMetrics(),
utils.MetaOpts: utils.MapStorage(ev.APIOpts),
utils.MetaCfg: cfg.GetDataProvider(),
utils.MetaVars: utils.MapStorage{utils.MetaTenant: ev.Tenant, utils.MetaExporterID: ev.APIOpts[utils.MetaExporterID]},
@@ -244,7 +244,7 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents
if err != nil {
return err
}
dc := utils.NewExporterMetrics(eeCfg.MetricsResetSchedule, loc)
em := utils.NewExporterMetrics(eeCfg.MetricsResetSchedule, loc)
var ee EventExporter
@@ -261,9 +261,9 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents
}
switch eeCfg.Type {
case utils.MetaFileCSV:
ee, err = NewFileCSVee(eeCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr})
ee, err = NewFileCSVee(eeCfg, eeS.cfg, eeS.fltrS, em, &buffer{wrtr})
case utils.MetaFileFWV:
ee, err = NewFileFWVee(eeCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr})
ee, err = NewFileFWVee(eeCfg, eeS.cfg, eeS.fltrS, em, &buffer{wrtr})
default:
err = fmt.Errorf("unsupported exporter type: %s>", eeCfg.Type)
}

118
ees/ee.go
View File

@@ -49,40 +49,40 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig,
if err != nil {
return nil, err
}
dc := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc)
em := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc)
switch cfg.Type {
case utils.MetaFileCSV:
return NewFileCSVee(cfg, cgrCfg, filterS, dc, nil)
return NewFileCSVee(cfg, cgrCfg, filterS, em, nil)
case utils.MetaFileFWV:
return NewFileFWVee(cfg, cgrCfg, filterS, dc, nil)
return NewFileFWVee(cfg, cgrCfg, filterS, em, nil)
case utils.MetaHTTPPost:
return NewHTTPPostEE(cfg, cgrCfg, filterS, dc)
return NewHTTPPostEE(cfg, cgrCfg, filterS, em)
case utils.MetaHTTPjsonMap:
return NewHTTPjsonMapEE(cfg, cgrCfg, filterS, dc)
return NewHTTPjsonMapEE(cfg, cgrCfg, filterS, em)
case utils.MetaNATSJSONMap:
return NewNatsEE(cfg, cgrCfg.GeneralCfg().NodeID,
cgrCfg.GeneralCfg().ConnectTimeout, dc)
cgrCfg.GeneralCfg().ConnectTimeout, em)
case utils.MetaAMQPjsonMap:
return NewAMQPee(cfg, dc), nil
return NewAMQPee(cfg, em), nil
case utils.MetaAMQPV1jsonMap:
return NewAMQPv1EE(cfg, dc), nil
return NewAMQPv1EE(cfg, em), nil
case utils.MetaS3jsonMap:
return NewS3EE(cfg, dc), nil
return NewS3EE(cfg, em), nil
case utils.MetaSQSjsonMap:
return NewSQSee(cfg, dc), nil
return NewSQSee(cfg, em), nil
case utils.MetaKafkajsonMap:
return NewKafkaEE(cfg, dc)
return NewKafkaEE(cfg, em)
case utils.MetaVirt:
return NewVirtualEE(cfg, dc), nil
return NewVirtualEE(cfg, em), nil
case utils.MetaElastic:
return NewElasticEE(cfg, dc)
return NewElasticEE(cfg, em)
case utils.MetaSQL:
return NewSQLEe(cfg, dc)
return NewSQLEe(cfg, em)
case utils.MetaLog:
return NewLogEE(cfg, dc), nil
return NewLogEE(cfg, em), nil
case utils.MetaRpc:
return NewRpcEE(cfg, dc, connMngr)
return NewRpcEE(cfg, em, connMngr)
default:
return nil, fmt.Errorf("unsupported exporter type: <%s>", cfg.Type)
}
@@ -116,88 +116,88 @@ func (c *concReq) done() {
}
// composeHeaderTrailer will return the orderNM for *hdr or *trl
func composeHeaderTrailer(ctx *context.Context, prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
func composeHeaderTrailer(ctx *context.Context, prfx string, fields []*config.FCTemplate, em utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
r = utils.NewOrderedNavigableMap()
err = NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: dc,
utils.MetaEM: em,
utils.MetaCfg: cfg.GetDataProvider(),
}, cfg.GeneralCfg().DefaultTenant, fltS,
map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(ctx, fields)
return
}
func updateEEMetrics(dc *utils.ExporterMetrics, originID string, ev engine.MapEvent, hasError bool, timezone string) {
dc.Lock()
defer dc.Unlock()
func updateEEMetrics(em *utils.ExporterMetrics, originID string, ev engine.MapEvent, hasError bool, timezone string) {
em.Lock()
defer em.Unlock()
if hasError {
dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(originID)
em.MapStorage[utils.NegativeExports].(utils.StringSet).Add(originID)
} else {
dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(originID)
em.MapStorage[utils.PositiveExports].(utils.StringSet).Add(originID)
}
if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil {
if _, has := dc.MapStorage[utils.FirstEventATime]; !has {
dc.MapStorage[utils.FirstEventATime] = time.Time{}
if _, has := em.MapStorage[utils.FirstEventATime]; !has {
em.MapStorage[utils.FirstEventATime] = time.Time{}
}
if _, has := dc.MapStorage[utils.LastEventATime]; !has {
dc.MapStorage[utils.LastEventATime] = time.Time{}
if _, has := em.MapStorage[utils.LastEventATime]; !has {
em.MapStorage[utils.LastEventATime] = time.Time{}
}
if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) {
dc.MapStorage[utils.FirstEventATime] = aTime
if em.MapStorage[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(em.MapStorage[utils.FirstEventATime].(time.Time)) {
em.MapStorage[utils.FirstEventATime] = aTime
}
if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) {
dc.MapStorage[utils.LastEventATime] = aTime
if aTime.After(em.MapStorage[utils.LastEventATime].(time.Time)) {
em.MapStorage[utils.LastEventATime] = aTime
}
}
if oID, err := ev.GetTInt64(utils.OrderID); err == nil {
if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has {
dc.MapStorage[utils.FirstExpOrderID] = int64(0)
if _, has := em.MapStorage[utils.FirstExpOrderID]; !has {
em.MapStorage[utils.FirstExpOrderID] = int64(0)
}
if _, has := dc.MapStorage[utils.LastExpOrderID]; !has {
dc.MapStorage[utils.LastExpOrderID] = int64(0)
if _, has := em.MapStorage[utils.LastExpOrderID]; !has {
em.MapStorage[utils.LastExpOrderID] = int64(0)
}
if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 ||
dc.MapStorage[utils.FirstExpOrderID].(int64) > oID {
dc.MapStorage[utils.FirstExpOrderID] = oID
if em.MapStorage[utils.FirstExpOrderID].(int64) == 0 ||
em.MapStorage[utils.FirstExpOrderID].(int64) > oID {
em.MapStorage[utils.FirstExpOrderID] = oID
}
if dc.MapStorage[utils.LastExpOrderID].(int64) < oID {
dc.MapStorage[utils.LastExpOrderID] = oID
if em.MapStorage[utils.LastExpOrderID].(int64) < oID {
em.MapStorage[utils.LastExpOrderID] = oID
}
}
if cost, err := ev.GetFloat64(utils.Cost); err == nil {
if _, has := dc.MapStorage[utils.TotalCost]; !has {
dc.MapStorage[utils.TotalCost] = float64(0.0)
if _, has := em.MapStorage[utils.TotalCost]; !has {
em.MapStorage[utils.TotalCost] = float64(0.0)
}
dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost
em.MapStorage[utils.TotalCost] = em.MapStorage[utils.TotalCost].(float64) + cost
}
if tor, err := ev.GetString(utils.ToR); err == nil {
if usage, err := ev.GetDuration(utils.Usage); err == nil {
switch tor {
case utils.MetaVoice:
if _, has := dc.MapStorage[utils.TotalDuration]; !has {
dc.MapStorage[utils.TotalDuration] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalDuration]; !has {
em.MapStorage[utils.TotalDuration] = time.Duration(0)
}
dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage
em.MapStorage[utils.TotalDuration] = em.MapStorage[utils.TotalDuration].(time.Duration) + usage
case utils.MetaSMS:
if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has {
dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalSMSUsage]; !has {
em.MapStorage[utils.TotalSMSUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage
em.MapStorage[utils.TotalSMSUsage] = em.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MetaMMS:
if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has {
dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalMMSUsage]; !has {
em.MapStorage[utils.TotalMMSUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage
em.MapStorage[utils.TotalMMSUsage] = em.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage
case utils.MetaGeneric:
if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has {
dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalGenericUsage]; !has {
em.MapStorage[utils.TotalGenericUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage
em.MapStorage[utils.TotalGenericUsage] = em.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage
case utils.MetaData:
if _, has := dc.MapStorage[utils.TotalDataUsage]; !has {
dc.MapStorage[utils.TotalDataUsage] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalDataUsage]; !has {
em.MapStorage[utils.TotalDataUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage
em.MapStorage[utils.TotalDataUsage] = em.MapStorage[utils.TotalDataUsage].(time.Duration) + usage
}
}
}

View File

@@ -40,8 +40,8 @@ func TestNewEventExporter(t *testing.T) {
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, nil)
em := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em, nil)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
@@ -50,11 +50,11 @@ func TestNewEventExporter(t *testing.T) {
t.Error("\nExpected an error")
}
newEE := ee.(*FileCSVee)
newEE.dc.MapStorage[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.ExportPath] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.ExportPath] = nil
eeExpect.csvWriter = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.ExportPath] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.ExportPath] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -71,8 +71,8 @@ func TestNewEventExporterCase2(t *testing.T) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, io.Discard)
em := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em, io.Discard)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
@@ -81,10 +81,10 @@ func TestNewEventExporterCase2(t *testing.T) {
t.Error("\nExpected an error")
}
newEE := ee.(*FileFWVee)
newEE.dc.MapStorage[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.ExportPath] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.ExportPath] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.ExportPath] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.ExportPath] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -99,14 +99,14 @@ func TestNewEventExporterCase3(t *testing.T) {
if err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
em := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em)
if err != nil {
t.Error(err)
}
newEE := ee.(*HTTPPostEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -121,14 +121,14 @@ func TestNewEventExporterCase4(t *testing.T) {
if err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
em := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em)
if err != nil {
t.Error(err)
}
newEE := ee.(*HTTPjsonMapEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -143,11 +143,11 @@ func TestNewEventExporterCase6(t *testing.T) {
if err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], dc)
em := utils.NewExporterMetrics("", time.Local)
eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], em)
newEE := ee.(*VirtualEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -176,17 +176,17 @@ func TestNewEventExporterCase7(t *testing.T) {
if err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
if err != nil {
t.Error(err)
}
eeExpect, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
eeExpect, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], em)
if err != nil {
t.Error(err)
}
newEE := ee.(*ElasticEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
eeExpect.client = newEE.client
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE)

View File

@@ -304,7 +304,7 @@ func TestV1ProcessEvent4(t *testing.T) {
}
func newMockEventExporter() *mockEventExporter {
return &mockEventExporter{dc: &utils.ExporterMetrics{
return &mockEventExporter{em: &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
@@ -313,12 +313,12 @@ func newMockEventExporter() *mockEventExporter {
}
type mockEventExporter struct {
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
bytePreparing
}
func (m mockEventExporter) GetMetrics() *utils.ExporterMetrics {
return m.dc
return m.em
}
func (mockEventExporter) Cfg() *config.EventExporterCfg { return new(config.EventExporterCfg) }
@@ -331,7 +331,7 @@ func (mockEventExporter) Close() error {
}
func TestV1ProcessEventMockMetrics(t *testing.T) {
mEe := mockEventExporter{dc: &utils.ExporterMetrics{
mEe := mockEventExporter{em: &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
@@ -455,7 +455,7 @@ func TestOnCacheEvicted(t *testing.T) {
}
func TestUpdateEEMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.UTC)
em := utils.NewExporterMetrics("", time.UTC)
tnow := time.Now()
ev := engine.MapEvent{
utils.AnswerTime: tnow,
@@ -471,10 +471,10 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(1)
exp.MapStorage[utils.TotalCost] = float64(5.5)
exp.MapStorage[utils.TotalDuration] = time.Second
exp.MapStorage[utils.TimeNow] = dc.MapStorage[utils.TimeNow]
exp.MapStorage[utils.TimeNow] = em.MapStorage[utils.TimeNow]
exp.MapStorage[utils.PositiveExports] = utils.StringSet{"": {}}
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
tnow = tnow.Add(24 * time.Hour)
@@ -489,8 +489,8 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(2)
exp.MapStorage[utils.TotalCost] = float64(11)
exp.MapStorage[utils.TotalSMSUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
tnow = tnow.Add(24 * time.Hour)
@@ -505,8 +505,8 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(3)
exp.MapStorage[utils.TotalCost] = float64(16.5)
exp.MapStorage[utils.TotalMMSUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
tnow = tnow.Add(24 * time.Hour)
@@ -521,8 +521,8 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(4)
exp.MapStorage[utils.TotalCost] = float64(22)
exp.MapStorage[utils.TotalGenericUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
tnow = tnow.Add(24 * time.Hour)
@@ -537,8 +537,8 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(5)
exp.MapStorage[utils.TotalCost] = float64(27.5)
exp.MapStorage[utils.TotalDataUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
}

View File

@@ -39,17 +39,17 @@ import (
type ElasticEE struct {
mu sync.RWMutex
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
client *elasticsearch.TypedClient
clientCfg elasticsearch.Config
}
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*ElasticEE, error) {
func NewElasticEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) (*ElasticEE, error) {
el := &ElasticEE{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
if err := el.parseClientOpts(); err != nil {
@@ -221,7 +221,7 @@ func (e *ElasticEE) Close() error {
return nil
}
func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.dc }
func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.em }
func (eEE *ElasticEE) ExtraData(ev *utils.CGREvent) any {
return utils.ConcatenatedKey(

View File

@@ -28,13 +28,13 @@ import (
)
func TestGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
ee := &ElasticEE{
dc: dc,
em: em,
}
if rcv := ee.GetMetrics(); !reflect.DeepEqual(rcv, ee.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(ee.dc))
if rcv := ee.GetMetrics(); !reflect.DeepEqual(rcv, ee.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(ee.em))
}
}
@@ -56,8 +56,8 @@ func TestInitClient(t *testing.T) {
func TestElasticExportEventErr(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc := utils.NewExporterMetrics("", time.Local)
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
em := utils.NewExporterMetrics("", time.Local)
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], em)
if err != nil {
t.Error(err)
}

View File

@@ -61,7 +61,7 @@ func TestExporterMetricsIT(t *testing.T) {
"synchronous": true,
"metrics_reset_schedule": "@every 5s",
"fields": [
{ "tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents" },
{ "tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents" },
{ "tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*opts.*originID" },
{ "tag": "ToR", "path": "*exp.ToR", "type": "*constant", "value": "*sms" },
{ "tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account" },
@@ -70,10 +70,10 @@ func TestExporterMetricsIT(t *testing.T) {
{ "tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage" },
{ "tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}" },
{ "tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents" },
{ "tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration" },
{ "tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage" },
{ "tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}" }
{ "tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents" },
{ "tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration" },
{ "tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage" },
{ "tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}" }
]
}
]

View File

@@ -35,10 +35,10 @@ import (
func NewFileCSVee(cfg *config.EventExporterCfg,
cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.ExporterMetrics, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) {
em *utils.ExporterMetrics, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{
cfg: cfg,
dc: dc,
em: em,
wrtr: wrtr,
cgrCfg: cgrCfg,
filterS: filterS,
@@ -50,7 +50,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg,
// FileCSVee implements EventExporter interface for .csv files
type FileCSVee struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
wrtr io.WriteCloser // writer for the csv
csvWriter *csv.Writer
sync.Mutex
@@ -66,7 +66,7 @@ func (fCsv *FileCSVee) init(wrtr io.WriteCloser) (err error) {
// create the file
filePath := path.Join(fCsv.Cfg().ExportPath,
fCsv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix)
fCsv.dc.Set([]string{utils.ExportPath}, filePath)
fCsv.em.Set([]string{utils.ExportPath}, filePath)
if fCsv.cfg.ExportPath == utils.MetaBuffer {
fCsv.wrtr = wrtr
} else if fCsv.wrtr, err = os.Create(filePath); err != nil {
@@ -84,7 +84,7 @@ func (fCsv *FileCSVee) init(wrtr io.WriteCloser) (err error) {
func (fCsv *FileCSVee) composeHeader() (err error) {
if len(fCsv.Cfg().HeaderFields()) != 0 {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.em, fCsv.cgrCfg, fCsv.filterS); err != nil {
return
}
return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings())
@@ -96,7 +96,7 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
func (fCsv *FileCSVee) composeTrailer() (err error) {
if len(fCsv.Cfg().TrailerFields()) != 0 {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.em, fCsv.cgrCfg, fCsv.filterS); err != nil {
return
}
return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings())
@@ -130,7 +130,7 @@ func (fCsv *FileCSVee) Close() (err error) {
return
}
func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.dc }
func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.em }
func (fCsv *FileCSVee) ExtraData(ev *utils.CGREvent) any { return nil }

View File

@@ -723,11 +723,11 @@ func TestCsvInitFileCSV(t *testing.T) {
if err := os.MkdirAll("/tmp/TestInitFileCSV", 0666); err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{
cgrCfg: cgrCfg,
cfg: cgrCfg.EEsCfg().Exporters[0],
dc: dc,
em: em,
}
if err := fCsv.init(nil); err != nil {
t.Error(err)

View File

@@ -33,11 +33,11 @@ import (
)
func TestFileCsvGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{dc: dc}
em := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{em: em}
if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fCsv.dc))
if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fCsv.em))
}
}
@@ -61,7 +61,7 @@ func TestFileCsvComposeHeader(t *testing.T) {
filterS: filterS,
wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -125,7 +125,7 @@ func TestFileCsvComposeTrailer(t *testing.T) {
filterS: filterS,
wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -183,14 +183,14 @@ func TestFileCsvExportEvent(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
csvNW := csv.NewWriter(byteBuff)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: dc,
em: em,
}
if err := fCsv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil {
@@ -217,7 +217,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) {
filterS: filterS,
wrtr: nopCloserWrite{byteBuff},
csvWriter: csvNW,
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -252,7 +252,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) {
filterS: filterS,
wrtr: nopCloserError{byteBuff},
csvWriter: csvNW,
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{

View File

@@ -31,10 +31,10 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.ExporterMetrics, writer io.Writer) (fFwv *FileFWVee, err error) {
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, em *utils.ExporterMetrics, writer io.Writer) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{
cfg: cfg,
dc: dc,
em: em,
cgrCfg: cgrCfg,
filterS: filterS,
@@ -46,7 +46,7 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter
// FileFWVee implements EventExporter interface for .fwv files
type FileFWVee struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
writer io.WriteCloser
sync.Mutex
slicePreparing
@@ -60,7 +60,7 @@ type FileFWVee struct {
func (fFwv *FileFWVee) init(writer io.Writer) (err error) {
filePath := path.Join(fFwv.Cfg().ExportPath,
fFwv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)
fFwv.dc.Set([]string{utils.ExportPath}, filePath)
fFwv.em.Set([]string{utils.ExportPath}, filePath)
// create the file
if fFwv.cfg.ExportPath == utils.MetaBuffer {
fFwv.writer = &buffer{writer}
@@ -76,7 +76,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.em, fFwv.cgrCfg, fFwv.filterS); err != nil {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
@@ -94,7 +94,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.em, fFwv.cgrCfg, fFwv.filterS); err != nil {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
@@ -137,6 +137,6 @@ func (fFwv *FileFWVee) Close() (err error) {
return
}
func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.dc }
func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.em }
func (fFwv *FileFWVee) ExtraData(ev *utils.CGREvent) any { return nil }

View File

@@ -157,11 +157,11 @@ func TestFileFwvInit(t *testing.T) {
if err := os.MkdirAll("/tmp/TestInitFileFWV", 0666); err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cgrCfg: cgrCfg,
cfg: cgrCfg.EEsCfg().Exporters[0],
dc: dc,
em: em,
}
if err := fFwv.init(io.Discard); err != nil {
t.Error(err)

View File

@@ -33,11 +33,11 @@ import (
)
func TestFileFwvGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{dc: dc}
em := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{em: em}
if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fFwv.dc))
if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fFwv.em))
}
}
@@ -54,7 +54,7 @@ func TestFileFwvComposeHeader(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloser{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -117,7 +117,7 @@ func TestFileFwvComposeTrailer(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloser{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -175,13 +175,13 @@ func TestFileFwvExportEvent(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
csvNW := csv.NewWriter(byteBuff)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
writer: nopCloser{byteBuff},
dc: dc,
em: em,
}
if err := fFwv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil {
t.Error(err)
@@ -209,13 +209,13 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
newDM := engine.NewDataManager(dbCM, cfg, nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserWrite{byteBuff},
dc: dc,
em: em,
}
if err := fFwv.ExportEvent(context.Background(), []string{""}, ""); err == nil || err != utils.ErrNotImplemented {
t.Errorf("Expected %q but received %q", utils.ErrNotImplemented, err)
@@ -234,7 +234,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserWrite{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -267,7 +267,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserWrite{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -299,7 +299,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserWrite{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -338,7 +338,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserError{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{

View File

@@ -34,10 +34,10 @@ import (
)
func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) {
em *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) {
pstrJSON = &HTTPjsonMapEE{
cfg: cfg,
dc: dc,
em: em,
client: &http.Client{Transport: engine.HTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout},
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -48,7 +48,7 @@ func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi
// HTTPjsonMapEE implements EventExporter interface for .csv files
type HTTPjsonMapEE struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
client *http.Client
reqs *concReq
@@ -62,7 +62,7 @@ func (httpEE *HTTPjsonMapEE) composeHeader(cgrCfg *config.CGRConfig, filterS *en
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.em, cgrCfg, filterS); err != nil {
return
}
for el := exp.GetFirstElement(); el != nil; el = el.Next() {
@@ -92,7 +92,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content, _ any) (
func (httpEE *HTTPjsonMapEE) Close() (_ error) { return }
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.dc }
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.em }
func (httpEE *HTTPjsonMapEE) ExtraData(ev *utils.CGREvent) any { return nil }

View File

@@ -33,13 +33,13 @@ import (
)
func TestHttpJsonMapGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
httpEE := &HTTPjsonMapEE{
dc: dc,
em: em,
}
if rcv := httpEE.GetMetrics(); !reflect.DeepEqual(rcv, httpEE.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpEE.dc))
if rcv := httpEE.GetMetrics(); !reflect.DeepEqual(rcv, httpEE.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpEE.em))
}
}

View File

@@ -30,10 +30,10 @@ import (
)
func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) {
em *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) {
httpPost = &HTTPPostEE{
cfg: cfg,
dc: dc,
em: em,
client: &http.Client{Transport: engine.HTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout},
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -44,7 +44,7 @@ func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filte
// FileCSVee implements EventExporter interface for .csv files
type HTTPPostEE struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
client *http.Client
reqs *concReq
@@ -62,7 +62,7 @@ func (httpPost *HTTPPostEE) composeHeader(cgrCfg *config.CGRConfig, filterS *eng
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.em, cgrCfg, filterS); err != nil {
return
}
for el := exp.GetFirstElement(); el != nil; el = el.Next() {
@@ -92,7 +92,7 @@ func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content, _ any) (e
func (httpPost *HTTPPostEE) Close() (_ error) { return }
func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.dc }
func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.em }
func (httpPost *HTTPPostEE) ExtraData(ev *utils.CGREvent) any { return nil }

View File

@@ -34,13 +34,13 @@ import (
)
func TestHttpPostGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
httpPost := &HTTPPostEE{
dc: dc,
em: em,
}
if rcv := httpPost.GetMetrics(); !reflect.DeepEqual(rcv, httpPost.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpPost.dc))
if rcv := httpPost.GetMetrics(); !reflect.DeepEqual(rcv, httpPost.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpPost.em))
}
}

View File

@@ -32,10 +32,10 @@ import (
)
// NewKafkaEE creates a kafka poster
func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*KafkaEE, error) {
func NewKafkaEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) (*KafkaEE, error) {
pstr := &KafkaEE{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -100,7 +100,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*Kafka
type KafkaEE struct {
writer *kafka.Writer
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
bytePreparing
}
@@ -131,7 +131,7 @@ func (k *KafkaEE) Close() error {
return k.writer.Close()
}
func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.dc }
func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.em }
func (k *KafkaEE) ExtraData(ev *utils.CGREvent) any {
return utils.ConcatenatedKey(
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()),

View File

@@ -27,17 +27,17 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewLogEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *LogEE {
func NewLogEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *LogEE {
return &LogEE{
cfg: cfg,
dc: dc,
em: em,
}
}
// LogEE implements EventExporter interface for .csv files
type LogEE struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
}
func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
@@ -49,7 +49,7 @@ func (vEe *LogEE) ExportEvent(_ *context.Context, mp, _ any) error {
return nil
}
func (vEe *LogEE) Close() error { return nil }
func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc }
func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.em }
func (vEe *LogEE) ExtraData(ev *utils.CGREvent) any { return nil }
func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (any, error) {
return mp.Event, nil

View File

@@ -32,14 +32,14 @@ import (
func TestNewLogEE(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
expected := &LogEE{
cfg: cfg.EEsCfg().ExporterCfg(utils.MetaDefault),
dc: dc,
em: em,
}
rcv := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
rcv := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em)
if !reflect.DeepEqual(rcv, expected) {
t.Errorf("Expected %v \n but received \n %v", expected, rcv)
}
@@ -47,8 +47,8 @@ func TestNewLogEE(t *testing.T) {
func TestLogEEExportEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
dc := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
em := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em)
mp := map[string]any{
"field1": 2,
"field2": "value",
@@ -71,7 +71,7 @@ func TestLogEE_GetMetrics(t *testing.T) {
mockMetrics := &utils.ExporterMetrics{}
vEe := &LogEE{
dc: mockMetrics,
em: mockMetrics,
}
result := vEe.GetMetrics()
@@ -83,8 +83,8 @@ func TestLogEE_GetMetrics(t *testing.T) {
func TestLogEEPrepareMap(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
dc := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
em := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em)
mp := &utils.CGREvent{
Event: map[string]any{
"field1": 2,
@@ -99,8 +99,8 @@ func TestLogEEPrepareMap(t *testing.T) {
func TestLogEEPrepareOrderMap(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
dc := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
em := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em)
mp := utils.NewOrderedNavigableMap()
fullPath := &utils.FullPath{
PathSlice: []string{"*path1"},

View File

@@ -34,10 +34,10 @@ import (
)
// NewNatsEE creates a kafka poster
func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.ExporterMetrics) (natsPstr *NatsEE, err error) {
func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, em *utils.ExporterMetrics) (natsPstr *NatsEE, err error) {
natsPstr = &NatsEE{
cfg: cfg,
dc: dc,
em: em,
subject: utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -55,7 +55,7 @@ type NatsEE struct {
posterJS jetstream.JetStream
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect writer
bytePreparing
@@ -127,7 +127,7 @@ func (pstr *NatsEE) Close() error {
return err
}
func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.em }
func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) any { return nil }

View File

@@ -29,11 +29,11 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics,
func NewRpcEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics,
connMgr *engine.ConnManager) (e *RPCee, err error) {
e = &RPCee{
cfg: cfg,
dc: dc,
em: em,
connMgr: connMgr,
}
err = e.parseOpts()
@@ -42,7 +42,7 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics,
type RPCee struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
connMgr *engine.ConnManager
//opts
@@ -82,7 +82,7 @@ func (e *RPCee) Close() (err error) {
}
func (e *RPCee) GetMetrics() (mp *utils.ExporterMetrics) {
return e.dc
return e.em
}
func (e *RPCee) ExtraData(ev *utils.CGREvent) any { return nil }

View File

@@ -30,17 +30,17 @@ import (
func TestNewRpcEE(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rcv, err := NewRpcEE(eeSCfg, dc, connMgr)
rcv, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}
exp := &RPCee{
cfg: eeSCfg,
dc: dc,
em: em,
connMgr: connMgr,
}
@@ -100,9 +100,9 @@ func TestRPCCfg(t *testing.T) {
func TestRPCConnect(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}
@@ -113,12 +113,9 @@ func TestRPCConnect(t *testing.T) {
// func TestRPCExportEvent(t *testing.T) {
// eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
// dc, err := newEEMetrics("Local")
// if err != nil {
// t.Error(err)
// }
// em := utils.NewExporterMetrics("",time.Local)
// connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
// rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
// rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
// if err != nil {
// t.Error(err)
// }
@@ -142,9 +139,9 @@ func TestRPCConnect(t *testing.T) {
func TestRPCClose(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}
@@ -158,28 +155,28 @@ func TestRPCClose(t *testing.T) {
func TestRPCGetMetrics(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := &utils.ExporterMetrics{
em := &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
"time": "now",
"just_a_field": "just_a_value",
},
}
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}
if rcv := rpcEe.GetMetrics(); !reflect.DeepEqual(rcv, dc) {
t.Errorf("Expected %v \n but received \n %v", dc, rcv)
if rcv := rpcEe.GetMetrics(); !reflect.DeepEqual(rcv, em) {
t.Errorf("Expected %v \n but received \n %v", em, rcv)
}
}
func TestRPCPrepareMap(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}

View File

@@ -34,10 +34,10 @@ import (
)
// NewS3EE creates a s3 poster
func NewS3EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *S3EE {
func NewS3EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *S3EE {
pstr := &S3EE{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
pstr.parseOpts(cfg.Opts)
@@ -56,7 +56,7 @@ type S3EE struct {
up *s3manager.Uploader
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -138,7 +138,7 @@ func (pstr *S3EE) ExportEvent(ctx *context.Context, message, extraData any) (err
func (pstr *S3EE) Close() (_ error) { return }
func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.em }
func (pstr *S3EE) ExtraData(ev *utils.CGREvent) any {
return utils.ConcatenatedKey(

View File

@@ -36,10 +36,10 @@ import (
)
func NewSQLEe(cfg *config.EventExporterCfg,
dc *utils.ExporterMetrics) (sqlEe *SQLEe, err error) {
em *utils.ExporterMetrics) (sqlEe *SQLEe, err error) {
sqlEe = &SQLEe{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
err = sqlEe.initDialector()
@@ -49,7 +49,7 @@ func NewSQLEe(cfg *config.EventExporterCfg,
// SQLEe implements EventExporter interface for SQL
type SQLEe struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
db *gorm.DB
sqldb *sql.DB
reqs *concReq
@@ -157,7 +157,7 @@ func (sqlEe *SQLEe) Close() (err error) {
return
}
func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.dc }
func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.em }
func (sqlEe *SQLEe) ExtraData(ev *utils.CGREvent) any { return nil }

View File

@@ -33,12 +33,12 @@ import (
)
func TestSqlGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
sqlEe := &SQLEe{
dc: dc,
em: em,
}
if rcv := sqlEe.GetMetrics(); !reflect.DeepEqual(rcv, sqlEe.dc) {
t.Errorf("Expected %+v but got %+v", utils.ToJSON(rcv), utils.ToJSON(sqlEe.dc))
if rcv := sqlEe.GetMetrics(); !reflect.DeepEqual(rcv, sqlEe.em) {
t.Errorf("Expected %+v but got %+v", utils.ToJSON(rcv), utils.ToJSON(sqlEe.em))
}
}

View File

@@ -32,10 +32,10 @@ import (
)
// NewSQSee creates a poster for sqs
func NewSQSee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *SQSee {
func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee {
pstr := &SQSee{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
pstr.parseOpts(cfg.Opts)
@@ -54,7 +54,7 @@ type SQSee struct {
svc *sqs.SQS
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -146,6 +146,6 @@ func (pstr *SQSee) ExportEvent(ctx *context.Context, message, _ any) (err error)
func (pstr *SQSee) Close() (_ error) { return }
func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.em }
func (pstr *SQSee) ExtraData(ev *utils.CGREvent) any { return nil }

View File

@@ -24,24 +24,24 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *VirtualEE {
func NewVirtualEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *VirtualEE {
return &VirtualEE{
cfg: cfg,
dc: dc,
em: em,
}
}
// VirtualEE implements EventExporter interface for .csv files
type VirtualEE struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
}
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
func (vEe *VirtualEE) Connect() error { return nil }
func (vEe *VirtualEE) ExportEvent(*context.Context, any, any) error { return nil }
func (vEe *VirtualEE) Close() error { return nil }
func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc }
func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.em }
func (vEe *VirtualEE) ExtraData(*utils.CGREvent) any { return nil }
func (vEe *VirtualEE) PrepareMap(mp *utils.CGREvent) (any, error) { return nil, nil }
func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (any, error) {

View File

@@ -28,13 +28,13 @@ import (
)
func TestVirtualEeGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
vEe := &VirtualEE{
dc: dc,
em: em,
}
if rcv := vEe.GetMetrics(); !reflect.DeepEqual(rcv, vEe.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(vEe.dc))
if rcv := vEe.GetMetrics(); !reflect.DeepEqual(rcv, vEe.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(vEe.em))
}
}
func TestVirtualEeExportEvent(t *testing.T) {

View File

@@ -392,7 +392,7 @@ const (
MetaTpes = "*tpes"
MetaFilters = "*filters"
MetaCDRs = "*cdrs"
MetaDC = "*dc"
MetaEM = "*em"
MetaCaches = "*caches"
MetaUCH = "*uch"
MetaGuardian = "*guardians"