From 8f74b8581ac2641640aff407897874b2e85d4770 Mon Sep 17 00:00:00 2001 From: porosnicuadrian Date: Fri, 25 Jun 2021 11:20:51 +0300 Subject: [PATCH] FInished APIs for CPUProfiling --- apier/v1/core.go | 6 +-- cmd/cgr-engine/cgr-engine.go | 34 ++++++------- cores/core.go | 49 ++++++++++++------- .../dispatchers/all_mysql/cgrates.json | 4 -- services/cores.go | 9 ++-- 5 files changed, 55 insertions(+), 47 deletions(-) diff --git a/apier/v1/core.go b/apier/v1/core.go index fd551c7fb..feaeea27a 100644 --- a/apier/v1/core.go +++ b/apier/v1/core.go @@ -58,7 +58,7 @@ func (cS *CoreSv1) Sleep(arg *utils.DurationArgs, reply *string) error { } // StartCPUProfiling is used to start CPUProfiling in the given path -func (cS *CoreSv1) StartCPUProfiling(args, reply *string) error { +func (cS *CoreSv1) StartCPUProfiling(args string, reply *string) error { if err := cS.cS.StartCPUProfiling(args); err != nil { return err } @@ -67,8 +67,8 @@ func (cS *CoreSv1) StartCPUProfiling(args, reply *string) error { } // StopCPUProfiling is used to stop CPUProfiling in the given path -func (cS *CoreSv1) StopCPUProfiling(args, reply *string) error { - if err := cS.cS.StopCPUProfiling(args); err != nil { +func (cS *CoreSv1) StopCPUProfiling(_ string, reply *string) error { + if err := cS.cS.StopCPUProfiling(); err != nil { return err } *reply = utils.OK diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 2d7a527c0..f737439b1 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -296,21 +296,6 @@ func memProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg } } -func startCPUProfiling(cpuProfDir string) (f io.WriteCloser, err error) { - cpuPath := path.Join(cpuProfDir, utils.CpuPathCgr) - if f, err = os.Create(cpuPath); err != nil { - utils.Logger.Crit(fmt.Sprintf("could not create cpu profile file: %s", err)) - return - } - pprof.StartCPUProfile(f) - return -} - -func stopCPUProfiling(f io.Closer) { - pprof.StopCPUProfile() - f.Close() -} - func singnalHandler(shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) { shutdownSignal := make(chan os.Signal, 1) reloadSignal := make(chan os.Signal, 1) @@ -399,13 +384,25 @@ func main() { shdWg.Add(1) go memProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, shdChan) } + var cpuProfileFile io.Closer + var cS *cores.CoreService if *cpuProfDir != utils.EmptyString { - f, err := startCPUProfiling(*cpuProfDir) + cpuPath := path.Join(*cpuProfDir, utils.CpuPathCgr) + cpuProfileFile, err = cores.StartCPUProfiling(cpuPath) if err != nil { return } - defer stopCPUProfiling(f) } + defer func() { + if cS != nil { + cS.StopCPUProfiling() + return + } + if cpuProfileFile != nil { + pprof.StopCPUProfile() + cpuProfileFile.Close() + } + }() if *scheduledShutdown != utils.EmptyString { shutdownDur, err := utils.ParseDurationWithNanosecs(*scheduledShutdown) @@ -600,12 +597,13 @@ func main() { } // init CoreSv1 - coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, srvDep) + coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, srvDep) shdWg.Add(1) if err := coreS.Start(); err != nil { fmt.Println(err) return } + cS = coreS.GetCoreS() // init CacheS cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), shdChan, anz, coreS.GetCoreS().CapsStats) diff --git a/cores/core.go b/cores/core.go index 86c19e010..4e39f4652 100644 --- a/cores/core.go +++ b/cores/core.go @@ -20,16 +20,18 @@ package cores import ( "fmt" + "io" "os" "runtime" "runtime/pprof" + "sync" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan struct{}) *CoreService { +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, file io.Closer, stopChan chan struct{}) *CoreService { var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) @@ -37,12 +39,15 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan stru return &CoreService{ cfg: cfg, CapsStats: st, + file: file, } } type CoreService struct { cfg *config.CGRConfig CapsStats *engine.CapsStats + file io.Closer + fileMx sync.Mutex } // Shutdown is called to shutdown the service @@ -71,31 +76,37 @@ func (cS *CoreService) Status(arg *utils.TenantWithAPIOpts, reply *map[string]in } // StartCPUProfiling is used to start CPUProfiling in the given path -func (cS *CoreService) StartCPUProfiling(argPath *string) (err error) { - if *argPath == utils.EmptyString { +func (cS *CoreService) StartCPUProfiling(argPath string) (err error) { + cS.fileMx.Lock() + defer cS.fileMx.Unlock() + if cS.file != nil { + return fmt.Errorf("CPU profiling already started") + } + if argPath == utils.EmptyString { return utils.NewErrMandatoryIeMissing("Path") } - f, err := os.Create(*argPath) - if err != nil { - return fmt.Errorf("could not create CPU profile: %v", err) - } - if err := pprof.StartCPUProfile(f); err != nil { - return fmt.Errorf("could not create CPU profile: %v", err) - } - defer f.Close() + cS.file, err = StartCPUProfiling(argPath) return } // StopCPUProfiling is used to stop CPUProfiling in the given path -func (cS *CoreService) StopCPUProfiling(argPath *string) (err error) { - f, err := os.Create(*argPath) - if err != nil { - return fmt.Errorf("could not create CPU profile: %v", err) - } - if err := pprof.StartCPUProfile(f); err != nil { - // this means CPUProfiling is already active,so we can shut down now +func (cS *CoreService) StopCPUProfiling() (err error) { + cS.fileMx.Lock() + defer cS.fileMx.Unlock() + if cS.file != nil { pprof.StopCPUProfile() - return nil + err = cS.file.Close() + cS.file = nil + return } + return fmt.Errorf(" cannot stop because CPUProfiling is not active") +} + +func StartCPUProfiling(path string) (file io.WriteCloser, err error) { + file, err = os.Create(path) + if err != nil { + return nil, fmt.Errorf("could not create CPU profile: %v", err) + } + err = pprof.StartCPUProfile(file) return } diff --git a/data/conf/samples/dispatchers/all_mysql/cgrates.json b/data/conf/samples/dispatchers/all_mysql/cgrates.json index 5a81c9f37..b1ab1e83a 100644 --- a/data/conf/samples/dispatchers/all_mysql/cgrates.json +++ b/data/conf/samples/dispatchers/all_mysql/cgrates.json @@ -31,10 +31,6 @@ "strategy": "*first", "conns": [{"address": "127.0.0.1:6012", "transport":"*json"}], }, - "rplConn": { - "strategy": "*broadcast_sync", - "conns": [{"address": "127.0.0.1:7012", "transport":"*json"}], - } }, diff --git a/services/cores.go b/services/cores.go index 4b2f1b87d..a23e05d59 100644 --- a/services/cores.go +++ b/services/cores.go @@ -20,6 +20,7 @@ package services import ( "fmt" + "io" "sync" v1 "github.com/cgrates/cgrates/apier/v1" @@ -33,11 +34,13 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server, internalCoreSChan chan rpcclient.ClientConnector, anz *AnalyzerService, + fileCpu io.Closer, srvDep map[string]*sync.WaitGroup) *CoreService { return &CoreService{ connChan: internalCoreSChan, cfg: cfg, caps: caps, + fileCpu: fileCpu, server: server, anz: anz, srvDep: srvDep, @@ -51,7 +54,7 @@ type CoreService struct { server *cores.Server caps *engine.Caps stopChan chan struct{} - + fileCpu io.Closer cS *cores.CoreService rpc *v1.CoreSv1 connChan chan rpcclient.ClientConnector @@ -59,7 +62,7 @@ type CoreService struct { srvDep map[string]*sync.WaitGroup } -// Start should handle the sercive start +// Start should handle the service start func (cS *CoreService) Start() (err error) { if cS.IsRunning() { return utils.ErrServiceAlreadyRunning @@ -69,7 +72,7 @@ func (cS *CoreService) Start() (err error) { defer cS.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) cS.stopChan = make(chan struct{}) - cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.stopChan) + cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.stopChan) cS.rpc = v1.NewCoreSv1(cS.cS) if !cS.cfg.DispatcherSCfg().Enabled { cS.server.RpcRegister(cS.rpc)