Added apis for memoryProfiling

This commit is contained in:
porosnicuadrian
2021-06-29 17:15:52 +03:00
committed by Dan Christian Bogos
parent 95edfce5a2
commit 7531215df0
4 changed files with 82 additions and 38 deletions

View File

@@ -85,3 +85,13 @@ func (cS *CoreSv1) StartMemoryProfiling(args *utils.MemoryPrf, reply *string) er
*reply = utils.OK
return nil
}
// StopMemoryProfiling is used to stop MemoryProfiling. The file should be written on the path
// where the MemoryProfiling already started
func (cS *CoreSv1) StopMemoryProfiling(_ *utils.MemoryPrf, reply *string) error {
if err := cS.cS.StopMemoryProfiling(); err != nil {
return err
}
*reply = utils.OK
return nil
}

View File

@@ -341,12 +341,20 @@ func main() {
shdWg.Add(1)
go singnalHandler(shdWg, shdChan)
var cS *cores.CoreService
var stopMemProf chan struct{}
if *memProfDir != utils.EmptyString {
shdWg.Add(1)
go cores.MemProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, shdChan)
stopMemProf = make(chan struct{})
go cores.MemProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, stopMemProf, shdChan)
}
defer func() {
if stopMemProf != nil {
cS.StopMemoryProfiling()
}
}()
var cpuProfileFile io.Closer
var cS *cores.CoreService
if *cpuProfDir != utils.EmptyString {
cpuPath := path.Join(*cpuProfDir, utils.CpuPathCgr)
cpuProfileFile, err = cores.StartCPUProfiling(cpuPath)
@@ -558,7 +566,7 @@ func main() {
}
// init CoreSv1
coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, srvDep)
coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, shdWg, stopMemProf, shdChan, srvDep)
shdWg.Add(1)
if err := coreS.Start(); err != nil {
fmt.Println(err)

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cores
import (
"errors"
"fmt"
"io"
"os"
@@ -33,29 +34,39 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, file io.Closer, stopChan chan struct{}) *CoreService {
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, file io.Closer, stopChan chan struct{},
shdWg *sync.WaitGroup, stopMemPrf chan struct{}, shdChan *utils.SyncedChan) *CoreService {
var st *engine.CapsStats
if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 {
st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan)
}
return &CoreService{
cfg: cfg,
CapsStats: st,
fileCPU: file,
shdWg: shdWg,
stopMemPrf: stopMemPrf,
shdChan: shdChan,
cfg: cfg,
CapsStats: st,
fileCPU: file,
}
}
type CoreService struct {
cfg *config.CGRConfig
CapsStats *engine.CapsStats
fileCPU io.Closer
fileMem io.Closer
fileMx sync.Mutex
cfg *config.CGRConfig
CapsStats *engine.CapsStats
shdWg *sync.WaitGroup
stopMemPrf chan struct{}
shdChan *utils.SyncedChan
fileCPU io.Closer
fileMem io.Closer
fileMx sync.Mutex
}
// Shutdown is called to shutdown the service
func (cS *CoreService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.CoreS))
if cS.stopMemPrf != nil {
close(cS.stopMemPrf)
}
utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.CoreS))
return
}
@@ -114,14 +125,23 @@ func StartCPUProfiling(path string) (file io.WriteCloser, err error) {
return
}
// StartMemoryProfiling is used to start MemoryProfiling in the given path
func (cS *CoreService) StartMemoryProfiling(args *utils.MemoryPrf) (err error) {
if args.DirPath == utils.EmptyString {
return utils.NewErrMandatoryIeMissing("Path")
}
shdWg := new(sync.WaitGroup)
shdChan := utils.NewSyncedChan()
shdWg.Add(1)
go MemProfiling(args.DirPath, args.Interval, args.NrFiles, shdWg, shdChan)
cS.shdWg.Add(1)
go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shdChan)
return
}
// StopMemoryProfiling is used to stop MemoryProfiling
func (cS *CoreService) StopMemoryProfiling() (err error) {
if cS.stopMemPrf == nil {
return errors.New(" Memory Profiling is not started")
}
close(cS.stopMemPrf)
cS.stopMemPrf = nil
return
}
@@ -141,11 +161,11 @@ func MemProfFile(memProfPath string) bool {
return true
}
func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) {
func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, stopChan chan struct{}, shdChan *utils.SyncedChan) {
tm := time.NewTimer(interval)
for i := 1; ; i++ {
select {
case <-shdChan.Done():
case <-stopChan:
tm.Stop()
shdWg.Done()
return

View File

@@ -34,32 +34,38 @@ import (
// NewCoreService returns the Core Service
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server,
internalCoreSChan chan rpcclient.ClientConnector, anz *AnalyzerService,
fileCpu io.Closer,
srvDep map[string]*sync.WaitGroup) *CoreService {
fileCpu io.Closer, shdWg *sync.WaitGroup, stopMemPrf chan struct{},
shdChan *utils.SyncedChan, srvDep map[string]*sync.WaitGroup) *CoreService {
return &CoreService{
connChan: internalCoreSChan,
cfg: cfg,
caps: caps,
fileCpu: fileCpu,
server: server,
anz: anz,
srvDep: srvDep,
shdChan: shdChan,
shdWg: shdWg,
stopMemPrf: stopMemPrf,
connChan: internalCoreSChan,
cfg: cfg,
caps: caps,
fileCpu: fileCpu,
server: server,
anz: anz,
srvDep: srvDep,
}
}
// CoreService implements Service interface
type CoreService struct {
sync.RWMutex
cfg *config.CGRConfig
server *cores.Server
caps *engine.Caps
stopChan chan struct{}
fileCpu io.Closer
cS *cores.CoreService
rpc *v1.CoreSv1
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
cfg *config.CGRConfig
server *cores.Server
caps *engine.Caps
stopChan chan struct{}
shdWg *sync.WaitGroup
stopMemPrf chan struct{}
shdChan *utils.SyncedChan
fileCpu io.Closer
cS *cores.CoreService
rpc *v1.CoreSv1
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
}
// Start should handle the service start
@@ -72,7 +78,7 @@ func (cS *CoreService) Start() (err error) {
defer cS.Unlock()
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS))
cS.stopChan = make(chan struct{})
cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.stopChan)
cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.stopChan, cS.shdWg, cS.stopMemPrf, cS.shdChan)
cS.rpc = v1.NewCoreSv1(cS.cS)
if !cS.cfg.DispatcherSCfg().Enabled {
cS.server.RpcRegister(cS.rpc)