From 5b424d0e7051e9bc42a8c72e27ba5e17c510f488 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 25 Jan 2014 10:46:21 +0100 Subject: [PATCH] Engine components sync via chans --- cmd/cgr-engine/cgr-engine.go | 137 +++++++++++++++++++++++------------ 1 file changed, 92 insertions(+), 45 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 7f2667b66..e5d56743f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -66,13 +66,28 @@ var ( bal = balancer2go.NewBalancer() exitChan = make(chan bool) server = &engine.Server{} + scribeServer history.Scribe sm sessionmanager.SessionManager medi *mediator.Mediator cfg *config.CGRConfig err error ) -func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage) { +func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) { + if err := ratingDb.CacheRating(nil, nil, nil); err != nil { + engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) + exitChan <- true + return + } + if err := accountDb.CacheAccounting(nil); err != nil { + engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) + exitChan <- true + return + } + close(doneChan) +} + +func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage, chanDone chan struct{}) { var connector engine.Connector if cfg.MediatorRater == INTERNAL { connector = responder @@ -90,6 +105,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not connect to engine: %v", err)) exitChan <- true + return } connector = &engine.RPCClientConnector{Client: client} } @@ -98,7 +114,9 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD if err != nil { engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err)) exitChan <- true + return } + close(chanDone) } func startCdrc() { @@ -150,26 +168,36 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage exitChan <- true } -func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) { +func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, doneChan chan struct{}) { if cfg.CDRSMediator == INTERNAL { - for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable - time.Sleep(time.Duration(i+1) * time.Second) - if medi != nil { // Got our mediator, no need to wait any longer - break - } - } + <-mediChan // Deadlock if mediator not started if medi == nil { engine.Logger.Crit(" Could not connect to mediator, exiting.") exitChan <- true + return } } cs := cdrs.New(cdrDb, medi, cfg) cs.RegisterHanlersToServer(server) + close(doneChan) } -func startHistoryAgent(scribeServer history.Scribe) { - if cfg.HistoryAgentEnabled && cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here - engine.Logger.Info("Starting History Agent.") +func startHistoryServer(chanDone chan struct{}) { + if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } + server.RpcRegisterName("Scribe", scribeServer) + close(chanDone) +} + +// chanStartServer will report when server is up, useful for internal requests +func startHistoryAgent(scribeServer history.Scribe, chanServerStarted chan struct{}) { + if cfg.HistoryServer == INTERNAL { // For internal requests, wait for server to come online before connecting + engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) + <-chanServerStarted // If server is not enabled, will have deadlock here + } else { // Connect in iteration since there are chances of concurrency here for i := 0; i < 3; i++ { //ToDo: Make it globally configurable //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) if scribeServer, err = history.NewProxyScribe(cfg.HistoryServer); err == nil { @@ -179,13 +207,32 @@ func startHistoryAgent(scribeServer history.Scribe) { exitChan <- true return } - time.Sleep(time.Duration(i+1) * time.Second) + time.Sleep(time.Duration(i) * time.Second) } } engine.SetHistoryScribe(scribeServer) return } +// Starts the rpc server, waiting for the necessary components to finish their tasks +func serveRpc(rpcWaitChans []chan struct{}) { + for _, chn := range rpcWaitChans { + <-chn + } + // Each of the serve block so need to start in their own goroutine + go server.ServeJSON(cfg.RPCJSONListen) + go server.ServeGOB(cfg.RPCGOBListen) + +} + +// Starts the http server, waiting for the necessary components to finish their tasks +func serveHttp(httpWaitChans []chan struct{}) { + for _, chn := range httpWaitChans { + <-chn + } + server.ServeHTTP(cfg.HTTPListen) +} + func checkConfigSanity() error { if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" { engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!") @@ -301,18 +348,17 @@ func main() { stopHandled := false + // Async starts here + + rpcWait := make([]chan struct{}, 0) // Rpc server will start as soon as this list is consumed + httpWait := make([]chan struct{}, 0) // Http server will start as soon as this list is consumed + if cfg.RaterEnabled { // Cache rating if rater enabled - if err := ratingDb.CacheRating(nil, nil, nil); err != nil { - engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) - return - } - if err := accountDb.CacheAccounting(nil); err != nil { - engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) - return - } + cacheChan := make(chan struct{}) + rpcWait = append(rpcWait, cacheChan) + go cacheData(ratingDb, accountDb, cacheChan) } - // Async starts here if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled { go registerToBalancer() go stopRaterSignalHandler() @@ -329,7 +375,7 @@ func main() { } if cfg.BalancerEnabled { - engine.Logger.Info("Registering CGRateS Balancer service") + engine.Logger.Info("Registering CGRateS Balancer service.") go stopBalancerSignalHandler() stopHandled = true responder.Bal = bal @@ -356,47 +402,48 @@ func main() { }() } - var scribeServer history.Scribe - + var histServChan chan struct{} // Will be initialized only if the server starts if cfg.HistoryServerEnabled { - engine.Logger.Info("Registering CGRates History service") - if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) - exitChan <- true - return - } - server.RpcRegisterName("Scribe", scribeServer) + histServChan = make(chan struct{}) + rpcWait = append(rpcWait, histServChan) + go startHistoryServer(histServChan) } - go server.ServeGOB(cfg.RPCGOBListen) - go server.ServeJSON(cfg.RPCJSONListen) + if cfg.HistoryAgentEnabled { + engine.Logger.Info("Starting CGRateS History Agent.") + go startHistoryAgent(scribeServer, histServChan) + } - go startHistoryAgent(scribeServer) + var medChan chan struct{} + if cfg.MediatorEnabled { + engine.Logger.Info("Starting CGRateS Mediator service.") + medChan = make(chan struct{}) + go startMediator(responder, logDb, cdrDb, medChan) + } if cfg.CDRSEnabled { - engine.Logger.Info("Registering CGRateS CDR service") - go startCDRS(responder, cdrDb) - } - - go server.ServeHTTP(cfg.HTTPListen) - - if cfg.MediatorEnabled { - engine.Logger.Info("Starting CGRateS Mediator service") - go startMediator(responder, logDb, cdrDb) + engine.Logger.Info("Starting CGRateS CDRS service.") + cdrsChan := make(chan struct{}) + httpWait = append(httpWait, cdrsChan) + go startCDRS(responder, cdrDb, medChan, cdrsChan) } if cfg.SMEnabled { - engine.Logger.Info("Starting CGRateS SessionManager service") + engine.Logger.Info("Starting CGRateS SessionManager service.") go startSessionManager(responder, logDb) // close all sessions on shutdown go shutdownSessionmanagerSingnalHandler() } if cfg.CdrcEnabled { - engine.Logger.Info("Starting CGRateS CDR client") + engine.Logger.Info("Starting CGRateS CDR client.") go startCdrc() } + // Start the servers + go serveRpc(rpcWait) + go serveHttp(httpWait) + <-exitChan if *pidFile != "" { if err := os.Remove(*pidFile); err != nil {