diff --git a/config/rpcconn.go b/config/rpcconn.go index 933f45bea..a456eb59e 100644 --- a/config/rpcconn.go +++ b/config/rpcconn.go @@ -131,6 +131,9 @@ func (rh *RemoteHost) loadFromJSONCfg(jsnCfg *RemoteHostJson) { } if jsnCfg.Id != nil { rh.ID = *jsnCfg.Id + // ignore defaults if we have ID + rh.Address = utils.EmptyString + rh.Transport = utils.EmptyString } if jsnCfg.Address != nil { rh.Address = *jsnCfg.Address @@ -198,16 +201,19 @@ func UpdateRPCCons(rpcConns RPCConns, newHosts map[string]*RemoteHost) (connIDs // RemoveRPCCons will parse each conn and reset only // the conns that have the same ID -func RemoveRPCCons(rpcConns RPCConns, newHosts utils.StringSet) { - for _, rpcPool := range rpcConns { +func RemoveRPCCons(rpcConns RPCConns, hosts utils.StringSet) (connIDs utils.StringSet) { + connIDs = make(utils.StringSet) + for rpcKey, rpcPool := range rpcConns { for _, rh := range rpcPool.Conns { - if !newHosts.Has(rh.ID) { + if !hosts.Has(rh.ID) { continue } + connIDs.Add(rpcKey) rh.Address = "" rh.Transport = "" rh.Synchronous = false rh.TLS = false } } + return } diff --git a/data/conf/samples/registrarc/registrarc_rpc_mongo/cgrates.json b/data/conf/samples/registrarc/registrarc_rpc_mongo/cgrates.json new file mode 100644 index 000000000..140cebb48 --- /dev/null +++ b/data/conf/samples/registrarc/registrarc_rpc_mongo/cgrates.json @@ -0,0 +1,156 @@ +{ +// CGRateS Configuration file + + +"general": { + "log_level": 7, + "reply_timeout": "30s", +}, + + +"listen": { + "rpc_json": ":3012", + "rpc_gob": ":3013", + "http": ":3080", +}, + + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, +}, + +"rpc_conns": { + "regConn": { + "strategy": "*first", + "conns": [{"address": "http://127.0.0.1:2080/registrar", "transport":"*http_jsonrpc"}] + } +}, + + +"rals": { + "enabled": true, + "thresholds_conns": ["*internal"], + "max_increments":3000000, +}, + + +"schedulers": { + "enabled": true, + "cdrs_conns": ["*localhost"], + "stats_conns": ["*localhost"], +}, + + +"cdrs": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"resources": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"] +}, + + +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"], +}, + + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + +"routes": { + "enabled": true, + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "rals_conns": ["*internal"], +}, + + +"attributes": { + "enabled": true, + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "apiers_conns": ["*localhost"] +}, + + +"sessions": { + "enabled": true, + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], +}, + + +"migrator": { + "out_datadb_type": "mongo", + "out_datadb_port": "27017", + "out_datadb_name": "10", + "out_stordb_type": "mongo", + "out_stordb_port": "27017", + "out_stordb_name": "cgrates", + "users_filters":["Account"], +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +"rates": { + "enabled": true +}, + + +"actions": { + "enabled": true, + "accounts_conns": ["*localhost"] +}, + + +"accounts": { + "enabled": true, +}, + + +"filters": { + "apiers_conns": ["*internal"], +}, + +"registrarc":{ + "rpc":{ + "enabled": true, + "registrars_conns": ["regConn"], + "hosts": { + "*default":[{"ID":"attributes", "transport": "*json"}] + }, + "refresh_interval": "1s", + }, +}, + +} diff --git a/data/conf/samples/registrarc/registrarc_rpc_mysql/cgrates.json b/data/conf/samples/registrarc/registrarc_rpc_mysql/cgrates.json new file mode 100644 index 000000000..5e44367dd --- /dev/null +++ b/data/conf/samples/registrarc/registrarc_rpc_mysql/cgrates.json @@ -0,0 +1,153 @@ +{ +// CGRateS Configuration file + + +"general": { + "log_level": 7, + "reply_timeout": "30s", +}, + + +"listen": { + "rpc_json": ":3012", + "rpc_gob": ":3013", + "http": ":3080", +}, + + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10", +}, + +"stor_db": { + "db_password": "CGRateS.org", +}, + +"rpc_conns": { + "regConn": { + "strategy": "*first", + "conns": [{"address": "http://127.0.0.1:2080/registrar", "transport":"*http_jsonrpc"}] + } +}, + + +"rals": { + "enabled": true, + "thresholds_conns": ["*internal"], + "max_increments":3000000, +}, + + +"schedulers": { + "enabled": true, + "cdrs_conns": ["*localhost"], + "stats_conns": ["*localhost"], +}, + + +"cdrs": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"resources": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"] +}, + + +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"], +}, + + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + +"routes": { + "enabled": true, + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "rals_conns": ["*internal"], +}, + + +"attributes": { + "enabled": true, + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "apiers_conns": ["*localhost"] +}, + + +"sessions": { + "enabled": true, + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], +}, + + +"migrator": { + "out_datadb_type": "mongo", + "out_datadb_port": "27017", + "out_datadb_name": "10", + "out_stordb_type": "mongo", + "out_stordb_port": "27017", + "out_stordb_name": "cgrates", + "users_filters":["Account"], +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +"rates": { + "enabled": true +}, + + +"actions": { + "enabled": true, + "accounts_conns": ["*localhost"] +}, + + +"accounts": { + "enabled": true, +}, + + +"filters": { + "apiers_conns": ["*internal"], +}, + +"registrarc":{ + "rpc":{ + "enabled": true, + "registrars_conns": ["regConn"], + "hosts": { + "*default":[{"ID":"attributes", "transport": "*json"}] + }, + "refresh_interval": "1s", + }, +}, + +} diff --git a/data/conf/samples/registrarc/registrars_rpc_mongo/cgrates.json b/data/conf/samples/registrarc/registrars_rpc_mongo/cgrates.json new file mode 100644 index 000000000..0fb4b0edc --- /dev/null +++ b/data/conf/samples/registrarc/registrars_rpc_mongo/cgrates.json @@ -0,0 +1,49 @@ +{ +// CGRateS Configuration file + + +"general": { + "log_level": 7, +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, +}, + + +"rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"id": "attributes"}], + }, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["conn1"], +}, + + +"apiers": { + "enabled": true, +}, + +} diff --git a/data/conf/samples/registrarc/registrars_rpc_mysql/cgrates.json b/data/conf/samples/registrarc/registrars_rpc_mysql/cgrates.json new file mode 100644 index 000000000..109faa84f --- /dev/null +++ b/data/conf/samples/registrarc/registrars_rpc_mysql/cgrates.json @@ -0,0 +1,46 @@ +{ +// CGRateS Configuration file + + +"general": { + "log_level": 7, +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10", +}, + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"id": "attributes"}], + }, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["conn1"], +}, + + +"apiers": { + "enabled": true, +}, + +} diff --git a/engine/libengine.go b/engine/libengine.go index 96300e4d4..b9c1f2b5a 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -38,6 +38,9 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA rpcPool = rpcclient.NewRPCPool(dispatchStrategy, replyTimeout) for _, rpcConnCfg := range rpcConnCfgs { if rpcConnCfg.Address == utils.EmptyString { + // in case we have only conns with empty addresse + // mimic an error to signal that the init was not done + err = rpcclient.ErrDisconnected continue } rpcClient, err = NewRPCConnection(rpcConnCfg, keyPath, certPath, caPath, connAttempts, reconnects, diff --git a/registrarc/libregistrarc.go b/registrarc/libregistrarc.go index ef8342710..a5efcd000 100644 --- a/registrarc/libregistrarc.go +++ b/registrarc/libregistrarc.go @@ -188,8 +188,9 @@ func register(req *http.Request) (*json.RawMessage, error) { utils.RegistrarC, err)) return sReq.Id, err } + rpcConns := config.CgrConfig().RPCConns() config.CgrConfig().LockSections(config.RPCConnsJsonName) - for _, connID := range args.IDs { + for connID := range config.RemoveRPCCons(rpcConns, utils.NewStringSet(args.IDs)) { if err = engine.Cache.Remove(utils.CacheRPCConnections, connID, true, utils.NonTransactional); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove connection <%s> in cache because: %s", @@ -207,8 +208,9 @@ func register(req *http.Request) (*json.RawMessage, error) { for _, dH := range dH { cfgHosts[dH.ID] = dH.RemoteHost } + rpcConns := config.CgrConfig().RPCConns() config.CgrConfig().LockSections(config.RPCConnsJsonName) - for connID := range config.UpdateRPCCons(config.CgrConfig().RPCConns(), cfgHosts) { + for connID := range config.UpdateRPCCons(rpcConns, cfgHosts) { if err = engine.Cache.Remove(utils.CacheRPCConnections, connID, true, utils.NonTransactional); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove connection <%s> in cache because: %s", diff --git a/registrarc/registrarcrpc_it_test.go b/registrarc/registrarcrpc_it_test.go new file mode 100644 index 000000000..0cb2c427c --- /dev/null +++ b/registrarc/registrarcrpc_it_test.go @@ -0,0 +1,223 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package registrarc + +import ( + "net/rpc" + "os/exec" + "path" + "reflect" + "sort" + "syscall" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +var ( + rpcDir string + rpcCMD *exec.Cmd + rpcCfgPath string + + rpcsDir string + rpcsCfgPath string + rpcsCfg *config.CGRConfig + rpcsRPC *rpc.Client + + rpchTest = []func(t *testing.T){ + testRPCInitCfg, + testRPCInitDB, + testRPCStartEngine, + testRPCLoadData, + testRPCChargerSNoAttr, + testRPCStartRegc, + testRPCChargerSWithAttr, + testRPCStopEngines, + testRPCChargerSNoAttr, + testRPCStopRegs, + } +) + +func TestRPCHosts(t *testing.T) { + switch *dbType { + case utils.MetaMySQL: + rpcDir = "registrarc_rpc_mysql" + rpcsDir = "registrars_rpc_mysql" + case utils.MetaMongo: + rpcDir = "registrarc_rpc_mongo" + rpcsDir = "registrars_rpc_mongo" + case utils.MetaInternal, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + for _, stest := range rpchTest { + t.Run(rpcDir, stest) + } +} + +func testRPCInitCfg(t *testing.T) { + rpcCfgPath = path.Join(*dataDir, "conf", "samples", "registrarc", rpcDir) + rpcsCfgPath = path.Join(*dataDir, "conf", "samples", "registrarc", rpcsDir) + var err error + if rpcsCfg, err = config.NewCGRConfigFromPath(rpcsCfgPath); err != nil { + t.Error(err) + } +} + +func testRPCInitDB(t *testing.T) { + if err := engine.InitDataDb(rpcsCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(rpcsCfg); err != nil { + t.Fatal(err) + } +} + +func testRPCStartEngine(t *testing.T) { + var err error + if _, err = engine.StopStartEngine(rpcsCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + rpcsRPC, err = newRPCClient(rpcsCfg.ListenCfg()) + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func testRPCLoadData(t *testing.T) { + var reply string + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testit")} + if err := rpcsRPC.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil { + t.Error(err) + } + time.Sleep(100 * time.Millisecond) +} + +func testRPCChargerSNoAttr(t *testing.T) { + cgrEv := &utils.CGREvent{ // matching Charger1 + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.AccountField: "1010", + }, + Opts: map[string]interface{}{utils.OptsAttributesProcessRuns: 1.}, + } + expErr := utils.NewErrServerError(rpcclient.ErrDisconnected).Error() + var rply []*engine.ChrgSProcessEventReply + if err := rpcsRPC.Call(utils.ChargerSv1ProcessEvent, cgrEv, &rply); err == nil || err.Error() != expErr { + t.Errorf("Expected error: %s,received: %v", expErr, err) + } +} + +func testRPCStartRegc(t *testing.T) { + var err error + if rpcCMD, err = engine.StartEngine(rpcCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) +} + +func testRPCChargerSWithAttr(t *testing.T) { + cgrEv := &utils.CGREvent{ // matching Charger1 + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.AccountField: "1010", + }, + Opts: map[string]interface{}{utils.OptsAttributesProcessRuns: 1.}, + } + + processedEv := []*engine.ChrgSProcessEventReply{ + { + ChargerSProfile: "CustomerCharges", + AlteredFields: []string{"*req.RunID"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1010", + "RunID": "CustomerCharges", + }, + Opts: map[string]interface{}{ + "*processRuns": 1., + "*subsys": "*chargers", + }, + }, + }, { + ChargerSProfile: "Raw", + AttributeSProfiles: []string{"*constant:*req.RequestType:*none"}, + AlteredFields: []string{"*req.RunID", "*req.RequestType"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1010", + "RequestType": "*none", + "RunID": "raw", + }, + Opts: map[string]interface{}{ + "*processRuns": 1., + "*subsys": "*chargers", + }, + }, + }, { + ChargerSProfile: "SupplierCharges", + AttributeSProfiles: []string{"ATTR_SUPPLIER1"}, + AlteredFields: []string{"*req.RunID", "*req.Subject"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1010", + "RunID": "SupplierCharges", + "Subject": "SUPPLIER1", + }, + Opts: map[string]interface{}{ + "*processRuns": 1., + "*subsys": "*chargers", + }, + }, + }, + } + var rply []*engine.ChrgSProcessEventReply + if err := rpcsRPC.Call(utils.ChargerSv1ProcessEvent, cgrEv, &rply); err != nil { + t.Fatal(err) + } + sort.Slice(rply, func(i, j int) bool { + return rply[i].ChargerSProfile < rply[j].ChargerSProfile + }) + if !reflect.DeepEqual(rply, processedEv) { + t.Errorf("Expecting : %s, received: %s", utils.ToJSON(processedEv), utils.ToJSON(rply)) + } +} + +func testRPCStopEngines(t *testing.T) { + if err := rpcCMD.Process.Signal(syscall.SIGTERM); err != nil { + t.Fatal(err) + } + time.Sleep(2 * time.Second) +} + +func testRPCStopRegs(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +}