added HaPool cfg options

This commit is contained in:
Radu Ioan Fericean
2015-07-23 16:51:04 +03:00
parent eef3ab763b
commit c02ee71899
7 changed files with 294 additions and 148 deletions

View File

@@ -23,6 +23,7 @@ import (
"fmt"
"log"
"os"
"reflect"
"runtime"
"runtime/pprof"
"strconv"
@@ -109,16 +110,18 @@ func startCdrc(cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, h
}
func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var raterConn, cdrsConn engine.ConnectorPool
var client *rpcclient.RpcClient
var err error
delay := utils.Fib()
if cfg.SmFsConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
for _, raterCfg := range cfg.SmFsConfig.HaRater {
if raterCfg.Server == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = append(raterConn, responder)
continue
}
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmFsConfig.Rater, cfg.ConnectAttempts, cfg.SmFsConfig.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.SmFsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
@@ -129,32 +132,34 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac
exitChan <- true
return
}
raterConn = &engine.RPCClientConnector{Client: client}
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
}
if cfg.SmFsConfig.Cdrs == cfg.SmFsConfig.Rater {
if reflect.DeepEqual(cfg.SmFsConfig.HaCdrs, cfg.SmFsConfig.HaRater) {
cdrsConn = raterConn
} else if cfg.SmFsConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SmFsConfig.Cdrs) != 0 {
} else if len(cfg.SmFsConfig.HaCdrs) != 0 {
delay = utils.Fib()
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmFsConfig.Cdrs, cfg.ConnectAttempts, cfg.SmFsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
for _, cdrsCfg := range cfg.SmFsConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = append(cdrsConn, responder)
continue
}
time.Sleep(delay())
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.SmFsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout})
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
rcp := engine.ConnectorPool{raterConn}
ccp := engine.ConnectorPool{cdrsConn}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, rcp, ccp)
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn)
sms = append(sms, sm)
smRpc.SMs = append(smRpc.SMs, sm)
if err = sm.Connect(); err != nil {
@@ -164,16 +169,19 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac
}
func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var raterConn, cdrsConn engine.ConnectorPool
var client *rpcclient.RpcClient
if cfg.SmKamConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
delay := utils.Fib()
var err error
delay := utils.Fib()
for _, raterCfg := range cfg.SmKamConfig.HaRater {
if raterCfg.Server == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = append(raterConn, responder)
continue
}
for i := 0; i < cfg.SmKamConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmKamConfig.Rater, cfg.ConnectAttempts, cfg.SmKamConfig.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.SmKamConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
@@ -183,28 +191,32 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to rater: %v", err))
exitChan <- true
}
raterConn = &engine.RPCClientConnector{Client: client}
}
if cfg.SmKamConfig.Cdrs == cfg.SmKamConfig.Rater {
cdrsConn = raterConn
} else if cfg.SmKamConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SmKamConfig.Cdrs) != 0 {
delay := utils.Fib()
for i := 0; i < cfg.SmKamConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmKamConfig.Cdrs, cfg.ConnectAttempts, cfg.SmKamConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
if reflect.DeepEqual(cfg.SmKamConfig.HaCdrs, cfg.SmKamConfig.HaRater) {
cdrsConn = raterConn
} else if len(cfg.SmKamConfig.HaCdrs) != 0 {
delay := utils.Fib()
for _, cdrsCfg := range cfg.SmKamConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = append(cdrsConn, responder)
continue
}
for i := 0; i < cfg.SmKamConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.SmKamConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-Kamailio> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout})
}
time.Sleep(delay())
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-Kamailio> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn)
sms = append(sms, sm)
@@ -216,16 +228,19 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache
}
func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var raterConn, cdrsConn engine.ConnectorPool
var client *rpcclient.RpcClient
if cfg.SmOsipsConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
delay := utils.Fib()
var err error
delay := utils.Fib()
for _, raterCfg := range cfg.SmOsipsConfig.HaRater {
if raterCfg.Server == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = append(raterConn, responder)
continue
}
for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmOsipsConfig.Rater, cfg.ConnectAttempts, cfg.SmOsipsConfig.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.SmOsipsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
@@ -235,28 +250,33 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to rater: %v", err))
exitChan <- true
}
raterConn = &engine.RPCClientConnector{Client: client}
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
}
if cfg.SmOsipsConfig.Cdrs == cfg.SmOsipsConfig.Rater {
if reflect.DeepEqual(cfg.SmOsipsConfig.HaCdrs, cfg.SmOsipsConfig.HaRater) {
cdrsConn = raterConn
} else if cfg.SmOsipsConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SmOsipsConfig.Cdrs) != 0 {
delay := utils.Fib()
for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmOsipsConfig.Cdrs, cfg.ConnectAttempts, cfg.SmOsipsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
for _, cdrsCfg := range cfg.SmOsipsConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = append(cdrsConn, responder)
continue
}
if len(cfg.SmOsipsConfig.HaCdrs) != 0 {
delay := utils.Fib()
for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.SmOsipsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
time.Sleep(delay())
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout})
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn)
sms = append(sms, sm)
@@ -547,7 +567,7 @@ func main() {
}()
wg.Wait()
responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats}
responder := &engine.Responder{ExitChan: exitChan, Stats: cdrStats, Timeout: 10 * time.Minute}
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}
apierRpcV2 := &v2.ApierV2{ApierV1: v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Config: cfg, Responder: responder, CdrStatsSrv: cdrStats}}