This commit is contained in:
TeoV
2018-10-25 04:47:40 -04:00
committed by Dan Christian Bogos
parent d860311784
commit 523a41ba56
18 changed files with 480 additions and 66 deletions

View File

@@ -22,6 +22,8 @@ package agents
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
@@ -206,3 +208,189 @@ func TestHAitStopEngine(t *testing.T) {
t.Error(err)
}
}
//Start second tests
func TestHA2itInitCfg(t *testing.T) {
haCfgPath = path.Join(*dataDir, "conf", "samples", "httpagenttls")
// Init config first
var err error
haCfg, err = config.NewCGRConfigFromFolder(haCfgPath)
if err != nil {
t.Error(err)
}
haCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(haCfg)
//make http client with tls
cert, err := tls.LoadX509KeyPair(haCfg.TlsCfg().ClientCerificate, haCfg.TlsCfg().ClientKey)
if err != nil {
t.Error(err)
}
// Load CA cert
caCert, err := ioutil.ReadFile(haCfg.TlsCfg().CaCertificate)
if err != nil {
t.Error(err)
}
rootCAs, _ := x509.SystemCertPool()
if ok := rootCAs.AppendCertsFromPEM(caCert); !ok {
t.Error("Cannot append CA")
}
// Setup HTTPS client
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: rootCAs,
}
transport := &http.Transport{TLSClientConfig: tlsConfig}
httpC = &http.Client{Transport: transport}
}
// Remove data in both rating and accounting db
func TestHA2itResetDB(t *testing.T) {
if err := engine.InitDataDb(haCfg); err != nil {
t.Fatal(err)
}
if err := engine.InitStorDb(haCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func TestHA2itStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(haCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func TestHA2itApierRpcConn(t *testing.T) {
var err error
haRPC, err = jsonrpc.Dial("tcp", haCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
// Load the tariff plan, creating accounts and their balances
func TestHA2itTPFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")}
var loadInst utils.LoadInstance
if err := haRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
func TestHA2itAuthDryRun(t *testing.T) {
reqUrl := fmt.Sprintf("https://%s%s?request_type=OutboundAUTH&CallID=123456&Msisdn=497700056231&Imsi=2343000000000123&Destination=491239440004&MSRN=0102220233444488999&ProfileID=1&AgentID=176&GlobalMSISDN=497700056129&GlobalIMSI=214180000175129&ICCID=8923418450000089629&MCC=234&MNC=10&calltype=callback",
haCfg.HTTPTLSListen, haCfg.HttpAgentCfg()[0].Url)
rply, err := httpC.Get(reqUrl)
if err != nil {
t.Error(err)
}
eXml := []byte(`<?xml version="1.0" encoding="UTF-8"?>
<response>
<Allow>1</Allow>
<MaxDuration>1200</MaxDuration>
</response>`)
if body, err := ioutil.ReadAll(rply.Body); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eXml, body) {
t.Errorf("expecting: <%s>, received: <%s>", string(eXml), string(body))
}
rply.Body.Close()
time.Sleep(time.Millisecond)
}
func TestHA2itAuth1001(t *testing.T) {
reqUrl := fmt.Sprintf("https://%s%s?request_type=OutboundAUTH&CallID=123456&Msisdn=1001&Imsi=2343000000000123&Destination=1002&MSRN=0102220233444488999&ProfileID=1&AgentID=176&GlobalMSISDN=497700056129&GlobalIMSI=214180000175129&ICCID=8923418450000089629&MCC=234&MNC=10&calltype=callback",
haCfg.HTTPTLSListen, haCfg.HttpAgentCfg()[0].Url)
rply, err := httpC.Get(reqUrl)
if err != nil {
t.Error(err)
}
eXml := []byte(`<?xml version="1.0" encoding="UTF-8"?>
<response>
<Allow>1</Allow>
<MaxDuration>10800</MaxDuration>
</response>`)
if body, err := ioutil.ReadAll(rply.Body); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eXml, body) {
t.Errorf("expecting: %s, received: %s", string(eXml), string(body))
}
rply.Body.Close()
time.Sleep(time.Millisecond)
}
func TestHA2itCDRmtcall(t *testing.T) {
reqUrl := fmt.Sprintf("https://%s%s?request_type=MTCALL_CDR&timestamp=2018-08-14%%2012:03:22&call_date=2018-0814%%2012:00:49&transactionid=10000&CDR_ID=123456&carrierid=1&mcc=0&mnc=0&imsi=434180000000000&msisdn=1001&destination=1002&leg=C&leg_duration=185&reseller_charge=11.1605&client_charge=0.0000&user_charge=22.0000&IOT=0&user_balance=10.00&cli=%%2B498702190000&polo=0.0100&ddi_map=N",
haCfg.HTTPTLSListen, haCfg.HttpAgentCfg()[0].Url)
rply, err := httpC.Get(reqUrl)
if err != nil {
t.Error(err)
}
eXml := []byte(`<?xml version="1.0" encoding="UTF-8"?>
<CDR_RESPONSE>
<CDR_ID>123456</CDR_ID>
<CDR_STATUS>1</CDR_STATUS>
</CDR_RESPONSE>`)
if body, err := ioutil.ReadAll(rply.Body); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eXml, body) {
t.Errorf("expecting: <%s>, received: <%s>", string(eXml), string(body))
}
rply.Body.Close()
time.Sleep(50 * time.Millisecond)
var cdrs []*engine.ExternalCDR
req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}}
if err := haRPC.Call("ApierV2.GetCdrs", req, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Usage != "3m5s" { // should be 1 but maxUsage returns rounded version
t.Errorf("Unexpected CDR Usage received, cdr: %s ", utils.ToJSON(cdrs[0]))
}
if cdrs[0].Cost != 0.2188 {
t.Errorf("Unexpected CDR Cost received, cdr: %+v ", cdrs[0].Cost)
}
}
}
func TestHA2itCDRmtcall2(t *testing.T) {
xmlBody := `<?xml version="1.0" encoding="utf-8"?><complete-datasession-notification callid="48981764"><createtime>2005-08-26T14:17:34</createtime><reference>Data</reference><userid>528594</userid><username>447700086788</username><customerid>510163</customerid><companyname>Silliname</companyname><totalcost amount="0.1400" currency="USD">0.1400</totalcost><agenttotalcost amount="0.1400" currency="USD">0.1400</agenttotalcost><agentid>234</agentid><callleg calllegid="89357336"><number>447700086788</number><description>China, Peoples Republic of - China Unicom (CU-GSM)</description><mcc>460</mcc><mnc>001</mnc><seconds>32</seconds><bytes>4558</bytes><permegabyterate currency="USD">1.3330</permegabyterate><cost amount="0.1400" currency="USD">0.1400</cost><agentpermegabyterate currency="USD">1.3330</agentpermegabyterate><agentcost amount="0.1400"currency="USD">0.1400</agentcost></callleg></complete-datasession-notification>`
url := fmt.Sprintf("https://%s%s", haCfg.HTTPTLSListen, haCfg.HttpAgentCfg()[1].Url)
req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(xmlBody)))
if err != nil {
t.Error(err)
}
req.Header.Add("Content-Type", "application/xml; charset=utf-8")
resp, err := httpC.Do(req)
if err != nil {
t.Error(err)
}
resp.Body.Close()
time.Sleep(50 * time.Millisecond)
var cdrs []*engine.ExternalCDR
fltr := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"447700086788"}}
if err := haRPC.Call("ApierV2.GetCdrs", fltr, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Usage != "4558" { // should be 1 but maxUsage returns rounded version
t.Errorf("Unexpected CDR Usage received, cdr: %s ", utils.ToJSON(cdrs[0]))
}
}
}
func TestHA2itStopEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}

View File

@@ -44,6 +44,7 @@ var (
certificatePath = flag.String("crt_path", "", "path to certificate for tls connection")
keyPath = flag.String("key_path", "", "path to key for tls connection")
caPath = flag.String("ca_path", "", "path to CA for tls connection(only for self sign certificate)")
tls = flag.Bool("tls", false, "Tls connection")
client *rpcclient.RpcClient
)
@@ -122,7 +123,7 @@ func main() {
return
}
var err error
client, err = rpcclient.NewRpcClient("tcp", *server, *keyPath, *certificatePath, *caPath, 3, 3,
client, err = rpcclient.NewRpcClient("tcp", *server, *tls, *keyPath, *certificatePath, *caPath, 3, 3,
time.Duration(1*time.Second), time.Duration(5*time.Minute), *rpcEncoding, nil, false)
if err != nil {
flag.PrintDefaults()

View File

@@ -307,8 +307,8 @@ func main() {
if len(ldrCfg.LoaderCgrConfig.CachesConns) != 0 { // Init connection to CacheS so we can reload it's data
if cacheS, err = rpcclient.NewRpcClient("tcp",
ldrCfg.LoaderCgrConfig.CachesConns[0].Address,
ldrCfg.TlsCfg().ClientKey, ldrCfg.TlsCfg().ClientCerificate,
ldrCfg.TlsCfg().CaCertificate, 3, 3,
ldrCfg.LoaderCgrConfig.CachesConns[0].Tls, ldrCfg.TlsCfg().ClientKey,
ldrCfg.TlsCfg().ClientCerificate, ldrCfg.TlsCfg().CaCertificate, 3, 3,
time.Duration(1*time.Second), time.Duration(5*time.Minute),
strings.TrimPrefix(ldrCfg.LoaderCgrConfig.CachesConns[0].Transport, utils.Meta),
nil, false); err != nil {
@@ -326,6 +326,7 @@ func main() {
userS = cacheS
} else {
if userS, err = rpcclient.NewRpcClient("tcp", *usersAddress,
ldrCfg.LoaderCgrConfig.CachesConns[0].Tls,
ldrCfg.TlsCfg().ClientKey, ldrCfg.TlsCfg().ClientCerificate,
ldrCfg.TlsCfg().CaCertificate, 3, 3,
time.Duration(1*time.Second), time.Duration(5*time.Minute),

View File

@@ -43,7 +43,7 @@ func main() {
if cdrsMasterCfg, err = config.NewCGRConfigFromFolder(cdrsMasterCfgPath); err != nil {
log.Fatal("Got config error: ", err.Error())
}
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, "", "", "", 1, 1,
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, false, "", "", "", 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false)
if err != nil {
log.Fatal("Could not connect to rater: ", err.Error())

View File

@@ -271,6 +271,7 @@ type HaPoolJsonCfg struct {
Address *string
Transport *string
Synchronous *bool
Tls *bool
}
type AstConnJsonCfg struct {

View File

@@ -38,6 +38,7 @@ type HaPoolConfig struct {
Address string
Transport string
Synchronous bool
Tls bool
}
func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error {
@@ -53,6 +54,9 @@ func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error {
if jsnCfg.Synchronous != nil {
self.Synchronous = *jsnCfg.Synchronous
}
if jsnCfg.Tls != nil {
self.Tls = *jsnCfg.Synchronous
}
return nil
}

View File

@@ -0,0 +1,68 @@
{
// CGRateS Configuration file
//
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
"rpc_json_tls":":2022",
"rpc_gob_tls":":2023",
"http_tls": "localhost:2280",
},
"tls": {
"server_certificate" : "/usr/share/cgrates/tls/server.crt", // path to server certificate(must conatin server.crt + ca.crt)
"server_key":"/usr/share/cgrates/tls/server.key", // path to server key
"client_certificate" : "/usr/share/cgrates/tls/client.crt", // path to client certificate(must conatin client.crt + ca.crt)
"client_key":"/usr/share/cgrates/tls/client.key", // path to client key
"ca_certificate":"/usr/share/cgrates/tls/ca.crt",
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rals": {
"enabled": true,
},
"scheduler": {
"enabled": true,
},
"cdrs": {
"enabled": true,
},
"attributes": {
"enabled": true,
},
"sessions": {
"enabled": true,
"attributes_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"cdrs_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"rals_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
},
}

View File

@@ -0,0 +1,122 @@
{
"http_agent": [
{
"id": "conecto1",
"url": "/conecto",
"sessions_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"request_payload": "*url",
"reply_payload": "*xml",
"request_processors": [
{
"id": "OutboundAUTHDryRun",
"filters": ["*string:*req.request_type:OutboundAUTH","*string:*req.Msisdn:497700056231"],
"tenant": "cgrates.org",
"flags": ["*dryrun"],
"request_fields":[
],
"reply_fields":[
{"tag": "Allow", "field_id": "response.Allow", "type": "*constant",
"value": "1", "mandatory": true},
{"tag": "MaxDuration", "field_id": "response.MaxDuration", "type": "*constant",
"value": "1200", "blocker": true},
{"tag": "Unused", "field_id": "response.Unused", "type": "*constant",
"value": "0"},
],
},
{
"id": "OutboundAUTH",
"filters": ["*string:*req.request_type:OutboundAUTH"],
"tenant": "cgrates.org",
"flags": [ "*auth", "*accounts", "*attributes"],
"request_fields":[
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant",
"value": "*pseudoprepaid", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed",
"value": "~*req.CallID", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*composed",
"value": "~*req.Msisdn", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*composed",
"value": "~*req.Destination", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*constant",
"value": "*now", "mandatory": true},
],
"reply_fields":[
{"tag": "Allow", "field_id": "response.Allow", "type": "*constant",
"value": "1", "mandatory": true},
{"tag": "MaxDuration", "field_id": "response.MaxDuration", "type": "*composed",
"value": "~*cgrep.MaxUsage{*duration_seconds}", "mandatory": true},
],
},
{
"id": "mtcall_cdr",
"filters": ["*string:*req.request_type:MTCALL_CDR"],
"tenant": "cgrates.org",
"flags": ["*cdrs"],
"request_fields":[
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant",
"value": "*pseudoprepaid", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed",
"value": "~*req.CDR_ID", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*composed",
"value": "~*req.msisdn", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*composed",
"value": "~*req.destination", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed",
"value": "~*req.timestamp", "mandatory": true},
{"tag": "AnswerTime", "field_id": "SetupTime", "type": "*composed",
"value": "~*req.timestamp", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*composed",
"value": "~*req.leg_duration;s", "mandatory": true},
],
"reply_fields":[
{"tag": "CDR_ID", "field_id": "CDR_RESPONSE.CDR_ID", "type": "*composed",
"value": "~*req.CDR_ID", "mandatory": true},
{"tag": "CDR_STATUS", "field_id": "CDR_RESPONSE.CDR_STATUS", "type": "*constant",
"value": "1", "mandatory": true},
],
}
],
},
{
"id": "conecto_xml",
"url": "/conecto_xml",
"sessions_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"request_payload": "*xml",
"reply_payload": "*xml",
"request_processors": [
{
"id": "cdr_from_xml",
"tenant": "cgrates.org",
"flags": ["*cdrs"],
"request_fields":[
{"tag": "TOR", "field_id": "ToR", "type": "*constant",
"value": "*data", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant",
"value": "*pseudoprepaid", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed",
"value": "~*req.complete-datasession-notification.customerid", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*composed",
"value": "~*req.complete-datasession-notification.username", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*composed",
"value": "~*req.complete-datasession-notification.userid", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed",
"value": "~*req.complete-datasession-notification.createtime", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed",
"value": "~*req.complete-datasession-notification.createtime", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*composed",
"value": "~*req.complete-datasession-notification.callleg.bytes", "mandatory": true},
],
"reply_fields":[],
}
],
},
],
}

View File

@@ -40,7 +40,6 @@
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
},
@@ -49,43 +48,6 @@
},
"cache":{
"destinations": {"limit": 10000, "ttl":"0s", "precache": true},
"reverse_destinations": {"limit": 10000, "ttl":"0s", "precache": true},
"rating_plans": {"limit": 10000, "ttl":"0s","precache": true},
"rating_profiles": {"limit": 10000, "ttl":"0s", "precache": true},
"lcr_rules": {"limit": 10000, "ttl":"0s", "precache": true},
"cdr_stats": {"limit": 10000, "ttl":"0s", "precache": true},
"actions": {"limit": 10000, "ttl":"0s", "precache": true},
"action_plans": {"limit": 10000, "ttl":"0s", "precache": true},
"account_action_plans": {"limit": 10000, "ttl":"0s", "precache": true},
"action_triggers": {"limit": 10000, "ttl":"0s", "precache": true},
"shared_groups": {"limit": 10000, "ttl":"0s", "precache": true},
"aliases": {"limit": 10000, "ttl":"0s", "precache": true},
"reverse_aliases": {"limit": 10000, "ttl":"0s", "precache": true},
"derived_chargers": {"limit": 10000, "ttl":"0s", "precache": true},
"resource_profiles": {"limit": 10000, "ttl":"0s", "precache": true},
"resources": {"limit": 10000, "ttl":"0s", "precache": true},
"statqueues": {"limit": 10000, "ttl":"0s", "precache": true},
"statqueue_profiles": {"limit": 10000, "ttl":"0s", "precache": true},
"thresholds": {"limit": 10000, "ttl":"0s", "precache": true},
"threshold_profiles": {"limit": 10000, "ttl":"0s", "precache": true},
"filters": {"limit": 10000, "ttl":"0s", "precache": true},
"supplier_profiles": {"limit": 10000, "ttl":"0s", "precache": true},
"attribute_profiles": {"limit": 10000, "ttl":"0s", "precache": true},
"resource_filter_indexes" :{"limit": 10000, "ttl":"0s"},
"resource_filter_revindexes" : {"limit": 10000, "ttl":"0s"},
"stat_filter_indexes" : {"limit": 10000, "ttl":"0s"},
"stat_filter_revindexes" : {"limit": 10000, "ttl":"0s"},
"threshold_filter_indexes" : {"limit": 10000, "ttl":"0s"},
"threshold_filter_revindexes" : {"limit": 10000, "ttl":"0s"},
"supplier_filter_indexes" : {"limit": 10000, "ttl":"0s"},
"supplier_filter_revindexes" :{"limit": 10000, "ttl":"0s"},
"attribute_filter_indexes" : {"limit": 10000, "ttl":"0s"},
"attribute_filter_revindexes" : {"limit": 10000, "ttl":"0s"},
},
"rals": {
"enabled": true,
},
@@ -95,7 +57,7 @@
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
},
@@ -104,7 +66,7 @@
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
},
@@ -113,5 +75,35 @@
"store_interval": "1s",
},
"sessions": {
"enabled": true,
"rals_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"resources_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"attributes_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
},
"attributes": {
"enabled": true,
},
"cdrs": {
"enabled": true,
"chargers_conns":[
{"address": "*internal"}
],
},
"chargers": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"}
],
},
}

View File

@@ -682,7 +682,7 @@ func cgrRPCAction(account *Account, sq *CDRStatsQueueTriggered, a *Action, acs A
}
var client rpcclient.RpcClientConnection
if req.Address != utils.MetaInternal {
if client, err = rpcclient.NewRpcClient("tcp", req.Address, "", "", "",
if client, err = rpcclient.NewRpcClient("tcp", req.Address, false, "", "", "",
req.Attempts, 0, config.CgrConfig().GeneralCfg().ConnectTimeout,
config.CgrConfig().GeneralCfg().ReplyTimeout, req.Transport,
nil, false); err != nil {

View File

@@ -28,7 +28,7 @@ import (
"github.com/cgrates/rpcclient"
)
func NewRPCPool(dispatchStrategy, key_path, cert_path, ca_path string, connAttempts, reconnects int,
func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, connAttempts, reconnects int,
connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.HaPoolConfig,
internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) {
var rpcClient *rpcclient.RpcClient
@@ -44,14 +44,14 @@ func NewRPCPool(dispatchStrategy, key_path, cert_path, ca_path string, connAttem
case <-time.After(ttl):
return nil, errors.New("TTL triggered")
}
rpcClient, err = rpcclient.NewRpcClient("", "", key_path, cert_path, ca_path, connAttempts,
rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.Tls, key_path, cert_path, ca_path, connAttempts,
reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, false)
} else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) {
codec := utils.GOB
if rpcConnCfg.Transport != "" {
codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient
}
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, key_path, cert_path, ca_path,
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, rpcConnCfg.Tls, key_path, cert_path, ca_path,
connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, false)
} else {
return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)

View File

@@ -239,7 +239,7 @@ type ProxyPubSub struct {
}
func NewProxyPubSub(addr string, attempts, reconnects int, connectTimeout, replyTimeout time.Duration) (*ProxyPubSub, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, "", "", "", attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil, false)
client, err := rpcclient.NewRpcClient("tcp", addr, false, "", "", "", attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil, false)
if err != nil {
return nil, err
}

View File

@@ -86,7 +86,7 @@ func TestCDRsOnExpStartSlaveEngine(t *testing.T) {
// Connect rpc client to rater
func TestCDRsOnExpHttpCdrReplication(t *testing.T) {
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, "", "", "", 1, 1,
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, false, "", "", "", 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
@@ -104,7 +104,7 @@ func TestCDRsOnExpHttpCdrReplication(t *testing.T) {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", "", "", "", 1, 1,
cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", false, "", "", "", 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())

View File

@@ -78,13 +78,13 @@ func TestRPCITLclStartSecondEngine(t *testing.T) {
// Connect rpc client to rater
func TestRPCITLclRpcConnPoolFirst(t *testing.T) {
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, "", "", "", 3, 1,
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, false, "", "", "", 3, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false)
if err == nil {
t.Fatal("Should receive cannot connect error here")
}
rpcPoolFirst.AddClient(rpcRAL1)
rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, "", "", "", 3, 1,
rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, false, "", "", "", 3, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false)
if err != nil {
t.Fatal(err)
@@ -312,13 +312,13 @@ func TestRPCITRmtRpcConnPool(t *testing.T) {
return
}
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0)
rpcRALRmt, err := rpcclient.NewRpcClient("tcp", RemoteRALsAddr1, "", "", "", 1, 1,
rpcRALRmt, err := rpcclient.NewRpcClient("tcp", RemoteRALsAddr1, false, "", "", "", 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false)
if err != nil {
t.Fatal(err)
}
rpcPoolFirst.AddClient(rpcRALRmt)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", RemoteRALsAddr2, "", "", "", 1, 1,
rpcRAL1, err = rpcclient.NewRpcClient("tcp", RemoteRALsAddr2, false, "", "", "", 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false)
if err != nil {
t.Fatal(err)

View File

@@ -26,6 +26,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -80,21 +81,21 @@ func testTLSStartEngine(t *testing.T) {
func testTLSRpcConn(t *testing.T) {
var err error
tlsRpcClientJson, err = rpcclient.NewRpcClient("tcp", "localhost:2022", tlsCfg.TlsCfg().ClientKey,
tlsRpcClientJson, err = rpcclient.NewRpcClient("tcp", "localhost:2022", true, tlsCfg.TlsCfg().ClientKey,
tlsCfg.TlsCfg().ClientCerificate, tlsCfg.TlsCfg().CaCertificate, 3, 3,
time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.JSON, nil, false)
if err != nil {
t.Errorf("Error: %s when dialing", err)
}
tlsRpcClientGob, err = rpcclient.NewRpcClient("tcp", "localhost:2023", tlsCfg.TlsCfg().ClientKey,
tlsRpcClientGob, err = rpcclient.NewRpcClient("tcp", "localhost:2023", true, tlsCfg.TlsCfg().ClientKey,
tlsCfg.TlsCfg().ClientCerificate, tlsCfg.TlsCfg().CaCertificate, 3, 3,
time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.GOB, nil, false)
if err != nil {
t.Errorf("Error: %s when dialing", err)
}
tlsHTTPJson, err = rpcclient.NewRpcClient("tcp", "https://localhost:2280/jsonrpc", tlsCfg.TlsCfg().ClientKey,
tlsHTTPJson, err = rpcclient.NewRpcClient("tcp", "https://localhost:2280/jsonrpc", true, tlsCfg.TlsCfg().ClientKey,
tlsCfg.TlsCfg().ClientCerificate, tlsCfg.TlsCfg().CaCertificate, 3, 3,
time.Duration(1*time.Second), time.Duration(5*time.Minute), rpcclient.JSON_HTTP, nil, false)
if err != nil {
@@ -129,6 +130,35 @@ func testTLSPing(t *testing.T) {
if err := tlsHTTPJson.Call(utils.DispatcherSv1Ping, "", &reply); err == nil {
t.Error(err)
}
initUsage := time.Duration(5 * time.Minute)
args := &sessions.V1InitSessionArgs{
InitSession: true,
AllocateResources: true,
GetAttributes: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSSv1ItInitiateSession",
Event: map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.OriginID: "TestSSv1It1",
utils.RequestType: utils.META_PREPAID,
utils.Account: "1001",
utils.Subject: "ANY2CNT",
utils.Destination: "1002",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: initUsage,
},
},
}
var rply sessions.V1InitReplyWithDigest
if err := tlsHTTPJson.Call(utils.SessionSv1InitiateSessionWithDigest,
args, &rply); err == nil {
t.Error(err)
}
}
func testTLSStopEngine(t *testing.T) {

2
glide.lock generated
View File

@@ -20,7 +20,7 @@ imports:
- name: github.com/cgrates/radigo
version: 69d4269e21990c0f120b8e60d5b75d533db7f3dd
- name: github.com/cgrates/rpcclient
version: 7e7a74eff4a9cc56dbfd43502b8a4c2757ffbd67
version: 7316bff37a2b8692fbadd57f9c9cda070cc33081
- name: github.com/DisposaBoy/JsonConfigReader
version: 5ea4d0ddac554439159cd6f191cb94a110d73352
- name: github.com/fiorix/go-diameter

View File

@@ -50,7 +50,7 @@ func NewSessionReplicationConns(conns []*config.HaPoolConfig, reconnects int,
connTimeout, replyTimeout time.Duration) (smgConns []*SMGReplicationConn, err error) {
smgConns = make([]*SMGReplicationConn, len(conns))
for i, replConnCfg := range conns {
if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, "", "", "", 0, reconnects,
if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, replConnCfg.Tls, "", "", "", 0, reconnects,
connTimeout, replyTimeout, replConnCfg.Transport[1:], nil, true); err != nil {
return nil, err
} else {

View File

@@ -45,6 +45,7 @@ type Server struct {
httpEnabled bool
birpcSrv *rpc2.Server
sync.RWMutex
httpsMux *http.ServeMux
}
func (s *Server) RpcRegister(rcvr interface{}) {
@@ -63,6 +64,9 @@ func (s *Server) RpcRegisterName(name string, rcvr interface{}) {
func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
http.HandleFunc(pattern, handler)
if s.httpsMux != nil {
s.httpsMux.HandleFunc(pattern, handler)
}
s.Lock()
s.httpEnabled = true
s.Unlock()
@@ -70,6 +74,9 @@ func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWrit
func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) {
http.Handle(pattern, handler)
if s.httpsMux != nil {
s.httpsMux.Handle(pattern, handler)
}
s.Lock()
s.httpEnabled = true
s.Unlock()
@@ -419,16 +426,16 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
if !enabled {
return
}
mux := http.NewServeMux()
s.httpsMux = http.NewServeMux()
if enabled && jsonRPCURL != "" {
s.Lock()
s.httpEnabled = true
s.Unlock()
Logger.Info("<HTTPTLS> enabling handler for JSON-RPC")
if useBasicAuth {
mux.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList)))
s.httpsMux.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList)))
} else {
mux.HandleFunc(jsonRPCURL, handleRequest)
s.httpsMux.HandleFunc(jsonRPCURL, handleRequest)
}
}
if enabled && wsRPCURL != "" {
@@ -440,11 +447,11 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
jsonrpc.ServeConn(ws)
})
if useBasicAuth {
mux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) {
s.httpsMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) {
wsHandler.ServeHTTP(w, r)
}, basicAuth(userList)))
} else {
mux.Handle(wsRPCURL, wsHandler)
s.httpsMux.Handle(wsRPCURL, wsHandler)
}
}
if !s.httpEnabled {
@@ -459,7 +466,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
}
httpSrv := http.Server{
Addr: addr,
Handler: mux,
Handler: s.httpsMux,
TLSConfig: &config,
}
Logger.Info(fmt.Sprintf("<HTTPTLS> start listening at <%s>", addr))