Updated shutdown channel handling

This commit is contained in:
Trial97
2020-11-27 15:35:27 +02:00
committed by Dan Christian Bogos
parent 49fbe92945
commit 9de6a2d172
50 changed files with 543 additions and 398 deletions

View File

@@ -21,6 +21,7 @@ package main
import (
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
@@ -29,6 +30,7 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync"
"syscall"
"time"
@@ -75,14 +77,14 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
// initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns
func initCacheS(internalCacheSChan chan rpcclient.ClientConnector,
server *cores.Server, dm *engine.DataManager, exitChan chan<- struct{},
server *cores.Server, dm *engine.DataManager, shdChan *utils.SyncedChan,
anz *services.AnalyzerService,
cpS *engine.CapsStats) (chS *engine.CacheS) {
chS = engine.NewCacheS(cfg, dm, cpS)
go func() {
if err := chS.Precache(); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error()))
close(exitChan)
shdChan.CloseOnce()
}
}()
@@ -143,7 +145,7 @@ func startRPC(server *cores.Server, internalRaterChan,
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
internalLoaderSChan, internalRALsv1Chan, internalCacheSChan,
internalEEsChan, internalRateSChan chan rpcclient.ClientConnector,
stopChan <-chan struct{}, exitChan chan<- struct{}) {
shdChan *utils.SyncedChan) {
if !cfg.DispatcherSCfg().Enabled {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
@@ -176,27 +178,27 @@ func startRPC(server *cores.Server, internalRaterChan,
internalEEsChan <- eeS
case rateS := <-internalRateSChan:
internalRateSChan <- rateS
case <-stopChan:
case <-shdChan.Done():
return
}
} else {
select {
case dispatcherS := <-internalDispatcherSChan:
internalDispatcherSChan <- dispatcherS
case <-stopChan:
case <-shdChan.Done():
return
}
}
go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, exitChan)
go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, exitChan)
go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, shdChan)
go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, shdChan)
go server.ServeHTTP(
cfg.ListenCfg().HTTPListen,
cfg.HTTPCfg().HTTPJsonRPCURL,
cfg.HTTPCfg().HTTPWSURL,
cfg.HTTPCfg().HTTPUseBasicAuth,
cfg.HTTPCfg().HTTPAuthUsers,
exitChan,
shdChan,
)
if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 ||
len(cfg.ListenCfg().RPCJSONTLSListen) != 0 ||
@@ -214,7 +216,7 @@ func startRPC(server *cores.Server, internalRaterChan,
cfg.TLSCfg().CaCertificate,
cfg.TLSCfg().ServerPolicy,
cfg.TLSCfg().ServerName,
exitChan,
shdChan,
)
}
if cfg.ListenCfg().RPCJSONTLSListen != "" {
@@ -225,7 +227,7 @@ func startRPC(server *cores.Server, internalRaterChan,
cfg.TLSCfg().CaCertificate,
cfg.TLSCfg().ServerPolicy,
cfg.TLSCfg().ServerName,
exitChan,
shdChan,
)
}
if cfg.ListenCfg().HTTPTLSListen != "" {
@@ -240,7 +242,7 @@ func startRPC(server *cores.Server, internalRaterChan,
cfg.HTTPCfg().HTTPWSURL,
cfg.HTTPCfg().HTTPUseBasicAuth,
cfg.HTTPCfg().HTTPAuthUsers,
exitChan,
shdChan,
)
}
}
@@ -273,36 +275,46 @@ func memProfFile(memProfPath string) bool {
return true
}
func memProfiling(memProfDir string, interval time.Duration, nrFiles int, exitChan chan<- struct{}) {
func memProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) {
tm := time.NewTimer(interval)
for i := 1; ; i++ {
select {
case <-shdChan.Done():
tm.Stop()
shdWg.Done()
return
case <-tm.C:
}
time.Sleep(interval)
memPath := path.Join(memProfDir, fmt.Sprintf("mem%v.prof", i))
if !memProfFile(memPath) {
close(exitChan)
shdChan.CloseOnce()
shdWg.Done()
return
}
if i%nrFiles == 0 {
i = 0 // reset the counting
}
tm.Reset(interval)
}
}
func cpuProfiling(cpuProfDir string, stopChan, doneChan chan struct{}, exitChan chan<- struct{}) {
func startCPUProfiling(cpuProfDir string) (f io.WriteCloser, err error) {
cpuPath := path.Join(cpuProfDir, "cpu.prof")
f, err := os.Create(cpuPath)
defer func() { close(doneChan) }()
if err != nil {
if f, err = os.Create(cpuPath); err != nil {
utils.Logger.Crit(fmt.Sprintf("<cpuProfiling>could not create cpu profile file: %s", err))
close(exitChan)
return
}
pprof.StartCPUProfile(f)
<-stopChan
pprof.StopCPUProfile()
f.Close()
return
}
func singnalHandler(stopChan <-chan struct{}, exitChan chan<- struct{}) {
func stopCPUProfiling(f io.Closer) {
pprof.StopCPUProfile()
f.Close()
}
func singnalHandler(shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) {
shutdownSignal := make(chan os.Signal)
reloadSignal := make(chan os.Signal)
signal.Notify(shutdownSignal, os.Interrupt,
@@ -310,10 +322,12 @@ func singnalHandler(stopChan <-chan struct{}, exitChan chan<- struct{}) {
signal.Notify(reloadSignal, syscall.SIGHUP)
for {
select {
case <-stopChan:
case <-shdChan.Done():
shdWg.Done()
return
case <-shutdownSignal:
close(exitChan)
shdChan.CloseOnce()
shdWg.Done()
return
case <-reloadSignal:
// do it in it's own gorutine in order to not block the signal handler with the reload functionality
@@ -333,10 +347,10 @@ func singnalHandler(stopChan <-chan struct{}, exitChan chan<- struct{}) {
}
func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclient.ClientConnector,
exitChan chan<- struct{}) {
shdChan *utils.SyncedChan) {
if !cfg.LoaderCfg().Enabled() {
utils.Logger.Err(fmt.Sprintf("<%s> not enabled but required by preload mechanism", utils.LoaderS))
close(exitChan)
shdChan.CloseOnce()
return
}
@@ -351,7 +365,7 @@ func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclie
StopOnError: true,
}, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err.Error()))
close(exitChan)
shdChan.CloseOnce()
return
}
}
@@ -378,18 +392,22 @@ func main() {
runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases
}
exitChan := make(chan struct{})
signStop := make(chan struct{})
rpcStop := make(chan struct{})
go singnalHandler(signStop, exitChan)
shdWg := new(sync.WaitGroup)
shdChan := utils.NewSyncedChan()
shdWg.Add(1)
go singnalHandler(shdWg, shdChan)
if *memProfDir != utils.EmptyString {
go memProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, exitChan)
shdWg.Add(1)
go memProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, shdChan)
}
cpuProfChanStop := make(chan struct{})
cpuProfChanDone := make(chan struct{})
if *cpuProfDir != utils.EmptyString {
go cpuProfiling(*cpuProfDir, cpuProfChanStop, cpuProfChanDone, exitChan)
f, err := startCPUProfiling(*cpuProfDir)
if err != nil {
return
}
defer stopCPUProfiling(f)
}
if *scheduledShutdown != utils.EmptyString {
@@ -397,27 +415,36 @@ func main() {
if err != nil {
log.Fatal(err)
}
shdWg.Add(1)
go func() { // Schedule shutdown
time.Sleep(shutdownDur)
close(exitChan)
return
tm := time.NewTimer(shutdownDur)
select {
case <-tm.C:
shdChan.CloseOnce()
case <-shdChan.Done():
tm.Stop()
}
shdWg.Done()
}()
}
// Init config
cfg, err = config.NewCGRConfigFromPath(*cfgPath)
if err != nil {
log.Fatalf("Could not parse config: <%s>", err.Error())
return
}
if *nodeID != utils.EmptyString {
cfg.GeneralCfg().NodeID = *nodeID
}
if *checkConfig {
if err := cfg.CheckConfigSanity(); err != nil {
fmt.Println(err)
}
return
}
if *nodeID != utils.EmptyString {
cfg.GeneralCfg().NodeID = *nodeID
}
config.SetCgrConfig(cfg) // Share the config object
// init syslog
@@ -496,11 +523,13 @@ func main() {
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan,
})
gvService := services.NewGlobalVarS(cfg)
shdWg.Add(1)
if err = gvService.Start(); err != nil {
return
}
dmService := services.NewDataDBService(cfg, connManager)
if dmService.ShouldRun() { // Some services can run without db, ie: ERs
shdWg.Add(1)
if err = dmService.Start(); err != nil {
return
}
@@ -508,15 +537,12 @@ func main() {
storDBService := services.NewStorDBService(cfg)
if storDBService.ShouldRun() { // Some services can run without db, ie: ERs
shdWg.Add(1)
if err = storDBService.Start(); err != nil {
return
}
}
// Done initing DBs
engine.SetRoundingDecimals(cfg.GeneralCfg().RoundingDecimals)
engine.SetFailedPostCacheTTL(cfg.GeneralCfg().FailedPostsTTL)
// Rpc/http server
server := cores.NewServer(caps)
if len(cfg.HTTPCfg().DispatchersRegistrarURL) != 0 {
@@ -526,16 +552,16 @@ func main() {
server.RegisterHttpFunc(cfg.ConfigSCfg().URL, config.HandlerConfigS)
}
if *httpPprofPath != "" {
go server.RegisterProfiler(*httpPprofPath)
server.RegisterProfiler(*httpPprofPath)
}
// Async starts here, will follow cgrates.json start order
// Define internal connections via channels
filterSChan := make(chan *engine.FilterS, 1)
// init AnalyzerS
anz := services.NewAnalyzerService(cfg, server, filterSChan, exitChan, internalAnalyzerSChan)
anz := services.NewAnalyzerService(cfg, server, filterSChan, shdChan, internalAnalyzerSChan)
if anz.ShouldRun() {
shdWg.Add(1)
if err := anz.Start(); err != nil {
fmt.Println(err)
return
@@ -544,20 +570,21 @@ func main() {
// init CoreSv1
coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz)
shdWg.Add(1)
if err := coreS.Start(); err != nil {
fmt.Println(err)
return
}
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan, anz, coreS.GetCoreS().CapsStats)
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), shdChan, anz, coreS.GetCoreS().CapsStats)
engine.SetCache(cacheS)
// init GuardianSv1
initGuardianSv1(internalGuardianSChan, server, anz)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, exitChan)
srvManager := servmanager.NewServiceManager(cfg, shdChan, shdWg)
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz)
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz)
dspH := services.NewDispatcherHostsService(cfg, server, connManager, anz)
@@ -576,7 +603,7 @@ func main() {
rals := services.NewRalService(cfg, cacheS, server,
internalRALsChan, internalResponderChan,
exitChan, connManager, anz)
shdChan, connManager, anz)
apiSv1 := services.NewAPIerSv1Service(cfg, dmService, storDBService, filterSChan, server, schS, rals.GetResponder(),
internalAPIerSv1Chan, connManager, anz)
@@ -586,27 +613,27 @@ func main() {
cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan,
connManager, anz)
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, exitChan, connManager, caps, anz)
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, caps, anz)
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server,
internalLoaderSChan, connManager, anz)
srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals,
apiSv1, apiSv2, cdrS, smg, coreS,
services.NewEventReaderService(cfg, filterSChan, exitChan, connManager),
services.NewDNSAgent(cfg, filterSChan, exitChan, connManager),
services.NewFreeswitchAgent(cfg, exitChan, connManager),
services.NewKamailioAgent(cfg, exitChan, connManager),
services.NewAsteriskAgent(cfg, exitChan, connManager), // partial reload
services.NewRadiusAgent(cfg, filterSChan, exitChan, connManager), // partial reload
services.NewDiameterAgent(cfg, filterSChan, exitChan, connManager), // partial reload
services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload
services.NewEventReaderService(cfg, filterSChan, shdChan, connManager),
services.NewDNSAgent(cfg, filterSChan, shdChan, connManager),
services.NewFreeswitchAgent(cfg, shdChan, connManager),
services.NewKamailioAgent(cfg, shdChan, connManager),
services.NewAsteriskAgent(cfg, shdChan, connManager), // partial reload
services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager), // partial reload
services.NewDiameterAgent(cfg, filterSChan, shdChan, connManager), // partial reload
services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload
ldrs, anz, dspS, dspH, dmService, storDBService,
services.NewEventExporterService(cfg, filterSChan,
connManager, server, internalEEsChan, anz),
services.NewRateService(cfg, cacheS, filterSChan, dmService,
server, internalRateSChan, anz),
services.NewSIPAgent(cfg, filterSChan, exitChan, connManager),
services.NewSIPAgent(cfg, filterSChan, shdChan, connManager),
)
srvManager.StartServices()
// Start FilterS
@@ -645,7 +672,7 @@ func main() {
initConfigSv1(internalConfigChan, server, anz)
if *preload != utils.EmptyString {
runPreload(ldrs, internalLoaderSChan, exitChan)
runPreload(ldrs, internalLoaderSChan, shdChan)
}
// Serve rpc connections
@@ -654,16 +681,21 @@ func main() {
internalAttributeSChan, internalChargerSChan, internalThresholdSChan,
internalRouteSChan, internalSessionSChan, internalAnalyzerSChan,
internalDispatcherSChan, internalLoaderSChan, internalRALsChan,
internalCacheSChan, internalEEsChan, internalRateSChan, rpcStop, exitChan)
<-exitChan
close(rpcStop)
close(signStop)
srvManager.ShutdownServices(cfg.CoreSCfg().ShutdownTimeout)
internalCacheSChan, internalEEsChan, internalRateSChan, shdChan)
if *cpuProfDir != "" { // wait to end cpuProfiling
close(cpuProfChanStop)
<-cpuProfChanDone
<-shdChan.Done()
shtdDone := make(chan struct{})
go func() {
shdWg.Wait()
close(shtdDone)
}()
select {
case <-shtdDone:
case <-time.After(10 * time.Second): //cfg.CoreSCfg().ShutdownTimeout):
utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown all subsystems in the given time",
utils.ServiceManager))
}
if *memProfDir != "" { // write last memory profiling
memProfFile(path.Join(*memProfDir, "mem_final.prof"))
}