Updated config for registrarc

This commit is contained in:
Trial97
2021-03-01 16:55:34 +02:00
committed by Dan Christian Bogos
parent 038aa5f2ea
commit 54ee982314
37 changed files with 495 additions and 425 deletions

View File

@@ -1,102 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"fmt"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// NewRegistrarCService constructs a DispatcherHService
func NewRegistrarCService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *DispatcherHostsService {
return &DispatcherHostsService{
cfg: cfg,
connMgr: connMgr,
}
}
// DispatcherHostsService is the service handling dispatching towards internal components
// designed to handle automatic partitioning and failover
type DispatcherHostsService struct {
cfg *config.CGRConfig
connMgr *engine.ConnManager
}
// ListenAndServe will initialize the service
func (dhS *DispatcherHostsService) ListenAndServe(stopChan chan struct{}) {
utils.Logger.Info("Starting DispatcherH service")
for {
dhS.registerHosts()
select {
case <-stopChan:
return
case <-time.After(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval):
}
}
}
// Shutdown is called to shutdown the service
func (dhS *DispatcherHostsService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RegistrarC))
dhS.unregisterHosts()
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.RegistrarC))
return
}
func (dhS *DispatcherHostsService) registerHosts() {
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}
args, err := NewRegisterArgs(dhS.cfg, tnt, hostCfgs)
if err != nil {
continue
}
var rply string
if err := dhS.connMgr.Call([]string{connID}, nil, utils.RegistrarSv1RegisterDispatcherHosts, args, &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
utils.RegistrarC, connID, err))
continue
}
}
}
return
}
func (dhS *DispatcherHostsService) unregisterHosts() {
var rply string
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}
if err := dhS.connMgr.Call([]string{connID}, nil, utils.RegistrarSv1UnregisterDispatcherHosts, NewUnregisterArgs(tnt, hostCfgs), &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts with tenant<%s> to the conn with ID <%s> because : %s",
utils.RegistrarC, tnt, connID, err))
continue
}
}
}
}

View File

@@ -165,21 +165,12 @@ func register(req *http.Request) (*json.RawMessage, error) {
}
case utils.RegistrarSv1RegisterDispatcherHosts:
var dHs RegisterArgs
params := []interface{}{&dHs}
if err = json.Unmarshal(*sReq.Params, &params); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
utils.RegistrarC, err))
return sReq.Id, err
}
var addr string
if addr, err = utils.GetRemoteIP(req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
utils.RegistrarC, err))
dH, err := unmarshallRegisterArgs(req, *sReq.Params)
if err != nil {
return sReq.Id, err
}
for _, dH := range dHs.AsDispatcherHosts(addr) {
for _, dH := range dH {
if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.TenantID(), dH, nil,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s",
@@ -208,22 +199,12 @@ func register(req *http.Request) (*json.RawMessage, error) {
}
config.CgrConfig().UnlockSections(config.RPCConnsJsonName)
case utils.RegistrarSv1RegisterRPCHosts:
var dHs RegisterArgs
params := []interface{}{&dHs}
if err = json.Unmarshal(*sReq.Params, &params); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
utils.RegistrarC, err))
dH, err := unmarshallRegisterArgs(req, *sReq.Params)
if err != nil {
return sReq.Id, err
}
var addr string
if addr, err = utils.GetRemoteIP(req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
utils.RegistrarC, err))
return sReq.Id, err
}
cfgHosts := make(map[string]*config.RemoteHost)
for _, dH := range dHs.AsDispatcherHosts(addr) {
for _, dH := range dH {
cfgHosts[dH.ID] = dH.RemoteHost
}
config.CgrConfig().LockSections(config.RPCConnsJsonName)
@@ -266,6 +247,10 @@ func getConnPort(cfg *config.CGRConfig, transport string, tls bool) (port string
address = cfg.ListenCfg().HTTPListen
}
extraPath = cfg.HTTPCfg().HTTPJsonRPCURL
case rpcclient.BiRPCJSON:
address = cfg.SessionSCfg().ListenBijson
case rpcclient.BiRPCGOB:
address = cfg.SessionSCfg().ListenBigob
}
if _, port, err = net.SplitHostPort(address); err != nil {
return
@@ -273,3 +258,21 @@ func getConnPort(cfg *config.CGRConfig, transport string, tls bool) (port string
port += extraPath
return
}
func unmarshallRegisterArgs(req *http.Request, reqParams json.RawMessage) (dH []*engine.DispatcherHost, err error) {
var dHs RegisterArgs
params := []interface{}{&dHs}
if err = json.Unmarshal(reqParams, &params); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
utils.RegistrarC, err))
return
}
var addr string
if addr, err = utils.GetRemoteIP(req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
utils.RegistrarC, err))
return
}
return dHs.AsDispatcherHosts(addr), nil
}

162
registrarc/registrarc.go Normal file
View File

@@ -0,0 +1,162 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package registrarc
import (
"fmt"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// NewRegistrarCService constructs a DispatcherHService
func NewRegistrarCService(cfg *config.CGRConfig,
connMgr *engine.ConnManager) *RegistrarCService {
return &RegistrarCService{
cfg: cfg,
connMgr: connMgr,
}
}
// RegistrarCService is the service handling dispatching towards internal components
// designed to handle automatic partitioning and failover
type RegistrarCService struct {
cfg *config.CGRConfig
connMgr *engine.ConnManager
}
// ListenAndServe will initialize the service
func (dhS *RegistrarCService) ListenAndServe(stopChan, rldChan <-chan struct{}) {
dTm, rTm := &time.Timer{}, &time.Timer{}
var dTmStarted, rTmStarted bool
if dTmStarted = dhS.cfg.RegistrarCCfg().Dispatcher.Enabled; dTmStarted {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
dhS.registerDispHosts()
}
if rTmStarted = dhS.cfg.RegistrarCCfg().RPC.Enabled; rTmStarted {
rTm = time.NewTimer(dhS.cfg.RegistrarCCfg().RPC.RefreshInterval)
dhS.registerRPCHosts()
}
for {
select {
case <-rldChan:
if rTmStarted {
rTm.Stop()
}
if dTmStarted {
dTm.Stop()
}
if dTmStarted = dhS.cfg.RegistrarCCfg().Dispatcher.Enabled; dTmStarted {
dTm = time.NewTimer(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
dhS.registerDispHosts()
}
if rTmStarted = dhS.cfg.RegistrarCCfg().RPC.Enabled; rTmStarted {
rTm = time.NewTimer(dhS.cfg.RegistrarCCfg().RPC.RefreshInterval)
dhS.registerRPCHosts()
}
case <-stopChan:
if dhS.cfg.RegistrarCCfg().Dispatcher.Enabled {
dTm.Stop()
}
if dhS.cfg.RegistrarCCfg().RPC.Enabled {
rTm.Stop()
}
return
case <-dTm.C:
dhS.registerDispHosts()
dTm.Reset(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval)
case <-rTm.C:
dhS.registerRPCHosts()
rTm.Reset(dhS.cfg.RegistrarCCfg().RPC.RefreshInterval)
}
}
}
// Shutdown is called to shutdown the service
func (dhS *RegistrarCService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RegistrarC))
if dhS.cfg.RegistrarCCfg().Dispatcher.Enabled {
unregisterHosts(dhS.connMgr, dhS.cfg.RegistrarCCfg().Dispatcher,
dhS.cfg.GeneralCfg().DefaultTenant, utils.RegistrarSv1UnregisterDispatcherHosts)
}
if dhS.cfg.RegistrarCCfg().RPC.Enabled {
unregisterHosts(dhS.connMgr, dhS.cfg.RegistrarCCfg().RPC,
dhS.cfg.GeneralCfg().DefaultTenant, utils.RegistrarSv1UnregisterRPCHosts)
}
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.RegistrarC))
}
func (dhS *RegistrarCService) registerDispHosts() {
for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}
args, err := NewRegisterArgs(dhS.cfg, tnt, hostCfgs)
if err != nil {
continue
}
var rply string
if err := dhS.connMgr.Call([]string{connID}, nil, utils.RegistrarSv1RegisterDispatcherHosts, args, &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
utils.RegistrarC, connID, err))
continue
}
}
}
return
}
func (dhS *RegistrarCService) registerRPCHosts() {
for _, connID := range dhS.cfg.RegistrarCCfg().RPC.RegistrarSConns {
for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().RPC.Hosts {
if tnt == utils.MetaDefault {
tnt = dhS.cfg.GeneralCfg().DefaultTenant
}
args, err := NewRegisterArgs(dhS.cfg, tnt, hostCfgs)
if err != nil {
continue
}
var rply string
if err := dhS.connMgr.Call([]string{connID}, nil, utils.RegistrarSv1RegisterRPCHosts, args, &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
utils.RegistrarC, connID, err))
continue
}
}
}
return
}
func unregisterHosts(connMgr *engine.ConnManager, regCfg *config.RegistrarCCfg, dTnt, method string) {
var rply string
for _, connID := range regCfg.RegistrarSConns {
for tnt, hostCfgs := range regCfg.Hosts {
if tnt == utils.MetaDefault {
tnt = dTnt
}
if err := connMgr.Call([]string{connID}, nil, method, NewUnregisterArgs(tnt, hostCfgs), &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to unregister the hosts with tenant<%s> to the conn with ID <%s> because : %s",
utils.RegistrarC, tnt, connID, err))
}
}
}
}

View File

@@ -32,7 +32,7 @@ import (
)
func TestDispatcherHostsService(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(Registar))
ts := httptest.NewServer(http.HandlerFunc(Registrar))
defer ts.Close()
cfg := config.NewDefaultCGRConfig()
@@ -45,26 +45,26 @@ func TestDispatcherHostsService(t *testing.T) {
Transport: rpcclient.HTTPjson,
}},
}
cfg.DispatcherHCfg().Enabled = true
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
cfg.RegistrarCCfg().Dispatcher.Enabled = true
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host1",
RegisterTransport: utils.MetaJSON,
ID: "Host1",
Transport: utils.MetaJSON,
},
},
}
cfg.DispatcherHCfg().RefreshInterval = 100 * time.Millisecond
cfg.DispatcherHCfg().RegistrarSConns = []string{"conn1"}
cfg.RegistrarCCfg().Dispatcher.RefreshInterval = 100 * time.Millisecond
cfg.RegistrarCCfg().Dispatcher.RegistrarSConns = []string{"conn1"}
ds := NewRegistrarCService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
ds.registerHosts()
ds.registerDispHosts()
host1 := &engine.DispatcherHost{
Tenant: "cgrates.org",
ID: "Host1",
Conn: &config.RemoteHost{
RemoteHost: &config.RemoteHost{
ID: "Host1",
Address: "127.0.0.1:2012",
Transport: utils.MetaJSON,
},
@@ -75,24 +75,24 @@ func TestDispatcherHostsService(t *testing.T) {
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host2",
RegisterTransport: utils.MetaJSON,
ID: "Host2",
Transport: utils.MetaJSON,
},
},
}
config.CgrConfig().CacheCfg().Partitions[utils.CacheDispatcherHosts].Replicate = true
config.CgrConfig().CacheCfg().ReplicationConns = []string{"*localhost"}
ds.registerHosts()
ds.registerDispHosts()
host1.ID = "Host2"
if x, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); !ok {
t.Errorf("Expected to find Host2 in cache")
} else if !reflect.DeepEqual(host1, x) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(host1), utils.ToJSON(x))
}
ds.unregisterHosts()
unregisterHosts(ds.connMgr, cfg.RegistrarCCfg().Dispatcher, "cgrates.org", utils.RegistrarSv1UnregisterDispatcherHosts)
if _, ok := engine.Cache.Get(utils.CacheDispatcherHosts, host1.TenantID()); ok {
t.Errorf("Expected to not find Host2 in cache")
}
@@ -101,11 +101,11 @@ func TestDispatcherHostsService(t *testing.T) {
config.CgrConfig().CacheCfg().ReplicationConns = []string{}
host1.ID = "Host1"
cfg.DispatcherHCfg().Hosts = map[string][]*config.DispatcherHRegistarCfg{
cfg.RegistrarCCfg().Dispatcher.Hosts = map[string][]*config.RemoteHost{
utils.MetaDefault: {
{
ID: "Host1",
RegisterTransport: utils.MetaJSON,
ID: "Host1",
Transport: utils.MetaJSON,
},
},
}
@@ -115,11 +115,11 @@ func TestDispatcherHostsService(t *testing.T) {
}
cfg.ListenCfg().RPCJSONListen = "2012"
ds.registerHosts()
ds.registerDispHosts()
ds = NewRegistrarCService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{}))
ds.Shutdown()
stopChan := make(chan struct{})
close(stopChan)
ds.ListenAndServe(stopChan)
ds.ListenAndServe(stopChan, make(chan struct{}))
}