From 63129feb4cd0a81ace4d330c6bc7d988ccb5fb94 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 24 Jul 2024 19:00:28 +0300 Subject: [PATCH] prevent memprof file overwrites for small intervals Previously, the timestamp was accurate only down to seconds. For smaller intervals, this would truncate the previous file(s). Now, for small intervals, the number of microseconds is appended to the file name. Added the possibility to disable timestamps in the memory profile file names and use increments of 1 instead. Updated the memory profiling integration tests. --- apier/v1/core_it_test.go | 171 +++++++++++++++--------- cmd/cgr-engine/cgr-engine.go | 8 +- cmd/cgr-engine/cgr-engine_flags_test.go | 7 + cores/core.go | 33 ++++- docs/cgr-engine.rst | 2 + utils/consts.go | 1 + 6 files changed, 152 insertions(+), 70 deletions(-) diff --git a/apier/v1/core_it_test.go b/apier/v1/core_it_test.go index a47b0a906..1119da975 100644 --- a/apier/v1/core_it_test.go +++ b/apier/v1/core_it_test.go @@ -22,10 +22,14 @@ along with this program. 
If not, see package v1 import ( - "fmt" + "errors" + "io/fs" "os" "os/exec" "path" + "path/filepath" + "strconv" + "strings" "testing" "time" @@ -46,6 +50,7 @@ var ( coreV1Rpc *birpc.Client coreV1ConfDIR string //run tests for specific configuration argPath string + memProfNr int sTestCoreSv1 = []func(t *testing.T){ testCoreSv1LoadCofig, testCoreSv1InitDataDB, @@ -66,7 +71,6 @@ var ( testCoreSv1Sleep, testCoreSv1StopMemoryProfiling, testCoreSv1KillEngine, - testCoreSv1CheckFinalMemProfiling, // test CPU and Memory just by APIs testCoreSv1StartEngine, @@ -84,12 +88,11 @@ var ( testCoreSv1Sleep, testCoreSv1StopMemoryProfiling, testCoreSv1KillEngine, - testCoreSv1CheckFinalMemProfiling, } ) func TestITCoreSv1(t *testing.T) { - argPath = "/tmp" + argPath = t.TempDir() switch *utils.DBType { case utils.MetaInternal: coreV1ConfDIR = "tutinternal" @@ -132,11 +135,9 @@ func testCoreSv1StartEngineByExecWithCPUProfiling(t *testing.T) { } fib := utils.FibDuration(time.Millisecond, 0) var connected bool - for i := 0; i < 200; i++ { + for i := 0; i < 16; i++ { time.Sleep(fib()) - if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err != nil { - t.Log(err) - } else { + if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err == nil { connected = true break } @@ -158,7 +159,7 @@ func testCoreSv1StartCPUProfilingErrorAlreadyStarted(t *testing.T) { dirPath := &utils.DirectoryArgs{ DirPath: argPath, } - expectedErr := "CPU profiling already started" + expectedErr := "start CPU profiling: already started" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartCPUProfiling, dirPath, &reply); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+v, received %+v", expectedErr, err) @@ -173,7 +174,7 @@ func testCoreSv1StartEngine(t *testing.T) { func testCoreSv1StopMemProfilingBeforeStart(t *testing.T) { var reply string - expectedErr := " Memory Profiling is not started" + expectedErr := "stop memory profiling: not 
started yet" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StopMemoryProfiling, new(utils.TenantWithAPIOpts), &reply); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+q, received %+q", expectedErr, err) @@ -182,17 +183,15 @@ func testCoreSv1StopMemProfilingBeforeStart(t *testing.T) { func testCoreSv1StartEngineByExecWIthMemProfiling(t *testing.T) { engine := exec.Command("cgr-engine", "-config_path", coreV1CfgPath, - "-memprof_dir", argPath, "-memprof_interval", "100ms", "-memprof_maxfiles", "2") + "-memprof_dir", argPath, "-memprof_interval", "100ms", "-memprof_maxfiles", "2", "-memprof_timestamp") if err := engine.Start(); err != nil { t.Error(err) } fib := utils.FibDuration(time.Millisecond, 0) var connected bool - for i := 0; i < 200; i++ { + for i := 0; i < 16; i++ { time.Sleep(fib()) - if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err != nil { - t.Log(err) - } else { + if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err == nil { connected = true break } @@ -200,6 +199,7 @@ func testCoreSv1StartEngineByExecWIthMemProfiling(t *testing.T) { if !connected { t.Errorf("engine did not open port <%s>", coreV1Cfg.ListenCfg().RPCJSONListen) } + memProfNr = 3 } func testCoreSv1StopMemoryProfiling(t *testing.T) { @@ -211,58 +211,17 @@ func testCoreSv1StopMemoryProfiling(t *testing.T) { t.Errorf("Unexpected reply returned") } time.Sleep(10 * time.Millisecond) - - //mem_prof1, mem_prof2 - for i := 1; i <= 2; i++ { - file, err := os.Open(path.Join(argPath, fmt.Sprintf("mem%v.prof", i))) - if err != nil { - t.Error(err) - } - defer file.Close() - - //compare the size - size, err := file.Stat() - if err != nil { - t.Error(err) - } else if size.Size() < int64(300) { - t.Errorf("Size of MemoryProfile %v is lower that expected", size.Size()) - } - //after we checked that CPUProfile was made successfully, can delete it - if err := os.Remove(path.Join(argPath, fmt.Sprintf("mem%v.prof", i))); err 
!= nil { - t.Error(err) - } - } -} - -func testCoreSv1CheckFinalMemProfiling(t *testing.T) { - // as the engine was killed, mem_final.prof was created and we must check it - file, err := os.Open(path.Join(argPath, fmt.Sprintf(utils.MemProfFinalFile))) - if err != nil { - t.Error(err) - } - defer file.Close() - - //compare the size - size, err := file.Stat() - if err != nil { - t.Error(err) - } else if size.Size() < int64(300) { - t.Errorf("Size of MemoryProfile %v is lower that expected", size.Size()) - } - //after we checked that CPUProfile was made successfully, can delete it - if err := os.Remove(path.Join(argPath, fmt.Sprintf(utils.MemProfFinalFile))); err != nil { - t.Error(err) - } } func testCoreSv1StartMemProfilingErrorAlreadyStarted(t *testing.T) { var reply string args := &cores.MemoryProfilingParams{ - DirPath: argPath, - Interval: 100 * time.Millisecond, - MaxFiles: 2, + DirPath: argPath, + Interval: 100 * time.Millisecond, + MaxFiles: 2, + UseTimestamp: true, } - expErr := "Memory Profiling already started" + expErr := "start memory profiling: already started" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartMemoryProfiling, args, &reply); err == nil || err.Error() != expErr { t.Errorf("Expected %+v, received %+v", expErr, err) @@ -271,7 +230,7 @@ func testCoreSv1StartMemProfilingErrorAlreadyStarted(t *testing.T) { func testCoreSv1StopCPUProfilingBeforeStart(t *testing.T) { var reply string - expectedErr := " cannot stop because CPUProfiling is not active" + expectedErr := "stop CPU profiling: not started yet" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StopCPUProfiling, new(utils.TenantWithAPIOpts), &reply); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+q, received %+q", expectedErr, err) @@ -335,9 +294,10 @@ func testCoreSv1StopCPUProfiling(t *testing.T) { func testCoreSv1StartMemoryProfiling(t *testing.T) { var reply string args := cores.MemoryProfilingParams{ - DirPath: argPath, - Interval: 100 * 
time.Millisecond, - MaxFiles: 2, + DirPath: argPath, + Interval: 100 * time.Millisecond, + MaxFiles: 2, + UseTimestamp: true, } if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartMemoryProfiling, args, &reply); err != nil { @@ -345,10 +305,91 @@ func testCoreSv1StartMemoryProfiling(t *testing.T) { } else if reply != utils.OK { t.Errorf("Unexpected reply returned") } + memProfNr = 5 } func testCoreSv1KillEngine(t *testing.T) { if err := engine.KillEngine(*utils.WaitRater); err != nil { t.Error(err) } + time.Sleep(time.Second) + checkMemProfiles(t, argPath, memProfNr) +} + +func checkMemProfiles(t *testing.T, memDirPath string, wantCount int) { + t.Helper() + hasFinal := false + memFileCount := 0 + _ = filepath.WalkDir(memDirPath, func(path string, d fs.DirEntry, err error) error { + if err != nil { + t.Logf("failed to access path %s: %v", path, err) + return nil // skip paths that cause an error + } + defer func() { + }() + switch { + case d.IsDir(): + // Memory profiles should be directly under 'memDirPath', skip all directories (excluding 'memDirPath') + // and their contents. + if path == memDirPath { + return nil + } + return filepath.SkipDir + case !strings.HasPrefix(d.Name(), "mem_") || !strings.HasSuffix(d.Name(), ".prof"): + return nil // skip files that don't have 'mem_*.prof' format + case d.Name() == utils.MemProfFinalFile: + hasFinal = true + fallthrough // test should be the same as for a normal mem file + default: // files with format 'mem_*.prof' + fi, err := d.Info() + if err != nil { + t.Errorf("failed to retrieve FileInfo from %q: %v", path, err) + } + if fi.Size() == 0 { + t.Errorf("memory profile file %q is empty", path) + } + if d.Name() != utils.MemProfFinalFile { + // Check that date within file name is from within this minute. 
+ layout := "20060102150405" + timestamp := strings.TrimPrefix(d.Name(), "mem_") + timestamp = strings.TrimSuffix(timestamp, ".prof") + date, extra, has := strings.Cut(timestamp, "_") + if !has { + t.Errorf("expected timestamp to have '_' format, got: %s", timestamp) + } + parsedTime, err := time.ParseInLocation(layout, date, time.Local) + if err != nil { + t.Errorf("time.Parse(%q,%q) returned unexpected err: %v", layout, date, err) + } + + // Convert 'extra' to microseconds and add to the parsed time. + microSCount, err := strconv.Atoi(extra) + if err != nil { + t.Errorf("strconv.Atoi(%q) returned unexpected err: %v", extra, err) + } + parsedTime.Add(time.Duration(microSCount) * time.Microsecond) + + now := time.Now() + oneMinuteEarlier := now.Add(-time.Minute) + if parsedTime.Before(oneMinuteEarlier) || parsedTime.After(now) { + t.Errorf("file name (%s) timestamp not from within last minute", d.Name()) + } + } + memFileCount++ + } + return nil + }) + + if wantCount != 0 && !hasFinal { + t.Error("final mem file is missing") + } + if memFileCount != wantCount { + t.Errorf("memory file count = %d, want %d (including final mem profile)", memFileCount, wantCount) + } + + if err := os.Remove(filepath.Join(memDirPath, utils.MemProfFinalFile)); err != nil && + !errors.Is(err, fs.ErrNotExist) { + t.Error(err) + } + memProfNr = 0 } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 5de47ab60..eba2b8609 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -59,6 +59,7 @@ var ( memProfDir = cgrEngineFlags.String(utils.MemProfDirCgr, utils.EmptyString, "Directory for memory profiles") memProfInterval = cgrEngineFlags.Duration(utils.MemProfIntervalCgr, 15*time.Second, "Interval between memory profile saves") memProfMaxFiles = cgrEngineFlags.Int(utils.MemProfMaxFilesCgr, 1, "Number of memory profiles to keep (most recent)") + memProfTimestamp = cgrEngineFlags.Bool(utils.MemProfTimestampCgr, false, "Add timestamp to memory 
profile files") scheduledShutdown = cgrEngineFlags.Duration(utils.ScheduledShutdownCgr, 0, "Shutdown the engine after the specified duration") singleCPU = cgrEngineFlags.Bool(utils.SingleCpuCgr, false, "Run on a single CPU core") syslogger = cgrEngineFlags.String(utils.LoggerCfg, utils.EmptyString, "Logger type <*syslog|*stdout>") @@ -694,9 +695,10 @@ func main() { if *memProfDir != utils.EmptyString { if err := cS.StartMemoryProfiling(cores.MemoryProfilingParams{ - DirPath: *memProfDir, - Interval: *memProfInterval, - MaxFiles: *memProfMaxFiles, + DirPath: *memProfDir, + Interval: *memProfInterval, + MaxFiles: *memProfMaxFiles, + UseTimestamp: *memProfTimestamp, }); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) return diff --git a/cmd/cgr-engine/cgr-engine_flags_test.go b/cmd/cgr-engine/cgr-engine_flags_test.go index 109dbec85..18ec0951b 100644 --- a/cmd/cgr-engine/cgr-engine_flags_test.go +++ b/cmd/cgr-engine/cgr-engine_flags_test.go @@ -98,6 +98,13 @@ func TestCgrEngineFlags(t *testing.T) { defaultVal: 1, want: 3, }, + { + name: "memProfTimestamp", + flags: []string{"-memprof_timestamp"}, + flagVar: memProfTimestamp, + defaultVal: false, + want: true, + }, { name: "scheduledShutdown", flags: []string{"-scheduled_shutdown", "1h"}, diff --git a/cores/core.go b/cores/core.go index ba2360e50..75d876476 100644 --- a/cores/core.go +++ b/cores/core.go @@ -136,7 +136,14 @@ type MemoryProfilingParams struct { DirPath string // directory path where memory profiles will be saved Interval time.Duration // duration between consecutive memory profile captures MaxFiles int // maximum number of profile files to retain - APIOpts map[string]any + + // UseTimestamp determines if the filename includes a timestamp. + // The format is 'mem_20060102150405[_].prof'. + // Microseconds are included if the interval is less than one second to avoid duplicate names. + // If false, filenames follow an incremental format: 'mem_.prof'. 
// newMemProfNameFunc returns a closure that generates memory profile filenames.
//
// The naming scheme is chosen once, from the arguments:
//   - useTimestamp == false: 'mem_<n>.prof', where n starts at 1 and grows by
//     one on every call of the returned closure (each closure has its own counter);
//   - useTimestamp == true and interval < 1s: 'mem_<date>_<microseconds>.prof',
//     the microsecond part keeping names distinct within the same second;
//   - useTimestamp == true otherwise: 'mem_<date>.prof'.
//
// <date> is the current local time in the "20060102150405" layout. The
// microsecond part is zero-padded to six digits so that every name has the
// same width and lexical order matches chronological order.
func newMemProfNameFunc(interval time.Duration, useTimestamp bool) func() string {
	const layout = "20060102150405"

	switch {
	case !useTimestamp:
		seq := 0
		return func() string {
			seq++
			return fmt.Sprintf("mem_%d.prof", seq)
		}
	case interval < time.Second:
		return func() string {
			now := time.Now()
			micros := now.Nanosecond() / int(time.Microsecond)
			return fmt.Sprintf("mem_%s_%06d.prof", now.Format(layout), micros)
		}
	default:
		return func() string {
			return fmt.Sprintf("mem_%s.prof", time.Now().Format(layout))
		}
	}
}
diff --git a/utils/consts.go b/utils/consts.go index eb3da55a6..8d701781d 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2886,6 +2886,7 @@ const ( MemProfDirCgr = "memprof_dir" MemProfIntervalCgr = "memprof_interval" MemProfMaxFilesCgr = "memprof_maxfiles" + MemProfTimestampCgr = "memprof_timestamp" ScheduledShutdownCgr = "scheduled_shutdown" SingleCpuCgr = "singlecpu" PreloadCgr = "preload"