Avoid concurrency on variables in cmd/cgr-engine

This commit is contained in:
DanB
2017-12-13 19:58:15 +01:00
parent 12ff92551b
commit 526d63505f
3 changed files with 67 additions and 12 deletions

View File

@@ -69,7 +69,6 @@ var (
cfg *config.CGRConfig
smRpc *v1.SessionManagerV1
err error
)
func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
@@ -132,8 +131,10 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
}
}
func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRaterChan,
internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SMGeneric service.")
var err error
var ralsConns, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmGenericConfig.RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
@@ -212,6 +213,7 @@ func startSMAsterisk(internalSMGChan chan *sessionmanager.SMGeneric, exitChan ch
}
func startDiameterAgent(internalSMGChan chan *sessionmanager.SMGeneric, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS DiameterAgent service")
smgChan := make(chan rpcclient.RpcClientConnection, 1) // Use it to pass smg
go func(internalSMGChan chan *sessionmanager.SMGeneric, smgChan chan rpcclient.RpcClientConnection) {
@@ -253,6 +255,7 @@ func startDiameterAgent(internalSMGChan chan *sessionmanager.SMGeneric, internal
}
func startRadiusAgent(internalSMGChan chan *sessionmanager.SMGeneric, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS RadiusAgent service")
smgChan := make(chan rpcclient.RpcClientConnection, 1) // Use it to pass smg
go func(internalSMGChan chan *sessionmanager.SMGeneric, smgChan chan rpcclient.RpcClientConnection) {
@@ -285,6 +288,7 @@ func startRadiusAgent(internalSMGChan chan *sessionmanager.SMGeneric, exitChan c
}
func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS SMFreeSWITCH service")
var ralsConn, cdrsConn, rlsConn *rpcclient.RpcClientPool
if len(cfg.SmFsConfig.RALsConns) != 0 {
@@ -323,6 +327,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclie
}
func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS SMKamailio service.")
var ralsConn, cdrsConn, rlSConn *rpcclient.RpcClientPool
if len(cfg.SmKamConfig.RALsConns) != 0 {
@@ -361,6 +366,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rp
}
func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS SMOpenSIPS service.")
var ralsConn, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmOsipsConfig.RALsConns) != 0 {
@@ -394,6 +400,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
internalRaterChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan,
internalCdrStatSChan, internalThresholdSChan, internalStatSChan chan rpcclient.RpcClientConnection,
server *utils.Server, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS CDRS service.")
var ralConn, pubSubConn, usersConn, attrSConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool
if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL
@@ -574,6 +581,7 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec
func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
var err error
var thdSConn *rpcclient.RpcClientPool
filterS := <-filterSChan
filterSChan <- filterS
@@ -610,6 +618,7 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.
// startStatService fires up the StatS
func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
var err error
var thdSConn *rpcclient.RpcClientPool
filterS := <-filterSChan
filterSChan <- filterS
@@ -671,6 +680,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
// startSupplierService fires up the ThresholdS
func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
var err error
filterS := <-filterSChan
filterSChan <- filterS
var resourceSConn, statSConn *rpcclient.RpcClientPool
@@ -823,6 +833,7 @@ func main() {
return
}()
}
var err error
// Init config
cfg, err = config.NewCGRConfigFromFolder(*cfgDir)
if err != nil {

View File

@@ -128,6 +128,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, thdsTaskChan)
go func() {
defer close(thdsTaskChan)
var err error
thdS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsThresholdSConns, internalThdSChan, cfg.InternalTtl)
if err != nil {
@@ -144,6 +145,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, cdrstatTaskChan)
go func() {
defer close(cdrstatTaskChan)
var err error
cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsCDRStatSConns, internalCdrStatSChan, cfg.InternalTtl)
if err != nil {
@@ -160,6 +162,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, statsTaskChan)
go func() {
defer close(statsTaskChan)
var err error
stats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsStatSConns, internalStatSChan, cfg.InternalTtl)
if err != nil {
@@ -209,6 +212,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, attrsTaskChan)
go func() {
defer close(attrsTaskChan)
var err error
attrS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts,
cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsAttributeSConns, internalAttributeSChan, cfg.InternalTtl)
@@ -226,7 +230,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, aliasesTaskChan)
go func() {
defer close(aliasesTaskChan)
if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to AliaseS, error: %s", err.Error()))
exitChan <- true
@@ -243,13 +248,15 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, usersTaskChan)
go func() {
defer close(usersTaskChan)
if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
if usersConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsUserSConns, internalUserSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect UserS, error: %s", err.Error()))
exitChan <- true
return
} else {
engine.SetUserService(usersConns)
}
engine.SetUserService(usersConns)
}()
}
// Wait for all connections to complete before going further

View File

@@ -28,6 +28,7 @@ import (
"net/rpc"
"net/rpc/jsonrpc"
"reflect"
"sync"
"time"
"github.com/cenk/rpc2"
@@ -40,34 +41,51 @@ type Server struct {
rpcEnabled bool
httpEnabled bool
birpcSrv *rpc2.Server
sync.RWMutex
}
func (s *Server) RpcRegister(rcvr interface{}) {
rpc.Register(rcvr)
s.Lock()
s.rpcEnabled = true
s.Unlock()
}
func (s *Server) RpcRegisterName(name string, rcvr interface{}) {
rpc.RegisterName(name, rcvr)
s.Lock()
s.rpcEnabled = true
s.Unlock()
}
func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
http.HandleFunc(pattern, handler)
s.Lock()
s.httpEnabled = true
s.Unlock()
}
// Registers a new BiJsonRpc name
func (s *Server) BiRPCRegisterName(method string, handlerFunc interface{}) {
if s.birpcSrv == nil {
s.RLock()
isNil := s.birpcSrv == nil
s.RUnlock()
if isNil {
s.Lock()
s.birpcSrv = rpc2.NewServer()
s.Unlock()
}
s.birpcSrv.Handle(method, handlerFunc)
}
func (s *Server) BiRPCRegister(rcvr interface{}) {
if s.birpcSrv == nil {
s.RLock()
isNil := s.birpcSrv == nil
s.RUnlock()
if isNil {
s.Lock()
s.birpcSrv = rpc2.NewServer()
s.Unlock()
}
rcvType := reflect.TypeOf(rcvr)
for i := 0; i < rcvType.NumMethod(); i++ {
@@ -79,7 +97,10 @@ func (s *Server) BiRPCRegister(rcvr interface{}) {
}
func (s *Server) ServeJSON(addr string) {
if !s.rpcEnabled {
s.RLock()
enabled := s.rpcEnabled
s.RUnlock()
if !enabled {
return
}
lJSON, e := net.Listen("tcp", addr)
@@ -111,7 +132,10 @@ func (s *Server) ServeJSON(addr string) {
}
func (s *Server) ServeGOB(addr string) {
if !s.rpcEnabled {
s.RLock()
enabled := s.rpcEnabled
s.RUnlock()
if !enabled {
return
}
lGOB, e := net.Listen("tcp", addr)
@@ -150,8 +174,16 @@ func handleRequest(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, useBasicAuth bool, userList map[string]string) {
if s.rpcEnabled && jsonRPCURL != "" {
s.RLock()
enabled := s.rpcEnabled
s.RUnlock()
if !enabled {
return
}
if enabled && jsonRPCURL != "" {
s.Lock()
s.httpEnabled = true
s.Unlock()
Logger.Info("<HTTP> enabling handler for JSON-RPC")
if useBasicAuth {
http.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList)))
@@ -160,8 +192,10 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, useB
}
}
if s.rpcEnabled && wsRPCURL != "" {
if enabled && wsRPCURL != "" {
s.Lock()
s.httpEnabled = true
s.Unlock()
Logger.Info("<HTTP> enabling handler for WebSocket connections")
wsHandler := websocket.Handler(func(ws *websocket.Conn) {
jsonrpc.ServeConn(ws)
@@ -186,7 +220,10 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, useB
}
func (s *Server) ServeBiJSON(addr string) {
if s.birpcSrv == nil {
s.RLock()
isNil := s.birpcSrv == nil
s.RUnlock()
if isNil {
return
}
lBiJSON, e := net.Listen("tcp", addr)