Engine components sync via chans

This commit is contained in:
DanB
2014-01-25 10:46:21 +01:00
parent b6dde967f2
commit 5b424d0e70

View File

@@ -66,13 +66,28 @@ var (
bal = balancer2go.NewBalancer()
exitChan = make(chan bool)
server = &engine.Server{}
scribeServer history.Scribe
sm sessionmanager.SessionManager
medi *mediator.Mediator
cfg *config.CGRConfig
err error
)
func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage) {
func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) {
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
exitChan <- true
return
}
if err := accountDb.CacheAccounting(nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
exitChan <- true
return
}
close(doneChan)
}
func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage, chanDone chan struct{}) {
var connector engine.Connector
if cfg.MediatorRater == INTERNAL {
connector = responder
@@ -90,6 +105,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<Mediator> Could not connect to engine: %v", err))
exitChan <- true
return
}
connector = &engine.RPCClientConnector{Client: client}
}
@@ -98,7 +114,9 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err))
exitChan <- true
return
}
close(chanDone)
}
func startCdrc() {
@@ -150,26 +168,36 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
exitChan <- true
}
func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) {
func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, doneChan chan struct{}) {
if cfg.CDRSMediator == INTERNAL {
for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable
time.Sleep(time.Duration(i+1) * time.Second)
if medi != nil { // Got our mediator, no need to wait any longer
break
}
}
<-mediChan // Deadlock if mediator not started
if medi == nil {
engine.Logger.Crit("<CDRS> Could not connect to mediator, exiting.")
exitChan <- true
return
}
}
cs := cdrs.New(cdrDb, medi, cfg)
cs.RegisterHanlersToServer(server)
close(doneChan)
}
func startHistoryAgent(scribeServer history.Scribe) {
if cfg.HistoryAgentEnabled && cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here
engine.Logger.Info("Starting History Agent.")
func startHistoryServer(chanDone chan struct{}) {
if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil {
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not start, error: %s", err.Error()))
exitChan <- true
return
}
server.RpcRegisterName("Scribe", scribeServer)
close(chanDone)
}
// chanStartServer will report when server is up, useful for internal requests
func startHistoryAgent(scribeServer history.Scribe, chanServerStarted chan struct{}) {
if cfg.HistoryServer == INTERNAL { // For internal requests, wait for server to come online before connecting
engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Connecting internally to HistoryServer"))
<-chanServerStarted // If server is not enabled, will have deadlock here
} else { // Connect in iteration since there are chances of concurrency here
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable
//engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Trying to connect, iteration: %d, time %s", i, time.Now()))
if scribeServer, err = history.NewProxyScribe(cfg.HistoryServer); err == nil {
@@ -179,13 +207,32 @@ func startHistoryAgent(scribeServer history.Scribe) {
exitChan <- true
return
}
time.Sleep(time.Duration(i+1) * time.Second)
time.Sleep(time.Duration(i) * time.Second)
}
}
engine.SetHistoryScribe(scribeServer)
return
}
// Starts the rpc server, waiting for the necessary components to finish their tasks
func serveRpc(rpcWaitChans []chan struct{}) {
for _, chn := range rpcWaitChans {
<-chn
}
// Each of the serve block so need to start in their own goroutine
go server.ServeJSON(cfg.RPCJSONListen)
go server.ServeGOB(cfg.RPCGOBListen)
}
// Starts the http server, waiting for the necessary components to finish their tasks
func serveHttp(httpWaitChans []chan struct{}) {
for _, chn := range httpWaitChans {
<-chn
}
server.ServeHTTP(cfg.HTTPListen)
}
func checkConfigSanity() error {
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!")
@@ -301,18 +348,17 @@ func main() {
stopHandled := false
// Async starts here
rpcWait := make([]chan struct{}, 0) // Rpc server will start as soon as this list is consumed
httpWait := make([]chan struct{}, 0) // Http server will start as soon as this list is consumed
if cfg.RaterEnabled { // Cache rating if rater enabled
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
return
}
if err := accountDb.CacheAccounting(nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
return
}
cacheChan := make(chan struct{})
rpcWait = append(rpcWait, cacheChan)
go cacheData(ratingDb, accountDb, cacheChan)
}
// Async starts here
if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled {
go registerToBalancer()
go stopRaterSignalHandler()
@@ -329,7 +375,7 @@ func main() {
}
if cfg.BalancerEnabled {
engine.Logger.Info("Registering CGRateS Balancer service")
engine.Logger.Info("Registering CGRateS Balancer service.")
go stopBalancerSignalHandler()
stopHandled = true
responder.Bal = bal
@@ -356,47 +402,48 @@ func main() {
}()
}
var scribeServer history.Scribe
var histServChan chan struct{} // Will be initialized only if the server starts
if cfg.HistoryServerEnabled {
engine.Logger.Info("Registering CGRates History service")
if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil {
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not start, error: %s", err.Error()))
exitChan <- true
return
}
server.RpcRegisterName("Scribe", scribeServer)
histServChan = make(chan struct{})
rpcWait = append(rpcWait, histServChan)
go startHistoryServer(histServChan)
}
go server.ServeGOB(cfg.RPCGOBListen)
go server.ServeJSON(cfg.RPCJSONListen)
if cfg.HistoryAgentEnabled {
engine.Logger.Info("Starting CGRateS History Agent.")
go startHistoryAgent(scribeServer, histServChan)
}
go startHistoryAgent(scribeServer)
var medChan chan struct{}
if cfg.MediatorEnabled {
engine.Logger.Info("Starting CGRateS Mediator service.")
medChan = make(chan struct{})
go startMediator(responder, logDb, cdrDb, medChan)
}
if cfg.CDRSEnabled {
engine.Logger.Info("Registering CGRateS CDR service")
go startCDRS(responder, cdrDb)
}
go server.ServeHTTP(cfg.HTTPListen)
if cfg.MediatorEnabled {
engine.Logger.Info("Starting CGRateS Mediator service")
go startMediator(responder, logDb, cdrDb)
engine.Logger.Info("Starting CGRateS CDRS service.")
cdrsChan := make(chan struct{})
httpWait = append(httpWait, cdrsChan)
go startCDRS(responder, cdrDb, medChan, cdrsChan)
}
if cfg.SMEnabled {
engine.Logger.Info("Starting CGRateS SessionManager service")
engine.Logger.Info("Starting CGRateS SessionManager service.")
go startSessionManager(responder, logDb)
// close all sessions on shutdown
go shutdownSessionmanagerSingnalHandler()
}
if cfg.CdrcEnabled {
engine.Logger.Info("Starting CGRateS CDR client")
engine.Logger.Info("Starting CGRateS CDR client.")
go startCdrc()
}
// Start the servers
go serveRpc(rpcWait)
go serveHttp(httpWait)
<-exitChan
if *pidFile != "" {
if err := os.Remove(*pidFile); err != nil {