diff --git a/apier/v1/core_it_test.go b/apier/v1/core_it_test.go index a47b0a906..1119da975 100644 --- a/apier/v1/core_it_test.go +++ b/apier/v1/core_it_test.go @@ -22,10 +22,14 @@ along with this program. If not, see package v1 import ( - "fmt" + "errors" + "io/fs" "os" "os/exec" "path" + "path/filepath" + "strconv" + "strings" "testing" "time" @@ -46,6 +50,7 @@ var ( coreV1Rpc *birpc.Client coreV1ConfDIR string //run tests for specific configuration argPath string + memProfNr int sTestCoreSv1 = []func(t *testing.T){ testCoreSv1LoadCofig, testCoreSv1InitDataDB, @@ -66,7 +71,6 @@ var ( testCoreSv1Sleep, testCoreSv1StopMemoryProfiling, testCoreSv1KillEngine, - testCoreSv1CheckFinalMemProfiling, // test CPU and Memory just by APIs testCoreSv1StartEngine, @@ -84,12 +88,11 @@ var ( testCoreSv1Sleep, testCoreSv1StopMemoryProfiling, testCoreSv1KillEngine, - testCoreSv1CheckFinalMemProfiling, } ) func TestITCoreSv1(t *testing.T) { - argPath = "/tmp" + argPath = t.TempDir() switch *utils.DBType { case utils.MetaInternal: coreV1ConfDIR = "tutinternal" @@ -132,11 +135,9 @@ func testCoreSv1StartEngineByExecWithCPUProfiling(t *testing.T) { } fib := utils.FibDuration(time.Millisecond, 0) var connected bool - for i := 0; i < 200; i++ { + for i := 0; i < 16; i++ { time.Sleep(fib()) - if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err != nil { - t.Log(err) - } else { + if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err == nil { connected = true break } @@ -158,7 +159,7 @@ func testCoreSv1StartCPUProfilingErrorAlreadyStarted(t *testing.T) { dirPath := &utils.DirectoryArgs{ DirPath: argPath, } - expectedErr := "CPU profiling already started" + expectedErr := "start CPU profiling: already started" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartCPUProfiling, dirPath, &reply); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+v, received %+v", expectedErr, err) @@ -173,7 +174,7 @@ func 
testCoreSv1StartEngine(t *testing.T) { func testCoreSv1StopMemProfilingBeforeStart(t *testing.T) { var reply string - expectedErr := " Memory Profiling is not started" + expectedErr := "stop memory profiling: not started yet" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StopMemoryProfiling, new(utils.TenantWithAPIOpts), &reply); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+q, received %+q", expectedErr, err) @@ -182,17 +183,15 @@ func testCoreSv1StopMemProfilingBeforeStart(t *testing.T) { func testCoreSv1StartEngineByExecWIthMemProfiling(t *testing.T) { engine := exec.Command("cgr-engine", "-config_path", coreV1CfgPath, - "-memprof_dir", argPath, "-memprof_interval", "100ms", "-memprof_maxfiles", "2") + "-memprof_dir", argPath, "-memprof_interval", "100ms", "-memprof_maxfiles", "2", "-memprof_timestamp") if err := engine.Start(); err != nil { t.Error(err) } fib := utils.FibDuration(time.Millisecond, 0) var connected bool - for i := 0; i < 200; i++ { + for i := 0; i < 16; i++ { time.Sleep(fib()) - if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err != nil { - t.Log(err) - } else { + if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err == nil { connected = true break } @@ -200,6 +199,7 @@ func testCoreSv1StartEngineByExecWIthMemProfiling(t *testing.T) { if !connected { t.Errorf("engine did not open port <%s>", coreV1Cfg.ListenCfg().RPCJSONListen) } + memProfNr = 3 } func testCoreSv1StopMemoryProfiling(t *testing.T) { @@ -211,58 +211,17 @@ func testCoreSv1StopMemoryProfiling(t *testing.T) { t.Errorf("Unexpected reply returned") } time.Sleep(10 * time.Millisecond) - - //mem_prof1, mem_prof2 - for i := 1; i <= 2; i++ { - file, err := os.Open(path.Join(argPath, fmt.Sprintf("mem%v.prof", i))) - if err != nil { - t.Error(err) - } - defer file.Close() - - //compare the size - size, err := file.Stat() - if err != nil { - t.Error(err) - } else if size.Size() < int64(300) { - t.Errorf("Size 
of MemoryProfile %v is lower that expected", size.Size()) - } - //after we checked that CPUProfile was made successfully, can delete it - if err := os.Remove(path.Join(argPath, fmt.Sprintf("mem%v.prof", i))); err != nil { - t.Error(err) - } - } -} - -func testCoreSv1CheckFinalMemProfiling(t *testing.T) { - // as the engine was killed, mem_final.prof was created and we must check it - file, err := os.Open(path.Join(argPath, fmt.Sprintf(utils.MemProfFinalFile))) - if err != nil { - t.Error(err) - } - defer file.Close() - - //compare the size - size, err := file.Stat() - if err != nil { - t.Error(err) - } else if size.Size() < int64(300) { - t.Errorf("Size of MemoryProfile %v is lower that expected", size.Size()) - } - //after we checked that CPUProfile was made successfully, can delete it - if err := os.Remove(path.Join(argPath, fmt.Sprintf(utils.MemProfFinalFile))); err != nil { - t.Error(err) - } } func testCoreSv1StartMemProfilingErrorAlreadyStarted(t *testing.T) { var reply string args := &cores.MemoryProfilingParams{ - DirPath: argPath, - Interval: 100 * time.Millisecond, - MaxFiles: 2, + DirPath: argPath, + Interval: 100 * time.Millisecond, + MaxFiles: 2, + UseTimestamp: true, } - expErr := "Memory Profiling already started" + expErr := "start memory profiling: already started" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartMemoryProfiling, args, &reply); err == nil || err.Error() != expErr { t.Errorf("Expected %+v, received %+v", expErr, err) @@ -271,7 +230,7 @@ func testCoreSv1StartMemProfilingErrorAlreadyStarted(t *testing.T) { func testCoreSv1StopCPUProfilingBeforeStart(t *testing.T) { var reply string - expectedErr := " cannot stop because CPUProfiling is not active" + expectedErr := "stop CPU profiling: not started yet" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StopCPUProfiling, new(utils.TenantWithAPIOpts), &reply); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+q, received %+q", expectedErr, err) @@ 
-335,9 +294,10 @@ func testCoreSv1StopCPUProfiling(t *testing.T) { func testCoreSv1StartMemoryProfiling(t *testing.T) { var reply string args := cores.MemoryProfilingParams{ - DirPath: argPath, - Interval: 100 * time.Millisecond, - MaxFiles: 2, + DirPath: argPath, + Interval: 100 * time.Millisecond, + MaxFiles: 2, + UseTimestamp: true, } if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartMemoryProfiling, args, &reply); err != nil { @@ -345,10 +305,91 @@ func testCoreSv1StartMemoryProfiling(t *testing.T) { } else if reply != utils.OK { t.Errorf("Unexpected reply returned") } + memProfNr = 5 } func testCoreSv1KillEngine(t *testing.T) { if err := engine.KillEngine(*utils.WaitRater); err != nil { t.Error(err) } + time.Sleep(time.Second) + checkMemProfiles(t, argPath, memProfNr) +} + +func checkMemProfiles(t *testing.T, memDirPath string, wantCount int) { + t.Helper() + hasFinal := false + memFileCount := 0 + _ = filepath.WalkDir(memDirPath, func(path string, d fs.DirEntry, err error) error { + if err != nil { + t.Logf("failed to access path %s: %v", path, err) + return nil // skip paths that cause an error + } + defer func() { + }() + switch { + case d.IsDir(): + // Memory profiles should be directly under 'memDirPath', skip all directories (excluding 'memDirPath') + // and their contents. 
+ if path == memDirPath { + return nil + } + return filepath.SkipDir + case !strings.HasPrefix(d.Name(), "mem_") || !strings.HasSuffix(d.Name(), ".prof"): + return nil // skip files that don't have 'mem_*.prof' format + case d.Name() == utils.MemProfFinalFile: + hasFinal = true + fallthrough // test should be the same as for a normal mem file + default: // files with format 'mem_*.prof' + fi, err := d.Info() + if err != nil { + t.Errorf("failed to retrieve FileInfo from %q: %v", path, err) + } + if fi.Size() == 0 { + t.Errorf("memory profile file %q is empty", path) + } + if d.Name() != utils.MemProfFinalFile { + // Check that date within file name is from within this minute. + layout := "20060102150405" + timestamp := strings.TrimPrefix(d.Name(), "mem_") + timestamp = strings.TrimSuffix(timestamp, ".prof") + date, extra, has := strings.Cut(timestamp, "_") + if !has { + t.Errorf("expected timestamp to have '_' format, got: %s", timestamp) + } + parsedTime, err := time.ParseInLocation(layout, date, time.Local) + if err != nil { + t.Errorf("time.Parse(%q,%q) returned unexpected err: %v", layout, date, err) + } + + // Convert 'extra' to microseconds and add to the parsed time. 
+ microSCount, err := strconv.Atoi(extra) + if err != nil { + t.Errorf("strconv.Atoi(%q) returned unexpected err: %v", extra, err) + } + parsedTime.Add(time.Duration(microSCount) * time.Microsecond) + + now := time.Now() + oneMinuteEarlier := now.Add(-time.Minute) + if parsedTime.Before(oneMinuteEarlier) || parsedTime.After(now) { + t.Errorf("file name (%s) timestamp not from within last minute", d.Name()) + } + } + memFileCount++ + } + return nil + }) + + if wantCount != 0 && !hasFinal { + t.Error("final mem file is missing") + } + if memFileCount != wantCount { + t.Errorf("memory file count = %d, want %d (including final mem profile)", memFileCount, wantCount) + } + + if err := os.Remove(filepath.Join(memDirPath, utils.MemProfFinalFile)); err != nil && + !errors.Is(err, fs.ErrNotExist) { + t.Error(err) + } + memProfNr = 0 } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 5de47ab60..eba2b8609 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -59,6 +59,7 @@ var ( memProfDir = cgrEngineFlags.String(utils.MemProfDirCgr, utils.EmptyString, "Directory for memory profiles") memProfInterval = cgrEngineFlags.Duration(utils.MemProfIntervalCgr, 15*time.Second, "Interval between memory profile saves") memProfMaxFiles = cgrEngineFlags.Int(utils.MemProfMaxFilesCgr, 1, "Number of memory profiles to keep (most recent)") + memProfTimestamp = cgrEngineFlags.Bool(utils.MemProfTimestampCgr, false, "Add timestamp to memory profile files") scheduledShutdown = cgrEngineFlags.Duration(utils.ScheduledShutdownCgr, 0, "Shutdown the engine after the specified duration") singleCPU = cgrEngineFlags.Bool(utils.SingleCpuCgr, false, "Run on a single CPU core") syslogger = cgrEngineFlags.String(utils.LoggerCfg, utils.EmptyString, "Logger type <*syslog|*stdout>") @@ -694,9 +695,10 @@ func main() { if *memProfDir != utils.EmptyString { if err := cS.StartMemoryProfiling(cores.MemoryProfilingParams{ - DirPath: *memProfDir, - Interval: 
*memProfInterval, - MaxFiles: *memProfMaxFiles, + DirPath: *memProfDir, + Interval: *memProfInterval, + MaxFiles: *memProfMaxFiles, + UseTimestamp: *memProfTimestamp, }); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) return diff --git a/cmd/cgr-engine/cgr-engine_flags_test.go b/cmd/cgr-engine/cgr-engine_flags_test.go index 109dbec85..18ec0951b 100644 --- a/cmd/cgr-engine/cgr-engine_flags_test.go +++ b/cmd/cgr-engine/cgr-engine_flags_test.go @@ -98,6 +98,13 @@ func TestCgrEngineFlags(t *testing.T) { defaultVal: 1, want: 3, }, + { + name: "memProfTimestamp", + flags: []string{"-memprof_timestamp"}, + flagVar: memProfTimestamp, + defaultVal: false, + want: true, + }, { name: "scheduledShutdown", flags: []string{"-scheduled_shutdown", "1h"}, diff --git a/cores/core.go b/cores/core.go index ba2360e50..75d876476 100644 --- a/cores/core.go +++ b/cores/core.go @@ -136,7 +136,14 @@ type MemoryProfilingParams struct { DirPath string // directory path where memory profiles will be saved Interval time.Duration // duration between consecutive memory profile captures MaxFiles int // maximum number of profile files to retain - APIOpts map[string]any + + // UseTimestamp determines if the filename includes a timestamp. + // The format is 'mem_20060102150405[_<microseconds>].prof'. + // Microseconds are included if the interval is less than one second to avoid duplicate names. + // If false, filenames follow an incremental format: 'mem_<number>.prof'. + UseTimestamp bool + + APIOpts map[string]any } // StartMemoryProfiling starts memory profiling in the specified directory. @@ -171,16 +178,38 @@ func (cS *CoreService) StartMemoryProfiling(params MemoryProfilingParams) error return nil } +// newMemProfNameFunc returns a closure that generates memory profile filenames. 
// newMemProfNameFunc builds the generator used to name memory profile files.
//
// When useTimestamp is false, the returned closure keeps its own counter and
// yields 'mem_1.prof', 'mem_2.prof', ... on successive calls.
//
// When useTimestamp is true, the closure yields 'mem_<20060102150405>.prof'
// based on time.Now() at the moment of the call. If interval is shorter than
// one second, '_<microseconds>' is appended so that profiles written within
// the same second get distinct names. The microseconds are zero-padded to six
// digits: this keeps the names fixed-width, so sorting them lexically also
// sorts them chronologically.
func newMemProfNameFunc(interval time.Duration, useTimestamp bool) func() string {
	const layout = "20060102150405"
	switch {
	case !useTimestamp:
		i := 0
		return func() string {
			i++
			return fmt.Sprintf("mem_%d.prof", i)
		}
	case interval < time.Second:
		return func() string {
			// Take a single time sample so the seconds and the microseconds
			// belong to the same instant.
			now := time.Now()
			return fmt.Sprintf("mem_%s_%06d.prof", now.Format(layout), now.Nanosecond()/int(time.Microsecond))
		}
	default:
		return func() string {
			return fmt.Sprintf("mem_%s.prof", time.Now().Format(layout))
		}
	}
}
ScheduledShutdownCgr = "scheduled_shutdown" SingleCpuCgr = "singlecpu" PreloadCgr = "preload"