diff --git a/dispatcherh/dispatcherh.go b/dispatcherh/dispatcherh.go
new file mode 100644
index 000000000..1a0f77804
--- /dev/null
+++ b/dispatcherh/dispatcherh.go
@@ -0,0 +1,103 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package dispatcherh
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+// NewDispatcherHService constructs a DispatcherHService
+func NewDispatcherHService(dm *engine.DataManager,
+ cfg *config.CGRConfig, fltrS *engine.FilterS,
+ connMgr *engine.ConnManager) (*DispatcherHostsService, error) {
+ return &DispatcherHostsService{
+ cfg: cfg,
+ connMgr: connMgr,
+ }, nil
+}
+
+// DispatcherHostsService is the service handling dispatching towards internal components
+// designed to handle automatic partitioning and failover
+type DispatcherHostsService struct {
+ cfg *config.CGRConfig
+ connMgr *engine.ConnManager
+}
+
+// ListenAndServe will initialize the service
+func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error) {
+ utils.Logger.Info("Starting DispatcherH service")
+ for {
+ if err = dhS.registerHosts(); err != nil {
+ return
+ }
+ select {
+ case e := <-exitChan:
+ exitChan <- e // put back for the others listening for shutdown request
+ return
+ case <-time.After(dhS.cfg.DispatcherHCfg().RegisterInterval):
+ }
+ }
+}
+
+// Shutdown is called to shutdown the service
+func (dhS *DispatcherHostsService) Shutdown() error {
+ utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH))
+ utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH))
+ return nil
+}
+
+func (dhS *DispatcherHostsService) registerHosts() (err error) {
+ dHs := make([]*engine.DispatcherHost, len(dhS.cfg.DispatcherHCfg().HostIDs))
+ for i, hID := range dhS.cfg.DispatcherHCfg().HostIDs {
+ tntID := utils.NewTenantID(hID)
+ dHs[i] = &engine.DispatcherHost{
+ ID: tntID.ID,
+ Tenant: tntID.Tenant,
+ Conns: make([]*config.RemoteHost, 1),
+ }
+ }
+ for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns {
+ connCfg := dhS.cfg.RPCConns()[connID]
+ var conn *config.RemoteHost
+ if conn, err = getConnCfg(dhS.cfg, dhS.cfg.DispatcherHCfg().RegisterTransport, connCfg.Conns[0]); err != nil {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the connection for<%s> because : %s",
+ utils.DispatcherH, connID, err))
+ continue
+ }
+ for _, dh := range dHs {
+ dh.Conns[0] = conn
+ }
+ var rply string
+ if err = dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, dHs, &rply); err != nil {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
+ utils.DispatcherH, connID, err))
+ continue
+ } else if rply != utils.OK {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s",
+ utils.DispatcherH, rply))
+ continue
+ }
+ }
+ return
+}
diff --git a/dispatcherh/libdispatcherh.go b/dispatcherh/libdispatcherh.go
new file mode 100644
index 000000000..76f87d7f0
--- /dev/null
+++ b/dispatcherh/libdispatcherh.go
@@ -0,0 +1,149 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package dispatcherh
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "net/http"
+ "strings"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/rpcclient"
+)
+
+// Registar handdle for httpServer to register the dispatcher hosts
+func Registar(w http.ResponseWriter, r *http.Request) {
+ defer r.Body.Close()
+ w.Header().Set("Content-Type", "application/json")
+ result, errMessage := utils.OK, utils.EmptyString
+ var err error
+ var id *json.RawMessage
+ if id, err = register(r); err != nil {
+ result, errMessage = utils.EmptyString, err.Error()
+ }
+ if err := utils.WriteServerResponse(w, id, result, errMessage); err != nil {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write resonse because: %s",
+ utils.DispatcherH, err))
+ }
+}
+
+func register(req *http.Request) (*json.RawMessage, error) {
+ sReq, err := utils.DecodeServerRequest(req.Body)
+ if err != nil {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode request because: %s",
+ utils.DispatcherH, err))
+ return nil, err
+ }
+ if sReq.Method != utils.DispatcherHv1RegisterHosts {
+ err = errors.New("rpc: can't find service " + sReq.Method)
+ utils.Logger.Warning(fmt.Sprintf("<%s> Failed to register hosts because: %s",
+ utils.DispatcherH, err))
+ return sReq.Id, err
+ }
+ var dHs []*engine.DispatcherHost
+ params := []interface{}{dHs}
+ if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
+ utils.DispatcherH, err))
+ return sReq.Id, err
+ }
+ var addr string
+ if addr, err = getIP(req); err != nil {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
+ utils.DispatcherH, err))
+ return sReq.Id, err
+ }
+
+ for _, dH := range dHs {
+ if len(dH.Conns) != 1 { // ignore the hosts with no connections or more
+ continue
+ }
+ dH.Conns[0].Address = addr + dH.Conns[0].Address // the address contains the port
+ if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil,
+ false, utils.NonTransactional); err != nil {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s",
+ utils.DispatcherH, dH.TenantID(), err))
+ continue
+ }
+ }
+ return sReq.Id, nil
+}
+
+func getIP(r *http.Request) (ip string, err error) {
+ ip = r.Header.Get("X-REAL-IP")
+ if net.ParseIP(ip) != nil {
+ return
+ }
+ for _, ip = range strings.Split(r.Header.Get("X-FORWARDED-FOR"), utils.FIELDS_SEP) {
+ if net.ParseIP(ip) != nil {
+ return
+ }
+ }
+ if ip, _, err = net.SplitHostPort(r.RemoteAddr); err != nil {
+ return
+ }
+ if net.ParseIP(ip) != nil {
+ return
+ }
+ ip = utils.EmptyString
+ err = fmt.Errorf("no valid ip found")
+ return
+}
+
+func getConnCfg(cfg *config.CGRConfig, transport string, tmpl *config.RemoteHost) (conn *config.RemoteHost, err error) {
+ var address string
+ var extraPath string
+ switch transport {
+ case utils.MetaJSON:
+ if tmpl.TLS {
+ address = cfg.ListenCfg().RPCJSONTLSListen
+ } else {
+ address = cfg.ListenCfg().RPCJSONListen
+ }
+ case utils.MetaGOB:
+ if tmpl.TLS {
+ address = cfg.ListenCfg().RPCGOBTLSListen
+ } else {
+ address = cfg.ListenCfg().RPCGOBListen
+ }
+ case rpcclient.HTTPjson:
+ if tmpl.TLS {
+ address = cfg.ListenCfg().HTTPTLSListen
+ } else {
+ address = cfg.ListenCfg().HTTPListen
+ }
+ extraPath = cfg.HTTPCfg().HTTPJsonRPCURL
+ }
+ var port string
+ if _, port, err = net.SplitHostPort(address); err != nil {
+ return
+ }
+ conn = &config.RemoteHost{
+ Address: ":" + port + extraPath,
+ Synchronous: tmpl.Synchronous,
+ TLS: tmpl.TLS,
+ Transport: transport,
+ }
+ return
+}
diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go
index 166e21b54..9871a1ed8 100755
--- a/dispatchers/dispatchers.go
+++ b/dispatchers/dispatchers.go
@@ -34,9 +34,12 @@ import (
func NewDispatcherService(dm *engine.DataManager,
cfg *config.CGRConfig, fltrS *engine.FilterS,
connMgr *engine.ConnManager) (*DispatcherService, error) {
-
- return &DispatcherService{dm: dm, cfg: cfg,
- fltrS: fltrS, connMgr: connMgr}, nil
+ return &DispatcherService{
+ dm: dm,
+ cfg: cfg,
+ fltrS: fltrS,
+ connMgr: connMgr,
+ }, nil
}
// DispatcherService is the service handling dispatching towards internal components
diff --git a/services/dispatchers.go b/services/dispatchers.go
index 9522f1f0e..70223c2f2 100644
--- a/services/dispatchers.go
+++ b/services/dispatchers.go
@@ -22,10 +22,10 @@ import (
"fmt"
"sync"
- v2 "github.com/cgrates/cgrates/apier/v2"
-
v1 "github.com/cgrates/cgrates/apier/v1"
+ v2 "github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/dispatcherh"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -89,6 +89,9 @@ func (dspS *DispatcherService) Start() (err error) {
// for the moment we dispable Apier through dispatcher
// until we figured out a better sollution in case of gob server
// dspS.server.SetDispatched()
+ if len(dspS.cfg.HTTPCfg().DispatchersRegistrarURL) != 0 {
+ dspS.server.RegisterHttpFunc(dspS.cfg.HTTPCfg().DispatchersRegistrarURL, dispatcherh.Registar)
+ }
dspS.server.RpcRegister(v1.NewDispatcherSv1(dspS.dspS))
diff --git a/utils/consts.go b/utils/consts.go
index 014671bb2..559b7df93 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -901,6 +901,7 @@ const (
FilterS = "FilterS"
ThresholdS = "ThresholdS"
DispatcherS = "DispatcherS"
+ DispatcherH = "DispatcherH"
LoaderS = "LoaderS"
ChargerS = "ChargerS"
CacheS = "CacheS"
@@ -1542,6 +1543,11 @@ const (
DispatcherServicePing = "DispatcherService.Ping"
)
+// DispatcherH APIs
+const (
+ DispatcherHv1RegisterHosts = "DispatcherHv1.RegisterHosts"
+)
+
// RateProfile APIs
const (
APIerSv1SetRateProfile = "APIerSv1.SetRateProfile"
diff --git a/utils/json_codec.go b/utils/json_codec.go
index fad261653..473ecf356 100644
--- a/utils/json_codec.go
+++ b/utils/json_codec.go
@@ -65,6 +65,12 @@ func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
}
}
+func DecodeServerRequest(r io.Reader) (req *serverRequest, err error) {
+ req = new(serverRequest)
+ err = json.NewDecoder(r).Decode(req)
+ return
+}
+
type serverRequest struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params"`
@@ -78,6 +84,15 @@ func (r *serverRequest) reset() {
r.Id = nil
}
+func WriteServerResponse(w io.Writer, id *json.RawMessage, result, err interface{}) error {
+ return json.NewEncoder(w).Encode(
+ serverResponse{
+ Id: id,
+ Result: result,
+ Error: err,
+ })
+}
+
type serverResponse struct {
Id *json.RawMessage `json:"id"`
Result interface{} `json:"result"`