add error handling for cron AddFunc

This commit is contained in:
ionutboangiu
2025-11-04 13:52:26 +02:00
committed by Dan Christian Bogos
parent f2a4427d2c
commit a0adddb11f
16 changed files with 131 additions and 49 deletions

View File

@@ -240,11 +240,10 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents
return fmt.Errorf("exporter with ID: %s has an invalid ExportPath for archiving", expID)
}
timezone := utils.FirstNonEmpty(eeCfg.Timezone, eeS.cfg.GeneralCfg().DefaultTimezone)
loc, err := time.LoadLocation(timezone)
em, err := utils.NewExporterMetrics(eeCfg.MetricsResetSchedule, timezone)
if err != nil {
return err
return fmt.Errorf("failed to initialize exporter metrics: %v", err)
}
em := utils.NewExporterMetrics(eeCfg.MetricsResetSchedule, loc)
var ee EventExporter

View File

@@ -45,11 +45,10 @@ type EventExporter interface {
func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig,
filterS *engine.FilterS, connMngr *engine.ConnManager) (ee EventExporter, err error) {
timezone := utils.FirstNonEmpty(cfg.Timezone, cgrCfg.GeneralCfg().DefaultTimezone)
loc, err := time.LoadLocation(timezone)
em, err := utils.NewExporterMetrics(cfg.MetricsResetSchedule, timezone)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize exporter metrics: %v", err)
}
em := utils.NewExporterMetrics(cfg.MetricsResetSchedule, loc)
switch cfg.Type {
case utils.MetaFileCSV:

View File

@@ -23,7 +23,6 @@ import (
"reflect"
"strings"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -40,7 +39,10 @@ func TestNewEventExporter(t *testing.T) {
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em, nil)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
@@ -71,7 +73,10 @@ func TestNewEventExporterCase2(t *testing.T) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em, io.Discard)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
@@ -99,7 +104,10 @@ func TestNewEventExporterCase3(t *testing.T) {
if err != nil {
t.Error(err)
}
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
eeExpect, err := NewHTTPPostEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em)
if err != nil {
t.Error(err)
@@ -121,7 +129,10 @@ func TestNewEventExporterCase4(t *testing.T) {
if err != nil {
t.Error(err)
}
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
eeExpect, err := NewHTTPjsonMapEE(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, em)
if err != nil {
t.Error(err)
@@ -143,7 +154,10 @@ func TestNewEventExporterCase6(t *testing.T) {
if err != nil {
t.Error(err)
}
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
eeExpect := NewVirtualEE(cgrCfg.EEsCfg().Exporters[0], em)
newEE := ee.(*VirtualEE)
newEE.em.MapStorage[utils.TimeNow] = nil
@@ -176,9 +190,9 @@ func TestNewEventExporterCase7(t *testing.T) {
if err != nil {
t.Error(err)
}
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Error(err)
t.Fatal(err)
}
eeExpect, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], em)
if err != nil {
@@ -211,7 +225,7 @@ func TestNewEventExporterDcCase(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.GeneralCfg().DefaultTimezone = "invalid_timezone"
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, nil, nil)
errExpect := "unknown time zone invalid_timezone"
errExpect := "failed to initialize exporter metrics: unknown time zone invalid_timezone"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
}

View File

@@ -455,7 +455,10 @@ func TestOnCacheEvicted(t *testing.T) {
}
func TestUpdateEEMetrics(t *testing.T) {
em := utils.NewExporterMetrics("", time.UTC)
em, err := utils.NewExporterMetrics("", "")
if err != nil {
t.Fatal(err)
}
tnow := time.Now()
ev := engine.MapEvent{
utils.AnswerTime: tnow,
@@ -464,7 +467,10 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaVoice,
utils.Usage: time.Second,
}
exp := utils.NewExporterMetrics("", time.UTC)
exp, err := utils.NewExporterMetrics("", "")
if err != nil {
t.Fatal(err)
}
exp.MapStorage[utils.FirstEventATime] = tnow
exp.MapStorage[utils.LastEventATime] = tnow
exp.MapStorage[utils.FirstExpOrderID] = int64(1)

View File

@@ -20,7 +20,6 @@ package ees
import (
"reflect"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -28,7 +27,10 @@ import (
)
func TestGetMetrics(t *testing.T) {
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
ee := &ElasticEE{
em: em,
}
@@ -56,7 +58,10 @@ func TestInitClient(t *testing.T) {
func TestElasticExportEventErr(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], em)
if err != nil {
t.Error(err)

View File

@@ -723,7 +723,10 @@ func TestCsvInitFileCSV(t *testing.T) {
if err := os.MkdirAll("/tmp/TestInitFileCSV", 0666); err != nil {
t.Error(err)
}
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
fCsv := &FileCSVee{
cgrCfg: cgrCfg,
cfg: cgrCfg.EEsCfg().Exporters[0],

View File

@@ -24,7 +24,6 @@ import (
"io"
"reflect"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -33,7 +32,10 @@ import (
)
func TestFileCsvGetMetrics(t *testing.T) {
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
fCsv := &FileCSVee{em: em}
if rcv := fCsv.GetMetrics(); !reflect.DeepEqual(rcv, fCsv.em) {
@@ -183,7 +185,10 @@ func TestFileCsvExportEvent(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
csvNW := csv.NewWriter(byteBuff)
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
fCsv := &FileCSVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,

View File

@@ -157,7 +157,10 @@ func TestFileFwvInit(t *testing.T) {
if err := os.MkdirAll("/tmp/TestInitFileFWV", 0666); err != nil {
t.Error(err)
}
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
fFwv := &FileFWVee{
cgrCfg: cgrCfg,
cfg: cgrCfg.EEsCfg().Exporters[0],

View File

@@ -24,7 +24,6 @@ import (
"io"
"reflect"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -33,7 +32,10 @@ import (
)
func TestFileFwvGetMetrics(t *testing.T) {
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
fFwv := &FileFWVee{em: em}
if rcv := fFwv.GetMetrics(); !reflect.DeepEqual(rcv, fFwv.em) {
@@ -175,7 +177,10 @@ func TestFileFwvExportEvent(t *testing.T) {
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
csvNW := csv.NewWriter(byteBuff)
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
fFwv := &FileFWVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
@@ -209,7 +214,10 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
newDM := engine.NewDataManager(dbCM, cfg, nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
byteBuff := new(bytes.Buffer)
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
fFwv := &FileFWVee{
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,

View File

@@ -33,7 +33,10 @@ import (
)
func TestHttpJsonMapGetMetrics(t *testing.T) {
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
httpEE := &HTTPjsonMapEE{
em: em,
}

View File

@@ -34,7 +34,10 @@ import (
)
func TestHttpPostGetMetrics(t *testing.T) {
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
httpPost := &HTTPPostEE{
em: em,
}

View File

@@ -23,7 +23,6 @@ import (
"reflect"
"strings"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -32,7 +31,10 @@ import (
func TestNewLogEE(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
expected := &LogEE{
cfg: cfg.EEsCfg().ExporterCfg(utils.MetaDefault),
@@ -47,7 +49,10 @@ func TestNewLogEE(t *testing.T) {
func TestLogEEExportEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em)
mp := map[string]any{
"field1": 2,
@@ -83,7 +88,10 @@ func TestLogEE_GetMetrics(t *testing.T) {
func TestLogEEPrepareMap(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em)
mp := &utils.CGREvent{
Event: map[string]any{
@@ -99,7 +107,10 @@ func TestLogEEPrepareMap(t *testing.T) {
func TestLogEEPrepareOrderMap(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
logEE := NewLogEE(cfg.EEsCfg().ExporterCfg(utils.MetaDefault), em)
mp := utils.NewOrderedNavigableMap()
fullPath := &utils.FullPath{

View File

@@ -21,7 +21,6 @@ package ees
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,7 +29,10 @@ import (
func TestNewRpcEE(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rcv, err := NewRpcEE(eeSCfg, em, connMgr)
@@ -100,7 +102,10 @@ func TestRPCCfg(t *testing.T) {
func TestRPCConnect(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
@@ -139,7 +144,10 @@ func TestRPCConnect(t *testing.T) {
func TestRPCClose(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {
@@ -174,7 +182,10 @@ func TestRPCGetMetrics(t *testing.T) {
func TestRPCPrepareMap(t *testing.T) {
eeSCfg := config.NewDefaultCGRConfig().EEsCfg().ExporterCfg(utils.MetaDefault)
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
connMgr := engine.NewConnManager(config.NewDefaultCGRConfig())
rpcEe, err := NewRpcEE(eeSCfg, em, connMgr)
if err != nil {

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
@@ -33,7 +32,10 @@ import (
)
func TestSqlGetMetrics(t *testing.T) {
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
sqlEe := &SQLEe{
em: em,
}

View File

@@ -21,14 +21,16 @@ package ees
import (
"reflect"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
func TestVirtualEeGetMetrics(t *testing.T) {
em := utils.NewExporterMetrics("", time.Local)
em, err := utils.NewExporterMetrics("", "Local")
if err != nil {
t.Fatal(err)
}
vEe := &VirtualEE{
em: em,
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"fmt"
"sync"
"time"
@@ -36,7 +37,11 @@ type ExporterMetrics struct {
// NewExporterMetrics creates metrics with optional automatic reset.
// schedule is a cron expression for reset timing (empty to disable).
func NewExporterMetrics(schedule string, loc *time.Location) *ExporterMetrics {
func NewExporterMetrics(schedule, timezone string) (*ExporterMetrics, error) {
loc, err := time.LoadLocation(timezone)
if err != nil {
return nil, err
}
m := &ExporterMetrics{
loc: loc,
}
@@ -44,12 +49,15 @@ func NewExporterMetrics(schedule string, loc *time.Location) *ExporterMetrics {
if schedule != "" {
m.cron = cron.New()
m.cron.AddFunc(schedule, func() {
if _, err := m.cron.AddFunc(schedule, func() {
m.Reset()
})
}); err != nil {
// Only fails if schedule is an invalid cron expression.
return nil, fmt.Errorf("invalid cron expr %q: %v", schedule, err)
}
m.cron.Start()
}
return m
return m, nil
}
// Reset immediately clears all metrics and resets them to initial values.