mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-17 06:09:53 +05:00
Updated DispatcherH register API
This commit is contained in:
committed by
Dan Christian Bogos
parent
ef84af834c
commit
626b8b2404
@@ -32,6 +32,7 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/dispatcherh"
|
||||
"github.com/cgrates/cgrates/loaders"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
@@ -504,7 +505,9 @@ func main() {
|
||||
|
||||
// Rpc/http server
|
||||
server := utils.NewServer()
|
||||
|
||||
if len(cfg.HTTPCfg().DispatchersRegistrarURL) != 0 {
|
||||
server.RegisterHttpFunc(cfg.HTTPCfg().DispatchersRegistrarURL, dispatcherh.Registar)
|
||||
}
|
||||
if *httpPprofPath != "" {
|
||||
go server.RegisterProfiler(*httpPprofPath)
|
||||
}
|
||||
|
||||
@@ -173,6 +173,7 @@ func NewDefaultCGRConfig() (cfg *CGRConfig, err error) {
|
||||
cfg.sureTaxCfg = new(SureTaxCfg)
|
||||
cfg.dispatcherSCfg = new(DispatcherSCfg)
|
||||
cfg.dispatcherHCfg = new(DispatcherHCfg)
|
||||
cfg.dispatcherHCfg.HostIDs = make(map[string][]string)
|
||||
cfg.loaderCgrCfg = new(LoaderCgrCfg)
|
||||
cfg.migratorCgrCfg = new(MigratorCgrCfg)
|
||||
cfg.mailerCfg = new(MailerCfg)
|
||||
|
||||
@@ -935,9 +935,10 @@ const CGRATES_CFG_JSON = `
|
||||
"dispatcherh":{
|
||||
"enabled": false,
|
||||
"dispatchers_conns": [],
|
||||
"host_ids": [],
|
||||
"host_ids": {},
|
||||
"register_interval": "5m",
|
||||
"register_transport": "*json",
|
||||
"register_tls": false,
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -28,9 +28,10 @@ import (
|
||||
type DispatcherHCfg struct {
|
||||
Enabled bool
|
||||
DispatchersConns []string
|
||||
HostIDs []string
|
||||
HostIDs map[string][]string
|
||||
RegisterInterval time.Duration
|
||||
RegisterTransport string
|
||||
RegisterTLS bool
|
||||
}
|
||||
|
||||
func (dps *DispatcherHCfg) loadFromJsonCfg(jsnCfg *DispatcherHJsonCfg) (err error) {
|
||||
@@ -45,8 +46,9 @@ func (dps *DispatcherHCfg) loadFromJsonCfg(jsnCfg *DispatcherHJsonCfg) (err erro
|
||||
copy(dps.DispatchersConns, *jsnCfg.Dispatchers_conns)
|
||||
}
|
||||
if jsnCfg.Host_ids != nil {
|
||||
dps.HostIDs = make([]string, len(*jsnCfg.Host_ids))
|
||||
copy(dps.HostIDs, *jsnCfg.Host_ids)
|
||||
for tnt, id := range jsnCfg.Host_ids {
|
||||
dps.HostIDs[tnt] = id
|
||||
}
|
||||
}
|
||||
if jsnCfg.Register_interval != nil {
|
||||
if dps.RegisterInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Register_interval); err != nil {
|
||||
@@ -56,6 +58,9 @@ func (dps *DispatcherHCfg) loadFromJsonCfg(jsnCfg *DispatcherHJsonCfg) (err erro
|
||||
if jsnCfg.Register_transport != nil {
|
||||
dps.RegisterTransport = *jsnCfg.Register_transport
|
||||
}
|
||||
if jsnCfg.Register_tls != nil {
|
||||
dps.RegisterTLS = *jsnCfg.Register_tls
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -38,21 +38,24 @@ func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(daCfg, expected) {
|
||||
t.Errorf("Expected: %+v ,recived: %+v", expected, daCfg)
|
||||
}
|
||||
daCfg.HostIDs = make(map[string][]string)
|
||||
cfgJSONStr := `{
|
||||
"dispatcherh":{
|
||||
"enabled": true,
|
||||
"dispatchers_conns": ["conn1","conn2"],
|
||||
"host_ids": ["HOST1","HOST2"],
|
||||
"host_ids": {"*default":["HOST1","HOST2"]},
|
||||
"register_interval": "5m",
|
||||
"register_transport": "*json",
|
||||
"register_tls": true,
|
||||
},
|
||||
}`
|
||||
expected = DispatcherHCfg{
|
||||
Enabled: true,
|
||||
DispatchersConns: []string{"conn1", "conn2"},
|
||||
HostIDs: []string{"HOST1", "HOST2"},
|
||||
HostIDs: map[string][]string{utils.MetaDefault: {"HOST1", "HOST2"}},
|
||||
RegisterInterval: 5 * time.Minute,
|
||||
RegisterTransport: utils.MetaJSON,
|
||||
RegisterTLS: true,
|
||||
}
|
||||
if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil {
|
||||
t.Error(err)
|
||||
@@ -81,15 +84,16 @@ func TestDispatcherHCfgAsMapInterface(t *testing.T) {
|
||||
"dispatcherh":{
|
||||
"enabled": true,
|
||||
"dispatchers_conns": ["conn1","conn2"],
|
||||
"host_ids": ["HOST1","HOST2"],
|
||||
"host_ids": {"*default":["HOST1","HOST2"]},
|
||||
"register_interval": "5m",
|
||||
"register_transport": "*json",
|
||||
},
|
||||
}`
|
||||
daCfg.HostIDs = make(map[string][]string)
|
||||
eMap := map[string]interface{}{
|
||||
"enabled": true,
|
||||
"dispatchers_conns": []string{"conn1", "conn2"},
|
||||
"host_ids": []string{"HOST1", "HOST2"},
|
||||
"host_ids": map[string][]string{utils.MetaDefault: {"HOST1", "HOST2"}},
|
||||
"register_interval": 5 * time.Minute,
|
||||
"register_transport": "*json",
|
||||
}
|
||||
|
||||
@@ -545,9 +545,10 @@ type DispatcherSJsonCfg struct {
|
||||
type DispatcherHJsonCfg struct {
|
||||
Enabled *bool
|
||||
Dispatchers_conns *[]string
|
||||
Host_ids *[]string
|
||||
Host_ids map[string][]string
|
||||
Register_interval *string
|
||||
Register_transport *string
|
||||
Register_tls *bool
|
||||
}
|
||||
|
||||
type LoaderCfgJson struct {
|
||||
|
||||
@@ -2,4 +2,4 @@
|
||||
cgrates.org,SELF,*internal,,
|
||||
cgrates.org,ALL,127.0.0.1:6012,*json,false
|
||||
cgrates.org,ALL2,127.0.0.1:7012,*json,false
|
||||
cgrates.org,UnexistedHost,127.0.0.1:10012,*json,false
|
||||
cgrates.org,NonexistingHost,127.0.0.1:10012,*json,false
|
||||
|
||||
|
@@ -1,7 +1,7 @@
|
||||
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
|
||||
cgrates.org,PING1,*any,,,*weight,,ALL,,20,false,,10
|
||||
cgrates.org,PING1,,,,,,ALL2,,10,,,
|
||||
cgrates.org,PING2,*any,*string:~*req.EventName:UnexistedHost,,*weight,,UnexistedHost,,20,false,,20
|
||||
cgrates.org,PING2,*any,*string:~*req.EventName:NonexistingHost,,*weight,,NonexistingHost,,20,false,,20
|
||||
cgrates.org,PING2,,,,,,ALL2,,10,,,
|
||||
cgrates.org,EVENT1,*any,*string:~*req.EventName:Event1,,*weight,,ALL2,,20,false,,30
|
||||
cgrates.org,EVENT1,,,,,,ALL,,10,,,
|
||||
|
||||
|
@@ -2,4 +2,4 @@
|
||||
cgrates.org,SELF,*internal,,
|
||||
cgrates.org,ALL,127.0.0.1:6013,*gob,false
|
||||
cgrates.org,ALL2,127.0.0.1:7013,*gob,false
|
||||
cgrates.org,UnexistedHost,127.0.0.1:10012,*json,false
|
||||
cgrates.org,NonexistingHost,127.0.0.1:10012,*json,false
|
||||
|
@@ -1,7 +1,7 @@
|
||||
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
|
||||
cgrates.org,PING1,*any,,,*weight,,ALL,,20,false,,10
|
||||
cgrates.org,PING1,,,,,,ALL2,,10,,,
|
||||
cgrates.org,PING2,*any,*string:~*req.EventName:UnexistedHost,,*weight,,UnexistedHost,,20,false,,20
|
||||
cgrates.org,PING2,*any,*string:~*req.EventName:NonexistingHost,,*weight,,NonexistingHost,,20,false,,20
|
||||
cgrates.org,PING2,,,,,,ALL2,,10,,,
|
||||
cgrates.org,EVENT1,*any,*string:~*req.EventName:Event1,,*weight,,ALL2,,20,false,,30
|
||||
cgrates.org,EVENT1,,,,,,ALL,,10,,,
|
||||
|
||||
|
@@ -73,35 +73,35 @@ func (dhS *DispatcherHostsService) Shutdown() error {
|
||||
}
|
||||
|
||||
func (dhS *DispatcherHostsService) registerHosts() (err error) {
|
||||
dHs := make([]*engine.DispatcherHost, len(dhS.cfg.DispatcherHCfg().HostIDs))
|
||||
for i, hID := range dhS.cfg.DispatcherHCfg().HostIDs {
|
||||
tntID := utils.NewTenantID(hID)
|
||||
dHs[i] = &engine.DispatcherHost{
|
||||
ID: tntID.ID,
|
||||
Tenant: tntID.Tenant,
|
||||
Conns: make([]*config.RemoteHost, 1),
|
||||
}
|
||||
var port string
|
||||
if port, err = getConnPort(dhS.cfg,
|
||||
dhS.cfg.DispatcherHCfg().RegisterTransport,
|
||||
dhS.cfg.DispatcherHCfg().RegisterTLS); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the port because : %s",
|
||||
utils.DispatcherH, err))
|
||||
return
|
||||
}
|
||||
for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns {
|
||||
connCfg := dhS.cfg.RPCConns()[connID]
|
||||
var conn *config.RemoteHost
|
||||
if conn, err = getConnCfg(dhS.cfg, dhS.cfg.DispatcherHCfg().RegisterTransport, connCfg.Conns[0]); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the connection for<%s> because : %s",
|
||||
utils.DispatcherH, connID, err))
|
||||
continue
|
||||
}
|
||||
for _, dh := range dHs {
|
||||
dh.Conns[0] = conn
|
||||
}
|
||||
var rply string
|
||||
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, dHs, &rply); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
|
||||
utils.DispatcherH, connID, err))
|
||||
continue
|
||||
} else if rply != utils.OK {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s",
|
||||
utils.DispatcherH, rply))
|
||||
continue
|
||||
for tnt, ids := range dhS.cfg.DispatcherHCfg().HostIDs {
|
||||
if tnt == utils.MetaDefault {
|
||||
tnt = dhS.cfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
var rply string
|
||||
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, &RegisterArgs{
|
||||
Tenant: tnt,
|
||||
IDs: ids,
|
||||
Port: port,
|
||||
Transport: dhS.cfg.DispatcherHCfg().RegisterTransport,
|
||||
TLS: dhS.cfg.DispatcherHCfg().RegisterTLS,
|
||||
}, &rply); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
|
||||
utils.DispatcherH, connID, err))
|
||||
continue
|
||||
} else if rply != utils.OK {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s",
|
||||
utils.DispatcherH, rply))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
@@ -110,14 +110,22 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) {
|
||||
func (dhS *DispatcherHostsService) unregisterHosts() {
|
||||
var rply string
|
||||
for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns {
|
||||
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, dhS.cfg.DispatcherHCfg().HostIDs, &rply); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
|
||||
utils.DispatcherH, connID, err))
|
||||
continue
|
||||
} else if rply != utils.OK {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s",
|
||||
utils.DispatcherH, rply))
|
||||
continue
|
||||
for tnt, ids := range dhS.cfg.DispatcherHCfg().HostIDs {
|
||||
if tnt == utils.MetaDefault {
|
||||
tnt = dhS.cfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, &UnregisterArgs{
|
||||
Tenant: tnt,
|
||||
IDs: ids,
|
||||
}, &rply); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts with tenant<%s> to the conn with ID <%s> because : %s",
|
||||
utils.DispatcherH, tnt, connID, err))
|
||||
continue
|
||||
} else if rply != utils.OK {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts for tenant<%s>: %s",
|
||||
utils.DispatcherH, tnt, rply))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,40 @@ import (
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// RegisterArgs the arguments to register the dispacher host
|
||||
type RegisterArgs struct {
|
||||
Tenant string
|
||||
Opts map[string]interface{}
|
||||
IDs []string
|
||||
Port string
|
||||
Transport string
|
||||
TLS bool
|
||||
}
|
||||
|
||||
// UnregisterArgs the arguments to unregister the dispacher host
|
||||
type UnregisterArgs struct {
|
||||
Tenant string
|
||||
Opts map[string]interface{}
|
||||
IDs []string
|
||||
}
|
||||
|
||||
// AsDispatcherHosts converts the arguments to DispatcherHosts
|
||||
func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.DispatcherHost) {
|
||||
dHs = make([]*engine.DispatcherHost, len(rargs.IDs))
|
||||
for i, id := range rargs.IDs {
|
||||
dHs[i] = &engine.DispatcherHost{
|
||||
Tenant: rargs.Tenant,
|
||||
ID: id,
|
||||
Conns: []*config.RemoteHost{{
|
||||
Address: ip + ":" + rargs.Port,
|
||||
Transport: rargs.Transport,
|
||||
TLS: rargs.TLS,
|
||||
}},
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Registar handdle for httpServer to register the dispatcher hosts
|
||||
func Registar(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
@@ -62,40 +96,36 @@ func register(req *http.Request) (*json.RawMessage, error) {
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
case utils.DispatcherHv1UnregisterHosts:
|
||||
var dHIDs []string
|
||||
params := []interface{}{dHIDs}
|
||||
var args UnregisterArgs
|
||||
params := []interface{}{&args}
|
||||
if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
for _, id := range dHIDs {
|
||||
if err = engine.Cache.Remove(utils.CacheDispatcherHosts, id, false, utils.NonTransactional); err != nil {
|
||||
for _, id := range args.IDs {
|
||||
if err = engine.Cache.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(args.Tenant, id), false, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove DispatcherHost <%s> from cache because: %s",
|
||||
utils.DispatcherH, id, err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
case utils.DispatcherHv1RegisterHosts:
|
||||
var dHs []*engine.DispatcherHost
|
||||
params := []interface{}{dHs}
|
||||
var dHs RegisterArgs
|
||||
params := []interface{}{&dHs}
|
||||
if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
var addr string
|
||||
if addr, err = getIP(req); err != nil {
|
||||
if addr, err = getRemoteIP(req); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
|
||||
for _, dH := range dHs {
|
||||
if len(dH.Conns) != 1 { // ignore the hosts with no connections or more
|
||||
continue
|
||||
}
|
||||
dH.Conns[0].Address = addr + dH.Conns[0].Address // the address contains the port
|
||||
for _, dH := range dHs.AsDispatcherHosts(addr) {
|
||||
if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil,
|
||||
false, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s",
|
||||
@@ -107,7 +137,7 @@ func register(req *http.Request) (*json.RawMessage, error) {
|
||||
return sReq.Id, nil
|
||||
}
|
||||
|
||||
func getIP(r *http.Request) (ip string, err error) {
|
||||
func getRemoteIP(r *http.Request) (ip string, err error) {
|
||||
ip = r.Header.Get("X-REAL-IP")
|
||||
if net.ParseIP(ip) != nil {
|
||||
return
|
||||
@@ -128,39 +158,33 @@ func getIP(r *http.Request) (ip string, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func getConnCfg(cfg *config.CGRConfig, transport string, tmpl *config.RemoteHost) (conn *config.RemoteHost, err error) {
|
||||
func getConnPort(cfg *config.CGRConfig, transport string, tls bool) (port string, err error) {
|
||||
var address string
|
||||
var extraPath string
|
||||
switch transport {
|
||||
case utils.MetaJSON:
|
||||
if tmpl.TLS {
|
||||
if tls {
|
||||
address = cfg.ListenCfg().RPCJSONTLSListen
|
||||
} else {
|
||||
address = cfg.ListenCfg().RPCJSONListen
|
||||
}
|
||||
case utils.MetaGOB:
|
||||
if tmpl.TLS {
|
||||
if tls {
|
||||
address = cfg.ListenCfg().RPCGOBTLSListen
|
||||
} else {
|
||||
address = cfg.ListenCfg().RPCGOBListen
|
||||
}
|
||||
case rpcclient.HTTPjson:
|
||||
if tmpl.TLS {
|
||||
if tls {
|
||||
address = cfg.ListenCfg().HTTPTLSListen
|
||||
} else {
|
||||
address = cfg.ListenCfg().HTTPListen
|
||||
}
|
||||
extraPath = cfg.HTTPCfg().HTTPJsonRPCURL
|
||||
}
|
||||
var port string
|
||||
if _, port, err = net.SplitHostPort(address); err != nil {
|
||||
return
|
||||
}
|
||||
conn = &config.RemoteHost{
|
||||
Address: ":" + port + extraPath,
|
||||
Synchronous: tmpl.Synchronous,
|
||||
TLS: tmpl.TLS,
|
||||
Transport: transport,
|
||||
}
|
||||
port += extraPath
|
||||
return
|
||||
}
|
||||
|
||||
@@ -145,7 +145,7 @@ func testDspAttrPingFailoverNotFoundHost(t *testing.T) {
|
||||
CGREvent: &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"EventName": "UnexistedHost",
|
||||
"EventName": "NonexistingHost",
|
||||
},
|
||||
},
|
||||
Opts: map[string]interface{}{
|
||||
|
||||
@@ -228,11 +228,19 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
|
||||
}
|
||||
}
|
||||
}
|
||||
var called bool
|
||||
for _, hostID := range hostIDs {
|
||||
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q",
|
||||
utils.DispatcherS, hostID))
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
err = utils.NewErrDispatcherS(err)
|
||||
return
|
||||
}
|
||||
called = true
|
||||
if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
|
||||
continue
|
||||
}
|
||||
@@ -244,6 +252,10 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
|
||||
}
|
||||
break
|
||||
}
|
||||
if !called { // in case we do not match any host
|
||||
err = utils.ErrHostNotFound
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -252,12 +264,20 @@ type brodcastStrategyDispatcher struct{}
|
||||
func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
|
||||
serviceMethod string, args interface{}, reply interface{}) (err error) {
|
||||
var hasErrors bool
|
||||
var called bool
|
||||
for _, hostID := range hostIDs {
|
||||
var dH *engine.DispatcherHost
|
||||
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q",
|
||||
utils.DispatcherS, hostID))
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
err = utils.NewErrDispatcherS(err)
|
||||
return
|
||||
}
|
||||
called = true
|
||||
if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> network error: <%s> at %s strategy for hostID %q",
|
||||
utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID))
|
||||
@@ -270,6 +290,9 @@ func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID stri
|
||||
}
|
||||
if hasErrors { // rewrite err if not all call were succesfull
|
||||
return utils.ErrPartiallyExecuted
|
||||
} else if !called { // in case we do not match any host
|
||||
err = utils.ErrHostNotFound
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -356,11 +379,19 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
|
||||
}
|
||||
}
|
||||
}
|
||||
var called bool
|
||||
for _, hostID := range lM.getHosts(hostIDs) {
|
||||
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> could not find host with ID %q",
|
||||
utils.DispatcherS, hostID))
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
err = utils.NewErrDispatcherS(err)
|
||||
return
|
||||
}
|
||||
called = true
|
||||
lM.incrementLoad(hostID, ld.tntID)
|
||||
err = dH.Call(serviceMethod, args, reply)
|
||||
lM.decrementLoad(hostID, ld.tntID) // call ended
|
||||
@@ -375,6 +406,10 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
|
||||
}
|
||||
break
|
||||
}
|
||||
if !called { // in case we do not match any host
|
||||
err = utils.ErrHostNotFound
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
v2 "github.com/cgrates/cgrates/apier/v2"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/dispatcherh"
|
||||
"github.com/cgrates/cgrates/dispatchers"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
@@ -89,9 +88,6 @@ func (dspS *DispatcherService) Start() (err error) {
|
||||
// for the moment we dispable Apier through dispatcher
|
||||
// until we figured out a better sollution in case of gob server
|
||||
// dspS.server.SetDispatched()
|
||||
if len(dspS.cfg.HTTPCfg().DispatchersRegistrarURL) != 0 {
|
||||
dspS.server.RegisterHttpFunc(dspS.cfg.HTTPCfg().DispatchersRegistrarURL, dispatcherh.Registar)
|
||||
}
|
||||
|
||||
dspS.server.RpcRegister(v1.NewDispatcherSv1(dspS.dspS))
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ var (
|
||||
ErrNoMoreData = errors.New("NO_MORE_DATA")
|
||||
ErrNotImplemented = errors.New("NOT_IMPLEMENTED")
|
||||
ErrNotFound = errors.New("NOT_FOUND")
|
||||
ErrHostNotFound = errors.New("HOST_NOT_FOUND")
|
||||
ErrTimedOut = errors.New("TIMED_OUT")
|
||||
ErrServerError = errors.New("SERVER_ERROR")
|
||||
ErrMaxRecursionDepth = errors.New("MAX_RECURSION_DEPTH")
|
||||
@@ -110,6 +111,7 @@ var (
|
||||
ErrMaxIncrementsExceeded.Error(): ErrMaxIncrementsExceeded,
|
||||
ErrIndexOutOfBounds.Error(): ErrIndexOutOfBounds,
|
||||
ErrWrongPath.Error(): ErrWrongPath,
|
||||
ErrHostNotFound.Error(): ErrHostNotFound,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user