diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index caaa35206..97e3e6af1 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -94,7 +94,7 @@ {"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"}, {"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"}, - {"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"}, + {"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents"}, {"tag": "*originID", "path": "*exp.*originID", "type": "*variable", "value": "~*opts.*originID"}, {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, @@ -111,10 +111,10 @@ {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, - {"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"}, - {"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"}, - {"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"}, - {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"}, + {"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents"}, + {"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration"}, + {"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage"}, + {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}"}, ], }, { @@ -142,7 +142,7 @@ {"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"}, {"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"}, - {"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"}, + {"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents"}, {"tag": "*originID", "path": "*exp.*originID", "type": "*variable", "value": "~*opts.*originID"}, {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, {"tag": "ToR", "path": "*exp.ToR", "type": "*constant", "value": ""}, @@ -162,10 +162,10 @@ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, - {"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"}, - {"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"}, - {"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"}, - {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"}, + {"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents"}, + {"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration"}, + {"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage"}, + {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}"}, ], }, { @@ -182,7 +182,7 @@ {"tag": "DistributorCode", "path": "*hdr.DistributorCode", "type": "*constant", "value": "VOI","width": 3}, {"tag": "FileCreationTime", "path": "*hdr.FileCreationTime", - "type": "*variable","value":"~*dc.TimeNow{*time_string:020106150400}", + "type": "*variable","value":"~*em.TimeNow{*time_string:020106150400}", "width": 12 }, {"tag": "FileVersion", "path": "*hdr.FileVersion", "type": "*constant", "value": "01","width": 2}, @@ -230,13 +230,13 @@ {"tag": "DistributorCode", "path": "*trl.DistributorCode", "type": "*constant", "value": "VOI","width": 3}, {"tag": "NumberOfRecords", "path": "*trl.NumberOfRecords", - "type": "*variable", "value": "~*dc.NumberOfEvents","width": 6,"padding": "*zeroleft"}, + "type": "*variable", "value": "~*em.NumberOfEvents","width": 6,"padding": "*zeroleft"}, {"tag": "CdrsDuration", "path": "*trl.CdrsDuration", "type": "*variable", - "value": "~*dc.TotalDuration","width": 8,"padding":"*zeroleft","layout": "seconds"}, + "value": "~*em.TotalDuration","width": 8,"padding":"*zeroleft","layout": "seconds"}, {"tag": "FirstCdrTime", "path": "*trl.FirstCdrTime", "type": "*variable", - "value": "~*dc.FirstEventATime{*time_string:020106150400}", "width": 12}, + "value": "~*em.FirstEventATime{*time_string:020106150400}", "width": 12}, {"tag": "LastCdrTime", "path": "*hdr.LastCdrTime", "type": "*variable", - "value": "~*dc.LastEventATime{*time_string:020106150400}", "width": 12,}, + "value": "~*em.LastEventATime{*time_string:020106150400}", "width": 12,}, {"tag": "Filler2", "path": "*trl.Filler2", "type": "*filler", "width": 93} ], diff --git a/ees/amqp.go b/ees/amqp.go index b09e2761a..3fe0d83b1 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -30,10 +30,10 @@ import ( // NewAMQPee creates a new amqp poster // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" -func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPee { +func NewAMQPee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *AMQPee { amqp := &AMQPee{ cfg: cfg, - dc: dc, + em: em, reqs: newConcReq(cfg.ConcurrentRequests), } amqp.parseOpts(cfg.Opts) @@ -50,7 +50,7 @@ type AMQPee struct { postChan *amqp.Channel cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -175,6 +175,6 @@ func (pstr *AMQPee) Close() (err error) { return } -func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.dc } +func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.em } func (pstr *AMQPee) ExtraData(*utils.CGREvent) any { return nil } diff --git a/ees/amqp_test.go b/ees/amqp_test.go index 5e03541db..10ab7c203 100644 --- a/ees/amqp_test.go +++ b/ees/amqp_test.go @@ -28,7 +28,7 @@ import ( func TestNewAMQPee(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc := &utils.ExporterMetrics{ + em := &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ utils.NumberOfEvents: int64(0), utils.PositiveExports: utils.StringSet{}, @@ -36,10 +36,10 @@ func TestNewAMQPee(t *testing.T) { }, } cfg.EEsCfg().ExporterCfg(utils.MetaDefault).ConcurrentRequests = 2 - rcv := NewAMQPee(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) + rcv := NewAMQPee(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em) exp := &AMQPee{ cfg: cfg.EEsCfg().ExporterCfg(utils.MetaDefault), - dc: dc, + em: em, reqs: newConcReq(cfg.EEsCfg().ExporterCfg(utils.MetaDefault).ConcurrentRequests), } rcv.reqs = nil @@ -49,24 +49,3 @@ func TestNewAMQPee(t *testing.T) { t.Errorf("Expected %v\n but received %v", exp, rcv) } } - -// func TestAMQPExportEvent(t *testing.T) { -// cfg := config.NewDefaultCGRConfig() -// dc := &utils.SafeMapStorage{ -// MapStorage: utils.MapStorage{ -// utils.NumberOfEvents: int64(0), -// utils.PositiveExports: utils.StringSet{}, -// utils.NegativeExports: 5, -// }} -// // cfg.EEsCfg().ExporterCfg(utils.MetaDefault).ConcurrentRequests = 2 -// // cfg.EEsCfg().ExporterCfg(utils.MetaDefault).Opts = &config.EventExporterOpts{ - -// // } -// pstr := NewAMQPee(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) -// content := "some_content" -// pstr.postChan = -// if err := pstr.ExportEvent(context.Background(), content, ""); err != nil { -// t.Error(err) -// } - -// } diff --git a/ees/amqpv1.go b/ees/amqpv1.go index 99d24757e..617dc2614 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -28,10 +28,10 @@ import ( ) // NewAMQPv1EE creates a poster for amqpv1 -func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPv1EE { +func NewAMQPv1EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *AMQPv1EE { pstr := &AMQPv1EE{ cfg: cfg, - dc: dc, + em: em, queueID: "/" + utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } @@ -54,7 +54,7 @@ type AMQPv1EE struct { session *amqpv1.Session cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -118,6 +118,6 @@ func (pstr *AMQPv1EE) Close() (err error) { return } -func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } +func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.em } func (pstr *AMQPv1EE) ExtraData(*utils.CGREvent) any { return nil } diff --git a/ees/apis.go b/ees/apis.go index 8d283f41c..b4213a569 100644 --- a/ees/apis.go +++ b/ees/apis.go @@ -195,7 +195,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr * expNM := utils.NewOrderedNavigableMap() err = NewExportRequest(map[string]utils.DataStorage{ utils.MetaReq: utils.MapStorage(ev.Event), - utils.MetaDC: exp.GetMetrics(), + utils.MetaEM: exp.GetMetrics(), utils.MetaOpts: utils.MapStorage(ev.APIOpts), utils.MetaCfg: cfg.GetDataProvider(), utils.MetaVars: utils.MapStorage{utils.MetaTenant: ev.Tenant, utils.MetaExporterID: ev.APIOpts[utils.MetaExporterID]}, @@ -244,7 +244,7 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents if err != nil { return err } - dc := utils.NewExporterMetrics(eeCfg.MetricsResetSchedule, loc) + em := utils.NewExporterMetrics(eeCfg.MetricsResetSchedule, loc) var ee EventExporter @@ -261,9 +261,9 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents } switch eeCfg.Type { case utils.MetaFileCSV: - ee, err = NewFileCSVee(eeCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr}) + ee, err = NewFileCSVee(eeCfg, eeS.cfg, eeS.fltrS, em, &buffer{wrtr}) case utils.MetaFileFWV: - ee, err = NewFileFWVee(eeCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr}) + ee, err = NewFileFWVee(eeCfg, eeS.cfg, eeS.fltrS, em, &buffer{wrtr}) default: err = fmt.Errorf("unsupported exporter type: %s>", eeCfg.Type) } diff --git a/ees/ee.go b/ees/ee.go index 3ac8b442b..b39d55634 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -49,40 +49,40 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, if err != nil { return nil, err } - dc := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc) + em := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc) switch cfg.Type { case utils.MetaFileCSV: - return NewFileCSVee(cfg, cgrCfg, filterS, dc, nil) + return NewFileCSVee(cfg, cgrCfg, filterS, em, nil) case utils.MetaFileFWV: - return NewFileFWVee(cfg, cgrCfg, filterS, dc, nil) + return NewFileFWVee(cfg, cgrCfg, filterS, em, nil) case utils.MetaHTTPPost: - return NewHTTPPostEE(cfg, cgrCfg, filterS, dc) + return NewHTTPPostEE(cfg, cgrCfg, filterS, em) case utils.MetaHTTPjsonMap: - return NewHTTPjsonMapEE(cfg, cgrCfg, filterS, dc) + return NewHTTPjsonMapEE(cfg, cgrCfg, filterS, em) case utils.MetaNATSJSONMap: return NewNatsEE(cfg, cgrCfg.GeneralCfg().NodeID, - cgrCfg.GeneralCfg().ConnectTimeout, dc) + cgrCfg.GeneralCfg().ConnectTimeout, em) case utils.MetaAMQPjsonMap: - return NewAMQPee(cfg, dc), nil + return NewAMQPee(cfg, em), nil case utils.MetaAMQPV1jsonMap: - return NewAMQPv1EE(cfg, dc), nil + return NewAMQPv1EE(cfg, em), nil case utils.MetaS3jsonMap: - return NewS3EE(cfg, dc), nil + return NewS3EE(cfg, em), nil case utils.MetaSQSjsonMap: - return NewSQSee(cfg, dc), nil + return NewSQSee(cfg, em), nil case utils.MetaKafkajsonMap: - return NewKafkaEE(cfg, dc) + return NewKafkaEE(cfg, em) case utils.MetaVirt: - return NewVirtualEE(cfg, dc), nil + return NewVirtualEE(cfg, em), nil case utils.MetaElastic: - return NewElasticEE(cfg, dc) + return NewElasticEE(cfg, em) case utils.MetaSQL: - return NewSQLEe(cfg, dc) + return NewSQLEe(cfg, em) case utils.MetaLog: - return NewLogEE(cfg, dc), nil + return NewLogEE(cfg, em), nil case utils.MetaRpc: - return NewRpcEE(cfg, dc, connMngr) + return NewRpcEE(cfg, em, connMngr) default: return nil, fmt.Errorf("unsupported exporter type: <%s>", cfg.Type) } @@ -116,88 +116,88 @@ func (c *concReq) done() { } // composeHeaderTrailer will return the orderNM for *hdr or *trl -func composeHeaderTrailer(ctx *context.Context, prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) { +func composeHeaderTrailer(ctx *context.Context, prfx string, fields []*config.FCTemplate, em utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) { r = utils.NewOrderedNavigableMap() err = NewExportRequest(map[string]utils.DataStorage{ - utils.MetaDC: dc, + utils.MetaEM: em, utils.MetaCfg: cfg.GetDataProvider(), }, cfg.GeneralCfg().DefaultTenant, fltS, map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(ctx, fields) return } -func updateEEMetrics(dc *utils.ExporterMetrics, originID string, ev engine.MapEvent, hasError bool, timezone string) { - dc.Lock() - defer dc.Unlock() +func updateEEMetrics(em *utils.ExporterMetrics, originID string, ev engine.MapEvent, hasError bool, timezone string) { + em.Lock() + defer em.Unlock() if hasError { - dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(originID) + em.MapStorage[utils.NegativeExports].(utils.StringSet).Add(originID) } else { - dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(originID) + em.MapStorage[utils.PositiveExports].(utils.StringSet).Add(originID) } if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil { - if _, has := dc.MapStorage[utils.FirstEventATime]; !has { - dc.MapStorage[utils.FirstEventATime] = time.Time{} + if _, has := em.MapStorage[utils.FirstEventATime]; !has { + em.MapStorage[utils.FirstEventATime] = time.Time{} } - if _, has := dc.MapStorage[utils.LastEventATime]; !has { - dc.MapStorage[utils.LastEventATime] = time.Time{} + if _, has := em.MapStorage[utils.LastEventATime]; !has { + em.MapStorage[utils.LastEventATime] = time.Time{} } - if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() || - aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) { - dc.MapStorage[utils.FirstEventATime] = aTime + if em.MapStorage[utils.FirstEventATime].(time.Time).IsZero() || + aTime.Before(em.MapStorage[utils.FirstEventATime].(time.Time)) { + em.MapStorage[utils.FirstEventATime] = aTime } - if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) { - dc.MapStorage[utils.LastEventATime] = aTime + if aTime.After(em.MapStorage[utils.LastEventATime].(time.Time)) { + em.MapStorage[utils.LastEventATime] = aTime } } if oID, err := ev.GetTInt64(utils.OrderID); err == nil { - if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has { - dc.MapStorage[utils.FirstExpOrderID] = int64(0) + if _, has := em.MapStorage[utils.FirstExpOrderID]; !has { + em.MapStorage[utils.FirstExpOrderID] = int64(0) } - if _, has := dc.MapStorage[utils.LastExpOrderID]; !has { - dc.MapStorage[utils.LastExpOrderID] = int64(0) + if _, has := em.MapStorage[utils.LastExpOrderID]; !has { + em.MapStorage[utils.LastExpOrderID] = int64(0) } - if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 || - dc.MapStorage[utils.FirstExpOrderID].(int64) > oID { - dc.MapStorage[utils.FirstExpOrderID] = oID + if em.MapStorage[utils.FirstExpOrderID].(int64) == 0 || + em.MapStorage[utils.FirstExpOrderID].(int64) > oID { + em.MapStorage[utils.FirstExpOrderID] = oID } - if dc.MapStorage[utils.LastExpOrderID].(int64) < oID { - dc.MapStorage[utils.LastExpOrderID] = oID + if em.MapStorage[utils.LastExpOrderID].(int64) < oID { + em.MapStorage[utils.LastExpOrderID] = oID } } if cost, err := ev.GetFloat64(utils.Cost); err == nil { - if _, has := dc.MapStorage[utils.TotalCost]; !has { - dc.MapStorage[utils.TotalCost] = float64(0.0) + if _, has := em.MapStorage[utils.TotalCost]; !has { + em.MapStorage[utils.TotalCost] = float64(0.0) } - dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost + em.MapStorage[utils.TotalCost] = em.MapStorage[utils.TotalCost].(float64) + cost } if tor, err := ev.GetString(utils.ToR); err == nil { if usage, err := ev.GetDuration(utils.Usage); err == nil { switch tor { case utils.MetaVoice: - if _, has := dc.MapStorage[utils.TotalDuration]; !has { - dc.MapStorage[utils.TotalDuration] = time.Duration(0) + if _, has := em.MapStorage[utils.TotalDuration]; !has { + em.MapStorage[utils.TotalDuration] = time.Duration(0) } - dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage + em.MapStorage[utils.TotalDuration] = em.MapStorage[utils.TotalDuration].(time.Duration) + usage case utils.MetaSMS: - if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has { - dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0) + if _, has := em.MapStorage[utils.TotalSMSUsage]; !has { + em.MapStorage[utils.TotalSMSUsage] = time.Duration(0) } - dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage + em.MapStorage[utils.TotalSMSUsage] = em.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage case utils.MetaMMS: - if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has { - dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0) + if _, has := em.MapStorage[utils.TotalMMSUsage]; !has { + em.MapStorage[utils.TotalMMSUsage] = time.Duration(0) } - dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage + em.MapStorage[utils.TotalMMSUsage] = em.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage case utils.MetaGeneric: - if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has { - dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0) + if _, has := em.MapStorage[utils.TotalGenericUsage]; !has { + em.MapStorage[utils.TotalGenericUsage] = time.Duration(0) } - dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage + em.MapStorage[utils.TotalGenericUsage] = em.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage case utils.MetaData: - if _, has := dc.MapStorage[utils.TotalDataUsage]; !has { - dc.MapStorage[utils.TotalDataUsage] = time.Duration(0) + if _, has := em.MapStorage[utils.TotalDataUsage]; !has { + em.MapStorage[utils.TotalDataUsage] = time.Duration(0) } - dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage + em.MapStorage[utils.TotalDataUsage] = em.MapStorage[utils.TotalDataUsage].(time.Duration) + usage } } } diff --git a/ees/ee_test.go b/ees/ee_test.go index 70d61676a..a9ddfa5af 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -40,8 +40,8 @@ func TestNewEventExporter(t *testing.T) { if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) } - dc := utils.NewExporterMetrics("", time.Local) - eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, nil) + em := utils.NewExporterMetrics("", time.Local) + eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em, nil) if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) } @@ -50,11 +50,11 @@ func TestNewEventExporter(t *testing.T) { t.Error("\nExpected an error") } newEE := ee.(*FileCSVee) - newEE.dc.MapStorage[utils.TimeNow] = nil - newEE.dc.MapStorage[utils.ExportPath] = nil + newEE.em.MapStorage[utils.TimeNow] = nil + newEE.em.MapStorage[utils.ExportPath] = nil eeExpect.csvWriter = nil - eeExpect.dc.MapStorage[utils.TimeNow] = nil - eeExpect.dc.MapStorage[utils.ExportPath] = nil + eeExpect.em.MapStorage[utils.TimeNow] = nil + eeExpect.em.MapStorage[utils.ExportPath] = nil if !reflect.DeepEqual(eeExpect, newEE) { t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE)) } @@ -71,8 +71,8 @@ func TestNewEventExporterCase2(t *testing.T) { t.Errorf("Expected %+v but got %+v", errExpect, err) } - dc := utils.NewExporterMetrics("", time.Local) - eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, io.Discard) + em := utils.NewExporterMetrics("", time.Local) + eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em, io.Discard) if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) } @@ -81,10 +81,10 @@ func TestNewEventExporterCase2(t *testing.T) { t.Error("\nExpected an error") } newEE := ee.(*FileFWVee) - newEE.dc.MapStorage[utils.TimeNow] = nil - newEE.dc.MapStorage[utils.ExportPath] = nil - eeExpect.dc.MapStorage[utils.TimeNow] = nil - eeExpect.dc.MapStorage[utils.ExportPath] = nil + newEE.em.MapStorage[utils.TimeNow] = nil + newEE.em.MapStorage[utils.ExportPath] = nil + eeExpect.em.MapStorage[utils.TimeNow] = nil + eeExpect.em.MapStorage[utils.ExportPath] = nil if !reflect.DeepEqual(eeExpect, newEE) { t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE)) } @@ -99,14 +99,14 @@ func TestNewEventExporterCase3(t *testing.T) { if err != nil { t.Error(err) } - dc := utils.NewExporterMetrics("", time.Local) - eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) + em := utils.NewExporterMetrics("", time.Local) + eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em) if err != nil { t.Error(err) } newEE := ee.(*HTTPPostEE) - newEE.dc.MapStorage[utils.TimeNow] = nil - eeExpect.dc.MapStorage[utils.TimeNow] = nil + newEE.em.MapStorage[utils.TimeNow] = nil + eeExpect.em.MapStorage[utils.TimeNow] = nil if !reflect.DeepEqual(eeExpect, newEE) { t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE)) } @@ -121,14 +121,14 @@ func TestNewEventExporterCase4(t *testing.T) { if err != nil { t.Error(err) } - dc := utils.NewExporterMetrics("", time.Local) - eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) + em := utils.NewExporterMetrics("", time.Local) + eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em) if err != nil { t.Error(err) } newEE := ee.(*HTTPjsonMapEE) - newEE.dc.MapStorage[utils.TimeNow] = nil - eeExpect.dc.MapStorage[utils.TimeNow] = nil + newEE.em.MapStorage[utils.TimeNow] = nil + eeExpect.em.MapStorage[utils.TimeNow] = nil if !reflect.DeepEqual(eeExpect, newEE) { t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE)) } @@ -143,11 +143,11 @@ func TestNewEventExporterCase6(t *testing.T) { if err != nil { t.Error(err) } - dc := utils.NewExporterMetrics("", time.Local) - eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], dc) + em := utils.NewExporterMetrics("", time.Local) + eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], em) newEE := ee.(*VirtualEE) - newEE.dc.MapStorage[utils.TimeNow] = nil - eeExpect.dc.MapStorage[utils.TimeNow] = nil + newEE.em.MapStorage[utils.TimeNow] = nil + eeExpect.em.MapStorage[utils.TimeNow] = nil if !reflect.DeepEqual(eeExpect, newEE) { t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE)) } @@ -176,17 +176,17 @@ func TestNewEventExporterCase7(t *testing.T) { if err != nil { t.Error(err) } - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) if err != nil { t.Error(err) } - eeExpect, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) + eeExpect, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], em) if err != nil { t.Error(err) } newEE := ee.(*ElasticEE) - newEE.dc.MapStorage[utils.TimeNow] = nil - eeExpect.dc.MapStorage[utils.TimeNow] = nil + newEE.em.MapStorage[utils.TimeNow] = nil + eeExpect.em.MapStorage[utils.TimeNow] = nil eeExpect.client = newEE.client if !reflect.DeepEqual(eeExpect, newEE) { t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE) diff --git a/ees/ees_test.go b/ees/ees_test.go index 7f1390fc3..0ed977537 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -304,7 +304,7 @@ func TestV1ProcessEvent4(t *testing.T) { } func newMockEventExporter() *mockEventExporter { - return &mockEventExporter{dc: &utils.ExporterMetrics{ + return &mockEventExporter{em: &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ utils.NumberOfEvents: int64(0), utils.PositiveExports: utils.StringSet{}, @@ -313,12 +313,12 @@ func newMockEventExporter() *mockEventExporter { } type mockEventExporter struct { - dc *utils.ExporterMetrics + em *utils.ExporterMetrics bytePreparing } func (m mockEventExporter) GetMetrics() *utils.ExporterMetrics { - return m.dc + return m.em } func (mockEventExporter) Cfg() *config.EventExporterCfg { return new(config.EventExporterCfg) } @@ -331,7 +331,7 @@ func (mockEventExporter) Close() error { } func TestV1ProcessEventMockMetrics(t *testing.T) { - mEe := mockEventExporter{dc: &utils.ExporterMetrics{ + mEe := mockEventExporter{em: &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ utils.NumberOfEvents: int64(0), utils.PositiveExports: utils.StringSet{}, @@ -455,7 +455,7 @@ func TestOnCacheEvicted(t *testing.T) { } func TestUpdateEEMetrics(t *testing.T) { - dc := utils.NewExporterMetrics("", time.UTC) + em := utils.NewExporterMetrics("", time.UTC) tnow := time.Now() ev := engine.MapEvent{ utils.AnswerTime: tnow, @@ -471,10 +471,10 @@ func TestUpdateEEMetrics(t *testing.T) { exp.MapStorage[utils.LastExpOrderID] = int64(1) exp.MapStorage[utils.TotalCost] = float64(5.5) exp.MapStorage[utils.TotalDuration] = time.Second - exp.MapStorage[utils.TimeNow] = dc.MapStorage[utils.TimeNow] + exp.MapStorage[utils.TimeNow] = em.MapStorage[utils.TimeNow] exp.MapStorage[utils.PositiveExports] = utils.StringSet{"": {}} - if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) { - t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em)) } tnow = tnow.Add(24 * time.Hour) @@ -489,8 +489,8 @@ func TestUpdateEEMetrics(t *testing.T) { exp.MapStorage[utils.LastExpOrderID] = int64(2) exp.MapStorage[utils.TotalCost] = float64(11) exp.MapStorage[utils.TotalSMSUsage] = time.Second - if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) { - t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em)) } tnow = tnow.Add(24 * time.Hour) @@ -505,8 +505,8 @@ func TestUpdateEEMetrics(t *testing.T) { exp.MapStorage[utils.LastExpOrderID] = int64(3) exp.MapStorage[utils.TotalCost] = float64(16.5) exp.MapStorage[utils.TotalMMSUsage] = time.Second - if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) { - t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em)) } tnow = tnow.Add(24 * time.Hour) @@ -521,8 +521,8 @@ func TestUpdateEEMetrics(t *testing.T) { exp.MapStorage[utils.LastExpOrderID] = int64(4) exp.MapStorage[utils.TotalCost] = float64(22) exp.MapStorage[utils.TotalGenericUsage] = time.Second - if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) { - t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em)) } tnow = tnow.Add(24 * time.Hour) @@ -537,8 +537,8 @@ func TestUpdateEEMetrics(t *testing.T) { exp.MapStorage[utils.LastExpOrderID] = int64(5) exp.MapStorage[utils.TotalCost] = float64(27.5) exp.MapStorage[utils.TotalDataUsage] = time.Second - if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) { - t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + if updateEEMetrics(em, "", ev, false, utils.EmptyString); !reflect.DeepEqual(em, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(em)) } } diff --git a/ees/elastic.go b/ees/elastic.go index c6708b4f1..a00d1cb64 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -39,17 +39,17 @@ import ( type ElasticEE struct { mu sync.RWMutex cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics reqs *concReq client *elasticsearch.TypedClient clientCfg elasticsearch.Config } -func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*ElasticEE, error) { +func NewElasticEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) (*ElasticEE, error) { el := &ElasticEE{ cfg: cfg, - dc: dc, + em: em, reqs: newConcReq(cfg.ConcurrentRequests), } if err := el.parseClientOpts(); err != nil { @@ -221,7 +221,7 @@ func (e *ElasticEE) Close() error { return nil } -func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.dc } +func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.em } func (eEE *ElasticEE) ExtraData(ev *utils.CGREvent) any { return utils.ConcatenatedKey( diff --git a/ees/elastic_test.go b/ees/elastic_test.go index 739c3b865..553f947b9 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -28,13 +28,13 @@ import ( ) func TestGetMetrics(t *testing.T) { - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) ee := &ElasticEE{ - dc: dc, + em: em, } - if rcv := ee.GetMetrics(); !reflect.DeepEqual(rcv, ee.dc) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(ee.dc)) + if rcv := ee.GetMetrics(); !reflect.DeepEqual(rcv, ee.em) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(ee.em)) } } @@ -56,8 +56,8 @@ func TestInitClient(t *testing.T) { func TestElasticExportEventErr(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - dc := utils.NewExporterMetrics("", time.Local) - eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) + em := utils.NewExporterMetrics("", time.Local) + eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], em) if err != nil { t.Error(err) } diff --git a/ees/em_it_test.go b/ees/em_it_test.go index 5db02768e..85dc4da35 100644 --- a/ees/em_it_test.go +++ b/ees/em_it_test.go @@ -61,7 +61,7 @@ func TestExporterMetricsIT(t *testing.T) { "synchronous": true, "metrics_reset_schedule": "@every 5s", "fields": [ - { "tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents" }, + { "tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*em.NumberOfEvents" }, { "tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*opts.*originID" }, { "tag": "ToR", "path": "*exp.ToR", "type": "*constant", "value": "*sms" }, { "tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account" }, @@ -70,10 +70,10 @@ func TestExporterMetricsIT(t *testing.T) { { "tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage" }, { "tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}" }, - { "tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents" }, - { "tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration" }, - { "tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage" }, - { "tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}" } + { "tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*em.NumberOfEvents" }, + { "tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*em.TotalDuration" }, + { "tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*em.TotalSMSUsage" }, + { "tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*em.TotalCost{*round:4}" } ] } ] diff --git a/ees/filecsv.go b/ees/filecsv.go index d84525a96..01ffce3e2 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -35,10 +35,10 @@ import ( func NewFileCSVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.ExporterMetrics, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) { + em *utils.ExporterMetrics, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) { fCsv = &FileCSVee{ cfg: cfg, - dc: dc, + em: em, wrtr: wrtr, cgrCfg: cgrCfg, filterS: filterS, @@ -50,7 +50,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg, // FileCSVee implements EventExporter interface for .csv files type FileCSVee struct { cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics wrtr io.WriteCloser // writer for the csv csvWriter *csv.Writer sync.Mutex @@ -66,7 +66,7 @@ func (fCsv *FileCSVee) init(wrtr io.WriteCloser) (err error) { // create the file filePath := path.Join(fCsv.Cfg().ExportPath, fCsv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix) - fCsv.dc.Set([]string{utils.ExportPath}, filePath) + fCsv.em.Set([]string{utils.ExportPath}, filePath) if fCsv.cfg.ExportPath == utils.MetaBuffer { fCsv.wrtr = wrtr } else if fCsv.wrtr, err = os.Create(filePath); err != nil { @@ -84,7 +84,7 @@ func (fCsv *FileCSVee) init(wrtr io.WriteCloser) (err error) { func (fCsv *FileCSVee) composeHeader() (err error) { if len(fCsv.Cfg().HeaderFields()) != 0 { var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.em, fCsv.cgrCfg, fCsv.filterS); err != nil { return } return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings()) @@ -96,7 +96,7 @@ func (fCsv *FileCSVee) composeHeader() (err error) { func (fCsv *FileCSVee) composeTrailer() (err error) { if len(fCsv.Cfg().TrailerFields()) != 0 { var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.em, fCsv.cgrCfg, fCsv.filterS); err != nil { return } return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings()) @@ -130,7 +130,7 @@ func (fCsv *FileCSVee) Close() (err error) { return } -func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.dc } +func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.em } func (fCsv *FileCSVee) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index ccb375bde..0f4df479d 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -723,11 +723,11 @@ func TestCsvInitFileCSV(t *testing.T) { if err := os.MkdirAll("/tmp/TestInitFileCSV", 0666); err != nil { t.Error(err) } - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) fCsv := &FileCSVee{ cgrCfg: cgrCfg, cfg: cgrCfg.EEsCfg().Exporters[0], - dc: dc, + em: em, } if err := fCsv.init(nil); err != nil { t.Error(err) diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go index cb15a1330..620098ed5 100644 --- a/ees/filecsv_test.go +++ b/ees/filecsv_test.go @@ -33,11 +33,11 @@ import ( ) func TestFileCsvGetMetrics(t *testing.T) { - dc := utils.NewExporterMetrics("", time.Local) - fCsv := &FileCSVee{dc: dc} + em := utils.NewExporterMetrics("", time.Local) + fCsv := &FileCSVee{em: em} - if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.dc) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fCsv.dc)) + if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.em) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fCsv.em)) } } @@ -61,7 +61,7 @@ func TestFileCsvComposeHeader(t *testing.T) { filterS: filterS, wrtr: nopCloser{byteBuff}, csvWriter: csvNW, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -125,7 +125,7 @@ func TestFileCsvComposeTrailer(t *testing.T) { filterS: filterS, wrtr: nopCloser{byteBuff}, csvWriter: csvNW, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -183,14 +183,14 @@ func TestFileCsvExportEvent(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) csvNW := csv.NewWriter(byteBuff) - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) fCsv := &FileCSVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, wrtr: nopCloser{byteBuff}, csvWriter: csvNW, - dc: dc, + em: em, } if err := fCsv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil { @@ -217,7 +217,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) { filterS: filterS, wrtr: nopCloserWrite{byteBuff}, csvWriter: csvNW, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -252,7 +252,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) { filterS: filterS, wrtr: nopCloserError{byteBuff}, csvWriter: csvNW, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { diff --git a/ees/filefwv.go b/ees/filefwv.go index 7a82609a9..c34388ee7 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -31,10 +31,10 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.ExporterMetrics, writer io.Writer) (fFwv *FileFWVee, err error) { +func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, em *utils.ExporterMetrics, writer io.Writer) (fFwv *FileFWVee, err error) { fFwv = &FileFWVee{ cfg: cfg, - dc: dc, + em: em, cgrCfg: cgrCfg, filterS: filterS, @@ -46,7 +46,7 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter // FileFWVee implements EventExporter interface for .fwv files type FileFWVee struct { cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics writer io.WriteCloser sync.Mutex slicePreparing @@ -60,7 +60,7 @@ type FileFWVee struct { func (fFwv *FileFWVee) init(writer io.Writer) (err error) { filePath := path.Join(fFwv.Cfg().ExportPath, fFwv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix) - fFwv.dc.Set([]string{utils.ExportPath}, filePath) + fFwv.em.Set([]string{utils.ExportPath}, filePath) // create the file if fFwv.cfg.ExportPath == utils.MetaBuffer { fFwv.writer = &buffer{writer} @@ -76,7 +76,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) { return } var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.em, fFwv.cgrCfg, fFwv.filterS); err != nil { return } for _, record := range exp.OrderedFieldsAsStrings() { @@ -94,7 +94,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) { return } var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.em, fFwv.cgrCfg, fFwv.filterS); err != nil { return } for _, record := range exp.OrderedFieldsAsStrings() { @@ -137,6 +137,6 @@ func (fFwv *FileFWVee) Close() (err error) { return } -func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.dc } +func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.em } func (fFwv *FileFWVee) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/filefwv_it_test.go b/ees/filefwv_it_test.go index 238af2dc8..5739b2192 100644 --- a/ees/filefwv_it_test.go +++ b/ees/filefwv_it_test.go @@ -157,11 +157,11 @@ func TestFileFwvInit(t *testing.T) { if err := os.MkdirAll("/tmp/TestInitFileFWV", 0666); err != nil { t.Error(err) } - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cgrCfg: cgrCfg, cfg: cgrCfg.EEsCfg().Exporters[0], - dc: dc, + em: em, } if err := fFwv.init(io.Discard); err != nil { t.Error(err) diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go index 013e479e2..3321f9fc0 100644 --- a/ees/filefwv_test.go +++ b/ees/filefwv_test.go @@ -33,11 +33,11 @@ import ( ) func TestFileFwvGetMetrics(t *testing.T) { - dc := utils.NewExporterMetrics("", time.Local) - fFwv := &FileFWVee{dc: dc} + em := utils.NewExporterMetrics("", time.Local) + fFwv := &FileFWVee{em: em} - if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.dc) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fFwv.dc)) + if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.em) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(fFwv.em)) } } @@ -54,7 +54,7 @@ func TestFileFwvComposeHeader(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloser{byteBuff}, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -117,7 +117,7 @@ func TestFileFwvComposeTrailer(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloser{byteBuff}, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -175,13 +175,13 @@ func TestFileFwvExportEvent(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) csvNW := csv.NewWriter(byteBuff) - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, writer: nopCloser{byteBuff}, - dc: dc, + em: em, } if err := fFwv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil { t.Error(err) @@ -209,13 +209,13 @@ func TestFileFwvExportEventWriteError(t *testing.T) { newDM := engine.NewDataManager(dbCM, cfg, nil) filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, writer: nopCloserWrite{byteBuff}, - dc: dc, + em: em, } if err := fFwv.ExportEvent(context.Background(), []string{""}, ""); err == nil || err != utils.ErrNotImplemented { t.Errorf("Expected %q but received %q", utils.ErrNotImplemented, err) @@ -234,7 +234,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloserWrite{byteBuff}, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -267,7 +267,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloserWrite{byteBuff}, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -299,7 +299,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloserWrite{byteBuff}, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -338,7 +338,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloserError{byteBuff}, - dc: &utils.ExporterMetrics{}, + em: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index e5a318bb4..687b9dc4f 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -34,10 +34,10 @@ import ( ) func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) { + em *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) { pstrJSON = &HTTPjsonMapEE{ cfg: cfg, - dc: dc, + em: em, client: &http.Client{Transport: engine.HTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout}, reqs: newConcReq(cfg.ConcurrentRequests), } @@ -48,7 +48,7 @@ func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi // HTTPjsonMapEE implements EventExporter interface for .csv files type HTTPjsonMapEE struct { cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics client *http.Client reqs *concReq @@ -62,7 +62,7 @@ func (httpEE *HTTPjsonMapEE) composeHeader(cgrCfg *config.CGRConfig, filterS *en return } var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.em, cgrCfg, filterS); err != nil { return } for el := exp.GetFirstElement(); el != nil; el = el.Next() { @@ -92,7 +92,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content, _ any) ( func (httpEE *HTTPjsonMapEE) Close() (_ error) { return } -func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.dc } +func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.em } func (httpEE *HTTPjsonMapEE) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go index 16d403849..75cc81e75 100644 --- a/ees/httpjsonmap_test.go +++ b/ees/httpjsonmap_test.go @@ -33,13 +33,13 @@ import ( ) func TestHttpJsonMapGetMetrics(t *testing.T) { - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) httpEE := &HTTPjsonMapEE{ - dc: dc, + em: em, } - if rcv := httpEE.GetMetrics(); !reflect.DeepEqual(rcv, httpEE.dc) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpEE.dc)) + if rcv := httpEE.GetMetrics(); !reflect.DeepEqual(rcv, httpEE.em) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpEE.em)) } } diff --git a/ees/httppost.go b/ees/httppost.go index a020db08e..69c4e077c 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -30,10 +30,10 @@ import ( ) func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) { + em *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) { httpPost = &HTTPPostEE{ cfg: cfg, - dc: dc, + em: em, client: &http.Client{Transport: engine.HTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout}, reqs: newConcReq(cfg.ConcurrentRequests), } @@ -44,7 +44,7 @@ func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filte // FileCSVee implements EventExporter interface for .csv files type HTTPPostEE struct { cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics client *http.Client reqs *concReq @@ -62,7 +62,7 @@ func (httpPost *HTTPPostEE) composeHeader(cgrCfg *config.CGRConfig, filterS *eng return } var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.em, cgrCfg, filterS); err != nil { return } for el := exp.GetFirstElement(); el != nil; el = el.Next() { @@ -92,7 +92,7 @@ func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content, _ any) (e func (httpPost *HTTPPostEE) Close() (_ error) { return } -func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.dc } +func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.em } func (httpPost *HTTPPostEE) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/httppost_test.go b/ees/httppost_test.go index c6b53ffd2..904750053 100644 --- a/ees/httppost_test.go +++ b/ees/httppost_test.go @@ -34,13 +34,13 @@ import ( ) func TestHttpPostGetMetrics(t *testing.T) { - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) httpPost := &HTTPPostEE{ - dc: dc, + em: em, } - if rcv := httpPost.GetMetrics(); !reflect.DeepEqual(rcv, httpPost.dc) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpPost.dc)) + if rcv := httpPost.GetMetrics(); !reflect.DeepEqual(rcv, httpPost.em) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(httpPost.em)) } } diff --git a/ees/kafka.go b/ees/kafka.go index 97653868b..3077bbfb5 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -32,10 +32,10 @@ import ( ) // NewKafkaEE creates a kafka poster -func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*KafkaEE, error) { +func NewKafkaEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) (*KafkaEE, error) { pstr := &KafkaEE{ cfg: cfg, - dc: dc, + em: em, reqs: newConcReq(cfg.ConcurrentRequests), } @@ -100,7 +100,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*Kafka type KafkaEE struct { writer *kafka.Writer cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics reqs *concReq bytePreparing } @@ -131,7 +131,7 @@ func (k *KafkaEE) Close() error { return k.writer.Close() } -func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.dc } +func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.em } func (k *KafkaEE) ExtraData(ev *utils.CGREvent) any { return utils.ConcatenatedKey( utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()), diff --git a/ees/log.go b/ees/log.go index 71b20d06c..a5d417077 100644 --- a/ees/log.go +++ b/ees/log.go @@ -27,17 +27,17 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewLogEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *LogEE { +func NewLogEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *LogEE { return &LogEE{ cfg: cfg, - dc: dc, + em: em, } } // LogEE implements EventExporter interface for .csv files type LogEE struct { cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics } func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg } @@ -49,7 +49,7 @@ func (vEe *LogEE) ExportEvent(_ *context.Context, mp, _ any) error { return nil } func (vEe *LogEE) Close() error { return nil } -func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc } +func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.em } func (vEe *LogEE) ExtraData(ev *utils.CGREvent) any { return nil } func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (any, error) { return mp.Event, nil diff --git a/ees/log_test.go b/ees/log_test.go index 22a7a3ed6..b34b72f05 100644 --- a/ees/log_test.go +++ b/ees/log_test.go @@ -32,14 +32,14 @@ import ( func TestNewLogEE(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) expected := &LogEE{ cfg: cfg.EEsCfg().ExporterCfg(utils.MetaDefault), - dc: dc, + em: em, } - rcv := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) + rcv := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em) if !reflect.DeepEqual(rcv, expected) { t.Errorf("Expected %v \n but received \n %v", expected, rcv) } @@ -47,8 +47,8 @@ func TestNewLogEE(t *testing.T) { func TestLogEEExportEvent(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc := utils.NewExporterMetrics("", time.Local) - logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) + em := utils.NewExporterMetrics("", time.Local) + logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em) mp := map[string]any{ "field1": 2, "field2": "value", @@ -71,7 +71,7 @@ func TestLogEE_GetMetrics(t *testing.T) { mockMetrics := &utils.ExporterMetrics{} vEe := &LogEE{ - dc: mockMetrics, + em: mockMetrics, } result := vEe.GetMetrics() @@ -83,8 +83,8 @@ func TestLogEE_GetMetrics(t *testing.T) { func TestLogEEPrepareMap(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc := utils.NewExporterMetrics("", time.Local) - logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) + em := utils.NewExporterMetrics("", time.Local) + logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em) mp := &utils.CGREvent{ Event: map[string]any{ "field1": 2, @@ -99,8 +99,8 @@ func TestLogEEPrepareMap(t *testing.T) { func TestLogEEPrepareOrderMap(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc := utils.NewExporterMetrics("", time.Local) - logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) + em := utils.NewExporterMetrics("", time.Local) + logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em) mp := utils.NewOrderedNavigableMap() fullPath := &utils.FullPath{ PathSlice: []string{"*path1"}, diff --git a/ees/nats.go b/ees/nats.go index e6cf758cc..ac2428db5 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -34,10 +34,10 @@ import ( ) // NewNatsEE creates a kafka poster -func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.ExporterMetrics) (natsPstr *NatsEE, err error) { +func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, em *utils.ExporterMetrics) (natsPstr *NatsEE, err error) { natsPstr = &NatsEE{ cfg: cfg, - dc: dc, + em: em, subject: utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } @@ -55,7 +55,7 @@ type NatsEE struct { posterJS jetstream.JetStream cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect writer bytePreparing @@ -127,7 +127,7 @@ func (pstr *NatsEE) Close() error { return err } -func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } +func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.em } func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/rpc.go b/ees/rpc.go index ecb097d2c..94891a5cb 100644 --- a/ees/rpc.go +++ b/ees/rpc.go @@ -29,11 +29,11 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics, +func NewRpcEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics, connMgr *engine.ConnManager) (e *RPCee, err error) { e = &RPCee{ cfg: cfg, - dc: dc, + em: em, connMgr: connMgr, } err = e.parseOpts() @@ -42,7 +42,7 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics, type RPCee struct { cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics connMgr *engine.ConnManager //opts @@ -82,7 +82,7 @@ func (e *RPCee) Close() (err error) { } func (e *RPCee) GetMetrics() (mp *utils.ExporterMetrics) { - return e.dc + return e.em } func (e *RPCee) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/rpc_test.go b/ees/rpc_test.go index ae68ae2b5..8f8bff846 100644 --- a/ees/rpc_test.go +++ b/ees/rpc_test.go @@ -30,17 +30,17 @@ import ( func TestNewRpcEE(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) - rcv, err := NewRpcEE(eeSCfg, dc, connMgr) + rcv, err := NewRpcEE(eeSCfg, em, connMgr) if err != nil { t.Error(err) } exp := &RPCee{ cfg: eeSCfg, - dc: dc, + em: em, connMgr: connMgr, } @@ -100,9 +100,9 @@ func TestRPCCfg(t *testing.T) { func TestRPCConnect(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) - rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) + rpcEe, err := NewRpcEE(eeSCfg, em, connMgr) if err != nil { t.Error(err) } @@ -113,12 +113,9 @@ func TestRPCConnect(t *testing.T) { // func TestRPCExportEvent(t *testing.T) { // eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) -// dc, err := newEEMetrics("Local") -// if err != nil { -// t.Error(err) -// } +// em := utils.NewExporterMetrics("",time.Local) // connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) -// rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) +// rpcEe, err := NewRpcEE(eeSCfg, em, connMgr) // if err != nil { // t.Error(err) // } @@ -142,9 +139,9 @@ func TestRPCConnect(t *testing.T) { func TestRPCClose(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) - rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) + rpcEe, err := NewRpcEE(eeSCfg, em, connMgr) if err != nil { t.Error(err) } @@ -158,28 +155,28 @@ func TestRPCClose(t *testing.T) { func TestRPCGetMetrics(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc := &utils.ExporterMetrics{ + em := &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ "time": "now", "just_a_field": "just_a_value", }, } connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) - rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) + rpcEe, err := NewRpcEE(eeSCfg, em, connMgr) if err != nil { t.Error(err) } - if rcv := rpcEe.GetMetrics(); !reflect.DeepEqual(rcv, dc) { - t.Errorf("Expected %v \n but received \n %v", dc, rcv) + if rcv := rpcEe.GetMetrics(); !reflect.DeepEqual(rcv, em) { + t.Errorf("Expected %v \n but received \n %v", em, rcv) } } func TestRPCPrepareMap(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) - rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) + rpcEe, err := NewRpcEE(eeSCfg, em, connMgr) if err != nil { t.Error(err) } diff --git a/ees/s3.go b/ees/s3.go index a207840d6..dc2b63ab9 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -34,10 +34,10 @@ import ( ) // NewS3EE creates a s3 poster -func NewS3EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *S3EE { +func NewS3EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *S3EE { pstr := &S3EE{ cfg: cfg, - dc: dc, + em: em, reqs: newConcReq(cfg.ConcurrentRequests), } pstr.parseOpts(cfg.Opts) @@ -56,7 +56,7 @@ type S3EE struct { up *s3manager.Uploader cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -138,7 +138,7 @@ func (pstr *S3EE) ExportEvent(ctx *context.Context, message, extraData any) (err func (pstr *S3EE) Close() (_ error) { return } -func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } +func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.em } func (pstr *S3EE) ExtraData(ev *utils.CGREvent) any { return utils.ConcatenatedKey( diff --git a/ees/sql.go b/ees/sql.go index 8da69a404..5d2307a54 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -36,10 +36,10 @@ import ( ) func NewSQLEe(cfg *config.EventExporterCfg, - dc *utils.ExporterMetrics) (sqlEe *SQLEe, err error) { + em *utils.ExporterMetrics) (sqlEe *SQLEe, err error) { sqlEe = &SQLEe{ cfg: cfg, - dc: dc, + em: em, reqs: newConcReq(cfg.ConcurrentRequests), } err = sqlEe.initDialector() @@ -49,7 +49,7 @@ func NewSQLEe(cfg *config.EventExporterCfg, // SQLEe implements EventExporter interface for SQL type SQLEe struct { cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics db *gorm.DB sqldb *sql.DB reqs *concReq @@ -157,7 +157,7 @@ func (sqlEe *SQLEe) Close() (err error) { return } -func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.dc } +func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.em } func (sqlEe *SQLEe) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/sql_test.go b/ees/sql_test.go index 814a35480..84689deac 100644 --- a/ees/sql_test.go +++ b/ees/sql_test.go @@ -33,12 +33,12 @@ import ( ) func TestSqlGetMetrics(t *testing.T) { - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) sqlEe := &SQLEe{ - dc: dc, + em: em, } - if rcv := sqlEe.GetMetrics(); !reflect.DeepEqual(rcv, sqlEe.dc) { - t.Errorf("Expected %+v but got %+v", utils.ToJSON(rcv), utils.ToJSON(sqlEe.dc)) + if rcv := sqlEe.GetMetrics(); !reflect.DeepEqual(rcv, sqlEe.em) { + t.Errorf("Expected %+v but got %+v", utils.ToJSON(rcv), utils.ToJSON(sqlEe.em)) } } diff --git a/ees/sqs.go b/ees/sqs.go index 2e066f588..2ffcf8706 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -32,10 +32,10 @@ import ( ) // NewSQSee creates a poster for sqs -func NewSQSee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *SQSee { +func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee { pstr := &SQSee{ cfg: cfg, - dc: dc, + em: em, reqs: newConcReq(cfg.ConcurrentRequests), } pstr.parseOpts(cfg.Opts) @@ -54,7 +54,7 @@ type SQSee struct { svc *sqs.SQS cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -146,6 +146,6 @@ func (pstr *SQSee) ExportEvent(ctx *context.Context, message, _ any) (err error) func (pstr *SQSee) Close() (_ error) { return } -func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.dc } +func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.em } func (pstr *SQSee) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/virtualee.go b/ees/virtualee.go index 56d969abe..5e11d576f 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -24,24 +24,24 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *VirtualEE { +func NewVirtualEE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *VirtualEE { return &VirtualEE{ cfg: cfg, - dc: dc, + em: em, } } // VirtualEE implements EventExporter interface for .csv files type VirtualEE struct { cfg *config.EventExporterCfg - dc *utils.ExporterMetrics + em *utils.ExporterMetrics } func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } func (vEe *VirtualEE) Connect() error { return nil } func (vEe *VirtualEE) ExportEvent(*context.Context, any, any) error { return nil } func (vEe *VirtualEE) Close() error { return nil } -func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc } +func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.em } func (vEe *VirtualEE) ExtraData(*utils.CGREvent) any { return nil } func (vEe *VirtualEE) PrepareMap(mp *utils.CGREvent) (any, error) { return nil, nil } func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (any, error) { diff --git a/ees/virtualee_test.go b/ees/virtualee_test.go index d6a2b8663..601202fb9 100644 --- a/ees/virtualee_test.go +++ b/ees/virtualee_test.go @@ -28,13 +28,13 @@ import ( ) func TestVirtualEeGetMetrics(t *testing.T) { - dc := utils.NewExporterMetrics("", time.Local) + em := utils.NewExporterMetrics("", time.Local) vEe := &VirtualEE{ - dc: dc, + em: em, } - if rcv := vEe.GetMetrics(); !reflect.DeepEqual(rcv, vEe.dc) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(vEe.dc)) + if rcv := vEe.GetMetrics(); !reflect.DeepEqual(rcv, vEe.em) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(vEe.em)) } } func TestVirtualEeExportEvent(t *testing.T) { diff --git a/utils/consts.go b/utils/consts.go index 99c925bc3..772ed631d 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -392,7 +392,7 @@ const ( MetaTpes = "*tpes" MetaFilters = "*filters" MetaCDRs = "*cdrs" - MetaDC = "*dc" + MetaEM = "*em" MetaCaches = "*caches" MetaUCH = "*uch" MetaGuardian = "*guardians"