From 599a0c214ea7fba042c1a14e194e473eddfd7ef4 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 25 Aug 2015 20:45:31 +0200 Subject: [PATCH] Redesign of cmd/cgr-engine using channels for intercommunication between services --- cmd/cgr-engine/cgr-engine.go | 469 +++++++++++++++------------------ cmd/cgr-engine/registration.go | 27 +- 2 files changed, 230 insertions(+), 266 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0923a8c70..791ec7a53 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -26,7 +26,6 @@ import ( "runtime" "runtime/pprof" "strconv" - "sync" "time" "github.com/cgrates/cgrates/apier/v1" @@ -56,53 +55,34 @@ const ( ) var ( - cfgDir = flag.String("config_dir", utils.CONFIG_DIR, "Configuration directory path.") - version = flag.Bool("version", false, "Prints the application version.") - raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config") - schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon .overwriting config") - cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config") - pidFile = flag.String("pid", "", "Write pid file") - cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") - singlecpu = flag.Bool("singlecpu", false, "Run on single CPU core") - bal = balancer2go.NewBalancer() - exitChan = make(chan bool) - server = &engine.Server{} - cdrServer *engine.CdrServer - cdrStats engine.StatsInterface - scribeServer history.Scribe - pubSubServer engine.PublisherSubscriber - aliasesServer engine.AliasService - userServer engine.UserService - cfg *config.CGRConfig - sms []sessionmanager.SessionManager - smRpc *v1.SessionManagerV1 - err error + cfgDir = flag.String("config_dir", utils.CONFIG_DIR, "Configuration directory path.") + version = flag.Bool("version", false, "Prints the application version.") + raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config") + schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon .overwriting config") + cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config") + pidFile = flag.String("pid", "", "Write pid file") + cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") + singlecpu = flag.Bool("singlecpu", false, "Run on single CPU core") + + cfg *config.CGRConfig + sms []sessionmanager.SessionManager + smRpc *v1.SessionManagerV1 + err error ) -func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) { - if err := ratingDb.CacheRatingAll(); err != nil { - engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) - exitChan <- true - return - } - if err := accountDb.CacheAccountingAll(); err != nil { - engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) - exitChan <- true - return - } - close(doneChan) -} - // Fires up a cdrc instance -func startCdrc(responder *engine.Responder, cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}) { +func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *engine.Responder, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}, exitChan chan bool) { var cdrsConn engine.Connector var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break } if cdrcCfg.Cdrs == utils.INTERNAL { - <-cdrsChan // Wait for CDRServer to come up before start processing - cdrsConn = responder + cdrsChan := <-internalCdrSChan // This will signal that the cdrs part is populated in internalRaterChan + internalCdrSChan <- cdrsChan // Put it back for other components + resp := <-internalRaterChan + cdrsConn = resp + internalRaterChan <- resp } else { conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) if err != nil { @@ -124,13 +104,15 @@ func startCdrc(responder *engine.Responder, cdrsChan chan struct{}, cdrcCfgs map exitChan <- true // If run stopped, something is bad, stop the application } -func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) { +func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) { + engine.Logger.Info("Starting CGRateS SM-FreeSWITCH service.") var raterConn, cdrsConn engine.Connector var client *rpcclient.RpcClient delay := utils.Fib() if cfg.SmFsConfig.Rater == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - raterConn = responder + resp := <-internalRaterChan + raterConn = resp + internalRaterChan <- resp } else { var err error for i := 0; i < cfg.SmFsConfig.Reconnects; i++ { @@ -150,8 +132,9 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac if cfg.SmFsConfig.Cdrs == cfg.SmFsConfig.Rater { cdrsConn = raterConn } else if cfg.SmFsConfig.Cdrs == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - cdrsConn = responder + resp := <-internalRaterChan + cdrsConn = resp + internalRaterChan <- resp } else if len(cfg.SmFsConfig.Cdrs) != 0 { delay = utils.Fib() for i := 0; i < cfg.SmFsConfig.Reconnects; i++ { @@ -177,12 +160,14 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac exitChan <- true } -func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) { +func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) { + engine.Logger.Info("Starting CGRateS SM-Kamailio service.") var raterConn, cdrsConn engine.Connector var client *rpcclient.RpcClient if cfg.SmKamConfig.Rater == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - raterConn = responder + resp := <-internalRaterChan + raterConn = resp + internalRaterChan <- resp } else { var err error delay := utils.Fib() @@ -202,8 +187,9 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache if cfg.SmKamConfig.Cdrs == cfg.SmKamConfig.Rater { cdrsConn = raterConn } else if cfg.SmKamConfig.Cdrs == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - cdrsConn = responder + resp := <-internalRaterChan + cdrsConn = resp + internalRaterChan <- resp } else if len(cfg.SmKamConfig.Cdrs) != 0 { delay := utils.Fib() for i := 0; i < cfg.SmKamConfig.Reconnects; i++ { @@ -229,12 +215,14 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache exitChan <- true } -func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) { +func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) { + engine.Logger.Info("Starting CGRateS SM-OpenSIPS service.") var raterConn, cdrsConn engine.Connector var client *rpcclient.RpcClient if cfg.SmOsipsConfig.Rater == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - raterConn = responder + resp := <-internalRaterChan + raterConn = resp + internalRaterChan <- resp } else { var err error delay := utils.Fib() @@ -254,8 +242,9 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache if cfg.SmOsipsConfig.Cdrs == cfg.SmOsipsConfig.Rater { cdrsConn = raterConn } else if cfg.SmOsipsConfig.Cdrs == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - cdrsConn = responder + resp := <-internalRaterChan + cdrsConn = resp + internalRaterChan <- resp } else if len(cfg.SmOsipsConfig.Cdrs) != 0 { delay := utils.Fib() for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ { @@ -281,14 +270,17 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache exitChan <- true } -func startCDRS(logDb engine.LogStorage, cdrDb engine.CdrStorage, responder *engine.Responder, responderReady, doneChan chan struct{}) { +func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage, + internalRaterChan chan *engine.Responder, internalCdrStatSChan chan engine.StatsInterface, server *engine.Server, exitChan chan bool) { + engine.Logger.Info("Starting CGRateS CDRS service.") var err error var client *rpcclient.RpcClient // Rater connection init var raterConn engine.Connector if cfg.CDRSRater == utils.INTERNAL { - <-responderReady // Wait for the cache to init before start doing queries + responder := <-internalRaterChan // Wait for rater to come up before start querying raterConn = responder + internalRaterChan <- responder // Put back the connection since there might be other entities waiting for it } else if len(cfg.CDRSRater) != 0 { delay := utils.Fib() for i := 0; i < cfg.CDRSReconnects; i++ { @@ -308,7 +300,7 @@ func startCDRS(logDb engine.LogStorage, cdrDb engine.CdrStorage, responder *engi // Stats connection init var statsConn engine.StatsInterface if cfg.CDRSStats == utils.INTERNAL { - statsConn = cdrStats + statsConn = <-internalCdrStatSChan } else if len(cfg.CDRSStats) != 0 { if cfg.CDRSRater == cfg.CDRSStats { statsConn = &engine.ProxyStats{Client: client} @@ -330,34 +322,97 @@ func startCDRS(logDb engine.LogStorage, cdrDb engine.CdrStorage, responder *engi } } - cdrServer, _ = engine.NewCdrServer(cfg, cdrDb, raterConn, statsConn) + cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, raterConn, statsConn) engine.Logger.Info("Registering CDRS HTTP Handlers.") cdrServer.RegisterHanlersToServer(server) engine.Logger.Info("Registering CDRS RPC service.") cdrSrv := v1.CdrsV1{CdrSrv: cdrServer} server.RpcRegister(&cdrSrv) server.RpcRegister(&v2.CdrsV2{CdrsV1: cdrSrv}) - // Make the cdr servers available for internal communication - responder.CdrSrv = cdrServer - close(doneChan) + // Make the cdr server available for internal communication + responder := <-internalRaterChan // Retrieve again the responder + responder.CdrSrv = cdrServer // Attach connection to cdrServer in responder, so it can be used later + internalRaterChan <- responder // Put back the connection for the rest of the system + internalCdrSChan <- cdrServer // Signal that cdrS is operational } -// Starts the rpc server, waiting for the necessary components to finish their tasks -func serveRpc(rpcWaitChans []chan struct{}) { - for _, chn := range rpcWaitChans { - <-chn +func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, ratingDb engine.RatingStorage, exitChan chan bool) { + engine.Logger.Info("Starting CGRateS Scheduler.") + sched := scheduler.NewScheduler() + go reloadSchedulerSingnalHandler(sched, ratingDb) + time.Sleep(1) + internalSchedulerChan <- sched + sched.LoadActionPlans(ratingDb) + sched.Loop() + exitChan <- true // Should not get out of loop though +} + +func startCdrStats(internalCdrStatSChan chan engine.StatsInterface, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, server *engine.Server) { + cdrStats := engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval) + server.RpcRegister(cdrStats) + server.RpcRegister(&v1.CDRStatsV1{CdrStats: cdrStats}) // Public APIs + internalCdrStatSChan <- cdrStats +} + +func startHistoryServer(internalHistorySChan chan history.Scribe, server *engine.Server, exitChan chan bool) { + scribeServer, err := history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + } + server.RpcRegisterName("ScribeV1", scribeServer) + internalHistorySChan <- scribeServer +} + +func startPubSubServer(internalPubSubSChan chan engine.PublisherSubscriber, accountDb engine.AccountingStorage, server *engine.Server) { + pubSubServer := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify) + server.RpcRegisterName("PubSubV1", pubSubServer) + internalPubSubSChan <- pubSubServer +} + +// ToDo: Make sure we are caching before starting this one +func startAliasesServer(internalAliaseSChan chan engine.AliasService, accountDb engine.AccountingStorage, server *engine.Server) { + aliasesServer := engine.NewAliasHandler(accountDb) + server.RpcRegisterName("AliasesV1", aliasesServer) + internalAliaseSChan <- aliasesServer +} + +func startUsersServer(internalUserSChan chan engine.UserService, accountDb engine.AccountingStorage, server *engine.Server, exitChan chan bool) { + userServer, err := engine.NewUserMap(accountDb, cfg.UserServerIndexes) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + } + server.RpcRegisterName("UsersV1", userServer) + internalUserSChan <- userServer +} + +func startRpc(server *engine.Server, internalRaterChan chan *engine.Responder, + internalCdrSChan chan *engine.CdrServer, + internalCdrStatSChan chan engine.StatsInterface, + internalHistorySChan chan history.Scribe, + internalPubSubSChan chan engine.PublisherSubscriber, + internalUserSChan chan engine.UserService, + internalAliaseSChan chan engine.AliasService) { + select { // Any of the rpc methods will unlock listening to rpc requests + case resp := <-internalRaterChan: + internalRaterChan <- resp + case cdrs := <-internalCdrSChan: + internalCdrSChan <- cdrs + case cdrstats := <-internalCdrStatSChan: + internalCdrStatSChan <- cdrstats + case hist := <-internalHistorySChan: + internalHistorySChan <- hist + case pubsubs := <-internalPubSubSChan: + internalPubSubSChan <- pubsubs + case users := <-internalUserSChan: + internalUserSChan <- users + case aliases := <-internalAliaseSChan: + internalAliaseSChan <- aliases } - // Each of the serve blocks so need to start in their own goroutine go server.ServeJSON(cfg.RPCJSONListen) go server.ServeGOB(cfg.RPCGOBListen) -} - -// Starts the http server, waiting for the necessary components to finish their tasks -func serveHttp(httpWaitChans []chan struct{}) { - for _, chn := range httpWaitChans { - <-chn - } - server.ServeHTTP(cfg.HTTPListen) + go server.ServeHTTP(cfg.HTTPListen) } func writePid() { @@ -412,7 +467,7 @@ func main() { var logDb engine.LogStorage var loadDb engine.LoadStorage var cdrDb engine.CdrStorage - if cfg.RaterEnabled || cfg.SchedulerEnabled { // Only connect to dataDb if required + if cfg.RaterEnabled || cfg.SchedulerEnabled { // Only connect to dataDb if necessary ratingDb, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort, cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding) if err != nil { // Cannot configure getter database, show stopper @@ -431,15 +486,11 @@ func main() { engine.SetAccountingStorage(accountDb) } if cfg.RaterEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary - if cfg.StorDBType == SAME { - logDb = ratingDb.(engine.LogStorage) - } else { - logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, - cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns) - if err != nil { // Cannot configure logger database, show stopper - engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) - return - } + logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, + cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns) + if err != nil { // Cannot configure logger database, show stopper + engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) + return } defer logDb.Close() engine.SetStorageLogger(logDb) @@ -452,189 +503,52 @@ func main() { engine.SetRoundingDecimals(cfg.RoundingDecimals) stopHandled := false - // Async starts here + // Rpc/http server + server := new(engine.Server) - rpcWait := make([]chan struct{}, 0) // Rpc server will start as soon as this list is consumed - httpWait := make([]chan struct{}, 0) // Http server will start as soon as this list is consumed + // Async starts here, will follow cgrates.json start order + exitChan := make(chan bool) - var cacheChan chan struct{} - if cfg.RaterEnabled { // Cache rating if rater enabled - cacheChan = make(chan struct{}) - rpcWait = append(rpcWait, cacheChan) - go cacheData(ratingDb, accountDb, cacheChan) - } - - if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled { - go registerToBalancer() - go stopRaterSignalHandler() - stopHandled = true - } - if cfg.CDRStatsEnabled { // Init it here so we make it availabe to the Apier - cdrStats = engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval) - server.RpcRegister(cdrStats) - server.RpcRegister(&v1.CDRStatsV1{CdrStats: cdrStats}) // Public APIs - } - - if cfg.PubSubServerEnabled { - pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify) - server.RpcRegisterName("PubSubV1", pubSubServer) - } - - if cfg.AliasesServerEnabled { - aliasesServer = engine.NewAliasHandler(accountDb) - server.RpcRegisterName("AliasesV1", aliasesServer) - } - - if cfg.HistoryServerEnabled { - scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval) - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) - exitChan <- true - } - server.RpcRegisterName("ScribeV1", scribeServer) - } - if cfg.UserServerEnabled { - userServer, err = engine.NewUserMap(accountDb, cfg.UserServerIndexes) - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) - exitChan <- true - } - server.RpcRegisterName("UsersV1", userServer) - } - - // Register session manager service // FixMe: make sure this is thread safe - if cfg.SmFsConfig.Enabled || cfg.SmKamConfig.Enabled || cfg.SmOsipsConfig.Enabled { // Register SessionManagerV1 service - smRpc = new(v1.SessionManagerV1) - server.RpcRegister(smRpc) - } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if cfg.RaterCdrStats != "" && cfg.RaterCdrStats != utils.INTERNAL { - if cdrStats, err = engine.NewProxyStats(cfg.RaterCdrStats, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) - exitChan <- true - return - } - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - if cfg.RaterHistoryServer != "" && cfg.RaterHistoryServer != utils.INTERNAL { - if scribeServer, err = history.NewProxyScribe(cfg.RaterHistoryServer, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) - exitChan <- true - return - } - } - engine.SetHistoryScribe(scribeServer) - }() - - wg.Add(1) - go func() { - defer wg.Done() - if cfg.RaterPubSubServer != "" && cfg.RaterPubSubServer != utils.INTERNAL { - if pubSubServer, err = engine.NewProxyPubSub(cfg.RaterPubSubServer, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) - exitChan <- true - return - } - } - engine.SetPubSub(pubSubServer) - }() - wg.Add(1) - go func() { - defer wg.Done() - if cfg.RaterAliasesServer != "" && cfg.RaterAliasesServer != utils.INTERNAL { - if aliasesServer, err = engine.NewProxyAliasService(cfg.RaterAliasesServer, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) - exitChan <- true - return - } - } - engine.SetAliasService(aliasesServer) - }() - wg.Add(1) - go func() { - defer wg.Done() - if cfg.RaterUserServer != "" && cfg.RaterUserServer != utils.INTERNAL { - if userServer, err = engine.NewProxyUserService(cfg.RaterUserServer, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) - exitChan <- true - return - } - } - engine.SetUserService(userServer) - }() - wg.Wait() - - responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats} - apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer} - apierRpcV2 := &v2.ApierV2{ - ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer}} - - if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL { - engine.Logger.Info("Registering Rater service") - server.RpcRegister(responder) - server.RpcRegister(apierRpcV1) - server.RpcRegister(apierRpcV2) - } + // Define internal connections via channels + internalBalancerChan := make(chan *balancer2go.Balancer, 1) + internalRaterChan := make(chan *engine.Responder, 1) + internalSchedulerChan := make(chan *scheduler.Scheduler, 1) + internalCdrSChan := make(chan *engine.CdrServer, 1) + internalCdrStatSChan := make(chan engine.StatsInterface, 1) + internalHistorySChan := make(chan history.Scribe, 1) + internalPubSubSChan := make(chan engine.PublisherSubscriber, 1) + internalUserSChan := make(chan engine.UserService, 1) + internalAliaseSChan := make(chan engine.AliasService, 1) + // Start balancer service if cfg.BalancerEnabled { - engine.Logger.Info("Registering Balancer service.") - go stopBalancerSignalHandler() - stopHandled = true - responder.Bal = bal - server.RpcRegister(responder) - server.RpcRegister(apierRpcV1) - server.RpcRegister(apierRpcV2) - if cfg.RaterEnabled { - engine.Logger.Info(" Registering internal rater") - bal.AddClient("local", new(engine.ResponderWorker)) - } + go startBalancer(internalBalancerChan, &stopHandled, exitChan) // Not really needed async here but to cope with uniformity } - if !stopHandled { - go generalSignalHandler() + // Start rater service + if cfg.RaterEnabled { + cacheChan := make(chan struct{}) // Share the cacheChan with the rater to inform when cache is ready + go cacheRaterData(cacheChan, ratingDb, accountDb, exitChan) // Handle data caching outside of rater start + go startRater(internalRaterChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, + cacheChan, server, ratingDb, accountDb, loadDb, cdrDb, logDb, &stopHandled, exitChan) } + // Start Scheduler if cfg.SchedulerEnabled { - engine.Logger.Info("Starting CGRateS Scheduler.") - go func() { - sched := scheduler.NewScheduler() - go reloadSchedulerSingnalHandler(sched, ratingDb) - apierRpcV1.Sched = sched - apierRpcV2.Sched = sched - sched.LoadActionPlans(ratingDb) - sched.Loop() - }() + go startScheduler(internalSchedulerChan, ratingDb, exitChan) } - var cdrsChan chan struct{} + // Start CDR Server if cfg.CDRSEnabled { - engine.Logger.Info("Starting CGRateS CDRS service.") - cdrsChan = make(chan struct{}) - httpWait = append(httpWait, cdrsChan) - go startCDRS(logDb, cdrDb, responder, cacheChan, cdrsChan) + go startCDRS(internalCdrSChan, logDb, cdrDb, internalRaterChan, internalCdrStatSChan, server, exitChan) } - if cfg.SmFsConfig.Enabled { - engine.Logger.Info("Starting CGRateS SM-FreeSWITCH service.") - go startSmFreeSWITCH(responder, cdrDb, cacheChan) - // close all sessions on shutdown - go shutdownSessionmanagerSingnalHandler() - } - if cfg.SmKamConfig.Enabled { - engine.Logger.Info("Starting CGRateS SM-Kamailio service.") - go startSmKamailio(responder, cdrDb, cacheChan) - } - if cfg.SmOsipsConfig.Enabled { - engine.Logger.Info("Starting CGRateS SM-OpenSIPS service.") - go startSmOpenSIPS(responder, cdrDb, cacheChan) + // Start CDR Stats server + if cfg.CDRStatsEnabled { + go startCdrStats(internalCdrStatSChan, ratingDb, accountDb, server) } + + // Start CDRC components var cdrcEnabled bool for _, cdrcCfgs := range cfg.CdrcProfiles { var cdrcCfg *config.CdrcConfig @@ -646,15 +560,58 @@ func main() { } else if !cdrcEnabled { cdrcEnabled = true // Mark that at least one cdrc service is active } - go startCdrc(responder, cdrsChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfg.ConfigReloads[utils.CDRC]) + go startCdrc(internalCdrSChan, internalRaterChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfg.ConfigReloads[utils.CDRC], exitChan) } if cdrcEnabled { engine.Logger.Info("Starting CGRateS CDR client.") } - // Start the servers - go serveRpc(rpcWait) - go serveHttp(httpWait) + // Start SM-FreeSWITCH + if cfg.SmFsConfig.Enabled { + go startSmFreeSWITCH(internalRaterChan, cdrDb, exitChan) + // close all sessions on shutdown + go shutdownSessionmanagerSingnalHandler(exitChan) + } + + // Start SM-Kamailio + if cfg.SmKamConfig.Enabled { + go startSmKamailio(internalRaterChan, cdrDb, exitChan) + } + + // Start SM-OpenSIPS + if cfg.SmOsipsConfig.Enabled { + go startSmOpenSIPS(internalRaterChan, cdrDb, exitChan) + } + + // Register session manager service // FixMe: make sure this is thread safe + if cfg.SmFsConfig.Enabled || cfg.SmKamConfig.Enabled || cfg.SmOsipsConfig.Enabled { // Register SessionManagerV1 service + smRpc = new(v1.SessionManagerV1) + server.RpcRegister(smRpc) + } + + // Start HistoryS service + if cfg.HistoryServerEnabled { + go startHistoryServer(internalHistorySChan, server, exitChan) + } + + // Start PubSubS service + if cfg.PubSubServerEnabled { + go startPubSubServer(internalPubSubSChan, accountDb, server) + } + + // Start Aliases service + if cfg.AliasesServerEnabled { + go startAliasesServer(internalAliaseSChan, accountDb, server) + } + + // Start users service + if cfg.UserServerEnabled { + go startUsersServer(internalUserSChan, accountDb, server, exitChan) + } + + // Serve rpc connections + go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, + internalPubSubSChan, internalUserSChan, internalAliaseSChan) <-exitChan if *pidFile != "" { diff --git a/cmd/cgr-engine/registration.go b/cmd/cgr-engine/registration.go index 97837b68b..bd34958bb 100644 --- a/cmd/cgr-engine/registration.go +++ b/cmd/cgr-engine/registration.go @@ -25,6 +25,7 @@ import ( "os/signal" "syscall" + "github.com/cgrates/cgrates/balancer2go" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/scheduler" ) @@ -32,48 +33,54 @@ import ( /* Listens for SIGTERM, SIGINT, SIGQUIT system signals and shuts down all the registered engines. */ -func stopBalancerSignalHandler() { +func stopBalancerSignalHandler(bal *balancer2go.Balancer, exitChan chan bool) { c := make(chan os.Signal) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) - sig := <-c engine.Logger.Info(fmt.Sprintf("Caught signal %v, sending shutdown to engines\n", sig)) bal.Shutdown("Responder.Shutdown") exitChan <- true } -func generalSignalHandler() { +func generalSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitChan chan bool) { c := make(chan os.Signal) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) sig := <-c engine.Logger.Info(fmt.Sprintf("Caught signal %v, shuting down cgr-engine\n", sig)) var dummyInt int - if cdrStats != nil { + select { + case cdrStats := <-internalCdrStatSChan: cdrStats.Stop(dummyInt, &dummyInt) + default: } + exitChan <- true } /* Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from balancer and closes the storage before exiting. */ -func stopRaterSignalHandler() { +func stopRaterSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitChan chan bool) { c := make(chan os.Signal) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) sig := <-c engine.Logger.Info(fmt.Sprintf("Caught signal %v, unregistering from balancer\n", sig)) - unregisterFromBalancer() + unregisterFromBalancer(exitChan) var dummyInt int - cdrStats.Stop(dummyInt, &dummyInt) + select { + case cdrStats := <-internalCdrStatSChan: + cdrStats.Stop(dummyInt, &dummyInt) + default: + } exitChan <- true } /* Connects to the balancer and calls unregister RPC method. */ -func unregisterFromBalancer() { +func unregisterFromBalancer(exitChan chan bool) { client, err := rpc.Dial("tcp", cfg.RaterBalancer) if err != nil { engine.Logger.Crit("Cannot contact the balancer!") @@ -92,7 +99,7 @@ func unregisterFromBalancer() { /* Connects to the balancer and rehisters the engine to the server. */ -func registerToBalancer() { +func registerToBalancer(exitChan chan bool) { client, err := rpc.Dial("tcp", cfg.RaterBalancer) if err != nil { engine.Logger.Crit(fmt.Sprintf("Cannot contact the balancer: %v", err)) @@ -126,7 +133,7 @@ func reloadSchedulerSingnalHandler(sched *scheduler.Scheduler, getter engine.Rat /* Listens for the SIGTERM, SIGINT, SIGQUIT system signals and shuts down the session manager. */ -func shutdownSessionmanagerSingnalHandler() { +func shutdownSessionmanagerSingnalHandler(exitChan chan bool) { c := make(chan os.Signal) signal.Notify(c, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) <-c