Update Remote functionality through RPC

This commit is contained in:
TeoV
2019-11-15 13:30:20 +02:00
committed by Dan Christian Bogos
parent ee37110cfe
commit bc5cf765c3
10 changed files with 60 additions and 25 deletions

View File

@@ -57,19 +57,19 @@ var sTestsInternalRemoteIT = []func(t *testing.T){
testInternalRemoteITRPCConn,
testInternalRemoteLoadDataInEngineTwo,
testInternalRemoteITGetAccount,
//testInternalRemoteITGetAttribute,
//testInternalRemoteITGetThreshold,
//testInternalRemoteITGetThresholdProfile,
//testInternalRemoteITGetResource,
//testInternalRemoteITGetResourceProfile,
//testInternalRemoteITGetStatQueueProfile,
//testInternalRemoteITGetSupplier,
//testInternalRemoteITGetFilter,
//testInternalRemoteITGetRatingPlan,
//testInternalRemoteITGetRatingProfile,
//testInternalRemoteITGetAction,
//testInternalRemoteITGetActionPlan,
//testInternalRemoteITGetAccountActionPlan,
testInternalRemoteITGetAttribute,
testInternalRemoteITGetThreshold,
testInternalRemoteITGetThresholdProfile,
testInternalRemoteITGetResource,
testInternalRemoteITGetResourceProfile,
testInternalRemoteITGetStatQueueProfile,
testInternalRemoteITGetSupplier,
testInternalRemoteITGetFilter,
testInternalRemoteITGetRatingPlan,
testInternalRemoteITGetRatingProfile,
testInternalRemoteITGetAction,
testInternalRemoteITGetActionPlan,
testInternalRemoteITGetAccountActionPlan,
////testInternalReplicationSetThreshold,
testInternalRemoteITKillEngine,
}
@@ -178,9 +178,9 @@ func testInternalRemoteLoadDataInEngineTwo(t *testing.T) {
func testInternalRemoteITGetAccount(t *testing.T) {
var acnt *engine.Account
expAcc := &engine.Account{
ID: "cgrates.org:testAccount",
ID: "cgrates.org:1001",
BalanceMap: map[string]engine.Balances{
"utils.MONETARY": []*engine.Balance{
utils.MONETARY: []*engine.Balance{
{
ID: "testAccount",
Value: 10,
@@ -191,12 +191,23 @@ func testInternalRemoteITGetAccount(t *testing.T) {
}
attrs := &utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: "testAccount",
Account: "1001",
}
// make sure account exist in engine2
if err := engineTwoRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if acnt.ID != expAcc.ID {
t.Errorf("expecting: %+v, received: %+v", expAcc.ID, acnt.ID)
} else if len(acnt.BalanceMap) != 1 {
t.Errorf("unexpected number of balances received: %+v", utils.ToJSON(acnt))
}
// check the account in internal
if err := internalRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expAcc, acnt) {
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(expAcc), utils.ToJSON(acnt))
} else if acnt.ID != expAcc.ID {
t.Errorf("expecting: %+v, received: %+v", expAcc.ID, acnt.ID)
} else if len(acnt.BalanceMap) != 1 {
t.Errorf("unexpected number of balances received: %+v", utils.ToJSON(acnt))
}
attrs = &utils.AttrGetAccount{

View File

@@ -148,11 +148,11 @@ func (rplSv1 *ReplicatorSv1) GetShareGroup(id string, reply *engine.SharedGroup)
}
//GetActions
func (rplSv1 *ReplicatorSv1) GetActions(id string, reply engine.Actions) error {
func (rplSv1 *ReplicatorSv1) GetActions(id string, reply *engine.Actions) error {
if rcv, err := rplSv1.dm.DataDB().GetActionsDrv(id); err != nil {
return err
} else {
reply = rcv
*reply = rcv
}
return nil
}
@@ -162,7 +162,7 @@ func (rplSv1 *ReplicatorSv1) GetActionPlan(id string, reply *engine.ActionPlan)
if rcv, err := rplSv1.dm.DataDB().GetActionPlanDrv(id, true, utils.NonTransactional); err != nil {
return err
} else {
reply = rcv
*reply = *rcv
}
return nil
}

View File

@@ -242,7 +242,7 @@ func main() {
var rmtConns, rplConns *rpcclient.RpcClientPool
if len(ldrCfg.DataDbCfg().RmtConns) != 0 {
var err error
rmtConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, ldrCfg.TlsCfg().ClientKey,
rmtConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST_POSITIVE, ldrCfg.TlsCfg().ClientKey,
ldrCfg.TlsCfg().ClientCerificate, ldrCfg.TlsCfg().CaCertificate,
ldrCfg.GeneralCfg().ConnectAttempts, ldrCfg.GeneralCfg().Reconnects,
ldrCfg.GeneralCfg().ConnectTimeout, ldrCfg.GeneralCfg().ReplyTimeout,

View File

@@ -34,4 +34,20 @@
"enabled": true,
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
},
}

View File

@@ -1448,6 +1448,9 @@ func (ms *MongoStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err
if err != nil {
return nil, err
}
if len(keys) == 0 {
return nil, utils.ErrNotFound
}
ats = make(map[string]*ActionPlan, len(keys))
for _, key := range keys {
ap, err := ms.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):],

View File

@@ -984,6 +984,9 @@ func (rs *RedisStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err
if err != nil {
return nil, err
}
if len(keys) == 0 {
return nil, utils.ErrNotFound
}
ats = make(map[string]*ActionPlan, len(keys))
for _, key := range keys {
ap, err := rs.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):],

View File

@@ -71,7 +71,7 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string,
var rmtConns, rplConns *rpcclient.RpcClientPool
if len(config.CgrConfig().DataDbCfg().RmtConns) != 0 {
var err error
rmtConns, err = NewRPCPool(rpcclient.POOL_FIRST, config.CgrConfig().TlsCfg().ClientKey,
rmtConns, err = NewRPCPool(rpcclient.POOL_FIRST_POSITIVE, config.CgrConfig().TlsCfg().ClientKey,
config.CgrConfig().TlsCfg().ClientCerificate, config.CgrConfig().TlsCfg().CaCertificate,
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout,

2
go.mod
View File

@@ -20,7 +20,7 @@ require (
github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817
github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca
github.com/cgrates/radigo v0.0.0-20181207143118-e5c8f3272ccc
github.com/cgrates/rpcclient v0.0.0-20190505150825-8fcc68b2c38b
github.com/cgrates/rpcclient v0.0.0-20191115092211-732f09b356e3
github.com/creack/pty v1.1.7
github.com/fiorix/go-diameter v3.0.3-0.20190716165154-f4823472d0e0+incompatible
github.com/fortytw2/leaktest v1.3.0 // indirect

2
go.sum
View File

@@ -67,6 +67,8 @@ github.com/cgrates/radigo v0.0.0-20181207143118-e5c8f3272ccc h1:vOvPAyI9pNhUM5k/
github.com/cgrates/radigo v0.0.0-20181207143118-e5c8f3272ccc/go.mod h1:cMU/VXvC9YH2kXbhpgnkppYRjpqS8XgkKb8dI8HpU1I=
github.com/cgrates/rpcclient v0.0.0-20190505150825-8fcc68b2c38b h1:GC+/hEDN/2Frh8Tjkf7u1XFxj0Z2XtwjBxj0OH6Mzhw=
github.com/cgrates/rpcclient v0.0.0-20190505150825-8fcc68b2c38b/go.mod h1:Jy5Lv0y57OlxlNATKrkyAxgftYLHqXuxONgd4qsAC1U=
github.com/cgrates/rpcclient v0.0.0-20191115092211-732f09b356e3 h1:Hr038ZfPZz87OKLV4pRSzf3U06lZ8zjl/cXpwrv7hCM=
github.com/cgrates/rpcclient v0.0.0-20191115092211-732f09b356e3/go.mod h1:Jy5Lv0y57OlxlNATKrkyAxgftYLHqXuxONgd4qsAC1U=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/creack/pty v1.1.7 h1:6pwm8kMQKCmgUg0ZHTm5+/YvRK0s3THD/28+T6/kk4A=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=

View File

@@ -71,7 +71,7 @@ func (db *DataDBService) Start() (err error) {
var rmtConns, rplConns *rpcclient.RpcClientPool
if len(db.cfg.DataDbCfg().RmtConns) != 0 {
var err error
rmtConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, db.cfg.TlsCfg().ClientKey,
rmtConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST_POSITIVE, db.cfg.TlsCfg().ClientKey,
db.cfg.TlsCfg().ClientCerificate, db.cfg.TlsCfg().CaCertificate,
db.cfg.GeneralCfg().ConnectAttempts, db.cfg.GeneralCfg().Reconnects,
db.cfg.GeneralCfg().ConnectTimeout, db.cfg.GeneralCfg().ReplyTimeout,