diff --git a/apier/v1/core.go b/apier/v1/core.go index e97dcf4ba..5f9f2f722 100644 --- a/apier/v1/core.go +++ b/apier/v1/core.go @@ -85,3 +85,13 @@ func (cS *CoreSv1) StartMemoryProfiling(args *utils.MemoryPrf, reply *string) er *reply = utils.OK return nil } + +// StopMemoryProfiling is used to stop MemoryProfiling. The file should be written on the path +// where the MemoryProfiling already started +func (cS *CoreSv1) StopMemoryProfiling(_ *utils.MemoryPrf, reply *string) error { + if err := cS.cS.StopMemoryProfiling(); err != nil { + return err + } + *reply = utils.OK + return nil +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 94a702650..b8bf7407d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -341,12 +341,20 @@ func main() { shdWg.Add(1) go singnalHandler(shdWg, shdChan) + var cS *cores.CoreService + var stopMemProf chan struct{} if *memProfDir != utils.EmptyString { shdWg.Add(1) - go cores.MemProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, shdChan) + stopMemProf = make(chan struct{}) + go cores.MemProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, stopMemProf, shdChan) } + defer func() { + if stopMemProf != nil { + cS.StopMemoryProfiling() + } + }() + var cpuProfileFile io.Closer - var cS *cores.CoreService if *cpuProfDir != utils.EmptyString { cpuPath := path.Join(*cpuProfDir, utils.CpuPathCgr) cpuProfileFile, err = cores.StartCPUProfiling(cpuPath) @@ -558,7 +566,7 @@ func main() { } // init CoreSv1 - coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, srvDep) + coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, shdWg, stopMemProf, shdChan, srvDep) shdWg.Add(1) if err := coreS.Start(); err != nil { fmt.Println(err) diff --git a/cores/core.go b/cores/core.go index b459d4e61..02832791d 100644 --- a/cores/core.go +++ b/cores/core.go @@ -19,6 +19,7 @@ along with this program. If not, see package cores import ( + "errors" "fmt" "io" "os" @@ -33,29 +34,39 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, file io.Closer, stopChan chan struct{}) *CoreService { +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, file io.Closer, stopChan chan struct{}, + shdWg *sync.WaitGroup, stopMemPrf chan struct{}, shdChan *utils.SyncedChan) *CoreService { var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) } return &CoreService{ - cfg: cfg, - CapsStats: st, - fileCPU: file, + shdWg: shdWg, + stopMemPrf: stopMemPrf, + shdChan: shdChan, + cfg: cfg, + CapsStats: st, + fileCPU: file, } } type CoreService struct { - cfg *config.CGRConfig - CapsStats *engine.CapsStats - fileCPU io.Closer - fileMem io.Closer - fileMx sync.Mutex + cfg *config.CGRConfig + CapsStats *engine.CapsStats + shdWg *sync.WaitGroup + stopMemPrf chan struct{} + shdChan *utils.SyncedChan + fileCPU io.Closer + fileMem io.Closer + fileMx sync.Mutex } // Shutdown is called to shutdown the service func (cS *CoreService) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.CoreS)) + if cS.stopMemPrf != nil { + close(cS.stopMemPrf) + } utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.CoreS)) return } @@ -114,14 +125,23 @@ func StartCPUProfiling(path string) (file io.WriteCloser, err error) { return } +// StartMemoryProfiling is used to start MemoryProfiling in the given path func (cS *CoreService) StartMemoryProfiling(args *utils.MemoryPrf) (err error) { if args.DirPath == utils.EmptyString { return utils.NewErrMandatoryIeMissing("Path") } - shdWg := new(sync.WaitGroup) - shdChan := utils.NewSyncedChan() - shdWg.Add(1) - go MemProfiling(args.DirPath, args.Interval, args.NrFiles, shdWg, shdChan) + cS.shdWg.Add(1) + go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shdChan) + return +} + +// StopMemoryProfiling is used to stop MemoryProfiling +func (cS *CoreService) StopMemoryProfiling() (err error) { + if cS.stopMemPrf == nil { + return errors.New(" Memory Profiling is not started") + } + close(cS.stopMemPrf) + cS.stopMemPrf = nil return } @@ -141,11 +161,11 @@ func MemProfFile(memProfPath string) bool { return true } -func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) { +func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, stopChan chan struct{}, shdChan *utils.SyncedChan) { tm := time.NewTimer(interval) for i := 1; ; i++ { select { - case <-shdChan.Done(): + case <-stopChan: tm.Stop() shdWg.Done() return diff --git a/services/cores.go b/services/cores.go index a23e05d59..4ed033c92 100644 --- a/services/cores.go +++ b/services/cores.go @@ -34,32 +34,38 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server, internalCoreSChan chan rpcclient.ClientConnector, anz *AnalyzerService, - fileCpu io.Closer, - srvDep map[string]*sync.WaitGroup) *CoreService { + fileCpu io.Closer, shdWg *sync.WaitGroup, stopMemPrf chan struct{}, + shdChan *utils.SyncedChan, srvDep map[string]*sync.WaitGroup) *CoreService { return &CoreService{ - connChan: internalCoreSChan, - cfg: cfg, - caps: caps, - fileCpu: fileCpu, - server: server, - anz: anz, - srvDep: srvDep, + shdChan: shdChan, + shdWg: shdWg, + stopMemPrf: stopMemPrf, + connChan: internalCoreSChan, + cfg: cfg, + caps: caps, + fileCpu: fileCpu, + server: server, + anz: anz, + srvDep: srvDep, } } // CoreService implements Service interface type CoreService struct { sync.RWMutex - cfg *config.CGRConfig - server *cores.Server - caps *engine.Caps - stopChan chan struct{} - fileCpu io.Closer - cS *cores.CoreService - rpc *v1.CoreSv1 - connChan chan rpcclient.ClientConnector - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + cfg *config.CGRConfig + server *cores.Server + caps *engine.Caps + stopChan chan struct{} + shdWg *sync.WaitGroup + stopMemPrf chan struct{} + shdChan *utils.SyncedChan + fileCpu io.Closer + cS *cores.CoreService + rpc *v1.CoreSv1 + connChan chan rpcclient.ClientConnector + anz *AnalyzerService + srvDep map[string]*sync.WaitGroup } // Start should handle the service start @@ -72,7 +78,7 @@ func (cS *CoreService) Start() (err error) { defer cS.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) cS.stopChan = make(chan struct{}) - cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.stopChan) + cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.stopChan, cS.shdWg, cS.stopMemPrf, cS.shdChan) cS.rpc = v1.NewCoreSv1(cS.cS) if !cS.cfg.DispatcherSCfg().Enabled { cS.server.RpcRegister(cS.rpc)