improve CoreSv1.Status API with more detailed metrics

- add new metrics (off by default)
- add Debug boolean option for full metric details
- add Timezone option for 'running_since' field
- update default metrics shown and their constants
- remove utils.GetStartTime(), now use process metrics
This commit is contained in:
ionutboangiu
2024-09-06 23:11:06 +03:00
committed by Dan Christian Bogos
parent 98f770c61b
commit 62c30ab539
13 changed files with 416 additions and 124 deletions

View File

@@ -277,28 +277,42 @@ func (cS *CoreService) StopMemoryProfiling() error {
return nil
}
// V1Status returns the status of the engine
func (cS *CoreService) V1Status(_ *context.Context, _ *utils.TenantWithAPIOpts, reply *map[string]any) (err error) {
memstats := new(runtime.MemStats)
runtime.ReadMemStats(memstats)
response := make(map[string]any)
response[utils.NodeID] = cS.cfg.GeneralCfg().NodeID
response[utils.MemoryUsage] = utils.SizeFmt(float64(memstats.HeapAlloc), "")
response[utils.ActiveGoroutines] = runtime.NumGoroutine()
if response[utils.VersionName], err = utils.GetCGRVersion(); err != nil {
utils.Logger.Err(err.Error())
err = nil
// V1StatusParams carries the optional arguments of a CoreSv1.Status request.
// V1Status also accepts a nil *V1StatusParams, in which case Debug is treated
// as false and the configured default timezone is used.
type V1StatusParams struct {
// Debug is forwarded to StatusMetrics.ToMap: true requests the full nested
// metrics map, false the condensed one.
Debug bool
// Timezone is forwarded to StatusMetrics.ToMap and used when formatting the
// condensed 'running_since' field.
// NOTE(review): when params is non-nil this value replaces the configured
// default timezone even if it is the empty string - confirm that is intended.
Timezone string
// Tenant is not read by V1Status in the code visible here.
Tenant string
// APIOpts is not read by V1Status in the code visible here.
APIOpts map[string]any
}
// V1Status returns metrics related to the engine process.
func (cS *CoreService) V1Status(_ *context.Context, params *V1StatusParams, reply *map[string]any) error {
metrics, err := computeAppMetrics()
if err != nil {
return err
}
response[utils.RunningSince] = utils.GetStartTime()
response[utils.GoVersion] = runtime.Version()
metrics.NodeID = cS.cfg.GeneralCfg().NodeID
if cS.cfg.CoreSCfg().Caps != 0 {
response[utils.CAPSAllocated] = cS.caps.Allocated()
metrics.CapsStats = &CapsStats{
Allocated: cS.caps.Allocated(),
}
if cS.cfg.CoreSCfg().CapsStatsInterval != 0 {
response[utils.CAPSPeak] = cS.CapsStats.GetPeak()
peak := cS.CapsStats.GetPeak()
metrics.CapsStats.Peak = &peak
}
}
*reply = response
return
debug := false
timezone := cS.cfg.GeneralCfg().DefaultTimezone
if params != nil {
debug = params.Debug
timezone = params.Timezone
}
metricsMap, err := metrics.ToMap(debug, timezone)
if err != nil {
return fmt.Errorf("could not convert StatusMetrics to map[string]any: %v", err)
}
*reply = metricsMap
return nil
}
// Sleep is used to test the concurrent requests mechanism

View File

@@ -21,12 +21,10 @@ package cores
import (
"errors"
"reflect"
"runtime"
"sync"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -57,63 +55,6 @@ func TestNewCoreService(t *testing.T) {
rcv.Shutdown()
}
// TestCoreServiceStatus calls V1Status on a CoreService built from the default
// configuration and checks a fixed set of reply keys against expected values.
// The running-since and memory-usage entries change on every run, so they are
// overwritten with placeholders before being compared. A second call with a
// malformed utils.GitCommitDate must still succeed.
func TestCoreServiceStatus(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	cfg.CoreSCfg().CapsStatsInterval = 1
	capsLimiter := engine.NewCaps(1, utils.MetaBusy)
	stopCh := make(chan struct{}, 1)
	svc := NewCoreService(cfg, capsLimiter, nil, stopCh, nil, nil)
	args := &utils.TenantWithAPIOpts{
		Tenant:  "cgrates.org",
		APIOpts: map[string]any{},
	}

	var reply map[string]any
	version, err := utils.GetCGRVersion()
	if err != nil {
		t.Error(err)
	}
	expected := map[string]any{
		utils.GoVersion:        runtime.Version(),
		utils.RunningSince:     "TIME_CHANGED",
		utils.VersionName:      version,
		utils.ActiveGoroutines: runtime.NumGoroutine(),
		utils.MemoryUsage:      "CHANGED_MEMORY_USAGE",
		utils.NodeID:           cfg.GeneralCfg().NodeID,
	}

	if statusErr := svc.V1Status(context.Background(), args, &reply); statusErr == nil {
		// Replace the volatile values so they can be compared verbatim.
		reply[utils.RunningSince] = "TIME_CHANGED"
		reply[utils.MemoryUsage] = "CHANGED_MEMORY_USAGE"
	} else {
		t.Error(statusErr)
	}

	// utils.ActiveGoroutines is left out of the comparison on purpose, as in
	// the list of checks this loop replaces.
	checkedKeys := []string{
		utils.GoVersion,
		utils.RunningSince,
		utils.VersionName,
		utils.MemoryUsage,
		utils.NodeID,
	}
	for _, key := range checkedKeys {
		if !reflect.DeepEqual(expected[key], reply[key]) {
			t.Errorf("Expected %+v, received %+v", utils.ToJSON(expected[key]), utils.ToJSON(reply[key]))
		}
	}

	// A commit date that cannot be parsed must not make the call fail.
	utils.GitCommitDate = "wrong format"
	utils.GitCommitHash = "73014DAA0C1D7EDCB532D5FE600B8A20D588CDF8"
	if statusErr := svc.V1Status(context.Background(), args, &reply); statusErr != nil {
		t.Error(statusErr)
	}
	utils.GitCommitDate = ""
	utils.GitCommitHash = ""
}
func TestV1Panic(t *testing.T) {
coreService := &CoreService{}
expectedMessage := "test panic message"

330
cores/metrics.go Normal file
View File

@@ -0,0 +1,330 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cores
import (
"os"
"runtime"
"runtime/debug"
"strconv"
"time"
"github.com/cgrates/cgrates/utils"
"github.com/prometheus/procfs"
)
// StatusMetrics is a snapshot of engine process metrics. computeAppMetrics
// fills every field except NodeID and CapsStats, which it leaves at their zero
// values for the caller to set.
type StatusMetrics struct {
// PID is the process id returned by os.Getpid.
PID int `json:"pid"`
// GoVersion is the value of runtime.Version.
GoVersion string `json:"go_version"`
// NodeID is not set by computeAppMetrics; the caller supplies it.
NodeID string `json:"node_id"`
// Version is the string returned by utils.GetCGRVersion.
Version string `json:"version"`
// Goroutines is the value of runtime.NumGoroutine.
Goroutines int `json:"goroutines"`
// Threads is the record count reported by runtime.ThreadCreateProfile(nil).
Threads int `json:"threads"`
// MemStats holds values copied from runtime.MemStats, plus the last GC time.
MemStats GoMemStats `json:"mem_stats"`
// GCDurationStats summarises GC pauses as read by debug.ReadGCStats.
GCDurationStats GCDurationStats `json:"gc_duration_stats"`
// ProcStats holds the values read for this process through the procfs package.
ProcStats ProcStats `json:"proc_stats"`
// CapsStats is optional; the ToMap methods skip the caps entries when it is nil.
CapsStats *CapsStats `json:"caps_stats"`
}
// ToMap converts the metrics into a map[string]any.
//
// With debug set to false the call is delegated to ToMapCondensed, which is
// the only path that uses timezone and the only one that can return an error.
// With debug set to true every metric group is exported as a nested map under
// its JSON field name; "caps_stats" is present only when CapsStats is non-nil.
func (sm StatusMetrics) ToMap(debug bool, timezone string) (map[string]any, error) {
	if !debug {
		return sm.ToMapCondensed(timezone)
	}
	full := map[string]any{
		"pid":               sm.PID,
		"go_version":        sm.GoVersion,
		"node_id":           sm.NodeID,
		"version":           sm.Version,
		"goroutines":        sm.Goroutines,
		"threads":           sm.Threads,
		"mem_stats":         sm.MemStats.ToMap(),
		"gc_duration_stats": sm.GCDurationStats.ToMap(),
		"proc_stats":        sm.ProcStats.ToMap(),
	}
	if sm.CapsStats != nil {
		full["caps_stats"] = sm.CapsStats.ToMap()
	}
	return full, nil
}
// ToMapCondensed builds the flat, human-oriented view of the metrics, keyed by
// constants from the utils package.
//
// The process start time is truncated to a whole number, rendered as a decimal
// string and handed to utils.ParseTimeDetectLayout together with timezone; the
// result is formatted with the time.UnixDate layout. The CPU time is rendered
// as a decimal string, parsed by utils.ParseDurationWithSecs and exported via
// Duration.String. An error from either conversion is returned as is, with a
// nil map. The memory figures are formatted with utils.SizeFmt.
//
// The caps entries are added only when CapsStats is non-nil, and the peak only
// when its pointer is non-nil.
func (sm StatusMetrics) ToMapCondensed(timezone string) (map[string]any, error) {
	startedAt, err := utils.ParseTimeDetectLayout(strconv.Itoa(int(sm.ProcStats.StartTime)), timezone)
	if err != nil {
		return nil, err
	}
	cpuSecs := strconv.FormatFloat(sm.ProcStats.CPUTime, 'f', -1, 64)
	cpuTime, err := utils.ParseDurationWithSecs(cpuSecs)
	if err != nil {
		return nil, err
	}

	condensed := map[string]any{
		utils.PID:            sm.PID,
		utils.GoVersion:      sm.GoVersion,
		utils.NodeID:         sm.NodeID,
		utils.VersionLower:   sm.Version,
		utils.RunningSince:   startedAt.Format(time.UnixDate),
		utils.Goroutines:     sm.Goroutines,
		utils.OpenFiles:      sm.ProcStats.OpenFDs,
		utils.ResidentMemory: utils.SizeFmt(float64(sm.ProcStats.ResidentMemory), ""),
		utils.ActiveMemory:   utils.SizeFmt(float64(sm.MemStats.HeapAlloc), ""),
		utils.SystemMemory:   utils.SizeFmt(float64(sm.MemStats.Sys), ""),
		utils.OSThreadsInUse: sm.Threads,
		utils.CPUTime:        cpuTime.String(),
	}
	if caps := sm.CapsStats; caps != nil {
		condensed[utils.CAPSAllocated] = caps.Allocated
		if caps.Peak != nil {
			condensed[utils.CAPSPeak] = *caps.Peak
		}
	}
	return condensed, nil
}
// GoMemStats holds the memory statistics exported by the status API.
// computeAppMetrics copies every uint64 field from the runtime.MemStats field
// of the same name and derives LastGC from debug.GCStats.
type GoMemStats struct {
	Alloc        uint64  `json:"alloc"`
	TotalAlloc   uint64  `json:"total_alloc"`
	Sys          uint64  `json:"sys"`
	Mallocs      uint64  `json:"mallocs"`
	Frees        uint64  `json:"frees"`
	Lookups      uint64  `json:"lookups"`
	HeapAlloc    uint64  `json:"heap_alloc"`
	HeapSys      uint64  `json:"heap_sys"`
	HeapIdle     uint64  `json:"heap_idle"`
	HeapInuse    uint64  `json:"heap_inuse"`
	HeapReleased uint64  `json:"heap_released"`
	HeapObjects  uint64  `json:"heap_objects"`
	StackInuse   uint64  `json:"stack_inuse"`
	StackSys     uint64  `json:"stack_sys"`
	MSpanSys     uint64  `json:"mspan_sys"`
	MSpanInuse   uint64  `json:"mspan_inuse"`
	MCacheInuse  uint64  `json:"mcache_inuse"`
	MCacheSys    uint64  `json:"mcache_sys"`
	BuckHashSys  uint64  `json:"buckhash_sys"`
	GCSys        uint64  `json:"gc_sys"`
	OtherSys     uint64  `json:"other_sys"`
	NextGC       uint64  `json:"next_gc"`
	LastGC       float64 `json:"last_gc"` // last GC time as fractional seconds (UnixNano / 1e9)
}

// ToMap returns the statistics as a map keyed by the JSON names of the fields.
// Values keep their Go types: uint64 for every counter, float64 for "last_gc".
func (ms GoMemStats) ToMap() map[string]any {
	return map[string]any{
		"alloc":         ms.Alloc,
		"total_alloc":   ms.TotalAlloc,
		"sys":           ms.Sys,
		"mallocs":       ms.Mallocs,
		"frees":         ms.Frees,
		"lookups":       ms.Lookups,
		"heap_alloc":    ms.HeapAlloc,
		"heap_sys":      ms.HeapSys,
		"heap_idle":     ms.HeapIdle,
		"heap_inuse":    ms.HeapInuse,
		"heap_released": ms.HeapReleased,
		"heap_objects":  ms.HeapObjects,
		"stack_inuse":   ms.StackInuse,
		"stack_sys":     ms.StackSys,
		"mspan_sys":     ms.MSpanSys,
		"mspan_inuse":   ms.MSpanInuse,
		"mcache_inuse":  ms.MCacheInuse,
		"mcache_sys":    ms.MCacheSys,
		"buckhash_sys":  ms.BuckHashSys,
		"gc_sys":        ms.GCSys,
		"other_sys":     ms.OtherSys,
		"next_gc":       ms.NextGC,
		"last_gc":       ms.LastGC,
	}
}
// GCDurationStats summarises garbage-collection pause durations.
// computeAppMetrics fills Quantiles from debug.GCStats.PauseQuantiles, Sum from
// PauseTotal (in seconds) and Count from NumGC.
type GCDurationStats struct {
	Quantiles []Quantile `json:"quantiles"`
	Sum       float64    `json:"sum"`
	Count     uint64     `json:"count"`
}

// ToMap returns the summary as a map keyed by the JSON names of the fields.
// The "quantiles" entry is the Quantiles slice itself, not a copy.
func (s GCDurationStats) ToMap() map[string]any {
	return map[string]any{
		"quantiles": s.Quantiles,
		"sum":       s.Sum,
		"count":     s.Count,
	}
}

// Quantile pairs a quantile rank with the value observed at that rank.
type Quantile struct {
	Quantile float64 `json:"quantile"`
	Value    float64 `json:"value"`
}
// ProcStats holds the per-process values that computeAppMetrics reads through
// the procfs package: CPU time, memory sizes and start time from the process
// stat, the open descriptor count, the open-files and address-space limits,
// and the IpExt in/out octet counters from the process netstat.
type ProcStats struct {
	CPUTime              float64 `json:"cpu_time"`
	MaxFDs               uint64  `json:"max_fds"`
	OpenFDs              int     `json:"open_fds"`
	ResidentMemory       int     `json:"resident_memory"`
	StartTime            float64 `json:"start_time"`
	VirtualMemory        uint    `json:"virtual_memory"`
	MaxVirtualMemory     uint64  `json:"max_virtual_memory"`
	NetworkReceiveTotal  float64 `json:"network_receive_total"`
	NetworkTransmitTotal float64 `json:"network_transmit_total"`
}

// ToMap returns the statistics as a map keyed by the JSON names of the fields.
// Every value keeps the Go type of the field it comes from.
func (ps ProcStats) ToMap() map[string]any {
	return map[string]any{
		"cpu_time":               ps.CPUTime,
		"max_fds":                ps.MaxFDs,
		"open_fds":               ps.OpenFDs,
		"resident_memory":        ps.ResidentMemory,
		"start_time":             ps.StartTime,
		"virtual_memory":         ps.VirtualMemory,
		"max_virtual_memory":     ps.MaxVirtualMemory,
		"network_receive_total":  ps.NetworkReceiveTotal,
		"network_transmit_total": ps.NetworkTransmitTotal,
	}
}
// CapsStats holds the concurrency-cap figures attached to the status metrics.
type CapsStats struct {
	// Allocated is the number of caps currently allocated.
	Allocated int `json:"allocated"`
	// Peak is the recorded peak allocation; nil when no peak is available.
	Peak *int `json:"peak"`
}

// ToMap returns the stats as a map keyed by the JSON names of the fields.
//
// "peak" is always present. It holds the peak as a plain int, or an untyped
// nil when Peak is not set. The pointer itself is deliberately not stored:
// a nil *int inside an `any` compares unequal to nil, encoding/gob does not
// permit nil pointers, and a non-nil pointer would let later changes to the
// pointed-to int leak into a map that is meant to be a snapshot. The JSON
// rendering (a number or null) is the same either way.
func (cs *CapsStats) ToMap() map[string]any {
	m := map[string]any{
		"allocated": cs.Allocated,
		"peak":      nil,
	}
	if cs.Peak != nil {
		m["peak"] = *cs.Peak
	}
	return m
}
// computeAppMetrics collects a snapshot of metrics for the current process:
// the CGRateS version, Go runtime memory and GC statistics, the thread and
// goroutine counts, and the process values read through the procfs package.
//
// The first error encountered (version lookup or any procfs read) is returned
// together with a zero StatusMetrics. NodeID and CapsStats are left unset.
func computeAppMetrics() (StatusMetrics, error) {
	version, err := utils.GetCGRVersion()
	if err != nil {
		return StatusMetrics{}, err
	}

	// Go runtime memory statistics.
	var rt runtime.MemStats
	runtime.ReadMemStats(&rt)
	goMem := GoMemStats{
		Alloc:        rt.Alloc,
		TotalAlloc:   rt.TotalAlloc,
		Sys:          rt.Sys,
		Mallocs:      rt.Mallocs,
		Frees:        rt.Frees,
		Lookups:      rt.Lookups,
		HeapAlloc:    rt.HeapAlloc,
		HeapSys:      rt.HeapSys,
		HeapIdle:     rt.HeapIdle,
		HeapInuse:    rt.HeapInuse,
		HeapReleased: rt.HeapReleased,
		HeapObjects:  rt.HeapObjects,
		StackInuse:   rt.StackInuse,
		StackSys:     rt.StackSys,
		MSpanInuse:   rt.MSpanInuse,
		MSpanSys:     rt.MSpanSys,
		MCacheInuse:  rt.MCacheInuse,
		MCacheSys:    rt.MCacheSys,
		BuckHashSys:  rt.BuckHashSys,
		GCSys:        rt.GCSys,
		OtherSys:     rt.OtherSys,
		NextGC:       rt.NextGC,
	}

	// With a nil slice only the record count is of interest, so the second
	// result is not needed.
	threadCount, _ := runtime.ThreadCreateProfile(nil)

	// GC pause statistics. Five slots are requested in PauseQuantiles and
	// slot i is exported with quantile rank i/4.
	gc := debug.GCStats{PauseQuantiles: make([]time.Duration, 5)}
	debug.ReadGCStats(&gc)
	steps := float64(len(gc.PauseQuantiles) - 1)
	pauses := make([]Quantile, len(gc.PauseQuantiles))
	for i, pause := range gc.PauseQuantiles {
		pauses[i] = Quantile{
			Quantile: float64(i) / steps,
			Value:    pause.Seconds(),
		}
	}
	gcDurations := GCDurationStats{
		Quantiles: pauses,
		Count:     uint64(gc.NumGC),
		Sum:       gc.PauseTotal.Seconds(),
	}
	goMem.LastGC = float64(gc.LastGC.UnixNano()) / 1e9

	// Process metrics.
	pid := os.Getpid()
	proc, err := procfs.NewProc(pid)
	if err != nil {
		return StatusMetrics{}, err
	}
	stat, err := proc.Stat()
	if err != nil {
		return StatusMetrics{}, err
	}
	ps := ProcStats{
		CPUTime:        stat.CPUTime(),
		VirtualMemory:  stat.VirtualMemory(),
		ResidentMemory: stat.ResidentMemory(),
	}
	startedAt, err := stat.StartTime()
	if err != nil {
		return StatusMetrics{}, err
	}
	ps.StartTime = startedAt

	openFDs, err := proc.FileDescriptorsLen()
	if err != nil {
		return StatusMetrics{}, err
	}
	ps.OpenFDs = openFDs

	limits, err := proc.Limits()
	if err != nil {
		return StatusMetrics{}, err
	}
	ps.MaxFDs = limits.OpenFiles
	ps.MaxVirtualMemory = limits.AddressSpace

	netstat, err := proc.Netstat()
	if err != nil {
		return StatusMetrics{}, err
	}
	// The octet counters are pointers; a nil one leaves the total at zero.
	if in := netstat.IpExt.InOctets; in != nil {
		ps.NetworkReceiveTotal = *in
	}
	if out := netstat.IpExt.OutOctets; out != nil {
		ps.NetworkTransmitTotal = *out
	}

	return StatusMetrics{
		PID:             pid,
		GoVersion:       runtime.Version(),
		Version:         version,
		Goroutines:      runtime.NumGoroutine(),
		Threads:         threadCount,
		MemStats:        goMem,
		GCDurationStats: gcDurations,
		ProcStats:       ps,
	}, nil
}