diff --git a/ees/amqp.go b/ees/amqp.go index ddcf14377..a8cd9ab41 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -30,7 +30,7 @@ import ( // NewAMQPee creates a new amqp poster // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" -func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPee { +func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPee { amqp := &AMQPee{ cfg: cfg, dc: dc, @@ -50,7 +50,7 @@ type AMQPee struct { postChan *amqp.Channel cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -177,4 +177,4 @@ func (pstr *AMQPee) Close() (err error) { return } -func (pstr *AMQPee) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.dc } diff --git a/ees/amqp_test.go b/ees/amqp_test.go index dfb74b414..85e464026 100644 --- a/ees/amqp_test.go +++ b/ees/amqp_test.go @@ -26,7 +26,7 @@ import ( ) func TestAmqpGetMetrics(t *testing.T) { - expectedMetrics := &utils.SafeMapStorage{} + expectedMetrics := &utils.ExporterMetrics{} pstr := &AMQPee{ dc: expectedMetrics, } @@ -48,7 +48,7 @@ func TestCfg(t *testing.T) { } func TestAmqpToGetMetrics(t *testing.T) { - expectedMetrics := &utils.SafeMapStorage{} + expectedMetrics := &utils.ExporterMetrics{} amqp := &AMQPv1EE{ dc: expectedMetrics, } diff --git a/ees/amqpv1.go b/ees/amqpv1.go index 3a8536762..169a071e7 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -28,7 +28,7 @@ import ( ) // NewAMQPv1EE creates a poster for amqpv1 -func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1EE { +func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPv1EE { pstr := &AMQPv1EE{ cfg: cfg, dc: dc, @@ -56,7 +56,7 @@ type AMQPv1EE struct { session *amqpv1.Session cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -121,4 +121,4 @@ func (pstr *AMQPv1EE) Close() (err error) { return } -func (pstr *AMQPv1EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } diff --git a/ees/ee.go b/ees/ee.go index 734aa138b..3f255f076 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -30,11 +30,11 @@ import ( ) type EventExporter interface { - Cfg() *config.EventExporterCfg // return the config - Connect() error // called before exporting an event to make sure it is connected - ExportEvent(any, string) error // called on each event to be exported - Close() error // called when the exporter needs to terminate - GetMetrics() *utils.SafeMapStorage // called to get metrics + Cfg() *config.EventExporterCfg // return the config + Connect() error // called before exporting an event to make sure it is connected + ExportEvent(any, string) error // called on each event to be exported + Close() error // called when the exporter needs to terminate + GetMetrics() *utils.ExporterMetrics // called to get metrics PrepareMap(*utils.CGREvent) (any, error) PrepareOrderMap(*utils.OrderedNavigableMap) (any, error) } @@ -42,12 +42,13 @@ type EventExporter interface { // NewEventExporter produces exporters func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, connMngr *engine.ConnManager) (ee EventExporter, err error) { - var dc *utils.SafeMapStorage - if dc, err = newEEMetrics(utils.FirstNonEmpty( - cfg.Timezone, - cgrCfg.GeneralCfg().DefaultTimezone)); err != nil { - return + timezone := utils.FirstNonEmpty(cfg.Timezone, cgrCfg.GeneralCfg().DefaultTimezone) + loc, err := time.LoadLocation(timezone) + if err != nil { + return nil, err } + dc := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc) + switch cfg.Type { case utils.MetaFileCSV: return NewFileCSVee(cfg, cgrCfg, filterS, dc) @@ -123,20 +124,7 @@ func composeHeaderTrailer(prfx string, fields []*config.FCTemplate, dc utils.Dat return } -func newEEMetrics(location string) (*utils.SafeMapStorage, error) { - loc, err := time.LoadLocation(location) - if err != nil { - return nil, err - } - return &utils.SafeMapStorage{MapStorage: utils.MapStorage{ - utils.NumberOfEvents: int64(0), - utils.PositiveExports: utils.StringSet{}, - utils.NegativeExports: utils.StringSet{}, - utils.TimeNow: time.Now().In(loc), - }}, nil -} - -func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, hasError bool, timezone string) { +func updateEEMetrics(dc *utils.ExporterMetrics, cgrID string, ev engine.MapEvent, hasError bool, timezone string) { dc.Lock() defer dc.Unlock() if hasError { diff --git a/ees/ee_test.go b/ees/ee_test.go index 2934ad0c7..87ea03fc8 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -23,6 +23,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -39,13 +40,7 @@ func TestNewEventExporter(t *testing.T) { if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) @@ -73,10 +68,7 @@ func TestNewEventExporterCase2(t *testing.T) { t.Errorf("Expected %+v but got %+v", errExpect, err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) + dc := utils.NewExporterMetrics("", time.Local) eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) @@ -101,10 +93,7 @@ func TestNewEventExporterCase3(t *testing.T) { if err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) + dc := utils.NewExporterMetrics("", time.Local) eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) if err != nil { t.Error(err) @@ -126,10 +115,7 @@ func TestNewEventExporterCase4(t *testing.T) { if err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) + dc := utils.NewExporterMetrics("", time.Local) eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) if err != nil { t.Error(err) @@ -151,13 +137,7 @@ func TestNewEventExporterCase6(t *testing.T) { if err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], dc) newEE := ee.(*VirtualEE) newEE.dc.MapStorage[utils.TimeNow] = nil @@ -190,10 +170,7 @@ func TestNewEventExporterCase7(t *testing.T) { if err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) + dc := utils.NewExporterMetrics("", time.Local) if err != nil { t.Error(err) } diff --git a/ees/ees.go b/ees/ees.go index cfbd8419d..9c21ac52f 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -35,6 +35,7 @@ import ( // onCacheEvicted is called by ltcache when evicting an item func onCacheEvicted(_ string, value any) { ee := value.(EventExporter) + ee.GetMetrics().StopCron() ee.Close() } @@ -306,9 +307,7 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool }() var eEv any - exp.GetMetrics().Lock() - exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1 - exp.GetMetrics().Unlock() + exp.GetMetrics().IncrementEvents() if len(exp.Cfg().ContentFields()) == 0 { if eEv, err = exp.PrepareMap(ev); err != nil { return diff --git a/ees/ees_test.go b/ees/ees_test.go index 7c1718e69..f31c62f17 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -292,7 +292,7 @@ func TestV1ProcessEvent4(t *testing.T) { } func newMockEventExporter() *mockEventExporter { - return &mockEventExporter{dc: &utils.SafeMapStorage{ + return &mockEventExporter{dc: &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ utils.NumberOfEvents: int64(0), utils.PositiveExports: utils.StringSet{}, @@ -301,11 +301,11 @@ func newMockEventExporter() *mockEventExporter { } type mockEventExporter struct { - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics bytePreparing } -func (m mockEventExporter) GetMetrics() *utils.SafeMapStorage { +func (m mockEventExporter) GetMetrics() *utils.ExporterMetrics { return m.dc } @@ -438,7 +438,7 @@ func TestOnCacheEvicted(t *testing.T) { } func TestUpdateEEMetrics(t *testing.T) { - dc, _ := newEEMetrics(utils.EmptyString) + dc := utils.NewExporterMetrics("", time.UTC) tnow := time.Now() ev := engine.MapEvent{ utils.AnswerTime: tnow, @@ -447,7 +447,7 @@ func TestUpdateEEMetrics(t *testing.T) { utils.ToR: utils.MetaVoice, utils.Usage: time.Second, } - exp, _ := newEEMetrics(utils.EmptyString) + exp := utils.NewExporterMetrics("", time.UTC) exp.MapStorage[utils.FirstEventATime] = tnow exp.MapStorage[utils.LastEventATime] = tnow exp.MapStorage[utils.FirstExpOrderID] = int64(1) diff --git a/ees/elastic.go b/ees/elastic.go index 000c95427..621c4d326 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -38,14 +38,14 @@ import ( type ElasticEE struct { mu sync.RWMutex cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq client *elasticsearch.TypedClient clientCfg elasticsearch.Config } -func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) { +func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*ElasticEE, error) { el := &ElasticEE{ cfg: cfg, dc: dc, @@ -222,4 +222,4 @@ func (e *ElasticEE) Close() error { return nil } -func (e *ElasticEE) GetMetrics() *utils.SafeMapStorage { return e.dc } +func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.dc } diff --git a/ees/elastic_test.go b/ees/elastic_test.go index 1cbbce3f8..87e82f4eb 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -20,6 +20,7 @@ package ees import ( "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -27,10 +28,7 @@ import ( ) func TestGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) ee := &ElasticEE{ dc: dc, } @@ -61,10 +59,7 @@ func TestInitClient(t *testing.T) { func TestElasticExportEventErr(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) if err != nil { t.Error(err) diff --git a/ees/filecsv.go b/ees/filecsv.go index f286f347a..f3697cd4e 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -34,7 +34,7 @@ import ( func NewFileCSVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) { + dc *utils.ExporterMetrics) (fCsv *FileCSVee, err error) { fCsv = &FileCSVee{ cfg: cfg, dc: dc, @@ -49,7 +49,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg, // FileCSVee implements EventExporter interface for .csv files type FileCSVee struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics file io.WriteCloser csvWriter *csv.Writer sync.Mutex @@ -129,4 +129,4 @@ func (fCsv *FileCSVee) Close() (err error) { return } -func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc } +func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.dc } diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index e8ea38008..c39e94dd4 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -677,13 +677,7 @@ func TestCsvInitFileCSV(t *testing.T) { if err := os.MkdirAll("/tmp/TestInitFileCSV", 0666); err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fCsv := &FileCSVee{ cgrCfg: cgrCfg, cfg: cgrCfg.EEsCfg().Exporters[0], diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go index d1c52dd42..593c6e213 100644 --- a/ees/filecsv_test.go +++ b/ees/filecsv_test.go @@ -24,6 +24,7 @@ import ( "io" "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -31,13 +32,7 @@ import ( ) func TestFileCsvGetMetrics(t *testing.T) { - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fCsv := &FileCSVee{dc: dc} if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.dc) { @@ -64,7 +59,7 @@ func TestFileCsvComposeHeader(t *testing.T) { filterS: filterS, file: nopCloser{byteBuff}, csvWriter: csvNW, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -127,7 +122,7 @@ func TestFileCsvComposeTrailer(t *testing.T) { filterS: filterS, file: nopCloser{byteBuff}, csvWriter: csvNW, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -184,13 +179,7 @@ func TestFileCsvExportEvent(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) csvNW := csv.NewWriter(byteBuff) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fCsv := &FileCSVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, @@ -223,7 +212,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) { filterS: filterS, file: nopCloserWrite{byteBuff}, csvWriter: csvNW, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { @@ -257,7 +246,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) { filterS: filterS, file: nopCloserError{byteBuff}, csvWriter: csvNW, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fCsv.Cfg().Fields = []*config.FCTemplate{ { diff --git a/ees/filefwv.go b/ees/filefwv.go index 0a68ab590..1ce58ba62 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -30,7 +30,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) { +func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.ExporterMetrics) (fFwv *FileFWVee, err error) { fFwv = &FileFWVee{ cfg: cfg, dc: dc, @@ -45,7 +45,7 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter // FileFWVee implements EventExporter interface for .fwv files type FileFWVee struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics file io.WriteCloser sync.Mutex slicePreparing @@ -136,4 +136,4 @@ func (fFwv *FileFWVee) Close() (err error) { return } -func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { return fFwv.dc } +func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.dc } diff --git a/ees/filefwv_it_test.go b/ees/filefwv_it_test.go index ac2b28d83..c0685f035 100644 --- a/ees/filefwv_it_test.go +++ b/ees/filefwv_it_test.go @@ -164,13 +164,7 @@ func TestFileFwvInit(t *testing.T) { if err := os.MkdirAll("/tmp/TestInitFileFWV", 0666); err != nil { t.Error(err) } - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cgrCfg: cgrCfg, cfg: cgrCfg.EEsCfg().Exporters[0], diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go index 584a9d19e..e7a9c6574 100644 --- a/ees/filefwv_test.go +++ b/ees/filefwv_test.go @@ -24,6 +24,7 @@ import ( "io" "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -31,10 +32,7 @@ import ( ) func TestFileFwvGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{dc: dc} if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.dc) { @@ -54,7 +52,7 @@ func TestFileFwvComposeHeader(t *testing.T) { cgrCfg: cfg, filterS: filterS, file: nopCloser{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -116,7 +114,7 @@ func TestFileFwvComposeTrailer(t *testing.T) { cgrCfg: cfg, filterS: filterS, file: nopCloser{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -173,10 +171,7 @@ func TestFileFwvExportEvent(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) csvNW := csv.NewWriter(byteBuff) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, @@ -209,10 +204,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) { newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) byteBuff := new(bytes.Buffer) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) fFwv := &FileFWVee{ cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, @@ -236,7 +228,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) { cgrCfg: cfg, filterS: filterS, file: nopCloserWrite{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -268,7 +260,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) { cgrCfg: cfg, filterS: filterS, file: nopCloserWrite{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -299,7 +291,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) { cgrCfg: cfg, filterS: filterS, file: nopCloserWrite{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { @@ -337,7 +329,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) { cgrCfg: cfg, filterS: filterS, file: nopCloserError{byteBuff}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ { diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index ff4f9b44b..f6e8d3dcf 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -33,7 +33,7 @@ import ( ) func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (pstrJSON *HTTPjsonMapEE, err error) { + dc *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) { pstrJSON = &HTTPjsonMapEE{ cfg: cfg, dc: dc, @@ -47,7 +47,7 @@ func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi // HTTPjsonMapEE implements EventExporter interface for .csv files type HTTPjsonMapEE struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics client *http.Client reqs *concReq @@ -91,7 +91,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(content any, _ string) (err error) { func (httpEE *HTTPjsonMapEE) Close() (_ error) { return } -func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc } +func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.dc } func (httpEE *HTTPjsonMapEE) PrepareMap(mp *utils.CGREvent) (any, error) { body, err := json.Marshal(mp.Event) diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go index 10f5864e9..05ab88994 100644 --- a/ees/httpjsonmap_test.go +++ b/ees/httpjsonmap_test.go @@ -32,10 +32,7 @@ import ( ) func TestHttpJsonMapGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) httpEE := &HTTPjsonMapEE{ dc: dc, } diff --git a/ees/httppost.go b/ees/httppost.go index ec4b101a9..3905a2e05 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -29,7 +29,7 @@ import ( ) func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (httpPost *HTTPPostEE, err error) { + dc *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) { httpPost = &HTTPPostEE{ cfg: cfg, dc: dc, @@ -43,7 +43,7 @@ func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filte // FileCSVee implements EventExporter interface for .csv files type HTTPPostEE struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics client *http.Client reqs *concReq @@ -92,7 +92,7 @@ func (httpPost *HTTPPostEE) ExportEvent(content any, _ string) (err error) { func (httpPost *HTTPPostEE) Close() (_ error) { return } -func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc } +func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.dc } func (httpPost *HTTPPostEE) PrepareMap(mp *utils.CGREvent) (any, error) { urlVals := url.Values{} diff --git a/ees/httppost_test.go b/ees/httppost_test.go index c4b69cb2c..3953f2692 100644 --- a/ees/httppost_test.go +++ b/ees/httppost_test.go @@ -33,13 +33,7 @@ import ( ) func TestHttpPostGetMetrics(t *testing.T) { - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) httpPost := &HTTPPostEE{ dc: dc, } diff --git a/ees/kafka.go b/ees/kafka.go index 199bfe589..1e9cdd514 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -30,7 +30,7 @@ import ( ) // NewKafkaEE creates a kafka poster -func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaEE, error) { +func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*KafkaEE, error) { pstr := &KafkaEE{ cfg: cfg, dc: dc, @@ -99,7 +99,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaE type KafkaEE struct { writer *kafka.Writer cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq bytePreparing } @@ -130,4 +130,4 @@ func (k *KafkaEE) Close() error { return k.writer.Close() } -func (k *KafkaEE) GetMetrics() *utils.SafeMapStorage { return k.dc } +func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.dc } diff --git a/ees/kafka_test.go b/ees/kafka_test.go index 6b8a2f022..7607d4fb3 100644 --- a/ees/kafka_test.go +++ b/ees/kafka_test.go @@ -29,7 +29,7 @@ func TestKafkaEEConnect(t *testing.T) { kafkaEE := &KafkaEE{ writer: nil, cfg: &config.EventExporterCfg{}, - dc: &utils.SafeMapStorage{}, + dc: &utils.ExporterMetrics{}, reqs: &concReq{}, } err := kafkaEE.Connect() @@ -50,7 +50,7 @@ func TestKafkaEE_Cfg(t *testing.T) { } func TestKafkaEEGetMetrics(t *testing.T) { - safeMapStorage := &utils.SafeMapStorage{} + safeMapStorage := &utils.ExporterMetrics{} kafkaEE := &KafkaEE{ dc: safeMapStorage, } diff --git a/ees/log.go b/ees/log.go index bfb08b400..6f87d1397 100644 --- a/ees/log.go +++ b/ees/log.go @@ -26,7 +26,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewLogEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *LogEE { +func NewLogEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *LogEE { return &LogEE{ cfg: cfg, dc: dc, @@ -36,7 +36,7 @@ func NewLogEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *LogEE { // LogEE implements EventExporter interface for .csv files type LogEE struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics } func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg } @@ -47,8 +47,8 @@ func (vEe *LogEE) ExportEvent(mp any, _ string) error { utils.EEs, vEe.Cfg().ID, utils.ToJSON(mp))) return nil } -func (vEe *LogEE) Close() error { return nil } -func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *LogEE) Close() error { return nil } +func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc } func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (any, error) { return mp.Event, nil } diff --git a/ees/log_test.go b/ees/log_test.go index ddb672037..fb4f1f7c4 100644 --- a/ees/log_test.go +++ b/ees/log_test.go @@ -74,7 +74,7 @@ func TestLogEE_Close(t *testing.T) { } func TestLogEE_GetMetrics(t *testing.T) { - mockMetrics := &utils.SafeMapStorage{} + mockMetrics := &utils.ExporterMetrics{} vEe := &LogEE{ dc: mockMetrics, @@ -104,7 +104,7 @@ func TestLogEE_PrepareMap(t *testing.T) { func TestNewLogEE(t *testing.T) { cfg := &config.EventExporterCfg{} - dc := &utils.SafeMapStorage{} + dc := &utils.ExporterMetrics{} logEE := NewLogEE(cfg, dc) diff --git a/ees/nats.go b/ees/nats.go index a7f3dc0db..f7f21a0dc 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -34,7 +34,7 @@ import ( ) // NewNatsEE creates a kafka poster -func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.SafeMapStorage) (natsPstr *NatsEE, err error) { +func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.ExporterMetrics) (natsPstr *NatsEE, err error) { natsPstr = &NatsEE{ cfg: cfg, dc: dc, @@ -55,7 +55,7 @@ type NatsEE struct { posterJS jetstream.JetStream cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect writer bytePreparing @@ -140,7 +140,7 @@ func (pstr *NatsEE) Close() error { return err } -func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration) ([]nats.Option, error) { natsOpts := make([]nats.Option, 0, 7) diff --git a/ees/nats_test.go b/ees/nats_test.go index 2d8cfa1ab..cbf36e4be 100644 --- a/ees/nats_test.go +++ b/ees/nats_test.go @@ -43,10 +43,7 @@ func TestNewNatsEE(t *testing.T) { } nodeID := "node_id1" connTimeout := 2 * time.Second - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) exp := new(NatsEE) exp.cfg = cfg @@ -90,10 +87,7 @@ func TestParseOpt(t *testing.T) { opts := &config.EventExporterOpts{} nodeID := "node_id1" connTimeout := 2 * time.Second - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc) if err != nil { t.Error(err) @@ -127,10 +121,7 @@ func TestParseOptJetStream(t *testing.T) { } nodeID := "node_id1" connTimeout := 2 * time.Second - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc) if err != nil { t.Error(err) @@ -167,10 +158,7 @@ func TestParseOptSubject(t *testing.T) { }} nodeID := "node_id1" connTimeout := 2 * time.Second - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc) if err != nil { t.Error(err) @@ -252,7 +240,7 @@ func TestNatsEECfg(t *testing.T) { } func TestNatsEEGetMetrics(t *testing.T) { - expectedMetrics := &utils.SafeMapStorage{} + expectedMetrics := &utils.ExporterMetrics{} pstr := &NatsEE{ dc: expectedMetrics, } diff --git a/ees/rpc.go b/ees/rpc.go index 6aace36a3..1b0356de6 100644 --- a/ees/rpc.go +++ b/ees/rpc.go @@ -29,7 +29,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage, +func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics, connMgr *engine.ConnManager) (e *RPCee, err error) { e = &RPCee{ cfg: cfg, @@ -42,7 +42,7 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage, type RPCee struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics connMgr *engine.ConnManager //opts @@ -81,7 +81,7 @@ func (e *RPCee) Close() (err error) { return } -func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) { +func (e *RPCee) GetMetrics() (mp *utils.ExporterMetrics) { return e.dc } diff --git a/ees/rpc_test.go b/ees/rpc_test.go index f067617a0..2963484b7 100644 --- a/ees/rpc_test.go +++ b/ees/rpc_test.go @@ -31,10 +31,7 @@ import ( func TestNewRpcEE(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector)) rcv, err := NewRpcEE(eeSCfg, dc, connMgr) @@ -104,10 +101,7 @@ func TestRPCCfg(t *testing.T) { func TestRPCConnect(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector)) rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) if err != nil { @@ -120,10 +114,7 @@ func TestRPCConnect(t *testing.T) { func TestRPCClose(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector)) rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) if err != nil { @@ -139,7 +130,7 @@ func TestRPCClose(t *testing.T) { func TestRPCGetMetrics(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc := &utils.SafeMapStorage{ + dc := &utils.ExporterMetrics{ MapStorage: utils.MapStorage{ "time": "now", "just_a_field": "just_a_value", @@ -158,10 +149,7 @@ func TestRPCGetMetrics(t *testing.T) { func TestRPCPrepareMap(t *testing.T) { eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault) - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector)) rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr) if err != nil { diff --git a/ees/s3.go b/ees/s3.go index 640279881..c879beaba 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -32,7 +32,7 @@ import ( ) // NewS3EE creates a s3 poster -func NewS3EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *S3EE { +func NewS3EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *S3EE { pstr := &S3EE{ cfg: cfg, dc: dc, @@ -54,7 +54,7 @@ type S3EE struct { up *s3manager.Uploader cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -137,4 +137,4 @@ func (pstr *S3EE) ExportEvent(message any, key string) (err error) { func (pstr *S3EE) Close() (_ error) { return } -func (pstr *S3EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc } diff --git a/ees/s3_test.go b/ees/s3_test.go index 6d330392c..a7bc90131 100644 --- a/ees/s3_test.go +++ b/ees/s3_test.go @@ -26,7 +26,7 @@ import ( ) func TestS3GetMetrics(t *testing.T) { - safeMapStorage := &utils.SafeMapStorage{} + safeMapStorage := &utils.ExporterMetrics{} pstr := &S3EE{ dc: safeMapStorage, } diff --git a/ees/sql.go b/ees/sql.go index 4957927c3..c1db3dec9 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -35,7 +35,7 @@ import ( ) func NewSQLEe(cfg *config.EventExporterCfg, - dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) { + dc *utils.ExporterMetrics) (sqlEe *SQLEe, err error) { sqlEe = &SQLEe{ cfg: cfg, dc: dc, @@ -48,7 +48,7 @@ func NewSQLEe(cfg *config.EventExporterCfg, // SQLEe implements EventExporter interface for SQL type SQLEe struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics db *gorm.DB sqldb *sql.DB reqs *concReq @@ -156,7 +156,7 @@ func (sqlEe *SQLEe) Close() (err error) { return } -func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc } +func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.dc } // Create the sqlPosterRequest used to instert the map into the table func (sqlEe *SQLEe) PrepareMap(cgrEv *utils.CGREvent) (any, error) { diff --git a/ees/sql_test.go b/ees/sql_test.go index 9cf378d42..9c10b24e6 100644 --- a/ees/sql_test.go +++ b/ees/sql_test.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -32,10 +33,7 @@ import ( ) func TestSqlGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) sqlEe := &SQLEe{ dc: dc, } diff --git a/ees/sqs.go b/ees/sqs.go index 2c751bf94..8382fc037 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -31,7 +31,7 @@ import ( ) // NewSQSee creates a poster for sqs -func NewSQSee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *SQSee { +func NewSQSee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *SQSee { pstr := &SQSee{ cfg: cfg, dc: dc, @@ -53,7 +53,7 @@ type SQSee struct { svc *sqs.SQS cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics reqs *concReq sync.RWMutex // protect connection bytePreparing @@ -147,4 +147,4 @@ func (pstr *SQSee) ExportEvent(message any, _ string) (err error) { func (pstr *SQSee) Close() (_ error) { return } -func (pstr *SQSee) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.dc } diff --git a/ees/sqs_test.go b/ees/sqs_test.go index 4e5d5dbfe..53cab1d17 100644 --- a/ees/sqs_test.go +++ b/ees/sqs_test.go @@ -26,7 +26,7 @@ import ( ) func TestGetMetricsT(t *testing.T) { - metrics := &utils.SafeMapStorage{} + metrics := &utils.ExporterMetrics{} pstr := &SQSee{dc: metrics} result := pstr.GetMetrics() if result != metrics { diff --git a/ees/virtualee.go b/ees/virtualee.go index 36424c354..d3d9af13d 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -26,7 +26,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *VirtualEE { +func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *VirtualEE { return &VirtualEE{ cfg: cfg, dc: dc, @@ -36,7 +36,7 @@ func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *Virtu // VirtualEE implements EventExporter interface for .csv files type VirtualEE struct { cfg *config.EventExporterCfg - dc *utils.SafeMapStorage + dc *utils.ExporterMetrics } func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } @@ -49,8 +49,8 @@ func (vEe *VirtualEE) ExportEvent(payload any, _ string) error { return nil } -func (vEe *VirtualEE) Close() error { return nil } -func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *VirtualEE) Close() error { return nil } +func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc } func (vEe *VirtualEE) PrepareMap(cgrEv *utils.CGREvent) (any, error) { return cgrEv.Event, nil diff --git a/ees/virtualee_test.go b/ees/virtualee_test.go index ef3a6a683..d17804227 100644 --- a/ees/virtualee_test.go +++ b/ees/virtualee_test.go @@ -21,6 +21,7 @@ package ees import ( "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -28,10 +29,7 @@ import ( ) func TestVirtualEeGetMetrics(t *testing.T) { - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } + dc := utils.NewExporterMetrics("", time.Local) vEe := &VirtualEE{ dc: dc, } diff --git a/utils/exportermetrics.go b/utils/exportermetrics.go new file mode 100644 index 000000000..ade0fc90b --- /dev/null +++ b/utils/exportermetrics.go @@ -0,0 +1,151 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package utils + +import ( + "sync" + "time" + + "github.com/cgrates/cron" +) + +// ExporterMetrics stores export statistics with thread-safe access and +// cron-scheduled resets. +type ExporterMetrics struct { + mu sync.RWMutex + MapStorage MapStorage + cron *cron.Cron + loc *time.Location +} + +// NewExporterMetrics creates metrics with optional automatic reset. +// schedule is a cron expression for reset timing (empty to disable). +func NewExporterMetrics(schedule string, loc *time.Location) *ExporterMetrics { + m := &ExporterMetrics{ + loc: loc, + } + m.Reset() // init MapStorage with default values + + if schedule != "" { + m.cron = cron.New() + m.cron.AddFunc(schedule, func() { + m.Reset() + }) + m.cron.Start() + } + return m +} + +// Reset immediately clears all metrics and resets them to initial values. +func (m *ExporterMetrics) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + m.MapStorage = MapStorage{ + NumberOfEvents: int64(0), + PositiveExports: StringSet{}, + NegativeExports: StringSet{}, + TimeNow: time.Now().In(m.loc), + } +} + +// StopCron stops the automatic reset schedule if one is active. +func (m *ExporterMetrics) StopCron() { + if m.cron == nil { + return + } + m.cron.Stop() + // ctx := m.cron.Stop() + // <-ctx.Done() // wait for any running jobs to complete +} + +// String returns the map as json string. +func (m *ExporterMetrics) String() string { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.String() +} + +// FieldAsInterface returns the value from the path. +func (m *ExporterMetrics) FieldAsInterface(fldPath []string) (val any, err error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.FieldAsInterface(fldPath) +} + +// FieldAsString returns the value from path as string. +func (m *ExporterMetrics) FieldAsString(fldPath []string) (str string, err error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.FieldAsString(fldPath) +} + +// Set sets the value at the given path. +func (m *ExporterMetrics) Set(fldPath []string, val any) (err error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.MapStorage.Set(fldPath, val) +} + +// GetKeys returns all the keys from map. +func (m *ExporterMetrics) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.GetKeys(nested, nestedLimit, prefix) +} + +// Remove removes the item at path +func (m *ExporterMetrics) Remove(fldPath []string) (err error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.MapStorage.Remove(fldPath) +} + +func (m *ExporterMetrics) ClonedMapStorage() (msClone MapStorage) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.MapStorage.Clone() +} + +// IncrementEvents increases the event counter (NumberOfEvents) by 1. +func (m *ExporterMetrics) IncrementEvents() { + m.mu.Lock() + defer m.mu.Unlock() + count, _ := m.MapStorage[NumberOfEvents].(int64) + m.MapStorage[NumberOfEvents] = count + 1 +} + +// Lock locks the ExporterMetrics mutex. +func (m *ExporterMetrics) Lock() { + m.mu.Lock() +} + +// Unlock unlocks the ExporterMetrics mutex. +func (m *ExporterMetrics) Unlock() { + m.mu.Unlock() +} + +// RLock locks the ExporterMetrics mutex for reading. +func (m *ExporterMetrics) RLock() { + m.mu.RLock() +} + +// RUnlock unlocks the read lock on the ExporterMetrics mutex. +func (m *ExporterMetrics) RUnlock() { + m.mu.RUnlock() +} diff --git a/utils/safemapstorage_test.go b/utils/exportermetrics_test.go similarity index 73% rename from utils/safemapstorage_test.go rename to utils/exportermetrics_test.go index 4aeb7e9b8..be38ac052 100644 --- a/utils/safemapstorage_test.go +++ b/utils/exportermetrics_test.go @@ -23,8 +23,8 @@ import ( "testing" ) -func TestSafeMapStorageString(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsString(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, @@ -35,8 +35,8 @@ func TestSafeMapStorageString(t *testing.T) { } } -func TestSafeMapStorageFieldAsInterface(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsFieldAsInterface(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, @@ -51,8 +51,8 @@ func TestSafeMapStorageFieldAsInterface(t *testing.T) { } } -func TestSafeMapStorageFieldAsString(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsFieldAsString(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, @@ -67,14 +67,14 @@ func TestSafeMapStorageFieldAsString(t *testing.T) { } } -func TestSafeMapStorageSet(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsSet(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, } - expected := &SafeMapStorage{ + expected := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, "field2": 3, @@ -88,8 +88,8 @@ func TestSafeMapStorageSet(t *testing.T) { } } -func TestSafeMapStorageGetKeys(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsGetKeys(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, "field2": 3, @@ -104,15 +104,15 @@ func TestSafeMapStorageGetKeys(t *testing.T) { } } -func TestSafeMapStorageRemove(t *testing.T) { - ms := &SafeMapStorage{ +func TestExporterMetricsRemove(t *testing.T) { + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, "field2": 3, }, } - expected := &SafeMapStorage{ + expected := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, }, @@ -125,29 +125,9 @@ func TestSafeMapStorageRemove(t *testing.T) { } } -func TestSafeMapStorageClone(t *testing.T) { - ms := &SafeMapStorage{ - MapStorage: MapStorage{ - "field1": 2, - "field2": 3, - }, - } +func TestExporterMetricsClonedMapStorage(t *testing.T) { - expected := &SafeMapStorage{ - MapStorage: MapStorage{ - "field1": 2, - "field2": 3, - }, - } - - if reply := ms.Clone(); !reflect.DeepEqual(reply, expected) { - t.Errorf("Expected %v \n but received \n %v", expected, reply) - } -} - -func TestSafeMapClonedMapStorage(t *testing.T) { - - ms := &SafeMapStorage{ + ms := &ExporterMetrics{ MapStorage: MapStorage{ "field1": 2, "field2": 3, diff --git a/utils/safemapstorage.go b/utils/safemapstorage.go deleted file mode 100644 index 9d9425d5e..000000000 --- a/utils/safemapstorage.go +++ /dev/null @@ -1,83 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package utils - -import ( - "sync" -) - -// SafeMapStorage is a Mapstorage with mutex -type SafeMapStorage struct { - MapStorage - sync.RWMutex -} - -// String returns the map as json string -func (ms *SafeMapStorage) String() string { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.String() -} - -// FieldAsInterface returns the value from the path -func (ms *SafeMapStorage) FieldAsInterface(fldPath []string) (val any, err error) { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.FieldAsInterface(fldPath) -} - -// FieldAsString returns the value from path as string -func (ms *SafeMapStorage) FieldAsString(fldPath []string) (str string, err error) { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.FieldAsString(fldPath) -} - -// Set sets the value at the given path -func (ms *SafeMapStorage) Set(fldPath []string, val any) (err error) { - ms.Lock() - defer ms.Unlock() - return ms.MapStorage.Set(fldPath, val) -} - -// GetKeys returns all the keys from map -func (ms *SafeMapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.GetKeys(nested, nestedLimit, prefix) -} - -// Remove removes the item at path -func (ms *SafeMapStorage) Remove(fldPath []string) (err error) { - ms.Lock() - defer ms.Unlock() - return ms.MapStorage.Remove(fldPath) -} - -func (ms *SafeMapStorage) Clone() (msClone *SafeMapStorage) { - ms.RLock() - defer ms.RUnlock() - return &SafeMapStorage{MapStorage: ms.MapStorage.Clone()} -} - -func (ms *SafeMapStorage) ClonedMapStorage() (msClone MapStorage) { - ms.RLock() - defer ms.RUnlock() - return ms.MapStorage.Clone() -}