diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 78f0a9075..022b089b1 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -23,13 +23,10 @@ import ( "log" "os" "runtime" - "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/utils" ) @@ -60,9 +57,7 @@ func RunCGREngine(fs []string) (err error) { if cfg, err = services.InitConfigFromPath(ctx, *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig { return } - cps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy) - cls := commonlisteners.NewCommonListenerS(cps) - cgr := services.NewCGREngine(cfg, engine.NewConnManager(cfg), new(sync.WaitGroup), cls, cps) + cgr := services.NewCGREngine(cfg) defer cgr.Stop(*flags.PidFile) if err = cgr.Init(ctx, cancel, flags, vers); err != nil { diff --git a/services/cgr-engine.go b/services/cgr-engine.go index ec8226127..be744b649 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -40,15 +40,17 @@ import ( "github.com/cgrates/rpcclient" ) -func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.WaitGroup, - server *commonlisteners.CommonListenerS, caps *engine.Caps) *CGREngine { +func NewCGREngine(cfg *config.CGRConfig) *CGREngine { + cM := engine.NewConnManager(cfg) + caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy) + shdWg := new(sync.WaitGroup) return &CGREngine{ - cfg: cfg, // Engine configuration - cM: cM, // connection manager + cfg: cfg, // Engine configuration + cM: cM, caps: caps, // caps is used to limit RPC CPS shdWg: shdWg, // wait for shutdown srvManager: servmanager.NewServiceManager(shdWg, cM, cfg), - server: server, // Rpc/http server + cls: commonlisteners.NewCommonListenerS(caps), srvDep: map[string]*sync.WaitGroup{ utils.AccountS: new(sync.WaitGroup), utils.ActionS: new(sync.WaitGroup), @@ -96,7 +98,7 @@ type CGREngine struct { srvDep map[string]*sync.WaitGroup shdWg *sync.WaitGroup cM *engine.ConnManager - server *commonlisteners.CommonListenerS + cls *commonlisteners.CommonListenerS caps *engine.Caps cpuPrfF *os.File @@ -116,8 +118,7 @@ type CGREngine struct { iGuardianSCh chan birpc.ClientConnector iConfigCh chan birpc.ClientConnector iServeManagerCh chan birpc.ClientConnector - - iDispatcherSCh chan birpc.ClientConnector + iDispatcherSCh chan birpc.ClientConnector } func (cgr *CGREngine) GetServDeps() map[string]*sync.WaitGroup { @@ -131,12 +132,12 @@ func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefi cgr.cM.AddInternalConn(connName, apiPrefix, iConnCh) } -func (cgr *CGREngine) InitServices(setVersions bool, cpuPrfFl *os.File) { +func (cgr *CGREngine) InitServices(setVersions bool) { if len(cgr.cfg.HTTPCfg().RegistrarSURL) != 0 { - cgr.server.RegisterHTTPFunc(cgr.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) + cgr.cls.RegisterHTTPFunc(cgr.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) } if cgr.cfg.ConfigSCfg().Enabled { - cgr.server.RegisterHTTPFunc(cgr.cfg.ConfigSCfg().URL, config.HandlerConfigS) + cgr.cls.RegisterHTTPFunc(cgr.cfg.ConfigSCfg().URL, config.HandlerConfigS) } // init the channel here because we need to pass them to connManager @@ -200,70 +201,69 @@ func (cgr *CGREngine) InitServices(setVersions bool, cpuPrfFl *os.File) { cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep) cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep) cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep) - cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.server, + cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.cls, cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep) // init AnalyzerS - cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.server, iCoreSv1Ch, cgr.anzS, cpuPrfFl, cgr.shdWg, cgr.srvDep) // init CoreSv1 - cgr.cpuPrfF = cpuPrfFl + cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.cls, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep) // init CoreSv1 cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM, - cgr.server, iCacheSCh, cgr.anzS, cgr.coreS, + cgr.cls, iCacheSCh, cgr.anzS, cgr.coreS, cgr.srvDep) // init CacheS dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS, - cgr.iFilterSCh, cgr.server, cgr.iDispatcherSCh, cgr.cM, + cgr.iFilterSCh, cgr.cls, cgr.iDispatcherSCh, cgr.cM, cgr.anzS, cgr.srvDep) - cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, + cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.cls, iLoaderSCh, cgr.cM, cgr.anzS, cgr.srvDep) - cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.server, cgr.srvDep) + cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.cls, cgr.srvDep) cgr.srvManager.AddServices(cgr.gvS, cgr.coreS, cgr.cacheS, cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs, - NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.server, + NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, iAdminSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, iAttributeSCh, + NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.cls, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep), + NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iAttributeSCh, cgr.anzS, dspS, cgr.srvDep), - NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, + NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iChargerSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, + NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iRouteSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, + NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iResourceSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, + NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iTrendSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, + NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iRankingSCh, cgr.cM, cgr.anzS, cgr.srvDep), NewThresholdService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, - cgr.cM, cgr.server, iThresholdSCh, cgr.anzS, cgr.srvDep), - NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, + cgr.cM, cgr.cls, iThresholdSCh, cgr.anzS, cgr.srvDep), + NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iStatSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.server, iERsCh, cgr.anzS, cgr.srvDep), + NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.cls, iERsCh, cgr.anzS, cgr.srvDep), NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep), NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep), - NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), + NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cls, cgr.cM, cgr.srvDep), NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep), // partial reload - NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), // no reload + NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cls, cgr.cM, cgr.srvDep), // no reload NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), NewEventExporterService(cgr.cfg, cgr.iFilterSCh, - cgr.cM, cgr.server, iEEsCh, cgr.anzS, cgr.srvDep), - NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.server, iCDRServerCh, + cgr.cM, cgr.cls, iEEsCh, cgr.anzS, cgr.srvDep), + NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, iCDRServerCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewRegistrarCService(cgr.cfg, cgr.server, cgr.cM, cgr.anzS, cgr.srvDep), + NewRegistrarCService(cgr.cfg, cgr.cls, cgr.cM, cgr.anzS, cgr.srvDep), NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS, - cgr.server, iRateSCh, cgr.anzS, cgr.srvDep), - NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iActionSCh, cgr.anzS, cgr.srvDep), - NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iAccountSCh, cgr.anzS, cgr.srvDep), - NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.server, cgr.srvDep), + cgr.cls, iRateSCh, cgr.anzS, cgr.srvDep), + NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.cls, iActionSCh, cgr.anzS, cgr.srvDep), + NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.cls, iAccountSCh, cgr.anzS, cgr.srvDep), + NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.cls, cgr.srvDep), ) } @@ -323,9 +323,9 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu go cgrStartFilterService(ctx, cgr.iFilterSCh, cgr.cacheS.GetCacheSChan(), cgr.cM, cgr.cfg, cgr.dmS) - cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.server, cgr.anzS) - cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.server, cgr.anzS) // init GuardianSv1 - cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.server, cgr.anzS) + cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.cls, cgr.anzS) + cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.cls, cgr.anzS) // init GuardianSv1 + cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.cls, cgr.anzS) if preload != utils.EmptyString { if err = cgrRunPreload(ctx, cgr.cfg, preload, cgr.ldrs); err != nil { @@ -334,7 +334,7 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu } // Serve rpc connections - cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.server, cgr.iDispatcherSCh) + cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.cls, cgr.iDispatcherSCh) // TODO: find a better location for this if block if memProfParams.DirPath != "" { @@ -355,6 +355,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags if cpuPrfF, err = cores.StartCPUProfiling(cpuPath); err != nil { return } + cgr.cpuPrfF = cpuPrfF } if *flags.ScheduledShutdown != utils.EmptyString { @@ -385,7 +386,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags } efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) - cgr.InitServices(*flags.SetVersions, cpuPrfF) + cgr.InitServices(*flags.SetVersions) return nil }