diff --git a/agents/httpagent_it_test.go b/agents/httpagent_it_test.go index 137d8a2e4..8e2e85696 100644 --- a/agents/httpagent_it_test.go +++ b/agents/httpagent_it_test.go @@ -22,6 +22,8 @@ package agents import ( "bytes" + "crypto/tls" + "crypto/x509" "fmt" "io/ioutil" "net/http" @@ -206,3 +208,189 @@ func TestHAitStopEngine(t *testing.T) { t.Error(err) } } + +//Start second tests + +func TestHA2itInitCfg(t *testing.T) { + haCfgPath = path.Join(*dataDir, "conf", "samples", "httpagenttls") + // Init config first + var err error + haCfg, err = config.NewCGRConfigFromFolder(haCfgPath) + if err != nil { + t.Error(err) + } + haCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(haCfg) + //make http client with tls + cert, err := tls.LoadX509KeyPair(haCfg.TlsCfg().ClientCerificate, haCfg.TlsCfg().ClientKey) + if err != nil { + t.Error(err) + } + + // Load CA cert + caCert, err := ioutil.ReadFile(haCfg.TlsCfg().CaCertificate) + if err != nil { + t.Error(err) + } + rootCAs, _ := x509.SystemCertPool() + if ok := rootCAs.AppendCertsFromPEM(caCert); !ok { + t.Error("Cannot append CA") + } + + // Setup HTTPS client + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: rootCAs, + } + transport := &http.Transport{TLSClientConfig: tlsConfig} + httpC = &http.Client{Transport: transport} +} + +// Remove data in both rating and accounting db +func TestHA2itResetDB(t *testing.T) { + if err := engine.InitDataDb(haCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(haCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func TestHA2itStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(haCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestHA2itApierRpcConn(t *testing.T) { + var err error + haRPC, err = jsonrpc.Dial("tcp", haCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +// Load the tariff plan, creating accounts and their balances +func TestHA2itTPFromFolder(t *testing.T) { + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")} + var loadInst utils.LoadInstance + if err := haRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups +} + +func TestHA2itAuthDryRun(t *testing.T) { + reqUrl := fmt.Sprintf("https://%s%s?request_type=OutboundAUTH&CallID=123456&Msisdn=497700056231&Imsi=2343000000000123&Destination=491239440004&MSRN=0102220233444488999&ProfileID=1&AgentID=176&GlobalMSISDN=497700056129&GlobalIMSI=214180000175129&ICCID=8923418450000089629&MCC=234&MNC=10&calltype=callback", + haCfg.HTTPTLSListen, haCfg.HttpAgentCfg()[0].Url) + rply, err := httpC.Get(reqUrl) + if err != nil { + t.Error(err) + } + eXml := []byte(` + + 1 + 1200 +`) + if body, err := ioutil.ReadAll(rply.Body); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eXml, body) { + t.Errorf("expecting: <%s>, received: <%s>", string(eXml), string(body)) + } + rply.Body.Close() + time.Sleep(time.Millisecond) +} + +func TestHA2itAuth1001(t *testing.T) { + reqUrl := fmt.Sprintf("https://%s%s?request_type=OutboundAUTH&CallID=123456&Msisdn=1001&Imsi=2343000000000123&Destination=1002&MSRN=0102220233444488999&ProfileID=1&AgentID=176&GlobalMSISDN=497700056129&GlobalIMSI=214180000175129&ICCID=8923418450000089629&MCC=234&MNC=10&calltype=callback", + haCfg.HTTPTLSListen, haCfg.HttpAgentCfg()[0].Url) + rply, err := httpC.Get(reqUrl) + if err != nil { + t.Error(err) + } + eXml := []byte(` + + 1 + 10800 +`) + if body, err := ioutil.ReadAll(rply.Body); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eXml, body) { + t.Errorf("expecting: %s, received: %s", string(eXml), string(body)) + } + rply.Body.Close() + time.Sleep(time.Millisecond) +} + +func TestHA2itCDRmtcall(t *testing.T) { + reqUrl := fmt.Sprintf("https://%s%s?request_type=MTCALL_CDR×tamp=2018-08-14%%2012:03:22&call_date=2018-0814%%2012:00:49&transactionid=10000&CDR_ID=123456&carrierid=1&mcc=0&mnc=0&imsi=434180000000000&msisdn=1001&destination=1002&leg=C&leg_duration=185&reseller_charge=11.1605&client_charge=0.0000&user_charge=22.0000&IOT=0&user_balance=10.00&cli=%%2B498702190000&polo=0.0100&ddi_map=N", + haCfg.HTTPTLSListen, haCfg.HttpAgentCfg()[0].Url) + rply, err := httpC.Get(reqUrl) + if err != nil { + t.Error(err) + } + eXml := []byte(` + + 123456 + 1 +`) + if body, err := ioutil.ReadAll(rply.Body); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eXml, body) { + t.Errorf("expecting: <%s>, received: <%s>", string(eXml), string(body)) + } + rply.Body.Close() + time.Sleep(50 * time.Millisecond) + var cdrs []*engine.ExternalCDR + req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}} + if err := haRPC.Call("ApierV2.GetCdrs", req, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else { + if cdrs[0].Usage != "3m5s" { // should be 1 but maxUsage returns rounded version + t.Errorf("Unexpected CDR Usage received, cdr: %s ", utils.ToJSON(cdrs[0])) + } + if cdrs[0].Cost != 0.2188 { + t.Errorf("Unexpected CDR Cost received, cdr: %+v ", cdrs[0].Cost) + } + } +} + +func TestHA2itCDRmtcall2(t *testing.T) { + xmlBody := `2005-08-26T14:17:34Data528594447700086788510163Silliname0.14000.1400234447700086788China, Peoples Republic of - China Unicom (CU-GSM)4600013245581.33300.14001.33300.1400` + + url := fmt.Sprintf("https://%s%s", haCfg.HTTPTLSListen, haCfg.HttpAgentCfg()[1].Url) + + req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(xmlBody))) + if err != nil { + t.Error(err) + } + req.Header.Add("Content-Type", "application/xml; charset=utf-8") + resp, err := httpC.Do(req) + if err != nil { + t.Error(err) + } + resp.Body.Close() + + time.Sleep(50 * time.Millisecond) + var cdrs []*engine.ExternalCDR + fltr := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"447700086788"}} + if err := haRPC.Call("ApierV2.GetCdrs", fltr, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else { + if cdrs[0].Usage != "4558" { // should be 1 but maxUsage returns rounded version + t.Errorf("Unexpected CDR Usage received, cdr: %s ", utils.ToJSON(cdrs[0])) + } + } +} + +func TestHA2itStopEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index 43236d7cb..ee1d5efc7 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -44,6 +44,7 @@ var ( certificatePath = flag.String("crt_path", "", "path to certificate for tls connection") keyPath = flag.String("key_path", "", "path to key for tls connection") caPath = flag.String("ca_path", "", "path to CA for tls connection(only for self sign certificate)") + tls = flag.Bool("tls", false, "Tls connection") client *rpcclient.RpcClient ) @@ -122,7 +123,7 @@ func main() { return } var err error - client, err = rpcclient.NewRpcClient("tcp", *server, *keyPath, *certificatePath, *caPath, 3, 3, + client, err = rpcclient.NewRpcClient("tcp", *server, *tls, *keyPath, *certificatePath, *caPath, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), *rpcEncoding, nil, false) if err != nil { flag.PrintDefaults() diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 1dfdd397c..da9d30b40 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -307,8 +307,8 @@ func main() { if len(ldrCfg.LoaderCgrConfig.CachesConns) != 0 { // Init connection to CacheS so we can reload it's data if cacheS, err = rpcclient.NewRpcClient("tcp", ldrCfg.LoaderCgrConfig.CachesConns[0].Address, - ldrCfg.TlsCfg().ClientKey, ldrCfg.TlsCfg().ClientCerificate, - ldrCfg.TlsCfg().CaCertificate, 3, 3, + ldrCfg.LoaderCgrConfig.CachesConns[0].Tls, ldrCfg.TlsCfg().ClientKey, + ldrCfg.TlsCfg().ClientCerificate, ldrCfg.TlsCfg().CaCertificate, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), strings.TrimPrefix(ldrCfg.LoaderCgrConfig.CachesConns[0].Transport, utils.Meta), nil, false); err != nil { @@ -326,6 +326,7 @@ func main() { userS = cacheS } else { if userS, err = rpcclient.NewRpcClient("tcp", *usersAddress, + ldrCfg.LoaderCgrConfig.CachesConns[0].Tls, ldrCfg.TlsCfg().ClientKey, ldrCfg.TlsCfg().ClientCerificate, ldrCfg.TlsCfg().CaCertificate, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), diff --git a/cmd/cgr-tester/cdr_repl/process_cdr.go b/cmd/cgr-tester/cdr_repl/process_cdr.go index d08b6dd67..8cdafea6c 100644 --- a/cmd/cgr-tester/cdr_repl/process_cdr.go +++ b/cmd/cgr-tester/cdr_repl/process_cdr.go @@ -43,7 +43,7 @@ func main() { if cdrsMasterCfg, err = config.NewCGRConfigFromFolder(cdrsMasterCfgPath); err != nil { log.Fatal("Got config error: ", err.Error()) } - cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, "", "", "", 1, 1, + cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, false, "", "", "", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false) if err != nil { log.Fatal("Could not connect to rater: ", err.Error()) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index bd0fc5534..68afce2d6 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -271,6 +271,7 @@ type HaPoolJsonCfg struct { Address *string Transport *string Synchronous *bool + Tls *bool } type AstConnJsonCfg struct { diff --git a/config/smconfig.go b/config/smconfig.go index a0696c095..195909b8e 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -38,6 +38,7 @@ type HaPoolConfig struct { Address string Transport string Synchronous bool + Tls bool } func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error { @@ -53,6 +54,9 @@ func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error { if jsnCfg.Synchronous != nil { self.Synchronous = *jsnCfg.Synchronous } + if jsnCfg.Tls != nil { + self.Tls = *jsnCfg.Synchronous + } return nil } diff --git a/data/conf/samples/httpagenttls/cgrates.json b/data/conf/samples/httpagenttls/cgrates.json new file mode 100755 index 000000000..8a9339576 --- /dev/null +++ b/data/conf/samples/httpagenttls/cgrates.json @@ -0,0 +1,68 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "log_level": 7, +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", + "rpc_json_tls":":2022", + "rpc_gob_tls":":2023", + "http_tls": "localhost:2280", +}, + +"tls": { + "server_certificate" : "/usr/share/cgrates/tls/server.crt", // path to server certificate(must conatin server.crt + ca.crt) + "server_key":"/usr/share/cgrates/tls/server.key", // path to server key + "client_certificate" : "/usr/share/cgrates/tls/client.crt", // path to client certificate(must conatin client.crt + ca.crt) + "client_key":"/usr/share/cgrates/tls/client.key", // path to client key + "ca_certificate":"/usr/share/cgrates/tls/ca.crt", +}, + + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"rals": { + "enabled": true, +}, + + +"scheduler": { + "enabled": true, +}, + + +"cdrs": { + "enabled": true, +}, + + +"attributes": { + "enabled": true, +}, + + +"sessions": { + "enabled": true, + "attributes_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "cdrs_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "rals_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], +}, + + +} diff --git a/data/conf/samples/httpagenttls/httpagent.json b/data/conf/samples/httpagenttls/httpagent.json new file mode 100755 index 000000000..6ed53fb1c --- /dev/null +++ b/data/conf/samples/httpagenttls/httpagent.json @@ -0,0 +1,122 @@ +{ + + +"http_agent": [ + { + "id": "conecto1", + "url": "/conecto", + "sessions_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "request_payload": "*url", + "reply_payload": "*xml", + "request_processors": [ + { + "id": "OutboundAUTHDryRun", + "filters": ["*string:*req.request_type:OutboundAUTH","*string:*req.Msisdn:497700056231"], + "tenant": "cgrates.org", + "flags": ["*dryrun"], + "request_fields":[ + ], + "reply_fields":[ + {"tag": "Allow", "field_id": "response.Allow", "type": "*constant", + "value": "1", "mandatory": true}, + {"tag": "MaxDuration", "field_id": "response.MaxDuration", "type": "*constant", + "value": "1200", "blocker": true}, + {"tag": "Unused", "field_id": "response.Unused", "type": "*constant", + "value": "0"}, + ], + }, + { + "id": "OutboundAUTH", + "filters": ["*string:*req.request_type:OutboundAUTH"], + "tenant": "cgrates.org", + "flags": [ "*auth", "*accounts", "*attributes"], + "request_fields":[ + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", + "value": "*pseudoprepaid", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", + "value": "~*req.CallID", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", + "value": "~*req.Msisdn", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", + "value": "~*req.Destination", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*constant", + "value": "*now", "mandatory": true}, + ], + "reply_fields":[ + {"tag": "Allow", "field_id": "response.Allow", "type": "*constant", + "value": "1", "mandatory": true}, + {"tag": "MaxDuration", "field_id": "response.MaxDuration", "type": "*composed", + "value": "~*cgrep.MaxUsage{*duration_seconds}", "mandatory": true}, + ], + }, + { + "id": "mtcall_cdr", + "filters": ["*string:*req.request_type:MTCALL_CDR"], + "tenant": "cgrates.org", + "flags": ["*cdrs"], + "request_fields":[ + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", + "value": "*pseudoprepaid", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", + "value": "~*req.CDR_ID", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", + "value": "~*req.msisdn", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", + "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", + "value": "~*req.timestamp", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "SetupTime", "type": "*composed", + "value": "~*req.timestamp", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", + "value": "~*req.leg_duration;s", "mandatory": true}, + ], + "reply_fields":[ + {"tag": "CDR_ID", "field_id": "CDR_RESPONSE.CDR_ID", "type": "*composed", + "value": "~*req.CDR_ID", "mandatory": true}, + {"tag": "CDR_STATUS", "field_id": "CDR_RESPONSE.CDR_STATUS", "type": "*constant", + "value": "1", "mandatory": true}, + ], + } + ], + }, + { + "id": "conecto_xml", + "url": "/conecto_xml", + "sessions_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "request_payload": "*xml", + "reply_payload": "*xml", + "request_processors": [ + { + "id": "cdr_from_xml", + "tenant": "cgrates.org", + "flags": ["*cdrs"], + "request_fields":[ + {"tag": "TOR", "field_id": "ToR", "type": "*constant", + "value": "*data", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", + "value": "*pseudoprepaid", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", + "value": "~*req.complete-datasession-notification.customerid", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", + "value": "~*req.complete-datasession-notification.username", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", + "value": "~*req.complete-datasession-notification.userid", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", + "value": "~*req.complete-datasession-notification.createtime", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", + "value": "~*req.complete-datasession-notification.createtime", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", + "value": "~*req.complete-datasession-notification.callleg.bytes", "mandatory": true}, + ], + "reply_fields":[], + } + ], + }, +], + + +} \ No newline at end of file diff --git a/data/conf/samples/tls/cgrates.json b/data/conf/samples/tls/cgrates.json index c3ab2fa0f..28592cc65 100755 --- a/data/conf/samples/tls/cgrates.json +++ b/data/conf/samples/tls/cgrates.json @@ -40,7 +40,6 @@ "db_type": "redis", // data_db type: "db_port": 6379, // data_db port to reach the database "db_name": "10", // data_db database name to connect to - }, @@ -49,43 +48,6 @@ }, -"cache":{ - "destinations": {"limit": 10000, "ttl":"0s", "precache": true}, - "reverse_destinations": {"limit": 10000, "ttl":"0s", "precache": true}, - "rating_plans": {"limit": 10000, "ttl":"0s","precache": true}, - "rating_profiles": {"limit": 10000, "ttl":"0s", "precache": true}, - "lcr_rules": {"limit": 10000, "ttl":"0s", "precache": true}, - "cdr_stats": {"limit": 10000, "ttl":"0s", "precache": true}, - "actions": {"limit": 10000, "ttl":"0s", "precache": true}, - "action_plans": {"limit": 10000, "ttl":"0s", "precache": true}, - "account_action_plans": {"limit": 10000, "ttl":"0s", "precache": true}, - "action_triggers": {"limit": 10000, "ttl":"0s", "precache": true}, - "shared_groups": {"limit": 10000, "ttl":"0s", "precache": true}, - "aliases": {"limit": 10000, "ttl":"0s", "precache": true}, - "reverse_aliases": {"limit": 10000, "ttl":"0s", "precache": true}, - "derived_chargers": {"limit": 10000, "ttl":"0s", "precache": true}, - "resource_profiles": {"limit": 10000, "ttl":"0s", "precache": true}, - "resources": {"limit": 10000, "ttl":"0s", "precache": true}, - "statqueues": {"limit": 10000, "ttl":"0s", "precache": true}, - "statqueue_profiles": {"limit": 10000, "ttl":"0s", "precache": true}, - "thresholds": {"limit": 10000, "ttl":"0s", "precache": true}, - "threshold_profiles": {"limit": 10000, "ttl":"0s", "precache": true}, - "filters": {"limit": 10000, "ttl":"0s", "precache": true}, - "supplier_profiles": {"limit": 10000, "ttl":"0s", "precache": true}, - "attribute_profiles": {"limit": 10000, "ttl":"0s", "precache": true}, - "resource_filter_indexes" :{"limit": 10000, "ttl":"0s"}, - "resource_filter_revindexes" : {"limit": 10000, "ttl":"0s"}, - "stat_filter_indexes" : {"limit": 10000, "ttl":"0s"}, - "stat_filter_revindexes" : {"limit": 10000, "ttl":"0s"}, - "threshold_filter_indexes" : {"limit": 10000, "ttl":"0s"}, - "threshold_filter_revindexes" : {"limit": 10000, "ttl":"0s"}, - "supplier_filter_indexes" : {"limit": 10000, "ttl":"0s"}, - "supplier_filter_revindexes" :{"limit": 10000, "ttl":"0s"}, - "attribute_filter_indexes" : {"limit": 10000, "ttl":"0s"}, - "attribute_filter_revindexes" : {"limit": 10000, "ttl":"0s"}, -}, - - "rals": { "enabled": true, }, @@ -95,7 +57,7 @@ "enabled": true, "store_interval": "1s", "thresholds_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], }, @@ -104,7 +66,7 @@ "enabled": true, "store_interval": "1s", "thresholds_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], }, @@ -113,5 +75,35 @@ "store_interval": "1s", }, +"sessions": { + "enabled": true, + "rals_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "resources_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "attributes_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], +}, + +"attributes": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, + "chargers_conns":[ + {"address": "*internal"} + ], +}, + +"chargers": { + "enabled": true, + "attributes_conns": [ + {"address": "*internal"} + ], +}, } diff --git a/engine/action.go b/engine/action.go index 2b27be707..527d5a348 100644 --- a/engine/action.go +++ b/engine/action.go @@ -682,7 +682,7 @@ func cgrRPCAction(account *Account, sq *CDRStatsQueueTriggered, a *Action, acs A } var client rpcclient.RpcClientConnection if req.Address != utils.MetaInternal { - if client, err = rpcclient.NewRpcClient("tcp", req.Address, "", "", "", + if client, err = rpcclient.NewRpcClient("tcp", req.Address, false, "", "", "", req.Attempts, 0, config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, req.Transport, nil, false); err != nil { diff --git a/engine/libengine.go b/engine/libengine.go index 4e5bb9065..1c07c97e2 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -28,7 +28,7 @@ import ( "github.com/cgrates/rpcclient" ) -func NewRPCPool(dispatchStrategy, key_path, cert_path, ca_path string, connAttempts, reconnects int, +func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) { var rpcClient *rpcclient.RpcClient @@ -44,14 +44,14 @@ func NewRPCPool(dispatchStrategy, key_path, cert_path, ca_path string, connAttem case <-time.After(ttl): return nil, errors.New("TTL triggered") } - rpcClient, err = rpcclient.NewRpcClient("", "", key_path, cert_path, ca_path, connAttempts, + rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.Tls, key_path, cert_path, ca_path, connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, false) } else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) { codec := utils.GOB if rpcConnCfg.Transport != "" { codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient } - rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, key_path, cert_path, ca_path, + rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, rpcConnCfg.Tls, key_path, cert_path, ca_path, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, false) } else { return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) diff --git a/engine/pubsub.go b/engine/pubsub.go index 81f44f2c2..3883ee238 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -239,7 +239,7 @@ type ProxyPubSub struct { } func NewProxyPubSub(addr string, attempts, reconnects int, connectTimeout, replyTimeout time.Duration) (*ProxyPubSub, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, "", "", "", attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil, false) + client, err := rpcclient.NewRpcClient("tcp", addr, false, "", "", "", attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil, false) if err != nil { return nil, err } diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 6cbf34611..31d26c3f5 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -86,7 +86,7 @@ func TestCDRsOnExpStartSlaveEngine(t *testing.T) { // Connect rpc client to rater func TestCDRsOnExpHttpCdrReplication(t *testing.T) { - cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, "", "", "", 1, 1, + cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, false, "", "", "", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) @@ -104,7 +104,7 @@ func TestCDRsOnExpHttpCdrReplication(t *testing.T) { t.Error("Unexpected reply received: ", reply) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", "", "", "", 1, 1, + cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", false, "", "", "", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) diff --git a/general_tests/rpcclient_it_test.go b/general_tests/rpcclient_it_test.go index 33f782388..b42f87ae3 100644 --- a/general_tests/rpcclient_it_test.go +++ b/general_tests/rpcclient_it_test.go @@ -78,13 +78,13 @@ func TestRPCITLclStartSecondEngine(t *testing.T) { // Connect rpc client to rater func TestRPCITLclRpcConnPoolFirst(t *testing.T) { rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0) - rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, "", "", "", 3, 1, + rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, false, "", "", "", 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false) if err == nil { t.Fatal("Should receive cannot connect error here") } rpcPoolFirst.AddClient(rpcRAL1) - rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, "", "", "", 3, 1, + rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, false, "", "", "", 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false) if err != nil { t.Fatal(err) @@ -312,13 +312,13 @@ func TestRPCITRmtRpcConnPool(t *testing.T) { return } rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0) - rpcRALRmt, err := rpcclient.NewRpcClient("tcp", RemoteRALsAddr1, "", "", "", 1, 1, + rpcRALRmt, err := rpcclient.NewRpcClient("tcp", RemoteRALsAddr1, false, "", "", "", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false) if err != nil { t.Fatal(err) } rpcPoolFirst.AddClient(rpcRALRmt) - rpcRAL1, err = rpcclient.NewRpcClient("tcp", RemoteRALsAddr2, "", "", "", 1, 1, + rpcRAL1, err = rpcclient.NewRpcClient("tcp", RemoteRALsAddr2, false, "", "", "", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false) if err != nil { t.Fatal(err) diff --git a/general_tests/tls_it_test.go b/general_tests/tls_it_test.go index 78943ceb9..913e7928c 100755 --- a/general_tests/tls_it_test.go +++ b/general_tests/tls_it_test.go @@ -26,6 +26,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -80,21 +81,21 @@ func testTLSStartEngine(t *testing.T) { func testTLSRpcConn(t *testing.T) { var err error - tlsRpcClientJson, err = rpcclient.NewRpcClient("tcp", "localhost:2022", tlsCfg.TlsCfg().ClientKey, + tlsRpcClientJson, err = rpcclient.NewRpcClient("tcp", "localhost:2022", true, tlsCfg.TlsCfg().ClientKey, tlsCfg.TlsCfg().ClientCerificate, tlsCfg.TlsCfg().CaCertificate, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.JSON, nil, false) if err != nil { t.Errorf("Error: %s when dialing", err) } - tlsRpcClientGob, err = rpcclient.NewRpcClient("tcp", "localhost:2023", tlsCfg.TlsCfg().ClientKey, + tlsRpcClientGob, err = rpcclient.NewRpcClient("tcp", "localhost:2023", true, tlsCfg.TlsCfg().ClientKey, tlsCfg.TlsCfg().ClientCerificate, tlsCfg.TlsCfg().CaCertificate, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.GOB, nil, false) if err != nil { t.Errorf("Error: %s when dialing", err) } - tlsHTTPJson, err = rpcclient.NewRpcClient("tcp", "https://localhost:2280/jsonrpc", tlsCfg.TlsCfg().ClientKey, + tlsHTTPJson, err = rpcclient.NewRpcClient("tcp", "https://localhost:2280/jsonrpc", true, tlsCfg.TlsCfg().ClientKey, tlsCfg.TlsCfg().ClientCerificate, tlsCfg.TlsCfg().CaCertificate, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), rpcclient.JSON_HTTP, nil, false) if err != nil { @@ -129,6 +130,35 @@ func testTLSPing(t *testing.T) { if err := tlsHTTPJson.Call(utils.DispatcherSv1Ping, "", &reply); err == nil { t.Error(err) } + + initUsage := time.Duration(5 * time.Minute) + args := &sessions.V1InitSessionArgs{ + InitSession: true, + AllocateResources: true, + GetAttributes: true, + CGREvent: utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItInitiateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.VOICE, + utils.OriginID: "TestSSv1It1", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: initUsage, + }, + }, + } + var rply sessions.V1InitReplyWithDigest + if err := tlsHTTPJson.Call(utils.SessionSv1InitiateSessionWithDigest, + args, &rply); err == nil { + t.Error(err) + } } func testTLSStopEngine(t *testing.T) { diff --git a/glide.lock b/glide.lock index 921c6030f..2e9d331e6 100644 --- a/glide.lock +++ b/glide.lock @@ -20,7 +20,7 @@ imports: - name: github.com/cgrates/radigo version: 69d4269e21990c0f120b8e60d5b75d533db7f3dd - name: github.com/cgrates/rpcclient - version: 7e7a74eff4a9cc56dbfd43502b8a4c2757ffbd67 + version: 7316bff37a2b8692fbadd57f9c9cda070cc33081 - name: github.com/DisposaBoy/JsonConfigReader version: 5ea4d0ddac554439159cd6f191cb94a110d73352 - name: github.com/fiorix/go-diameter diff --git a/sessions/sessions.go b/sessions/sessions.go index 6c3893cf7..15643a6df 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -50,7 +50,7 @@ func NewSessionReplicationConns(conns []*config.HaPoolConfig, reconnects int, connTimeout, replyTimeout time.Duration) (smgConns []*SMGReplicationConn, err error) { smgConns = make([]*SMGReplicationConn, len(conns)) for i, replConnCfg := range conns { - if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, "", "", "", 0, reconnects, + if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, replConnCfg.Tls, "", "", "", 0, reconnects, connTimeout, replyTimeout, replConnCfg.Transport[1:], nil, true); err != nil { return nil, err } else { diff --git a/utils/server.go b/utils/server.go index 287be86bb..971979f17 100644 --- a/utils/server.go +++ b/utils/server.go @@ -45,6 +45,7 @@ type Server struct { httpEnabled bool birpcSrv *rpc2.Server sync.RWMutex + httpsMux *http.ServeMux } func (s *Server) RpcRegister(rcvr interface{}) { @@ -63,6 +64,9 @@ func (s *Server) RpcRegisterName(name string, rcvr interface{}) { func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { http.HandleFunc(pattern, handler) + if s.httpsMux != nil { + s.httpsMux.HandleFunc(pattern, handler) + } s.Lock() s.httpEnabled = true s.Unlock() @@ -70,6 +74,9 @@ func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWrit func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) { http.Handle(pattern, handler) + if s.httpsMux != nil { + s.httpsMux.Handle(pattern, handler) + } s.Lock() s.httpEnabled = true s.Unlock() @@ -419,16 +426,16 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP if !enabled { return } - mux := http.NewServeMux() + s.httpsMux = http.NewServeMux() if enabled && jsonRPCURL != "" { s.Lock() s.httpEnabled = true s.Unlock() Logger.Info(" enabling handler for JSON-RPC") if useBasicAuth { - mux.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList))) + s.httpsMux.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList))) } else { - mux.HandleFunc(jsonRPCURL, handleRequest) + s.httpsMux.HandleFunc(jsonRPCURL, handleRequest) } } if enabled && wsRPCURL != "" { @@ -440,11 +447,11 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP jsonrpc.ServeConn(ws) }) if useBasicAuth { - mux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) { + s.httpsMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) { wsHandler.ServeHTTP(w, r) }, basicAuth(userList))) } else { - mux.Handle(wsRPCURL, wsHandler) + s.httpsMux.Handle(wsRPCURL, wsHandler) } } if !s.httpEnabled { @@ -459,7 +466,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP } httpSrv := http.Server{ Addr: addr, - Handler: mux, + Handler: s.httpsMux, TLSConfig: &config, } Logger.Info(fmt.Sprintf(" start listening at <%s>", addr))