diff --git a/ees/amqp.go b/ees/amqp.go
index d279d8437..b09e2761a 100644
--- a/ees/amqp.go
+++ b/ees/amqp.go
@@ -30,7 +30,7 @@ import (
// NewAMQPee creates a new amqp poster
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
-func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPee {
+func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPee {
amqp := &AMQPee{
cfg: cfg,
dc: dc,
@@ -50,7 +50,7 @@ type AMQPee struct {
postChan *amqp.Channel
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -175,6 +175,6 @@ func (pstr *AMQPee) Close() (err error) {
return
}
-func (pstr *AMQPee) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
+func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *AMQPee) ExtraData(*utils.CGREvent) any { return nil }
diff --git a/ees/amqp_test.go b/ees/amqp_test.go
index 7cf984feb..5e03541db 100644
--- a/ees/amqp_test.go
+++ b/ees/amqp_test.go
@@ -28,12 +28,13 @@ import (
func TestNewAMQPee(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
- dc := &utils.SafeMapStorage{
+ dc := &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: 5,
- }}
+ },
+ }
cfg.EEsCfg().ExporterCfg(utils.MetaDefault).ConcurrentRequests = 2
rcv := NewAMQPee(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
exp := &AMQPee{
diff --git a/ees/amqpv1.go b/ees/amqpv1.go
index d7319528a..99d24757e 100644
--- a/ees/amqpv1.go
+++ b/ees/amqpv1.go
@@ -28,7 +28,7 @@ import (
)
// NewAMQPv1EE creates a poster for amqpv1
-func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1EE {
+func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPv1EE {
pstr := &AMQPv1EE{
cfg: cfg,
dc: dc,
@@ -54,7 +54,7 @@ type AMQPv1EE struct {
session *amqpv1.Session
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -118,6 +118,6 @@ func (pstr *AMQPv1EE) Close() (err error) {
return
}
-func (pstr *AMQPv1EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
+func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *AMQPv1EE) ExtraData(*utils.CGREvent) any { return nil }
diff --git a/ees/apis.go b/ees/apis.go
index e391ae071..b05495a86 100644
--- a/ees/apis.go
+++ b/ees/apis.go
@@ -186,9 +186,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr *
}()
var eEv any
- exp.GetMetrics().Lock()
- exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1
- exp.GetMetrics().Unlock()
+ exp.GetMetrics().IncrementEvents()
if len(exp.Cfg().ContentFields()) == 0 {
if eEv, err = exp.PrepareMap(ev); err != nil {
return
@@ -223,30 +221,31 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents
return fmt.Errorf("ExporterID is missing from argument's options")
}
// check if there are any exporters that match our expID
- var eesCfg *config.EventExporterCfg
+ var eeCfg *config.EventExporterCfg
for _, exporter := range eeS.cfg.EEsCfg().Exporters {
if exporter.ID == expID {
- eesCfg = exporter
+ eeCfg = exporter
break
}
}
- if eesCfg == nil {
+ if eeCfg == nil {
return fmt.Errorf("exporter config with ID: %s is missing", expID)
}
// also mandatory to be synchronous
- if !eesCfg.Synchronous {
+ if !eeCfg.Synchronous {
return fmt.Errorf("exporter with ID: %s is not synchronous", expID)
}
// also mandatory to be type of *buffer
- if eesCfg.ExportPath != utils.MetaBuffer {
+ if eeCfg.ExportPath != utils.MetaBuffer {
return fmt.Errorf("exporter with ID: %s has an invalid ExportPath for archiving", expID)
}
- var dc *utils.SafeMapStorage
- if dc, err = newEEMetrics(utils.FirstNonEmpty(
- eesCfg.Timezone,
- eeS.cfg.GeneralCfg().DefaultTimezone)); err != nil {
- return
+ timezone := utils.FirstNonEmpty(eeCfg.Timezone, eeS.cfg.GeneralCfg().DefaultTimezone)
+ loc, err := time.LoadLocation(timezone)
+ if err != nil {
+ return err
}
+ dc := utils.NewExporterMetrics(eeCfg.MetricsResetSchedule, loc)
+
var ee EventExporter
buff := new(bytes.Buffer)
@@ -260,13 +259,13 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents
}); err != nil {
return err
}
- switch eesCfg.Type {
+ switch eeCfg.Type {
case utils.MetaFileCSV:
- ee, err = NewFileCSVee(eesCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr})
+ ee, err = NewFileCSVee(eeCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr})
case utils.MetaFileFWV:
- ee, err = NewFileFWVee(eesCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr})
+ ee, err = NewFileFWVee(eeCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr})
default:
- err = fmt.Errorf("unsupported exporter type: %s>", eesCfg.Type)
+ err = fmt.Errorf("unsupported exporter type: %s>", eeCfg.Type)
}
if err != nil {
return err
@@ -286,10 +285,10 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents
tnt := utils.FirstNonEmpty(args.Tenant, eeS.cfg.GeneralCfg().DefaultTenant)
var exported bool
for _, event := range args.Events {
- if len(eesCfg.Filters) != 0 && !ignoreFltr {
+ if len(eeCfg.Filters) != 0 && !ignoreFltr {
cgrDp[utils.MetaReq] = event.Event
if pass, errPass := eeS.fltrS.Pass(ctx, tnt,
- eesCfg.Filters, cgrDp); errPass != nil {
+ eeCfg.Filters, cgrDp); errPass != nil {
return errPass
} else if !pass {
continue // does not pass the filters, ignore the exporter
diff --git a/ees/ee.go b/ees/ee.go
index 996a9ab13..3ac8b442b 100644
--- a/ees/ee.go
+++ b/ees/ee.go
@@ -35,7 +35,7 @@ type EventExporter interface {
Connect() error // called before exporting an event to make sure it is connected
ExportEvent(ctx *context.Context, content any, extraData any) error // called on each event to be exported
Close() error // called when the exporter needs to terminate
- GetMetrics() *utils.SafeMapStorage // called to get metrics
+ GetMetrics() *utils.ExporterMetrics // called to get metrics
PrepareMap(*utils.CGREvent) (any, error)
PrepareOrderMap(*utils.OrderedNavigableMap) (any, error)
ExtraData(*utils.CGREvent) any
@@ -44,12 +44,13 @@ type EventExporter interface {
// NewEventExporter produces exporters
func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig,
filterS *engine.FilterS, connMngr *engine.ConnManager) (ee EventExporter, err error) {
- var dc *utils.SafeMapStorage
- if dc, err = newEEMetrics(utils.FirstNonEmpty(
- cfg.Timezone,
- cgrCfg.GeneralCfg().DefaultTimezone)); err != nil {
- return
+ timezone := utils.FirstNonEmpty(cfg.Timezone, cgrCfg.GeneralCfg().DefaultTimezone)
+ loc, err := time.LoadLocation(timezone)
+ if err != nil {
+ return nil, err
}
+ dc := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc)
+
switch cfg.Type {
case utils.MetaFileCSV:
return NewFileCSVee(cfg, cgrCfg, filterS, dc, nil)
@@ -125,20 +126,7 @@ func composeHeaderTrailer(ctx *context.Context, prfx string, fields []*config.FC
return
}
-func newEEMetrics(location string) (*utils.SafeMapStorage, error) {
- loc, err := time.LoadLocation(location)
- if err != nil {
- return nil, err
- }
- return &utils.SafeMapStorage{MapStorage: utils.MapStorage{
- utils.NumberOfEvents: int64(0),
- utils.PositiveExports: utils.StringSet{},
- utils.NegativeExports: utils.StringSet{},
- utils.TimeNow: time.Now().In(loc),
- }}, nil
-}
-
-func updateEEMetrics(dc *utils.SafeMapStorage, originID string, ev engine.MapEvent, hasError bool, timezone string) {
+func updateEEMetrics(dc *utils.ExporterMetrics, originID string, ev engine.MapEvent, hasError bool, timezone string) {
dc.Lock()
defer dc.Unlock()
if hasError {
diff --git a/ees/ee_test.go b/ees/ee_test.go
index 9c6e8c188..70d61676a 100644
--- a/ees/ee_test.go
+++ b/ees/ee_test.go
@@ -23,6 +23,7 @@ import (
"reflect"
"strings"
"testing"
+ "time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -39,13 +40,7 @@ func TestNewEventExporter(t *testing.T) {
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, nil)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
@@ -76,13 +71,7 @@ func TestNewEventExporterCase2(t *testing.T) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, io.Discard)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
@@ -110,13 +99,7 @@ func TestNewEventExporterCase3(t *testing.T) {
if err != nil {
t.Error(err)
}
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
if err != nil {
t.Error(err)
@@ -138,13 +121,7 @@ func TestNewEventExporterCase4(t *testing.T) {
if err != nil {
t.Error(err)
}
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
if err != nil {
t.Error(err)
@@ -166,13 +143,7 @@ func TestNewEventExporterCase6(t *testing.T) {
if err != nil {
t.Error(err)
}
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], dc)
newEE := ee.(*VirtualEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
@@ -205,10 +176,7 @@ func TestNewEventExporterCase7(t *testing.T) {
if err != nil {
t.Error(err)
}
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
+ dc := utils.NewExporterMetrics("", time.Local)
if err != nil {
t.Error(err)
}
diff --git a/ees/ees.go b/ees/ees.go
index 3da52e532..256fa369f 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -33,7 +33,9 @@ import (
// onCacheEvicted is called by ltcache when evicting an item
func onCacheEvicted(_ string, value any) {
- value.(EventExporter).Close()
+ ee := value.(EventExporter)
+ ee.GetMetrics().StopCron()
+ ee.Close()
}
// NewEventExporterS initializes a new EventExporterS.
diff --git a/ees/ees_test.go b/ees/ees_test.go
index aace7f4c2..7f1390fc3 100644
--- a/ees/ees_test.go
+++ b/ees/ees_test.go
@@ -304,7 +304,7 @@ func TestV1ProcessEvent4(t *testing.T) {
}
func newMockEventExporter() *mockEventExporter {
- return &mockEventExporter{dc: &utils.SafeMapStorage{
+ return &mockEventExporter{dc: &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
@@ -313,11 +313,11 @@ func newMockEventExporter() *mockEventExporter {
}
type mockEventExporter struct {
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
bytePreparing
}
-func (m mockEventExporter) GetMetrics() *utils.SafeMapStorage {
+func (m mockEventExporter) GetMetrics() *utils.ExporterMetrics {
return m.dc
}
@@ -331,7 +331,7 @@ func (mockEventExporter) Close() error {
}
func TestV1ProcessEventMockMetrics(t *testing.T) {
- mEe := mockEventExporter{dc: &utils.SafeMapStorage{
+ mEe := mockEventExporter{dc: &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
@@ -455,7 +455,7 @@ func TestOnCacheEvicted(t *testing.T) {
}
func TestUpdateEEMetrics(t *testing.T) {
- dc, _ := newEEMetrics(utils.EmptyString)
+ dc := utils.NewExporterMetrics("", time.UTC)
tnow := time.Now()
ev := engine.MapEvent{
utils.AnswerTime: tnow,
@@ -464,7 +464,7 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaVoice,
utils.Usage: time.Second,
}
- exp, _ := newEEMetrics(utils.EmptyString)
+ exp := utils.NewExporterMetrics("", time.UTC)
exp.MapStorage[utils.FirstEventATime] = tnow
exp.MapStorage[utils.LastEventATime] = tnow
exp.MapStorage[utils.FirstExpOrderID] = int64(1)
diff --git a/ees/elastic.go b/ees/elastic.go
index e86bee422..c6708b4f1 100644
--- a/ees/elastic.go
+++ b/ees/elastic.go
@@ -39,14 +39,14 @@ import (
type ElasticEE struct {
mu sync.RWMutex
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
reqs *concReq
client *elasticsearch.TypedClient
clientCfg elasticsearch.Config
}
-func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
+func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*ElasticEE, error) {
el := &ElasticEE{
cfg: cfg,
dc: dc,
@@ -221,7 +221,7 @@ func (e *ElasticEE) Close() error {
return nil
}
-func (e *ElasticEE) GetMetrics() *utils.SafeMapStorage { return e.dc }
+func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.dc }
func (eEE *ElasticEE) ExtraData(ev *utils.CGREvent) any {
return utils.ConcatenatedKey(
diff --git a/ees/elastic_test.go b/ees/elastic_test.go
index 291ffdfa0..739c3b865 100644
--- a/ees/elastic_test.go
+++ b/ees/elastic_test.go
@@ -20,6 +20,7 @@ package ees
import (
"reflect"
"testing"
+ "time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -27,10 +28,7 @@ import (
)
func TestGetMetrics(t *testing.T) {
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
ee := &ElasticEE{
dc: dc,
}
@@ -58,10 +56,7 @@ func TestInitClient(t *testing.T) {
func TestElasticExportEventErr(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
if err != nil {
t.Error(err)
diff --git a/ees/filecsv.go b/ees/filecsv.go
index 3fc27d287..62854f9a3 100644
--- a/ees/filecsv.go
+++ b/ees/filecsv.go
@@ -35,7 +35,7 @@ import (
func NewFileCSVee(cfg *config.EventExporterCfg,
cgrCfg *config.CGRConfig, filterS *engine.FilterS,
- dc *utils.SafeMapStorage, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) {
+ dc *utils.ExporterMetrics, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{
cfg: cfg,
dc: dc,
@@ -50,7 +50,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg,
// FileCSVee implements EventExporter interface for .csv files
type FileCSVee struct {
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
wrtr io.WriteCloser // writer for the csv
csvWriter *csv.Writer
sync.Mutex
@@ -132,7 +132,7 @@ func (fCsv *FileCSVee) Close() (err error) {
return
}
-func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc }
+func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.dc }
func (fCsv *FileCSVee) ExtraData(ev *utils.CGREvent) any { return nil }
diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go
index 6d6c0500e..ccb375bde 100644
--- a/ees/filecsv_it_test.go
+++ b/ees/filecsv_it_test.go
@@ -723,13 +723,7 @@ func TestCsvInitFileCSV(t *testing.T) {
if err := os.MkdirAll("/tmp/TestInitFileCSV", 0666); err != nil {
t.Error(err)
}
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{
cgrCfg: cgrCfg,
cfg: cgrCfg.EEsCfg().Exporters[0],
diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go
index b82b6c6fb..cb15a1330 100644
--- a/ees/filecsv_test.go
+++ b/ees/filecsv_test.go
@@ -24,6 +24,7 @@ import (
"io"
"reflect"
"testing"
+ "time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -32,13 +33,7 @@ import (
)
func TestFileCsvGetMetrics(t *testing.T) {
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{dc: dc}
if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.dc) {
@@ -66,7 +61,7 @@ func TestFileCsvComposeHeader(t *testing.T) {
filterS: filterS,
wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -130,7 +125,7 @@ func TestFileCsvComposeTrailer(t *testing.T) {
filterS: filterS,
wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -188,13 +183,7 @@ func TestFileCsvExportEvent(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
csvNW := csv.NewWriter(byteBuff)
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
fCsv := &FileCSVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
@@ -228,7 +217,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) {
filterS: filterS,
wrtr: nopCloserWrite{byteBuff},
csvWriter: csvNW,
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
@@ -263,7 +252,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) {
filterS: filterS,
wrtr: nopCloserError{byteBuff},
csvWriter: csvNW,
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fCsv.Cfg().Fields = []*config.FCTemplate{
{
diff --git a/ees/filefwv.go b/ees/filefwv.go
index 6a6f896f0..1926a7d98 100644
--- a/ees/filefwv.go
+++ b/ees/filefwv.go
@@ -31,7 +31,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
-func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage, writer io.Writer) (fFwv *FileFWVee, err error) {
+func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.ExporterMetrics, writer io.Writer) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{
cfg: cfg,
dc: dc,
@@ -46,7 +46,7 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter
// FileFWVee implements EventExporter interface for .fwv files
type FileFWVee struct {
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
writer io.WriteCloser
sync.Mutex
slicePreparing
@@ -139,6 +139,6 @@ func (fFwv *FileFWVee) Close() (err error) {
return
}
-func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { return fFwv.dc }
+func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.dc }
func (fFwv *FileFWVee) ExtraData(ev *utils.CGREvent) any { return nil }
diff --git a/ees/filefwv_it_test.go b/ees/filefwv_it_test.go
index 0b20e9829..238af2dc8 100644
--- a/ees/filefwv_it_test.go
+++ b/ees/filefwv_it_test.go
@@ -157,13 +157,7 @@ func TestFileFwvInit(t *testing.T) {
if err := os.MkdirAll("/tmp/TestInitFileFWV", 0666); err != nil {
t.Error(err)
}
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cgrCfg: cgrCfg,
cfg: cgrCfg.EEsCfg().Exporters[0],
diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go
index ad7e9654a..013e479e2 100644
--- a/ees/filefwv_test.go
+++ b/ees/filefwv_test.go
@@ -24,6 +24,7 @@ import (
"io"
"reflect"
"testing"
+ "time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -32,10 +33,7 @@ import (
)
func TestFileFwvGetMetrics(t *testing.T) {
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{dc: dc}
if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.dc) {
@@ -56,7 +54,7 @@ func TestFileFwvComposeHeader(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloser{byteBuff},
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -119,7 +117,7 @@ func TestFileFwvComposeTrailer(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloser{byteBuff},
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -177,10 +175,7 @@ func TestFileFwvExportEvent(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
csvNW := csv.NewWriter(byteBuff)
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
@@ -214,10 +209,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
newDM := engine.NewDataManager(dbCM, cfg, nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
fFwv := &FileFWVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
@@ -242,7 +234,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserWrite{byteBuff},
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -275,7 +267,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserWrite{byteBuff},
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -307,7 +299,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserWrite{byteBuff},
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
@@ -346,7 +338,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
cgrCfg: cfg,
filterS: filterS,
writer: nopCloserError{byteBuff},
- dc: &utils.SafeMapStorage{},
+ dc: &utils.ExporterMetrics{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
{
diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go
index 3da73d2a4..e5a318bb4 100644
--- a/ees/httpjsonmap.go
+++ b/ees/httpjsonmap.go
@@ -34,7 +34,7 @@ import (
)
func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
- dc *utils.SafeMapStorage) (pstrJSON *HTTPjsonMapEE, err error) {
+ dc *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) {
pstrJSON = &HTTPjsonMapEE{
cfg: cfg,
dc: dc,
@@ -48,7 +48,7 @@ func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi
// HTTPjsonMapEE implements EventExporter interface for .csv files
type HTTPjsonMapEE struct {
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
client *http.Client
reqs *concReq
@@ -92,7 +92,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content, _ any) (
func (httpEE *HTTPjsonMapEE) Close() (_ error) { return }
-func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc }
+func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.dc }
func (httpEE *HTTPjsonMapEE) ExtraData(ev *utils.CGREvent) any { return nil }
diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go
index efdb9a490..16d403849 100644
--- a/ees/httpjsonmap_test.go
+++ b/ees/httpjsonmap_test.go
@@ -33,10 +33,7 @@ import (
)
func TestHttpJsonMapGetMetrics(t *testing.T) {
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
httpEE := &HTTPjsonMapEE{
dc: dc,
}
diff --git a/ees/httppost.go b/ees/httppost.go
index 002be953d..a020db08e 100644
--- a/ees/httppost.go
+++ b/ees/httppost.go
@@ -30,7 +30,7 @@ import (
)
func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
- dc *utils.SafeMapStorage) (httpPost *HTTPPostEE, err error) {
+ dc *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) {
httpPost = &HTTPPostEE{
cfg: cfg,
dc: dc,
@@ -44,7 +44,7 @@ func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filte
// FileCSVee implements EventExporter interface for .csv files
type HTTPPostEE struct {
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
client *http.Client
reqs *concReq
@@ -92,7 +92,7 @@ func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content, _ any) (e
func (httpPost *HTTPPostEE) Close() (_ error) { return }
-func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc }
+func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.dc }
func (httpPost *HTTPPostEE) ExtraData(ev *utils.CGREvent) any { return nil }
diff --git a/ees/httppost_test.go b/ees/httppost_test.go
index 1c95f60a1..c6b53ffd2 100644
--- a/ees/httppost_test.go
+++ b/ees/httppost_test.go
@@ -34,13 +34,7 @@ import (
)
func TestHttpPostGetMetrics(t *testing.T) {
- dc, err := newEEMetrics(utils.FirstNonEmpty(
- "Local",
- utils.EmptyString,
- ))
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
httpPost := &HTTPPostEE{
dc: dc,
}
diff --git a/ees/kafka.go b/ees/kafka.go
index e6b4b488c..97653868b 100644
--- a/ees/kafka.go
+++ b/ees/kafka.go
@@ -32,7 +32,7 @@ import (
)
// NewKafkaEE creates a kafka poster
-func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaEE, error) {
+func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*KafkaEE, error) {
pstr := &KafkaEE{
cfg: cfg,
dc: dc,
@@ -100,7 +100,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaE
type KafkaEE struct {
writer *kafka.Writer
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
reqs *concReq
bytePreparing
}
@@ -131,7 +131,7 @@ func (k *KafkaEE) Close() error {
return k.writer.Close()
}
-func (k *KafkaEE) GetMetrics() *utils.SafeMapStorage { return k.dc }
+func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.dc }
func (k *KafkaEE) ExtraData(ev *utils.CGREvent) any {
return utils.ConcatenatedKey(
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()),
diff --git a/ees/log.go b/ees/log.go
index 565f39adf..71b20d06c 100644
--- a/ees/log.go
+++ b/ees/log.go
@@ -27,7 +27,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
-func NewLogEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *LogEE {
+func NewLogEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *LogEE {
return &LogEE{
cfg: cfg,
dc: dc,
@@ -37,7 +37,7 @@ func NewLogEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *LogEE {
// LogEE implements EventExporter interface for .csv files
type LogEE struct {
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
}
func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
@@ -48,9 +48,9 @@ func (vEe *LogEE) ExportEvent(_ *context.Context, mp, _ any) error {
utils.EEs, vEe.Cfg().ID, utils.ToJSON(mp)))
return nil
}
-func (vEe *LogEE) Close() error { return nil }
-func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
-func (vEe *LogEE) ExtraData(ev *utils.CGREvent) any { return nil }
+func (vEe *LogEE) Close() error { return nil }
+func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc }
+func (vEe *LogEE) ExtraData(ev *utils.CGREvent) any { return nil }
func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (any, error) {
return mp.Event, nil
}
diff --git a/ees/log_test.go b/ees/log_test.go
index ee5e2cc38..22a7a3ed6 100644
--- a/ees/log_test.go
+++ b/ees/log_test.go
@@ -23,6 +23,7 @@ import (
"reflect"
"strings"
"testing"
+ "time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -31,10 +32,7 @@ import (
func TestNewLogEE(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
expected := &LogEE{
cfg: cfg.EEsCfg().ExporterCfg(utils.MetaDefault),
@@ -49,10 +47,7 @@ func TestNewLogEE(t *testing.T) {
func TestLogEEExportEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
mp := map[string]any{
"field1": 2,
@@ -72,33 +67,23 @@ func TestLogEEExportEvent(t *testing.T) {
}
}
-func TestLogEEGetMetrics(t *testing.T) {
- cfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
+func TestLogEE_GetMetrics(t *testing.T) {
+ mockMetrics := &utils.ExporterMetrics{}
+
+ vEe := &LogEE{
+ dc: mockMetrics,
}
- dc.MapStorage = utils.MapStorage{
- "metric1": "value",
- }
- expected := &utils.SafeMapStorage{
- MapStorage: utils.MapStorage{
- "metric1": "value",
- },
- }
- logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
- rcv := logEE.GetMetrics()
- if !reflect.DeepEqual(rcv, expected) {
- t.Errorf("Expected %T \n but received \n %T", expected, rcv)
+
+ result := vEe.GetMetrics()
+
+ if result != mockMetrics {
+ t.Errorf("expected %v, got %v", mockMetrics, result)
}
}
func TestLogEEPrepareMap(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
mp := &utils.CGREvent{
Event: map[string]any{
@@ -114,10 +99,7 @@ func TestLogEEPrepareMap(t *testing.T) {
func TestLogEEPrepareOrderMap(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc)
mp := utils.NewOrderedNavigableMap()
fullPath := &utils.FullPath{
diff --git a/ees/nats.go b/ees/nats.go
index 22d5764a0..e6cf758cc 100644
--- a/ees/nats.go
+++ b/ees/nats.go
@@ -34,7 +34,7 @@ import (
)
// NewNatsEE creates a kafka poster
-func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.SafeMapStorage) (natsPstr *NatsEE, err error) {
+func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.ExporterMetrics) (natsPstr *NatsEE, err error) {
natsPstr = &NatsEE{
cfg: cfg,
dc: dc,
@@ -55,7 +55,7 @@ type NatsEE struct {
posterJS jetstream.JetStream
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect writer
bytePreparing
@@ -127,7 +127,7 @@ func (pstr *NatsEE) Close() error {
return err
}
-func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
+func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) any { return nil }
diff --git a/ees/rpc.go b/ees/rpc.go
index a25f338e8..ecb097d2c 100644
--- a/ees/rpc.go
+++ b/ees/rpc.go
@@ -29,7 +29,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
-func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage,
+func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics,
connMgr *engine.ConnManager) (e *RPCee, err error) {
e = &RPCee{
cfg: cfg,
@@ -42,7 +42,7 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage,
type RPCee struct {
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
connMgr *engine.ConnManager
//opts
@@ -81,7 +81,7 @@ func (e *RPCee) Close() (err error) {
return
}
-func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) {
+func (e *RPCee) GetMetrics() (mp *utils.ExporterMetrics) {
return e.dc
}
func (e *RPCee) ExtraData(ev *utils.CGREvent) any { return nil }
diff --git a/ees/rpc_test.go b/ees/rpc_test.go
index 9109069f8..ae68ae2b5 100644
--- a/ees/rpc_test.go
+++ b/ees/rpc_test.go
@@ -21,6 +21,7 @@ package ees
import (
"reflect"
"testing"
+ "time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -29,10 +30,7 @@ import (
func TestNewRpcEE(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rcv, err := NewRpcEE(eeSCfg, dc, connMgr)
@@ -102,10 +100,7 @@ func TestRPCCfg(t *testing.T) {
func TestRPCConnect(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
if err != nil {
@@ -147,10 +142,7 @@ func TestRPCConnect(t *testing.T) {
func TestRPCClose(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
if err != nil {
@@ -166,7 +158,7 @@ func TestRPCClose(t *testing.T) {
func TestRPCGetMetrics(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
- dc := &utils.SafeMapStorage{
+ dc := &utils.ExporterMetrics{
MapStorage: utils.MapStorage{
"time": "now",
"just_a_field": "just_a_value",
@@ -185,10 +177,7 @@ func TestRPCGetMetrics(t *testing.T) {
func TestRPCPrepareMap(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
if err != nil {
diff --git a/ees/s3.go b/ees/s3.go
index c981d9c45..a207840d6 100644
--- a/ees/s3.go
+++ b/ees/s3.go
@@ -34,7 +34,7 @@ import (
)
// NewS3EE creates a s3 poster
-func NewS3EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *S3EE {
+func NewS3EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *S3EE {
pstr := &S3EE{
cfg: cfg,
dc: dc,
@@ -56,7 +56,7 @@ type S3EE struct {
up *s3manager.Uploader
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -138,7 +138,7 @@ func (pstr *S3EE) ExportEvent(ctx *context.Context, message, extraData any) (err
func (pstr *S3EE) Close() (_ error) { return }
-func (pstr *S3EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
+func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *S3EE) ExtraData(ev *utils.CGREvent) any {
return utils.ConcatenatedKey(
diff --git a/ees/sql.go b/ees/sql.go
index 71f755526..8da69a404 100644
--- a/ees/sql.go
+++ b/ees/sql.go
@@ -36,7 +36,7 @@ import (
)
func NewSQLEe(cfg *config.EventExporterCfg,
- dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) {
+ dc *utils.ExporterMetrics) (sqlEe *SQLEe, err error) {
sqlEe = &SQLEe{
cfg: cfg,
dc: dc,
@@ -49,7 +49,7 @@ func NewSQLEe(cfg *config.EventExporterCfg,
// SQLEe implements EventExporter interface for SQL
type SQLEe struct {
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
db *gorm.DB
sqldb *sql.DB
reqs *concReq
@@ -157,7 +157,7 @@ func (sqlEe *SQLEe) Close() (err error) {
return
}
-func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc }
+func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.dc }
func (sqlEe *SQLEe) ExtraData(ev *utils.CGREvent) any { return nil }
diff --git a/ees/sql_test.go b/ees/sql_test.go
index 3ef3a7193..814a35480 100644
--- a/ees/sql_test.go
+++ b/ees/sql_test.go
@@ -22,6 +22,7 @@ import (
"fmt"
"reflect"
"testing"
+ "time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
@@ -32,10 +33,7 @@ import (
)
func TestSqlGetMetrics(t *testing.T) {
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
sqlEe := &SQLEe{
dc: dc,
}
diff --git a/ees/sqs.go b/ees/sqs.go
index 8e4f1bc53..2e066f588 100644
--- a/ees/sqs.go
+++ b/ees/sqs.go
@@ -32,7 +32,7 @@ import (
)
// NewSQSee creates a poster for sqs
-func NewSQSee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *SQSee {
+func NewSQSee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *SQSee {
pstr := &SQSee{
cfg: cfg,
dc: dc,
@@ -54,7 +54,7 @@ type SQSee struct {
svc *sqs.SQS
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
@@ -146,6 +146,6 @@ func (pstr *SQSee) ExportEvent(ctx *context.Context, message, _ any) (err error)
func (pstr *SQSee) Close() (_ error) { return }
-func (pstr *SQSee) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
+func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
func (pstr *SQSee) ExtraData(ev *utils.CGREvent) any { return nil }
diff --git a/ees/virtualee.go b/ees/virtualee.go
index ee3cc7d83..56d969abe 100644
--- a/ees/virtualee.go
+++ b/ees/virtualee.go
@@ -24,7 +24,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
-func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *VirtualEE {
+func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *VirtualEE {
return &VirtualEE{
cfg: cfg,
dc: dc,
@@ -34,14 +34,14 @@ func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *Virtu
// VirtualEE implements EventExporter interface for .csv files
type VirtualEE struct {
cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
+ dc *utils.ExporterMetrics
}
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
func (vEe *VirtualEE) Connect() error { return nil }
func (vEe *VirtualEE) ExportEvent(*context.Context, any, any) error { return nil }
func (vEe *VirtualEE) Close() error { return nil }
-func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
+func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc }
func (vEe *VirtualEE) ExtraData(*utils.CGREvent) any { return nil }
func (vEe *VirtualEE) PrepareMap(mp *utils.CGREvent) (any, error) { return nil, nil }
func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (any, error) {
diff --git a/ees/virtualee_test.go b/ees/virtualee_test.go
index 84ea42b01..d6a2b8663 100644
--- a/ees/virtualee_test.go
+++ b/ees/virtualee_test.go
@@ -21,16 +21,14 @@ package ees
import (
"reflect"
"testing"
+ "time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
func TestVirtualEeGetMetrics(t *testing.T) {
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
+ dc := utils.NewExporterMetrics("", time.Local)
vEe := &VirtualEE{
dc: dc,
}
diff --git a/utils/exportermetrics.go b/utils/exportermetrics.go
new file mode 100644
index 000000000..ade0fc90b
--- /dev/null
+++ b/utils/exportermetrics.go
@@ -0,0 +1,151 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package utils
+
+import (
+ "sync"
+ "time"
+
+ "github.com/cgrates/cron"
+)
+
+// ExporterMetrics stores export statistics with thread-safe access and
+// cron-scheduled resets.
+type ExporterMetrics struct {
+ mu sync.RWMutex
+ MapStorage MapStorage
+ cron *cron.Cron
+ loc *time.Location
+}
+
+// NewExporterMetrics creates metrics with optional automatic reset.
+// schedule is a cron expression for reset timing (empty to disable).
+func NewExporterMetrics(schedule string, loc *time.Location) *ExporterMetrics {
+ m := &ExporterMetrics{
+ loc: loc,
+ }
+ m.Reset() // init MapStorage with default values
+
+ if schedule != "" {
+ m.cron = cron.New()
+ m.cron.AddFunc(schedule, func() {
+ m.Reset()
+ })
+ m.cron.Start()
+ }
+ return m
+}
+
+// Reset immediately clears all metrics and resets them to initial values.
+func (m *ExporterMetrics) Reset() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.MapStorage = MapStorage{
+ NumberOfEvents: int64(0),
+ PositiveExports: StringSet{},
+ NegativeExports: StringSet{},
+ TimeNow: time.Now().In(m.loc),
+ }
+}
+
+// StopCron stops the automatic reset schedule if one is active.
+func (m *ExporterMetrics) StopCron() {
+ if m.cron == nil {
+ return
+ }
+ m.cron.Stop()
+ // ctx := m.cron.Stop()
+ // <-ctx.Done() // wait for any running jobs to complete
+}
+
+// String returns the map as json string.
+func (m *ExporterMetrics) String() string {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ return m.MapStorage.String()
+}
+
+// FieldAsInterface returns the value from the path.
+func (m *ExporterMetrics) FieldAsInterface(fldPath []string) (val any, err error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ return m.MapStorage.FieldAsInterface(fldPath)
+}
+
+// FieldAsString returns the value from path as string.
+func (m *ExporterMetrics) FieldAsString(fldPath []string) (str string, err error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ return m.MapStorage.FieldAsString(fldPath)
+}
+
+// Set sets the value at the given path.
+func (m *ExporterMetrics) Set(fldPath []string, val any) (err error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.MapStorage.Set(fldPath, val)
+}
+
+// GetKeys returns all the keys from map.
+func (m *ExporterMetrics) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ return m.MapStorage.GetKeys(nested, nestedLimit, prefix)
+}
+
+// Remove removes the item at path
+func (m *ExporterMetrics) Remove(fldPath []string) (err error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.MapStorage.Remove(fldPath)
+}
+
+func (m *ExporterMetrics) ClonedMapStorage() (msClone MapStorage) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ return m.MapStorage.Clone()
+}
+
+// IncrementEvents increases the event counter (NumberOfEvents) by 1.
+func (m *ExporterMetrics) IncrementEvents() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ count, _ := m.MapStorage[NumberOfEvents].(int64)
+ m.MapStorage[NumberOfEvents] = count + 1
+}
+
+// Lock locks the ExporterMetrics mutex.
+func (m *ExporterMetrics) Lock() {
+ m.mu.Lock()
+}
+
+// Unlock unlocks the ExporterMetrics mutex.
+func (m *ExporterMetrics) Unlock() {
+ m.mu.Unlock()
+}
+
+// RLock locks the ExporterMetrics mutex for reading.
+func (m *ExporterMetrics) RLock() {
+ m.mu.RLock()
+}
+
+// RUnlock unlocks the read lock on the ExporterMetrics mutex.
+func (m *ExporterMetrics) RUnlock() {
+ m.mu.RUnlock()
+}
diff --git a/utils/safemapstorage_test.go b/utils/exportermetrics_test.go
similarity index 74%
rename from utils/safemapstorage_test.go
rename to utils/exportermetrics_test.go
index e2b4f3885..e118222cc 100644
--- a/utils/safemapstorage_test.go
+++ b/utils/exportermetrics_test.go
@@ -23,8 +23,8 @@ import (
"testing"
)
-func TestSafeMapStorageString(t *testing.T) {
- ms := &SafeMapStorage{
+func TestExporterMetricsString(t *testing.T) {
+ ms := &ExporterMetrics{
MapStorage: MapStorage{
"field1": 2,
},
@@ -35,8 +35,8 @@ func TestSafeMapStorageString(t *testing.T) {
}
}
-func TestSafeMapStorageFieldAsInterface(t *testing.T) {
- ms := &SafeMapStorage{
+func TestExporterMetricsFieldAsInterface(t *testing.T) {
+ ms := &ExporterMetrics{
MapStorage: MapStorage{
"field1": 2,
},
@@ -51,8 +51,8 @@ func TestSafeMapStorageFieldAsInterface(t *testing.T) {
}
}
-func TestSafeMapStorageFieldAsString(t *testing.T) {
- ms := &SafeMapStorage{
+func TestExporterMetricsFieldAsString(t *testing.T) {
+ ms := &ExporterMetrics{
MapStorage: MapStorage{
"field1": 2,
},
@@ -67,14 +67,14 @@ func TestSafeMapStorageFieldAsString(t *testing.T) {
}
}
-func TestSafeMapStorageSet(t *testing.T) {
- ms := &SafeMapStorage{
+func TestExporterMetricsSet(t *testing.T) {
+ ms := &ExporterMetrics{
MapStorage: MapStorage{
"field1": 2,
},
}
- expected := &SafeMapStorage{
+ expected := &ExporterMetrics{
MapStorage: MapStorage{
"field1": 2,
"field2": 3,
@@ -88,8 +88,8 @@ func TestSafeMapStorageSet(t *testing.T) {
}
}
-func TestSafeMapStorageGetKeys(t *testing.T) {
- ms := &SafeMapStorage{
+func TestExporterMetricsGetKeys(t *testing.T) {
+ ms := &ExporterMetrics{
MapStorage: MapStorage{
"field1": 2,
"field2": 3,
@@ -105,15 +105,15 @@ func TestSafeMapStorageGetKeys(t *testing.T) {
}
}
-func TestSafeMapStorageRemove(t *testing.T) {
- ms := &SafeMapStorage{
+func TestExporterMetricsRemove(t *testing.T) {
+ ms := &ExporterMetrics{
MapStorage: MapStorage{
"field1": 2,
"field2": 3,
},
}
- expected := &SafeMapStorage{
+ expected := &ExporterMetrics{
MapStorage: MapStorage{
"field1": 2,
},
@@ -126,28 +126,8 @@ func TestSafeMapStorageRemove(t *testing.T) {
}
}
-func TestSafeMapStorageClone(t *testing.T) {
- ms := &SafeMapStorage{
- MapStorage: MapStorage{
- "field1": 2,
- "field2": 3,
- },
- }
-
- expected := &SafeMapStorage{
- MapStorage: MapStorage{
- "field1": 2,
- "field2": 3,
- },
- }
-
- if reply := ms.Clone(); !reflect.DeepEqual(reply, expected) {
- t.Errorf("Expected %v \n but received \n %v", expected, reply)
- }
-}
-
-func TestClonedMapStorage(t *testing.T) {
- ms := &SafeMapStorage{
+func TestExporterMetricsClonedMapStorage(t *testing.T) {
+ ms := &ExporterMetrics{
MapStorage: MapStorage{
"field1": []string{"v1", "v2"},
},
diff --git a/utils/safemapstorage.go b/utils/safemapstorage.go
deleted file mode 100644
index da1966f79..000000000
--- a/utils/safemapstorage.go
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU Affero General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU Affero General Public License for more details.
-
-You should have received a copy of the GNU Affero General Public License
-along with this program. If not, see
-*/
-
-package utils
-
-import (
- "sync"
-)
-
-// SafeMapStorage is a Mapstorage with mutex
-type SafeMapStorage struct {
- MapStorage
- sync.RWMutex
-}
-
-// String returns the map as json string
-func (ms *SafeMapStorage) String() string {
- ms.RLock()
- defer ms.RUnlock()
- return ms.MapStorage.String()
-}
-
-// FieldAsInterface returns the value from the path
-func (ms *SafeMapStorage) FieldAsInterface(fldPath []string) (val any, err error) {
- ms.RLock()
- defer ms.RUnlock()
- return ms.MapStorage.FieldAsInterface(fldPath)
-}
-
-// FieldAsString returns the value from path as string
-func (ms *SafeMapStorage) FieldAsString(fldPath []string) (str string, err error) {
- ms.RLock()
- defer ms.RUnlock()
- return ms.MapStorage.FieldAsString(fldPath)
-}
-
-// Set sets the value at the given path
-func (ms *SafeMapStorage) Set(fldPath []string, val any) (err error) {
- ms.Lock()
- defer ms.Unlock()
- return ms.MapStorage.Set(fldPath, val)
-}
-
-// GetKeys returns all the keys from map
-func (ms *SafeMapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) {
- ms.RLock()
- defer ms.RUnlock()
- return ms.MapStorage.GetKeys(nested, nestedLimit, prefix)
-}
-
-// Remove removes the item at path
-func (ms *SafeMapStorage) Remove(fldPath []string) (err error) {
- ms.Lock()
- defer ms.Unlock()
- return ms.MapStorage.Remove(fldPath)
-}
-
-func (ms *SafeMapStorage) Clone() (msClone *SafeMapStorage) {
- ms.RLock()
- defer ms.RUnlock()
- return &SafeMapStorage{MapStorage: ms.MapStorage.Clone()}
-}
-
-func (ms *SafeMapStorage) ClonedMapStorage() (msClone MapStorage) {
- ms.RLock()
- defer ms.RUnlock()
- return ms.MapStorage.Clone()
-}