From 3122589e480c28e1a9d994ae2b3d2ef2730fcfa7 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 4 Nov 2025 13:52:17 +0200 Subject: [PATCH] ees: add cron-based/manual reset to ExporterMetrics --- ees/amqp.go | 6 +- ees/amqp_test.go | 5 +- ees/amqpv1.go | 6 +- ees/apis.go | 37 +++-- ees/ee.go | 28 +--- ees/ee_test.go | 46 +----- ees/ees.go | 4 +- ees/ees_test.go | 12 +- ees/elastic.go | 6 +- ees/elastic_test.go | 11 +- ees/filecsv.go | 6 +- ees/filecsv_it_test.go | 8 +- ees/filecsv_test.go | 25 +-- ees/filefwv.go | 6 +- ees/filefwv_it_test.go | 8 +- ees/filefwv_test.go | 28 ++-- ees/httpjsonmap.go | 6 +- ees/httpjsonmap_test.go | 5 +- ees/httppost.go | 6 +- ees/httppost_test.go | 8 +- ees/kafka.go | 6 +- ees/log.go | 10 +- ees/log_test.go | 48 ++---- ees/nats.go | 6 +- ees/rpc.go | 6 +- ees/rpc_test.go | 23 +-- ees/s3.go | 6 +- ees/sql.go | 6 +- ees/sql_test.go | 6 +- ees/sqs.go | 6 +- ees/virtualee.go | 6 +- ees/virtualee_test.go | 6 +- utils/exportermetrics.go | 151 ++++++++++++++++++ ...torage_test.go => exportermetrics_test.go} | 52 ++---- utils/safemapstorage.go | 83 ---------- 35 files changed, 308 insertions(+), 380 deletions(-) create mode 100644 utils/exportermetrics.go rename utils/{safemapstorage_test.go => exportermetrics_test.go} (74%) delete mode 100644 utils/safemapstorage.go diff --git a/ees/amqp.go b/ees/amqp.go index d279d8437..b09e2761a 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -30,7 +30,7 @@ import ( // NewAMQPee creates a new amqp poster // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" -func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPee { +func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPee { amqp := &AMQPee{ cfg: cfg, dc: dc, @@ -50,7 +50,7 @@ type AMQPee struct { postChan *amqp.Channel cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -175,6 +175,6 @@ func (pstr *AMQPee) Close() (err error) { return } -func (pstr *AMQPee) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.dc } func (pstr *AMQPee) ExtraData(*utils.CGREvent) any { return nil } diff --git a/ees/amqp_test.go b/ees/amqp_test.go index 7cf984feb..5e03541db 100644 --- a/ees/amqp_test.go +++ b/ees/amqp_test.go @@ -28,12 +28,13 @@ import ( func TestNewAMQPee(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc := &utils.SafeMapStorage{ + dc := &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ utils.NumberOfEvents: int64(0), utils.PositiveExports: utils.StringSet{}, utils.NegativeExports: 5, - }} + }, + } cfg.EEsCfg().ExporterCfg(utils.MetaDefault).ConcurrentRequests = 2 rcv := NewAMQPee(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) exp := &AMQPee{ diff --git a/ees/amqpv1.go b/ees/amqpv1.go index d7319528a..99d24757e 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -28,7 +28,7 @@ import ( ) // NewAMQPv1EE creates a poster for amqpv1 -func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1EE { +func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPv1EE { pstr := &AMQPv1EE{ cfg: cfg, dc: dc, @@ -54,7 +54,7 @@ type AMQPv1EE struct { session *amqpv1.Session cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -118,6 +118,6 @@ func (pstr *AMQPv1EE) Close() (err error) { return } -func (pstr *AMQPv1EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } func (pstr *AMQPv1EE) ExtraData(*utils.CGREvent) any { return nil } diff --git a/ees/apis.go b/ees/apis.go index e391ae071..b05495a86 100644 --- a/ees/apis.go +++ b/ees/apis.go @@ -186,9 +186,7 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr * }() var eEv any - exp.GetMetrics().Lock() - exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1 - exp.GetMetrics().Unlock() + exp.GetMetrics().IncrementEvents() if len(exp.Cfg().ContentFields()) == 0 { if eEv, err = exp.PrepareMap(ev); err != nil { return @@ -223,30 +221,31 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents return fmt.Errorf("ExporterID is missing from argument's options") } // check if there are any exporters that match our expID - var eesCfg *config.EventExporterCfg + var eeCfg *config.EventExporterCfg for _, exporter := range eeS.cfg.EEsCfg().Exporters { if exporter.ID == expID { - eesCfg = exporter + eeCfg = exporter break } } - if eesCfg == nil { + if eeCfg == nil { return fmt.Errorf("exporter config with ID: %s is missing", expID) } // also mandatory to be synchronous - if !eesCfg.Synchronous { + if !eeCfg.Synchronous { return fmt.Errorf("exporter with ID: %s is not synchronous", expID) } // also mandatory to be type of *buffer - if eesCfg.ExportPath != utils.MetaBuffer { + if eeCfg.ExportPath != utils.MetaBuffer { return fmt.Errorf("exporter with ID: %s has an invalid ExportPath for archiving", expID) } - var dc *utils.SafeMapStorage - if dc, err = newEEMetrics(utils.FirstNonEmpty( - eesCfg.Timezone, - eeS.cfg.GeneralCfg().DefaultTimezone)); err != nil { - return + timezone := utils.FirstNonEmpty(eeCfg.Timezone, eeS.cfg.GeneralCfg().DefaultTimezone) + loc, err := time.LoadLocation(timezone) + if err != nil { + return err } + dc := utils.NewExporterMetrics(eeCfg.MetricsResetSchedule, loc) + var ee EventExporter buff := new(bytes.Buffer) @@ -260,13 +259,13 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents }); err != nil { return err } - switch eesCfg.Type { + switch eeCfg.Type { case utils.MetaFileCSV: - ee, err = NewFileCSVee(eesCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr}) + ee, err = NewFileCSVee(eeCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr}) case utils.MetaFileFWV: - ee, err = NewFileFWVee(eesCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr}) + ee, err = NewFileFWVee(eeCfg, eeS.cfg, eeS.fltrS, dc, &buffer{wrtr}) default: - err = fmt.Errorf("unsupported exporter type: %s>", eesCfg.Type) + err = fmt.Errorf("unsupported exporter type: %s>", eeCfg.Type) } if err != nil { return err @@ -286,10 +285,10 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents tnt := utils.FirstNonEmpty(args.Tenant, eeS.cfg.GeneralCfg().DefaultTenant) var exported bool for _, event := range args.Events { - if len(eesCfg.Filters) != 0 && !ignoreFltr { + if len(eeCfg.Filters) != 0 && !ignoreFltr { cgrDp[utils.MetaReq] = event.Event if pass, errPass := eeS.fltrS.Pass(ctx, tnt, - eesCfg.Filters, cgrDp); errPass != nil { + eeCfg.Filters, cgrDp); errPass != nil { return errPass } else if !pass { continue // does not pass the filters, ignore the exporter diff --git a/ees/ee.go b/ees/ee.go index 996a9ab13..3ac8b442b 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -35,7 +35,7 @@ type EventExporter interface { Connect() error // called before exporting an event to make sure it is connected ExportEvent(ctx *context.Context, content any, extraData any) error // called on each event to be exported Close() error // called when the exporter needs to terminate - GetMetrics() *utils.SafeMapStorage // called to get metrics + GetMetrics() *utils.ExporterMetrics // called to get metrics PrepareMap(*utils.CGREvent) (any, error) PrepareOrderMap(*utils.OrderedNavigableMap) (any, error) ExtraData(*utils.CGREvent) any @@ -44,12 +44,13 @@ type EventExporter interface { // NewEventExporter produces exporters func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, connMngr *engine.ConnManager) (ee EventExporter, err error) { - var dc *utils.SafeMapStorage - if dc, err = newEEMetrics(utils.FirstNonEmpty( - cfg.Timezone, - cgrCfg.GeneralCfg().DefaultTimezone)); err != nil { - return + timezone := utils.FirstNonEmpty(cfg.Timezone, cgrCfg.GeneralCfg().DefaultTimezone) + loc, err := time.LoadLocation(timezone) + if err != nil { + return nil, err } + dc := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc) + switch cfg.Type { case utils.MetaFileCSV: return NewFileCSVee(cfg, cgrCfg, filterS, dc, nil) @@ -125,20 +126,7 @@ func composeHeaderTrailer(ctx *context.Context, prfx string, fields []*config.FC return } -func newEEMetrics(location string) (*utils.SafeMapStorage, error) { - loc, err := time.LoadLocation(location) - if err != nil { - return nil, err - } - return &utils.SafeMapStorage{MapStorage: utils.MapStorage{ - utils.NumberOfEvents: int64(0), - utils.PositiveExports: utils.StringSet{}, - utils.NegativeExports: utils.StringSet{}, - utils.TimeNow: time.Now().In(loc), - }}, nil -} - -func updateEEMetrics(dc *utils.SafeMapStorage, originID string, ev engine.MapEvent, hasError bool, timezone string) { +func updateEEMetrics(dc *utils.ExporterMetrics, originID string, ev engine.MapEvent, hasError bool, timezone string) { dc.Lock() defer dc.Unlock() if hasError { diff --git a/ees/ee_test.go b/ees/ee_test.go index 9c6e8c188..70d61676a 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -23,6 +23,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -39,13 +40,7 @@ func TestNewEventExporter(t *testing.T) { if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, nil) if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) @@ -76,13 +71,7 @@ func TestNewEventExporterCase2(t *testing.T) { t.Errorf("Expected %+v but got %+v", errExpect, err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, io.Discard) if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) @@ -110,13 +99,7 @@ func TestNewEventExporterCase3(t *testing.T) { if err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) if err != nil { t.Error(err) @@ -138,13 +121,7 @@ func TestNewEventExporterCase4(t *testing.T) { if err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) if err != nil { t.Error(err) @@ -166,13 +143,7 @@ func TestNewEventExporterCase6(t *testing.T) { if err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], dc) newEE := ee.(*VirtualEE) newEE.dc.MapStorage[utils.TimeNow] = nil @@ -205,10 +176,7 @@ func TestNewEventExporterCase7(t *testing.T) { if err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) + dc := utils.NewExporterMetrics("", time.Local) if err != nil { t.Error(err) } diff --git a/ees/ees.go b/ees/ees.go index 3da52e532..256fa369f 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -33,7 +33,9 @@ import ( // onCacheEvicted is called by ltcache when evicting an item func onCacheEvicted(_ string, value any) { - value.(EventExporter).Close() + ee := value.(EventExporter) + ee.GetMetrics().StopCron() + ee.Close() } // NewEventExporterS initializes a new EventExporterS. diff --git a/ees/ees_test.go b/ees/ees_test.go index aace7f4c2..7f1390fc3 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -304,7 +304,7 @@ func TestV1ProcessEvent4(t *testing.T) { } func newMockEventExporter() *mockEventExporter { - return &mockEventExporter{dc: &utils.SafeMapStorage{ + return &mockEventExporter{dc: &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ utils.NumberOfEvents: int64(0), utils.PositiveExports: utils.StringSet{}, @@ -313,11 +313,11 @@ func newMockEventExporter() *mockEventExporter { } type mockEventExporter struct { - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics bytePreparing } -func (m mockEventExporter) GetMetrics() *utils.SafeMapStorage { +func (m mockEventExporter) GetMetrics() *utils.ExporterMetrics { return m.dc } @@ -331,7 +331,7 @@ func (mockEventExporter) Close() error { } func TestV1ProcessEventMockMetrics(t *testing.T) { - mEe := mockEventExporter{dc: &utils.SafeMapStorage{ + mEe := mockEventExporter{dc: &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ utils.NumberOfEvents: int64(0), utils.PositiveExports: utils.StringSet{}, @@ -455,7 +455,7 @@ func TestOnCacheEvicted(t *testing.T) { } func TestUpdateEEMetrics(t *testing.T) { - dc, _ := newEEMetrics(utils.EmptyString) + dc := utils.NewExporterMetrics("", time.UTC) tnow := time.Now() ev := engine.MapEvent{ utils.AnswerTime: tnow, @@ -464,7 +464,7 @@ func TestUpdateEEMetrics(t *testing.T) { utils.ToR: utils.MetaVoice, utils.Usage: time.Second, } - exp, _ := newEEMetrics(utils.EmptyString) + exp := utils.NewExporterMetrics("", time.UTC) exp.MapStorage[utils.FirstEventATime] = tnow exp.MapStorage[utils.LastEventATime] = tnow exp.MapStorage[utils.FirstExpOrderID] = int64(1) diff --git a/ees/elastic.go b/ees/elastic.go index e86bee422..c6708b4f1 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -39,14 +39,14 @@ import ( type ElasticEE struct { mu sync.RWMutex cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq client *elasticsearch.TypedClient clientCfg elasticsearch.Config } -func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) { +func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*ElasticEE, error) { el := &ElasticEE{ cfg: cfg, dc: dc, @@ -221,7 +221,7 @@ func (e *ElasticEE) Close() error { return nil } -func (e *ElasticEE) GetMetrics() *utils.SafeMapStorage { return e.dc } +func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.dc } func (eEE *ElasticEE) ExtraData(ev *utils.CGREvent) any { return utils.ConcatenatedKey( diff --git a/ees/elastic_test.go b/ees/elastic_test.go index 291ffdfa0..739c3b865 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -20,6 +20,7 @@ package ees import ( "reflect" "testing" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -27,10 +28,7 @@ import ( ) func TestGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) ee := &ElasticEE{ dc: dc, } @@ -58,10 +56,7 @@ func TestInitClient(t *testing.T) { func TestElasticExportEventErr(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) if err != nil { t.Error(err) diff --git a/ees/filecsv.go b/ees/filecsv.go index 3fc27d287..62854f9a3 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -35,7 +35,7 @@ import ( func NewFileCSVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.SafeMapStorage, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) { + dc *utils.ExporterMetrics, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) { fCsv = &FileCSVee{ cfg: cfg, dc: dc, @@ -50,7 +50,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg, // FileCSVee implements EventExporter interface for .csv files type FileCSVee struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics wrtr io.WriteCloser // writer for the csv csvWriter *csv.Writer sync.Mutex @@ -132,7 +132,7 @@ func (fCsv *FileCSVee) Close() (err error) { return } -func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc } +func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.dc } func (fCsv *FileCSVee) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index 6d6c0500e..ccb375bde 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -723,13 +723,7 @@ func TestCsvInitFileCSV(t *testing.T) { if err := os.MkdirAll("/tmp/TestInitFileCSV", 0666); err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fCsv := &FileCSVee{ cgrCfg: cgrCfg, cfg: cgrCfg.EEsCfg().Exporters[0], diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go index b82b6c6fb..cb15a1330 100644 --- a/ees/filecsv_test.go +++ b/ees/filecsv_test.go @@ -24,6 +24,7 @@ import ( "io" "reflect" "testing" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -32,13 +33,7 @@ import ( ) func TestFileCsvGetMetrics(t *testing.T) { - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fCsv := &FileCSVee{dc: dc} if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.dc) { @@ -66,7 +61,7 @@ func TestFileCsvComposeHeader(t *testing.T) { filterS: filterS, wrtr: nopCloser{byteBuff}, csvWriter: csvNW, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -130,7 +125,7 @@ func TestFileCsvComposeTrailer(t *testing.T) { filterS: filterS, wrtr: nopCloser{byteBuff}, csvWriter: csvNW, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -188,13 +183,7 @@ func TestFileCsvExportEvent(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) csvNW := csv.NewWriter(byteBuff) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fCsv := &FileCSVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, @@ -228,7 +217,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) { filterS: filterS, wrtr: nopCloserWrite{byteBuff}, csvWriter: csvNW, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -263,7 +252,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) { filterS: filterS, wrtr: nopCloserError{byteBuff}, csvWriter: csvNW, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { diff --git a/ees/filefwv.go b/ees/filefwv.go index 6a6f896f0..1926a7d98 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -31,7 +31,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage, writer io.Writer) (fFwv *FileFWVee, err error) { +func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.ExporterMetrics, writer io.Writer) (fFwv *FileFWVee, err error) { fFwv = &FileFWVee{ cfg: cfg, dc: dc, @@ -46,7 +46,7 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter // FileFWVee implements EventExporter interface for .fwv files type FileFWVee struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics writer io.WriteCloser sync.Mutex slicePreparing @@ -139,6 +139,6 @@ func (fFwv *FileFWVee) Close() (err error) { return } -func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { return fFwv.dc } +func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.dc } func (fFwv *FileFWVee) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/filefwv_it_test.go b/ees/filefwv_it_test.go index 0b20e9829..238af2dc8 100644 --- a/ees/filefwv_it_test.go +++ b/ees/filefwv_it_test.go @@ -157,13 +157,7 @@ func TestFileFwvInit(t *testing.T) { if err := os.MkdirAll("/tmp/TestInitFileFWV", 0666); err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cgrCfg: cgrCfg, cfg: cgrCfg.EEsCfg().Exporters[0], diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go index ad7e9654a..013e479e2 100644 --- a/ees/filefwv_test.go +++ b/ees/filefwv_test.go @@ -24,6 +24,7 @@ import ( "io" "reflect" "testing" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -32,10 +33,7 @@ import ( ) func TestFileFwvGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{dc: dc} if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.dc) { @@ -56,7 +54,7 @@ func TestFileFwvComposeHeader(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloser{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -119,7 +117,7 @@ func TestFileFwvComposeTrailer(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloser{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -177,10 +175,7 @@ func TestFileFwvExportEvent(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) csvNW := csv.NewWriter(byteBuff) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, @@ -214,10 +209,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) { newDM := engine.NewDataManager(dbCM, cfg, nil) filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, @@ -242,7 +234,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloserWrite{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -275,7 +267,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloserWrite{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -307,7 +299,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloserWrite{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -346,7 +338,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) { cgrCfg: cfg, filterS: filterS, writer: nopCloserError{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 3da73d2a4..e5a318bb4 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -34,7 +34,7 @@ import ( ) func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (pstrJSON *HTTPjsonMapEE, err error) { + dc *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) { pstrJSON = &HTTPjsonMapEE{ cfg: cfg, dc: dc, @@ -48,7 +48,7 @@ func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi // HTTPjsonMapEE implements EventExporter interface for .csv files type HTTPjsonMapEE struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics client *http.Client reqs *concReq @@ -92,7 +92,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content, _ any) ( func (httpEE *HTTPjsonMapEE) Close() (_ error) { return } -func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc } +func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.dc } func (httpEE *HTTPjsonMapEE) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go index efdb9a490..16d403849 100644 --- a/ees/httpjsonmap_test.go +++ b/ees/httpjsonmap_test.go @@ -33,10 +33,7 @@ import ( ) func TestHttpJsonMapGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) httpEE := &HTTPjsonMapEE{ dc: dc, } diff --git a/ees/httppost.go b/ees/httppost.go index 002be953d..a020db08e 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -30,7 +30,7 @@ import ( ) func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (httpPost *HTTPPostEE, err error) { + dc *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) { httpPost = &HTTPPostEE{ cfg: cfg, dc: dc, @@ -44,7 +44,7 @@ func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filte // FileCSVee implements EventExporter interface for .csv files type HTTPPostEE struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics client *http.Client reqs *concReq @@ -92,7 +92,7 @@ func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content, _ any) (e func (httpPost *HTTPPostEE) Close() (_ error) { return } -func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc } +func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.dc } func (httpPost *HTTPPostEE) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/httppost_test.go b/ees/httppost_test.go index 1c95f60a1..c6b53ffd2 100644 --- a/ees/httppost_test.go +++ b/ees/httppost_test.go @@ -34,13 +34,7 @@ import ( ) func TestHttpPostGetMetrics(t *testing.T) { - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) httpPost := &HTTPPostEE{ dc: dc, } diff --git a/ees/kafka.go b/ees/kafka.go index e6b4b488c..97653868b 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -32,7 +32,7 @@ import ( ) // NewKafkaEE creates a kafka poster -func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaEE, error) { +func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*KafkaEE, error) { pstr := &KafkaEE{ cfg: cfg, dc: dc, @@ -100,7 +100,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaE type KafkaEE struct { writer *kafka.Writer cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq bytePreparing } @@ -131,7 +131,7 @@ func (k *KafkaEE) Close() error { return k.writer.Close() } -func (k *KafkaEE) GetMetrics() *utils.SafeMapStorage { return k.dc } +func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.dc } func (k *KafkaEE) ExtraData(ev *utils.CGREvent) any { return utils.ConcatenatedKey( utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()), diff --git a/ees/log.go b/ees/log.go index 565f39adf..71b20d06c 100644 --- a/ees/log.go +++ b/ees/log.go @@ -27,7 +27,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewLogEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *LogEE { +func NewLogEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *LogEE { return &LogEE{ cfg: cfg, dc: dc, @@ -37,7 +37,7 @@ func NewLogEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *LogEE { // LogEE implements EventExporter interface for .csv files type LogEE struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics } func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg } @@ -48,9 +48,9 @@ func (vEe *LogEE) ExportEvent(_ *context.Context, mp, _ any) error { utils.EEs, vEe.Cfg().ID, utils.ToJSON(mp))) return nil } -func (vEe *LogEE) Close() error { return nil } -func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } -func (vEe *LogEE) ExtraData(ev *utils.CGREvent) any { return nil } +func (vEe *LogEE) Close() error { return nil } +func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc } +func (vEe *LogEE) ExtraData(ev *utils.CGREvent) any { return nil } func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (any, error) { return mp.Event, nil } diff --git a/ees/log_test.go b/ees/log_test.go index ee5e2cc38..22a7a3ed6 100644 --- a/ees/log_test.go +++ b/ees/log_test.go @@ -23,6 +23,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -31,10 +32,7 @@ import ( func TestNewLogEE(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) expected := &LogEE{ cfg: cfg.EEsCfg().ExporterCfg(utils.MetaDefault), @@ -49,10 +47,7 @@ func TestNewLogEE(t *testing.T) { func TestLogEEExportEvent(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) mp := map[string]any{ "field1": 2, @@ -72,33 +67,23 @@ func TestLogEEExportEvent(t *testing.T) { } } -func TestLogEEGetMetrics(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) +func TestLogEE_GetMetrics(t *testing.T) { + mockMetrics := &utils.ExporterMetrics{} + + vEe := &LogEE{ + dc: mockMetrics, } - dc.MapStorage = utils.MapStorage{ - "metric1": "value", - } - expected := &utils.SafeMapStorage{ - MapStorage: utils.MapStorage{ - "metric1": "value", - }, - } - logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) - rcv := logEE.GetMetrics() - if !reflect.DeepEqual(rcv, expected) { - t.Errorf("Expected %T \n but received \n %T", expected, rcv) + + result := vEe.GetMetrics() + + if result != mockMetrics { + t.Errorf("expected %v, got %v", mockMetrics, result) } } func TestLogEEPrepareMap(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) mp := &utils.CGREvent{ Event: map[string]any{ @@ -114,10 +99,7 @@ func TestLogEEPrepareMap(t *testing.T) { func TestLogEEPrepareOrderMap(t *testing.T) { cfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), dc) mp := utils.NewOrderedNavigableMap() fullPath := &utils.FullPath{ diff --git a/ees/nats.go b/ees/nats.go index 22d5764a0..e6cf758cc 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -34,7 +34,7 @@ import ( ) // NewNatsEE creates a kafka poster -func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.SafeMapStorage) (natsPstr *NatsEE, err error) { +func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.ExporterMetrics) (natsPstr *NatsEE, err error) { natsPstr = &NatsEE{ cfg: cfg, dc: dc, @@ -55,7 +55,7 @@ type NatsEE struct { posterJS jetstream.JetStream cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect writer bytePreparing @@ -127,7 +127,7 @@ func (pstr *NatsEE) Close() error { return err } -func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/rpc.go b/ees/rpc.go index a25f338e8..ecb097d2c 100644 --- a/ees/rpc.go +++ b/ees/rpc.go @@ -29,7 +29,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage, +func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics, connMgr *engine.ConnManager) (e *RPCee, err error) { e = &RPCee{ cfg: cfg, @@ -42,7 +42,7 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage, type RPCee struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics connMgr *engine.ConnManager //opts @@ -81,7 +81,7 @@ func (e *RPCee) Close() (err error) { return } -func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) { +func (e *RPCee) GetMetrics() (mp *utils.ExporterMetrics) { return e.dc } func (e *RPCee) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/rpc_test.go b/ees/rpc_test.go index 9109069f8..ae68ae2b5 100644 --- a/ees/rpc_test.go +++ b/ees/rpc_test.go @@ -21,6 +21,7 @@ package ees import ( "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -29,10 +30,7 @@ import ( func TestNewRpcEE(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) rcv, err := NewRpcEE(eeSCfg, dc, connMgr) @@ -102,10 +100,7 @@ func TestRPCCfg(t *testing.T) { func TestRPCConnect(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) if err != nil { @@ -147,10 +142,7 @@ func TestRPCConnect(t *testing.T) { func TestRPCClose(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) if err != nil { @@ -166,7 +158,7 @@ func TestRPCClose(t *testing.T) { func TestRPCGetMetrics(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc := &utils.SafeMapStorage{ + dc := &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ "time": "now", "just_a_field": "just_a_value", @@ -185,10 +177,7 @@ func TestRPCGetMetrics(t *testing.T) { func TestRPCPrepareMap(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig()) rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) if err != nil { diff --git a/ees/s3.go b/ees/s3.go index c981d9c45..a207840d6 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -34,7 +34,7 @@ import ( ) // NewS3EE creates a s3 poster -func NewS3EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *S3EE { +func NewS3EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *S3EE { pstr := &S3EE{ cfg: cfg, dc: dc, @@ -56,7 +56,7 @@ type S3EE struct { up *s3manager.Uploader cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -138,7 +138,7 @@ func (pstr *S3EE) ExportEvent(ctx *context.Context, message, extraData any) (err func (pstr *S3EE) Close() (_ error) { return } -func (pstr *S3EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } func (pstr *S3EE) ExtraData(ev *utils.CGREvent) any { return utils.ConcatenatedKey( diff --git a/ees/sql.go b/ees/sql.go index 71f755526..8da69a404 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -36,7 +36,7 @@ import ( ) func NewSQLEe(cfg *config.EventExporterCfg, - dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) { + dc *utils.ExporterMetrics) (sqlEe *SQLEe, err error) { sqlEe = &SQLEe{ cfg: cfg, dc: dc, @@ -49,7 +49,7 @@ func NewSQLEe(cfg *config.EventExporterCfg, // SQLEe implements EventExporter interface for SQL type SQLEe struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics db *gorm.DB sqldb *sql.DB reqs *concReq @@ -157,7 +157,7 @@ func (sqlEe *SQLEe) Close() (err error) { return } -func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc } +func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.dc } func (sqlEe *SQLEe) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/sql_test.go b/ees/sql_test.go index 3ef3a7193..814a35480 100644 --- a/ees/sql_test.go +++ b/ees/sql_test.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -32,10 +33,7 @@ import ( ) func TestSqlGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) sqlEe := &SQLEe{ dc: dc, } diff --git a/ees/sqs.go b/ees/sqs.go index 8e4f1bc53..2e066f588 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -32,7 +32,7 @@ import ( ) // NewSQSee creates a poster for sqs -func NewSQSee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *SQSee { +func NewSQSee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *SQSee { pstr := &SQSee{ cfg: cfg, dc: dc, @@ -54,7 +54,7 @@ type SQSee struct { svc *sqs.SQS cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -146,6 +146,6 @@ func (pstr *SQSee) ExportEvent(ctx *context.Context, message, _ any) (err error) func (pstr *SQSee) Close() (_ error) { return } -func (pstr *SQSee) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.dc } func (pstr *SQSee) ExtraData(ev *utils.CGREvent) any { return nil } diff --git a/ees/virtualee.go b/ees/virtualee.go index ee3cc7d83..56d969abe 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -24,7 +24,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *VirtualEE { +func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *VirtualEE { return &VirtualEE{ cfg: cfg, dc: dc, @@ -34,14 +34,14 @@ func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *Virtu // VirtualEE implements EventExporter interface for .csv files type VirtualEE struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics } func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } func (vEe *VirtualEE) Connect() error { return nil } func (vEe *VirtualEE) ExportEvent(*context.Context, any, any) error { return nil } func (vEe *VirtualEE) Close() error { return nil } -func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc } func (vEe *VirtualEE) ExtraData(*utils.CGREvent) any { return nil } func (vEe *VirtualEE) PrepareMap(mp *utils.CGREvent) (any, error) { return nil, nil } func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (any, error) { diff --git a/ees/virtualee_test.go b/ees/virtualee_test.go index 84ea42b01..d6a2b8663 100644 --- a/ees/virtualee_test.go +++ b/ees/virtualee_test.go @@ -21,16 +21,14 @@ package ees import ( "reflect" "testing" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/utils" ) func TestVirtualEeGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) vEe := &VirtualEE{ dc: dc, } diff --git a/utils/exportermetrics.go b/utils/exportermetrics.go new file mode 100644 index 000000000..ade0fc90b --- /dev/null +++ b/utils/exportermetrics.go @@ -0,0 +1,151 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package utils + +import ( + "sync" + "time" + + "github.com/cgrates/cron" +) + +// ExporterMetrics stores export statistics with thread-safe access and +// cron-scheduled resets. +type ExporterMetrics struct { + mu sync.RWMutex + MapStorage MapStorage + cron *cron.Cron + loc *time.Location +} + +// NewExporterMetrics creates metrics with optional automatic reset. +// schedule is a cron expression for reset timing (empty to disable). +func NewExporterMetrics(schedule string, loc *time.Location) *ExporterMetrics { + m := &ExporterMetrics{ + loc: loc, + } + m.Reset() // init MapStorage with default values + + if schedule != "" { + m.cron = cron.New() + m.cron.AddFunc(schedule, func() { + m.Reset() + }) + m.cron.Start() + } + return m +} + +// Reset immediately clears all metrics and resets them to initial values. +func (m *ExporterMetrics) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + m.MapStorage = MapStorage{ + NumberOfEvents: int64(0), + PositiveExports: StringSet{}, + NegativeExports: StringSet{}, + TimeNow: time.Now().In(m.loc), + } +} + +// StopCron stops the automatic reset schedule if one is active. +func (m *ExporterMetrics) StopCron() { + if m.cron == nil { + return + } + m.cron.Stop() + // ctx := m.cron.Stop() + // <-ctx.Done() // wait for any running jobs to complete +} + +// String returns the map as json string. +func (m *ExporterMetrics) String() string { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.String() +} + +// FieldAsInterface returns the value from the path. +func (m *ExporterMetrics) FieldAsInterface(fldPath []string) (val any, err error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.FieldAsInterface(fldPath) +} + +// FieldAsString returns the value from path as string. +func (m *ExporterMetrics) FieldAsString(fldPath []string) (str string, err error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.FieldAsString(fldPath) +} + +// Set sets the value at the given path. +func (m *ExporterMetrics) Set(fldPath []string, val any) (err error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.MapStorage.Set(fldPath, val) +} + +// GetKeys returns all the keys from map. +func (m *ExporterMetrics) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.GetKeys(nested, nestedLimit, prefix) +} + +// Remove removes the item at path +func (m *ExporterMetrics) Remove(fldPath []string) (err error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.MapStorage.Remove(fldPath) +} + +func (m *ExporterMetrics) ClonedMapStorage() (msClone MapStorage) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.Clone() +} + +// IncrementEvents increases the event counter (NumberOfEvents) by 1. +func (m *ExporterMetrics) IncrementEvents() { + m.mu.Lock() + defer m.mu.Unlock() + count, _ := m.MapStorage[NumberOfEvents].(int64) + m.MapStorage[NumberOfEvents] = count + 1 +} + +// Lock locks the ExporterMetrics mutex. +func (m *ExporterMetrics) Lock() { + m.mu.Lock() +} + +// Unlock unlocks the ExporterMetrics mutex. +func (m *ExporterMetrics) Unlock() { + m.mu.Unlock() +} + +// RLock locks the ExporterMetrics mutex for reading. +func (m *ExporterMetrics) RLock() { + m.mu.RLock() +} + +// RUnlock unlocks the read lock on the ExporterMetrics mutex. +func (m *ExporterMetrics) RUnlock() { + m.mu.RUnlock() +} diff --git a/utils/safemapstorage_test.go b/utils/exportermetrics_test.go similarity index 74% rename from utils/safemapstorage_test.go rename to utils/exportermetrics_test.go index e2b4f3885..e118222cc 100644 --- a/utils/safemapstorage_test.go +++ b/utils/exportermetrics_test.go @@ -23,8 +23,8 @@ import ( "testing" ) -func TestSafeMapStorageString(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsString(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, @@ -35,8 +35,8 @@ func TestSafeMapStorageString(t *testing.T) { } } -func TestSafeMapStorageFieldAsInterface(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsFieldAsInterface(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, @@ -51,8 +51,8 @@ func TestSafeMapStorageFieldAsInterface(t *testing.T) { } } -func TestSafeMapStorageFieldAsString(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsFieldAsString(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, @@ -67,14 +67,14 @@ func TestSafeMapStorageFieldAsString(t *testing.T) { } } -func TestSafeMapStorageSet(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsSet(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, } - expected := &SafeMapStorage{ + expected := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, "field2": 3, @@ -88,8 +88,8 @@ func TestSafeMapStorageSet(t *testing.T) { } } -func TestSafeMapStorageGetKeys(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsGetKeys(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, "field2": 3, @@ -105,15 +105,15 @@ func TestSafeMapStorageGetKeys(t *testing.T) { } } -func TestSafeMapStorageRemove(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsRemove(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, "field2": 3, }, } - expected := &SafeMapStorage{ + expected := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, @@ -126,28 +126,8 @@ func TestSafeMapStorageRemove(t *testing.T) { } } -func TestSafeMapStorageClone(t *testing.T) { - ms := &SafeMapStorage{ - MapStorage: MapStorage{ - "field1": 2, - "field2": 3, - }, - } - - expected := &SafeMapStorage{ - MapStorage: MapStorage{ - "field1": 2, - "field2": 3, - }, - } - - if reply := ms.Clone(); !reflect.DeepEqual(reply, expected) { - t.Errorf("Expected %v \n but received \n %v", expected, reply) - } -} - -func TestClonedMapStorage(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsClonedMapStorage(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": []string{"v1", "v2"}, }, diff --git a/utils/safemapstorage.go b/utils/safemapstorage.go deleted file mode 100644 index da1966f79..000000000 --- a/utils/safemapstorage.go +++ /dev/null @@ -1,83 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see -*/ - -package utils - -import ( - "sync" -) - -// SafeMapStorage is a Mapstorage with mutex -type SafeMapStorage struct { - MapStorage - sync.RWMutex -} - -// String returns the map as json string -func (ms *SafeMapStorage) String() string { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.String() -} - -// FieldAsInterface returns the value from the path -func (ms *SafeMapStorage) FieldAsInterface(fldPath []string) (val any, err error) { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.FieldAsInterface(fldPath) -} - -// FieldAsString returns the value from path as string -func (ms *SafeMapStorage) FieldAsString(fldPath []string) (str string, err error) { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.FieldAsString(fldPath) -} - -// Set sets the value at the given path -func (ms *SafeMapStorage) Set(fldPath []string, val any) (err error) { - ms.Lock() - defer ms.Unlock() - return ms.MapStorage.Set(fldPath, val) -} - -// GetKeys returns all the keys from map -func (ms *SafeMapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.GetKeys(nested, nestedLimit, prefix) -} - -// Remove removes the item at path -func (ms *SafeMapStorage) Remove(fldPath []string) (err error) { - ms.Lock() - defer ms.Unlock() - return ms.MapStorage.Remove(fldPath) -} - -func (ms *SafeMapStorage) Clone() (msClone *SafeMapStorage) { - ms.RLock() - defer ms.RUnlock() - return &SafeMapStorage{MapStorage: ms.MapStorage.Clone()} -} - -func (ms *SafeMapStorage) ClonedMapStorage() (msClone MapStorage) { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.Clone() -}