Added DispatcherH structure

This commit is contained in:
Trial97
2020-08-19 18:06:14 +03:00
committed by Dan Christian Bogos
parent 18e6578840
commit 4ab8185941
6 changed files with 284 additions and 5 deletions

103
dispatcherh/dispatcherh.go Normal file
View File

@@ -0,0 +1,103 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package dispatcherh
import (
"fmt"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// NewDispatcherHService constructs a DispatcherHService
func NewDispatcherHService(dm *engine.DataManager,
cfg *config.CGRConfig, fltrS *engine.FilterS,
connMgr *engine.ConnManager) (*DispatcherHostsService, error) {
return &DispatcherHostsService{
cfg: cfg,
connMgr: connMgr,
}, nil
}
// DispatcherHostsService is the service handling dispatching towards internal components
// designed to handle automatic partitioning and failover
type DispatcherHostsService struct {
cfg *config.CGRConfig
connMgr *engine.ConnManager
}
// ListenAndServe will initialize the service
func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error) {
utils.Logger.Info("Starting DispatcherH service")
for {
if err = dhS.registerHosts(); err != nil {
return
}
select {
case e := <-exitChan:
exitChan <- e // put back for the others listening for shutdown request
return
case <-time.After(dhS.cfg.DispatcherHCfg().RegisterInterval):
}
}
}
// Shutdown is called to shutdown the service
func (dhS *DispatcherHostsService) Shutdown() error {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH))
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH))
return nil
}
func (dhS *DispatcherHostsService) registerHosts() (err error) {
dHs := make([]*engine.DispatcherHost, len(dhS.cfg.DispatcherHCfg().HostIDs))
for i, hID := range dhS.cfg.DispatcherHCfg().HostIDs {
tntID := utils.NewTenantID(hID)
dHs[i] = &engine.DispatcherHost{
ID: tntID.ID,
Tenant: tntID.Tenant,
Conns: make([]*config.RemoteHost, 1),
}
}
for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns {
connCfg := dhS.cfg.RPCConns()[connID]
var conn *config.RemoteHost
if conn, err = getConnCfg(dhS.cfg, dhS.cfg.DispatcherHCfg().RegisterTransport, connCfg.Conns[0]); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the connection for<%s> because : %s",
utils.DispatcherH, connID, err))
continue
}
for _, dh := range dHs {
dh.Conns[0] = conn
}
var rply string
if err = dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, dHs, &rply); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
utils.DispatcherH, connID, err))
continue
} else if rply != utils.OK {
utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s",
utils.DispatcherH, rply))
continue
}
}
return
}

View File

@@ -0,0 +1,149 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package dispatcherh
import (
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// Registar handdle for httpServer to register the dispatcher hosts
func Registar(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
result, errMessage := utils.OK, utils.EmptyString
var err error
var id *json.RawMessage
if id, err = register(r); err != nil {
result, errMessage = utils.EmptyString, err.Error()
}
if err := utils.WriteServerResponse(w, id, result, errMessage); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write resonse because: %s",
utils.DispatcherH, err))
}
}
func register(req *http.Request) (*json.RawMessage, error) {
sReq, err := utils.DecodeServerRequest(req.Body)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode request because: %s",
utils.DispatcherH, err))
return nil, err
}
if sReq.Method != utils.DispatcherHv1RegisterHosts {
err = errors.New("rpc: can't find service " + sReq.Method)
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to register hosts because: %s",
utils.DispatcherH, err))
return sReq.Id, err
}
var dHs []*engine.DispatcherHost
params := []interface{}{dHs}
if err = json.Unmarshal(*sReq.Params, &params); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
utils.DispatcherH, err))
return sReq.Id, err
}
var addr string
if addr, err = getIP(req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
utils.DispatcherH, err))
return sReq.Id, err
}
for _, dH := range dHs {
if len(dH.Conns) != 1 { // ignore the hosts with no connections or more
continue
}
dH.Conns[0].Address = addr + dH.Conns[0].Address // the address contains the port
if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil,
false, utils.NonTransactional); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s",
utils.DispatcherH, dH.TenantID(), err))
continue
}
}
return sReq.Id, nil
}
func getIP(r *http.Request) (ip string, err error) {
ip = r.Header.Get("X-REAL-IP")
if net.ParseIP(ip) != nil {
return
}
for _, ip = range strings.Split(r.Header.Get("X-FORWARDED-FOR"), utils.FIELDS_SEP) {
if net.ParseIP(ip) != nil {
return
}
}
if ip, _, err = net.SplitHostPort(r.RemoteAddr); err != nil {
return
}
if net.ParseIP(ip) != nil {
return
}
ip = utils.EmptyString
err = fmt.Errorf("no valid ip found")
return
}
func getConnCfg(cfg *config.CGRConfig, transport string, tmpl *config.RemoteHost) (conn *config.RemoteHost, err error) {
var address string
var extraPath string
switch transport {
case utils.MetaJSON:
if tmpl.TLS {
address = cfg.ListenCfg().RPCJSONTLSListen
} else {
address = cfg.ListenCfg().RPCJSONListen
}
case utils.MetaGOB:
if tmpl.TLS {
address = cfg.ListenCfg().RPCGOBTLSListen
} else {
address = cfg.ListenCfg().RPCGOBListen
}
case rpcclient.HTTPjson:
if tmpl.TLS {
address = cfg.ListenCfg().HTTPTLSListen
} else {
address = cfg.ListenCfg().HTTPListen
}
extraPath = cfg.HTTPCfg().HTTPJsonRPCURL
}
var port string
if _, port, err = net.SplitHostPort(address); err != nil {
return
}
conn = &config.RemoteHost{
Address: ":" + port + extraPath,
Synchronous: tmpl.Synchronous,
TLS: tmpl.TLS,
Transport: transport,
}
return
}

View File

@@ -34,9 +34,12 @@ import (
func NewDispatcherService(dm *engine.DataManager,
cfg *config.CGRConfig, fltrS *engine.FilterS,
connMgr *engine.ConnManager) (*DispatcherService, error) {
return &DispatcherService{dm: dm, cfg: cfg,
fltrS: fltrS, connMgr: connMgr}, nil
return &DispatcherService{
dm: dm,
cfg: cfg,
fltrS: fltrS,
connMgr: connMgr,
}, nil
}
// DispatcherService is the service handling dispatching towards internal components

View File

@@ -22,10 +22,10 @@ import (
"fmt"
"sync"
v2 "github.com/cgrates/cgrates/apier/v2"
v1 "github.com/cgrates/cgrates/apier/v1"
v2 "github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatcherh"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -89,6 +89,9 @@ func (dspS *DispatcherService) Start() (err error) {
// for the moment we dispable Apier through dispatcher
// until we figured out a better sollution in case of gob server
// dspS.server.SetDispatched()
if len(dspS.cfg.HTTPCfg().DispatchersRegistrarURL) != 0 {
dspS.server.RegisterHttpFunc(dspS.cfg.HTTPCfg().DispatchersRegistrarURL, dispatcherh.Registar)
}
dspS.server.RpcRegister(v1.NewDispatcherSv1(dspS.dspS))

View File

@@ -901,6 +901,7 @@ const (
FilterS = "FilterS"
ThresholdS = "ThresholdS"
DispatcherS = "DispatcherS"
DispatcherH = "DispatcherH"
LoaderS = "LoaderS"
ChargerS = "ChargerS"
CacheS = "CacheS"
@@ -1542,6 +1543,11 @@ const (
DispatcherServicePing = "DispatcherService.Ping"
)
// DispatcherH APIs
const (
DispatcherHv1RegisterHosts = "DispatcherHv1.RegisterHosts"
)
// RateProfile APIs
const (
APIerSv1SetRateProfile = "APIerSv1.SetRateProfile"

View File

@@ -65,6 +65,12 @@ func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
}
}
func DecodeServerRequest(r io.Reader) (req *serverRequest, err error) {
req = new(serverRequest)
err = json.NewDecoder(r).Decode(req)
return
}
type serverRequest struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params"`
@@ -78,6 +84,15 @@ func (r *serverRequest) reset() {
r.Id = nil
}
func WriteServerResponse(w io.Writer, id *json.RawMessage, result, err interface{}) error {
return json.NewEncoder(w).Encode(
serverResponse{
Id: id,
Result: result,
Error: err,
})
}
type serverResponse struct {
Id *json.RawMessage `json:"id"`
Result interface{} `json:"result"`