diff --git a/config/config_defaults.go b/config/config_defaults.go index 5cf42361b..318b8eb9d 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -50,6 +50,7 @@ const CGRATES_CFG_JSON = ` "digest_separator": ",", // separator to use in replies containing data digests "digest_equal": ":", // equal symbol used in case of digests "rsr_separator": ";", // separator used within RSR fields + "max_parralel_conns": 100, // the maximum number of connection used by the *parallel strategy }, diff --git a/config/config_json_test.go b/config/config_json_test.go index dc3b73a65..c83351ef7 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -59,6 +59,7 @@ func TestDfGeneralJsonCfg(t *testing.T) { Digest_separator: utils.StringPointer(","), Digest_equal: utils.StringPointer(":"), Rsr_separator: utils.StringPointer(";"), + Max_parralel_conns: utils.IntPointer(100), } if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil { t.Error(err) diff --git a/config/generalcfg.go b/config/generalcfg.go index fed9d7575..5205e1a98 100644 --- a/config/generalcfg.go +++ b/config/generalcfg.go @@ -49,6 +49,7 @@ type GeneralCfg struct { DigestSeparator string // DigestEqual string // RSRSep string // separator used to split RSRParser (by degault is used ";") + MaxParralelConns int // the maximum number of connection used by the *parallel strategy } //loadFromJsonCfg loads General config from JsonCfg @@ -129,6 +130,9 @@ func (gencfg *GeneralCfg) loadFromJsonCfg(jsnGeneralCfg *GeneralJsonCfg) (err er if jsnGeneralCfg.Rsr_separator != nil { gencfg.RSRSep = *jsnGeneralCfg.Rsr_separator } + if jsnGeneralCfg.Max_parralel_conns != nil { + gencfg.MaxParralelConns = *jsnGeneralCfg.Max_parralel_conns + } return nil } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 33df6cc1d..36046e936 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -42,6 +42,7 @@ type GeneralJsonCfg struct { Digest_separator *string Digest_equal *string Rsr_separator *string + Max_parralel_conns *int } // Listen config section diff --git a/engine/connmanager.go b/engine/connmanager.go index 70bcd1089..acd277975 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -60,17 +60,8 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec } else { connCfg = cM.cfg.RPCConns()[connID] } - var conPool *rpcclient.RPCPool - if conPool, err = NewRPCPool(connCfg.Strategy, - cM.cfg.TlsCfg().ClientKey, - cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate, - cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, - cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, - connCfg.Conns, intChan, false); err != nil { - return - } - conn = conPool - if biRPCClient != nil && isBiRPCCLient { // special handling for SessionS BiJSONRPCClient + switch { + case biRPCClient != nil && isBiRPCCLient: // special handling for SessionS BiJSONRPCClient var rply string sSIntConn := <-intChan intChan <- sSIntConn @@ -82,7 +73,45 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec utils.SessionS, err.Error())) return } + case connCfg.Strategy == rpcclient.PoolParallel: + rpcConnCfg := connCfg.Conns[0] // for parrallel we need only the first connection + var conPool *rpcclient.RPCParallelClientPool + if rpcConnCfg.Address == utils.MetaInternal { + conPool, err = rpcclient.NewRPCParallelClientPool("", "", rpcConnCfg.TLS, + cM.cfg.TlsCfg().ClientKey, cM.cfg.TlsCfg().ClientCerificate, + cM.cfg.TlsCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, + cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, + cM.cfg.GeneralCfg().ReplyTimeout, rpcclient.InternalRPC, intChan, int64(cM.cfg.GeneralCfg().MaxParralelConns), false) + } else if utils.SliceHasMember([]string{utils.EmptyString, utils.MetaGOB, utils.MetaJSON}, rpcConnCfg.Transport) { + codec := rpcclient.GOBrpc + if rpcConnCfg.Transport != "" { + codec = rpcConnCfg.Transport + } + conPool, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, + cM.cfg.TlsCfg().ClientKey, cM.cfg.TlsCfg().ClientCerificate, + cM.cfg.TlsCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, + cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, + cM.cfg.GeneralCfg().ReplyTimeout, codec, nil, int64(cM.cfg.GeneralCfg().MaxParralelConns), false) + } else { + err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) + } + if err != nil { + return + } + conn = conPool + default: + var conPool *rpcclient.RPCPool + if conPool, err = NewRPCPool(connCfg.Strategy, + cM.cfg.TlsCfg().ClientKey, + cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, + cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + connCfg.Conns, intChan, false); err != nil { + return + } + conn = conPool } + Cache.Set(utils.CacheRPCConnections, connID, conn, nil, true, utils.NonTransactional) return diff --git a/general_tests/a1_it_test.go b/general_tests/a1_it_test.go index 16f1fad60..fbc0c14ee 100644 --- a/general_tests/a1_it_test.go +++ b/general_tests/a1_it_test.go @@ -308,7 +308,7 @@ func testA1itDataSession1(t *testing.T) { if err := json.Unmarshal([]byte(cdrs[0].CostDetails), &ec); err != nil { t.Error(err) } - cc = *ec.AsCallCost() + cc = *ec.AsCallCost(utils.EmptyString) if len(cc.Timespans) != 1 { t.Errorf("Unexpected number of timespans: %+v, for %+v\n from:%+v", len(cc.Timespans), utils.ToJSON(cc.Timespans), utils.ToJSON(ec)) } diff --git a/go.mod b/go.mod index 5e117cec7..c68ed8c57 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817 github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca github.com/cgrates/radigo v0.0.0-20200102144505-ef98592ff532 - github.com/cgrates/rpcclient v0.0.0-20191212101551-ff9c136f66a7 + github.com/cgrates/rpcclient v0.0.0-20200107085551-6be8ad1df845 github.com/creack/pty v1.1.7 github.com/fiorix/go-diameter v3.0.3-0.20190716165154-f4823472d0e0+incompatible github.com/fortytw2/leaktest v1.3.0 // indirect diff --git a/go.sum b/go.sum index 6beee672c..cb2666bf1 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/cgrates/rpcclient v0.0.0-20191209100218-70f91dc30ac6 h1:g1LZmbYvqYkGA github.com/cgrates/rpcclient v0.0.0-20191209100218-70f91dc30ac6/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU= github.com/cgrates/rpcclient v0.0.0-20191212101551-ff9c136f66a7 h1:szJPaMz/49gP0X6M1DhjhImDP3rLN8ul7rio2tzorNc= github.com/cgrates/rpcclient v0.0.0-20191212101551-ff9c136f66a7/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU= +github.com/cgrates/rpcclient v0.0.0-20200107085551-6be8ad1df845 h1:bzvXBBqUqf2q4nW8RAwy+BXwxzlxFSGUAkW8W0TNLk8= +github.com/cgrates/rpcclient v0.0.0-20200107085551-6be8ad1df845/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/creack/pty v1.1.7 h1:6pwm8kMQKCmgUg0ZHTm5+/YvRK0s3THD/28+T6/kk4A= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=