Revise memory profiling implementation

- merge StopChanMemProf with StopMemoryProfiling
- remove fileMEM and stopMemProf from struct and constructors
- add separate mutex for memory profiling, ensure thread safety
- handle all significant errors
- log error if StopMemoryProfiling fails during CoreS Shutdown
- ignore errors if profiling inactive in Shutdown and deferred Stop
- move validations inside V1 functions
- return error if StartMemoryProfiling already started
- return error if StopMemoryProfiling already stopped or never started
- close profiling loop on error, not the cgr-engine
- StopMemoryProfiling closes channel and profiling loop writes final profile
- rename Path to DirPath for mandatory field error
- rename memprof_nrfiles flag to memprof_maxfiles
- increase default memprof_interval
- consider MaxFiles <= 0 as unlimited
- move memory profiling logic after starting services
- use CoreService Start/StopMemoryProfiling in main (usage sketch follows the commit metadata)
- remove final memory profile block (created by deferred Stop)
- convert MemProfiling to method on CoreService and rename to profileMemory
- use Ticker for recurrent actions instead of Timer
- compute mem_final.prof full path in StartMemoryProfiling
- suffix profile files with current time instead of numbers
- update dispatcher methods after changes
- move MemoryPrf from utils to cores, rename to MemoryProfilingParams
- add logs for starting/stopping profiling
Authored by ionutboangiu on 2024-07-22 08:20:26 +03:00
Committed by Dan Christian Bogos
parent 9d4561f79c
commit 1c490a9020
14 changed files with 271 additions and 239 deletions
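
For orientation, a minimal usage sketch of the memory-profiling API this commit introduces, written as if it sat next to the cores tests shown further down. Only NewCoreService, MemoryProfilingParams, StartMemoryProfiling and StopMemoryProfiling are taken from the diff below; the setup values (cfg, caps, stopChan, shdWg, shdChan) and the t test handle are assumed, not part of the commit.

	cS := NewCoreService(cfg, caps, nil, stopChan, shdWg, shdChan)
	if err := cS.StartMemoryProfiling(MemoryProfilingParams{
		DirPath:  "/tmp",           // heap profiles are written here as mem_<timestamp>.prof
		Interval: 30 * time.Second, // snapshot interval; values <= 0 fall back to the 15s default
		MaxFiles: 5,                // keep only the 5 newest snapshots; <= 0 means unlimited
	}); err != nil {
		t.Fatal(err)
	}
	// ... exercise the engine ...
	// Stopping closes the internal channel; the profiling goroutine then
	// writes the final mem_final.prof before exiting.
	if err := cS.StopMemoryProfiling(); err != nil {
		t.Error(err)
	}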

View File

@@ -24,6 +24,7 @@ import (
"io"
"os"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
"sync"
@@ -35,34 +36,34 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer, fileMem string, stopChan chan struct{},
shdWg *sync.WaitGroup, stopMemPrf chan struct{}, shdChan *utils.SyncedChan) *CoreService {
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer, stopChan chan struct{},
shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) *CoreService {
var st *engine.CapsStats
if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 {
st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan)
}
return &CoreService{
shdWg: shdWg,
stopMemPrf: stopMemPrf,
shdChan: shdChan,
cfg: cfg,
CapsStats: st,
fileCPU: fileCPU,
fileMEM: fileMem,
caps: caps,
shdWg: shdWg,
shdChan: shdChan,
cfg: cfg,
CapsStats: st,
fileCPU: fileCPU,
caps: caps,
}
}
type CoreService struct {
cfg *config.CGRConfig
CapsStats *engine.CapsStats
shdWg *sync.WaitGroup
stopMemPrf chan struct{}
shdChan *utils.SyncedChan
fileMEM string
cfg *config.CGRConfig
CapsStats *engine.CapsStats
shdWg *sync.WaitGroup
shdChan *utils.SyncedChan
fileMux sync.Mutex
fileCPU io.Closer
memProfMux sync.Mutex
finalMemProf string // full path of the final memory profile created on stop/shutdown
stopMemProf chan struct{} // signal end of memory profiling
fileCPUMux sync.Mutex
fileCPU io.Closer
caps *engine.Caps
}
@@ -70,18 +71,19 @@ type CoreService struct {
// Shutdown is called to shutdown the service
func (cS *CoreService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.CoreS))
cS.StopChanMemProf()
cS.StopMemoryProfiling() // safe to ignore error (irrelevant)
utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.CoreS))
}
// StopChanMemProf will stop the MemoryProfiling Channel in order to create
// the final MemoryProfiling when CoreS subsystem will stop.
func (cS *CoreService) StopChanMemProf() {
if cS.stopMemPrf != nil {
MemProfFile(cS.fileMEM)
close(cS.stopMemPrf)
cS.stopMemPrf = nil
// StartCPUProfiling starts CPU profiling and saves the profile to the specified path.
func (cS *CoreService) StartCPUProfiling(path string) (err error) {
if path == utils.EmptyString {
return utils.NewErrMandatoryIeMissing("DirPath")
}
cS.fileCPUMux.Lock()
defer cS.fileCPUMux.Unlock()
cS.fileCPU, err = StartCPUProfiling(path)
return
}
// StartCPUProfiling creates a file and passes it to pprof.StartCPUProfile. It returns the file
@@ -101,42 +103,139 @@ func StartCPUProfiling(path string) (io.Closer, error) {
return f, nil
}
func MemProfFile(memProfPath string) bool {
f, err := os.Create(memProfPath)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<memProfile>could not create memory profile file: %s", err))
return false
// StopCPUProfiling stops CPU profiling and closes the profile file.
func (cS *CoreService) StopCPUProfiling() error {
cS.fileCPUMux.Lock()
defer cS.fileCPUMux.Unlock()
pprof.StopCPUProfile()
if cS.fileCPU == nil {
return errors.New("stop CPU profiling: not started yet")
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
utils.Logger.Crit(fmt.Sprintf("<memProfile>could not write memory profile: %s", err))
f.Close()
return false
if err := cS.fileCPU.Close(); err != nil {
if errors.Is(err, os.ErrClosed) {
return errors.New("stop CPU profiling: already stopped")
}
return fmt.Errorf("could not close profile file: %v", err)
}
f.Close()
return true
return nil
}
func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, stopChan chan struct{}, shdChan *utils.SyncedChan) {
tm := time.NewTimer(interval)
for i := 1; ; i++ {
select {
case <-stopChan:
tm.Stop()
shdWg.Done()
return
case <-tm.C:
}
if !MemProfFile(path.Join(memProfDir, fmt.Sprintf("mem%v.prof", i))) {
shdChan.CloseOnce()
shdWg.Done()
return
}
if i%nrFiles == 0 {
i = 0 // reset the counting
}
tm.Reset(interval)
// MemoryProfilingParams represents the parameters for memory profiling.
type MemoryProfilingParams struct {
Tenant string
DirPath string // directory path where memory profiles will be saved
Interval time.Duration // duration between consecutive memory profile captures
MaxFiles int // maximum number of profile files to retain
APIOpts map[string]any
}
// StartMemoryProfiling starts memory profiling in the specified directory.
func (cS *CoreService) StartMemoryProfiling(params MemoryProfilingParams) error {
if params.Interval <= 0 {
params.Interval = 15 * time.Second
}
if params.MaxFiles < 0 {
// consider any negative number to mean unlimited files
params.MaxFiles = 0
}
cS.memProfMux.Lock()
defer cS.memProfMux.Unlock()
// Check if profiling is already started.
select {
case <-cS.stopMemProf: // triggered only on channel closed
default:
if cS.stopMemProf != nil {
// stopMemProf being not closed and different from nil means that the profiling loop is already active.
return errors.New("start memory profiling: already started")
}
}
utils.Logger.Info(fmt.Sprintf(
"<%s> starting memory profiling loop, writing to directory %q", utils.CoreS, params.DirPath))
cS.stopMemProf = make(chan struct{})
cS.finalMemProf = filepath.Join(params.DirPath, utils.MemProfFinalFile)
cS.shdWg.Add(1)
go cS.profileMemory(params)
return nil
}
// profileMemory runs the memory profiling loop, writing profiles to files at the specified interval.
func (cS *CoreService) profileMemory(params MemoryProfilingParams) {
defer cS.shdWg.Done()
ticker := time.NewTicker(params.Interval)
defer ticker.Stop()
files := make([]string, 0, params.MaxFiles)
for {
select {
case <-ticker.C:
path := filepath.Join(params.DirPath, fmt.Sprintf("mem_%s.prof", time.Now().Format("20060102150405"))) // e.g. mem_20060102150405.prof
if err := writeHeapProfile(path); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err))
cS.StopMemoryProfiling()
}
if params.MaxFiles == 0 {
// no file limit
continue
}
if len(files) == params.MaxFiles {
oldest := files[0]
utils.Logger.Info(fmt.Sprintf("<%s> removing old heap profile file %q", utils.CoreS, oldest))
files = files[1:] // remove oldest file from the list
if err := os.Remove(oldest); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> %v", utils.CoreS, err))
}
}
files = append(files, path)
case <-cS.stopMemProf:
if err := writeHeapProfile(cS.finalMemProf); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err))
}
return
}
}
}
// writeHeapProfile writes the heap profile to the specified path.
func writeHeapProfile(path string) error {
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("could not create memory profile: %v", err)
}
defer func() {
if err := f.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<%s> could not close file %q: %v", utils.CoreS, f.Name(), err))
}
}()
utils.Logger.Info(fmt.Sprintf("<%s> writing heap profile to %q", utils.CoreS, path))
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
return fmt.Errorf("could not write memory profile: %v", err)
}
return nil
}
// StopMemoryProfiling stops memory profiling.
func (cS *CoreService) StopMemoryProfiling() error {
cS.memProfMux.Lock()
defer cS.memProfMux.Unlock()
// Check if profiling is already stopped to prevent a channel close panic.
select {
case <-cS.stopMemProf: // triggered only on channel closed
return errors.New("stop memory profiling: already stopped")
default: // prevents blocking
if cS.stopMemProf == nil {
// stopMemProf being nil means that StartMemoryProfiling has never been called. There is nothing to stop.
return errors.New("stop memory profiling: not started yet")
}
}
utils.Logger.Info(fmt.Sprintf("<%s> stopping memory profiling loop", utils.CoreS))
close(cS.stopMemProf)
return nil
}
// V1Status returns the status of the engine
@@ -163,65 +262,6 @@ func (cS *CoreService) V1Status(_ *context.Context, _ *utils.TenantWithAPIOpts,
return
}
// StartCPUProfiling starts CPU profiling and saves the profile to the specified path.
func (cS *CoreService) StartCPUProfiling(path string) (err error) {
if path == utils.EmptyString {
return utils.NewErrMandatoryIeMissing("DirPath")
}
cS.fileMux.Lock()
defer cS.fileMux.Unlock()
cS.fileCPU, err = StartCPUProfiling(path)
return
}
// StopCPUProfiling stops CPU profiling and closes the profile file.
func (cS *CoreService) StopCPUProfiling() error {
cS.fileMux.Lock()
defer cS.fileMux.Unlock()
pprof.StopCPUProfile()
if cS.fileCPU == nil {
return errors.New("CPU profiling has not been started")
}
if err := cS.fileCPU.Close(); err != nil {
if errors.Is(err, os.ErrClosed) {
return errors.New("CPU profiling has already been stopped")
}
return fmt.Errorf("could not close profile file: %v", err)
}
return nil
}
// StartMemoryProfiling is used to start MemoryProfiling in the given path
func (cS *CoreService) StartMemoryProfiling(args *utils.MemoryPrf) (err error) {
if args.DirPath == utils.EmptyString {
return utils.NewErrMandatoryIeMissing("Path")
}
if cS.stopMemPrf != nil {
return errors.New("Memory Profiling already started")
}
if args.Interval <= 0 {
args.Interval = 5 * time.Second
}
if args.NrFiles == 0 {
args.NrFiles = 1
}
cS.shdWg.Add(1)
cS.stopMemPrf = make(chan struct{})
cS.fileMEM = args.DirPath
go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shdChan)
return
}
// StopMemoryProfiling is used to stop MemoryProfiling
func (cS *CoreService) StopMemoryProfiling() (err error) {
if cS.stopMemPrf == nil {
return errors.New(" Memory Profiling is not started")
}
cS.fileMEM = path.Join(cS.fileMEM, utils.MemProfFileCgr)
cS.StopChanMemProf()
return
}
// Sleep is used to test the concurrent requests mechanism
func (cS *CoreService) V1Sleep(_ *context.Context, arg *utils.DurationArgs, reply *string) error {
time.Sleep(arg.Duration)
@@ -230,6 +270,7 @@ func (cS *CoreService) V1Sleep(_ *context.Context, arg *utils.DurationArgs, repl
}
// StartCPUProfiling is used to start CPUProfiling in the given path
// V1StartCPUProfiling starts CPU profiling and saves the profile to the specified path.
func (cS *CoreService) V1StartCPUProfiling(_ *context.Context, args *utils.DirectoryArgs, reply *string) error {
if err := cS.StartCPUProfiling(path.Join(args.DirPath, utils.CpuPathCgr)); err != nil {
return err
@@ -248,18 +289,20 @@ func (cS *CoreService) V1StopCPUProfiling(_ *context.Context, _ *utils.TenantWit
return nil
}
// StartMemoryProfiling is used to start MemoryProfiling in the given path
func (cS *CoreService) V1StartMemoryProfiling(_ *context.Context, args *utils.MemoryPrf, reply *string) error {
if err := cS.StartMemoryProfiling(args); err != nil {
// V1StartMemoryProfiling starts memory profiling in the specified directory.
func (cS *CoreService) V1StartMemoryProfiling(_ *context.Context, params MemoryProfilingParams, reply *string) error {
if params.DirPath == utils.EmptyString {
return utils.NewErrMandatoryIeMissing("DirPath")
}
if err := cS.StartMemoryProfiling(params); err != nil {
return err
}
*reply = utils.OK
return nil
}
// V1StopMemoryProfiling is used to stop MemoryProfiling. The file should be written on the path
// where the MemoryProfiling already started
func (cS *CoreService) V1StopMemoryProfiling(_ *context.Context, _ *utils.TenantWithAPIOpts, reply *string) error {
// V1StopMemoryProfiling stops memory profiling.
func (cS *CoreService) V1StopMemoryProfiling(_ *context.Context, _ utils.TenantWithAPIOpts, reply *string) error {
if err := cS.StopMemoryProfiling(); err != nil {
return err
}
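
Aside: the start/stop guards above hinge on two facts — a nil channel means profiling was never started, and a receive from a closed channel never blocks. A stripped-down sketch of that idiom, independent of CoreService (the helper name is illustrative only):

	// memProfState reports whether a profiling loop guarded by stop was ever
	// started (channel non-nil) and whether it has since been stopped (channel closed).
	func memProfState(stop chan struct{}) (started, stopped bool) {
		if stop == nil {
			return false, false // StartMemoryProfiling was never called
		}
		select {
		case <-stop: // a receive succeeds only once the channel has been closed
			return true, true
		default: // channel still open: the profiling goroutine is running
			return true, false
		}
	}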

View File

@@ -39,18 +39,15 @@ func TestNewCoreService(t *testing.T) {
sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan)
shdWg := new(sync.WaitGroup)
shdChan := utils.NewSyncedChan()
stopMemPrf := make(chan struct{})
expected := &CoreService{
fileMEM: "/tmp",
shdWg: shdWg,
shdChan: shdChan,
stopMemPrf: stopMemPrf,
cfg: cfgDflt,
CapsStats: sts,
caps: caps,
shdWg: shdWg,
shdChan: shdChan,
cfg: cfgDflt,
CapsStats: sts,
caps: caps,
}
rcv := NewCoreService(cfgDflt, caps, nil, "/tmp", stopchan, shdWg, stopMemPrf, shdChan)
rcv := NewCoreService(cfgDflt, caps, nil, stopchan, shdWg, shdChan)
if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expected %+v, received %+v", expected, rcv)
}
@@ -65,7 +62,7 @@ func TestCoreServiceStatus(t *testing.T) {
caps := engine.NewCaps(1, utils.MetaBusy)
stopChan := make(chan struct{}, 1)
cores := NewCoreService(cfgDflt, caps, nil, "/tmp", stopChan, nil, nil, nil)
cores := NewCoreService(cfgDflt, caps, nil, stopChan, nil, nil)
args := &utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]any{},