From 7bea1b2697436123b275013bba82de7765e9d3b3 Mon Sep 17 00:00:00 2001 From: porosnicuadrian Date: Thu, 8 Jul 2021 09:23:55 +0300 Subject: [PATCH] Finished adding final_memprof file in stop api --- apier/v1/core_it_test.go | 4 +++- cmd/cgr-engine/cgr-engine.go | 5 ++++- cores/core.go | 16 +++++++++------- cores/core_test.go | 5 +++-- services/cores.go | 6 ++++-- services/cores_test.go | 2 +- 6 files changed, 24 insertions(+), 14 deletions(-) diff --git a/apier/v1/core_it_test.go b/apier/v1/core_it_test.go index ef6be309b..d42c16d77 100644 --- a/apier/v1/core_it_test.go +++ b/apier/v1/core_it_test.go @@ -64,6 +64,7 @@ var ( testCoreSv1StopMemoryProfiling, testCoreSv1KillEngine, testCoreSv1CheckFinalMemProfiling, + // test CPU and Memory just by APIs testCoreSv1StartEngine, testCoreSv1RPCConn, @@ -80,6 +81,7 @@ var ( testCoreSv1Sleep, testCoreSv1StopMemoryProfiling, testCoreSv1KillEngine, + testCoreSv1CheckFinalMemProfiling, } ) @@ -287,7 +289,7 @@ func testCoreSv1StartCPUProfiling(t *testing.T) { func testCoreSv1Sleep(t *testing.T) { args := &utils.DurationArgs{ - Duration: 500 * time.Millisecond, + Duration: 600 * time.Millisecond, } var reply string if err := coreV1Rpc.Call(utils.CoreSv1Sleep, diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 9cfe4d2f2..edd29003e 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -343,9 +343,11 @@ func main() { var cS *cores.CoreService var stopMemProf chan struct{} + var memPrfDirForCores string if *memProfDir != utils.EmptyString { shdWg.Add(1) stopMemProf = make(chan struct{}) + memPrfDirForCores = *memProfDir go cores.MemProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, stopMemProf, shdChan) defer func() { if cS == nil { @@ -566,7 +568,8 @@ func main() { } // init CoreSv1 - coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, shdWg, stopMemProf, shdChan, srvDep) + + coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, memPrfDirForCores, shdWg, stopMemProf, shdChan, srvDep) shdWg.Add(1) if err := coreS.Start(); err != nil { fmt.Println(err) diff --git a/cores/core.go b/cores/core.go index 85e43582e..a86ec983c 100644 --- a/cores/core.go +++ b/cores/core.go @@ -34,7 +34,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, file io.Closer, stopChan chan struct{}, +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer, fileMem string, stopChan chan struct{}, shdWg *sync.WaitGroup, stopMemPrf chan struct{}, shdChan *utils.SyncedChan) *CoreService { var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { @@ -46,7 +46,8 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, file io.Closer, st shdChan: shdChan, cfg: cfg, CapsStats: st, - fileCPU: file, + fileCPU: fileCPU, + fileMEM: fileMem, } } @@ -56,6 +57,7 @@ type CoreService struct { shdWg *sync.WaitGroup stopMemPrf chan struct{} shdChan *utils.SyncedChan + fileMEM string fileCPU io.Closer fileMx sync.Mutex } @@ -70,13 +72,12 @@ func (cS *CoreService) Shutdown() { // StopChanMemProf will stop the MemoryProfiling Channel in order to create // the final MemoryProfiling when CoreS subsystem will stop. -func (cS *CoreService) StopChanMemProf() bool { +func (cS *CoreService) StopChanMemProf() { if cS.stopMemPrf != nil { + MemProfFile(cS.fileMEM) close(cS.stopMemPrf) cS.stopMemPrf = nil - return true } - return false } func StartCPUProfiling(path string) (file io.WriteCloser, err error) { @@ -187,6 +188,7 @@ func (cS *CoreService) StartMemoryProfiling(args *utils.MemoryPrf) (err error) { } cS.shdWg.Add(1) cS.stopMemPrf = make(chan struct{}) + cS.fileMEM = args.DirPath go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shdChan) return } @@ -196,7 +198,7 @@ func (cS *CoreService) StopMemoryProfiling() (err error) { if cS.stopMemPrf == nil { return errors.New(" Memory Profiling is not started") } - close(cS.stopMemPrf) - cS.stopMemPrf = nil + cS.fileMEM = path.Join(cS.fileMEM, utils.MemProfFileCgr) + cS.StopChanMemProf() return } diff --git a/cores/core_test.go b/cores/core_test.go index 20ac4ec47..41158ea1e 100644 --- a/cores/core_test.go +++ b/cores/core_test.go @@ -40,6 +40,7 @@ func TestNewCoreService(t *testing.T) { shdChan := utils.NewSyncedChan() stopMemPrf := make(chan struct{}) expected := &CoreService{ + fileMEM: "/tmp", shdWg: shdWg, shdChan: shdChan, stopMemPrf: stopMemPrf, @@ -47,7 +48,7 @@ func TestNewCoreService(t *testing.T) { CapsStats: sts, } - rcv := NewCoreService(cfgDflt, caps, nil, stopchan, shdWg, stopMemPrf, shdChan) + rcv := NewCoreService(cfgDflt, caps, nil,"/tmp", stopchan, shdWg, stopMemPrf, shdChan) if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expected %+v, received %+v", utils.ToJSON(expected), utils.ToJSON(rcv)) } @@ -62,7 +63,7 @@ func TestCoreServiceStatus(t *testing.T) { caps := engine.NewCaps(1, utils.MetaBusy) stopChan := make(chan struct{}, 1) - cores := NewCoreService(cfgDflt, caps, nil, stopChan, nil, nil, nil) + cores := NewCoreService(cfgDflt, caps, nil,"/tmp", stopChan, nil, nil, nil) args := &utils.TenantWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{}, diff --git a/services/cores.go b/services/cores.go index 4ed033c92..7c27079b2 100644 --- a/services/cores.go +++ b/services/cores.go @@ -34,7 +34,7 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server, internalCoreSChan chan rpcclient.ClientConnector, anz *AnalyzerService, - fileCpu io.Closer, shdWg *sync.WaitGroup, stopMemPrf chan struct{}, + fileCpu io.Closer, fileMEM string, shdWg *sync.WaitGroup, stopMemPrf chan struct{}, shdChan *utils.SyncedChan, srvDep map[string]*sync.WaitGroup) *CoreService { return &CoreService{ shdChan: shdChan, @@ -44,6 +44,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Serv cfg: cfg, caps: caps, fileCpu: fileCpu, + fileMem: fileMEM, server: server, anz: anz, srvDep: srvDep, @@ -61,6 +62,7 @@ type CoreService struct { stopMemPrf chan struct{} shdChan *utils.SyncedChan fileCpu io.Closer + fileMem string cS *cores.CoreService rpc *v1.CoreSv1 connChan chan rpcclient.ClientConnector @@ -78,7 +80,7 @@ func (cS *CoreService) Start() (err error) { defer cS.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) cS.stopChan = make(chan struct{}) - cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.stopChan, cS.shdWg, cS.stopMemPrf, cS.shdChan) + cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.fileMem, cS.stopChan, cS.shdWg, cS.stopMemPrf, cS.shdChan) cS.rpc = v1.NewCoreSv1(cS.cS) if !cS.cfg.DispatcherSCfg().Enabled { cS.server.RpcRegister(cS.rpc) diff --git a/services/cores_test.go b/services/cores_test.go index feca6127d..90ea7ef54 100644 --- a/services/cores_test.go +++ b/services/cores_test.go @@ -41,7 +41,7 @@ func TestCoreSCoverage(t *testing.T) { srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) srv := NewCoreService(cfg, caps, server, - internalCoreSChan, anz, nil, nil, nil, nil, srvDep) + internalCoreSChan, anz, nil, "/tmp", nil, nil, nil, srvDep) if srv == nil { t.Errorf("\nExpecting ,\n Received <%+v>", utils.ToJSON(srv)) }