prevent memprof file overwrites for small intervals

Previously, the timestamp was accurate only down to seconds. For smaller intervals,
this would truncate the previous file(s). Now, for small intervals, the number of
microseconds is appended to the file name.

Added the possibility to disable timestamps in the memory profile file names and use
increments of 1 instead.

Updated the memory profiling integration tests.
This commit is contained in:
ionutboangiu
2024-07-24 19:00:28 +03:00
committed by Dan Christian Bogos
parent 218ad92635
commit 63129feb4c
6 changed files with 152 additions and 70 deletions

View File

@@ -22,10 +22,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"fmt"
"errors"
"io/fs"
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
@@ -46,6 +50,7 @@ var (
coreV1Rpc *birpc.Client
coreV1ConfDIR string //run tests for specific configuration
argPath string
memProfNr int
sTestCoreSv1 = []func(t *testing.T){
testCoreSv1LoadCofig,
testCoreSv1InitDataDB,
@@ -66,7 +71,6 @@ var (
testCoreSv1Sleep,
testCoreSv1StopMemoryProfiling,
testCoreSv1KillEngine,
testCoreSv1CheckFinalMemProfiling,
// test CPU and Memory just by APIs
testCoreSv1StartEngine,
@@ -84,12 +88,11 @@ var (
testCoreSv1Sleep,
testCoreSv1StopMemoryProfiling,
testCoreSv1KillEngine,
testCoreSv1CheckFinalMemProfiling,
}
)
func TestITCoreSv1(t *testing.T) {
argPath = "/tmp"
argPath = t.TempDir()
switch *utils.DBType {
case utils.MetaInternal:
coreV1ConfDIR = "tutinternal"
@@ -132,11 +135,9 @@ func testCoreSv1StartEngineByExecWithCPUProfiling(t *testing.T) {
}
fib := utils.FibDuration(time.Millisecond, 0)
var connected bool
for i := 0; i < 200; i++ {
for i := 0; i < 16; i++ {
time.Sleep(fib())
if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err != nil {
t.Log(err)
} else {
if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err == nil {
connected = true
break
}
@@ -158,7 +159,7 @@ func testCoreSv1StartCPUProfilingErrorAlreadyStarted(t *testing.T) {
dirPath := &utils.DirectoryArgs{
DirPath: argPath,
}
expectedErr := "CPU profiling already started"
expectedErr := "start CPU profiling: already started"
if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartCPUProfiling,
dirPath, &reply); err == nil || err.Error() != expectedErr {
t.Errorf("Expected %+v, received %+v", expectedErr, err)
@@ -173,7 +174,7 @@ func testCoreSv1StartEngine(t *testing.T) {
func testCoreSv1StopMemProfilingBeforeStart(t *testing.T) {
var reply string
expectedErr := " Memory Profiling is not started"
expectedErr := "stop memory profiling: not started yet"
if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StopMemoryProfiling,
new(utils.TenantWithAPIOpts), &reply); err == nil || err.Error() != expectedErr {
t.Errorf("Expected %+q, received %+q", expectedErr, err)
@@ -182,17 +183,15 @@ func testCoreSv1StopMemProfilingBeforeStart(t *testing.T) {
func testCoreSv1StartEngineByExecWIthMemProfiling(t *testing.T) {
engine := exec.Command("cgr-engine", "-config_path", coreV1CfgPath,
"-memprof_dir", argPath, "-memprof_interval", "100ms", "-memprof_maxfiles", "2")
"-memprof_dir", argPath, "-memprof_interval", "100ms", "-memprof_maxfiles", "2", "-memprof_timestamp")
if err := engine.Start(); err != nil {
t.Error(err)
}
fib := utils.FibDuration(time.Millisecond, 0)
var connected bool
for i := 0; i < 200; i++ {
for i := 0; i < 16; i++ {
time.Sleep(fib())
if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err != nil {
t.Log(err)
} else {
if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err == nil {
connected = true
break
}
@@ -200,6 +199,7 @@ func testCoreSv1StartEngineByExecWIthMemProfiling(t *testing.T) {
if !connected {
t.Errorf("engine did not open port <%s>", coreV1Cfg.ListenCfg().RPCJSONListen)
}
memProfNr = 3
}
func testCoreSv1StopMemoryProfiling(t *testing.T) {
@@ -211,58 +211,17 @@ func testCoreSv1StopMemoryProfiling(t *testing.T) {
t.Errorf("Unexpected reply returned")
}
time.Sleep(10 * time.Millisecond)
//mem_prof1, mem_prof2
for i := 1; i <= 2; i++ {
file, err := os.Open(path.Join(argPath, fmt.Sprintf("mem%v.prof", i)))
if err != nil {
t.Error(err)
}
defer file.Close()
//compare the size
size, err := file.Stat()
if err != nil {
t.Error(err)
} else if size.Size() < int64(300) {
t.Errorf("Size of MemoryProfile %v is lower that expected", size.Size())
}
//after we checked that CPUProfile was made successfully, can delete it
if err := os.Remove(path.Join(argPath, fmt.Sprintf("mem%v.prof", i))); err != nil {
t.Error(err)
}
}
}
func testCoreSv1CheckFinalMemProfiling(t *testing.T) {
// as the engine was killed, mem_final.prof was created and we must check it
file, err := os.Open(path.Join(argPath, fmt.Sprintf(utils.MemProfFinalFile)))
if err != nil {
t.Error(err)
}
defer file.Close()
//compare the size
size, err := file.Stat()
if err != nil {
t.Error(err)
} else if size.Size() < int64(300) {
t.Errorf("Size of MemoryProfile %v is lower that expected", size.Size())
}
//after we checked that CPUProfile was made successfully, can delete it
if err := os.Remove(path.Join(argPath, fmt.Sprintf(utils.MemProfFinalFile))); err != nil {
t.Error(err)
}
}
func testCoreSv1StartMemProfilingErrorAlreadyStarted(t *testing.T) {
var reply string
args := &cores.MemoryProfilingParams{
DirPath: argPath,
Interval: 100 * time.Millisecond,
MaxFiles: 2,
DirPath: argPath,
Interval: 100 * time.Millisecond,
MaxFiles: 2,
UseTimestamp: true,
}
expErr := "Memory Profiling already started"
expErr := "start memory profiling: already started"
if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartMemoryProfiling,
args, &reply); err == nil || err.Error() != expErr {
t.Errorf("Expected %+v, received %+v", expErr, err)
@@ -271,7 +230,7 @@ func testCoreSv1StartMemProfilingErrorAlreadyStarted(t *testing.T) {
func testCoreSv1StopCPUProfilingBeforeStart(t *testing.T) {
var reply string
expectedErr := " cannot stop because CPUProfiling is not active"
expectedErr := "stop CPU profiling: not started yet"
if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StopCPUProfiling,
new(utils.TenantWithAPIOpts), &reply); err == nil || err.Error() != expectedErr {
t.Errorf("Expected %+q, received %+q", expectedErr, err)
@@ -335,9 +294,10 @@ func testCoreSv1StopCPUProfiling(t *testing.T) {
func testCoreSv1StartMemoryProfiling(t *testing.T) {
var reply string
args := cores.MemoryProfilingParams{
DirPath: argPath,
Interval: 100 * time.Millisecond,
MaxFiles: 2,
DirPath: argPath,
Interval: 100 * time.Millisecond,
MaxFiles: 2,
UseTimestamp: true,
}
if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartMemoryProfiling,
args, &reply); err != nil {
@@ -345,10 +305,91 @@ func testCoreSv1StartMemoryProfiling(t *testing.T) {
} else if reply != utils.OK {
t.Errorf("Unexpected reply returned")
}
memProfNr = 5
}
func testCoreSv1KillEngine(t *testing.T) {
if err := engine.KillEngine(*utils.WaitRater); err != nil {
t.Error(err)
}
time.Sleep(time.Second)
checkMemProfiles(t, argPath, memProfNr)
}
func checkMemProfiles(t *testing.T, memDirPath string, wantCount int) {
t.Helper()
hasFinal := false
memFileCount := 0
_ = filepath.WalkDir(memDirPath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
t.Logf("failed to access path %s: %v", path, err)
return nil // skip paths that cause an error
}
defer func() {
}()
switch {
case d.IsDir():
// Memory profiles should be directly under 'memDirPath', skip all directories (excluding 'memDirPath')
// and their contents.
if path == memDirPath {
return nil
}
return filepath.SkipDir
case !strings.HasPrefix(d.Name(), "mem_") || !strings.HasSuffix(d.Name(), ".prof"):
return nil // skip files that don't have 'mem_*.prof' format
case d.Name() == utils.MemProfFinalFile:
hasFinal = true
fallthrough // test should be the same as for a normal mem file
default: // files with format 'mem_*.prof'
fi, err := d.Info()
if err != nil {
t.Errorf("failed to retrieve FileInfo from %q: %v", path, err)
}
if fi.Size() == 0 {
t.Errorf("memory profile file %q is empty", path)
}
if d.Name() != utils.MemProfFinalFile {
// Check that date within file name is from within this minute.
layout := "20060102150405"
timestamp := strings.TrimPrefix(d.Name(), "mem_")
timestamp = strings.TrimSuffix(timestamp, ".prof")
date, extra, has := strings.Cut(timestamp, "_")
if !has {
t.Errorf("expected timestamp to have '<date>_<milliseconds>' format, got: %s", timestamp)
}
parsedTime, err := time.ParseInLocation(layout, date, time.Local)
if err != nil {
t.Errorf("time.Parse(%q,%q) returned unexpected err: %v", layout, date, err)
}
// Convert 'extra' to microseconds and add to the parsed time.
microSCount, err := strconv.Atoi(extra)
if err != nil {
t.Errorf("strconv.Atoi(%q) returned unexpected err: %v", extra, err)
}
parsedTime.Add(time.Duration(microSCount) * time.Microsecond)
now := time.Now()
oneMinuteEarlier := now.Add(-time.Minute)
if parsedTime.Before(oneMinuteEarlier) || parsedTime.After(now) {
t.Errorf("file name (%s) timestamp not from within last minute", d.Name())
}
}
memFileCount++
}
return nil
})
if wantCount != 0 && !hasFinal {
t.Error("final mem file is missing")
}
if memFileCount != wantCount {
t.Errorf("memory file count = %d, want %d (including final mem profile)", memFileCount, wantCount)
}
if err := os.Remove(filepath.Join(memDirPath, utils.MemProfFinalFile)); err != nil &&
!errors.Is(err, fs.ErrNotExist) {
t.Error(err)
}
memProfNr = 0
}

View File

@@ -59,6 +59,7 @@ var (
memProfDir = cgrEngineFlags.String(utils.MemProfDirCgr, utils.EmptyString, "Directory for memory profiles")
memProfInterval = cgrEngineFlags.Duration(utils.MemProfIntervalCgr, 15*time.Second, "Interval between memory profile saves")
memProfMaxFiles = cgrEngineFlags.Int(utils.MemProfMaxFilesCgr, 1, "Number of memory profiles to keep (most recent)")
memProfTimestamp = cgrEngineFlags.Bool(utils.MemProfTimestampCgr, false, "Add timestamp to memory profile files")
scheduledShutdown = cgrEngineFlags.Duration(utils.ScheduledShutdownCgr, 0, "Shutdown the engine after the specified duration")
singleCPU = cgrEngineFlags.Bool(utils.SingleCpuCgr, false, "Run on a single CPU core")
syslogger = cgrEngineFlags.String(utils.LoggerCfg, utils.EmptyString, "Logger type <*syslog|*stdout>")
@@ -694,9 +695,10 @@ func main() {
if *memProfDir != utils.EmptyString {
if err := cS.StartMemoryProfiling(cores.MemoryProfilingParams{
DirPath: *memProfDir,
Interval: *memProfInterval,
MaxFiles: *memProfMaxFiles,
DirPath: *memProfDir,
Interval: *memProfInterval,
MaxFiles: *memProfMaxFiles,
UseTimestamp: *memProfTimestamp,
}); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err))
return

View File

@@ -98,6 +98,13 @@ func TestCgrEngineFlags(t *testing.T) {
defaultVal: 1,
want: 3,
},
{
name: "memProfTimestamp",
flags: []string{"-memprof_timestamp"},
flagVar: memProfTimestamp,
defaultVal: false,
want: true,
},
{
name: "scheduledShutdown",
flags: []string{"-scheduled_shutdown", "1h"},

View File

@@ -136,7 +136,14 @@ type MemoryProfilingParams struct {
DirPath string // directory path where memory profiles will be saved
Interval time.Duration // duration between consecutive memory profile captures
MaxFiles int // maximum number of profile files to retain
APIOpts map[string]any
// UseTimestamp determines if the filename includes a timestamp.
// The format is 'mem_20060102150405[_<microseconds>].prof'.
// Microseconds are included if the interval is less than one second to avoid duplicate names.
// If false, filenames follow an incremental format: 'mem_<n>.prof'.
UseTimestamp bool
APIOpts map[string]any
}
// StartMemoryProfiling starts memory profiling in the specified directory.
@@ -171,16 +178,38 @@ func (cS *CoreService) StartMemoryProfiling(params MemoryProfilingParams) error
return nil
}
// newMemProfNameFunc returns a closure that generates memory profile filenames.
func newMemProfNameFunc(interval time.Duration, useTimestamp bool) func() string {
if !useTimestamp {
i := 0
return func() string {
i++
return fmt.Sprintf("mem_%d.prof", i)
}
}
if interval < time.Second {
return func() string {
now := time.Now()
return fmt.Sprintf("mem_%s_%d.prof", now.Format("20060102150405"), now.Nanosecond()/1e3)
}
}
return func() string {
return fmt.Sprintf("mem_%s.prof", time.Now().Format("20060102150405"))
}
}
// profileMemory runs the memory profiling loop, writing profiles to files at the specified interval.
func (cS *CoreService) profileMemory(params MemoryProfilingParams) {
defer cS.shdWg.Done()
fileName := newMemProfNameFunc(params.Interval, params.UseTimestamp)
ticker := time.NewTicker(params.Interval)
defer ticker.Stop()
files := make([]string, 0, params.MaxFiles)
for {
select {
case <-ticker.C:
path := filepath.Join(params.DirPath, fmt.Sprintf("mem_%s.prof", time.Now().Format("20060102150405"))) // mem20060102150405.prof
path := filepath.Join(params.DirPath, fileName())
if err := writeHeapProfile(path); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err))
cS.StopMemoryProfiling()

View File

@@ -30,6 +30,8 @@ Able to read the configuration from either a local directory of *.json* files w
Interval between memory profile saves (default 15s)
-memprof_maxfiles int
Number of memory profiles to keep (most recent) (default 1)
-memprof_timestamp
Add timestamp to memory profile files
-node_id string
Node ID of the engine
-pid string

View File

@@ -2886,6 +2886,7 @@ const (
MemProfDirCgr = "memprof_dir"
MemProfIntervalCgr = "memprof_interval"
MemProfMaxFilesCgr = "memprof_maxfiles"
MemProfTimestampCgr = "memprof_timestamp"
ScheduledShutdownCgr = "scheduled_shutdown"
SingleCpuCgr = "singlecpu"
PreloadCgr = "preload"