From 4ab8185941021e7afadfb3c607384bcb53869297 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 19 Aug 2020 18:06:14 +0300 Subject: [PATCH] Added DispatcherH structure --- dispatcherh/dispatcherh.go | 103 +++++++++++++++++++++++ dispatcherh/libdispatcherh.go | 149 ++++++++++++++++++++++++++++++++++ dispatchers/dispatchers.go | 9 +- services/dispatchers.go | 7 +- utils/consts.go | 6 ++ utils/json_codec.go | 15 ++++ 6 files changed, 284 insertions(+), 5 deletions(-) create mode 100644 dispatcherh/dispatcherh.go create mode 100644 dispatcherh/libdispatcherh.go diff --git a/dispatcherh/dispatcherh.go b/dispatcherh/dispatcherh.go new file mode 100644 index 000000000..1a0f77804 --- /dev/null +++ b/dispatcherh/dispatcherh.go @@ -0,0 +1,103 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatcherh + +import ( + "fmt" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// NewDispatcherHService constructs a DispatcherHService +func NewDispatcherHService(dm *engine.DataManager, + cfg *config.CGRConfig, fltrS *engine.FilterS, + connMgr *engine.ConnManager) (*DispatcherHostsService, error) { + return &DispatcherHostsService{ + cfg: cfg, + connMgr: connMgr, + }, nil +} + +// DispatcherHostsService is the service handling dispatching towards internal components +// designed to handle automatic partitioning and failover +type DispatcherHostsService struct { + cfg *config.CGRConfig + connMgr *engine.ConnManager +} + +// ListenAndServe will initialize the service +func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error) { + utils.Logger.Info("Starting DispatcherH service") + for { + if err = dhS.registerHosts(); err != nil { + return + } + select { + case e := <-exitChan: + exitChan <- e // put back for the others listening for shutdown request + return + case <-time.After(dhS.cfg.DispatcherHCfg().RegisterInterval): + } + } +} + +// Shutdown is called to shutdown the service +func (dhS *DispatcherHostsService) Shutdown() error { + utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH)) + utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH)) + return nil +} + +func (dhS *DispatcherHostsService) registerHosts() (err error) { + dHs := make([]*engine.DispatcherHost, len(dhS.cfg.DispatcherHCfg().HostIDs)) + for i, hID := range dhS.cfg.DispatcherHCfg().HostIDs { + tntID := utils.NewTenantID(hID) + dHs[i] = &engine.DispatcherHost{ + ID: tntID.ID, + Tenant: tntID.Tenant, + Conns: make([]*config.RemoteHost, 1), + } + } + for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns { + connCfg := dhS.cfg.RPCConns()[connID] + var conn *config.RemoteHost + if conn, err = getConnCfg(dhS.cfg, dhS.cfg.DispatcherHCfg().RegisterTransport, connCfg.Conns[0]); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the connection for<%s> because : %s", + utils.DispatcherH, connID, err)) + continue + } + for _, dh := range dHs { + dh.Conns[0] = conn + } + var rply string + if err = dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, dHs, &rply); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s", + utils.DispatcherH, connID, err)) + continue + } else if rply != utils.OK { + utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s", + utils.DispatcherH, rply)) + continue + } + } + return +} diff --git a/dispatcherh/libdispatcherh.go b/dispatcherh/libdispatcherh.go new file mode 100644 index 000000000..76f87d7f0 --- /dev/null +++ b/dispatcherh/libdispatcherh.go @@ -0,0 +1,149 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatcherh + +import ( + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "strings" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// Registar handdle for httpServer to register the dispatcher hosts +func Registar(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + w.Header().Set("Content-Type", "application/json") + result, errMessage := utils.OK, utils.EmptyString + var err error + var id *json.RawMessage + if id, err = register(r); err != nil { + result, errMessage = utils.EmptyString, err.Error() + } + if err := utils.WriteServerResponse(w, id, result, errMessage); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write resonse because: %s", + utils.DispatcherH, err)) + } +} + +func register(req *http.Request) (*json.RawMessage, error) { + sReq, err := utils.DecodeServerRequest(req.Body) + if err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode request because: %s", + utils.DispatcherH, err)) + return nil, err + } + if sReq.Method != utils.DispatcherHv1RegisterHosts { + err = errors.New("rpc: can't find service " + sReq.Method) + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to register hosts because: %s", + utils.DispatcherH, err)) + return sReq.Id, err + } + var dHs []*engine.DispatcherHost + params := []interface{}{dHs} + if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s", + utils.DispatcherH, err)) + return sReq.Id, err + } + var addr string + if addr, err = getIP(req); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s", + utils.DispatcherH, err)) + return sReq.Id, err + } + + for _, dH := range dHs { + if len(dH.Conns) != 1 { // ignore the hosts with no connections or more + continue + } + dH.Conns[0].Address = addr + dH.Conns[0].Address // the address contains the port + if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil, + false, utils.NonTransactional); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s", + utils.DispatcherH, dH.TenantID(), err)) + continue + } + } + return sReq.Id, nil +} + +func getIP(r *http.Request) (ip string, err error) { + ip = r.Header.Get("X-REAL-IP") + if net.ParseIP(ip) != nil { + return + } + for _, ip = range strings.Split(r.Header.Get("X-FORWARDED-FOR"), utils.FIELDS_SEP) { + if net.ParseIP(ip) != nil { + return + } + } + if ip, _, err = net.SplitHostPort(r.RemoteAddr); err != nil { + return + } + if net.ParseIP(ip) != nil { + return + } + ip = utils.EmptyString + err = fmt.Errorf("no valid ip found") + return +} + +func getConnCfg(cfg *config.CGRConfig, transport string, tmpl *config.RemoteHost) (conn *config.RemoteHost, err error) { + var address string + var extraPath string + switch transport { + case utils.MetaJSON: + if tmpl.TLS { + address = cfg.ListenCfg().RPCJSONTLSListen + } else { + address = cfg.ListenCfg().RPCJSONListen + } + case utils.MetaGOB: + if tmpl.TLS { + address = cfg.ListenCfg().RPCGOBTLSListen + } else { + address = cfg.ListenCfg().RPCGOBListen + } + case rpcclient.HTTPjson: + if tmpl.TLS { + address = cfg.ListenCfg().HTTPTLSListen + } else { + address = cfg.ListenCfg().HTTPListen + } + extraPath = cfg.HTTPCfg().HTTPJsonRPCURL + } + var port string + if _, port, err = net.SplitHostPort(address); err != nil { + return + } + conn = &config.RemoteHost{ + Address: ":" + port + extraPath, + Synchronous: tmpl.Synchronous, + TLS: tmpl.TLS, + Transport: transport, + } + return +} diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 166e21b54..9871a1ed8 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -34,9 +34,12 @@ import ( func NewDispatcherService(dm *engine.DataManager, cfg *config.CGRConfig, fltrS *engine.FilterS, connMgr *engine.ConnManager) (*DispatcherService, error) { - - return &DispatcherService{dm: dm, cfg: cfg, - fltrS: fltrS, connMgr: connMgr}, nil + return &DispatcherService{ + dm: dm, + cfg: cfg, + fltrS: fltrS, + connMgr: connMgr, + }, nil } // DispatcherService is the service handling dispatching towards internal components diff --git a/services/dispatchers.go b/services/dispatchers.go index 9522f1f0e..70223c2f2 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -22,10 +22,10 @@ import ( "fmt" "sync" - v2 "github.com/cgrates/cgrates/apier/v2" - v1 "github.com/cgrates/cgrates/apier/v1" + v2 "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/dispatcherh" "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -89,6 +89,9 @@ func (dspS *DispatcherService) Start() (err error) { // for the moment we dispable Apier through dispatcher // until we figured out a better sollution in case of gob server // dspS.server.SetDispatched() + if len(dspS.cfg.HTTPCfg().DispatchersRegistrarURL) != 0 { + dspS.server.RegisterHttpFunc(dspS.cfg.HTTPCfg().DispatchersRegistrarURL, dispatcherh.Registar) + } dspS.server.RpcRegister(v1.NewDispatcherSv1(dspS.dspS)) diff --git a/utils/consts.go b/utils/consts.go index 014671bb2..559b7df93 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -901,6 +901,7 @@ const ( FilterS = "FilterS" ThresholdS = "ThresholdS" DispatcherS = "DispatcherS" + DispatcherH = "DispatcherH" LoaderS = "LoaderS" ChargerS = "ChargerS" CacheS = "CacheS" @@ -1542,6 +1543,11 @@ const ( DispatcherServicePing = "DispatcherService.Ping" ) +// DispatcherH APIs +const ( + DispatcherHv1RegisterHosts = "DispatcherHv1.RegisterHosts" +) + // RateProfile APIs const ( APIerSv1SetRateProfile = "APIerSv1.SetRateProfile" diff --git a/utils/json_codec.go b/utils/json_codec.go index fad261653..473ecf356 100644 --- a/utils/json_codec.go +++ b/utils/json_codec.go @@ -65,6 +65,12 @@ func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { } } +func DecodeServerRequest(r io.Reader) (req *serverRequest, err error) { + req = new(serverRequest) + err = json.NewDecoder(r).Decode(req) + return +} + type serverRequest struct { Method string `json:"method"` Params *json.RawMessage `json:"params"` @@ -78,6 +84,15 @@ func (r *serverRequest) reset() { r.Id = nil } +func WriteServerResponse(w io.Writer, id *json.RawMessage, result, err interface{}) error { + return json.NewEncoder(w).Encode( + serverResponse{ + Id: id, + Result: result, + Error: err, + }) +} + type serverResponse struct { Id *json.RawMessage `json:"id"` Result interface{} `json:"result"`