rename *dc DataProvider to *em (from EventMetrics)

This commit is contained in:
ionutboangiu
2025-03-14 20:24:46 +02:00
committed by Dan Christian Bogos
parent e1c154b9e0
commit 7bb065fda6
40 changed files with 283 additions and 283 deletions

View File

@@ -98,7 +98,7 @@
{"tag": "RatingPlan", "path": "*hdr.RatingPlan", "type": "*constant", "value": "RatingPlan"},
{"tag": "RatingPlanSubject", "path": "*hdr.RatingPlanSubject", "type": "*constant", "value": "RatingPlanSubject"},
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents"},
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
@@ -117,10 +117,10 @@
{"tag": "RatingPlan", "path": "*exp.RatingPlan", "type": "*variable", "value": "~*ec.Charges[0].Rating.RatingFilter.RatingPlanID"},
{"tag": "RatingPlanSubject", "path": "*exp.RatingPlanSubject", "type": "*variable", "value": "~*ec.Charges[0].Rating.RatingFilter.Subject"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}"},
],
},
{
@@ -137,7 +137,7 @@
{"tag": "DistributorCode", "path": "*hdr.DistributorCode",
"type": "*constant", "value": "VOI","width": 3},
{"tag": "FileCreationTime", "path": "*hdr.FileCreationTime",
"type": "*variable","value":"~*dc.TimeNow{*time_string:020106150400}",
"type": "*variable","value":"~*em.TimeNow{*time_string:020106150400}",
"width": 12 },
{"tag": "FileVersion", "path": "*hdr.FileVersion", "type": "*constant",
"value": "01","width": 2},
@@ -185,13 +185,13 @@
{"tag": "DistributorCode", "path": "*trl.DistributorCode",
"type": "*constant", "value": "VOI","width": 3},
{"tag": "NumberOfRecords", "path": "*trl.NumberOfRecords",
"type": "*variable", "value": "~*dc.NumberOfEvents","width": 6,"padding": "*zeroleft"},
"type": "*variable", "value": "~*em.NumberOfEvents","width": 6,"padding": "*zeroleft"},
{"tag": "CdrsDuration", "path": "*trl.CdrsDuration", "type": "*variable",
"value": "~*dc.TotalDuration","width": 8,"padding":"*zeroleft","layout": "seconds"},
"value": "~*em.TotalDuration","width": 8,"padding":"*zeroleft","layout": "seconds"},
{"tag": "FirstCdrTime", "path": "*trl.FirstCdrTime", "type": "*variable",
"value": "~*dc.FirstEventATime{*time_string:020106150400}", "width": 12},
"value": "~*em.FirstEventATime{*time_string:020106150400}", "width": 12},
{"tag": "LastCdrTime", "path": "*hdr.LastCdrTime", "type": "*variable",
"value": "~*dc.LastEventATime{*time_string:020106150400}", "width": 12,},
"value": "~*em.LastEventATime{*time_string:020106150400}", "width": 12,},
{"tag": "Filler2", "path": "*trl.Filler2", "type": "*filler",
"width": 93}
],

View File

@@ -52,4 +52,4 @@ cgrates.org,ROUTE_LCR,,,,,route_2,,,RP_TEST_2,,,,,,
cgrates.org,ROUTE_LOAD_DIST,FLTR_SPP_LOAD_DIST,,*load,route1:2;route2:7;*default:5,,,,,,,,,,20
cgrates.org,ROUTE_LOAD_DIST,,,,,route1,,,,,Stat_Supplier1:*sum#~*req.LoadReq,10,false,,
cgrates.org,ROUTE_LOAD_DIST,,,,,route2,,,,,Stat_Supplier2:*sum#~*req.LoadReq,20,,,
cgrates.org,ROUTE_LOAD_DIST,,,,,route3,,,,,Stat_Supplier3:*sum#~*req.LoadReq,35,,,
cgrates.org,ROUTE_LOAD_DIST,,,,,route3,,,,,Stat_Supplier3:*sum#~*req.LoadReq,35,,,
1 #Tenant ID FilterIDs ActivationInterval Sorting SortingParameters RouteID RouteFilterIDs RouteAccountIDs RouteRatingPlanIDs RouteResourceIDs RouteStatIDs RouteWeight RouteBlocker RouteParameters Weight
52
53
54
55

View File

@@ -30,10 +30,10 @@ import (
// NewAMQPee creates a new amqp poster
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPee {
func NewAMQPee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *AMQPee {
amqp := &AMQPee{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
amqp.parseOpts(cfg.Opts)
@@ -50,7 +50,7 @@ type AMQPee struct {
postChan *amqp.Channel
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -177,4 +177,4 @@ func (pstr *AMQPee) Close() (err error) {
return
}
func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.em }

View File

@@ -28,7 +28,7 @@ import (
func TestAmqpGetMetrics(t *testing.T) {
expectedMetrics := &utils.ExporterMetrics{}
pstr := &AMQPee{
dc: expectedMetrics,
em: expectedMetrics,
}
result := pstr.GetMetrics()
if result != expectedMetrics {
@@ -50,7 +50,7 @@ func TestCfg(t *testing.T) {
func TestAmqpToGetMetrics(t *testing.T) {
expectedMetrics := &utils.ExporterMetrics{}
amqp := &AMQPv1EE{
dc: expectedMetrics,
em: expectedMetrics,
}
result := amqp.GetMetrics()
if result != expectedMetrics {

View File

@@ -28,10 +28,10 @@ import (
)
// NewAMQPv1EE creates a poster for amqpv1
func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPv1EE {
func NewAMQPv1EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *AMQPv1EE {
pstr := &AMQPv1EE{
cfg: cfg,
dc: dc,
em: em,
queueID: "/" + utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -56,7 +56,7 @@ type AMQPv1EE struct {
session *amqpv1.Session
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -121,4 +121,4 @@ func (pstr *AMQPv1EE) Close() (err error) {
return
}
func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.em }

118
ees/ee.go
View File

@@ -47,40 +47,40 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi
if err != nil {
return nil, err
}
dc := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc)
em := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc)
switch cfg.Type {
case utils.MetaFileCSV:
return NewFileCSVee(cfg, cgrCfg, filterS, dc)
return NewFileCSVee(cfg, cgrCfg, filterS, em)
case utils.MetaFileFWV:
return NewFileFWVee(cfg, cgrCfg, filterS, dc)
return NewFileFWVee(cfg, cgrCfg, filterS, em)
case utils.MetaHTTPPost:
return NewHTTPPostEE(cfg, cgrCfg, filterS, dc)
return NewHTTPPostEE(cfg, cgrCfg, filterS, em)
case utils.MetaHTTPjsonMap:
return NewHTTPjsonMapEE(cfg, cgrCfg, filterS, dc)
return NewHTTPjsonMapEE(cfg, cgrCfg, filterS, em)
case utils.MetaNatsjsonMap:
return NewNatsEE(cfg, cgrCfg.GeneralCfg().NodeID,
cgrCfg.GeneralCfg().ConnectTimeout, dc)
cgrCfg.GeneralCfg().ConnectTimeout, em)
case utils.MetaAMQPjsonMap:
return NewAMQPee(cfg, dc), nil
return NewAMQPee(cfg, em), nil
case utils.MetaAMQPV1jsonMap:
return NewAMQPv1EE(cfg, dc), nil
return NewAMQPv1EE(cfg, em), nil
case utils.MetaS3jsonMap:
return NewS3EE(cfg, dc), nil
return NewS3EE(cfg, em), nil
case utils.MetaSQSjsonMap:
return NewSQSee(cfg, dc), nil
return NewSQSee(cfg, em), nil
case utils.MetaKafkajsonMap:
return NewKafkaEE(cfg, dc)
return NewKafkaEE(cfg, em)
case utils.MetaVirt:
return NewVirtualEE(cfg, dc), nil
return NewVirtualEE(cfg, em), nil
case utils.MetaElastic:
return NewElasticEE(cfg, dc)
return NewElasticEE(cfg, em)
case utils.MetaSQL:
return NewSQLEe(cfg, dc)
return NewSQLEe(cfg, em)
case utils.MetaLog:
return NewLogEE(cfg, dc), nil
return NewLogEE(cfg, em), nil
case utils.MetaRPC:
return NewRpcEE(cfg, dc, connMngr)
return NewRpcEE(cfg, em, connMngr)
default:
return nil, fmt.Errorf("unsupported exporter type: <%s>", cfg.Type)
}
@@ -114,88 +114,88 @@ func (c *concReq) done() {
}
// composeHeaderTrailer will return the orderNM for *hdr or *trl
func composeHeaderTrailer(prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
func composeHeaderTrailer(prfx string, fields []*config.FCTemplate, em utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
r = utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: dc,
utils.MetaEM: em,
utils.MetaCfg: cfg.GetDataProvider(),
}, cfg.GeneralCfg().DefaultTenant, fltS,
map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(fields)
return
}
func updateEEMetrics(dc *utils.ExporterMetrics, cgrID string, ev engine.MapEvent, hasError bool, timezone string) {
dc.Lock()
defer dc.Unlock()
func updateEEMetrics(em *utils.ExporterMetrics, cgrID string, ev engine.MapEvent, hasError bool, timezone string) {
em.Lock()
defer em.Unlock()
if hasError {
dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(cgrID)
em.MapStorage[utils.NegativeExports].(utils.StringSet).Add(cgrID)
} else {
dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(cgrID)
em.MapStorage[utils.PositiveExports].(utils.StringSet).Add(cgrID)
}
if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil {
if _, has := dc.MapStorage[utils.FirstEventATime]; !has {
dc.MapStorage[utils.FirstEventATime] = time.Time{}
if _, has := em.MapStorage[utils.FirstEventATime]; !has {
em.MapStorage[utils.FirstEventATime] = time.Time{}
}
if _, has := dc.MapStorage[utils.LastEventATime]; !has {
dc.MapStorage[utils.LastEventATime] = time.Time{}
if _, has := em.MapStorage[utils.LastEventATime]; !has {
em.MapStorage[utils.LastEventATime] = time.Time{}
}
if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) {
dc.MapStorage[utils.FirstEventATime] = aTime
if em.MapStorage[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(em.MapStorage[utils.FirstEventATime].(time.Time)) {
em.MapStorage[utils.FirstEventATime] = aTime
}
if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) {
dc.MapStorage[utils.LastEventATime] = aTime
if aTime.After(em.MapStorage[utils.LastEventATime].(time.Time)) {
em.MapStorage[utils.LastEventATime] = aTime
}
}
if oID, err := ev.GetTInt64(utils.OrderID); err == nil {
if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has {
dc.MapStorage[utils.FirstExpOrderID] = int64(0)
if _, has := em.MapStorage[utils.FirstExpOrderID]; !has {
em.MapStorage[utils.FirstExpOrderID] = int64(0)
}
if _, has := dc.MapStorage[utils.LastExpOrderID]; !has {
dc.MapStorage[utils.LastExpOrderID] = int64(0)
if _, has := em.MapStorage[utils.LastExpOrderID]; !has {
em.MapStorage[utils.LastExpOrderID] = int64(0)
}
if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 ||
dc.MapStorage[utils.FirstExpOrderID].(int64) > oID {
dc.MapStorage[utils.FirstExpOrderID] = oID
if em.MapStorage[utils.FirstExpOrderID].(int64) == 0 ||
em.MapStorage[utils.FirstExpOrderID].(int64) > oID {
em.MapStorage[utils.FirstExpOrderID] = oID
}
if dc.MapStorage[utils.LastExpOrderID].(int64) < oID {
dc.MapStorage[utils.LastExpOrderID] = oID
if em.MapStorage[utils.LastExpOrderID].(int64) < oID {
em.MapStorage[utils.LastExpOrderID] = oID
}
}
if cost, err := ev.GetFloat64(utils.Cost); err == nil {
if _, has := dc.MapStorage[utils.TotalCost]; !has {
dc.MapStorage[utils.TotalCost] = float64(0.0)
if _, has := em.MapStorage[utils.TotalCost]; !has {
em.MapStorage[utils.TotalCost] = float64(0.0)
}
dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost
em.MapStorage[utils.TotalCost] = em.MapStorage[utils.TotalCost].(float64) + cost
}
if tor, err := ev.GetString(utils.ToR); err == nil {
if usage, err := ev.GetDuration(utils.Usage); err == nil {
switch tor {
case utils.MetaVoice:
if _, has := dc.MapStorage[utils.TotalDuration]; !has {
dc.MapStorage[utils.TotalDuration] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalDuration]; !has {
em.MapStorage[utils.TotalDuration] = time.Duration(0)
}
dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage
em.MapStorage[utils.TotalDuration] = em.MapStorage[utils.TotalDuration].(time.Duration) + usage
case utils.MetaSMS:
if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has {
dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalSMSUsage]; !has {
em.MapStorage[utils.TotalSMSUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage
em.MapStorage[utils.TotalSMSUsage] = em.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MetaMMS:
if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has {
dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalMMSUsage]; !has {
em.MapStorage[utils.TotalMMSUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage
em.MapStorage[utils.TotalMMSUsage] = em.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage
case utils.MetaGeneric:
if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has {
dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalGenericUsage]; !has {
em.MapStorage[utils.TotalGenericUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage
em.MapStorage[utils.TotalGenericUsage] = em.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage
case utils.MetaData:
if _, has := dc.MapStorage[utils.TotalDataUsage]; !has {
dc.MapStorage[utils.TotalDataUsage] = time.Duration(0)
if _, has := em.MapStorage[utils.TotalDataUsage]; !has {
em.MapStorage[utils.TotalDataUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage
em.MapStorage[utils.TotalDataUsage] = em.MapStorage[utils.TotalDataUsage].(time.Duration) + usage
}
}
}

View File

@@ -40,18 +40,18 @@ func TestNewEventExporter(t *testing.T) {
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
em := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
err = eeExpect.init()
newEE := ee.(*FileCSVee)
newEE.dc.MapStorage[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.ExportPath] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.ExportPath] = nil
eeExpect.csvWriter = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.ExportPath] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.ExportPath] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -68,17 +68,17 @@ func TestNewEventExporterCase2(t *testing.T) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
em := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
err = eeExpect.init()
newEE := ee.(*FileFWVee)
newEE.dc.MapStorage[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.ExportPath] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.ExportPath] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.ExportPath] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.ExportPath] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -93,14 +93,14 @@ func TestNewEventExporterCase3(t *testing.T) {
if err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
em := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em)
if err != nil {
t.Error(err)
}
newEE := ee.(*HTTPPostEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -115,14 +115,14 @@ func TestNewEventExporterCase4(t *testing.T) {
if err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
em := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em)
if err != nil {
t.Error(err)
}
newEE := ee.(*HTTPjsonMapEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -137,11 +137,11 @@ func TestNewEventExporterCase6(t *testing.T) {
if err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], dc)
em := utils.NewExporterMetrics("", time.Local)
eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], em)
newEE := ee.(*VirtualEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -170,17 +170,17 @@ func TestNewEventExporterCase7(t *testing.T) {
if err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
if err != nil {
t.Error(err)
}
eeExpect, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
eeExpect, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], em)
if err != nil {
t.Error(err)
}
newEE := ee.(*ElasticEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
newEE.em.MapStorage[utils.TimeNow] = nil
eeExpect.em.MapStorage[utils.TimeNow] = nil
eeExpect.client = newEE.client
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE)

View File

@@ -316,7 +316,7 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool
expNM := utils.NewOrderedNavigableMap()
dsMap := map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(ev.Event),
utils.MetaDC: exp.GetMetrics(),
utils.MetaEM: exp.GetMetrics(),
utils.MetaOpts: utils.MapStorage(ev.APIOpts),
utils.MetaCfg: cfg.GetDataProvider(),
utils.MetaVars: utils.MapStorage{utils.MetaTenant: ev.Tenant, utils.MetaExporterID: ev.APIOpts[utils.MetaExporterID]},

View File

@@ -292,7 +292,7 @@ func TestV1ProcessEvent4(t *testing.T) {
}
func newMockEventExporter() *mockEventExporter {
return &mockEventExporter{dc: &utils.ExporterMetrics{
return &mockEventExporter{em: &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
@@ -301,12 +301,12 @@ func newMockEventExporter() *mockEventExporter {
}
type mockEventExporter struct {
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
bytePreparing
}
func (m mockEventExporter) GetMetrics() *utils.ExporterMetrics {
return m.dc
return m.em
}
func (mockEventExporter) Cfg() *config.EventExporterCfg { return new(config.EventExporterCfg) }
@@ -438,7 +438,7 @@ func TestOnCacheEvicted(t *testing.T) {
}
func TestUpdateEEMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.UTC)
em := utils.NewExporterMetrics("", time.UTC)
tnow := time.Now()
ev := engine.MapEvent{
utils.AnswerTime: tnow,
@@ -454,10 +454,10 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(1)
exp.MapStorage[utils.TotalCost] = float64(5.5)
exp.MapStorage[utils.TotalDuration] = time.Second
exp.MapStorage[utils.TimeNow] = dc.MapStorage[utils.TimeNow]
exp.MapStorage[utils.TimeNow] = em.MapStorage[utils.TimeNow]
exp.MapStorage[utils.PositiveExports] = utils.StringSet{"": {}}
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
tnow = tnow.Add(24 * time.Hour)
@@ -472,8 +472,8 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(2)
exp.MapStorage[utils.TotalCost] = float64(11)
exp.MapStorage[utils.TotalSMSUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
tnow = tnow.Add(24 * time.Hour)
@@ -488,8 +488,8 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(3)
exp.MapStorage[utils.TotalCost] = float64(16.5)
exp.MapStorage[utils.TotalMMSUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
tnow = tnow.Add(24 * time.Hour)
@@ -504,8 +504,8 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(4)
exp.MapStorage[utils.TotalCost] = float64(22)
exp.MapStorage[utils.TotalGenericUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
tnow = tnow.Add(24 * time.Hour)
@@ -520,7 +520,7 @@ func TestUpdateEEMetrics(t *testing.T) {
exp.MapStorage[utils.LastExpOrderID] = int64(5)
exp.MapStorage[utils.TotalCost] = float64(27.5)
exp.MapStorage[utils.TotalDataUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em))
}
}

View File

@@ -38,17 +38,17 @@ import (
type ElasticEE struct {
mu sync.RWMutex
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
client *elasticsearch.TypedClient
clientCfg elasticsearch.Config
}
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*ElasticEE, error) {
func NewElasticEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) (*ElasticEE, error) {
el := &ElasticEE{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
if err := el.parseClientOpts(); err != nil {
@@ -222,4 +222,4 @@ func (e *ElasticEE) Close() error {
return nil
}
func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.dc }
func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.em }

View File

@@ -28,13 +28,13 @@ import (
)
func TestGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
ee := &ElasticEE{
dc: dc,
em: em,
}
if rcv := ee.GetMetrics(); !reflect.DeepEqual(rcv, ee.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(ee.dc))
if rcv := ee.GetMetrics(); !reflect.DeepEqual(rcv, ee.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(ee.em))
}
}
@@ -59,8 +59,8 @@ func TestInitClient(t *testing.T) {
func TestElasticExportEventErr(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc := utils.NewExporterMetrics("", time.Local)
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
em := utils.NewExporterMetrics("", time.Local)
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], em)
if err != nil {
t.Error(err)
}

View File

@@ -61,7 +61,7 @@ func TestExporterMetricsIT(t *testing.T) {
"synchronous": true,
"metrics_reset_schedule": "@every 5s",
"fields": [
{ "tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents" },
{ "tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents" },
{ "tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID" },
{ "tag": "ToR", "path": "*exp.ToR", "type": "*constant", "value": "*sms" },
{ "tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account" },
@@ -70,10 +70,10 @@ func TestExporterMetricsIT(t *testing.T) {
{ "tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage" },
{ "tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}" },
{ "tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents" },
{ "tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration" },
{ "tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage" },
{ "tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}" }
{ "tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents" },
{ "tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration" },
{ "tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage" },
{ "tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}" }
]
}
]

View File

@@ -34,10 +34,10 @@ import (
func NewFileCSVee(cfg *config.EventExporterCfg,
cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.ExporterMetrics) (fCsv *FileCSVee, err error) {
em *utils.ExporterMetrics) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{
cfg: cfg,
dc: dc,
em: em,
cgrCfg: cgrCfg,
filterS: filterS,
@@ -49,7 +49,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg,
// FileCSVee implements EventExporter interface for .csv files
type FileCSVee struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
file io.WriteCloser
csvWriter *csv.Writer
sync.Mutex
@@ -65,7 +65,7 @@ func (fCsv *FileCSVee) init() (err error) {
// create the file
filePath := path.Join(fCsv.Cfg().ExportPath,
fCsv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix)
fCsv.dc.Set([]string{utils.ExportPath}, filePath)
fCsv.em.Set([]string{utils.ExportPath}, filePath)
if fCsv.file, err = os.Create(filePath); err != nil {
return
}
@@ -81,7 +81,7 @@ func (fCsv *FileCSVee) init() (err error) {
func (fCsv *FileCSVee) composeHeader() (err error) {
if len(fCsv.Cfg().HeaderFields()) != 0 {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
if exp, err = composeHeaderTrailer(utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.em, fCsv.cgrCfg, fCsv.filterS); err != nil {
return
}
return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings())
@@ -93,7 +93,7 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
func (fCsv *FileCSVee) composeTrailer() (err error) {
if len(fCsv.Cfg().TrailerFields()) != 0 {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
if exp, err = composeHeaderTrailer(utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.em, fCsv.cgrCfg, fCsv.filterS); err != nil {
return
}
return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings())
@@ -127,4 +127,4 @@ func (fCsv *FileCSVee) Close() (err error) {
return
}
func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.dc }
func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.em }

View File

@@ -677,11 +677,11 @@ func TestCsvInitFileCSV(t *testing.T) {
if err := os.MkdirAll("/tmp/TestInitFileCSV", 0666); err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{
cgrCfg: cgrCfg,
cfg: cgrCfg.EEsCfg().Exporters[0],
dc: dc,
em: em,
}
if err := fCsv.init(); err != nil {
t.Error(err)

View File

@@ -32,11 +32,11 @@ import (
)
func TestFileCsvGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{dc: dc}
em := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{em: em}
if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fCsv.dc))
if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fCsv.em))
}
}
@@ -59,7 +59,7 @@ func TestFileCsvComposeHeader(t *testing.T) {
filterS: filterS,
file: nopCloser{byteBuff},
csvWriter: csvNW,
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -122,7 +122,7 @@ func TestFileCsvComposeTrailer(t *testing.T) {
filterS: filterS,
file: nopCloser{byteBuff},
csvWriter: csvNW,
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -179,14 +179,14 @@ func TestFileCsvExportEvent(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
csvNW := csv.NewWriter(byteBuff)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
csvWriter: csvNW,
dc: dc,
em: em,
}
if err := fCsv.ExportEvent([]string{"value", "3"}, ""); err != nil {
@@ -212,7 +212,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) {
filterS: filterS,
file: nopCloserWrite{byteBuff},
csvWriter: csvNW,
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -246,7 +246,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) {
filterS: filterS,
file: nopCloserError{byteBuff},
csvWriter: csvNW,
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{

View File

@@ -30,10 +30,10 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.ExporterMetrics) (fFwv *FileFWVee, err error) {
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, em *utils.ExporterMetrics) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{
cfg: cfg,
dc: dc,
em: em,
cgrCfg: cgrCfg,
filterS: filterS,
@@ -45,7 +45,7 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter
// FileFWVee implements EventExporter interface for .fwv files
type FileFWVee struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
file io.WriteCloser
sync.Mutex
slicePreparing
@@ -59,7 +59,7 @@ type FileFWVee struct {
func (fFwv *FileFWVee) init() (err error) {
filePath := path.Join(fFwv.Cfg().ExportPath,
fFwv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)
fFwv.dc.Set([]string{utils.ExportPath}, filePath)
fFwv.em.Set([]string{utils.ExportPath}, filePath)
// create the file
if fFwv.file, err = os.Create(filePath); err != nil {
return
@@ -73,7 +73,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
if exp, err = composeHeaderTrailer(utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.em, fFwv.cgrCfg, fFwv.filterS); err != nil {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
@@ -91,7 +91,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
if exp, err = composeHeaderTrailer(utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.em, fFwv.cgrCfg, fFwv.filterS); err != nil {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
@@ -134,4 +134,4 @@ func (fFwv *FileFWVee) Close() (err error) {
return
}
func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.dc }
func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.em }

View File

@@ -164,11 +164,11 @@ func TestFileFwvInit(t *testing.T) {
if err := os.MkdirAll("/tmp/TestInitFileFWV", 0666); err != nil {
t.Error(err)
}
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cgrCfg: cgrCfg,
cfg: cgrCfg.EEsCfg().Exporters[0],
dc: dc,
em: em,
}
if err := fFwv.init(); err != nil {
t.Error(err)

View File

@@ -32,11 +32,11 @@ import (
)
func TestFileFwvGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{dc: dc}
em := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{em: em}
if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fFwv.dc))
if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fFwv.em))
}
}
@@ -52,7 +52,7 @@ func TestFileFwvComposeHeader(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -114,7 +114,7 @@ func TestFileFwvComposeTrailer(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -171,13 +171,13 @@ func TestFileFwvExportEvent(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
csvNW := csv.NewWriter(byteBuff)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
dc: dc,
em: em,
}
if err := fFwv.ExportEvent([]string{"value", "3"}, ""); err != nil {
t.Error(err)
@@ -204,13 +204,13 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: dc,
em: em,
}
if err := fFwv.ExportEvent([]string{""}, ""); err == nil || err != utils.ErrNotImplemented {
t.Errorf("Expected %q but received %q", utils.ErrNotImplemented, err)
@@ -228,7 +228,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -260,7 +260,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -291,7 +291,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -329,7 +329,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
file: nopCloserError{byteBuff},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{

View File

@@ -33,10 +33,10 @@ import (
)
func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) {
em *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) {
pstrJSON = &HTTPjsonMapEE{
cfg: cfg,
dc: dc,
em: em,
client: &http.Client{Transport: engine.GetHTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout},
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -47,7 +47,7 @@ func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi
// HTTPjsonMapEE implements EventExporter interface for .csv files
type HTTPjsonMapEE struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
client *http.Client
reqs *concReq
@@ -61,7 +61,7 @@ func (httpEE *HTTPjsonMapEE) composeHeader(cgrCfg *config.CGRConfig, filterS *en
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil {
if exp, err = composeHeaderTrailer(utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.em, cgrCfg, filterS); err != nil {
return
}
for el := exp.GetFirstElement(); el != nil; el = el.Next() {
@@ -91,7 +91,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(content any, _ string) (err error) {
func (httpEE *HTTPjsonMapEE) Close() (_ error) { return }
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.dc }
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.em }
func (httpEE *HTTPjsonMapEE) PrepareMap(mp *utils.CGREvent) (any, error) {
body, err := json.Marshal(mp.Event)

View File

@@ -32,13 +32,13 @@ import (
)
func TestHttpJsonMapGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
httpEE := &HTTPjsonMapEE{
dc: dc,
em: em,
}
if rcv := httpEE.GetMetrics(); !reflect.DeepEqual(rcv, httpEE.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpEE.dc))
if rcv := httpEE.GetMetrics(); !reflect.DeepEqual(rcv, httpEE.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpEE.em))
}
}

View File

@@ -29,10 +29,10 @@ import (
)
func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) {
em *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) {
httpPost = &HTTPPostEE{
cfg: cfg,
dc: dc,
em: em,
client: &http.Client{Transport: engine.GetHTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout},
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -43,7 +43,7 @@ func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filte
// FileCSVee implements EventExporter interface for .csv files
type HTTPPostEE struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
client *http.Client
reqs *concReq
@@ -62,7 +62,7 @@ func (httpPost *HTTPPostEE) composeHeader(cgrCfg *config.CGRConfig, filterS *eng
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil {
if exp, err = composeHeaderTrailer(utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.em, cgrCfg, filterS); err != nil {
return
}
for el := exp.GetFirstElement(); el != nil; el = el.Next() {
@@ -92,7 +92,7 @@ func (httpPost *HTTPPostEE) ExportEvent(content any, _ string) (err error) {
func (httpPost *HTTPPostEE) Close() (_ error) { return }
func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.dc }
func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.em }
func (httpPost *HTTPPostEE) PrepareMap(mp *utils.CGREvent) (any, error) {
urlVals := url.Values{}

View File

@@ -33,13 +33,13 @@ import (
)
func TestHttpPostGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
httpPost := &HTTPPostEE{
dc: dc,
em: em,
}
if rcv := httpPost.GetMetrics(); !reflect.DeepEqual(rcv, httpPost.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpPost.dc))
if rcv := httpPost.GetMetrics(); !reflect.DeepEqual(rcv, httpPost.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpPost.em))
}
}

View File

@@ -30,10 +30,10 @@ import (
)
// NewKafkaEE creates a kafka poster
func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*KafkaEE, error) {
func NewKafkaEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) (*KafkaEE, error) {
pstr := &KafkaEE{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -99,7 +99,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*Kafka
type KafkaEE struct {
writer *kafka.Writer
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
bytePreparing
}
@@ -130,4 +130,4 @@ func (k *KafkaEE) Close() error {
return k.writer.Close()
}
func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.dc }
func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.em }

View File

@@ -29,7 +29,7 @@ func TestKafkaEEConnect(t *testing.T) {
kafkaEE := &KafkaEE{
writer: nil,
cfg: &config.EventExporterCfg{},
dc: &utils.ExporterMetrics{},
em: &utils.ExporterMetrics{},
reqs: &concReq{},
}
err := kafkaEE.Connect()
@@ -52,7 +52,7 @@ func TestKafkaEE_Cfg(t *testing.T) {
func TestKafkaEEGetMetrics(t *testing.T) {
safeMapStorage := &utils.ExporterMetrics{}
kafkaEE := &KafkaEE{
dc: safeMapStorage,
em: safeMapStorage,
}
result := kafkaEE.GetMetrics()
if result != safeMapStorage {

View File

@@ -26,17 +26,17 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewLogEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *LogEE {
func NewLogEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *LogEE {
return &LogEE{
cfg: cfg,
dc: dc,
em: em,
}
}
// LogEE implements EventExporter interface for .csv files
type LogEE struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
}
func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
@@ -48,7 +48,7 @@ func (vEe *LogEE) ExportEvent(mp any, _ string) error {
return nil
}
func (vEe *LogEE) Close() error { return nil }
func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc }
func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.em }
func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (any, error) {
return mp.Event, nil
}

View File

@@ -77,7 +77,7 @@ func TestLogEE_GetMetrics(t *testing.T) {
mockMetrics := &utils.ExporterMetrics{}
vEe := &LogEE{
dc: mockMetrics,
em: mockMetrics,
}
result := vEe.GetMetrics()
@@ -104,9 +104,9 @@ func TestLogEE_PrepareMap(t *testing.T) {
func TestNewLogEE(t *testing.T) {
cfg := &config.EventExporterCfg{}
dc := &utils.ExporterMetrics{}
em := &utils.ExporterMetrics{}
logEE := NewLogEE(cfg, dc)
logEE := NewLogEE(cfg, em)
if logEE == nil {
t.Fatal("NewLogEE returned nil")
@@ -116,7 +116,7 @@ func TestNewLogEE(t *testing.T) {
t.Errorf("Expected cfg to be %v, but got %v", cfg, logEE.cfg)
}
if logEE.dc != dc {
t.Errorf("Expected dc to be %v, but got %v", dc, logEE.dc)
if logEE.em != em {
t.Errorf("Expected em to be %v, but got %v", em, logEE.em)
}
}

View File

@@ -34,10 +34,10 @@ import (
)
// NewNatsEE creates a kafka poster
func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.ExporterMetrics) (natsPstr *NatsEE, err error) {
func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, em *utils.ExporterMetrics) (natsPstr *NatsEE, err error) {
natsPstr = &NatsEE{
cfg: cfg,
dc: dc,
em: em,
subject: utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
@@ -55,7 +55,7 @@ type NatsEE struct {
posterJS jetstream.JetStream
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect writer
bytePreparing
@@ -140,7 +140,7 @@ func (pstr *NatsEE) Close() error {
return err
}
func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.em }
func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration) ([]nats.Option, error) {
natsOpts := make([]nats.Option, 0, 7)

View File

@@ -43,11 +43,11 @@ func TestNewNatsEE(t *testing.T) {
}
nodeID := "node_id1"
connTimeout := 2 * time.Second
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
exp := new(NatsEE)
exp.cfg = cfg
exp.dc = dc
exp.em = em
exp.subject = utils.DefaultQueueID
exp.reqs = newConcReq(cfg.ConcurrentRequests)
// err = exp.parseOpt(cfg.Opts, nodeID, connTimeout)
@@ -55,7 +55,7 @@ func TestNewNatsEE(t *testing.T) {
// t.Error(err)
// }
rcv, err := NewNatsEE(cfg, nodeID, connTimeout, dc)
rcv, err := NewNatsEE(cfg, nodeID, connTimeout, em)
if err != nil {
t.Error(err)
}
@@ -87,8 +87,8 @@ func TestParseOpt(t *testing.T) {
opts := &config.EventExporterOpts{}
nodeID := "node_id1"
connTimeout := 2 * time.Second
dc := utils.NewExporterMetrics("", time.Local)
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc)
em := utils.NewExporterMetrics("", time.Local)
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, em)
if err != nil {
t.Error(err)
}
@@ -121,8 +121,8 @@ func TestParseOptJetStream(t *testing.T) {
}
nodeID := "node_id1"
connTimeout := 2 * time.Second
dc := utils.NewExporterMetrics("", time.Local)
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc)
em := utils.NewExporterMetrics("", time.Local)
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, em)
if err != nil {
t.Error(err)
}
@@ -158,8 +158,8 @@ func TestParseOptSubject(t *testing.T) {
}}
nodeID := "node_id1"
connTimeout := 2 * time.Second
dc := utils.NewExporterMetrics("", time.Local)
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc)
em := utils.NewExporterMetrics("", time.Local)
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, em)
if err != nil {
t.Error(err)
}
@@ -242,7 +242,7 @@ func TestNatsEECfg(t *testing.T) {
func TestNatsEEGetMetrics(t *testing.T) {
expectedMetrics := &utils.ExporterMetrics{}
pstr := &NatsEE{
dc: expectedMetrics,
em: expectedMetrics,
}
actualMetrics := pstr.GetMetrics()
if actualMetrics != expectedMetrics {

View File

@@ -29,11 +29,11 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics,
func NewRpcEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics,
connMgr *engine.ConnManager) (e *RPCee, err error) {
e = &RPCee{
cfg: cfg,
dc: dc,
em: em,
connMgr: connMgr,
}
err = e.parseOpts()
@@ -42,7 +42,7 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics,
type RPCee struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
connMgr *engine.ConnManager
//opts
@@ -82,7 +82,7 @@ func (e *RPCee) Close() (err error) {
}
func (e *RPCee) GetMetrics() (mp *utils.ExporterMetrics) {
return e.dc
return e.em
}
func (e *RPCee) PrepareMap(mp *utils.CGREvent) (any, error) {

View File

@@ -31,17 +31,17 @@ import (
func TestNewRpcEE(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
rcv, err := NewRpcEE(eeSCfg, dc, connMgr)
rcv, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}
exp := &RPCee{
cfg: eeSCfg,
dc: dc,
em: em,
connMgr: connMgr,
}
@@ -101,9 +101,9 @@ func TestRPCCfg(t *testing.T) {
func TestRPCConnect(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}
@@ -114,9 +114,9 @@ func TestRPCConnect(t *testing.T) {
func TestRPCClose(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}
@@ -130,28 +130,28 @@ func TestRPCClose(t *testing.T) {
func TestRPCGetMetrics(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := &utils.ExporterMetrics{
em := &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
"time": "now",
"just_a_field": "just_a_value",
},
}
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}
if rcv := rpcEe.GetMetrics(); !reflect.DeepEqual(rcv, dc) {
t.Errorf("Expected %v \n but received \n %v", dc, rcv)
if rcv := rpcEe.GetMetrics(); !reflect.DeepEqual(rcv, em) {
t.Errorf("Expected %v \n but received \n %v", em, rcv)
}
}
func TestRPCPrepareMap(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
t.Error(err)
}

View File

@@ -32,10 +32,10 @@ import (
)
// NewS3EE creates a s3 poster
func NewS3EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *S3EE {
func NewS3EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *S3EE {
pstr := &S3EE{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
pstr.parseOpts(cfg.Opts)
@@ -54,7 +54,7 @@ type S3EE struct {
up *s3manager.Uploader
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -137,4 +137,4 @@ func (pstr *S3EE) ExportEvent(message any, key string) (err error) {
func (pstr *S3EE) Close() (_ error) { return }
func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.em }

View File

@@ -26,17 +26,17 @@ import (
)
func TestS3GetMetrics(t *testing.T) {
safeMapStorage := &utils.ExporterMetrics{}
em := &utils.ExporterMetrics{}
pstr := &S3EE{
dc: safeMapStorage,
em: em,
}
result := pstr.GetMetrics()
if result == nil {
t.Errorf("GetMetrics() returned nil; expected a non-nil SafeMapStorage")
return
}
if result != safeMapStorage {
t.Errorf("GetMetrics() returned unexpected result; got %v, want %v", result, safeMapStorage)
if result != em {
t.Errorf("GetMetrics() returned unexpected result; got %v, want %v", result, em)
}
}

View File

@@ -35,10 +35,10 @@ import (
)
func NewSQLEe(cfg *config.EventExporterCfg,
dc *utils.ExporterMetrics) (sqlEe *SQLEe, err error) {
em *utils.ExporterMetrics) (sqlEe *SQLEe, err error) {
sqlEe = &SQLEe{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
err = sqlEe.initDialector()
@@ -48,7 +48,7 @@ func NewSQLEe(cfg *config.EventExporterCfg,
// SQLEe implements EventExporter interface for SQL
type SQLEe struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
db *gorm.DB
sqldb *sql.DB
reqs *concReq
@@ -156,7 +156,7 @@ func (sqlEe *SQLEe) Close() (err error) {
return
}
func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.dc }
func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.em }
// Create the sqlPosterRequest used to instert the map into the table
func (sqlEe *SQLEe) PrepareMap(cgrEv *utils.CGREvent) (any, error) {

View File

@@ -33,12 +33,12 @@ import (
)
func TestSqlGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
sqlEe := &SQLEe{
dc: dc,
em: em,
}
if rcv := sqlEe.GetMetrics(); !reflect.DeepEqual(rcv, sqlEe.dc) {
t.Errorf("Expected %+v but got %+v", utils.ToJSON(rcv), utils.ToJSON(sqlEe.dc))
if rcv := sqlEe.GetMetrics(); !reflect.DeepEqual(rcv, sqlEe.em) {
t.Errorf("Expected %+v but got %+v", utils.ToJSON(rcv), utils.ToJSON(sqlEe.em))
}
}

View File

@@ -31,10 +31,10 @@ import (
)
// NewSQSee creates a poster for sqs
func NewSQSee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *SQSee {
func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee {
pstr := &SQSee{
cfg: cfg,
dc: dc,
em: em,
reqs: newConcReq(cfg.ConcurrentRequests),
}
pstr.parseOpts(cfg.Opts)
@@ -53,7 +53,7 @@ type SQSee struct {
svc *sqs.SQS
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -147,4 +147,4 @@ func (pstr *SQSee) ExportEvent(message any, _ string) (err error) {
func (pstr *SQSee) Close() (_ error) { return }
func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.em }

View File

@@ -26,11 +26,11 @@ import (
)
func TestGetMetricsT(t *testing.T) {
metrics := &utils.ExporterMetrics{}
pstr := &SQSee{dc: metrics}
em := &utils.ExporterMetrics{}
pstr := &SQSee{em: em}
result := pstr.GetMetrics()
if result != metrics {
t.Errorf("Expected %v, but got %v", metrics, result)
if result != em {
t.Errorf("Expected %v, but got %v", em, result)
}
}

View File

@@ -26,17 +26,17 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *VirtualEE {
func NewVirtualEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *VirtualEE {
return &VirtualEE{
cfg: cfg,
dc: dc,
em: em,
}
}
// VirtualEE implements EventExporter interface for .csv files
type VirtualEE struct {
cfg *config.EventExporterCfg
dc *utils.ExporterMetrics
em *utils.ExporterMetrics
}
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
@@ -50,7 +50,7 @@ func (vEe *VirtualEE) ExportEvent(payload any, _ string) error {
}
func (vEe *VirtualEE) Close() error { return nil }
func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc }
func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.em }
func (vEe *VirtualEE) PrepareMap(cgrEv *utils.CGREvent) (any, error) {
return cgrEv.Event, nil

View File

@@ -29,13 +29,13 @@ import (
)
func TestVirtualEeGetMetrics(t *testing.T) {
dc := utils.NewExporterMetrics("", time.Local)
em := utils.NewExporterMetrics("", time.Local)
vEe := &VirtualEE{
dc: dc,
em: em,
}
if rcv := vEe.GetMetrics(); !reflect.DeepEqual(rcv, vEe.dc) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(vEe.dc))
if rcv := vEe.GetMetrics(); !reflect.DeepEqual(rcv, vEe.em) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(vEe.em))
}
}
func TestVirtualEeExportEvent(t *testing.T) {

View File

@@ -113,7 +113,7 @@ func TestCdrLogEes(t *testing.T) {
"field_separator": ",",
"fields":[
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents"},
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
@@ -132,10 +132,10 @@ func TestCdrLogEes(t *testing.T) {
{"tag": "RatingPlan", "path": "*exp.RatingPlan", "type": "*variable", "value": "~*ec.Charges[0].Rating.RatingFilter.RatingPlanID"},
{"tag": "RatingPlanSubject", "path": "*exp.RatingPlanSubject", "type": "*variable", "value": "~*ec.Charges[0].Rating.RatingFilter.Subject"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}"},
],
},

View File

@@ -422,7 +422,7 @@ const (
MetaDispatcherHosts = "*dispatcher_hosts"
MetaFilters = "*filters"
MetaCDRs = "*cdrs"
MetaDC = "*dc"
MetaEM = "*em"
MetaCaches = "*caches"
MetaUCH = "*uch"
MetaGuardian = "*guardians"