From 1c490a902022d4e1aad348c567c2d8f3a9cf2d8f Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 22 Jul 2024 08:20:26 +0300 Subject: [PATCH] Revise memory profiling implementation - merge StopChanMemProf with StopMemoryProfiling - remove fileMEM and stopMemProf from struct and constructors - add separate mutex for memory profiling, ensure thread safety - handle all significant errors - log error if StopMemoryProfiling fails during CoreS Shutdown - ignore errors if profiling inactive in Shutdown and deferred Stop - move validations inside V1 functions - return error if StartMemoryProfiling already started - return error if StopMemoryProfiling already stopped or never started - close profiling loop on error, not the cgr-engine - StopMemoryProfiling closes channel and profiling loop writes final profile - rename Path to DirPath for mandatory field error - rename memprof_nrfiles flag to memprof_maxfiles - increase default memprof_interval - consider MaxFiles <= 0 as unlimited - move memory profiling logic after starting services - use CoreService Start/StopMemoryProfiling in main - remove final memory profile block (created by deferred Stop) - convert MemProfiling to method on CoreService and rename to profileMemory - use Ticker for recurrent actions instead of Timer - compute mem_final.prof full path in StartMemoryProfiling - suffix profile files with current time instead of numbers - update dispatcher methods after changes - move MemoryPrf from utils to cores, rename to MemoryProfilingParams - add logs for starting/stopping profiling --- apier/v1/core.go | 15 +- apier/v1/core_it_test.go | 15 +- apier/v1/dispatcher.go | 9 +- cmd/cgr-engine/cgr-engine.go | 38 ++-- cmd/cgr-engine/cgr-engine_flags_test.go | 8 +- cores/core.go | 287 ++++++++++++++---------- cores/core_test.go | 17 +- dispatchers/cores.go | 50 +++-- docs/cgr-engine.rst | 5 +- services/cores.go | 50 ++--- services/cores_it_test.go | 2 +- services/cores_test.go | 2 +- utils/consts.go | 4 +- utils/coreutils.go | 8 - 14 files changed, 271 insertions(+), 239 deletions(-) diff --git a/apier/v1/core.go b/apier/v1/core.go index 70d5ded8a..038e1818f 100644 --- a/apier/v1/core.go +++ b/apier/v1/core.go @@ -44,7 +44,7 @@ func (cS *CoreSv1) Status(ctx *context.Context, arg *utils.TenantWithAPIOpts, re } // Ping used to determinate if component is active -func (cS *CoreSv1) Ping(ctx *context.Context, ign *utils.CGREvent, reply *string) error { +func (cS *CoreSv1) Ping(ctx *context.Context, _ *utils.CGREvent, reply *string) error { *reply = utils.Pong return nil } @@ -65,15 +65,14 @@ func (cS *CoreSv1) StopCPUProfiling(ctx *context.Context, args *utils.TenantWith return cS.cS.V1StopCPUProfiling(ctx, args, reply) } -// StartMemoryProfiling is used to start MemoryProfiling in the given path -func (cS *CoreSv1) StartMemoryProfiling(ctx *context.Context, args *utils.MemoryPrf, reply *string) error { - return cS.cS.V1StartMemoryProfiling(ctx, args, reply) +// StartMemoryProfiling starts memory profiling in the specified directory. +func (cS *CoreSv1) StartMemoryProfiling(ctx *context.Context, params cores.MemoryProfilingParams, reply *string) error { + return cS.cS.V1StartMemoryProfiling(ctx, params, reply) } -// StopMemoryProfiling is used to stop MemoryProfiling. The file should be written on the path -// where the MemoryProfiling already started -func (cS *CoreSv1) StopMemoryProfiling(ctx *context.Context, args *utils.TenantWithAPIOpts, reply *string) error { - return cS.cS.V1StopMemoryProfiling(ctx, args, reply) +// StopMemoryProfiling stops memory profiling and writes the final profile. +func (cS *CoreSv1) StopMemoryProfiling(ctx *context.Context, params utils.TenantWithAPIOpts, reply *string) error { + return cS.cS.V1StopMemoryProfiling(ctx, params, reply) } diff --git a/apier/v1/core_it_test.go b/apier/v1/core_it_test.go index db3ba9e77..a47b0a906 100644 --- a/apier/v1/core_it_test.go +++ b/apier/v1/core_it_test.go @@ -32,6 +32,7 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/birpc/jsonrpc" + "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -181,7 +182,7 @@ func testCoreSv1StopMemProfilingBeforeStart(t *testing.T) { func testCoreSv1StartEngineByExecWIthMemProfiling(t *testing.T) { engine := exec.Command("cgr-engine", "-config_path", coreV1CfgPath, - "-memprof_dir", argPath, "-memprof_interval", "100ms", "-memprof_nrfiles", "2") + "-memprof_dir", argPath, "-memprof_interval", "100ms", "-memprof_maxfiles", "2") if err := engine.Start(); err != nil { t.Error(err) } @@ -235,7 +236,7 @@ func testCoreSv1StopMemoryProfiling(t *testing.T) { func testCoreSv1CheckFinalMemProfiling(t *testing.T) { // as the engine was killed, mem_final.prof was created and we must check it - file, err := os.Open(path.Join(argPath, fmt.Sprintf(utils.MemProfFileCgr))) + file, err := os.Open(path.Join(argPath, fmt.Sprintf(utils.MemProfFinalFile))) if err != nil { t.Error(err) } @@ -249,17 +250,17 @@ func testCoreSv1CheckFinalMemProfiling(t *testing.T) { t.Errorf("Size of MemoryProfile %v is lower that expected", size.Size()) } //after we checked that CPUProfile was made successfully, can delete it - if err := os.Remove(path.Join(argPath, fmt.Sprintf(utils.MemProfFileCgr))); err != nil { + if err := os.Remove(path.Join(argPath, fmt.Sprintf(utils.MemProfFinalFile))); err != nil { t.Error(err) } } func testCoreSv1StartMemProfilingErrorAlreadyStarted(t *testing.T) { var reply string - args := &utils.MemoryPrf{ + args := &cores.MemoryProfilingParams{ DirPath: argPath, Interval: 100 * time.Millisecond, - NrFiles: 2, + MaxFiles: 2, } expErr := "Memory Profiling already started" if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartMemoryProfiling, @@ -333,10 +334,10 @@ func testCoreSv1StopCPUProfiling(t *testing.T) { func testCoreSv1StartMemoryProfiling(t *testing.T) { var reply string - args := &utils.MemoryPrf{ + args := cores.MemoryProfilingParams{ DirPath: argPath, Interval: 100 * time.Millisecond, - NrFiles: 2, + MaxFiles: 2, } if err := coreV1Rpc.Call(context.Background(), utils.CoreSv1StartMemoryProfiling, args, &reply); err != nil { diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index e229984c4..8a3cdf7de 100644 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -24,6 +24,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/ers" @@ -959,12 +960,12 @@ func (dS *DispatcherCoreSv1) StopCPUProfiling(ctx *context.Context, args *utils. return dS.dS.CoreSv1StopCPUProfiling(ctx, args, reply) } -func (dS *DispatcherCoreSv1) StartMemoryProfiling(ctx *context.Context, args *utils.MemoryPrf, reply *string) error { - return dS.dS.CoreSv1StartMemoryProfiling(ctx, args, reply) +func (dS *DispatcherCoreSv1) StartMemoryProfiling(ctx *context.Context, params cores.MemoryProfilingParams, reply *string) error { + return dS.dS.CoreSv1StartMemoryProfiling(ctx, params, reply) } -func (dS *DispatcherCoreSv1) StopMemoryProfiling(ctx *context.Context, args *utils.TenantWithAPIOpts, reply *string) error { - return dS.dS.CoreSv1StopMemoryProfiling(ctx, args, reply) +func (dS *DispatcherCoreSv1) StopMemoryProfiling(ctx *context.Context, params utils.TenantWithAPIOpts, reply *string) error { + return dS.dS.CoreSv1StopMemoryProfiling(ctx, params, reply) } func (dS *DispatcherCoreSv1) Panic(ctx *context.Context, args *utils.PanicMessageArgs, reply *string) error { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a9f9427c8..82d6499d2 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -25,7 +25,6 @@ import ( "log" "os" "os/signal" - "path" "path/filepath" "runtime" "runtime/pprof" @@ -59,8 +58,8 @@ var ( httpPprof = cgrEngineFlags.Bool(utils.HttpPprofCgr, false, "Enable HTTP pprof profiling") cpuProfDir = cgrEngineFlags.String(utils.CpuProfDirCgr, utils.EmptyString, "Directory for CPU profiles") memProfDir = cgrEngineFlags.String(utils.MemProfDirCgr, utils.EmptyString, "Directory for memory profiles") - memProfInterval = cgrEngineFlags.Duration(utils.MemProfIntervalCgr, 5*time.Second, "Interval between memory profile saves") - memProfNrFiles = cgrEngineFlags.Int(utils.MemProfNrFilesCgr, 1, "Number of memory profiles to keep (most recent)") + memProfInterval = cgrEngineFlags.Duration(utils.MemProfIntervalCgr, 15*time.Second, "Interval between memory profile saves") + memProfMaxFiles = cgrEngineFlags.Int(utils.MemProfMaxFilesCgr, 1, "Number of memory profiles to keep (most recent)") scheduledShutdown = cgrEngineFlags.Duration(utils.ScheduledShutdownCgr, 0, "Shutdown the engine after the specified duration") singleCPU = cgrEngineFlags.Bool(utils.SingleCpuCgr, false, "Run on a single CPU core") syslogger = cgrEngineFlags.String(utils.LoggerCfg, utils.EmptyString, "Logger type <*syslog|*stdout>") @@ -355,20 +354,6 @@ func main() { go singnalHandler(shdWg, shdChan) var cS *cores.CoreService - var stopMemProf chan struct{} - var memPrfDirForCores string - if *memProfDir != utils.EmptyString { - shdWg.Add(1) - stopMemProf = make(chan struct{}) - memPrfDirForCores = *memProfDir - go cores.MemProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, stopMemProf, shdChan) - defer func() { - if cS == nil { - close(stopMemProf) - } - }() - } - var cpuProf io.Closer if *cpuProfDir != utils.EmptyString { cpuPath := filepath.Join(*cpuProfDir, utils.CpuPathCgr) @@ -409,7 +394,6 @@ func main() { cfg, err = config.NewCGRConfigFromPath(*cfgPath) if err != nil { log.Fatalf("Could not parse config: <%s>", err.Error()) - return } if *nodeID != utils.EmptyString { @@ -422,7 +406,6 @@ func main() { if utils.Logger, err = utils.Newlogger(utils.FirstNonEmpty(*syslogger, cfg.GeneralCfg().Logger), cfg.GeneralCfg().NodeID); err != nil { log.Fatalf("Could not initialize syslog connection, err: <%s>", err.Error()) - return } lgLevel := cfg.GeneralCfg().LogLevel if *logLevel != -1 { // Modify the log level if provided by command arguments @@ -580,7 +563,7 @@ func main() { // init CoreSv1 - coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProf, *memProfDir, shdWg, stopMemProf, shdChan, srvDep) + coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProf, shdWg, shdChan, srvDep) shdWg.Add(1) if err := coreS.Start(); err != nil { log.Fatalf("<%s> error received: <%s>, exiting!", utils.InitS, err.Error()) @@ -710,6 +693,18 @@ func main() { internalDispatcherSChan, internalLoaderSChan, internalRALsChan, internalCacheSChan, internalEEsChan, internalERsChan, shdChan) + if *memProfDir != utils.EmptyString { + if err := cS.StartMemoryProfiling(cores.MemoryProfilingParams{ + DirPath: *memProfDir, + Interval: *memProfInterval, + MaxFiles: *memProfMaxFiles, + }); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) + return + } + defer cS.StopMemoryProfiling() // safe to ignore error (irrelevant) + } + <-shdChan.Done() shtdDone := make(chan struct{}) go func() { @@ -723,9 +718,6 @@ func main() { utils.ServiceManager)) } - if *memProfDir != utils.EmptyString { // write last memory profiling - cores.MemProfFile(path.Join(*memProfDir, utils.MemProfFileCgr)) - } if *pidFile != utils.EmptyString { if err := os.Remove(*pidFile); err != nil { utils.Logger.Warning("Could not remove pid file: " + err.Error()) diff --git a/cmd/cgr-engine/cgr-engine_flags_test.go b/cmd/cgr-engine/cgr-engine_flags_test.go index 7c551dd6e..109dbec85 100644 --- a/cmd/cgr-engine/cgr-engine_flags_test.go +++ b/cmd/cgr-engine/cgr-engine_flags_test.go @@ -88,13 +88,13 @@ func TestCgrEngineFlags(t *testing.T) { name: "memProfInterval", flags: []string{"-memprof_interval", "1s"}, flagVar: memProfInterval, - defaultVal: 5 * time.Second, + defaultVal: 15 * time.Second, want: time.Second, }, { - name: "memProfNrFiles", - flags: []string{"-memprof_nrfiles", "3"}, - flagVar: memProfNrFiles, + name: "memProfMaxFiles", + flags: []string{"-memprof_maxfiles", "3"}, + flagVar: memProfMaxFiles, defaultVal: 1, want: 3, }, diff --git a/cores/core.go b/cores/core.go index de61d076e..c6abbaba9 100644 --- a/cores/core.go +++ b/cores/core.go @@ -24,6 +24,7 @@ import ( "io" "os" "path" + "path/filepath" "runtime" "runtime/pprof" "sync" @@ -35,34 +36,34 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer, fileMem string, stopChan chan struct{}, - shdWg *sync.WaitGroup, stopMemPrf chan struct{}, shdChan *utils.SyncedChan) *CoreService { +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer, stopChan chan struct{}, + shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) *CoreService { var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) } return &CoreService{ - shdWg: shdWg, - stopMemPrf: stopMemPrf, - shdChan: shdChan, - cfg: cfg, - CapsStats: st, - fileCPU: fileCPU, - fileMEM: fileMem, - caps: caps, + shdWg: shdWg, + shdChan: shdChan, + cfg: cfg, + CapsStats: st, + fileCPU: fileCPU, + caps: caps, } } type CoreService struct { - cfg *config.CGRConfig - CapsStats *engine.CapsStats - shdWg *sync.WaitGroup - stopMemPrf chan struct{} - shdChan *utils.SyncedChan - fileMEM string + cfg *config.CGRConfig + CapsStats *engine.CapsStats + shdWg *sync.WaitGroup + shdChan *utils.SyncedChan - fileMux sync.Mutex - fileCPU io.Closer + memProfMux sync.Mutex + finalMemProf string // full path of the final memory profile created on stop/shutdown + stopMemProf chan struct{} // signal end of memory profiling + + fileCPUMux sync.Mutex + fileCPU io.Closer caps *engine.Caps } @@ -70,18 +71,19 @@ type CoreService struct { // Shutdown is called to shutdown the service func (cS *CoreService) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.CoreS)) - cS.StopChanMemProf() + cS.StopMemoryProfiling() // safe to ignore error (irrelevant) utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.CoreS)) } -// StopChanMemProf will stop the MemoryProfiling Channel in order to create -// the final MemoryProfiling when CoreS subsystem will stop. -func (cS *CoreService) StopChanMemProf() { - if cS.stopMemPrf != nil { - MemProfFile(cS.fileMEM) - close(cS.stopMemPrf) - cS.stopMemPrf = nil +// StartCPUProfiling starts CPU profiling and saves the profile to the specified path. +func (cS *CoreService) StartCPUProfiling(path string) (err error) { + if path == utils.EmptyString { + return utils.NewErrMandatoryIeMissing("DirPath") } + cS.fileCPUMux.Lock() + defer cS.fileCPUMux.Unlock() + cS.fileCPU, err = StartCPUProfiling(path) + return } // StartCPUProfiling creates a file and passes it to pprof.StartCPUProfile. It returns the file @@ -101,42 +103,139 @@ func StartCPUProfiling(path string) (io.Closer, error) { return f, nil } -func MemProfFile(memProfPath string) bool { - f, err := os.Create(memProfPath) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("could not create memory profile file: %s", err)) - return false +// StopCPUProfiling stops CPU profiling and closes the profile file. +func (cS *CoreService) StopCPUProfiling() error { + cS.fileCPUMux.Lock() + defer cS.fileCPUMux.Unlock() + pprof.StopCPUProfile() + if cS.fileCPU == nil { + return errors.New("stop CPU profiling: not started yet") } - runtime.GC() // get up-to-date statistics - if err := pprof.WriteHeapProfile(f); err != nil { - utils.Logger.Crit(fmt.Sprintf("could not write memory profile: %s", err)) - f.Close() - return false + if err := cS.fileCPU.Close(); err != nil { + if errors.Is(err, os.ErrClosed) { + return errors.New("stop CPU profiling: already stopped") + } + return fmt.Errorf("could not close profile file: %v", err) } - f.Close() - return true + return nil } -func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, stopChan chan struct{}, shdChan *utils.SyncedChan) { - tm := time.NewTimer(interval) - for i := 1; ; i++ { - select { - case <-stopChan: - tm.Stop() - shdWg.Done() - return - case <-tm.C: - } - if !MemProfFile(path.Join(memProfDir, fmt.Sprintf("mem%v.prof", i))) { - shdChan.CloseOnce() - shdWg.Done() - return - } - if i%nrFiles == 0 { - i = 0 // reset the counting - } - tm.Reset(interval) +// MemoryProfilingParams represents the parameters for memory profiling. +type MemoryProfilingParams struct { + Tenant string + DirPath string // directory path where memory profiles will be saved + Interval time.Duration // duration between consecutive memory profile captures + MaxFiles int // maximum number of profile files to retain + APIOpts map[string]any +} + +// StartMemoryProfiling starts memory profiling in the specified directory. +func (cS *CoreService) StartMemoryProfiling(params MemoryProfilingParams) error { + if params.Interval <= 0 { + params.Interval = 15 * time.Second } + if params.MaxFiles < 0 { + // consider any negative number to mean unlimited files + params.MaxFiles = 0 + } + + cS.memProfMux.Lock() + defer cS.memProfMux.Unlock() + + // Check if profiling is already started. + select { + case <-cS.stopMemProf: // triggered only on channel closed + default: + if cS.stopMemProf != nil { + // stopMemProf being not closed and different from nil means that the profiling loop is already active. + return errors.New("start memory profiling: already started") + } + } + + utils.Logger.Info(fmt.Sprintf( + "<%s> starting memory profiling loop, writing to directory %q", utils.CoreS, params.DirPath)) + cS.stopMemProf = make(chan struct{}) + cS.finalMemProf = filepath.Join(params.DirPath, utils.MemProfFinalFile) + cS.shdWg.Add(1) + go cS.profileMemory(params) + return nil +} + +// profileMemory runs the memory profiling loop, writing profiles to files at the specified interval. +func (cS *CoreService) profileMemory(params MemoryProfilingParams) { + defer cS.shdWg.Done() + ticker := time.NewTicker(params.Interval) + defer ticker.Stop() + files := make([]string, 0, params.MaxFiles) + for { + select { + case <-ticker.C: + path := filepath.Join(params.DirPath, fmt.Sprintf("mem_%s.prof", time.Now().Format("20060102150405"))) // mem20060102150405.prof + if err := writeHeapProfile(path); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) + cS.StopMemoryProfiling() + } + if params.MaxFiles == 0 { + // no file limit + continue + } + if len(files) == params.MaxFiles { + oldest := files[0] + utils.Logger.Info(fmt.Sprintf("<%s> removing old heap profile file %q", utils.CoreS, oldest)) + files = files[1:] // remove oldest file from the list + if err := os.Remove(oldest); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> %v", utils.CoreS, err)) + } + } + files = append(files, path) + case <-cS.stopMemProf: + if err := writeHeapProfile(cS.finalMemProf); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) + } + return + } + } +} + +// writeHeapProfile writes the heap profile to the specified path. +func writeHeapProfile(path string) error { + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("could not create memory profile: %v", err) + } + defer func() { + if err := f.Close(); err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> could not close file %q: %v", utils.CoreS, f.Name(), err)) + } + }() + utils.Logger.Info(fmt.Sprintf("<%s> writing heap profile to %q", utils.CoreS, path)) + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + return fmt.Errorf("could not write memory profile: %v", err) + } + return nil +} + +// StopMemoryProfiling stops memory profiling. +func (cS *CoreService) StopMemoryProfiling() error { + cS.memProfMux.Lock() + defer cS.memProfMux.Unlock() + + // Check if profiling is already stopped to prevent a channel close panic. + select { + case <-cS.stopMemProf: // triggered only on channel closed + return errors.New("stop memory profiling: already stopped") + default: // prevents blocking + if cS.stopMemProf == nil { + // stopMemProf being nil means that StartMemoryProfiling has never been called. There is nothing to stop. + return errors.New("stop memory profiling: not started yet") + } + } + + utils.Logger.Info(fmt.Sprintf("<%s> stopping memory profiling loop", utils.CoreS)) + close(cS.stopMemProf) + return nil } // V1Status returns the status of the engine @@ -163,65 +262,6 @@ func (cS *CoreService) V1Status(_ *context.Context, _ *utils.TenantWithAPIOpts, return } -// StartCPUProfiling starts CPU profiling and saves the profile to the specified path. -func (cS *CoreService) StartCPUProfiling(path string) (err error) { - if path == utils.EmptyString { - return utils.NewErrMandatoryIeMissing("DirPath") - } - cS.fileMux.Lock() - defer cS.fileMux.Unlock() - cS.fileCPU, err = StartCPUProfiling(path) - return -} - -// StopCPUProfiling stops CPU profiling and closes the profile file. -func (cS *CoreService) StopCPUProfiling() error { - cS.fileMux.Lock() - defer cS.fileMux.Unlock() - pprof.StopCPUProfile() - if cS.fileCPU == nil { - return errors.New("CPU profiling has not been started") - } - if err := cS.fileCPU.Close(); err != nil { - if errors.Is(err, os.ErrClosed) { - return errors.New("CPU profiling has already been stopped") - } - return fmt.Errorf("could not close profile file: %v", err) - } - return nil -} - -// StartMemoryProfiling is used to start MemoryProfiling in the given path -func (cS *CoreService) StartMemoryProfiling(args *utils.MemoryPrf) (err error) { - if args.DirPath == utils.EmptyString { - return utils.NewErrMandatoryIeMissing("Path") - } - if cS.stopMemPrf != nil { - return errors.New("Memory Profiling already started") - } - if args.Interval <= 0 { - args.Interval = 5 * time.Second - } - if args.NrFiles == 0 { - args.NrFiles = 1 - } - cS.shdWg.Add(1) - cS.stopMemPrf = make(chan struct{}) - cS.fileMEM = args.DirPath - go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shdChan) - return -} - -// StopMemoryProfiling is used to stop MemoryProfiling -func (cS *CoreService) StopMemoryProfiling() (err error) { - if cS.stopMemPrf == nil { - return errors.New(" Memory Profiling is not started") - } - cS.fileMEM = path.Join(cS.fileMEM, utils.MemProfFileCgr) - cS.StopChanMemProf() - return -} - // Sleep is used to test the concurrent requests mechanism func (cS *CoreService) V1Sleep(_ *context.Context, arg *utils.DurationArgs, reply *string) error { time.Sleep(arg.Duration) @@ -230,6 +270,7 @@ func (cS *CoreService) V1Sleep(_ *context.Context, arg *utils.DurationArgs, repl } // StartCPUProfiling is used to start CPUProfiling in the given path +// V1StartCPUProfiling starts CPU profiling and saves the profile to the specified path. func (cS *CoreService) V1StartCPUProfiling(_ *context.Context, args *utils.DirectoryArgs, reply *string) error { if err := cS.StartCPUProfiling(path.Join(args.DirPath, utils.CpuPathCgr)); err != nil { return err @@ -248,18 +289,20 @@ func (cS *CoreService) V1StopCPUProfiling(_ *context.Context, _ *utils.TenantWit return nil } -// StartMemoryProfiling is used to start MemoryProfiling in the given path -func (cS *CoreService) V1StartMemoryProfiling(_ *context.Context, args *utils.MemoryPrf, reply *string) error { - if err := cS.StartMemoryProfiling(args); err != nil { +// V1StartMemoryProfiling starts memory profiling in the specified directory. +func (cS *CoreService) V1StartMemoryProfiling(_ *context.Context, params MemoryProfilingParams, reply *string) error { + if params.DirPath == utils.EmptyString { + return utils.NewErrMandatoryIeMissing("DirPath") + } + if err := cS.StartMemoryProfiling(params); err != nil { return err } *reply = utils.OK return nil } -// V1StopMemoryProfiling is used to stop MemoryProfiling. The file should be written on the path -// where the MemoryProfiling already started -func (cS *CoreService) V1StopMemoryProfiling(_ *context.Context, _ *utils.TenantWithAPIOpts, reply *string) error { +// V1StopMemoryProfiling stops memory profiling. +func (cS *CoreService) V1StopMemoryProfiling(_ *context.Context, _ utils.TenantWithAPIOpts, reply *string) error { if err := cS.StopMemoryProfiling(); err != nil { return err } diff --git a/cores/core_test.go b/cores/core_test.go index d7ecc9d87..fcc5b4a87 100644 --- a/cores/core_test.go +++ b/cores/core_test.go @@ -39,18 +39,15 @@ func TestNewCoreService(t *testing.T) { sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan) shdWg := new(sync.WaitGroup) shdChan := utils.NewSyncedChan() - stopMemPrf := make(chan struct{}) expected := &CoreService{ - fileMEM: "/tmp", - shdWg: shdWg, - shdChan: shdChan, - stopMemPrf: stopMemPrf, - cfg: cfgDflt, - CapsStats: sts, - caps: caps, + shdWg: shdWg, + shdChan: shdChan, + cfg: cfgDflt, + CapsStats: sts, + caps: caps, } - rcv := NewCoreService(cfgDflt, caps, nil, "/tmp", stopchan, shdWg, stopMemPrf, shdChan) + rcv := NewCoreService(cfgDflt, caps, nil, stopchan, shdWg, shdChan) if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expected %+v, received %+v", expected, rcv) } @@ -65,7 +62,7 @@ func TestCoreServiceStatus(t *testing.T) { caps := engine.NewCaps(1, utils.MetaBusy) stopChan := make(chan struct{}, 1) - cores := NewCoreService(cfgDflt, caps, nil, "/tmp", stopChan, nil, nil, nil) + cores := NewCoreService(cfgDflt, caps, nil, stopChan, nil, nil) args := &utils.TenantWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]any{}, diff --git a/dispatchers/cores.go b/dispatchers/cores.go index e38cf0c9b..d28c0f184 100644 --- a/dispatchers/cores.go +++ b/dispatchers/cores.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/utils" ) @@ -102,23 +103,29 @@ func (dS *DispatcherService) CoreSv1StartCPUProfiling(ctx *context.Context, args } return dS.Dispatch(&utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaCore, utils.CoreSv1StartCPUProfiling, args, reply) } -func (dS *DispatcherService) CoreSv1StartMemoryProfiling(ctx *context.Context, args *utils.MemoryPrf, reply *string) (err error) { - tnt := dS.cfg.GeneralCfg().DefaultTenant - if args != nil && len(args.Tenant) != 0 { - tnt = args.Tenant +func (dS *DispatcherService) CoreSv1StartMemoryProfiling(ctx *context.Context, params cores.MemoryProfilingParams, reply *string) (err error) { + if params.Tenant == utils.EmptyString { + params.Tenant = dS.cfg.GeneralCfg().DefaultTenant } ev := make(map[string]any) - opts := make(map[string]any) - if args != nil { - opts = args.APIOpts + if params.APIOpts == nil { + params.APIOpts = make(map[string]any) } if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { - if err = dS.authorize(utils.CoreSv1StartMemoryProfiling, tnt, - utils.IfaceAsString(opts[utils.OptsAPIKey]), utils.TimePointer(time.Now())); err != nil { + if err = dS.authorize(utils.CoreSv1StartMemoryProfiling, params.Tenant, + utils.IfaceAsString(params.APIOpts[utils.OptsAPIKey]), + utils.TimePointer(time.Now())); err != nil { return } } - return dS.Dispatch(&utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaCore, utils.CoreSv1StartMemoryProfiling, args, reply) + return dS.Dispatch( + &utils.CGREvent{ + Tenant: params.Tenant, + Event: ev, + APIOpts: params.APIOpts, + }, utils.MetaCore, + utils.CoreSv1StartMemoryProfiling, params, reply, + ) } func (dS *DispatcherService) CoreSv1Status(ctx *context.Context, args *utils.TenantWithAPIOpts, reply *map[string]any) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant @@ -156,21 +163,26 @@ func (dS *DispatcherService) CoreSv1StopCPUProfiling(ctx *context.Context, args } return dS.Dispatch(&utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaCore, utils.CoreSv1StopCPUProfiling, args, reply) } -func (dS *DispatcherService) CoreSv1StopMemoryProfiling(ctx *context.Context, args *utils.TenantWithAPIOpts, reply *string) (err error) { - tnt := dS.cfg.GeneralCfg().DefaultTenant - if args != nil && len(args.Tenant) != 0 { - tnt = args.Tenant +func (dS *DispatcherService) CoreSv1StopMemoryProfiling(ctx *context.Context, params utils.TenantWithAPIOpts, reply *string) (err error) { + if params.Tenant == utils.EmptyString { + params.Tenant = dS.cfg.GeneralCfg().DefaultTenant } ev := make(map[string]any) - opts := make(map[string]any) - if args != nil { - opts = args.APIOpts + if params.APIOpts == nil { + params.APIOpts = make(map[string]any) } if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { if err = dS.authorize(utils.CoreSv1StopMemoryProfiling, - tnt, utils.IfaceAsString(opts[utils.OptsAPIKey]), utils.TimePointer(time.Now())); err != nil { + params.Tenant, utils.IfaceAsString(params.APIOpts[utils.OptsAPIKey]), utils.TimePointer(time.Now())); err != nil { return } } - return dS.Dispatch(&utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaCore, utils.CoreSv1StopMemoryProfiling, args, reply) + return dS.Dispatch( + &utils.CGREvent{ + Tenant: params.Tenant, + Event: ev, + APIOpts: params.APIOpts, + }, utils.MetaCore, + utils.CoreSv1StopMemoryProfiling, params, reply, + ) } diff --git a/docs/cgr-engine.rst b/docs/cgr-engine.rst index 094a62caf..6be45a08b 100644 --- a/docs/cgr-engine.rst +++ b/docs/cgr-engine.rst @@ -27,8 +27,8 @@ Able to read the configuration from either a local directory of *.json* files w -memprof_dir string Directory for memory profiles -memprof_interval duration - Interval between memory profile saves (default 5s) - -memprof_nrfiles int + Interval between memory profile saves (default 15s) + -memprof_maxfiles int Number of memory profiles to keep (most recent) (default 1) -node_id string Node ID of the engine @@ -48,7 +48,6 @@ Able to read the configuration from either a local directory of *.json* files w Print application version and exit - .. hint:: $ cgr-engine -config_path=/etc/cgrates .. figure:: images/CGRateSInternalArchitecture.png diff --git a/services/cores.go b/services/cores.go index e5c42cd2f..2de91e012 100644 --- a/services/cores.go +++ b/services/cores.go @@ -33,39 +33,35 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server, internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService, - fileCpu io.Closer, fileMEM string, shdWg *sync.WaitGroup, stopMemPrf chan struct{}, - shdChan *utils.SyncedChan, srvDep map[string]*sync.WaitGroup) *CoreService { + fileCpu io.Closer, shdWg *sync.WaitGroup, shdChan *utils.SyncedChan, + srvDep map[string]*sync.WaitGroup) *CoreService { return &CoreService{ - shdChan: shdChan, - shdWg: shdWg, - stopMemPrf: stopMemPrf, - connChan: internalCoreSChan, - cfg: cfg, - caps: caps, - fileCpu: fileCpu, - fileMem: fileMEM, - server: server, - anz: anz, - srvDep: srvDep, + shdChan: shdChan, + shdWg: shdWg, + connChan: internalCoreSChan, + cfg: cfg, + caps: caps, + fileCpu: fileCpu, + server: server, + anz: anz, + srvDep: srvDep, } } // CoreService implements Service interface type CoreService struct { sync.RWMutex - cfg *config.CGRConfig - server *cores.Server - caps *engine.Caps - stopChan chan struct{} - shdWg *sync.WaitGroup - stopMemPrf chan struct{} - shdChan *utils.SyncedChan - fileCpu io.Closer - fileMem string - cS *cores.CoreService - connChan chan birpc.ClientConnector - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + cfg *config.CGRConfig + server *cores.Server + caps *engine.Caps + stopChan chan struct{} + shdWg *sync.WaitGroup + shdChan *utils.SyncedChan + fileCpu io.Closer + cS *cores.CoreService + connChan chan birpc.ClientConnector + anz *AnalyzerService + srvDep map[string]*sync.WaitGroup } // Start should handle the service start @@ -78,7 +74,7 @@ func (cS *CoreService) Start() error { defer cS.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) cS.stopChan = make(chan struct{}) - cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.fileMem, cS.stopChan, cS.shdWg, cS.stopMemPrf, cS.shdChan) + cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.stopChan, cS.shdWg, cS.shdChan) srv, err := engine.NewServiceWithName(cS.cS, utils.CoreS, true) if err != nil { return err diff --git a/services/cores_it_test.go b/services/cores_it_test.go index d02a88d70..d3a35f8ed 100644 --- a/services/cores_it_test.go +++ b/services/cores_it_test.go @@ -52,7 +52,7 @@ func TestCoreSReload(t *testing.T) { coreRPC := make(chan birpc.ClientConnector, 1) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) caps := engine.NewCaps(1, "test_caps") - coreS := NewCoreService(cfg, caps, server, coreRPC, anz, nil, "", nil, nil, nil, srvDep) + coreS := NewCoreService(cfg, caps, server, coreRPC, anz, nil, nil, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(coreS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) diff --git a/services/cores_test.go b/services/cores_test.go index b31cb442a..1429d642b 100644 --- a/services/cores_test.go +++ b/services/cores_test.go @@ -41,7 +41,7 @@ func TestCoreSCoverage(t *testing.T) { srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) srv := NewCoreService(cfg, caps, server, - internalCoreSChan, anz, nil, "/tmp", nil, nil, nil, srvDep) + internalCoreSChan, anz, nil, nil, nil, srvDep) if srv == nil { t.Errorf("\nExpecting ,\n Received <%+v>", utils.ToJSON(srv)) } diff --git a/utils/consts.go b/utils/consts.go index 4e665895a..eb3da55a6 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2885,12 +2885,12 @@ const ( CpuProfDirCgr = "cpuprof_dir" MemProfDirCgr = "memprof_dir" MemProfIntervalCgr = "memprof_interval" - MemProfNrFilesCgr = "memprof_nrfiles" + MemProfMaxFilesCgr = "memprof_maxfiles" ScheduledShutdownCgr = "scheduled_shutdown" SingleCpuCgr = "singlecpu" PreloadCgr = "preload" SetVersionsCgr = "set_versions" - MemProfFileCgr = "mem_final.prof" + MemProfFinalFile = "mem_final.prof" CpuPathCgr = "cpu.prof" //Cgr loader CgrLoader = "cgr-loader" diff --git a/utils/coreutils.go b/utils/coreutils.go index 974a8e64e..ad10259b0 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -817,14 +817,6 @@ type TenantWithAPIOpts struct { APIOpts map[string]any } -type MemoryPrf struct { - Tenant string - DirPath string - Interval time.Duration - NrFiles int - APIOpts map[string]any -} - type TenantID struct { Tenant string ID string