mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add cron-based/manual reset to ExporterMetrics
This commit is contained in:
committed by
Dan Christian Bogos
parent
934815d9ee
commit
1c00a3aad1
@@ -30,7 +30,7 @@ import (
|
||||
|
||||
// NewAMQPee creates a new amqp poster
|
||||
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
|
||||
func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPee {
|
||||
func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPee {
|
||||
amqp := &AMQPee{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -50,7 +50,7 @@ type AMQPee struct {
|
||||
postChan *amqp.Channel
|
||||
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
reqs *concReq
|
||||
sync.RWMutex // protect connection
|
||||
bytePreparing
|
||||
@@ -177,4 +177,4 @@ func (pstr *AMQPee) Close() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *AMQPee) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
func (pstr *AMQPee) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
func TestAmqpGetMetrics(t *testing.T) {
|
||||
expectedMetrics := &utils.SafeMapStorage{}
|
||||
expectedMetrics := &utils.ExporterMetrics{}
|
||||
pstr := &AMQPee{
|
||||
dc: expectedMetrics,
|
||||
}
|
||||
@@ -48,7 +48,7 @@ func TestCfg(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAmqpToGetMetrics(t *testing.T) {
|
||||
expectedMetrics := &utils.SafeMapStorage{}
|
||||
expectedMetrics := &utils.ExporterMetrics{}
|
||||
amqp := &AMQPv1EE{
|
||||
dc: expectedMetrics,
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ import (
|
||||
)
|
||||
|
||||
// NewAMQPv1EE creates a poster for amqpv1
|
||||
func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1EE {
|
||||
func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *AMQPv1EE {
|
||||
pstr := &AMQPv1EE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -56,7 +56,7 @@ type AMQPv1EE struct {
|
||||
session *amqpv1.Session
|
||||
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
reqs *concReq
|
||||
sync.RWMutex // protect connection
|
||||
bytePreparing
|
||||
@@ -121,4 +121,4 @@ func (pstr *AMQPv1EE) Close() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *AMQPv1EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
func (pstr *AMQPv1EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
|
||||
|
||||
36
ees/ee.go
36
ees/ee.go
@@ -30,11 +30,11 @@ import (
|
||||
)
|
||||
|
||||
type EventExporter interface {
|
||||
Cfg() *config.EventExporterCfg // return the config
|
||||
Connect() error // called before exporting an event to make sure it is connected
|
||||
ExportEvent(any, string) error // called on each event to be exported
|
||||
Close() error // called when the exporter needs to terminate
|
||||
GetMetrics() *utils.SafeMapStorage // called to get metrics
|
||||
Cfg() *config.EventExporterCfg // return the config
|
||||
Connect() error // called before exporting an event to make sure it is connected
|
||||
ExportEvent(any, string) error // called on each event to be exported
|
||||
Close() error // called when the exporter needs to terminate
|
||||
GetMetrics() *utils.ExporterMetrics // called to get metrics
|
||||
PrepareMap(*utils.CGREvent) (any, error)
|
||||
PrepareOrderMap(*utils.OrderedNavigableMap) (any, error)
|
||||
}
|
||||
@@ -42,12 +42,13 @@ type EventExporter interface {
|
||||
// NewEventExporter produces exporters
|
||||
func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
connMngr *engine.ConnManager) (ee EventExporter, err error) {
|
||||
var dc *utils.SafeMapStorage
|
||||
if dc, err = newEEMetrics(utils.FirstNonEmpty(
|
||||
cfg.Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone)); err != nil {
|
||||
return
|
||||
timezone := utils.FirstNonEmpty(cfg.Timezone, cgrCfg.GeneralCfg().DefaultTimezone)
|
||||
loc, err := time.LoadLocation(timezone)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dc := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc)
|
||||
|
||||
switch cfg.Type {
|
||||
case utils.MetaFileCSV:
|
||||
return NewFileCSVee(cfg, cgrCfg, filterS, dc)
|
||||
@@ -123,20 +124,7 @@ func composeHeaderTrailer(prfx string, fields []*config.FCTemplate, dc utils.Dat
|
||||
return
|
||||
}
|
||||
|
||||
func newEEMetrics(location string) (*utils.SafeMapStorage, error) {
|
||||
loc, err := time.LoadLocation(location)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &utils.SafeMapStorage{MapStorage: utils.MapStorage{
|
||||
utils.NumberOfEvents: int64(0),
|
||||
utils.PositiveExports: utils.StringSet{},
|
||||
utils.NegativeExports: utils.StringSet{},
|
||||
utils.TimeNow: time.Now().In(loc),
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, hasError bool, timezone string) {
|
||||
func updateEEMetrics(dc *utils.ExporterMetrics, cgrID string, ev engine.MapEvent, hasError bool, timezone string) {
|
||||
dc.Lock()
|
||||
defer dc.Unlock()
|
||||
if hasError {
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -39,13 +40,7 @@ func TestNewEventExporter(t *testing.T) {
|
||||
if strings.Contains(errExpect, err.Error()) {
|
||||
t.Errorf("Expected %+v but got %+v", errExpect, err)
|
||||
}
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
|
||||
if strings.Contains(errExpect, err.Error()) {
|
||||
t.Errorf("Expected %+v but got %+v", errExpect, err)
|
||||
@@ -73,10 +68,7 @@ func TestNewEventExporterCase2(t *testing.T) {
|
||||
t.Errorf("Expected %+v but got %+v", errExpect, err)
|
||||
}
|
||||
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
|
||||
if strings.Contains(errExpect, err.Error()) {
|
||||
t.Errorf("Expected %+v but got %+v", errExpect, err)
|
||||
@@ -101,10 +93,7 @@ func TestNewEventExporterCase3(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@@ -126,10 +115,7 @@ func TestNewEventExporterCase4(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@@ -151,13 +137,7 @@ func TestNewEventExporterCase6(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], dc)
|
||||
newEE := ee.(*VirtualEE)
|
||||
newEE.dc.MapStorage[utils.TimeNow] = nil
|
||||
@@ -190,10 +170,7 @@ func TestNewEventExporterCase7(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ import (
|
||||
// onCacheEvicted is called by ltcache when evicting an item
|
||||
func onCacheEvicted(_ string, value any) {
|
||||
ee := value.(EventExporter)
|
||||
ee.GetMetrics().StopCron()
|
||||
ee.Close()
|
||||
}
|
||||
|
||||
@@ -306,9 +307,7 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool
|
||||
}()
|
||||
var eEv any
|
||||
|
||||
exp.GetMetrics().Lock()
|
||||
exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1
|
||||
exp.GetMetrics().Unlock()
|
||||
exp.GetMetrics().IncrementEvents()
|
||||
if len(exp.Cfg().ContentFields()) == 0 {
|
||||
if eEv, err = exp.PrepareMap(ev); err != nil {
|
||||
return
|
||||
|
||||
@@ -292,7 +292,7 @@ func TestV1ProcessEvent4(t *testing.T) {
|
||||
}
|
||||
|
||||
func newMockEventExporter() *mockEventExporter {
|
||||
return &mockEventExporter{dc: &utils.SafeMapStorage{
|
||||
return &mockEventExporter{dc: &utils.ExporterMetrics{
|
||||
MapStorage: utils.MapStorage{
|
||||
utils.NumberOfEvents: int64(0),
|
||||
utils.PositiveExports: utils.StringSet{},
|
||||
@@ -301,11 +301,11 @@ func newMockEventExporter() *mockEventExporter {
|
||||
}
|
||||
|
||||
type mockEventExporter struct {
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
bytePreparing
|
||||
}
|
||||
|
||||
func (m mockEventExporter) GetMetrics() *utils.SafeMapStorage {
|
||||
func (m mockEventExporter) GetMetrics() *utils.ExporterMetrics {
|
||||
return m.dc
|
||||
}
|
||||
|
||||
@@ -438,7 +438,7 @@ func TestOnCacheEvicted(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdateEEMetrics(t *testing.T) {
|
||||
dc, _ := newEEMetrics(utils.EmptyString)
|
||||
dc := utils.NewExporterMetrics("", time.UTC)
|
||||
tnow := time.Now()
|
||||
ev := engine.MapEvent{
|
||||
utils.AnswerTime: tnow,
|
||||
@@ -447,7 +447,7 @@ func TestUpdateEEMetrics(t *testing.T) {
|
||||
utils.ToR: utils.MetaVoice,
|
||||
utils.Usage: time.Second,
|
||||
}
|
||||
exp, _ := newEEMetrics(utils.EmptyString)
|
||||
exp := utils.NewExporterMetrics("", time.UTC)
|
||||
exp.MapStorage[utils.FirstEventATime] = tnow
|
||||
exp.MapStorage[utils.LastEventATime] = tnow
|
||||
exp.MapStorage[utils.FirstExpOrderID] = int64(1)
|
||||
|
||||
@@ -38,14 +38,14 @@ import (
|
||||
type ElasticEE struct {
|
||||
mu sync.RWMutex
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
reqs *concReq
|
||||
|
||||
client *elasticsearch.TypedClient
|
||||
clientCfg elasticsearch.Config
|
||||
}
|
||||
|
||||
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
|
||||
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*ElasticEE, error) {
|
||||
el := &ElasticEE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -222,4 +222,4 @@ func (e *ElasticEE) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *ElasticEE) GetMetrics() *utils.SafeMapStorage { return e.dc }
|
||||
func (e *ElasticEE) GetMetrics() *utils.ExporterMetrics { return e.dc }
|
||||
|
||||
@@ -20,6 +20,7 @@ package ees
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -27,10 +28,7 @@ import (
|
||||
)
|
||||
|
||||
func TestGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
ee := &ElasticEE{
|
||||
dc: dc,
|
||||
}
|
||||
@@ -61,10 +59,7 @@ func TestInitClient(t *testing.T) {
|
||||
|
||||
func TestElasticExportEventErr(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -34,7 +34,7 @@ import (
|
||||
|
||||
func NewFileCSVee(cfg *config.EventExporterCfg,
|
||||
cgrCfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) {
|
||||
dc *utils.ExporterMetrics) (fCsv *FileCSVee, err error) {
|
||||
fCsv = &FileCSVee{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -49,7 +49,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg,
|
||||
// FileCSVee implements EventExporter interface for .csv files
|
||||
type FileCSVee struct {
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
file io.WriteCloser
|
||||
csvWriter *csv.Writer
|
||||
sync.Mutex
|
||||
@@ -129,4 +129,4 @@ func (fCsv *FileCSVee) Close() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc }
|
||||
func (fCsv *FileCSVee) GetMetrics() *utils.ExporterMetrics { return fCsv.dc }
|
||||
|
||||
@@ -677,13 +677,7 @@ func TestCsvInitFileCSV(t *testing.T) {
|
||||
if err := os.MkdirAll("/tmp/TestInitFileCSV", 0666); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
fCsv := &FileCSVee{
|
||||
cgrCfg: cgrCfg,
|
||||
cfg: cgrCfg.EEsCfg().Exporters[0],
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -31,13 +32,7 @@ import (
|
||||
)
|
||||
|
||||
func TestFileCsvGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
fCsv := &FileCSVee{dc: dc}
|
||||
|
||||
if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.dc) {
|
||||
@@ -64,7 +59,7 @@ func TestFileCsvComposeHeader(t *testing.T) {
|
||||
filterS: filterS,
|
||||
file: nopCloser{byteBuff},
|
||||
csvWriter: csvNW,
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fCsv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -127,7 +122,7 @@ func TestFileCsvComposeTrailer(t *testing.T) {
|
||||
filterS: filterS,
|
||||
file: nopCloser{byteBuff},
|
||||
csvWriter: csvNW,
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fCsv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -184,13 +179,7 @@ func TestFileCsvExportEvent(t *testing.T) {
|
||||
filterS := engine.NewFilterS(cfg, nil, newDM)
|
||||
byteBuff := new(bytes.Buffer)
|
||||
csvNW := csv.NewWriter(byteBuff)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
fCsv := &FileCSVee{
|
||||
cfg: cfg.EEsCfg().Exporters[0],
|
||||
cgrCfg: cfg,
|
||||
@@ -223,7 +212,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) {
|
||||
filterS: filterS,
|
||||
file: nopCloserWrite{byteBuff},
|
||||
csvWriter: csvNW,
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fCsv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -257,7 +246,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) {
|
||||
filterS: filterS,
|
||||
file: nopCloserError{byteBuff},
|
||||
csvWriter: csvNW,
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fCsv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) {
|
||||
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.ExporterMetrics) (fFwv *FileFWVee, err error) {
|
||||
fFwv = &FileFWVee{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -45,7 +45,7 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter
|
||||
// FileFWVee implements EventExporter interface for .fwv files
|
||||
type FileFWVee struct {
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
file io.WriteCloser
|
||||
sync.Mutex
|
||||
slicePreparing
|
||||
@@ -136,4 +136,4 @@ func (fFwv *FileFWVee) Close() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { return fFwv.dc }
|
||||
func (fFwv *FileFWVee) GetMetrics() *utils.ExporterMetrics { return fFwv.dc }
|
||||
|
||||
@@ -164,13 +164,7 @@ func TestFileFwvInit(t *testing.T) {
|
||||
if err := os.MkdirAll("/tmp/TestInitFileFWV", 0666); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
fFwv := &FileFWVee{
|
||||
cgrCfg: cgrCfg,
|
||||
cfg: cgrCfg.EEsCfg().Exporters[0],
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -31,10 +32,7 @@ import (
|
||||
)
|
||||
|
||||
func TestFileFwvGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
fFwv := &FileFWVee{dc: dc}
|
||||
|
||||
if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.dc) {
|
||||
@@ -54,7 +52,7 @@ func TestFileFwvComposeHeader(t *testing.T) {
|
||||
cgrCfg: cfg,
|
||||
filterS: filterS,
|
||||
file: nopCloser{byteBuff},
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fFwv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -116,7 +114,7 @@ func TestFileFwvComposeTrailer(t *testing.T) {
|
||||
cgrCfg: cfg,
|
||||
filterS: filterS,
|
||||
file: nopCloser{byteBuff},
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fFwv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -173,10 +171,7 @@ func TestFileFwvExportEvent(t *testing.T) {
|
||||
filterS := engine.NewFilterS(cfg, nil, newDM)
|
||||
byteBuff := new(bytes.Buffer)
|
||||
csvNW := csv.NewWriter(byteBuff)
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
fFwv := &FileFWVee{
|
||||
cfg: cfg.EEsCfg().Exporters[0],
|
||||
cgrCfg: cfg,
|
||||
@@ -209,10 +204,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
|
||||
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cfg, nil, newDM)
|
||||
byteBuff := new(bytes.Buffer)
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
fFwv := &FileFWVee{
|
||||
cfg: cfg.EEsCfg().Exporters[0],
|
||||
cgrCfg: cfg,
|
||||
@@ -236,7 +228,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
|
||||
cgrCfg: cfg,
|
||||
filterS: filterS,
|
||||
file: nopCloserWrite{byteBuff},
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fFwv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -268,7 +260,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
|
||||
cgrCfg: cfg,
|
||||
filterS: filterS,
|
||||
file: nopCloserWrite{byteBuff},
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fFwv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -299,7 +291,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
|
||||
cgrCfg: cfg,
|
||||
filterS: filterS,
|
||||
file: nopCloserWrite{byteBuff},
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fFwv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
@@ -337,7 +329,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
|
||||
cgrCfg: cfg,
|
||||
filterS: filterS,
|
||||
file: nopCloserError{byteBuff},
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
}
|
||||
fFwv.Cfg().Fields = []*config.FCTemplate{
|
||||
{
|
||||
|
||||
@@ -33,7 +33,7 @@ import (
|
||||
)
|
||||
|
||||
func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
dc *utils.SafeMapStorage) (pstrJSON *HTTPjsonMapEE, err error) {
|
||||
dc *utils.ExporterMetrics) (pstrJSON *HTTPjsonMapEE, err error) {
|
||||
pstrJSON = &HTTPjsonMapEE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -47,7 +47,7 @@ func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi
|
||||
// HTTPjsonMapEE implements EventExporter interface for .csv files
|
||||
type HTTPjsonMapEE struct {
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
client *http.Client
|
||||
reqs *concReq
|
||||
|
||||
@@ -91,7 +91,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(content any, _ string) (err error) {
|
||||
|
||||
func (httpEE *HTTPjsonMapEE) Close() (_ error) { return }
|
||||
|
||||
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc }
|
||||
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.ExporterMetrics { return httpEE.dc }
|
||||
|
||||
func (httpEE *HTTPjsonMapEE) PrepareMap(mp *utils.CGREvent) (any, error) {
|
||||
body, err := json.Marshal(mp.Event)
|
||||
|
||||
@@ -32,10 +32,7 @@ import (
|
||||
)
|
||||
|
||||
func TestHttpJsonMapGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
httpEE := &HTTPjsonMapEE{
|
||||
dc: dc,
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ import (
|
||||
)
|
||||
|
||||
func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
dc *utils.SafeMapStorage) (httpPost *HTTPPostEE, err error) {
|
||||
dc *utils.ExporterMetrics) (httpPost *HTTPPostEE, err error) {
|
||||
httpPost = &HTTPPostEE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -43,7 +43,7 @@ func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filte
|
||||
// FileCSVee implements EventExporter interface for .csv files
|
||||
type HTTPPostEE struct {
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
client *http.Client
|
||||
reqs *concReq
|
||||
|
||||
@@ -92,7 +92,7 @@ func (httpPost *HTTPPostEE) ExportEvent(content any, _ string) (err error) {
|
||||
|
||||
func (httpPost *HTTPPostEE) Close() (_ error) { return }
|
||||
|
||||
func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc }
|
||||
func (httpPost *HTTPPostEE) GetMetrics() *utils.ExporterMetrics { return httpPost.dc }
|
||||
|
||||
func (httpPost *HTTPPostEE) PrepareMap(mp *utils.CGREvent) (any, error) {
|
||||
urlVals := url.Values{}
|
||||
|
||||
@@ -33,13 +33,7 @@ import (
|
||||
)
|
||||
|
||||
func TestHttpPostGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
httpPost := &HTTPPostEE{
|
||||
dc: dc,
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
)
|
||||
|
||||
// NewKafkaEE creates a kafka poster
|
||||
func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaEE, error) {
|
||||
func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) (*KafkaEE, error) {
|
||||
pstr := &KafkaEE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -99,7 +99,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*KafkaE
|
||||
type KafkaEE struct {
|
||||
writer *kafka.Writer
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
reqs *concReq
|
||||
bytePreparing
|
||||
}
|
||||
@@ -130,4 +130,4 @@ func (k *KafkaEE) Close() error {
|
||||
return k.writer.Close()
|
||||
}
|
||||
|
||||
func (k *KafkaEE) GetMetrics() *utils.SafeMapStorage { return k.dc }
|
||||
func (k *KafkaEE) GetMetrics() *utils.ExporterMetrics { return k.dc }
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestKafkaEEConnect(t *testing.T) {
|
||||
kafkaEE := &KafkaEE{
|
||||
writer: nil,
|
||||
cfg: &config.EventExporterCfg{},
|
||||
dc: &utils.SafeMapStorage{},
|
||||
dc: &utils.ExporterMetrics{},
|
||||
reqs: &concReq{},
|
||||
}
|
||||
err := kafkaEE.Connect()
|
||||
@@ -50,7 +50,7 @@ func TestKafkaEE_Cfg(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestKafkaEEGetMetrics(t *testing.T) {
|
||||
safeMapStorage := &utils.SafeMapStorage{}
|
||||
safeMapStorage := &utils.ExporterMetrics{}
|
||||
kafkaEE := &KafkaEE{
|
||||
dc: safeMapStorage,
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewLogEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *LogEE {
|
||||
func NewLogEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *LogEE {
|
||||
return &LogEE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -36,7 +36,7 @@ func NewLogEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *LogEE {
|
||||
// LogEE implements EventExporter interface for .csv files
|
||||
type LogEE struct {
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
}
|
||||
|
||||
func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
|
||||
@@ -47,8 +47,8 @@ func (vEe *LogEE) ExportEvent(mp any, _ string) error {
|
||||
utils.EEs, vEe.Cfg().ID, utils.ToJSON(mp)))
|
||||
return nil
|
||||
}
|
||||
func (vEe *LogEE) Close() error { return nil }
|
||||
func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
|
||||
func (vEe *LogEE) Close() error { return nil }
|
||||
func (vEe *LogEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc }
|
||||
func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (any, error) {
|
||||
return mp.Event, nil
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ func TestLogEE_Close(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLogEE_GetMetrics(t *testing.T) {
|
||||
mockMetrics := &utils.SafeMapStorage{}
|
||||
mockMetrics := &utils.ExporterMetrics{}
|
||||
|
||||
vEe := &LogEE{
|
||||
dc: mockMetrics,
|
||||
@@ -104,7 +104,7 @@ func TestLogEE_PrepareMap(t *testing.T) {
|
||||
func TestNewLogEE(t *testing.T) {
|
||||
|
||||
cfg := &config.EventExporterCfg{}
|
||||
dc := &utils.SafeMapStorage{}
|
||||
dc := &utils.ExporterMetrics{}
|
||||
|
||||
logEE := NewLogEE(cfg, dc)
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ import (
|
||||
)
|
||||
|
||||
// NewNatsEE creates a kafka poster
|
||||
func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.SafeMapStorage) (natsPstr *NatsEE, err error) {
|
||||
func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.ExporterMetrics) (natsPstr *NatsEE, err error) {
|
||||
natsPstr = &NatsEE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -55,7 +55,7 @@ type NatsEE struct {
|
||||
posterJS jetstream.JetStream
|
||||
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
reqs *concReq
|
||||
sync.RWMutex // protect writer
|
||||
bytePreparing
|
||||
@@ -140,7 +140,7 @@ func (pstr *NatsEE) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
func (pstr *NatsEE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
|
||||
|
||||
func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration) ([]nats.Option, error) {
|
||||
natsOpts := make([]nats.Option, 0, 7)
|
||||
|
||||
@@ -43,10 +43,7 @@ func TestNewNatsEE(t *testing.T) {
|
||||
}
|
||||
nodeID := "node_id1"
|
||||
connTimeout := 2 * time.Second
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
|
||||
exp := new(NatsEE)
|
||||
exp.cfg = cfg
|
||||
@@ -90,10 +87,7 @@ func TestParseOpt(t *testing.T) {
|
||||
opts := &config.EventExporterOpts{}
|
||||
nodeID := "node_id1"
|
||||
connTimeout := 2 * time.Second
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@@ -127,10 +121,7 @@ func TestParseOptJetStream(t *testing.T) {
|
||||
}
|
||||
nodeID := "node_id1"
|
||||
connTimeout := 2 * time.Second
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@@ -167,10 +158,7 @@ func TestParseOptSubject(t *testing.T) {
|
||||
}}
|
||||
nodeID := "node_id1"
|
||||
connTimeout := 2 * time.Second
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@@ -252,7 +240,7 @@ func TestNatsEECfg(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNatsEEGetMetrics(t *testing.T) {
|
||||
expectedMetrics := &utils.SafeMapStorage{}
|
||||
expectedMetrics := &utils.ExporterMetrics{}
|
||||
pstr := &NatsEE{
|
||||
dc: expectedMetrics,
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage,
|
||||
func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics,
|
||||
connMgr *engine.ConnManager) (e *RPCee, err error) {
|
||||
e = &RPCee{
|
||||
cfg: cfg,
|
||||
@@ -42,7 +42,7 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage,
|
||||
|
||||
type RPCee struct {
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
//opts
|
||||
@@ -81,7 +81,7 @@ func (e *RPCee) Close() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) {
|
||||
func (e *RPCee) GetMetrics() (mp *utils.ExporterMetrics) {
|
||||
return e.dc
|
||||
}
|
||||
|
||||
|
||||
@@ -31,10 +31,7 @@ import (
|
||||
|
||||
func TestNewRpcEE(t *testing.T) {
|
||||
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
|
||||
|
||||
rcv, err := NewRpcEE(eeSCfg, dc, connMgr)
|
||||
@@ -104,10 +101,7 @@ func TestRPCCfg(t *testing.T) {
|
||||
|
||||
func TestRPCConnect(t *testing.T) {
|
||||
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
|
||||
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
|
||||
if err != nil {
|
||||
@@ -120,10 +114,7 @@ func TestRPCConnect(t *testing.T) {
|
||||
|
||||
func TestRPCClose(t *testing.T) {
|
||||
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
|
||||
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
|
||||
if err != nil {
|
||||
@@ -139,7 +130,7 @@ func TestRPCClose(t *testing.T) {
|
||||
|
||||
func TestRPCGetMetrics(t *testing.T) {
|
||||
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
|
||||
dc := &utils.SafeMapStorage{
|
||||
dc := &utils.ExporterMetrics{
|
||||
MapStorage: utils.MapStorage{
|
||||
"time": "now",
|
||||
"just_a_field": "just_a_value",
|
||||
@@ -158,10 +149,7 @@ func TestRPCGetMetrics(t *testing.T) {
|
||||
|
||||
func TestRPCPrepareMap(t *testing.T) {
|
||||
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector))
|
||||
rpcEe, err := NewRpcEE(eeSCfg, dc, connMgr)
|
||||
if err != nil {
|
||||
|
||||
@@ -32,7 +32,7 @@ import (
|
||||
)
|
||||
|
||||
// NewS3EE creates a s3 poster
|
||||
func NewS3EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *S3EE {
|
||||
func NewS3EE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *S3EE {
|
||||
pstr := &S3EE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -54,7 +54,7 @@ type S3EE struct {
|
||||
up *s3manager.Uploader
|
||||
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
reqs *concReq
|
||||
sync.RWMutex // protect connection
|
||||
bytePreparing
|
||||
@@ -137,4 +137,4 @@ func (pstr *S3EE) ExportEvent(message any, key string) (err error) {
|
||||
|
||||
func (pstr *S3EE) Close() (_ error) { return }
|
||||
|
||||
func (pstr *S3EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
func (pstr *S3EE) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
func TestS3GetMetrics(t *testing.T) {
|
||||
safeMapStorage := &utils.SafeMapStorage{}
|
||||
safeMapStorage := &utils.ExporterMetrics{}
|
||||
pstr := &S3EE{
|
||||
dc: safeMapStorage,
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
)
|
||||
|
||||
func NewSQLEe(cfg *config.EventExporterCfg,
|
||||
dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) {
|
||||
dc *utils.ExporterMetrics) (sqlEe *SQLEe, err error) {
|
||||
sqlEe = &SQLEe{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -48,7 +48,7 @@ func NewSQLEe(cfg *config.EventExporterCfg,
|
||||
// SQLEe implements EventExporter interface for SQL
|
||||
type SQLEe struct {
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
db *gorm.DB
|
||||
sqldb *sql.DB
|
||||
reqs *concReq
|
||||
@@ -156,7 +156,7 @@ func (sqlEe *SQLEe) Close() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc }
|
||||
func (sqlEe *SQLEe) GetMetrics() *utils.ExporterMetrics { return sqlEe.dc }
|
||||
|
||||
// Create the sqlPosterRequest used to instert the map into the table
|
||||
func (sqlEe *SQLEe) PrepareMap(cgrEv *utils.CGREvent) (any, error) {
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -32,10 +33,7 @@ import (
|
||||
)
|
||||
|
||||
func TestSqlGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
sqlEe := &SQLEe{
|
||||
dc: dc,
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ import (
|
||||
)
|
||||
|
||||
// NewSQSee creates a poster for sqs
|
||||
func NewSQSee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *SQSee {
|
||||
func NewSQSee(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *SQSee {
|
||||
pstr := &SQSee{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -53,7 +53,7 @@ type SQSee struct {
|
||||
svc *sqs.SQS
|
||||
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
reqs *concReq
|
||||
sync.RWMutex // protect connection
|
||||
bytePreparing
|
||||
@@ -147,4 +147,4 @@ func (pstr *SQSee) ExportEvent(message any, _ string) (err error) {
|
||||
|
||||
func (pstr *SQSee) Close() (_ error) { return }
|
||||
|
||||
func (pstr *SQSee) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics { return pstr.dc }
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
func TestGetMetricsT(t *testing.T) {
|
||||
metrics := &utils.SafeMapStorage{}
|
||||
metrics := &utils.ExporterMetrics{}
|
||||
pstr := &SQSee{dc: metrics}
|
||||
result := pstr.GetMetrics()
|
||||
if result != metrics {
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *VirtualEE {
|
||||
func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.ExporterMetrics) *VirtualEE {
|
||||
return &VirtualEE{
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
@@ -36,7 +36,7 @@ func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *Virtu
|
||||
// VirtualEE implements EventExporter interface for .csv files
|
||||
type VirtualEE struct {
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
dc *utils.ExporterMetrics
|
||||
}
|
||||
|
||||
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
|
||||
@@ -49,8 +49,8 @@ func (vEe *VirtualEE) ExportEvent(payload any, _ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vEe *VirtualEE) Close() error { return nil }
|
||||
func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
|
||||
func (vEe *VirtualEE) Close() error { return nil }
|
||||
func (vEe *VirtualEE) GetMetrics() *utils.ExporterMetrics { return vEe.dc }
|
||||
|
||||
func (vEe *VirtualEE) PrepareMap(cgrEv *utils.CGREvent) (any, error) {
|
||||
return cgrEv.Event, nil
|
||||
|
||||
@@ -21,6 +21,7 @@ package ees
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -28,10 +29,7 @@ import (
|
||||
)
|
||||
|
||||
func TestVirtualEeGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics("Local")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dc := utils.NewExporterMetrics("", time.Local)
|
||||
vEe := &VirtualEE{
|
||||
dc: dc,
|
||||
}
|
||||
|
||||
151
utils/exportermetrics.go
Normal file
151
utils/exportermetrics.go
Normal file
@@ -0,0 +1,151 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cron"
|
||||
)
|
||||
|
||||
// ExporterMetrics stores export statistics with thread-safe access and
|
||||
// cron-scheduled resets.
|
||||
type ExporterMetrics struct {
|
||||
mu sync.RWMutex
|
||||
MapStorage MapStorage
|
||||
cron *cron.Cron
|
||||
loc *time.Location
|
||||
}
|
||||
|
||||
// NewExporterMetrics creates metrics with optional automatic reset.
|
||||
// schedule is a cron expression for reset timing (empty to disable).
|
||||
func NewExporterMetrics(schedule string, loc *time.Location) *ExporterMetrics {
|
||||
m := &ExporterMetrics{
|
||||
loc: loc,
|
||||
}
|
||||
m.Reset() // init MapStorage with default values
|
||||
|
||||
if schedule != "" {
|
||||
m.cron = cron.New()
|
||||
m.cron.AddFunc(schedule, func() {
|
||||
m.Reset()
|
||||
})
|
||||
m.cron.Start()
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// Reset immediately clears all metrics and resets them to initial values.
|
||||
func (m *ExporterMetrics) Reset() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.MapStorage = MapStorage{
|
||||
NumberOfEvents: int64(0),
|
||||
PositiveExports: StringSet{},
|
||||
NegativeExports: StringSet{},
|
||||
TimeNow: time.Now().In(m.loc),
|
||||
}
|
||||
}
|
||||
|
||||
// StopCron stops the automatic reset schedule if one is active.
|
||||
func (m *ExporterMetrics) StopCron() {
|
||||
if m.cron == nil {
|
||||
return
|
||||
}
|
||||
m.cron.Stop()
|
||||
// ctx := m.cron.Stop()
|
||||
// <-ctx.Done() // wait for any running jobs to complete
|
||||
}
|
||||
|
||||
// String returns the map as json string.
|
||||
func (m *ExporterMetrics) String() string {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.MapStorage.String()
|
||||
}
|
||||
|
||||
// FieldAsInterface returns the value from the path.
|
||||
func (m *ExporterMetrics) FieldAsInterface(fldPath []string) (val any, err error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.MapStorage.FieldAsInterface(fldPath)
|
||||
}
|
||||
|
||||
// FieldAsString returns the value from path as string.
|
||||
func (m *ExporterMetrics) FieldAsString(fldPath []string) (str string, err error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.MapStorage.FieldAsString(fldPath)
|
||||
}
|
||||
|
||||
// Set sets the value at the given path.
|
||||
func (m *ExporterMetrics) Set(fldPath []string, val any) (err error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.MapStorage.Set(fldPath, val)
|
||||
}
|
||||
|
||||
// GetKeys returns all the keys from map.
|
||||
func (m *ExporterMetrics) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.MapStorage.GetKeys(nested, nestedLimit, prefix)
|
||||
}
|
||||
|
||||
// Remove removes the item at path
|
||||
func (m *ExporterMetrics) Remove(fldPath []string) (err error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.MapStorage.Remove(fldPath)
|
||||
}
|
||||
|
||||
func (m *ExporterMetrics) ClonedMapStorage() (msClone MapStorage) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.MapStorage.Clone()
|
||||
}
|
||||
|
||||
// IncrementEvents increases the event counter (NumberOfEvents) by 1.
|
||||
func (m *ExporterMetrics) IncrementEvents() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
count, _ := m.MapStorage[NumberOfEvents].(int64)
|
||||
m.MapStorage[NumberOfEvents] = count + 1
|
||||
}
|
||||
|
||||
// Lock locks the ExporterMetrics mutex.
|
||||
func (m *ExporterMetrics) Lock() {
|
||||
m.mu.Lock()
|
||||
}
|
||||
|
||||
// Unlock unlocks the ExporterMetrics mutex.
|
||||
func (m *ExporterMetrics) Unlock() {
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// RLock locks the ExporterMetrics mutex for reading.
|
||||
func (m *ExporterMetrics) RLock() {
|
||||
m.mu.RLock()
|
||||
}
|
||||
|
||||
// RUnlock unlocks the read lock on the ExporterMetrics mutex.
|
||||
func (m *ExporterMetrics) RUnlock() {
|
||||
m.mu.RUnlock()
|
||||
}
|
||||
@@ -23,8 +23,8 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSafeMapStorageString(t *testing.T) {
|
||||
ms := &SafeMapStorage{
|
||||
func TestExporterMetricsString(t *testing.T) {
|
||||
ms := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
},
|
||||
@@ -35,8 +35,8 @@ func TestSafeMapStorageString(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSafeMapStorageFieldAsInterface(t *testing.T) {
|
||||
ms := &SafeMapStorage{
|
||||
func TestExporterMetricsFieldAsInterface(t *testing.T) {
|
||||
ms := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
},
|
||||
@@ -51,8 +51,8 @@ func TestSafeMapStorageFieldAsInterface(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSafeMapStorageFieldAsString(t *testing.T) {
|
||||
ms := &SafeMapStorage{
|
||||
func TestExporterMetricsFieldAsString(t *testing.T) {
|
||||
ms := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
},
|
||||
@@ -67,14 +67,14 @@ func TestSafeMapStorageFieldAsString(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSafeMapStorageSet(t *testing.T) {
|
||||
ms := &SafeMapStorage{
|
||||
func TestExporterMetricsSet(t *testing.T) {
|
||||
ms := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
},
|
||||
}
|
||||
|
||||
expected := &SafeMapStorage{
|
||||
expected := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
"field2": 3,
|
||||
@@ -88,8 +88,8 @@ func TestSafeMapStorageSet(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSafeMapStorageGetKeys(t *testing.T) {
|
||||
ms := &SafeMapStorage{
|
||||
func TestExporterMetricsGetKeys(t *testing.T) {
|
||||
ms := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
"field2": 3,
|
||||
@@ -104,15 +104,15 @@ func TestSafeMapStorageGetKeys(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSafeMapStorageRemove(t *testing.T) {
|
||||
ms := &SafeMapStorage{
|
||||
func TestExporterMetricsRemove(t *testing.T) {
|
||||
ms := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
"field2": 3,
|
||||
},
|
||||
}
|
||||
|
||||
expected := &SafeMapStorage{
|
||||
expected := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
},
|
||||
@@ -125,29 +125,9 @@ func TestSafeMapStorageRemove(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSafeMapStorageClone(t *testing.T) {
|
||||
ms := &SafeMapStorage{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
"field2": 3,
|
||||
},
|
||||
}
|
||||
func TestExporterMetricsClonedMapStorage(t *testing.T) {
|
||||
|
||||
expected := &SafeMapStorage{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
"field2": 3,
|
||||
},
|
||||
}
|
||||
|
||||
if reply := ms.Clone(); !reflect.DeepEqual(reply, expected) {
|
||||
t.Errorf("Expected %v \n but received \n %v", expected, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSafeMapClonedMapStorage(t *testing.T) {
|
||||
|
||||
ms := &SafeMapStorage{
|
||||
ms := &ExporterMetrics{
|
||||
MapStorage: MapStorage{
|
||||
"field1": 2,
|
||||
"field2": 3,
|
||||
@@ -1,83 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// SafeMapStorage is a Mapstorage with mutex
|
||||
type SafeMapStorage struct {
|
||||
MapStorage
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
// String returns the map as json string
|
||||
func (ms *SafeMapStorage) String() string {
|
||||
ms.RLock()
|
||||
defer ms.RUnlock()
|
||||
return ms.MapStorage.String()
|
||||
}
|
||||
|
||||
// FieldAsInterface returns the value from the path
|
||||
func (ms *SafeMapStorage) FieldAsInterface(fldPath []string) (val any, err error) {
|
||||
ms.RLock()
|
||||
defer ms.RUnlock()
|
||||
return ms.MapStorage.FieldAsInterface(fldPath)
|
||||
}
|
||||
|
||||
// FieldAsString returns the value from path as string
|
||||
func (ms *SafeMapStorage) FieldAsString(fldPath []string) (str string, err error) {
|
||||
ms.RLock()
|
||||
defer ms.RUnlock()
|
||||
return ms.MapStorage.FieldAsString(fldPath)
|
||||
}
|
||||
|
||||
// Set sets the value at the given path
|
||||
func (ms *SafeMapStorage) Set(fldPath []string, val any) (err error) {
|
||||
ms.Lock()
|
||||
defer ms.Unlock()
|
||||
return ms.MapStorage.Set(fldPath, val)
|
||||
}
|
||||
|
||||
// GetKeys returns all the keys from map
|
||||
func (ms *SafeMapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) {
|
||||
ms.RLock()
|
||||
defer ms.RUnlock()
|
||||
return ms.MapStorage.GetKeys(nested, nestedLimit, prefix)
|
||||
}
|
||||
|
||||
// Remove removes the item at path
|
||||
func (ms *SafeMapStorage) Remove(fldPath []string) (err error) {
|
||||
ms.Lock()
|
||||
defer ms.Unlock()
|
||||
return ms.MapStorage.Remove(fldPath)
|
||||
}
|
||||
|
||||
func (ms *SafeMapStorage) Clone() (msClone *SafeMapStorage) {
|
||||
ms.RLock()
|
||||
defer ms.RUnlock()
|
||||
return &SafeMapStorage{MapStorage: ms.MapStorage.Clone()}
|
||||
}
|
||||
|
||||
func (ms *SafeMapStorage) ClonedMapStorage() (msClone MapStorage) {
|
||||
ms.RLock()
|
||||
defer ms.RUnlock()
|
||||
return ms.MapStorage.Clone()
|
||||
}
|
||||
Reference in New Issue
Block a user