Files
cgrates/agents/prometheus.go
2025-09-22 12:03:28 +02:00

471 lines
20 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"fmt"
"net/http"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Prometheus metric names exposed by coreMetricsCollector. Each constant is
// used both as the key into the collector's descriptor map and as the metric
// name passed to prometheus.NewDesc.
const (
// Gauge metrics
promGoGoroutines = "go_goroutines"
promGoThreads = "go_threads"
promProcessOpenFds = "process_open_fds"
promProcessMaxFds = "process_max_fds"
promProcessResidentMemoryBytes = "process_resident_memory_bytes"
promProcessVirtualMemoryBytes = "process_virtual_memory_bytes"
promProcessVirtualMemoryMaxBytes = "process_virtual_memory_max_bytes"
promProcessStartTimeSeconds = "process_start_time_seconds"
promGoMemstatsAllocBytes = "go_memstats_alloc_bytes"
promGoMemstatsHeapAllocBytes = "go_memstats_heap_alloc_bytes"
promGoMemstatsHeapIdleBytes = "go_memstats_heap_idle_bytes"
promGoMemstatsHeapInuseBytes = "go_memstats_heap_inuse_bytes"
promGoMemstatsHeapObjects = "go_memstats_heap_objects"
promGoMemstatsHeapReleasedBytes = "go_memstats_heap_released_bytes"
promGoMemstatsHeapSysBytes = "go_memstats_heap_sys_bytes"
promGoMemstatsBuckHashSysBytes = "go_memstats_buck_hash_sys_bytes"
promGoMemstatsGCSysBytes = "go_memstats_gc_sys_bytes"
promGoMemstatsMCacheInuseBytes = "go_memstats_mcache_inuse_bytes"
promGoMemstatsMCacheSysBytes = "go_memstats_mcache_sys_bytes"
promGoMemstatsMSpanInuseBytes = "go_memstats_mspan_inuse_bytes"
promGoMemstatsMSpanSysBytes = "go_memstats_mspan_sys_bytes"
promGoMemstatsNextGCBytes = "go_memstats_next_gc_bytes"
promGoMemstatsOtherSysBytes = "go_memstats_other_sys_bytes"
promGoMemstatsStackInuseBytes = "go_memstats_stack_inuse_bytes"
promGoMemstatsStackSysBytes = "go_memstats_stack_sys_bytes"
promGoMemstatsSysBytes = "go_memstats_sys_bytes"
promGoMemstatsLastGCTimeSeconds = "go_memstats_last_gc_time_seconds"
promGoGCGogcPercent = "go_gc_gogc_percent"
promGoGCGomemlimitBytes = "go_gc_gomemlimit_bytes"
promGoSchedGomaxprocsThreads = "go_sched_gomaxprocs_threads"
// Emitted as a constant gauge with value 1; carries an extra "version" label.
promGoInfo = "go_info"
// NOTE: listed with the gauges, but Collect emits this one as a summary
// (count, sum and quantiles), not as a gauge.
promGoGCDurationSeconds = "go_gc_duration_seconds"
// Counter metrics. Every name in this group is also present in
// counterMetricsSet, which is what Collect consults to pick the value type.
promProcessCPUSecondsTotal = "process_cpu_seconds_total"
promProcessNetworkReceiveByteTotal = "process_network_receive_bytes_total"
promProcessNetworkTransmitByteTotal = "process_network_transmit_bytes_total"
promGoMemstatsAllocBytesTotal = "go_memstats_alloc_bytes_total"
promGoMemstatsMallocsTotal = "go_memstats_mallocs_total"
promGoMemstatsFreesTotal = "go_memstats_frees_total"
)
// Prometheus metrics mappings. Each map translates a Prometheus metric name
// (key) into the field name (value) under which coreMetricsCollector.Collect
// looks the value up in the CoreSv1.Status reply.
var (
// processMetricsMapping entries are resolved inside the
// utils.FieldProcStats sub-map of the reply.
processMetricsMapping = map[string]string{
promProcessCPUSecondsTotal: utils.MetricProcCPUTime,
promProcessOpenFds: utils.MetricProcOpenFDs,
promProcessMaxFds: utils.MetricProcMaxFDs,
promProcessResidentMemoryBytes: utils.MetricProcResidentMemory,
promProcessVirtualMemoryBytes: utils.MetricProcVirtualMemory,
promProcessVirtualMemoryMaxBytes: utils.MetricProcMaxVirtualMemory,
promProcessStartTimeSeconds: utils.MetricProcStartTime,
promProcessNetworkReceiveByteTotal: utils.MetricProcNetworkReceiveTotal,
promProcessNetworkTransmitByteTotal: utils.MetricProcNetworkTransmitTotal,
}
// runtimeMetricsMapping entries are resolved at the top level of the reply
// and are always emitted as gauges.
runtimeMetricsMapping = map[string]string{
promGoGoroutines: utils.MetricRuntimeGoroutines,
promGoThreads: utils.MetricRuntimeThreads,
promGoSchedGomaxprocsThreads: utils.MetricRuntimeMaxProcs,
promGoGCGogcPercent: utils.MetricGCPercent,
promGoGCGomemlimitBytes: utils.MetricMemLimit,
}
// memStatsMetricsMapping entries are resolved inside the
// utils.FieldMemStats sub-map of the reply.
memStatsMetricsMapping = map[string]string{
promGoMemstatsAllocBytes: utils.MetricMemAlloc,
promGoMemstatsHeapAllocBytes: utils.MetricMemHeapAlloc,
promGoMemstatsHeapIdleBytes: utils.MetricMemHeapIdle,
promGoMemstatsHeapInuseBytes: utils.MetricMemHeapInuse,
promGoMemstatsHeapObjects: utils.MetricMemHeapObjects,
promGoMemstatsHeapReleasedBytes: utils.MetricMemHeapReleased,
promGoMemstatsHeapSysBytes: utils.MetricMemHeapSys,
promGoMemstatsBuckHashSysBytes: utils.MetricMemBuckHashSys,
promGoMemstatsGCSysBytes: utils.MetricMemGCSys,
promGoMemstatsMCacheInuseBytes: utils.MetricMemMCacheInuse,
promGoMemstatsMCacheSysBytes: utils.MetricMemMCacheSys,
promGoMemstatsMSpanInuseBytes: utils.MetricMemMSpanInuse,
promGoMemstatsMSpanSysBytes: utils.MetricMemMSpanSys,
promGoMemstatsNextGCBytes: utils.MetricMemNextGC,
promGoMemstatsOtherSysBytes: utils.MetricMemOtherSys,
promGoMemstatsStackInuseBytes: utils.MetricMemStackInuse,
promGoMemstatsStackSysBytes: utils.MetricMemStackSys,
promGoMemstatsSysBytes: utils.MetricMemSys,
promGoMemstatsLastGCTimeSeconds: utils.MetricMemLastGC,
promGoMemstatsAllocBytesTotal: utils.MetricMemTotalAlloc,
promGoMemstatsMallocsTotal: utils.MetricMemMallocs,
promGoMemstatsFreesTotal: utils.MetricMemFrees,
}
// counterMetricsSet lists the metric names that Collect reports with
// prometheus.CounterValue; process and memstats metrics not listed here
// are reported with prometheus.GaugeValue.
counterMetricsSet = map[string]struct{}{
promProcessCPUSecondsTotal: {},
promProcessNetworkReceiveByteTotal: {},
promProcessNetworkTransmitByteTotal: {},
promGoMemstatsAllocBytesTotal: {},
promGoMemstatsMallocsTotal: {},
promGoMemstatsFreesTotal: {},
}
)
// PrometheusAgent handles metrics collection for Prometheus.
// It collects stats from StatQueues and cache statistics from CacheS, and
// exposes them alongside optional CoreS, Go runtime and process metrics.
type PrometheusAgent struct {
// cfg is consulted on every scrape for connection IDs and ID filters.
cfg *config.CGRConfig
// cm performs the calls towards the configured connections.
cm *engine.ConnManager
// handler serves the agent's own registry over HTTP.
handler http.Handler
// statMetrics holds StatQueue values, labelled by tenant, queue and metric.
statMetrics *prometheus.GaugeVec
// cacheGroupsMetric holds per-cache group counts, labelled by cache and node_id.
cacheGroupsMetric *prometheus.GaugeVec
// cacheItemsMetric holds per-cache item counts, labelled by cache and node_id.
cacheItemsMetric *prometheus.GaugeVec
}
// NewPrometheusAgent builds a PrometheusAgent around a dedicated registry.
//
// The cache and StatQueue gauge vectors are always registered. The CoreS
// collector is added only when CoreS connections are configured, and the
// stock Go and process collectors only when enabled in the configuration.
// Registration uses MustRegister, so a conflicting registration panics.
func NewPrometheusAgent(cfg *config.CGRConfig, cm *engine.ConnManager) *PrometheusAgent {
	agentCfg := cfg.PrometheusAgentCfg()
	registry := prometheus.NewRegistry()

	if len(agentCfg.CoreSConns) != 0 {
		registry.MustRegister(newCoreMetricsCollector(cfg, cm))
	}

	agent := &PrometheusAgent{
		cfg: cfg,
		cm:  cm,
		cacheGroupsMetric: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Namespace: "cgrates",
				Subsystem: "cache",
				Name:      "groups_total",
				Help:      "Total number of cache groups",
			},
			[]string{"cache", "node_id"},
		),
		cacheItemsMetric: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Namespace: "cgrates",
				Subsystem: "cache",
				Name:      "items_total",
				Help:      "Total number of cache items",
			},
			[]string{"cache", "node_id"},
		),
		statMetrics: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Namespace: "cgrates",
				Subsystem: "stats",
				Name:      "metrics",
				Help:      "Current values for StatQueue metrics",
			},
			[]string{"tenant", "queue", "metric"},
		),
	}
	registry.MustRegister(agent.cacheGroupsMetric, agent.cacheItemsMetric, agent.statMetrics)

	if agentCfg.CollectGoMetrics {
		registry.MustRegister(collectors.NewGoCollector())
	}
	if agentCfg.CollectProcessMetrics {
		registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
	}

	// Instrument the handler so the registry also reports on its own scrapes.
	agent.handler = promhttp.InstrumentMetricHandler(
		registry,
		promhttp.HandlerFor(registry, promhttp.HandlerOpts{}),
	)
	return agent
}
// ServeHTTP implements the http.Handler interface. On every scrape request it
// first refreshes the cache and StatQueue gauges, then delegates to the
// Prometheus HTTP handler to write the response.
func (pa *PrometheusAgent) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Both refreshes must run before the handler so the response carries the
// values fetched for this request.
pa.updateCacheStats()
pa.updateStatsMetrics()
pa.handler.ServeHTTP(w, r)
}
// updateStatsMetrics refreshes the StatQueue gauge vector by querying every
// configured StatS connection.
//
// When StatQueueIDs is configured, only those queues are queried. Otherwise
// the IDs are discovered through the ApierS connection found at the same index
// as the StatS connection. A StatS connection without a matching ApierS
// connection is logged and skipped rather than indexed out of range.
//
// Failures are logged and never abort the scrape: a failed ID lookup skips the
// connection, a failed metrics lookup skips the queue.
func (pa *PrometheusAgent) updateStatsMetrics() {
	for connIdx, connID := range pa.cfg.PrometheusAgentCfg().StatSConns {
		sqIDs := pa.cfg.PrometheusAgentCfg().StatQueueIDs

		// When no StatQueueIDs set, fetch all available ones.
		if len(sqIDs) == 0 {
			apiersConns := pa.cfg.PrometheusAgentCfg().ApierSConns
			// StatS and ApierS connections are paired by position; guard the
			// lookup so a shorter ApierSConns list cannot panic the handler.
			if connIdx >= len(apiersConns) {
				utils.Logger.Err(fmt.Sprintf(
					"<%s> no ApierS connection configured at index %d to retrieve StatQueue IDs (connID=%q)",
					utils.PrometheusAgent, connIdx, connID))
				continue
			}
			apiersConnID := apiersConns[connIdx]
			if err := pa.cm.Call(context.Background(), []string{apiersConnID},
				utils.APIerSv1GetStatQueueProfileIDs,
				&utils.PaginatorWithTenant{}, &sqIDs); err != nil {
				utils.Logger.Err(fmt.Sprintf(
					"<%s> failed to retrieve all StatQueue IDs (connID=%q): %v",
					utils.PrometheusAgent, apiersConnID, err))
				continue
			}
		}

		for _, sqID := range sqIDs {
			tenantID := utils.NewTenantID(sqID)
			if tenantID.Tenant == "" {
				tenantID.Tenant = pa.cfg.GeneralCfg().DefaultTenant
			}

			var metrics map[string]float64
			err := pa.cm.Call(context.Background(), []string{connID},
				utils.StatSv1GetQueueFloatMetrics,
				&utils.TenantIDWithAPIOpts{
					TenantID: tenantID,
				}, &metrics)
			// A not-found reply is tolerated silently (metrics stays empty).
			// The comparison is by message, presumably because error identity
			// is not preserved across connections (verify against ConnManager).
			if err != nil && err.Error() != utils.ErrNotFound.Error() {
				utils.Logger.Err(fmt.Sprintf(
					"<%s> failed to retrieve metrics for StatQueue %q (connID=%q): %v",
					utils.PrometheusAgent, sqID, connID, err))
				continue
			}

			for metricID, val := range metrics {
				pa.statMetrics.WithLabelValues(tenantID.Tenant, tenantID.ID, metricID).Set(val)
			}
		}
	}
}
// updateCacheStats queries each configured CacheS connection for cache
// statistics and stores the group and item counts in the matching gauges,
// labelled by cache ID and by the node ID found in the reply metadata.
// A failing connection is logged and skipped; with no connections configured
// the function does nothing.
func (pa *PrometheusAgent) updateCacheStats() {
	for _, connID := range pa.cfg.PrometheusAgentCfg().CacheSConns {
		args := &utils.AttrCacheIDsWithAPIOpts{
			CacheIDs: pa.cfg.PrometheusAgentCfg().CacheIDs,
		}
		var reply engine.CacheStatsWithMetadata
		err := pa.cm.Call(context.Background(), []string{connID},
			utils.CacheSv1GetStats, args, &reply)
		if err != nil {
			utils.Logger.Err(fmt.Sprintf(
				"<%s> failed to retrieve cache stats (connID=%q): %v",
				utils.PrometheusAgent, connID, err))
			continue
		}

		nodeID := utils.IfaceAsString(reply.Metadata[utils.NodeID])
		for cacheID, cacheStat := range reply.CacheStatistics {
			// Entries without statistics carry nothing to report.
			if cacheStat != nil {
				pa.cacheGroupsMetric.WithLabelValues(cacheID, nodeID).Set(float64(cacheStat.Groups))
				pa.cacheItemsMetric.WithLabelValues(cacheID, nodeID).Set(float64(cacheStat.Items))
			}
		}
	}
}
// coreMetricsCollector collects CoreS metrics. Equivalent to Go/Process collectors.
// It implements prometheus.Collector: values are fetched through CoreSv1.Status
// at collection time instead of being stored between scrapes.
type coreMetricsCollector struct {
// cfg supplies the list of CoreS connections to query.
cfg *config.CGRConfig
// cm performs the CoreSv1.Status calls.
cm *engine.ConnManager
// Pre-defined descriptors, keyed by Prometheus metric name.
descs map[string]*prometheus.Desc
}
// newCoreMetricsCollector returns a collector whose descriptor table is fully
// populated up front. Every descriptor carries a "node_id" variable label;
// go_info additionally carries "version".
func newCoreMetricsCollector(cfg *config.CGRConfig, cm *engine.ConnManager) *coreMetricsCollector {
	descs := make(map[string]*prometheus.Desc)

	// describe stores a descriptor under its own metric name.
	describe := func(name, help string, labels ...string) {
		descs[name] = prometheus.NewDesc(name, help, labels, nil)
	}

	// Metrics labelled only by node_id: gauges first, then counters.
	nodeScoped := []struct{ name, help string }{
		// Gauges
		{promGoGoroutines, "Number of goroutines that currently exist."},
		{promGoThreads, "Number of OS threads created."},
		{promProcessOpenFds, "Number of open file descriptors."},
		{promProcessMaxFds, "Maximum number of open file descriptors."},
		{promProcessResidentMemoryBytes, "Resident memory size in bytes."},
		{promProcessVirtualMemoryBytes, "Virtual memory size in bytes."},
		{promProcessVirtualMemoryMaxBytes, "Maximum amount of virtual memory available in bytes."},
		{promProcessStartTimeSeconds, "Start time of the process since unix epoch in seconds."},
		{promGoMemstatsAllocBytes, "Number of bytes allocated in heap and currently in use. Equals to /memory/classes/heap/objects:bytes."},
		{promGoMemstatsHeapAllocBytes, "Number of heap bytes allocated and currently in use, same as go_memstats_alloc_bytes. Equals to /memory/classes/heap/objects:bytes."},
		{promGoMemstatsHeapIdleBytes, "Number of heap bytes waiting to be used. Equals to /memory/classes/heap/released:bytes + /memory/classes/heap/free:bytes."},
		{promGoMemstatsHeapInuseBytes, "Number of heap bytes that are in use. Equals to /memory/classes/heap/objects:bytes + /memory/classes/heap/unused:bytes"},
		{promGoMemstatsHeapObjects, "Number of currently allocated objects. Equals to /gc/heap/objects:objects."},
		{promGoMemstatsHeapReleasedBytes, "Number of heap bytes released to OS. Equals to /memory/classes/heap/released:bytes."},
		{promGoMemstatsHeapSysBytes, "Number of heap bytes obtained from system. Equals to /memory/classes/heap/objects:bytes + /memory/classes/heap/unused:bytes + /memory/classes/heap/released:bytes + /memory/classes/heap/free:bytes."},
		{promGoMemstatsBuckHashSysBytes, "Number of bytes used by the profiling bucket hash table. Equals to /memory/classes/profiling/buckets:bytes."},
		{promGoMemstatsGCSysBytes, "Number of bytes used for garbage collection system metadata. Equals to /memory/classes/metadata/other:bytes."},
		{promGoMemstatsMCacheInuseBytes, "Number of bytes in use by mcache structures. Equals to /memory/classes/metadata/mcache/inuse:bytes."},
		{promGoMemstatsMCacheSysBytes, "Number of bytes used for mcache structures obtained from system. Equals to /memory/classes/metadata/mcache/inuse:bytes + /memory/classes/metadata/mcache/free:bytes."},
		{promGoMemstatsMSpanInuseBytes, "Number of bytes in use by mspan structures. Equals to /memory/classes/metadata/mspan/inuse:bytes."},
		{promGoMemstatsMSpanSysBytes, "Number of bytes used for mspan structures obtained from system. Equals to /memory/classes/metadata/mspan/inuse:bytes + /memory/classes/metadata/mspan/free:bytes."},
		{promGoMemstatsNextGCBytes, "Number of heap bytes when next garbage collection will take place. Equals to /gc/heap/goal:bytes."},
		{promGoMemstatsOtherSysBytes, "Number of bytes used for other system allocations. Equals to /memory/classes/other:bytes."},
		{promGoMemstatsStackInuseBytes, "Number of bytes obtained from system for stack allocator in non-CGO environments. Equals to /memory/classes/heap/stacks:bytes."},
		{promGoMemstatsStackSysBytes, "Number of bytes obtained from system for stack allocator. Equals to /memory/classes/heap/stacks:bytes + /memory/classes/os-stacks:bytes."},
		{promGoMemstatsSysBytes, "Number of bytes obtained from system. Equals to /memory/classes/total:byte."},
		{promGoMemstatsLastGCTimeSeconds, "Number of seconds since 1970 of last garbage collection."},
		{promGoGCGogcPercent, "Heap size target percentage configured by the user, otherwise 100. This value is set by the GOGC environment variable, and the runtime/debug.SetGCPercent function. Sourced from /gc/gogc:percent."},
		{promGoGCGomemlimitBytes, "Go runtime memory limit configured by the user, otherwise math.MaxInt64. This value is set by the GOMEMLIMIT environment variable, and the runtime/debug.SetMemoryLimit function. Sourced from /gc/gomemlimit:bytes."},
		{promGoSchedGomaxprocsThreads, "The current runtime.GOMAXPROCS setting, or the number of operating system threads that can execute user-level Go code simultaneously. Sourced from /sched/gomaxprocs:threads."},
		// Counters
		{promProcessCPUSecondsTotal, "Total user and system CPU time spent in seconds."},
		{promProcessNetworkReceiveByteTotal, "Number of bytes received by the process over the network."},
		{promProcessNetworkTransmitByteTotal, "Number of bytes sent by the process over the network."},
		{promGoMemstatsAllocBytesTotal, "Total number of bytes allocated in heap until now, even if released already. Equals to /gc/heap/allocs:bytes."},
		{promGoMemstatsMallocsTotal, "Total number of heap objects allocated, both live and gc-ed. Semantically a counter version for go_memstats_heap_objects gauge. Equals to /gc/heap/allocs:objects + /gc/heap/tiny/allocs:objects."},
		{promGoMemstatsFreesTotal, "Total number of heap objects frees. Equals to /gc/heap/frees:objects + /gc/heap/tiny/allocs:objects."},
	}
	for _, m := range nodeScoped {
		describe(m.name, m.help, "node_id")
	}

	describe(promGoInfo,
		"Information about the Go environment.",
		"node_id", "version")
	describe(promGoGCDurationSeconds,
		"A summary of the wall-time pause (stop-the-world) duration in garbage collection cycles.",
		"node_id")

	return &coreMetricsCollector{
		cfg:   cfg,
		cm:    cm,
		descs: descs,
	}
}
// Describe implements prometheus.Collector by sending every pre-built
// descriptor on ch. The send order is unspecified because the descriptors are
// kept in a map.
func (c *coreMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
	for name := range c.descs {
		ch <- c.descs[name]
	}
}
// Collect implements prometheus.Collector.
//
// For every configured CoreS connection it calls CoreSv1.Status (with Debug
// enabled) and converts the reply into constant metrics labelled with the
// replying node's ID:
//   - go_info, when the reply carries a Go version string;
//   - runtime gauges read from the top level of the reply;
//   - process and memstats metrics read from their respective sub-maps,
//     typed as counter or gauge according to counterMetricsSet;
//   - the GC pause duration summary, when GC duration stats are present.
//
// Only values that arrive as float64 are exported; anything else is skipped.
// A connection whose call fails, or whose reply has no string node_id, is
// logged and skipped so one bad node cannot break the whole scrape.
func (c *coreMetricsCollector) Collect(ch chan<- prometheus.Metric) {
	// emitMapped sends one constant metric for each mapping entry whose key
	// holds a float64 in src, choosing the value type via counterMetricsSet.
	emitMapped := func(src map[string]any, mapping map[string]string, nodeID string) {
		for metricName, key := range mapping {
			val, ok := src[key].(float64)
			if !ok {
				continue
			}
			valueType := prometheus.GaugeValue
			if _, isCounter := counterMetricsSet[metricName]; isCounter {
				valueType = prometheus.CounterValue
			}
			ch <- prometheus.MustNewConstMetric(c.descs[metricName], valueType, val, nodeID)
		}
	}

	for _, connID := range c.cfg.PrometheusAgentCfg().CoreSConns {
		var reply map[string]any
		if err := c.cm.Call(context.Background(), []string{connID},
			utils.CoreSv1Status,
			&cores.V1StatusParams{
				Debug: true,
			},
			&reply); err != nil {
			utils.Logger.Err(fmt.Sprintf(
				"<%s> failed to retrieve metrics (connID=%q): %v",
				utils.PrometheusAgent, connID, err))
			continue
		}

		nodeID, ok := reply[utils.NodeID].(string)
		if !ok {
			// Collect may be invoked by the registry outside the HTTP
			// handler's goroutine, where a panic could not be recovered by
			// the server; treat a malformed reply like a failed call.
			utils.Logger.Err(fmt.Sprintf(
				"<%s> missing node_id in CoreSv1.Status reply (connID=%q)",
				utils.PrometheusAgent, connID))
			continue
		}

		if version, ok := reply[utils.GoVersion].(string); ok {
			ch <- prometheus.MustNewConstMetric(c.descs[promGoInfo], prometheus.GaugeValue, 1, nodeID, version)
		}

		// None of the runtime metric names are in counterMetricsSet, so these
		// are all emitted as gauges.
		emitMapped(reply, runtimeMetricsMapping, nodeID)

		if procStats, ok := reply[utils.FieldProcStats].(map[string]any); ok {
			emitMapped(procStats, processMetricsMapping, nodeID)
		}
		if memStats, ok := reply[utils.FieldMemStats].(map[string]any); ok {
			emitMapped(memStats, memStatsMetricsMapping, nodeID)
		}

		gcStats, ok := reply[utils.FieldGCDurationStats].(map[string]any)
		if !ok {
			continue
		}

		// Missing or non-float64 count/sum fall back to zero.
		var count uint64
		if gcCount, ok := gcStats[utils.MetricGCCount].(float64); ok {
			count = uint64(gcCount)
		}
		var sum float64
		if gcSum, ok := gcStats[utils.MetricGCSum].(float64); ok {
			sum = gcSum
		}

		quantileValues := make(map[float64]float64)
		// Handle different types that may be returned based on connection type:
		// - []any: from serialized RPC connections where type information is lost
		// - []cores.Quantile: from direct (*internal) calls where type is preserved
		switch quantiles := gcStats[utils.MetricGCQuantiles].(type) {
		case []any:
			for _, q := range quantiles {
				qMap, ok := q.(map[string]any)
				if !ok {
					continue
				}
				quantile, ok := qMap[utils.MetricGCQuantile].(float64)
				if !ok {
					continue
				}
				if val, ok := qMap[utils.MetricGCValue].(float64); ok {
					quantileValues[quantile] = val
				}
			}
		case []cores.Quantile:
			for _, q := range quantiles {
				quantileValues[q.Quantile] = q.Value
			}
		}

		ch <- prometheus.MustNewConstSummary(
			c.descs[promGoGCDurationSeconds],
			count,
			sum,
			quantileValues,
			nodeID,
		)
	}
}