Added *parallel connection strategy

This commit is contained in:
Trial97
2020-01-07 11:12:24 +02:00
parent 860b128e54
commit 92928b7700
8 changed files with 51 additions and 13 deletions

View File

@@ -50,6 +50,7 @@ const CGRATES_CFG_JSON = `
"digest_separator": ",", // separator to use in replies containing data digests
"digest_equal": ":", // equal symbol used in case of digests
"rsr_separator": ";", // separator used within RSR fields
"max_parralel_conns": 100, // the maximum number of connection used by the *parallel strategy
},

View File

@@ -59,6 +59,7 @@ func TestDfGeneralJsonCfg(t *testing.T) {
Digest_separator: utils.StringPointer(","),
Digest_equal: utils.StringPointer(":"),
Rsr_separator: utils.StringPointer(";"),
Max_parralel_conns: utils.IntPointer(100),
}
if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil {
t.Error(err)

View File

@@ -49,6 +49,7 @@ type GeneralCfg struct {
DigestSeparator string //
DigestEqual string //
RSRSep string // separator used to split RSRParser (by degault is used ";")
MaxParralelConns int // the maximum number of connection used by the *parallel strategy
}
//loadFromJsonCfg loads General config from JsonCfg
@@ -129,6 +130,9 @@ func (gencfg *GeneralCfg) loadFromJsonCfg(jsnGeneralCfg *GeneralJsonCfg) (err er
if jsnGeneralCfg.Rsr_separator != nil {
gencfg.RSRSep = *jsnGeneralCfg.Rsr_separator
}
if jsnGeneralCfg.Max_parralel_conns != nil {
gencfg.MaxParralelConns = *jsnGeneralCfg.Max_parralel_conns
}
return nil
}

View File

@@ -42,6 +42,7 @@ type GeneralJsonCfg struct {
Digest_separator *string
Digest_equal *string
Rsr_separator *string
Max_parralel_conns *int
}
// Listen config section

View File

@@ -60,17 +60,8 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
} else {
connCfg = cM.cfg.RPCConns()[connID]
}
var conPool *rpcclient.RPCPool
if conPool, err = NewRPCPool(connCfg.Strategy,
cM.cfg.TlsCfg().ClientKey,
cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate,
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
connCfg.Conns, intChan, false); err != nil {
return
}
conn = conPool
if biRPCClient != nil && isBiRPCCLient { // special handling for SessionS BiJSONRPCClient
switch {
case biRPCClient != nil && isBiRPCCLient: // special handling for SessionS BiJSONRPCClient
var rply string
sSIntConn := <-intChan
intChan <- sSIntConn
@@ -82,7 +73,45 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
utils.SessionS, err.Error()))
return
}
case connCfg.Strategy == rpcclient.PoolParallel:
rpcConnCfg := connCfg.Conns[0] // for parrallel we need only the first connection
var conPool *rpcclient.RPCParallelClientPool
if rpcConnCfg.Address == utils.MetaInternal {
conPool, err = rpcclient.NewRPCParallelClientPool("", "", rpcConnCfg.TLS,
cM.cfg.TlsCfg().ClientKey, cM.cfg.TlsCfg().ClientCerificate,
cM.cfg.TlsCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts,
cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout,
cM.cfg.GeneralCfg().ReplyTimeout, rpcclient.InternalRPC, intChan, int64(cM.cfg.GeneralCfg().MaxParralelConns), false)
} else if utils.SliceHasMember([]string{utils.EmptyString, utils.MetaGOB, utils.MetaJSON}, rpcConnCfg.Transport) {
codec := rpcclient.GOBrpc
if rpcConnCfg.Transport != "" {
codec = rpcConnCfg.Transport
}
conPool, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS,
cM.cfg.TlsCfg().ClientKey, cM.cfg.TlsCfg().ClientCerificate,
cM.cfg.TlsCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts,
cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout,
cM.cfg.GeneralCfg().ReplyTimeout, codec, nil, int64(cM.cfg.GeneralCfg().MaxParralelConns), false)
} else {
err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
}
if err != nil {
return
}
conn = conPool
default:
var conPool *rpcclient.RPCPool
if conPool, err = NewRPCPool(connCfg.Strategy,
cM.cfg.TlsCfg().ClientKey,
cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate,
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
connCfg.Conns, intChan, false); err != nil {
return
}
conn = conPool
}
Cache.Set(utils.CacheRPCConnections, connID, conn, nil,
true, utils.NonTransactional)
return

View File

@@ -308,7 +308,7 @@ func testA1itDataSession1(t *testing.T) {
if err := json.Unmarshal([]byte(cdrs[0].CostDetails), &ec); err != nil {
t.Error(err)
}
cc = *ec.AsCallCost()
cc = *ec.AsCallCost(utils.EmptyString)
if len(cc.Timespans) != 1 {
t.Errorf("Unexpected number of timespans: %+v, for %+v\n from:%+v", len(cc.Timespans), utils.ToJSON(cc.Timespans), utils.ToJSON(ec))
}

2
go.mod
View File

@@ -22,7 +22,7 @@ require (
github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817
github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca
github.com/cgrates/radigo v0.0.0-20200102144505-ef98592ff532
github.com/cgrates/rpcclient v0.0.0-20191212101551-ff9c136f66a7
github.com/cgrates/rpcclient v0.0.0-20200107085551-6be8ad1df845
github.com/creack/pty v1.1.7
github.com/fiorix/go-diameter v3.0.3-0.20190716165154-f4823472d0e0+incompatible
github.com/fortytw2/leaktest v1.3.0 // indirect

2
go.sum
View File

@@ -77,6 +77,8 @@ github.com/cgrates/rpcclient v0.0.0-20191209100218-70f91dc30ac6 h1:g1LZmbYvqYkGA
github.com/cgrates/rpcclient v0.0.0-20191209100218-70f91dc30ac6/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU=
github.com/cgrates/rpcclient v0.0.0-20191212101551-ff9c136f66a7 h1:szJPaMz/49gP0X6M1DhjhImDP3rLN8ul7rio2tzorNc=
github.com/cgrates/rpcclient v0.0.0-20191212101551-ff9c136f66a7/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU=
github.com/cgrates/rpcclient v0.0.0-20200107085551-6be8ad1df845 h1:bzvXBBqUqf2q4nW8RAwy+BXwxzlxFSGUAkW8W0TNLk8=
github.com/cgrates/rpcclient v0.0.0-20200107085551-6be8ad1df845/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/creack/pty v1.1.7 h1:6pwm8kMQKCmgUg0ZHTm5+/YvRK0s3THD/28+T6/kk4A=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=