Init cM,caps,cls,wg inside CGREngine constructor

Also pass the profile to the CGREngine struct the moment profiling started.
This commit is contained in:
ionutboangiu
2024-11-07 15:18:15 +02:00
committed by Dan Christian Bogos
parent 0d9358cf30
commit e78722ae4e
2 changed files with 45 additions and 49 deletions

View File

@@ -23,13 +23,10 @@ import (
"log"
"os"
"runtime"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/services"
"github.com/cgrates/cgrates/utils"
)
@@ -60,9 +57,7 @@ func RunCGREngine(fs []string) (err error) {
if cfg, err = services.InitConfigFromPath(ctx, *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig {
return
}
cps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy)
cls := commonlisteners.NewCommonListenerS(cps)
cgr := services.NewCGREngine(cfg, engine.NewConnManager(cfg), new(sync.WaitGroup), cls, cps)
cgr := services.NewCGREngine(cfg)
defer cgr.Stop(*flags.PidFile)
if err = cgr.Init(ctx, cancel, flags, vers); err != nil {

View File

@@ -40,15 +40,17 @@ import (
"github.com/cgrates/rpcclient"
)
func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.WaitGroup,
server *commonlisteners.CommonListenerS, caps *engine.Caps) *CGREngine {
func NewCGREngine(cfg *config.CGRConfig) *CGREngine {
cM := engine.NewConnManager(cfg)
caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy)
shdWg := new(sync.WaitGroup)
return &CGREngine{
cfg: cfg, // Engine configuration
cM: cM, // connection manager
cfg: cfg, // Engine configuration
cM: cM,
caps: caps, // caps is used to limit RPC CPS
shdWg: shdWg, // wait for shutdown
srvManager: servmanager.NewServiceManager(shdWg, cM, cfg),
server: server, // Rpc/http server
cls: commonlisteners.NewCommonListenerS(caps),
srvDep: map[string]*sync.WaitGroup{
utils.AccountS: new(sync.WaitGroup),
utils.ActionS: new(sync.WaitGroup),
@@ -96,7 +98,7 @@ type CGREngine struct {
srvDep map[string]*sync.WaitGroup
shdWg *sync.WaitGroup
cM *engine.ConnManager
server *commonlisteners.CommonListenerS
cls *commonlisteners.CommonListenerS
caps *engine.Caps
cpuPrfF *os.File
@@ -116,8 +118,7 @@ type CGREngine struct {
iGuardianSCh chan birpc.ClientConnector
iConfigCh chan birpc.ClientConnector
iServeManagerCh chan birpc.ClientConnector
iDispatcherSCh chan birpc.ClientConnector
iDispatcherSCh chan birpc.ClientConnector
}
func (cgr *CGREngine) GetServDeps() map[string]*sync.WaitGroup {
@@ -131,12 +132,12 @@ func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefi
cgr.cM.AddInternalConn(connName, apiPrefix, iConnCh)
}
func (cgr *CGREngine) InitServices(setVersions bool, cpuPrfFl *os.File) {
func (cgr *CGREngine) InitServices(setVersions bool) {
if len(cgr.cfg.HTTPCfg().RegistrarSURL) != 0 {
cgr.server.RegisterHTTPFunc(cgr.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar)
cgr.cls.RegisterHTTPFunc(cgr.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar)
}
if cgr.cfg.ConfigSCfg().Enabled {
cgr.server.RegisterHTTPFunc(cgr.cfg.ConfigSCfg().URL, config.HandlerConfigS)
cgr.cls.RegisterHTTPFunc(cgr.cfg.ConfigSCfg().URL, config.HandlerConfigS)
}
// init the channel here because we need to pass them to connManager
@@ -200,70 +201,69 @@ func (cgr *CGREngine) InitServices(setVersions bool, cpuPrfFl *os.File) {
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep)
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep)
cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep)
cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.server,
cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.cls,
cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep) // init AnalyzerS
cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.server, iCoreSv1Ch, cgr.anzS, cpuPrfFl, cgr.shdWg, cgr.srvDep) // init CoreSv1
cgr.cpuPrfF = cpuPrfFl
cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.cls, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep) // init CoreSv1
cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM,
cgr.server, iCacheSCh, cgr.anzS, cgr.coreS,
cgr.cls, iCacheSCh, cgr.anzS, cgr.coreS,
cgr.srvDep) // init CacheS
dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS,
cgr.iFilterSCh, cgr.server, cgr.iDispatcherSCh, cgr.cM,
cgr.iFilterSCh, cgr.cls, cgr.iDispatcherSCh, cgr.cM,
cgr.anzS, cgr.srvDep)
cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server,
cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.cls,
iLoaderSCh, cgr.cM, cgr.anzS, cgr.srvDep)
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.server, cgr.srvDep)
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.cls, cgr.srvDep)
cgr.srvManager.AddServices(cgr.gvS, cgr.coreS, cgr.cacheS,
cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs,
NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.server,
NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls,
iAdminSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, iAttributeSCh,
NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.cls, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iAttributeSCh,
cgr.anzS, dspS, cgr.srvDep),
NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server,
NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
iChargerSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server,
NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
iRouteSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server,
NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
iResourceSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server,
NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
iTrendSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server,
NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
iRankingSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewThresholdService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh,
cgr.cM, cgr.server, iThresholdSCh, cgr.anzS, cgr.srvDep),
NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server,
cgr.cM, cgr.cls, iThresholdSCh, cgr.anzS, cgr.srvDep),
NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
iStatSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.server, iERsCh, cgr.anzS, cgr.srvDep),
NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.cls, iERsCh, cgr.anzS, cgr.srvDep),
NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep),
NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cls, cgr.cM, cgr.srvDep),
NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload
NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload
NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep), // partial reload
NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), // no reload
NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cls, cgr.cM, cgr.srvDep), // no reload
NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewEventExporterService(cgr.cfg, cgr.iFilterSCh,
cgr.cM, cgr.server, iEEsCh, cgr.anzS, cgr.srvDep),
NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.server, iCDRServerCh,
cgr.cM, cgr.cls, iEEsCh, cgr.anzS, cgr.srvDep),
NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, iCDRServerCh,
cgr.cM, cgr.anzS, cgr.srvDep),
NewRegistrarCService(cgr.cfg, cgr.server, cgr.cM, cgr.anzS, cgr.srvDep),
NewRegistrarCService(cgr.cfg, cgr.cls, cgr.cM, cgr.anzS, cgr.srvDep),
NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS,
cgr.server, iRateSCh, cgr.anzS, cgr.srvDep),
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iActionSCh, cgr.anzS, cgr.srvDep),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iAccountSCh, cgr.anzS, cgr.srvDep),
NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.server, cgr.srvDep),
cgr.cls, iRateSCh, cgr.anzS, cgr.srvDep),
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.cls, iActionSCh, cgr.anzS, cgr.srvDep),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.cls, iAccountSCh, cgr.anzS, cgr.srvDep),
NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.cls, cgr.srvDep),
)
}
@@ -323,9 +323,9 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
go cgrStartFilterService(ctx, cgr.iFilterSCh, cgr.cacheS.GetCacheSChan(), cgr.cM,
cgr.cfg, cgr.dmS)
cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.server, cgr.anzS)
cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.server, cgr.anzS) // init GuardianSv1
cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.server, cgr.anzS)
cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.cls, cgr.anzS)
cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.cls, cgr.anzS) // init GuardianSv1
cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.cls, cgr.anzS)
if preload != utils.EmptyString {
if err = cgrRunPreload(ctx, cgr.cfg, preload, cgr.ldrs); err != nil {
@@ -334,7 +334,7 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
}
// Serve rpc connections
cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.server, cgr.iDispatcherSCh)
cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.cls, cgr.iDispatcherSCh)
// TODO: find a better location for this if block
if memProfParams.DirPath != "" {
@@ -355,6 +355,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
if cpuPrfF, err = cores.StartCPUProfiling(cpuPath); err != nil {
return
}
cgr.cpuPrfF = cpuPrfF
}
if *flags.ScheduledShutdown != utils.EmptyString {
@@ -385,7 +386,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
}
efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
cgr.InitServices(*flags.SetVersions, cpuPrfF)
cgr.InitServices(*flags.SetVersions)
return nil
}