mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Locking connection initialization within ConnManager
This commit is contained in:
@@ -21,6 +21,7 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -34,6 +35,10 @@ func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient
|
||||
cfg: cfg,
|
||||
rpcInternal: rpcInternal,
|
||||
connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
connLks: make(map[string]*sync.Mutex),
|
||||
}
|
||||
for connID := range cfg.RPCConns() {
|
||||
cM.connLks[connID] = new(sync.Mutex)
|
||||
}
|
||||
SetConnManager(cM)
|
||||
return
|
||||
@@ -44,6 +49,21 @@ type ConnManager struct {
|
||||
cfg *config.CGRConfig
|
||||
rpcInternal map[string]chan rpcclient.ClientConnector
|
||||
connCache *ltcache.Cache
|
||||
connLks map[string]*sync.Mutex // control connection initialization and caching
|
||||
}
|
||||
|
||||
// lkConn will lock a connection if preconfigured
|
||||
func (cM *ConnManager) lkConn(connID string) {
|
||||
if lk, hasConn := cM.connLks[connID]; hasConn {
|
||||
lk.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
// unlkConn will unlock a connection if preconfigured
|
||||
func (cM *ConnManager) unlkConn(connID string) {
|
||||
if lk, hasConn := cM.connLks[connID]; hasConn {
|
||||
lk.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// getConn is used to retrieve a connection from cache
|
||||
@@ -105,8 +125,6 @@ func (cM *ConnManager) getConnWithConfig(connID string, connCfg *config.RPCConn,
|
||||
err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
|
||||
return
|
||||
}
|
||||
// fmt.Println(utils.ToJSON(rpcConnCfg))
|
||||
// fmt.Println(utils.ToJSON(cM.cfg.GeneralCfg()))
|
||||
if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS,
|
||||
utils.FirstNonEmpty(rpcConnCfg.ClientKey, cM.cfg.TLSCfg().ClientKey), utils.FirstNonEmpty(rpcConnCfg.ClientCertificate, cM.cfg.TLSCfg().ClientCerificate),
|
||||
utils.FirstNonEmpty(rpcConnCfg.CaCertificate, cM.cfg.TLSCfg().CaCertificate), utils.FirstIntNonEmpty(rpcConnCfg.ConnectAttempts, cM.cfg.GeneralCfg().ConnectAttempts),
|
||||
@@ -149,7 +167,10 @@ func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.BiRPCConecto
|
||||
}
|
||||
var conn rpcclient.ClientConnector
|
||||
for _, connID := range connIDs {
|
||||
if conn, err = cM.getConn(connID, biRPCClient); err != nil {
|
||||
cM.lkConn(connID)
|
||||
conn, err = cM.getConn(connID, biRPCClient)
|
||||
cM.unlkConn(connID)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err = conn.Call(method, arg, reply); !rpcclient.IsNetworkError(err) {
|
||||
|
||||
Reference in New Issue
Block a user