Engine start order, improved logging, default config listening fix

This commit is contained in:
DanB
2014-01-23 18:19:03 +01:00
parent 70d0e0171c
commit ddeb13bcc4
2 changed files with 47 additions and 44 deletions

View File

@@ -85,7 +85,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<Mediator> Could not connect to engine: %v", err))
@@ -127,7 +127,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to engine: %v", err))
@@ -153,7 +153,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) {
if cfg.CDRSMediator == INTERNAL {
for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable
time.Sleep(time.Duration(i/2) * time.Second)
time.Sleep(time.Duration(i+1) * time.Second)
if medi != nil { // Got our mediator, no need to wait any longer
break
}
@@ -171,6 +171,7 @@ func startHistoryAgent(scribeServer history.Scribe) {
if cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable
//engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Trying to connect, iteration: %d, time %s", i, time.Now()))
if scribeServer, err = history.NewProxyScribe(cfg.HistoryServer); err == nil {
break //Connected so no need to reiterate
} else if i == 2 && err != nil {
@@ -178,7 +179,7 @@ func startHistoryAgent(scribeServer history.Scribe) {
exitChan <- true
return
}
time.Sleep(time.Duration(i/2) * time.Second)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
engine.SetHistoryScribe(scribeServer)
@@ -234,6 +235,22 @@ func main() {
return
}
config.SetCgrConfig(cfg) // Share the config object
if *raterEnabled {
cfg.RaterEnabled = *raterEnabled
}
if *schedEnabled {
cfg.SchedulerEnabled = *schedEnabled
}
if *cdrsEnabled {
cfg.CDRSEnabled = *cdrsEnabled
}
if *cdrcEnabled {
cfg.CdrcEnabled = *cdrcEnabled
}
if *mediatorEnabled {
cfg.MediatorEnabled = *mediatorEnabled
}
// some consitency checks
errCfg := checkConfigSanity()
if errCfg != nil {
@@ -259,33 +276,6 @@ func main() {
}
defer accountDb.Close()
engine.SetAccountingStorage(accountDb)
if *raterEnabled {
cfg.RaterEnabled = *raterEnabled
}
if *schedEnabled {
cfg.SchedulerEnabled = *schedEnabled
}
if *cdrsEnabled {
cfg.CDRSEnabled = *cdrsEnabled
}
if *cdrcEnabled {
cfg.CdrcEnabled = *cdrcEnabled
}
if *mediatorEnabled {
cfg.MediatorEnabled = *mediatorEnabled
}
if cfg.RaterEnabled {
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
return
}
if err := accountDb.CacheAccounting(nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
return
}
}
if cfg.StorDBType == SAME {
logDb = ratingDb.(engine.LogStorage)
@@ -301,6 +291,7 @@ func main() {
// loadDb,cdrDb and logDb are all mapped on the same stordb storage
loadDb = logDb.(engine.LoadStorage)
cdrDb = logDb.(engine.CdrStorage)
engine.SetRoundingMethodAndDecimals(cfg.RoundingMethod, cfg.RoundingDecimals)
if cfg.SMDebitInterval > 0 {
if dp, err := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)); err == nil {
@@ -309,6 +300,18 @@ func main() {
}
stopHandled := false
if cfg.RaterEnabled { // Cache rating if rater enabled
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
return
}
if err := accountDb.CacheAccounting(nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
return
}
}
// Async starts here
if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled {
go registerToBalancer()
@@ -320,20 +323,20 @@ func main() {
apier := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Config: cfg}
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != INTERNAL {
engine.Logger.Info("Starting CGRateS Rater")
engine.Logger.Info("Registering CGRateS Rater service")
server.RpcRegister(responder)
server.RpcRegister(apier)
}
if cfg.BalancerEnabled {
engine.Logger.Info("Starting CGRateS Balancer")
engine.Logger.Info("Registering CGRateS Balancer service")
go stopBalancerSignalHandler()
stopHandled = true
responder.Bal = bal
server.RpcRegister(responder)
server.RpcRegister(apier)
if cfg.RaterEnabled {
engine.Logger.Info("Starting internal engine.")
engine.Logger.Info("<Balancer> Registering internal rater")
bal.AddClient("local", new(engine.ResponderWorker))
}
}
@@ -356,6 +359,7 @@ func main() {
var scribeServer history.Scribe
if cfg.HistoryServerEnabled {
engine.Logger.Info("Registering CGRates History service")
if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil {
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not start, error: %s", err.Error()))
exitChan <- true
@@ -363,34 +367,33 @@ func main() {
}
server.RpcRegisterName("Scribe", scribeServer)
}
engine.Logger.Info("Starting History Service.")
go startHistoryAgent(scribeServer)
go server.ServeGOB(cfg.RPCGOBListen)
go server.ServeJSON(cfg.RPCJSONListen)
go startHistoryAgent(scribeServer)
if cfg.CDRSEnabled {
engine.Logger.Info("Starting CGRateS CDR Server.")
engine.Logger.Info("Registering CGRateS CDR service")
go startCDRS(responder, cdrDb)
}
go server.ServeHTTP(cfg.HTTPListen)
if cfg.MediatorEnabled {
engine.Logger.Info("Starting CGRateS Mediator.")
engine.Logger.Info("Starting CGRateS Mediator service")
go startMediator(responder, logDb, cdrDb)
}
if cfg.SMEnabled {
engine.Logger.Info("Starting CGRateS SessionManager.")
engine.Logger.Info("Starting CGRateS SessionManager service")
go startSessionManager(responder, logDb)
// close all sessions on shutdown
go shutdownSessionmanagerSingnalHandler()
}
if cfg.CdrcEnabled {
engine.Logger.Info("Starting CGRateS CDR Client.")
engine.Logger.Info("Starting CGRateS CDR client")
go startCdrc()
}

View File

@@ -24,9 +24,9 @@
# stordb_user = cgrates # Username to use when connecting to stordb.
# stordb_passwd = CGRateS.org # Password to use when connecting to stordb.
# dbdata_encoding = msgpack # The encoding used to store object data in strings: <msgpack|json>
# rpc_json_listen = "127.0.0.1:2012" # RPC JSON listening address
# rpc_gob_listen = "127.0.0.1:2013" # RPC GOB listening address
# http_listen = "127.0.0.1:2080" # HTTP listening address
# rpc_json_listen = 127.0.0.1:2012 # RPC JSON listening address
# rpc_gob_listen = 127.0.0.1:2013 # RPC GOB listening address
# http_listen = 127.0.0.1:2080 # HTTP listening address
# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
# default_tor = call # Default Type of Record to consider when missing from requests.
# default_tenant = cgrates.org # Default Tenant to consider when missing from requests.