Files
cgrates/cmd/cgr-engine/cgr-engine.go
2025-10-19 13:15:57 +02:00

412 lines
13 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/services"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
func main() {
if err := runCGREngine(os.Args[1:]); err != nil {
log.Fatal(err)
}
}
// flags holds all command line arguments. Flag descriptions are set in newFlags.
type flags struct {
*flag.FlagSet
config struct {
path string
check bool
print bool
version bool
}
process struct {
pidFile string
singleCPU bool
scheduledShutdown time.Duration
}
profiling struct {
cpu struct {
dir string
}
mem struct {
dir string
interval time.Duration
maxFiles int
useTS bool
}
}
logger struct {
level int
nodeID string
typ string // syslog|stdout|kafkaLog
}
data struct {
preloadIDs []string
setVersions bool
}
}
// newFlags creates and initializes a new flags instance with default values.
func newFlags() *flags {
f := &flags{
FlagSet: flag.NewFlagSet("cgr-engine", flag.ExitOnError),
}
f.StringVar(&f.config.path, utils.CfgPathCgr, utils.ConfigPath, "Configuration directory path")
f.BoolVar(&f.config.check, utils.CheckCfgCgr, false, "Verify the config without starting the engine")
f.BoolVar(&f.config.print, utils.PrintCfgCgr, false, "Print configuration object in JSON format")
f.BoolVar(&f.config.version, utils.VersionCgr, false, "Print application version and exit")
f.StringVar(&f.process.pidFile, utils.PidCgr, "", "Path to write the PID file")
f.BoolVar(&f.process.singleCPU, utils.SingleCpuCgr, false, "Run on a single CPU core")
f.DurationVar(&f.process.scheduledShutdown, utils.ScheduledShutdownCgr, 0, "Shutdown the engine after the specified duration")
f.StringVar(&f.profiling.cpu.dir, utils.CpuProfDirCgr, "", "Directory for CPU profiles")
f.StringVar(&f.profiling.mem.dir, utils.MemProfDirCgr, "", "Directory for memory profiles")
f.DurationVar(&f.profiling.mem.interval, utils.MemProfIntervalCgr, 15*time.Second, "Interval between memory profile saves")
f.IntVar(&f.profiling.mem.maxFiles, utils.MemProfMaxFilesCgr, 1, "Number of memory profiles to keep (most recent)")
f.BoolVar(&f.profiling.mem.useTS, utils.MemProfTimestampCgr, false, "Add timestamp to memory profile files")
f.IntVar(&f.logger.level, utils.LogLevelCfg, -1, "Log level (0=emergency to 7=debug)")
f.StringVar(&f.logger.nodeID, utils.NodeIDCfg, "", "Node ID of the engine")
f.StringVar(&f.logger.typ, utils.LoggerCfg, "", "Logger type <*syslog|*stdout|*kafkaLog>")
f.Func(utils.PreloadCgr, "Loader IDs used to load data before engine starts", func(val string) error {
f.data.preloadIDs = strings.Split(val, utils.FieldsSep)
return nil
})
f.BoolVar(&f.data.setVersions, utils.SetVersionsCgr, false, "Overwrite database versions")
return f
}
// runCGREngine initializes and runs the CGREngine with the provided command line arguments.
func runCGREngine(fs []string) (err error) {
flags := newFlags()
flags.Parse(fs)
var vers string
if vers, err = utils.GetCGRVersion(); err != nil {
return
}
if flags.config.version {
fmt.Println(vers)
return
}
if flags.process.pidFile != utils.EmptyString {
if err = writePIDFile(flags.process.pidFile); err != nil {
return
}
}
if flags.process.singleCPU {
runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases
}
var cfg *config.CGRConfig
if cfg, err = initConfigFromPath(context.TODO(), flags.config.path, flags.logger.nodeID,
flags.logger.typ, flags.logger.level); err != nil || flags.config.check {
return
}
if cfg.LoggerCfg().Level >= 0 {
switch cfg.LoggerCfg().Type {
case utils.MetaSysLog:
utils.Logger, err = utils.NewSysLogger(cfg.GeneralCfg().NodeID, cfg.LoggerCfg().Level)
if err != nil {
return
}
case utils.MetaStdLog, utils.MetaKafkaLog:
// If the logger is of type *kafka, use the *stdout logger until
// LoggerService finishes startup.
utils.Logger = utils.NewStdLogger(cfg.GeneralCfg().NodeID, cfg.LoggerCfg().Level)
default:
return fmt.Errorf("unsupported logger type: %q", cfg.LoggerCfg().Type)
}
}
if flags.config.print {
cfgJSON := utils.ToIJSON(cfg.AsMapInterface())
utils.Logger.Info(fmt.Sprintf(
"Configuration loaded from %q:\n%s", flags.config.path, cfgJSON))
}
var cpuPrfF *os.File
if flags.profiling.cpu.dir != utils.EmptyString {
cpuPath := filepath.Join(flags.profiling.cpu.dir, utils.CpuPathCgr)
if cpuPrfF, err = cores.StartCPUProfiling(cpuPath); err != nil {
return
}
}
shdWg := new(sync.WaitGroup)
shdWg.Add(1)
shutdown := utils.NewSyncedChan()
go handleSignals(shutdown, cfg, shdWg)
if flags.process.scheduledShutdown != 0 {
shdWg.Add(1)
go func() { // Schedule shutdown
defer shdWg.Done()
tm := time.NewTimer(flags.process.scheduledShutdown)
select {
case <-tm.C:
shutdown.CloseOnce()
case <-shutdown.Done():
tm.Stop()
}
}()
}
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
// ServiceIndexer will share service references to all services
registry := servmanager.NewServiceRegistry()
coreS := services.NewCoreService(cfg, cpuPrfF, shdWg)
srvManager := servmanager.NewServiceManager(shdWg, cfg, registry, []servmanager.Service{
services.NewGlobalVarS(cfg),
services.NewCapService(cfg),
services.NewCommonListenerService(cfg),
services.NewAnalyzerService(cfg),
services.NewConnManagerService(cfg),
services.NewLoggerService(cfg, flags.logger.typ),
services.NewDataDBService(cfg, flags.data.setVersions),
services.NewConfigService(cfg),
services.NewGuardianService(cfg),
coreS,
services.NewCacheService(cfg),
services.NewFilterService(cfg),
services.NewLoaderService(cfg, flags.data.preloadIDs),
services.NewExportFailoverService(cfg),
services.NewAdminSv1Service(cfg),
services.NewSessionService(cfg),
services.NewAttributeService(cfg),
services.NewChargerService(cfg),
services.NewRouteService(cfg),
services.NewResourceService(cfg),
services.NewIPService(cfg),
services.NewTrendService(cfg),
services.NewRankingService(cfg),
services.NewThresholdService(cfg),
services.NewStatService(cfg),
services.NewEventReaderService(cfg),
services.NewDNSAgent(cfg),
services.NewFreeswitchAgent(cfg),
services.NewKamailioAgent(cfg),
services.NewJanusAgent(cfg),
services.NewAsteriskAgent(cfg),
services.NewRadiusAgent(cfg),
services.NewDiameterAgent(cfg),
services.NewHTTPAgent(cfg),
services.NewSIPAgent(cfg),
services.NewEventExporterService(cfg),
services.NewCDRServer(cfg),
services.NewRegistrarCService(cfg),
services.NewRateService(cfg),
services.NewActionService(cfg),
services.NewAccountService(cfg),
services.NewTPeService(cfg),
services.NewPrometheusAgent(cfg),
})
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), cfg.CoreSCfg().ShutdownTimeout*10)
go func() {
shdWg.Wait()
cancel()
}()
<-ctx.Done()
if ctx.Err() != context.Canceled {
utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down all services in the given time",
utils.ServiceManager))
}
if flags.process.pidFile != utils.EmptyString {
if err := os.Remove(flags.process.pidFile); err != nil {
utils.Logger.Warning("Could not remove pid file: " + err.Error())
}
}
if cpuPrfF != nil && coreS == nil {
pprof.StopCPUProfile()
if err := cpuPrfF.Close(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err))
}
}
// TODO: check if there's any need to manually stop memory profiling.
// It should be stopped automatically during CoreS service shutdown.
utils.Logger.Info(fmt.Sprintf("<%s> stopped all services. CGRateS shutdown!", utils.ServiceManager))
}()
srvManager.StartServices(shutdown)
initServiceManagerV1(cfg, srvManager, registry)
// Serve rpc connections
startRPC(cfg, registry, shutdown)
// TODO: find a better location for this if block
if flags.profiling.mem.dir != "" {
if err := coreS.CoreS().StartMemoryProfiling(cores.MemoryProfilingParams{
DirPath: flags.profiling.mem.dir,
MaxFiles: flags.profiling.mem.maxFiles,
Interval: flags.profiling.mem.interval,
UseTimestamp: flags.profiling.mem.useTS,
}); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> %v", utils.CoreS, err))
}
}
<-shutdown.Done()
return
}
// writePIDFile creates a file at the specified path containing the current process ID.
func writePIDFile(path string) error {
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("failed to create pid file: %s", err)
}
if _, err := f.WriteString(strconv.Itoa(os.Getpid())); err != nil {
f.Close()
return fmt.Errorf("failed to write to pid file: %s", err)
}
if err := f.Close(); err != nil {
return fmt.Errorf("failed to close pid file: %s", err)
}
return nil
}
// initConfigFromPath loads and initializes the CGR configuration from the specified path.
func initConfigFromPath(ctx *context.Context, path, nodeID, logType string, logLevel int) (cfg *config.CGRConfig, err error) {
if cfg, err = config.NewCGRConfigFromPath(ctx, path); err != nil {
err = fmt.Errorf("could not parse config: <%s>", err)
return
}
if !(cfg.ConfigDBCfg().Type == utils.MetaInternal &&
cfg.ConfigDBCfg().Opts.InternalDBDumpInterval == 0) { // ignore if offline internal is disabled
var d config.ConfigDB
if d, err = engine.NewDataDBConn(cfg.ConfigDBCfg().Type,
cfg.ConfigDBCfg().Host, cfg.ConfigDBCfg().Port,
cfg.ConfigDBCfg().Name, cfg.ConfigDBCfg().User,
cfg.ConfigDBCfg().Password, cfg.GeneralCfg().DBDataEncoding, nil, nil,
cfg.ConfigDBCfg().Opts, nil); err != nil { // Cannot configure getter database, show stopper
err = fmt.Errorf("could not configure configDB: <%s>", err)
return
}
if err = cfg.LoadFromDB(ctx, d); err != nil {
err = fmt.Errorf("could not parse config from DB: <%s>", err)
return
}
}
if nodeID != utils.EmptyString {
cfg.GeneralCfg().NodeID = nodeID
}
if logLevel != -1 { // Modify the log level if provided by command arguments
cfg.LoggerCfg().Level = logLevel
}
if logType != utils.EmptyString {
cfg.LoggerCfg().Type = logType
}
if utils.ConcurrentReqsLimit != 0 { // used as shared variable
cfg.CoreSCfg().Caps = utils.ConcurrentReqsLimit
}
if len(utils.ConcurrentReqsStrategy) != 0 {
cfg.CoreSCfg().CapsStrategy = utils.ConcurrentReqsStrategy
}
config.SetCgrConfig(cfg) // Share the config object
return
}
// handleSignals manages system signals for graceful shutdown and configuration reload.
func handleSignals(shutdown *utils.SyncedChan, cfg *config.CGRConfig, shdWg *sync.WaitGroup) {
defer shdWg.Done()
shutdownSignal := make(chan os.Signal, 1)
reloadSignal := make(chan os.Signal, 1)
signal.Notify(shutdownSignal, os.Interrupt,
syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
signal.Notify(reloadSignal, syscall.SIGHUP)
for {
select {
case <-shutdown.Done():
return
case <-shutdownSignal:
shutdown.CloseOnce()
case <-reloadSignal:
// do it in its own goroutine in order to not block the signal handler with the reload functionality
go func() {
var reply string
if err := cfg.V1ReloadConfig(context.TODO(),
new(config.ReloadArgs), &reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("Error reloading configuration: <%s>", err))
}
}()
}
}
}
// initServiceManagerV1 registers the ServiceManager methods.
func initServiceManagerV1(cfg *config.CGRConfig, srvMngr *servmanager.ServiceManager,
registry *servmanager.ServiceRegistry) {
srvDeps, err := services.WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
utils.ConnManager,
},
registry, cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
cl := srvDeps[utils.CommonListenerS].(*services.CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*services.ConnManagerService)
srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false)
cl.RpcRegister(srv)
cms.AddInternalConn(utils.ServiceManager, srv)
}
// startRPC initializes and starts the RPC server.
func startRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, shutdown *utils.SyncedChan) {
cl := registry.Lookup(utils.CommonListenerS).(*services.CommonListenerService).CLS()
cl.StartServer(cfg, shutdown)
}