mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated apier/v2 integration for gob RPC
This commit is contained in:
committed by
Dan Christian Bogos
parent
352b5b7805
commit
6b57cc486a
@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package v2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/rpc"
|
||||
@@ -37,15 +38,25 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
|
||||
waitRater = flag.Int("wait_rater", 1500, "Number of miliseconds to wait for rater to start and cache")
|
||||
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
|
||||
waitRater = flag.Int("wait_rater", 1500, "Number of miliseconds to wait for rater to start and cache")
|
||||
encoding = flag.String("rpc", utils.MetaJSONrpc, "what encoding whould be uused for rpc comunication")
|
||||
apierCfgPath string
|
||||
apierCfg *config.CGRConfig
|
||||
apierRPC *rpc.Client
|
||||
dm *engine.DataManager // share db connection here so we can check data we set through APIs
|
||||
)
|
||||
|
||||
var apierCfgPath string
|
||||
var apierCfg *config.CGRConfig
|
||||
var apierRPC *rpc.Client
|
||||
var dm *engine.DataManager // share db connection here so we can check data we set through APIs
|
||||
|
||||
func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
|
||||
switch *encoding {
|
||||
case utils.MetaJSONrpc:
|
||||
return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
|
||||
case utils.MetaGOBrpc:
|
||||
return rpc.Dial(utils.TCP, cfg.RPCGOBListen)
|
||||
default:
|
||||
return nil, errors.New("UNSUPPORTED_RPC")
|
||||
}
|
||||
}
|
||||
func TestApierV2itLoadConfig(t *testing.T) {
|
||||
apierCfgPath = path.Join(*dataDir, "conf", "samples", "tutmysql")
|
||||
if apierCfg, err = config.NewCGRConfigFromPath(apierCfgPath); err != nil {
|
||||
@@ -88,7 +99,7 @@ func TestApierV2itStartEngine(t *testing.T) {
|
||||
|
||||
// Connect rpc client to rater
|
||||
func TestApierV2itRpcConn(t *testing.T) {
|
||||
apierRPC, err = jsonrpc.Dial("tcp", apierCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
|
||||
apierRPC, err = newRPCClient(apierCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ package v2
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
@@ -103,7 +102,7 @@ func testAttributeSStartEngine(t *testing.T) {
|
||||
// Connect rpc client to rater
|
||||
func testAttributeSRPCConn(t *testing.T) {
|
||||
var err error
|
||||
attrSRPC, err = jsonrpc.Dial("tcp", alsPrfCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
|
||||
attrSRPC, err = newRPCClient(alsPrfCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -158,7 +157,7 @@ func testAttributeSSetAlsPrf(t *testing.T) {
|
||||
alsPrf.Compile()
|
||||
var reply *engine.AttributeProfile
|
||||
if err := attrSRPC.Call(utils.ApierV1GetAttributeProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "ExternalAttribute"}, &reply); err != nil {
|
||||
utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "ExternalAttribute"}}, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
reply.Compile()
|
||||
@@ -224,7 +223,7 @@ func testAttributeSUpdateAlsPrf(t *testing.T) {
|
||||
alsPrf.Compile()
|
||||
var reply *engine.AttributeProfile
|
||||
if err := attrSRPC.Call(utils.ApierV1GetAttributeProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "ExternalAttribute"}, &reply); err != nil {
|
||||
utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "ExternalAttribute"}}, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
reply.Compile()
|
||||
|
||||
@@ -21,7 +21,6 @@ package v2
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
@@ -114,7 +113,7 @@ func testV2CDRsStartEngine(t *testing.T) {
|
||||
|
||||
// Connect rpc client to rater
|
||||
func testV2CDRsRpcConn(t *testing.T) {
|
||||
cdrsRpc, err = jsonrpc.Dial("tcp", cdrsCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
|
||||
cdrsRpc, err = newRPCClient(cdrsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to rater: ", err.Error())
|
||||
}
|
||||
@@ -438,7 +437,7 @@ func testV2CDRsDifferentTenants(t *testing.T) {
|
||||
}
|
||||
var reply *engine.AttributeProfile
|
||||
if err := cdrsRpc.Call(utils.ApierV1GetAttributeProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.com", ID: "ATTR_Tenant"}, &reply); err != nil {
|
||||
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: "cgrates.com", ID: "ATTR_Tenant"}}, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
reply.Compile()
|
||||
|
||||
@@ -21,7 +21,6 @@ package v2
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
@@ -104,7 +103,7 @@ func testV2CDRsOfflineStartEngine(t *testing.T) {
|
||||
|
||||
// Connect rpc client to rater
|
||||
func testV2cdrsOfflineRpcConn(t *testing.T) {
|
||||
cdrsOfflineRpc, err = jsonrpc.Dial("tcp", cdrsOfflineCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
|
||||
cdrsOfflineRpc, err = newRPCClient(cdrsOfflineCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to rater: ", err.Error())
|
||||
}
|
||||
@@ -160,16 +159,18 @@ func testV2CDRsOfflineBalanceUpdate(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
//create a threshold that match out account
|
||||
tPrfl := &engine.ThresholdProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "THD_Test",
|
||||
FilterIDs: []string{"*string:Account:test"},
|
||||
MaxHits: -1,
|
||||
MinSleep: time.Duration(time.Second),
|
||||
Blocker: false,
|
||||
Weight: 20.0,
|
||||
ActionIDs: []string{"ACT_LOG"},
|
||||
Async: false,
|
||||
tPrfl := v1.ThresholdWithCache{
|
||||
ThresholdProfile: &engine.ThresholdProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "THD_Test",
|
||||
FilterIDs: []string{"*string:Account:test"},
|
||||
MaxHits: -1,
|
||||
MinSleep: time.Duration(time.Second),
|
||||
Blocker: false,
|
||||
Weight: 20.0,
|
||||
ActionIDs: []string{"ACT_LOG"},
|
||||
Async: false,
|
||||
},
|
||||
}
|
||||
if err := cdrsOfflineRpc.Call(utils.ApierV1SetThresholdProfile, tPrfl, &result); err != nil {
|
||||
t.Error(err)
|
||||
@@ -204,7 +205,7 @@ func testV2CDRsOfflineBalanceUpdate(t *testing.T) {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
//process cdr should trigger balance update event
|
||||
if err := cdrsOfflineRpc.Call(utils.CDRsV1ProcessCDR, cdr, &reply); err != nil {
|
||||
if err := cdrsOfflineRpc.Call(utils.CDRsV1ProcessCDR, &engine.CDRWithArgDispatcher{CDR: cdr}, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply received: ", reply)
|
||||
@@ -274,20 +275,22 @@ func testV2CDRsOfflineExpiryBalance(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
//create a threshold that match out account
|
||||
tPrfl := &engine.ThresholdProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "THD_Test2",
|
||||
FilterIDs: []string{"*string:Account:test2"},
|
||||
ActivationInterval: &utils.ActivationInterval{
|
||||
ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
|
||||
ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
|
||||
tPrfl := &v1.ThresholdWithCache{
|
||||
ThresholdProfile: &engine.ThresholdProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "THD_Test2",
|
||||
FilterIDs: []string{"*string:Account:test2"},
|
||||
ActivationInterval: &utils.ActivationInterval{
|
||||
ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
|
||||
ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
|
||||
},
|
||||
MaxHits: -1,
|
||||
MinSleep: time.Duration(0),
|
||||
Blocker: false,
|
||||
Weight: 20.0,
|
||||
ActionIDs: []string{"ACT_LOG"},
|
||||
Async: false,
|
||||
},
|
||||
MaxHits: -1,
|
||||
MinSleep: time.Duration(0),
|
||||
Blocker: false,
|
||||
Weight: 20.0,
|
||||
ActionIDs: []string{"ACT_LOG"},
|
||||
Async: false,
|
||||
}
|
||||
if err := cdrsOfflineRpc.Call(utils.ApierV1SetThresholdProfile, tPrfl, &result); err != nil {
|
||||
t.Error(err)
|
||||
@@ -374,7 +377,7 @@ func testV2CDRsBalancesWithSameWeight(t *testing.T) {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
//process cdr should trigger balance update event
|
||||
if err := cdrsOfflineRpc.Call(utils.CDRsV1ProcessCDR, cdr, &reply); err != nil {
|
||||
if err := cdrsOfflineRpc.Call(utils.CDRsV1ProcessCDR, &engine.CDRWithArgDispatcher{CDR: cdr}, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply received: ", reply)
|
||||
|
||||
@@ -22,7 +22,6 @@ package v2
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
@@ -116,7 +115,7 @@ func testTPitStartEngine(t *testing.T) {
|
||||
// Connect rpc client to rater
|
||||
func testTPitRpcConn(t *testing.T) {
|
||||
var err error
|
||||
tpRPC, err = jsonrpc.Dial("tcp", tpCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
|
||||
tpRPC, err = newRPCClient(tpCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user