diff --git a/engine/connmanager.go b/engine/connmanager.go index 101f23fea..e49d1ab61 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -21,6 +21,7 @@ package engine import ( "fmt" "strings" + "sync" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -34,6 +35,10 @@ func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient cfg: cfg, rpcInternal: rpcInternal, connCache: ltcache.NewCache(-1, 0, true, nil), + connLks: make(map[string]*sync.Mutex), + } + for connID := range cfg.RPCConns() { + cM.connLks[connID] = new(sync.Mutex) } SetConnManager(cM) return @@ -44,6 +49,21 @@ type ConnManager struct { cfg *config.CGRConfig rpcInternal map[string]chan rpcclient.ClientConnector connCache *ltcache.Cache + connLks map[string]*sync.Mutex // control connection initialization and caching +} + +// lkConn will lock a connection if preconfigured +func (cM *ConnManager) lkConn(connID string) { + if lk, hasConn := cM.connLks[connID]; hasConn { + lk.Lock() + } +} + +// unlkConn will unlock a connection if preconfigured +func (cM *ConnManager) unlkConn(connID string) { + if lk, hasConn := cM.connLks[connID]; hasConn { + lk.Unlock() + } } // getConn is used to retrieve a connection from cache @@ -105,8 +125,6 @@ func (cM *ConnManager) getConnWithConfig(connID string, connCfg *config.RPCConn, err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) return } - // fmt.Println(utils.ToJSON(rpcConnCfg)) - // fmt.Println(utils.ToJSON(cM.cfg.GeneralCfg())) if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, utils.FirstNonEmpty(rpcConnCfg.ClientKey, cM.cfg.TLSCfg().ClientKey), utils.FirstNonEmpty(rpcConnCfg.ClientCertificate, cM.cfg.TLSCfg().ClientCerificate), utils.FirstNonEmpty(rpcConnCfg.CaCertificate, cM.cfg.TLSCfg().CaCertificate), utils.FirstIntNonEmpty(rpcConnCfg.ConnectAttempts, cM.cfg.GeneralCfg().ConnectAttempts), @@ -149,7 +167,10 @@ func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.BiRPCConecto } var conn rpcclient.ClientConnector for _, connID := range connIDs { - if conn, err = cM.getConn(connID, biRPCClient); err != nil { + cM.lkConn(connID) + conn, err = cM.getConn(connID, biRPCClient) + cM.unlkConn(connID) + if err != nil { continue } if err = conn.Call(method, arg, reply); !rpcclient.IsNetworkError(err) {