mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-25 00:58:45 +05:00
Added exporting to prometheus to stats
This commit is contained in:
committed by
Dan Christian Bogos
parent
6b542b3dd8
commit
2be78a74c1
@@ -26,6 +26,7 @@ import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/ericlagergren/decimal"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// NewStatMetric instantiates the StatMetric
|
||||
@@ -847,3 +848,32 @@ func (dst *StatDistinct) Clone() StatMetric {
|
||||
}
|
||||
return cln
|
||||
}
|
||||
|
||||
func exportToPrometheus(matchSQs StatQueues, promIDs utils.StringSet) (err error) {
|
||||
for _, qos := range matchSQs {
|
||||
if _, has := promIDs[qos.ID]; !has {
|
||||
continue
|
||||
}
|
||||
tntID := strings.Replace(qos.TenantID(), ".", "_", -1)
|
||||
gaugeVal := prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Subsystem: "stats",
|
||||
Name: tntID,
|
||||
Help: "Metrics exported as gauge, depending on metricID's ID.",
|
||||
}, []string{"metricID"})
|
||||
if err = prometheus.Register(gaugeVal); err != nil {
|
||||
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
|
||||
// A gauge for that metric has been registered before.
|
||||
gaugeVal = are.ExistingCollector.(*prometheus.GaugeVec)
|
||||
err = nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for metricID, metricVal := range qos.SQMetrics {
|
||||
// bool indicating whether x can fit into a float64 without truncation, overflow, or underflow will be masked for now
|
||||
valToBeSet, _ := metricVal.GetValue().Float64()
|
||||
gaugeVal.WithLabelValues(metricID).Set(valToBeSet)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -306,8 +306,8 @@ func (sS *StatS) processThresholds(ctx *context.Context, sQs StatQueues, opts ma
|
||||
return
|
||||
}
|
||||
|
||||
// processEvent processes a new event, dispatching to matching queues
|
||||
// queues matching are also cached to speed up
|
||||
// processEvent processes a new event, dispatching to matching queues.
|
||||
// Queues matching are also cached to speed up
|
||||
func (sS *StatS) processEvent(ctx *context.Context, tnt string, args *utils.CGREvent) (statQueueIDs []string, err error) {
|
||||
evNm := args.AsDataProvider()
|
||||
var sqIDs []string
|
||||
@@ -335,12 +335,22 @@ func (sS *StatS) processEvent(ctx *context.Context, tnt string, args *utils.CGRE
|
||||
withErrors = true
|
||||
}
|
||||
sS.storeStatQueue(ctx, sq)
|
||||
|
||||
}
|
||||
if sS.processThresholds(ctx, matchSQs, args.APIOpts) != nil ||
|
||||
withErrors {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
|
||||
var promIDs []string
|
||||
if promIDs, err = GetStringSliceOpts(ctx, tnt, args, sS.fltrS, sS.cfg.StatSCfg().Opts.PrometheusMetrics,
|
||||
[]string{}, utils.OptsPrometheusMetrics); err != nil {
|
||||
return
|
||||
}
|
||||
if len(promIDs) != 0 {
|
||||
if err = exportToPrometheus(matchSQs, utils.NewStringSet(promIDs)); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
matchSQs.unlock()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1220,6 +1220,7 @@ func TestStatQueueProcessEventOK(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatQueueProcessEventProcessThPartExec(t *testing.T) {
|
||||
Cache.Clear(nil)
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
|
||||
dm := NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
@@ -1269,80 +1270,7 @@ func TestStatQueueProcessEventProcessThPartExec(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatQueueProcessEventProcessEventErr(t *testing.T) {
|
||||
tmpLogger := utils.Logger
|
||||
defer func() {
|
||||
utils.Logger = tmpLogger
|
||||
}()
|
||||
var buf bytes.Buffer
|
||||
utils.Logger = utils.NewStdLoggerWithWriter(&buf, "", 4)
|
||||
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
data := NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
|
||||
dm := NewDataManager(data, cfg.CacheCfg(), nil)
|
||||
filterS := NewFilterS(cfg, nil, dm)
|
||||
sS := NewStatService(dm, cfg, filterS, nil)
|
||||
|
||||
sqPrf := &StatQueueProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "SQ1",
|
||||
FilterIDs: []string{"*string:~*req.Account:1001"},
|
||||
Weight: 10,
|
||||
Blocker: true,
|
||||
QueueLength: 10,
|
||||
ThresholdIDs: []string{"*none"},
|
||||
MinItems: 5,
|
||||
Metrics: []*MetricWithFilters{
|
||||
{
|
||||
MetricID: utils.MetaTCD,
|
||||
},
|
||||
},
|
||||
}
|
||||
sq := &StatQueue{
|
||||
sqPrfl: sqPrf,
|
||||
Tenant: "cgrates.org",
|
||||
ID: "SQ1",
|
||||
SQItems: []SQItem{
|
||||
{
|
||||
EventID: "SqProcessEvent",
|
||||
},
|
||||
},
|
||||
SQMetrics: map[string]StatMetric{
|
||||
utils.MetaTCD: &StatTCD{},
|
||||
},
|
||||
}
|
||||
|
||||
if err := dm.SetStatQueueProfile(context.Background(), sqPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := dm.SetStatQueue(context.Background(), sq); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
args := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "SqProcessEvent",
|
||||
Event: map[string]interface{}{
|
||||
utils.AccountField: "1001",
|
||||
},
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.OptsStatsProfileIDs: []string{"SQ1"},
|
||||
},
|
||||
}
|
||||
|
||||
expLog := `[WARNING] <StatS> Queue: cgrates.org:SQ1, ignoring event: cgrates.org:SqProcessEvent, error: NOT_FOUND:Usage`
|
||||
expIDs := []string{"SQ1"}
|
||||
if rcvIDs, err := sS.processEvent(context.Background(), args.Tenant, args); err == nil ||
|
||||
err.Error() != utils.ErrPartiallyExecuted.Error() {
|
||||
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrPartiallyExecuted, err)
|
||||
} else if !reflect.DeepEqual(rcvIDs, expIDs) {
|
||||
t.Errorf("expected: <%+v>, \nreceived: <%+v>", expIDs, rcvIDs)
|
||||
} else if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) {
|
||||
t.Errorf("expected log <%+v> to be included in: <%+v>",
|
||||
expLog, rcvLog)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func TestStatQueueV1ProcessEventProcessEventErr(t *testing.T) {
|
||||
tmp := Cache
|
||||
tmpC := config.CgrConfig()
|
||||
@@ -1410,6 +1338,7 @@ func TestStatQueueV1ProcessEventProcessEventErr(t *testing.T) {
|
||||
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrPartiallyExecuted, err)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func TestStatQueueV1ProcessEventMissingArgs(t *testing.T) {
|
||||
tmp := Cache
|
||||
|
||||
Reference in New Issue
Block a user