diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a116bb178..6dcaff4d5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -22,16 +22,37 @@ import ( "fmt" "log" "os" + "os/signal" + "path/filepath" "runtime" + "runtime/pprof" + "strings" + "sync" + "syscall" + "time" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/apis" + "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/efs" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/guardian" + "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) +func main() { + if err := runCGREngine(os.Args[1:]); err != nil { + log.Fatal(err) + } +} + // runCGREngine configures the CGREngine object and runs it func runCGREngine(fs []string) (err error) { flags := services.NewCGREngineFlags() @@ -60,18 +81,346 @@ func runCGREngine(fs []string) (err error) { return } - stopChan := make(chan struct{}) - cgr := services.NewCGREngine(stopChan, cfg, servmanager.NewServiceIndexer(), []servmanager.Service{}) - defer cgr.Stop(*flags.PidFile) + var cpuPrfF *os.File + if *flags.CpuPrfDir != utils.EmptyString { + cpuPath := filepath.Join(*flags.CpuPrfDir, utils.CpuPathCgr) + if cpuPrfF, err = cores.StartCPUProfiling(cpuPath); err != nil { + return + } + } - if err = cgr.Run(ctx, cancel, flags, vers, - cores.MemoryProfilingParams{ + shdWg := new(sync.WaitGroup) + shdWg.Add(1) + go handleSignals(ctx, cancel, cfg, shdWg) + + if *flags.ScheduledShutdown != utils.EmptyString { + var shtDwDur time.Duration + if shtDwDur, err = utils.ParseDurationWithNanosecs(*flags.ScheduledShutdown); err != nil { + return + } + shdWg.Add(1) + go func() { // Schedule shutdown + tm := time.NewTimer(shtDwDur) + select { + case <-tm.C: + cancel() + case <-ctx.Done(): + tm.Stop() + } + shdWg.Done() + }() + } + + connMgr := engine.NewConnManager(cfg) + // init syslog + if utils.Logger, err = engine.NewLogger(ctx, + utils.FirstNonEmpty(*flags.Logger, cfg.LoggerCfg().Type), + cfg.GeneralCfg().DefaultTenant, + cfg.GeneralCfg().NodeID, + connMgr, cfg); err != nil { + return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) + } + efs.SetFailedPostCacheTTL(cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing + utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) + + caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy) + srvDep := map[string]*sync.WaitGroup{ + utils.AccountS: new(sync.WaitGroup), + utils.ActionS: new(sync.WaitGroup), + utils.AdminS: new(sync.WaitGroup), + utils.AnalyzerS: new(sync.WaitGroup), + utils.AsteriskAgent: new(sync.WaitGroup), + utils.AttributeS: new(sync.WaitGroup), + utils.CDRServer: new(sync.WaitGroup), + utils.ChargerS: new(sync.WaitGroup), + utils.CoreS: new(sync.WaitGroup), + utils.DataDB: new(sync.WaitGroup), + utils.DiameterAgent: new(sync.WaitGroup), + utils.DispatcherS: new(sync.WaitGroup), + utils.DNSAgent: new(sync.WaitGroup), + utils.EEs: new(sync.WaitGroup), + utils.EFs: new(sync.WaitGroup), + utils.ERs: new(sync.WaitGroup), + utils.FreeSWITCHAgent: new(sync.WaitGroup), + utils.GlobalVarS: new(sync.WaitGroup), + utils.HTTPAgent: new(sync.WaitGroup), + utils.KamailioAgent: new(sync.WaitGroup), + utils.LoaderS: new(sync.WaitGroup), + utils.RadiusAgent: new(sync.WaitGroup), + utils.RateS: new(sync.WaitGroup), + utils.RegistrarC: new(sync.WaitGroup), + utils.ResourceS: new(sync.WaitGroup), + utils.RouteS: new(sync.WaitGroup), + utils.SchedulerS: new(sync.WaitGroup), + utils.SessionS: new(sync.WaitGroup), + utils.SIPAgent: new(sync.WaitGroup), + utils.StatS: new(sync.WaitGroup), + utils.TrendS: new(sync.WaitGroup), + utils.StorDB: new(sync.WaitGroup), + utils.ThresholdS: new(sync.WaitGroup), + utils.TPeS: new(sync.WaitGroup), + } + + // init the channel here because we need to pass them to connManager + iServeManagerCh := make(chan birpc.ClientConnector, 1) + iConfigCh := make(chan birpc.ClientConnector, 1) + iCoreSv1Ch := make(chan birpc.ClientConnector, 1) + iCacheSCh := make(chan birpc.ClientConnector, 1) + iGuardianSCh := make(chan birpc.ClientConnector, 1) + iAnalyzerSCh := make(chan birpc.ClientConnector, 1) + iCDRServerCh := make(chan birpc.ClientConnector, 1) + iAttributeSCh := make(chan birpc.ClientConnector, 1) + iDispatcherSCh := make(chan birpc.ClientConnector, 1) + iSessionSCh := make(chan birpc.ClientConnector, 1) + iChargerSCh := make(chan birpc.ClientConnector, 1) + iThresholdSCh := make(chan birpc.ClientConnector, 1) + iStatSCh := make(chan birpc.ClientConnector, 1) + iTrendSCh := make(chan birpc.ClientConnector, 1) + iRankingSCh := make(chan birpc.ClientConnector, 1) + iResourceSCh := make(chan birpc.ClientConnector, 1) + iRouteSCh := make(chan birpc.ClientConnector, 1) + iAdminSCh := make(chan birpc.ClientConnector, 1) + iLoaderSCh := make(chan birpc.ClientConnector, 1) + iEEsCh := make(chan birpc.ClientConnector, 1) + iRateSCh := make(chan birpc.ClientConnector, 1) + iActionSCh := make(chan birpc.ClientConnector, 1) + iAccountSCh := make(chan birpc.ClientConnector, 1) + iTpeSCh := make(chan birpc.ClientConnector, 1) + iEFsCh := make(chan birpc.ClientConnector, 1) + iERsCh := make(chan birpc.ClientConnector, 1) + + // initialize the connManager before creating the DMService + // because we need to pass the connection to it + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS), utils.AnalyzerSv1, iAnalyzerSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS), utils.AdminSv1, iAdminSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), utils.AttributeSv1, iAttributeSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches), utils.CacheSv1, iCacheSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), utils.CDRsV1, iCDRServerCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers), utils.ChargerSv1, iChargerSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders), utils.LoaderSv1, iLoaderSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources), utils.ResourceSv1, iResourceSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS), utils.SessionSv1, iSessionSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), utils.SessionSv1, iSessionSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats), utils.StatSv1, iStatSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings), utils.RankingSv1, iRankingSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends), utils.TrendSv1, iTrendSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes), utils.RouteSv1, iRouteSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, iThresholdSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager), utils.ServiceManagerV1, iServeManagerCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig), utils.ConfigSv1, iConfigCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore), utils.CoreSv1, iCoreSv1Ch) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs), utils.EeSv1, iEEsCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates), utils.RateSv1, iRateSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers), utils.DispatcherSv1, iDispatcherSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts), utils.AccountSv1, iAccountSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions), utils.ActionSv1, iActionSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), utils.TPeSv1, iTpeSCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, iEFsCh) + connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs), utils.ErSv1, iERsCh) + + clsCh := make(chan *commonlisteners.CommonListenerS, 1) + anzCh := make(chan *services.AnalyzerService, 1) + iFilterSCh := make(chan *engine.FilterS, 1) + + // ServiceIndexer will share service references to all services + srvIdxr := servmanager.NewServiceIndexer() + gvS := services.NewGlobalVarS(cfg, srvDep, srvIdxr) + dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, srvIdxr) + sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvDep, srvIdxr) + cls := services.NewCommonListenerService(cfg, caps, clsCh, srvDep, srvIdxr) + anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, iAnalyzerSCh, anzCh, srvDep, srvIdxr) + coreS := services.NewCoreService(cfg, caps, clsCh, iCoreSv1Ch, anzCh, cpuPrfF, shdWg, srvDep, srvIdxr) + cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, iCacheSCh, anzCh, coreS, srvDep, srvIdxr) + dspS := services.NewDispatcherService(cfg, dmS, cacheS, iFilterSCh, clsCh, iDispatcherSCh, connMgr, anzCh, srvDep, srvIdxr) + ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, iLoaderSCh, connMgr, anzCh, srvDep, srvIdxr) + efs := services.NewExportFailoverService(cfg, connMgr, iEFsCh, clsCh, srvDep, srvIdxr) + adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, iAdminSCh, connMgr, anzCh, srvDep, srvIdxr) + sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, iSessionSCh, connMgr, anzCh, srvDep, srvIdxr) + attrS := services.NewAttributeService(cfg, dmS, cacheS, iFilterSCh, clsCh, iAttributeSCh, anzCh, dspS, srvDep, srvIdxr) + chrgS := services.NewChargerService(cfg, dmS, cacheS, iFilterSCh, clsCh, iChargerSCh, connMgr, anzCh, srvDep, srvIdxr) + routeS := services.NewRouteService(cfg, dmS, cacheS, iFilterSCh, clsCh, iRouteSCh, connMgr, anzCh, srvDep, srvIdxr) + resourceS := services.NewResourceService(cfg, dmS, cacheS, iFilterSCh, clsCh, iResourceSCh, connMgr, anzCh, srvDep, srvIdxr) + trendS := services.NewTrendService(cfg, dmS, cacheS, iFilterSCh, clsCh, iTrendSCh, connMgr, anzCh, srvDep, srvIdxr) + rankingS := services.NewRankingService(cfg, dmS, cacheS, iFilterSCh, clsCh, iRankingSCh, connMgr, anzCh, srvDep, srvIdxr) + thS := services.NewThresholdService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, iThresholdSCh, anzCh, srvDep, srvIdxr) + stS := services.NewStatService(cfg, dmS, cacheS, iFilterSCh, clsCh, iStatSCh, connMgr, anzCh, srvDep, srvIdxr) + erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, iERsCh, anzCh, srvDep, srvIdxr) + dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) + fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvDep, srvIdxr) + kamAgent := services.NewKamailioAgent(cfg, connMgr, srvDep, srvIdxr) + janusAgent := services.NewJanusAgent(cfg, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) + astAgent := services.NewAsteriskAgent(cfg, connMgr, srvDep, srvIdxr) + radAgent := services.NewRadiusAgent(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) + diamAgent := services.NewDiameterAgent(cfg, iFilterSCh, connMgr, caps, srvDep, srvIdxr) + httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) + sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) + eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, iEEsCh, anzCh, srvDep, srvIdxr) + cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, iCDRServerCh, connMgr, anzCh, srvDep, srvIdxr) + registrarcS := services.NewRegistrarCService(cfg, connMgr, srvDep, srvIdxr) + rateS := services.NewRateService(cfg, cacheS, iFilterSCh, dmS, clsCh, iRateSCh, anzCh, srvDep, srvIdxr) + actionS := services.NewActionService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, iActionSCh, anzCh, srvDep, srvIdxr) + accS := services.NewAccountService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, iAccountSCh, anzCh, srvDep, srvIdxr) + tpeS := services.NewTPeService(cfg, connMgr, dmS, clsCh, srvDep, srvIdxr) + + srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{ + gvS, + dmS, + sdbS, + cls, + anzS, + coreS, + cacheS, + dspS, + ldrs, + efs, + adminS, + sessionS, + attrS, + chrgS, + routeS, + resourceS, + trendS, + rankingS, + thS, + stS, + erS, + dnsAgent, + fsAgent, + kamAgent, + janusAgent, + astAgent, + radAgent, + diamAgent, + httpAgent, + sipAgent, + eeS, + cdrS, + registrarcS, + rateS, + actionS, + accS, + tpeS, + }) + + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), cfg.CoreSCfg().ShutdownTimeout*10) + go func() { + shdWg.Wait() + cancel() + }() + <-ctx.Done() + if ctx.Err() != context.Canceled { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown all subsystems in the given time", + utils.ServiceManager)) + } + if *flags.PidFile != utils.EmptyString { + if err := os.Remove(*flags.PidFile); err != nil { + utils.Logger.Warning("Could not remove pid file: " + err.Error()) + } + } + if cpuPrfF != nil && coreS == nil { + pprof.StopCPUProfile() + if err := cpuPrfF.Close(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) + } + } + + // TODO: check if there's any need to manually stop memory profiling. + // It should be stopped automatically during CoreS service shutdown. + + utils.Logger.Info(" stopped all components. CGRateS shutdown!") + }() + + shdWg.Add(1) + if err = gvS.Start(ctx, cancel); err != nil { + shdWg.Done() + srvManager.ShutdownServices() + return + } + if cls.ShouldRun() { + shdWg.Add(1) + if err = cls.Start(ctx, cancel); err != nil { + shdWg.Done() + srvManager.ShutdownServices() + return + } + } + if efs.ShouldRun() { // efs checking first because of loggers + shdWg.Add(1) + if err = efs.Start(ctx, cancel); err != nil { + shdWg.Done() + srvManager.ShutdownServices() + return + } + } + if dmS.ShouldRun() { // Some services can run without db, ie: ERs + shdWg.Add(1) + if err = dmS.Start(ctx, cancel); err != nil { + shdWg.Done() + srvManager.ShutdownServices() + return + } + } + if sdbS.ShouldRun() { + shdWg.Add(1) + if err = sdbS.Start(ctx, cancel); err != nil { + shdWg.Done() + srvManager.ShutdownServices() + return + } + } + + if anzS.ShouldRun() { + shdWg.Add(1) + if err = anzS.Start(ctx, cancel); err != nil { + shdWg.Done() + srvManager.ShutdownServices() + return + } + } else { + anzCh <- anzS + } + + shdWg.Add(1) + if err = coreS.Start(ctx, cancel); err != nil { + shdWg.Done() + srvManager.ShutdownServices() + return + } + shdWg.Add(1) + if err = cacheS.Start(ctx, cancel); err != nil { + shdWg.Done() + srvManager.ShutdownServices() + return + } + srvManager.StartServices(ctx, cancel) + // Start FilterS + go cgrStartFilterService(ctx, iFilterSCh, cacheS.GetCacheSChan(), connMgr, cfg, dmS) + + cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, clsCh, anzS) + cgrInitGuardianSv1(iGuardianSCh, cfg, clsCh, anzS) + cgrInitConfigSv1(iConfigCh, cfg, clsCh, anzS) + + if *flags.Preload != utils.EmptyString { + if err = cgrRunPreload(ctx, cfg, *flags.Preload, ldrs); err != nil { + return + } + } + + // Serve rpc connections + cgrStartRPC(ctx, cancel, cfg, clsCh, iDispatcherSCh) + + // TODO: find a better location for this if block + if *flags.MemPrfDir != "" { + if err := coreS.GetCoreS().StartMemoryProfiling(cores.MemoryProfilingParams{ DirPath: *flags.MemPrfDir, MaxFiles: *flags.MemPrfMaxF, Interval: *flags.MemPrfInterval, UseTimestamp: *flags.MemPrfTS, }); err != nil { - return + utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) + } } <-ctx.Done() @@ -79,8 +428,138 @@ func runCGREngine(fs []string) (err error) { return } -func main() { - if err := runCGREngine(os.Args[1:]); err != nil { - log.Fatal(err) +func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string, + loader *services.LoaderService) (err error) { + if !cfg.LoaderCfg().Enabled() { + err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS) + return + } + ch := loader.GetRPCChan() + select { + case ldrs := <-ch: + ch <- ldrs + case <-ctx.Done(): + return + } + + var reply string + for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) { + if err = loader.GetLoaderS().V1Run(ctx, &loaders.ArgsProcessFolder{ + APIOpts: map[string]any{ + utils.MetaForceLock: true, // force lock will unlock the file in case is locked and return error + utils.MetaStopOnError: true, + }, + LoaderID: loaderID, + }, &reply); err != nil { + err = fmt.Errorf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err) + return + } + } + return +} + +// cgrStartFilterService fires up the FilterS +func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS, + cacheSCh chan *engine.CacheS, connMgr *engine.ConnManager, + cfg *config.CGRConfig, db *services.DataDBService) { + var cacheS *engine.CacheS + select { + case cacheS = <-cacheSCh: + cacheSCh <- cacheS + case <-ctx.Done(): + return + } + dm, err := db.WaitForDM(ctx) + if err != nil { + return + } + select { + case <-cacheS.GetPrecacheChannel(utils.CacheFilters): + iFilterSCh <- engine.NewFilterS(cfg, connMgr, dm) + case <-ctx.Done(): + } +} + +func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, + clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { + cl := <-clSChan + clSChan <- cl + srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) + if !cfg.DispatcherSCfg().Enabled { + for _, s := range srv { + cl.RpcRegister(s) + } + } + iGuardianSCh <- anz.GetInternalCodec(srv, utils.GuardianS) +} + +func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, + srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig, + clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { + cl := <-clSChan + clSChan <- cl + srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false) + if !cfg.DispatcherSCfg().Enabled { + cl.RpcRegister(srv) + } + iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) +} + +func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, + cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { + cl := <-clSChan + clSChan <- cl + srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true) + // srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) + if !cfg.DispatcherSCfg().Enabled { + for _, s := range srv { + cl.RpcRegister(s) + } + } + iConfigCh <- anz.GetInternalCodec(srv, utils.ConfigSv1) +} + +func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, + cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, internalDispatcherSChan chan birpc.ClientConnector) { + cl := <-clSChan + clSChan <- cl + if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this + select { + case dispatcherS := <-internalDispatcherSChan: + internalDispatcherSChan <- dispatcherS + case <-ctx.Done(): + return + } + } + cl.StartServer(ctx, shtdwnEngine, cfg) +} + +func handleSignals(ctx *context.Context, shutdown context.CancelFunc, + cfg *config.CGRConfig, shdWg *sync.WaitGroup) { + shutdownSignal := make(chan os.Signal, 1) + reloadSignal := make(chan os.Signal, 1) + signal.Notify(shutdownSignal, os.Interrupt, + syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + signal.Notify(reloadSignal, syscall.SIGHUP) + for { + select { + case <-ctx.Done(): + shdWg.Done() + return + case <-shutdownSignal: + shutdown() + shdWg.Done() + return + case <-reloadSignal: + // do it in it's own goroutine in order to not block the signal handler with the reload functionality + go func() { + var reply string + if err := cfg.V1ReloadConfig(ctx, + new(config.ReloadArgs), &reply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("Error reloading configuration: <%s>", err)) + } + }() + } } } diff --git a/services/cgr-engine.go b/services/cgr-engine.go deleted file mode 100644 index 248cda846..000000000 --- a/services/cgr-engine.go +++ /dev/null @@ -1,445 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package services - -import ( - "fmt" - "os" - "path/filepath" - "runtime" - "runtime/pprof" - "sync" - "time" - - "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/cores" - "github.com/cgrates/cgrates/efs" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/servmanager" - "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" -) - -func NewCGREngine(stopChan chan struct{}, cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer, - services []servmanager.Service) *CGREngine { - cM := engine.NewConnManager(cfg) - caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy) - shdWg := new(sync.WaitGroup) - return &CGREngine{ - stopChan: stopChan, - cfg: cfg, // Engine configuration - cM: cM, - caps: caps, // caps is used to limit RPC CPS - shdWg: shdWg, // wait for shutdown - srvManager: servmanager.NewServiceManager(shdWg, cM, cfg, sIdxr, services), - srvDep: map[string]*sync.WaitGroup{ - utils.AccountS: new(sync.WaitGroup), - utils.ActionS: new(sync.WaitGroup), - utils.AdminS: new(sync.WaitGroup), - utils.AnalyzerS: new(sync.WaitGroup), - utils.AsteriskAgent: new(sync.WaitGroup), - utils.AttributeS: new(sync.WaitGroup), - utils.CDRServer: new(sync.WaitGroup), - utils.ChargerS: new(sync.WaitGroup), - utils.CoreS: new(sync.WaitGroup), - utils.DataDB: new(sync.WaitGroup), - utils.DiameterAgent: new(sync.WaitGroup), - utils.DispatcherS: new(sync.WaitGroup), - utils.DNSAgent: new(sync.WaitGroup), - utils.EEs: new(sync.WaitGroup), - utils.EFs: new(sync.WaitGroup), - utils.ERs: new(sync.WaitGroup), - utils.FreeSWITCHAgent: new(sync.WaitGroup), - utils.GlobalVarS: new(sync.WaitGroup), - utils.HTTPAgent: new(sync.WaitGroup), - utils.KamailioAgent: new(sync.WaitGroup), - utils.LoaderS: new(sync.WaitGroup), - utils.RadiusAgent: new(sync.WaitGroup), - utils.RateS: new(sync.WaitGroup), - utils.RegistrarC: new(sync.WaitGroup), - utils.ResourceS: new(sync.WaitGroup), - utils.RouteS: new(sync.WaitGroup), - utils.SchedulerS: new(sync.WaitGroup), - utils.SessionS: new(sync.WaitGroup), - utils.SIPAgent: new(sync.WaitGroup), - utils.StatS: new(sync.WaitGroup), - utils.TrendS: new(sync.WaitGroup), - utils.StorDB: new(sync.WaitGroup), - utils.ThresholdS: new(sync.WaitGroup), - utils.TPeS: new(sync.WaitGroup), - }, - clsCh: make(chan *commonlisteners.CommonListenerS, 1), - anzCh: make(chan *AnalyzerService, 1), - iFilterSCh: make(chan *engine.FilterS, 1), - } -} - -type CGREngine struct { - stopChan chan struct{} - - cfg *config.CGRConfig - - srvManager *servmanager.ServiceManager - srvDep map[string]*sync.WaitGroup - shdWg *sync.WaitGroup - cM *engine.ConnManager - - caps *engine.Caps - cpuPrfF *os.File - - // services - gvS *GlobalVarS - dmS *DataDBService - sdbS *StorDBService - cls *CommonListenerService - anzS *AnalyzerService - coreS *CoreService - cacheS *CacheService - ldrs *LoaderService - efs *ExportFailoverService - - // chans (need to move this as services) - clsCh chan *commonlisteners.CommonListenerS - anzCh chan *AnalyzerService - iFilterSCh chan *engine.FilterS - iGuardianSCh chan birpc.ClientConnector - iConfigCh chan birpc.ClientConnector - iServeManagerCh chan birpc.ClientConnector - iDispatcherSCh chan birpc.ClientConnector -} - -func (cgr *CGREngine) GetServDeps() map[string]*sync.WaitGroup { - return cgr.srvDep -} - -func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefix string, - iConnCh chan birpc.ClientConnector) { - cgr.srvManager.AddServices(service) - cgr.srvDep[service.ServiceName()] = new(sync.WaitGroup) - cgr.cM.AddInternalConn(connName, apiPrefix, iConnCh) -} - -func (cgr *CGREngine) InitServices(setVersions bool) { - // init the channel here because we need to pass them to connManager - cgr.iServeManagerCh = make(chan birpc.ClientConnector, 1) - cgr.iConfigCh = make(chan birpc.ClientConnector, 1) - iCoreSv1Ch := make(chan birpc.ClientConnector, 1) - iCacheSCh := make(chan birpc.ClientConnector, 1) - cgr.iGuardianSCh = make(chan birpc.ClientConnector, 1) - iAnalyzerSCh := make(chan birpc.ClientConnector, 1) - iCDRServerCh := make(chan birpc.ClientConnector, 1) - iAttributeSCh := make(chan birpc.ClientConnector, 1) - cgr.iDispatcherSCh = make(chan birpc.ClientConnector, 1) - iSessionSCh := make(chan birpc.ClientConnector, 1) - iChargerSCh := make(chan birpc.ClientConnector, 1) - iThresholdSCh := make(chan birpc.ClientConnector, 1) - iStatSCh := make(chan birpc.ClientConnector, 1) - iTrendSCh := make(chan birpc.ClientConnector, 1) - iRankingSCh := make(chan birpc.ClientConnector, 1) - iResourceSCh := make(chan birpc.ClientConnector, 1) - iRouteSCh := make(chan birpc.ClientConnector, 1) - iAdminSCh := make(chan birpc.ClientConnector, 1) - iLoaderSCh := make(chan birpc.ClientConnector, 1) - iEEsCh := make(chan birpc.ClientConnector, 1) - iRateSCh := make(chan birpc.ClientConnector, 1) - iActionSCh := make(chan birpc.ClientConnector, 1) - iAccountSCh := make(chan birpc.ClientConnector, 1) - iTpeSCh := make(chan birpc.ClientConnector, 1) - iEFsCh := make(chan birpc.ClientConnector, 1) - iERsCh := make(chan birpc.ClientConnector, 1) - - // initialize the connManager before creating the DMService - // because we need to pass the connection to it - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS), utils.AnalyzerSv1, iAnalyzerSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS), utils.AdminSv1, iAdminSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes), utils.AttributeSv1, iAttributeSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches), utils.CacheSv1, iCacheSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), utils.CDRsV1, iCDRServerCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers), utils.ChargerSv1, iChargerSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, cgr.iGuardianSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders), utils.LoaderSv1, iLoaderSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources), utils.ResourceSv1, iResourceSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS), utils.SessionSv1, iSessionSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), utils.SessionSv1, iSessionSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats), utils.StatSv1, iStatSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings), utils.RankingSv1, iRankingSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends), utils.TrendSv1, iTrendSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes), utils.RouteSv1, iRouteSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, iThresholdSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager), utils.ServiceManagerV1, cgr.iServeManagerCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig), utils.ConfigSv1, cgr.iConfigCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore), utils.CoreSv1, iCoreSv1Ch) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs), utils.EeSv1, iEEsCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates), utils.RateSv1, iRateSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers), utils.DispatcherSv1, cgr.iDispatcherSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts), utils.AccountSv1, iAccountSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions), utils.ActionSv1, iActionSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), utils.TPeSv1, iTpeSCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, iEFsCh) - cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs), utils.ErSv1, iERsCh) - - // ServiceIndexer will share service references to all services - srvIdxr := servmanager.NewServiceIndexer() - cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep, srvIdxr) - cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep, srvIdxr) - cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep, srvIdxr) - cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.clsCh, cgr.srvDep, srvIdxr) - cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.clsCh, - cgr.iFilterSCh, iAnalyzerSCh, cgr.anzCh, cgr.srvDep, srvIdxr) - - cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.clsCh, iCoreSv1Ch, cgr.anzCh, - cgr.cpuPrfF, cgr.shdWg, cgr.srvDep, srvIdxr) - - cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM, - cgr.clsCh, iCacheSCh, cgr.anzCh, cgr.coreS, - cgr.srvDep, srvIdxr) - - dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS, - cgr.iFilterSCh, cgr.clsCh, cgr.iDispatcherSCh, cgr.cM, - cgr.anzCh, cgr.srvDep, srvIdxr) - - cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, - iLoaderSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr) - - cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.clsCh, cgr.srvDep, srvIdxr) - - cgr.srvManager.AddServices(cgr.gvS, cgr.cls, cgr.coreS, cgr.cacheS, - cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs, - NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh, - iAdminSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, iSessionSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iAttributeSCh, - cgr.anzCh, dspS, cgr.srvDep, srvIdxr), - NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iChargerSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iRouteSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iResourceSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iTrendSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iRankingSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewThresholdService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, - cgr.cM, cgr.clsCh, iThresholdSCh, cgr.anzCh, cgr.srvDep, srvIdxr), - NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iStatSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - - NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iERsCh, cgr.anzCh, cgr.srvDep, srvIdxr), - NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr), - NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), - NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), - NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep, srvIdxr), - NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), // partial reload - NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr), // partial reload - NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep, srvIdxr), // partial reload - NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep, srvIdxr), // no reload - NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr), - - NewEventExporterService(cgr.cfg, cgr.iFilterSCh, - cgr.cM, cgr.clsCh, iEEsCh, cgr.anzCh, cgr.srvDep, srvIdxr), - NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh, iCDRServerCh, - cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - - NewRegistrarCService(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), - - NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS, - cgr.clsCh, iRateSCh, cgr.anzCh, cgr.srvDep, srvIdxr), - NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, - cgr.clsCh, iActionSCh, cgr.anzCh, cgr.srvDep, srvIdxr), - NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, - cgr.cM, cgr.clsCh, iAccountSCh, cgr.anzCh, cgr.srvDep, srvIdxr), - NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.clsCh, cgr.srvDep, srvIdxr), - ) - -} - -// init will initialize the signal handlers and other functions needed by CGRateS service to run -func (cgr *CGREngine) init(ctx *context.Context, shtDw context.CancelFunc, flags *CGREngineFlags, vers string) (err error) { - cgr.shdWg.Add(1) - go cgrSingnalHandler(ctx, shtDw, cgr.cfg, cgr.shdWg) - - var cpuPrfF *os.File - if *flags.CpuPrfDir != utils.EmptyString { - cpuPath := filepath.Join(*flags.CpuPrfDir, utils.CpuPathCgr) - if cpuPrfF, err = cores.StartCPUProfiling(cpuPath); err != nil { - return - } - cgr.cpuPrfF = cpuPrfF - } - - if *flags.ScheduledShutdown != utils.EmptyString { - var shtDwDur time.Duration - if shtDwDur, err = utils.ParseDurationWithNanosecs(*flags.ScheduledShutdown); err != nil { - return - } - cgr.shdWg.Add(1) - go func() { // Schedule shutdown - tm := time.NewTimer(shtDwDur) - select { - case <-tm.C: - shtDw() - case <-ctx.Done(): - tm.Stop() - } - cgr.shdWg.Done() - }() - } - - // init syslog - if utils.Logger, err = engine.NewLogger(ctx, - utils.FirstNonEmpty(*flags.Logger, cgr.cfg.LoggerCfg().Type), - cgr.cfg.GeneralCfg().DefaultTenant, - cgr.cfg.GeneralCfg().NodeID, - cgr.cM, cgr.cfg); err != nil { - return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) - } - efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing - utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) - cgr.InitServices(*flags.SetVersions) - return nil -} - -// startServices will start the services infrastructure -func (cgr *CGREngine) startServices(ctx *context.Context, shtDw context.CancelFunc, preload string, memProfParams cores.MemoryProfilingParams) (err error) { - defer func() { - if err != nil { - cgr.srvManager.ShutdownServices() - } - }() - cgr.shdWg.Add(1) - if err = cgr.gvS.Start(ctx, shtDw); err != nil { - cgr.shdWg.Done() - return - } - if cgr.cls.ShouldRun() { - cgr.shdWg.Add(1) - if err = cgr.cls.Start(ctx, shtDw); err != nil { - cgr.shdWg.Done() - return - } - } - if cgr.efs.ShouldRun() { // efs checking first because of loggers - cgr.shdWg.Add(1) - if err = cgr.efs.Start(ctx, shtDw); err != nil { - cgr.shdWg.Done() - return - } - } - if cgr.dmS.ShouldRun() { // Some services can run without db, ie: ERs - cgr.shdWg.Add(1) - if err = cgr.dmS.Start(ctx, shtDw); err != nil { - cgr.shdWg.Done() - return - } - } - if cgr.sdbS.ShouldRun() { - cgr.shdWg.Add(1) - if err = cgr.sdbS.Start(ctx, shtDw); err != nil { - cgr.shdWg.Done() - return - } - } - if cgr.anzS.ShouldRun() { - cgr.shdWg.Add(1) - if err = cgr.anzS.Start(ctx, shtDw); err != nil { - cgr.shdWg.Done() - return - } - } else { - cgr.anzCh <- cgr.anzS - } - - cgr.shdWg.Add(1) - if err = cgr.coreS.Start(ctx, shtDw); err != nil { - cgr.shdWg.Done() - return - } - cgr.shdWg.Add(1) - if err = cgr.cacheS.Start(ctx, shtDw); err != nil { - cgr.shdWg.Done() - return - } - cgr.srvManager.StartServices(ctx, shtDw) - // Start FilterS - go cgrStartFilterService(ctx, cgr.iFilterSCh, cgr.cacheS.GetCacheSChan(), cgr.cM, - cgr.cfg, cgr.dmS) - - cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.clsCh, cgr.anzS) - cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.clsCh, cgr.anzS) - cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.clsCh, cgr.anzS) - - if preload != utils.EmptyString { - if err = cgrRunPreload(ctx, cgr.cfg, preload, cgr.ldrs); err != nil { - return - } - } - - // Serve rpc connections - cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.clsCh, cgr.iDispatcherSCh) - - // TODO: find a better location for this if block - if memProfParams.DirPath != "" { - if err := cgr.coreS.cS.StartMemoryProfiling(memProfParams); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) - } - } - return -} - -// Run will run the CGREngine, calling it's init and startServices -func (cgr *CGREngine) Run(ctx *context.Context, shtDw context.CancelFunc, stopChan chan struct{}, - flags *CGREngineFlags, vers string, memProfParams cores.MemoryProfilingParams) (err error) { - if err = cgr.init(ctx, shtDw, flags, vers); err != nil { - return - } - return cgr.startServices(ctx, shtDw, *flags.Preload, memProfParams) -} - -func (cgr *CGREngine) Stop(pidFile string) { - ctx, cancel := context.WithTimeout(context.Background(), cgr.cfg.CoreSCfg().ShutdownTimeout*10) - go func() { - cgr.shdWg.Wait() - cancel() - }() - <-ctx.Done() - if ctx.Err() != context.Canceled { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown all subsystems in the given time", - utils.ServiceManager)) - } - if pidFile != utils.EmptyString { - if err := os.Remove(pidFile); err != nil { - utils.Logger.Warning("Could not remove pid file: " + err.Error()) - } - } - if cgr.cpuPrfF != nil && cgr.coreS == nil { - pprof.StopCPUProfile() - if err := cgr.cpuPrfF.Close(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err)) - } - } - - // TODO: check if there's any need to manually stop memory profiling. - // It should be stopped automatically during CoreS service shutdown. - - utils.Logger.Info(" stopped all components. CGRateS shutdown!") -} diff --git a/services/cores.go b/services/cores.go index f6d3e19c3..6404a0dab 100644 --- a/services/cores.go +++ b/services/cores.go @@ -169,3 +169,9 @@ func (cS *CoreService) StateChan(stateID string) chan struct{} { func (cS *CoreService) IntRPCConn() birpc.ClientConnector { return cS.intRPCconn } + +func (cS *CoreService) GetCoreS() *cores.CoreS { + cS.mu.RLock() + defer cS.mu.RUnlock() + return cS.cS +} diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go index 4c7fdaf4c..73a35677b 100644 --- a/services/libcgr-engine.go +++ b/services/libcgr-engine.go @@ -22,22 +22,12 @@ import ( "flag" "fmt" "os" - "os/signal" "strconv" - "strings" - "sync" - "syscall" "time" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/guardian" - "github.com/cgrates/cgrates/loaders" - "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -85,36 +75,6 @@ type CGREngineFlags struct { SetVersions *bool } -func cgrSingnalHandler(ctx *context.Context, shutdown context.CancelFunc, - cfg *config.CGRConfig, shdWg *sync.WaitGroup) { - shutdownSignal := make(chan os.Signal, 1) - reloadSignal := make(chan os.Signal, 1) - signal.Notify(shutdownSignal, os.Interrupt, - syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) - signal.Notify(reloadSignal, syscall.SIGHUP) - for { - select { - case <-ctx.Done(): - shdWg.Done() - return - case <-shutdownSignal: - shutdown() - shdWg.Done() - return - case <-reloadSignal: - // do it in it's own goroutine in order to not block the signal handler with the reload functionality - go func() { - var reply string - if err := cfg.V1ReloadConfig(ctx, - new(config.ReloadArgs), &reply); err != nil { - utils.Logger.Warning( - fmt.Sprintf("Error reloading configuration: <%s>", err)) - } - }() - } - } -} - func CgrWritePid(pidFile string) (err error) { var f *os.File if f, err = os.Create(pidFile); err != nil { @@ -132,112 +92,6 @@ func CgrWritePid(pidFile string) (err error) { return } -func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string, - loader *LoaderService) (err error) { - if !cfg.LoaderCfg().Enabled() { - err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS) - return - } - ch := loader.GetRPCChan() - select { - case ldrs := <-ch: - ch <- ldrs - case <-ctx.Done(): - return - } - - var reply string - for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) { - if err = loader.GetLoaderS().V1Run(ctx, &loaders.ArgsProcessFolder{ - APIOpts: map[string]any{ - utils.MetaForceLock: true, // force lock will unlock the file in case is locked and return error - utils.MetaStopOnError: true, - }, - LoaderID: loaderID, - }, &reply); err != nil { - err = fmt.Errorf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err) - return - } - } - return -} - -// cgrStartFilterService fires up the FilterS -func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS, - cacheSCh chan *engine.CacheS, connMgr *engine.ConnManager, - cfg *config.CGRConfig, db *DataDBService) { - var cacheS *engine.CacheS - select { - case cacheS = <-cacheSCh: - cacheSCh <- cacheS - case <-ctx.Done(): - return - } - dm, err := db.WaitForDM(ctx) - if err != nil { - return - } - select { - case <-cacheS.GetPrecacheChannel(utils.CacheFilters): - iFilterSCh <- engine.NewFilterS(cfg, connMgr, dm) - case <-ctx.Done(): - } -} - -func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, - clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) { - cl := <-clSChan - clSChan <- cl - srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) - if !cfg.DispatcherSCfg().Enabled { - for _, s := range srv { - cl.RpcRegister(s) - } - } - iGuardianSCh <- anz.GetInternalCodec(srv, utils.GuardianS) -} - -func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, - srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig, - clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) { - cl := <-clSChan - clSChan <- cl - srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false) - if !cfg.DispatcherSCfg().Enabled { - cl.RpcRegister(srv) - } - iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) -} - -func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, - cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) { - cl := <-clSChan - clSChan <- cl - srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true) - // srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) - if !cfg.DispatcherSCfg().Enabled { - for _, s := range srv { - cl.RpcRegister(s) - } - } - iConfigCh <- anz.GetInternalCodec(srv, utils.ConfigSv1) -} - -func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, - cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, internalDispatcherSChan chan birpc.ClientConnector) { - cl := <-clSChan - clSChan <- cl - if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this - select { - case dispatcherS := <-internalDispatcherSChan: - internalDispatcherSChan <- dispatcherS - case <-ctx.Done(): - return - } - } - cl.StartServer(ctx, shtdwnEngine, cfg) -} - func waitForFilterS(ctx *context.Context, fsCh chan *engine.FilterS) (filterS *engine.FilterS, err error) { select { case <-ctx.Done(): diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index bdf36e019..3bf4dcfd8 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -20,6 +20,7 @@ package servmanager import ( "fmt" + "log" "sync" "github.com/cgrates/birpc" @@ -34,11 +35,12 @@ import ( func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, cfg *config.CGRConfig, srvIndxr *ServiceIndexer, services []Service) (sM *ServiceManager) { sM = &ServiceManager{ - cfg: cfg, - subsystems: make(map[string]Service), - shdWg: shdWg, - connMgr: connMgr, - rldChan: cfg.GetReloadChan(), + cfg: cfg, + subsystems: make(map[string]Service), + serviceIndexer: srvIndxr, + shdWg: shdWg, + connMgr: connMgr, + rldChan: cfg.GetReloadChan(), } sM.AddServices(services...) return @@ -103,6 +105,7 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) { srvMngr.connMgr.AddInternalConn(sAPIData[2], sAPIData[0], rpcIntChan) } go func() { // ToDo: centralize management into one single goroutine + log.Printf("service name: %s", srv.ServiceName()) if utils.StructChanTimeout( srvMngr.serviceIndexer.GetService(srv.ServiceName()).StateChan(utils.StateServiceUP), srvMngr.cfg.GeneralCfg().ConnectTimeout) {