Added registrarc integration test

This commit is contained in:
Trial97
2021-03-02 11:42:29 +02:00
committed by Dan Christian Bogos
parent e017c7a292
commit 5073a7bda2
8 changed files with 643 additions and 5 deletions

View File

@@ -131,6 +131,9 @@ func (rh *RemoteHost) loadFromJSONCfg(jsnCfg *RemoteHostJson) {
}
if jsnCfg.Id != nil {
rh.ID = *jsnCfg.Id
// ignore defaults if we have ID
rh.Address = utils.EmptyString
rh.Transport = utils.EmptyString
}
if jsnCfg.Address != nil {
rh.Address = *jsnCfg.Address
@@ -198,16 +201,19 @@ func UpdateRPCCons(rpcConns RPCConns, newHosts map[string]*RemoteHost) (connIDs
// RemoveRPCCons will parse each conn and reset only
// the conns that have the same ID
func RemoveRPCCons(rpcConns RPCConns, newHosts utils.StringSet) {
for _, rpcPool := range rpcConns {
func RemoveRPCCons(rpcConns RPCConns, hosts utils.StringSet) (connIDs utils.StringSet) {
connIDs = make(utils.StringSet)
for rpcKey, rpcPool := range rpcConns {
for _, rh := range rpcPool.Conns {
if !newHosts.Has(rh.ID) {
if !hosts.Has(rh.ID) {
continue
}
connIDs.Add(rpcKey)
rh.Address = ""
rh.Transport = ""
rh.Synchronous = false
rh.TLS = false
}
}
return
}

View File

@@ -0,0 +1,156 @@
{
// CGRateS Configuration file
"general": {
"log_level": 7,
"reply_timeout": "30s",
},
"listen": {
"rpc_json": ":3012",
"rpc_gob": ":3013",
"http": ":3080",
},
"data_db": {
"db_type": "mongo",
"db_name": "10",
"db_port": 27017,
},
"stor_db": {
"db_type": "mongo",
"db_name": "cgrates",
"db_port": 27017,
},
"rpc_conns": {
"regConn": {
"strategy": "*first",
"conns": [{"address": "http://127.0.0.1:2080/registrar", "transport":"*http_jsonrpc"}]
}
},
"rals": {
"enabled": true,
"thresholds_conns": ["*internal"],
"max_increments":3000000,
},
"schedulers": {
"enabled": true,
"cdrs_conns": ["*localhost"],
"stats_conns": ["*localhost"],
},
"cdrs": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"]
},
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"routes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"rals_conns": ["*internal"],
},
"attributes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"apiers_conns": ["*localhost"]
},
"sessions": {
"enabled": true,
"rals_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
},
"migrator": {
"out_datadb_type": "mongo",
"out_datadb_port": "27017",
"out_datadb_name": "10",
"out_stordb_type": "mongo",
"out_stordb_port": "27017",
"out_stordb_name": "cgrates",
"users_filters":["Account"],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true,
},
"filters": {
"apiers_conns": ["*internal"],
},
"registrarc":{
"rpc":{
"enabled": true,
"registrars_conns": ["regConn"],
"hosts": {
"*default":[{"ID":"attributes", "transport": "*json"}]
},
"refresh_interval": "1s",
},
},
}

View File

@@ -0,0 +1,153 @@
{
// CGRateS Configuration file
"general": {
"log_level": 7,
"reply_timeout": "30s",
},
"listen": {
"rpc_json": ":3012",
"rpc_gob": ":3013",
"http": ":3080",
},
"data_db": {
"db_type": "redis",
"db_port": 6379,
"db_name": "10",
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rpc_conns": {
"regConn": {
"strategy": "*first",
"conns": [{"address": "http://127.0.0.1:2080/registrar", "transport":"*http_jsonrpc"}]
}
},
"rals": {
"enabled": true,
"thresholds_conns": ["*internal"],
"max_increments":3000000,
},
"schedulers": {
"enabled": true,
"cdrs_conns": ["*localhost"],
"stats_conns": ["*localhost"],
},
"cdrs": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"]
},
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"routes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"rals_conns": ["*internal"],
},
"attributes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"apiers_conns": ["*localhost"]
},
"sessions": {
"enabled": true,
"rals_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
},
"migrator": {
"out_datadb_type": "mongo",
"out_datadb_port": "27017",
"out_datadb_name": "10",
"out_stordb_type": "mongo",
"out_stordb_port": "27017",
"out_stordb_name": "cgrates",
"users_filters":["Account"],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true,
},
"filters": {
"apiers_conns": ["*internal"],
},
"registrarc":{
"rpc":{
"enabled": true,
"registrars_conns": ["regConn"],
"hosts": {
"*default":[{"ID":"attributes", "transport": "*json"}]
},
"refresh_interval": "1s",
},
},
}

View File

@@ -0,0 +1,49 @@
{
// CGRateS Configuration file
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": {
"db_type": "mongo",
"db_name": "10",
"db_port": 27017,
},
"stor_db": {
"db_type": "mongo",
"db_name": "cgrates",
"db_port": 27017,
},
"rpc_conns": {
"conn1": {
"strategy": "*first",
"conns": [{"id": "attributes"}],
},
},
"chargers": {
"enabled": true,
"attributes_conns": ["conn1"],
},
"apiers": {
"enabled": true,
},
}

View File

@@ -0,0 +1,46 @@
{
// CGRateS Configuration file
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": {
"db_type": "redis",
"db_port": 6379,
"db_name": "10",
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rpc_conns": {
"conn1": {
"strategy": "*first",
"conns": [{"id": "attributes"}],
},
},
"chargers": {
"enabled": true,
"attributes_conns": ["conn1"],
},
"apiers": {
"enabled": true,
},
}

View File

@@ -38,6 +38,9 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA
rpcPool = rpcclient.NewRPCPool(dispatchStrategy, replyTimeout)
for _, rpcConnCfg := range rpcConnCfgs {
if rpcConnCfg.Address == utils.EmptyString {
// in case we have only conns with empty addresse
// mimic an error to signal that the init was not done
err = rpcclient.ErrDisconnected
continue
}
rpcClient, err = NewRPCConnection(rpcConnCfg, keyPath, certPath, caPath, connAttempts, reconnects,

View File

@@ -188,8 +188,9 @@ func register(req *http.Request) (*json.RawMessage, error) {
utils.RegistrarC, err))
return sReq.Id, err
}
rpcConns := config.CgrConfig().RPCConns()
config.CgrConfig().LockSections(config.RPCConnsJsonName)
for _, connID := range args.IDs {
for connID := range config.RemoveRPCCons(rpcConns, utils.NewStringSet(args.IDs)) {
if err = engine.Cache.Remove(utils.CacheRPCConnections, connID,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove connection <%s> in cache because: %s",
@@ -207,8 +208,9 @@ func register(req *http.Request) (*json.RawMessage, error) {
for _, dH := range dH {
cfgHosts[dH.ID] = dH.RemoteHost
}
rpcConns := config.CgrConfig().RPCConns()
config.CgrConfig().LockSections(config.RPCConnsJsonName)
for connID := range config.UpdateRPCCons(config.CgrConfig().RPCConns(), cfgHosts) {
for connID := range config.UpdateRPCCons(rpcConns, cfgHosts) {
if err = engine.Cache.Remove(utils.CacheRPCConnections, connID,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove connection <%s> in cache because: %s",

View File

@@ -0,0 +1,223 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"net/rpc"
"os/exec"
"path"
"reflect"
"sort"
"syscall"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
var (
rpcDir string
rpcCMD *exec.Cmd
rpcCfgPath string
rpcsDir string
rpcsCfgPath string
rpcsCfg *config.CGRConfig
rpcsRPC *rpc.Client
rpchTest = []func(t *testing.T){
testRPCInitCfg,
testRPCInitDB,
testRPCStartEngine,
testRPCLoadData,
testRPCChargerSNoAttr,
testRPCStartRegc,
testRPCChargerSWithAttr,
testRPCStopEngines,
testRPCChargerSNoAttr,
testRPCStopRegs,
}
)
func TestRPCHosts(t *testing.T) {
switch *dbType {
case utils.MetaMySQL:
rpcDir = "registrarc_rpc_mysql"
rpcsDir = "registrars_rpc_mysql"
case utils.MetaMongo:
rpcDir = "registrarc_rpc_mongo"
rpcsDir = "registrars_rpc_mongo"
case utils.MetaInternal, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
for _, stest := range rpchTest {
t.Run(rpcDir, stest)
}
}
func testRPCInitCfg(t *testing.T) {
rpcCfgPath = path.Join(*dataDir, "conf", "samples", "registrarc", rpcDir)
rpcsCfgPath = path.Join(*dataDir, "conf", "samples", "registrarc", rpcsDir)
var err error
if rpcsCfg, err = config.NewCGRConfigFromPath(rpcsCfgPath); err != nil {
t.Error(err)
}
}
func testRPCInitDB(t *testing.T) {
if err := engine.InitDataDb(rpcsCfg); err != nil {
t.Fatal(err)
}
if err := engine.InitStorDb(rpcsCfg); err != nil {
t.Fatal(err)
}
}
func testRPCStartEngine(t *testing.T) {
var err error
if _, err = engine.StopStartEngine(rpcsCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
rpcsRPC, err = newRPCClient(rpcsCfg.ListenCfg())
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func testRPCLoadData(t *testing.T) {
var reply string
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testit")}
if err := rpcsRPC.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil {
t.Error(err)
}
time.Sleep(100 * time.Millisecond)
}
func testRPCChargerSNoAttr(t *testing.T) {
cgrEv := &utils.CGREvent{ // matching Charger1
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.AccountField: "1010",
},
Opts: map[string]interface{}{utils.OptsAttributesProcessRuns: 1.},
}
expErr := utils.NewErrServerError(rpcclient.ErrDisconnected).Error()
var rply []*engine.ChrgSProcessEventReply
if err := rpcsRPC.Call(utils.ChargerSv1ProcessEvent, cgrEv, &rply); err == nil || err.Error() != expErr {
t.Errorf("Expected error: %s,received: %v", expErr, err)
}
}
func testRPCStartRegc(t *testing.T) {
var err error
if rpcCMD, err = engine.StartEngine(rpcCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}
func testRPCChargerSWithAttr(t *testing.T) {
cgrEv := &utils.CGREvent{ // matching Charger1
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.AccountField: "1010",
},
Opts: map[string]interface{}{utils.OptsAttributesProcessRuns: 1.},
}
processedEv := []*engine.ChrgSProcessEventReply{
{
ChargerSProfile: "CustomerCharges",
AlteredFields: []string{"*req.RunID"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
"Account": "1010",
"RunID": "CustomerCharges",
},
Opts: map[string]interface{}{
"*processRuns": 1.,
"*subsys": "*chargers",
},
},
}, {
ChargerSProfile: "Raw",
AttributeSProfiles: []string{"*constant:*req.RequestType:*none"},
AlteredFields: []string{"*req.RunID", "*req.RequestType"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
"Account": "1010",
"RequestType": "*none",
"RunID": "raw",
},
Opts: map[string]interface{}{
"*processRuns": 1.,
"*subsys": "*chargers",
},
},
}, {
ChargerSProfile: "SupplierCharges",
AttributeSProfiles: []string{"ATTR_SUPPLIER1"},
AlteredFields: []string{"*req.RunID", "*req.Subject"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
"Account": "1010",
"RunID": "SupplierCharges",
"Subject": "SUPPLIER1",
},
Opts: map[string]interface{}{
"*processRuns": 1.,
"*subsys": "*chargers",
},
},
},
}
var rply []*engine.ChrgSProcessEventReply
if err := rpcsRPC.Call(utils.ChargerSv1ProcessEvent, cgrEv, &rply); err != nil {
t.Fatal(err)
}
sort.Slice(rply, func(i, j int) bool {
return rply[i].ChargerSProfile < rply[j].ChargerSProfile
})
if !reflect.DeepEqual(rply, processedEv) {
t.Errorf("Expecting : %s, received: %s", utils.ToJSON(processedEv), utils.ToJSON(rply))
}
}
func testRPCStopEngines(t *testing.T) {
if err := rpcCMD.Process.Signal(syscall.SIGTERM); err != nil {
t.Fatal(err)
}
time.Sleep(2 * time.Second)
}
func testRPCStopRegs(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}