mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Redesign of cmd/cgr-engine using channels for intercommunication between services
This commit is contained in:
@@ -26,7 +26,6 @@ import (
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/apier/v1"
|
||||
@@ -56,53 +55,34 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
cfgDir = flag.String("config_dir", utils.CONFIG_DIR, "Configuration directory path.")
|
||||
version = flag.Bool("version", false, "Prints the application version.")
|
||||
raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config")
|
||||
schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon .overwriting config")
|
||||
cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config")
|
||||
pidFile = flag.String("pid", "", "Write pid file")
|
||||
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
|
||||
singlecpu = flag.Bool("singlecpu", false, "Run on single CPU core")
|
||||
bal = balancer2go.NewBalancer()
|
||||
exitChan = make(chan bool)
|
||||
server = &engine.Server{}
|
||||
cdrServer *engine.CdrServer
|
||||
cdrStats engine.StatsInterface
|
||||
scribeServer history.Scribe
|
||||
pubSubServer engine.PublisherSubscriber
|
||||
aliasesServer engine.AliasService
|
||||
userServer engine.UserService
|
||||
cfg *config.CGRConfig
|
||||
sms []sessionmanager.SessionManager
|
||||
smRpc *v1.SessionManagerV1
|
||||
err error
|
||||
cfgDir = flag.String("config_dir", utils.CONFIG_DIR, "Configuration directory path.")
|
||||
version = flag.Bool("version", false, "Prints the application version.")
|
||||
raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config")
|
||||
schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon .overwriting config")
|
||||
cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config")
|
||||
pidFile = flag.String("pid", "", "Write pid file")
|
||||
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
|
||||
singlecpu = flag.Bool("singlecpu", false, "Run on single CPU core")
|
||||
|
||||
cfg *config.CGRConfig
|
||||
sms []sessionmanager.SessionManager
|
||||
smRpc *v1.SessionManagerV1
|
||||
err error
|
||||
)
|
||||
|
||||
func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) {
|
||||
if err := ratingDb.CacheRatingAll(); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
if err := accountDb.CacheAccountingAll(); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
close(doneChan)
|
||||
}
|
||||
|
||||
// Fires up a cdrc instance
|
||||
func startCdrc(responder *engine.Responder, cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}) {
|
||||
func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *engine.Responder, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}, exitChan chan bool) {
|
||||
var cdrsConn engine.Connector
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
if cdrcCfg.Cdrs == utils.INTERNAL {
|
||||
<-cdrsChan // Wait for CDRServer to come up before start processing
|
||||
cdrsConn = responder
|
||||
cdrsChan := <-internalCdrSChan // This will signal that the cdrs part is populated in internalRaterChan
|
||||
internalCdrSChan <- cdrsChan // Put it back for other components
|
||||
resp := <-internalRaterChan
|
||||
cdrsConn = resp
|
||||
internalRaterChan <- resp
|
||||
} else {
|
||||
conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
|
||||
if err != nil {
|
||||
@@ -124,13 +104,15 @@ func startCdrc(responder *engine.Responder, cdrsChan chan struct{}, cdrcCfgs map
|
||||
exitChan <- true // If run stopped, something is bad, stop the application
|
||||
}
|
||||
|
||||
func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) {
|
||||
func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
|
||||
engine.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
|
||||
var raterConn, cdrsConn engine.Connector
|
||||
var client *rpcclient.RpcClient
|
||||
delay := utils.Fib()
|
||||
if cfg.SmFsConfig.Rater == utils.INTERNAL {
|
||||
<-cacheChan // Wait for the cache to init before start doing queries
|
||||
raterConn = responder
|
||||
resp := <-internalRaterChan
|
||||
raterConn = resp
|
||||
internalRaterChan <- resp
|
||||
} else {
|
||||
var err error
|
||||
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
|
||||
@@ -150,8 +132,9 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac
|
||||
if cfg.SmFsConfig.Cdrs == cfg.SmFsConfig.Rater {
|
||||
cdrsConn = raterConn
|
||||
} else if cfg.SmFsConfig.Cdrs == utils.INTERNAL {
|
||||
<-cacheChan // Wait for the cache to init before start doing queries
|
||||
cdrsConn = responder
|
||||
resp := <-internalRaterChan
|
||||
cdrsConn = resp
|
||||
internalRaterChan <- resp
|
||||
} else if len(cfg.SmFsConfig.Cdrs) != 0 {
|
||||
delay = utils.Fib()
|
||||
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
|
||||
@@ -177,12 +160,14 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) {
|
||||
func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
|
||||
engine.Logger.Info("Starting CGRateS SM-Kamailio service.")
|
||||
var raterConn, cdrsConn engine.Connector
|
||||
var client *rpcclient.RpcClient
|
||||
if cfg.SmKamConfig.Rater == utils.INTERNAL {
|
||||
<-cacheChan // Wait for the cache to init before start doing queries
|
||||
raterConn = responder
|
||||
resp := <-internalRaterChan
|
||||
raterConn = resp
|
||||
internalRaterChan <- resp
|
||||
} else {
|
||||
var err error
|
||||
delay := utils.Fib()
|
||||
@@ -202,8 +187,9 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache
|
||||
if cfg.SmKamConfig.Cdrs == cfg.SmKamConfig.Rater {
|
||||
cdrsConn = raterConn
|
||||
} else if cfg.SmKamConfig.Cdrs == utils.INTERNAL {
|
||||
<-cacheChan // Wait for the cache to init before start doing queries
|
||||
cdrsConn = responder
|
||||
resp := <-internalRaterChan
|
||||
cdrsConn = resp
|
||||
internalRaterChan <- resp
|
||||
} else if len(cfg.SmKamConfig.Cdrs) != 0 {
|
||||
delay := utils.Fib()
|
||||
for i := 0; i < cfg.SmKamConfig.Reconnects; i++ {
|
||||
@@ -229,12 +215,14 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) {
|
||||
func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
|
||||
engine.Logger.Info("Starting CGRateS SM-OpenSIPS service.")
|
||||
var raterConn, cdrsConn engine.Connector
|
||||
var client *rpcclient.RpcClient
|
||||
if cfg.SmOsipsConfig.Rater == utils.INTERNAL {
|
||||
<-cacheChan // Wait for the cache to init before start doing queries
|
||||
raterConn = responder
|
||||
resp := <-internalRaterChan
|
||||
raterConn = resp
|
||||
internalRaterChan <- resp
|
||||
} else {
|
||||
var err error
|
||||
delay := utils.Fib()
|
||||
@@ -254,8 +242,9 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache
|
||||
if cfg.SmOsipsConfig.Cdrs == cfg.SmOsipsConfig.Rater {
|
||||
cdrsConn = raterConn
|
||||
} else if cfg.SmOsipsConfig.Cdrs == utils.INTERNAL {
|
||||
<-cacheChan // Wait for the cache to init before start doing queries
|
||||
cdrsConn = responder
|
||||
resp := <-internalRaterChan
|
||||
cdrsConn = resp
|
||||
internalRaterChan <- resp
|
||||
} else if len(cfg.SmOsipsConfig.Cdrs) != 0 {
|
||||
delay := utils.Fib()
|
||||
for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ {
|
||||
@@ -281,14 +270,17 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startCDRS(logDb engine.LogStorage, cdrDb engine.CdrStorage, responder *engine.Responder, responderReady, doneChan chan struct{}) {
|
||||
func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage,
|
||||
internalRaterChan chan *engine.Responder, internalCdrStatSChan chan engine.StatsInterface, server *engine.Server, exitChan chan bool) {
|
||||
engine.Logger.Info("Starting CGRateS CDRS service.")
|
||||
var err error
|
||||
var client *rpcclient.RpcClient
|
||||
// Rater connection init
|
||||
var raterConn engine.Connector
|
||||
if cfg.CDRSRater == utils.INTERNAL {
|
||||
<-responderReady // Wait for the cache to init before start doing queries
|
||||
responder := <-internalRaterChan // Wait for rater to come up before start querying
|
||||
raterConn = responder
|
||||
internalRaterChan <- responder // Put back the connection since there might be other entities waiting for it
|
||||
} else if len(cfg.CDRSRater) != 0 {
|
||||
delay := utils.Fib()
|
||||
for i := 0; i < cfg.CDRSReconnects; i++ {
|
||||
@@ -308,7 +300,7 @@ func startCDRS(logDb engine.LogStorage, cdrDb engine.CdrStorage, responder *engi
|
||||
// Stats connection init
|
||||
var statsConn engine.StatsInterface
|
||||
if cfg.CDRSStats == utils.INTERNAL {
|
||||
statsConn = cdrStats
|
||||
statsConn = <-internalCdrStatSChan
|
||||
} else if len(cfg.CDRSStats) != 0 {
|
||||
if cfg.CDRSRater == cfg.CDRSStats {
|
||||
statsConn = &engine.ProxyStats{Client: client}
|
||||
@@ -330,34 +322,97 @@ func startCDRS(logDb engine.LogStorage, cdrDb engine.CdrStorage, responder *engi
|
||||
}
|
||||
}
|
||||
|
||||
cdrServer, _ = engine.NewCdrServer(cfg, cdrDb, raterConn, statsConn)
|
||||
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, raterConn, statsConn)
|
||||
engine.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
cdrServer.RegisterHanlersToServer(server)
|
||||
engine.Logger.Info("Registering CDRS RPC service.")
|
||||
cdrSrv := v1.CdrsV1{CdrSrv: cdrServer}
|
||||
server.RpcRegister(&cdrSrv)
|
||||
server.RpcRegister(&v2.CdrsV2{CdrsV1: cdrSrv})
|
||||
// Make the cdr servers available for internal communication
|
||||
responder.CdrSrv = cdrServer
|
||||
close(doneChan)
|
||||
// Make the cdr server available for internal communication
|
||||
responder := <-internalRaterChan // Retrieve again the responder
|
||||
responder.CdrSrv = cdrServer // Attach connection to cdrServer in responder, so it can be used later
|
||||
internalRaterChan <- responder // Put back the connection for the rest of the system
|
||||
internalCdrSChan <- cdrServer // Signal that cdrS is operational
|
||||
}
|
||||
|
||||
// Starts the rpc server, waiting for the necessary components to finish their tasks
|
||||
func serveRpc(rpcWaitChans []chan struct{}) {
|
||||
for _, chn := range rpcWaitChans {
|
||||
<-chn
|
||||
func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, ratingDb engine.RatingStorage, exitChan chan bool) {
|
||||
engine.Logger.Info("Starting CGRateS Scheduler.")
|
||||
sched := scheduler.NewScheduler()
|
||||
go reloadSchedulerSingnalHandler(sched, ratingDb)
|
||||
time.Sleep(1)
|
||||
internalSchedulerChan <- sched
|
||||
sched.LoadActionPlans(ratingDb)
|
||||
sched.Loop()
|
||||
exitChan <- true // Should not get out of loop though
|
||||
}
|
||||
|
||||
func startCdrStats(internalCdrStatSChan chan engine.StatsInterface, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, server *engine.Server) {
|
||||
cdrStats := engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval)
|
||||
server.RpcRegister(cdrStats)
|
||||
server.RpcRegister(&v1.CDRStatsV1{CdrStats: cdrStats}) // Public APIs
|
||||
internalCdrStatSChan <- cdrStats
|
||||
}
|
||||
|
||||
func startHistoryServer(internalHistorySChan chan history.Scribe, server *engine.Server, exitChan chan bool) {
|
||||
scribeServer, err := history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
}
|
||||
server.RpcRegisterName("ScribeV1", scribeServer)
|
||||
internalHistorySChan <- scribeServer
|
||||
}
|
||||
|
||||
func startPubSubServer(internalPubSubSChan chan engine.PublisherSubscriber, accountDb engine.AccountingStorage, server *engine.Server) {
|
||||
pubSubServer := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify)
|
||||
server.RpcRegisterName("PubSubV1", pubSubServer)
|
||||
internalPubSubSChan <- pubSubServer
|
||||
}
|
||||
|
||||
// ToDo: Make sure we are caching before starting this one
|
||||
func startAliasesServer(internalAliaseSChan chan engine.AliasService, accountDb engine.AccountingStorage, server *engine.Server) {
|
||||
aliasesServer := engine.NewAliasHandler(accountDb)
|
||||
server.RpcRegisterName("AliasesV1", aliasesServer)
|
||||
internalAliaseSChan <- aliasesServer
|
||||
}
|
||||
|
||||
func startUsersServer(internalUserSChan chan engine.UserService, accountDb engine.AccountingStorage, server *engine.Server, exitChan chan bool) {
|
||||
userServer, err := engine.NewUserMap(accountDb, cfg.UserServerIndexes)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<UsersService> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
}
|
||||
server.RpcRegisterName("UsersV1", userServer)
|
||||
internalUserSChan <- userServer
|
||||
}
|
||||
|
||||
func startRpc(server *engine.Server, internalRaterChan chan *engine.Responder,
|
||||
internalCdrSChan chan *engine.CdrServer,
|
||||
internalCdrStatSChan chan engine.StatsInterface,
|
||||
internalHistorySChan chan history.Scribe,
|
||||
internalPubSubSChan chan engine.PublisherSubscriber,
|
||||
internalUserSChan chan engine.UserService,
|
||||
internalAliaseSChan chan engine.AliasService) {
|
||||
select { // Any of the rpc methods will unlock listening to rpc requests
|
||||
case resp := <-internalRaterChan:
|
||||
internalRaterChan <- resp
|
||||
case cdrs := <-internalCdrSChan:
|
||||
internalCdrSChan <- cdrs
|
||||
case cdrstats := <-internalCdrStatSChan:
|
||||
internalCdrStatSChan <- cdrstats
|
||||
case hist := <-internalHistorySChan:
|
||||
internalHistorySChan <- hist
|
||||
case pubsubs := <-internalPubSubSChan:
|
||||
internalPubSubSChan <- pubsubs
|
||||
case users := <-internalUserSChan:
|
||||
internalUserSChan <- users
|
||||
case aliases := <-internalAliaseSChan:
|
||||
internalAliaseSChan <- aliases
|
||||
}
|
||||
// Each of the serve blocks so need to start in their own goroutine
|
||||
go server.ServeJSON(cfg.RPCJSONListen)
|
||||
go server.ServeGOB(cfg.RPCGOBListen)
|
||||
}
|
||||
|
||||
// Starts the http server, waiting for the necessary components to finish their tasks
|
||||
func serveHttp(httpWaitChans []chan struct{}) {
|
||||
for _, chn := range httpWaitChans {
|
||||
<-chn
|
||||
}
|
||||
server.ServeHTTP(cfg.HTTPListen)
|
||||
go server.ServeHTTP(cfg.HTTPListen)
|
||||
}
|
||||
|
||||
func writePid() {
|
||||
@@ -412,7 +467,7 @@ func main() {
|
||||
var logDb engine.LogStorage
|
||||
var loadDb engine.LoadStorage
|
||||
var cdrDb engine.CdrStorage
|
||||
if cfg.RaterEnabled || cfg.SchedulerEnabled { // Only connect to dataDb if required
|
||||
if cfg.RaterEnabled || cfg.SchedulerEnabled { // Only connect to dataDb if necessary
|
||||
ratingDb, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort,
|
||||
cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
@@ -431,15 +486,11 @@ func main() {
|
||||
engine.SetAccountingStorage(accountDb)
|
||||
}
|
||||
if cfg.RaterEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary
|
||||
if cfg.StorDBType == SAME {
|
||||
logDb = ratingDb.(engine.LogStorage)
|
||||
} else {
|
||||
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
|
||||
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
|
||||
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer logDb.Close()
|
||||
engine.SetStorageLogger(logDb)
|
||||
@@ -452,189 +503,52 @@ func main() {
|
||||
engine.SetRoundingDecimals(cfg.RoundingDecimals)
|
||||
stopHandled := false
|
||||
|
||||
// Async starts here
|
||||
// Rpc/http server
|
||||
server := new(engine.Server)
|
||||
|
||||
rpcWait := make([]chan struct{}, 0) // Rpc server will start as soon as this list is consumed
|
||||
httpWait := make([]chan struct{}, 0) // Http server will start as soon as this list is consumed
|
||||
// Async starts here, will follow cgrates.json start order
|
||||
exitChan := make(chan bool)
|
||||
|
||||
var cacheChan chan struct{}
|
||||
if cfg.RaterEnabled { // Cache rating if rater enabled
|
||||
cacheChan = make(chan struct{})
|
||||
rpcWait = append(rpcWait, cacheChan)
|
||||
go cacheData(ratingDb, accountDb, cacheChan)
|
||||
}
|
||||
|
||||
if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled {
|
||||
go registerToBalancer()
|
||||
go stopRaterSignalHandler()
|
||||
stopHandled = true
|
||||
}
|
||||
if cfg.CDRStatsEnabled { // Init it here so we make it availabe to the Apier
|
||||
cdrStats = engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval)
|
||||
server.RpcRegister(cdrStats)
|
||||
server.RpcRegister(&v1.CDRStatsV1{CdrStats: cdrStats}) // Public APIs
|
||||
}
|
||||
|
||||
if cfg.PubSubServerEnabled {
|
||||
pubSubServer = engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify)
|
||||
server.RpcRegisterName("PubSubV1", pubSubServer)
|
||||
}
|
||||
|
||||
if cfg.AliasesServerEnabled {
|
||||
aliasesServer = engine.NewAliasHandler(accountDb)
|
||||
server.RpcRegisterName("AliasesV1", aliasesServer)
|
||||
}
|
||||
|
||||
if cfg.HistoryServerEnabled {
|
||||
scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
}
|
||||
server.RpcRegisterName("ScribeV1", scribeServer)
|
||||
}
|
||||
if cfg.UserServerEnabled {
|
||||
userServer, err = engine.NewUserMap(accountDb, cfg.UserServerIndexes)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<UsersService> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
}
|
||||
server.RpcRegisterName("UsersV1", userServer)
|
||||
}
|
||||
|
||||
// Register session manager service // FixMe: make sure this is thread safe
|
||||
if cfg.SmFsConfig.Enabled || cfg.SmKamConfig.Enabled || cfg.SmOsipsConfig.Enabled { // Register SessionManagerV1 service
|
||||
smRpc = new(v1.SessionManagerV1)
|
||||
server.RpcRegister(smRpc)
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if cfg.RaterCdrStats != "" && cfg.RaterCdrStats != utils.INTERNAL {
|
||||
if cdrStats, err = engine.NewProxyStats(cfg.RaterCdrStats, cfg.ConnectAttempts, -1); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<CdrStats> Could not connect to the server, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if cfg.RaterHistoryServer != "" && cfg.RaterHistoryServer != utils.INTERNAL {
|
||||
if scribeServer, err = history.NewProxyScribe(cfg.RaterHistoryServer, cfg.ConnectAttempts, -1); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not connect to the server, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
engine.SetHistoryScribe(scribeServer)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if cfg.RaterPubSubServer != "" && cfg.RaterPubSubServer != utils.INTERNAL {
|
||||
if pubSubServer, err = engine.NewProxyPubSub(cfg.RaterPubSubServer, cfg.ConnectAttempts, -1); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<PubSubServer> Could not connect to the server, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
engine.SetPubSub(pubSubServer)
|
||||
}()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if cfg.RaterAliasesServer != "" && cfg.RaterAliasesServer != utils.INTERNAL {
|
||||
if aliasesServer, err = engine.NewProxyAliasService(cfg.RaterAliasesServer, cfg.ConnectAttempts, -1); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<AliasesServer> Could not connect to the server, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
engine.SetAliasService(aliasesServer)
|
||||
}()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if cfg.RaterUserServer != "" && cfg.RaterUserServer != utils.INTERNAL {
|
||||
if userServer, err = engine.NewProxyUserService(cfg.RaterUserServer, cfg.ConnectAttempts, -1); err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<UserServer> Could not connect to the server, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
engine.SetUserService(userServer)
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats}
|
||||
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer}
|
||||
apierRpcV2 := &v2.ApierV2{
|
||||
ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer}}
|
||||
|
||||
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL {
|
||||
engine.Logger.Info("Registering Rater service")
|
||||
server.RpcRegister(responder)
|
||||
server.RpcRegister(apierRpcV1)
|
||||
server.RpcRegister(apierRpcV2)
|
||||
}
|
||||
// Define internal connections via channels
|
||||
internalBalancerChan := make(chan *balancer2go.Balancer, 1)
|
||||
internalRaterChan := make(chan *engine.Responder, 1)
|
||||
internalSchedulerChan := make(chan *scheduler.Scheduler, 1)
|
||||
internalCdrSChan := make(chan *engine.CdrServer, 1)
|
||||
internalCdrStatSChan := make(chan engine.StatsInterface, 1)
|
||||
internalHistorySChan := make(chan history.Scribe, 1)
|
||||
internalPubSubSChan := make(chan engine.PublisherSubscriber, 1)
|
||||
internalUserSChan := make(chan engine.UserService, 1)
|
||||
internalAliaseSChan := make(chan engine.AliasService, 1)
|
||||
|
||||
// Start balancer service
|
||||
if cfg.BalancerEnabled {
|
||||
engine.Logger.Info("Registering Balancer service.")
|
||||
go stopBalancerSignalHandler()
|
||||
stopHandled = true
|
||||
responder.Bal = bal
|
||||
server.RpcRegister(responder)
|
||||
server.RpcRegister(apierRpcV1)
|
||||
server.RpcRegister(apierRpcV2)
|
||||
if cfg.RaterEnabled {
|
||||
engine.Logger.Info("<Balancer> Registering internal rater")
|
||||
bal.AddClient("local", new(engine.ResponderWorker))
|
||||
}
|
||||
go startBalancer(internalBalancerChan, &stopHandled, exitChan) // Not really needed async here but to cope with uniformity
|
||||
}
|
||||
|
||||
if !stopHandled {
|
||||
go generalSignalHandler()
|
||||
// Start rater service
|
||||
if cfg.RaterEnabled {
|
||||
cacheChan := make(chan struct{}) // Share the cacheChan with the rater to inform when cache is ready
|
||||
go cacheRaterData(cacheChan, ratingDb, accountDb, exitChan) // Handle data caching outside of rater start
|
||||
go startRater(internalRaterChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
|
||||
cacheChan, server, ratingDb, accountDb, loadDb, cdrDb, logDb, &stopHandled, exitChan)
|
||||
}
|
||||
|
||||
// Start Scheduler
|
||||
if cfg.SchedulerEnabled {
|
||||
engine.Logger.Info("Starting CGRateS Scheduler.")
|
||||
go func() {
|
||||
sched := scheduler.NewScheduler()
|
||||
go reloadSchedulerSingnalHandler(sched, ratingDb)
|
||||
apierRpcV1.Sched = sched
|
||||
apierRpcV2.Sched = sched
|
||||
sched.LoadActionPlans(ratingDb)
|
||||
sched.Loop()
|
||||
}()
|
||||
go startScheduler(internalSchedulerChan, ratingDb, exitChan)
|
||||
}
|
||||
|
||||
var cdrsChan chan struct{}
|
||||
// Start CDR Server
|
||||
if cfg.CDRSEnabled {
|
||||
engine.Logger.Info("Starting CGRateS CDRS service.")
|
||||
cdrsChan = make(chan struct{})
|
||||
httpWait = append(httpWait, cdrsChan)
|
||||
go startCDRS(logDb, cdrDb, responder, cacheChan, cdrsChan)
|
||||
go startCDRS(internalCdrSChan, logDb, cdrDb, internalRaterChan, internalCdrStatSChan, server, exitChan)
|
||||
}
|
||||
|
||||
if cfg.SmFsConfig.Enabled {
|
||||
engine.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
|
||||
go startSmFreeSWITCH(responder, cdrDb, cacheChan)
|
||||
// close all sessions on shutdown
|
||||
go shutdownSessionmanagerSingnalHandler()
|
||||
}
|
||||
if cfg.SmKamConfig.Enabled {
|
||||
engine.Logger.Info("Starting CGRateS SM-Kamailio service.")
|
||||
go startSmKamailio(responder, cdrDb, cacheChan)
|
||||
}
|
||||
if cfg.SmOsipsConfig.Enabled {
|
||||
engine.Logger.Info("Starting CGRateS SM-OpenSIPS service.")
|
||||
go startSmOpenSIPS(responder, cdrDb, cacheChan)
|
||||
// Start CDR Stats server
|
||||
if cfg.CDRStatsEnabled {
|
||||
go startCdrStats(internalCdrStatSChan, ratingDb, accountDb, server)
|
||||
}
|
||||
|
||||
// Start CDRC components
|
||||
var cdrcEnabled bool
|
||||
for _, cdrcCfgs := range cfg.CdrcProfiles {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
@@ -646,15 +560,58 @@ func main() {
|
||||
} else if !cdrcEnabled {
|
||||
cdrcEnabled = true // Mark that at least one cdrc service is active
|
||||
}
|
||||
go startCdrc(responder, cdrsChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfg.ConfigReloads[utils.CDRC])
|
||||
go startCdrc(internalCdrSChan, internalRaterChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfg.ConfigReloads[utils.CDRC], exitChan)
|
||||
}
|
||||
if cdrcEnabled {
|
||||
engine.Logger.Info("Starting CGRateS CDR client.")
|
||||
}
|
||||
|
||||
// Start the servers
|
||||
go serveRpc(rpcWait)
|
||||
go serveHttp(httpWait)
|
||||
// Start SM-FreeSWITCH
|
||||
if cfg.SmFsConfig.Enabled {
|
||||
go startSmFreeSWITCH(internalRaterChan, cdrDb, exitChan)
|
||||
// close all sessions on shutdown
|
||||
go shutdownSessionmanagerSingnalHandler(exitChan)
|
||||
}
|
||||
|
||||
// Start SM-Kamailio
|
||||
if cfg.SmKamConfig.Enabled {
|
||||
go startSmKamailio(internalRaterChan, cdrDb, exitChan)
|
||||
}
|
||||
|
||||
// Start SM-OpenSIPS
|
||||
if cfg.SmOsipsConfig.Enabled {
|
||||
go startSmOpenSIPS(internalRaterChan, cdrDb, exitChan)
|
||||
}
|
||||
|
||||
// Register session manager service // FixMe: make sure this is thread safe
|
||||
if cfg.SmFsConfig.Enabled || cfg.SmKamConfig.Enabled || cfg.SmOsipsConfig.Enabled { // Register SessionManagerV1 service
|
||||
smRpc = new(v1.SessionManagerV1)
|
||||
server.RpcRegister(smRpc)
|
||||
}
|
||||
|
||||
// Start HistoryS service
|
||||
if cfg.HistoryServerEnabled {
|
||||
go startHistoryServer(internalHistorySChan, server, exitChan)
|
||||
}
|
||||
|
||||
// Start PubSubS service
|
||||
if cfg.PubSubServerEnabled {
|
||||
go startPubSubServer(internalPubSubSChan, accountDb, server)
|
||||
}
|
||||
|
||||
// Start Aliases service
|
||||
if cfg.AliasesServerEnabled {
|
||||
go startAliasesServer(internalAliaseSChan, accountDb, server)
|
||||
}
|
||||
|
||||
// Start users service
|
||||
if cfg.UserServerEnabled {
|
||||
go startUsersServer(internalUserSChan, accountDb, server, exitChan)
|
||||
}
|
||||
|
||||
// Serve rpc connections
|
||||
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,
|
||||
internalPubSubSChan, internalUserSChan, internalAliaseSChan)
|
||||
<-exitChan
|
||||
|
||||
if *pidFile != "" {
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/cgrates/cgrates/balancer2go"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
)
|
||||
@@ -32,48 +33,54 @@ import (
|
||||
/*
|
||||
Listens for SIGTERM, SIGINT, SIGQUIT system signals and shuts down all the registered engines.
|
||||
*/
|
||||
func stopBalancerSignalHandler() {
|
||||
func stopBalancerSignalHandler(bal *balancer2go.Balancer, exitChan chan bool) {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
|
||||
|
||||
sig := <-c
|
||||
engine.Logger.Info(fmt.Sprintf("Caught signal %v, sending shutdown to engines\n", sig))
|
||||
bal.Shutdown("Responder.Shutdown")
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func generalSignalHandler() {
|
||||
func generalSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitChan chan bool) {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
|
||||
|
||||
sig := <-c
|
||||
engine.Logger.Info(fmt.Sprintf("Caught signal %v, shuting down cgr-engine\n", sig))
|
||||
var dummyInt int
|
||||
if cdrStats != nil {
|
||||
select {
|
||||
case cdrStats := <-internalCdrStatSChan:
|
||||
cdrStats.Stop(dummyInt, &dummyInt)
|
||||
default:
|
||||
}
|
||||
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
/*
|
||||
Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from balancer and closes the storage before exiting.
|
||||
*/
|
||||
func stopRaterSignalHandler() {
|
||||
func stopRaterSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitChan chan bool) {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
|
||||
sig := <-c
|
||||
|
||||
engine.Logger.Info(fmt.Sprintf("Caught signal %v, unregistering from balancer\n", sig))
|
||||
unregisterFromBalancer()
|
||||
unregisterFromBalancer(exitChan)
|
||||
var dummyInt int
|
||||
cdrStats.Stop(dummyInt, &dummyInt)
|
||||
select {
|
||||
case cdrStats := <-internalCdrStatSChan:
|
||||
cdrStats.Stop(dummyInt, &dummyInt)
|
||||
default:
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
/*
|
||||
Connects to the balancer and calls unregister RPC method.
|
||||
*/
|
||||
func unregisterFromBalancer() {
|
||||
func unregisterFromBalancer(exitChan chan bool) {
|
||||
client, err := rpc.Dial("tcp", cfg.RaterBalancer)
|
||||
if err != nil {
|
||||
engine.Logger.Crit("Cannot contact the balancer!")
|
||||
@@ -92,7 +99,7 @@ func unregisterFromBalancer() {
|
||||
/*
|
||||
Connects to the balancer and rehisters the engine to the server.
|
||||
*/
|
||||
func registerToBalancer() {
|
||||
func registerToBalancer(exitChan chan bool) {
|
||||
client, err := rpc.Dial("tcp", cfg.RaterBalancer)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cannot contact the balancer: %v", err))
|
||||
@@ -126,7 +133,7 @@ func reloadSchedulerSingnalHandler(sched *scheduler.Scheduler, getter engine.Rat
|
||||
/*
|
||||
Listens for the SIGTERM, SIGINT, SIGQUIT system signals and shuts down the session manager.
|
||||
*/
|
||||
func shutdownSessionmanagerSingnalHandler() {
|
||||
func shutdownSessionmanagerSingnalHandler(exitChan chan bool) {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
|
||||
<-c
|
||||
|
||||
Reference in New Issue
Block a user