From fccaf489a4fa981eca4e490e859281dc81d191ce Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 1 Apr 2019 18:32:38 +0200 Subject: [PATCH] Dispatcher with support for dynamic hosts --- dispatchers/dispatchers.go | 4 +- dispatchers/libdispatcher.go | 184 ++++++++++++++++++---------------- engine/datamanager.go | 18 +++- engine/dispatcherprfl.go | 76 ++++++-------- engine/dispatcherprfl_test.go | 50 ++++----- engine/loader_csv_test.go | 12 +-- engine/model_helpers.go | 59 +++++------ engine/model_helpers_test.go | 14 +-- loaders/loader_test.go | 10 +- utils/apitpdata.go | 36 +++---- utils/errors.go | 1 + 11 files changed, 238 insertions(+), 226 deletions(-) diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 2367e2079..1d6da800b 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -128,7 +128,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, d = x.(Dispatcher) return } - if d, err = newDispatcher(matchedPrlf); err != nil { + if d, err = newDispatcher(dS.dm, matchedPrlf); err != nil { return } engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil, @@ -143,7 +143,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID if errDsp != nil { return utils.NewErrDispatcherS(errDsp) } - return d.Dispatch(dS.conns, routeID, serviceMethod, args, reply) + return d.Dispatch(routeID, serviceMethod, args, reply) } func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent, diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index cb4c2e0b0..ba284296f 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -24,7 +24,6 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // Dispatcher is responsible for routing requests to pool of connections @@ -33,44 +32,49 @@ type Dispatcher interface { // SetProfile is used to update the configuration information within dispatcher // to make sure we take decisions based on latest config SetProfile(pfl *engine.DispatcherProfile) - // ConnIDs returns the ordered list of hosts IDs - ConnIDs() (conns []string) + // HostIDs returns the ordered list of host IDs + HostIDs() (hostIDs []string) // Dispatch is used to send the method over the connections given - Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, + Dispatch(routeID *string, serviceMethod string, args interface{}, reply interface{}) (err error) } -type StrategyDispatcher interface { - // Dispatch is used to send the method over the connections given - Dispatch(connIDs []string, conns map[string]*rpcclient.RpcClientPool, routeID *string, +type strategyDispatcher interface { + // dispatch is used to send the method over the connections given + dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) } -type singleResultStrategyDispatcher struct{} -type brodcastStrategyDispatcher struct{} - // newDispatcher constructs instances of Dispatcher -func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { - pfl.Conns.Sort() // make sure the connections are sorted +func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dispatcher, err error) { + pfl.Hosts.Sort() // make sure the connections are sorted switch pfl.Strategy { case utils.MetaWeight: d = &WeightDispatcher{ - conns: pfl.Conns.Clone(), - strategy: new(singleResultStrategyDispatcher), + dm: dm, + tnt: pfl.Tenant, + hosts: pfl.Hosts.Clone(), + strategy: new(singleResultstrategyDispatcher), } case utils.MetaRandom: d = &RandomDispatcher{ - conns: pfl.Conns.Clone(), - strategy: new(singleResultStrategyDispatcher), + dm: dm, + tnt: pfl.Tenant, + hosts: pfl.Hosts.Clone(), + strategy: new(singleResultstrategyDispatcher), } case utils.MetaRoundRobin: d = &RoundRobinDispatcher{ - conns: pfl.Conns.Clone(), - strategy: new(singleResultStrategyDispatcher), + dm: dm, + tnt: pfl.Tenant, + hosts: pfl.Hosts.Clone(), + strategy: new(singleResultstrategyDispatcher), } case utils.MetaBroadcast: d = &BroadcastDispatcher{ - conns: pfl.Conns.Clone(), + dm: dm, + tnt: pfl.Tenant, + hosts: pfl.Hosts.Clone(), strategy: new(brodcastStrategyDispatcher), } default: @@ -82,144 +86,156 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { // WeightDispatcher selects the next connection based on weight type WeightDispatcher struct { sync.RWMutex - conns engine.DispatcherConns - strategy StrategyDispatcher + dm *engine.DataManager + tnt string + hosts engine.DispatcherHostProfiles + strategy strategyDispatcher } func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) { wd.Lock() - pfl.Conns.Sort() - wd.conns = pfl.Conns.Clone() // avoid concurrency on profile + pfl.Hosts.Sort() + wd.hosts = pfl.Hosts.Clone() // avoid concurrency on profile wd.Unlock() return } -func (wd *WeightDispatcher) ConnIDs() (connIDs []string) { +func (wd *WeightDispatcher) HostIDs() (hostIDs []string) { wd.RLock() - connIDs = wd.conns.ConnIDs() + hostIDs = wd.hosts.HostIDs() wd.RUnlock() return } -func (wd *WeightDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, +func (wd *WeightDispatcher) Dispatch(routeID *string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return wd.strategy.Dispatch(wd.ConnIDs(), conns, routeID, serviceMethod, args, reply) + return wd.strategy.dispatch(wd.dm, routeID, wd.tnt, wd.HostIDs(), + serviceMethod, args, reply) } // RandomDispatcher selects the next connection randomly // together with RouteID can serve as load-balancer type RandomDispatcher struct { sync.RWMutex - conns engine.DispatcherConns - strategy StrategyDispatcher + dm *engine.DataManager + tnt string + hosts engine.DispatcherHostProfiles + strategy strategyDispatcher } func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) { d.Lock() - d.conns = pfl.Conns.Clone() + d.hosts = pfl.Hosts.Clone() d.Unlock() return } -func (d *RandomDispatcher) ConnIDs() (connIDs []string) { +func (d *RandomDispatcher) HostIDs() (hostIDs []string) { d.RLock() - conns := d.conns.Clone() + hosts := d.hosts.Clone() d.RUnlock() - conns.Shuffle() // randomize the connections - return conns.ConnIDs() + hosts.Shuffle() // randomize the connections + return hosts.HostIDs() } -func (d *RandomDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, +func (d *RandomDispatcher) Dispatch(routeID *string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return d.strategy.Dispatch(d.ConnIDs(), conns, routeID, serviceMethod, args, reply) + return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(), + serviceMethod, args, reply) } // RoundRobinDispatcher selects the next connection in round-robin fashion type RoundRobinDispatcher struct { sync.RWMutex - conns engine.DispatcherConns - connIdx int // used for the next connection - strategy StrategyDispatcher + dm *engine.DataManager + tnt string + hosts engine.DispatcherHostProfiles + hostIdx int // used for the next connection + strategy strategyDispatcher } func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) { d.Lock() - d.conns = pfl.Conns.Clone() + d.hosts = pfl.Hosts.Clone() d.Unlock() return } -func (d *RoundRobinDispatcher) ConnIDs() (connIDs []string) { +func (d *RoundRobinDispatcher) HostIDs() (hostIDs []string) { d.RLock() - conns := d.conns.Clone() - conns.ReorderFromIndex(d.connIdx) - d.connIdx++ - if d.connIdx >= len(d.conns) { - d.connIdx = 0 + hosts := d.hosts.Clone() + hosts.ReorderFromIndex(d.hostIdx) + d.hostIdx++ + if d.hostIdx >= len(d.hosts) { + d.hostIdx = 0 } d.RUnlock() - return conns.ConnIDs() + return hosts.HostIDs() } -func (d *RoundRobinDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, +func (d *RoundRobinDispatcher) Dispatch(routeID *string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return d.strategy.Dispatch(d.ConnIDs(), conns, routeID, serviceMethod, args, reply) + return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(), + serviceMethod, args, reply) } -// RoundRobinDispatcher selects the next connection in round-robin fashion +// BroadcastDispatcher will send the request to multiple hosts simultaneously type BroadcastDispatcher struct { sync.RWMutex - conns engine.DispatcherConns - strategy StrategyDispatcher + dm *engine.DataManager + tnt string + hosts engine.DispatcherHostProfiles + strategy strategyDispatcher } func (d *BroadcastDispatcher) SetProfile(pfl *engine.DispatcherProfile) { d.Lock() - pfl.Conns.Sort() - d.conns = pfl.Conns.Clone() // avoid concurrency on profile + pfl.Hosts.Sort() + d.hosts = pfl.Hosts.Clone() d.Unlock() return } -func (d *BroadcastDispatcher) ConnIDs() (connIDs []string) { +func (d *BroadcastDispatcher) HostIDs() (hostIDs []string) { d.RLock() - connIDs = d.conns.ConnIDs() + hostIDs = d.hosts.HostIDs() d.RUnlock() return } -func (d *BroadcastDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string, +func (d *BroadcastDispatcher) Dispatch(routeID *string, serviceMethod string, args interface{}, reply interface{}) (lastErr error) { // no cache needed for this strategy because we need to call all connections - return d.strategy.Dispatch(d.ConnIDs(), conns, routeID, serviceMethod, args, reply) + return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(), + serviceMethod, args, reply) } -func (ign *singleResultStrategyDispatcher) Dispatch(connIDs []string, conns map[string]*rpcclient.RpcClientPool, routeID *string, +type singleResultstrategyDispatcher struct{} + +func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) { - var connID string + var dH *engine.DispatcherHost if routeID != nil && *routeID != "" { // use previously discovered route if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, *routeID); ok && x != nil { - connID = x.(string) - if err = conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { + dH = x.(*engine.DispatcherHost) + if err = dH.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { return } } } - for _, connID = range connIDs { - conn, has := conns[connID] - if !has { - err = utils.NewErrDispatcherS( - fmt.Errorf("no connection with id: <%s>", connID)) - continue + for _, hostID := range hostIDs { + if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil { + err = utils.NewErrDispatcherS(err) + return } - if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { + if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { continue } if routeID != nil && *routeID != "" { // cache the discovered route - engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, connID, + engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH, nil, true, utils.EmptyString) } break @@ -227,26 +243,24 @@ func (ign *singleResultStrategyDispatcher) Dispatch(connIDs []string, conns map[ return } -func (ign *brodcastStrategyDispatcher) Dispatch(connIDs []string, conns map[string]*rpcclient.RpcClientPool, routeID *string, +type brodcastStrategyDispatcher struct{} + +func (_ *brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) { var hasErrors bool - for _, connID := range connIDs { - conn, has := conns[connID] - if !has { - err = utils.NewErrDispatcherS( - fmt.Errorf("no connection with id: <%s>", connID)) - utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s", - utils.DispatcherS, utils.MetaBroadcast, connID, err.Error())) - hasErrors = true - continue + for _, hostID := range hostIDs { + var dH *engine.DispatcherHost + if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil { + err = utils.NewErrDispatcherS(err) + return } - if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { - utils.Logger.Err(fmt.Sprintf("<%s> Network Error at %s strategy for connID %q : %s", - utils.DispatcherS, utils.MetaBroadcast, connID, err.Error())) + if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { + utils.Logger.Err(fmt.Sprintf("<%s> network error: <%s> at %s strategy for hostID %q", + utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID)) hasErrors = true } else if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s", - utils.DispatcherS, utils.MetaBroadcast, connID, err.Error())) + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s> at %s strategy for hostID %q", + utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID)) hasErrors = true } } diff --git a/engine/datamanager.go b/engine/datamanager.go index 46380b32d..7fd62f6b0 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -18,10 +18,12 @@ package engine import ( "fmt" "strings" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/cgrates/ltcache" + "github.com/cgrates/rpcclient" ) var ( @@ -1307,7 +1309,7 @@ func (dm *DataManager) RemoveDispatcherProfile(tenant, id string, } func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrite bool, - transactionID string) (dpp *DispatcherHost, err error) { + transactionID string) (dH *DispatcherHost, err error) { tntID := utils.ConcatenatedKey(tenant, id) if cacheRead { if x, ok := Cache.Get(utils.CacheDispatcherHosts, tntID); ok { @@ -1317,7 +1319,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit return x.(*DispatcherHost), nil } } - dpp, err = dm.dataDB.GetDispatcherHostDrv(tenant, id) + dH, err = dm.dataDB.GetDispatcherHostDrv(tenant, id) if err != nil { if err == utils.ErrNotFound && cacheWrite { Cache.Set(utils.CacheDispatcherHosts, tntID, nil, nil, @@ -1325,8 +1327,18 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit } return nil, err } + cfg := config.CgrConfig() + if dH.rpcConn, err = NewRPCPool( + rpcclient.POOL_FIRST, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + dH.Conns, nil, time.Duration(0), false); err != nil { + return nil, err + } if cacheWrite { - Cache.Set(utils.CacheDispatcherHosts, tntID, dpp, nil, + Cache.Set(utils.CacheDispatcherHosts, tntID, dH, nil, cacheCommit(transactionID), transactionID) } return diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go index 4ec8bf62a..2b976b77c 100644 --- a/engine/dispatcherprfl.go +++ b/engine/dispatcherprfl.go @@ -21,14 +21,13 @@ package engine import ( "math/rand" "sort" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) -type DispatcherConn struct { +type DispatcherHostProfile struct { ID string FilterIDs []string Weight float64 // applied in case of multiple connections need to be ordered @@ -36,8 +35,8 @@ type DispatcherConn struct { Blocker bool // no connection after this one } -func (dC *DispatcherConn) Clone() (cln *DispatcherConn) { - cln = &DispatcherConn{ +func (dC *DispatcherHostProfile) Clone() (cln *DispatcherHostProfile) { + cln = &DispatcherHostProfile{ ID: dC.ID, Weight: dC.Weight, Blocker: dC.Blocker, @@ -57,46 +56,46 @@ func (dC *DispatcherConn) Clone() (cln *DispatcherConn) { return } -type DispatcherConns []*DispatcherConn +type DispatcherHostProfiles []*DispatcherHostProfile // Sort is part of sort interface, sort based on Weight -func (dConns DispatcherConns) Sort() { - sort.Slice(dConns, func(i, j int) bool { return dConns[i].Weight > dConns[j].Weight }) +func (dHPrfls DispatcherHostProfiles) Sort() { + sort.Slice(dHPrfls, func(i, j int) bool { return dHPrfls[i].Weight > dHPrfls[j].Weight }) } // ReorderFromIndex will consider idx as starting point for the reordered slice -func (dConns DispatcherConns) ReorderFromIndex(idx int) { - initConns := dConns.Clone() - for i := 0; i < len(dConns); i++ { - if idx > len(dConns)-1 { +func (dHPrfls DispatcherHostProfiles) ReorderFromIndex(idx int) { + initConns := dHPrfls.Clone() + for i := 0; i < len(dHPrfls); i++ { + if idx > len(dHPrfls)-1 { idx = 0 } - dConns[i] = initConns[idx] + dHPrfls[i] = initConns[idx] idx++ } return } // Shuffle will mix the connections in place -func (dConns DispatcherConns) Shuffle() { - rand.Shuffle(len(dConns), func(i, j int) { - dConns[i], dConns[j] = dConns[j], dConns[i] +func (dHPrfls DispatcherHostProfiles) Shuffle() { + rand.Shuffle(len(dHPrfls), func(i, j int) { + dHPrfls[i], dHPrfls[j] = dHPrfls[j], dHPrfls[i] }) return } -func (dConns DispatcherConns) Clone() (cln DispatcherConns) { - cln = make(DispatcherConns, len(dConns)) - for i, dConn := range dConns { - cln[i] = dConn.Clone() +func (dHPrfls DispatcherHostProfiles) Clone() (cln DispatcherHostProfiles) { + cln = make(DispatcherHostProfiles, len(dHPrfls)) + for i, dHPrfl := range dHPrfls { + cln[i] = dHPrfl.Clone() } return } -func (dConns DispatcherConns) ConnIDs() (connIDs []string) { - connIDs = make([]string, len(dConns)) - for i, conn := range dConns { - connIDs[i] = conn.ID +func (dHPrfls DispatcherHostProfiles) HostIDs() (hostIDs []string) { + hostIDs = make([]string, len(dHPrfls)) + for i, hostPrfl := range dHPrfls { + hostIDs[i] = hostPrfl.ID } return } @@ -111,7 +110,7 @@ type DispatcherProfile struct { Strategy string StrategyParams map[string]interface{} // ie for distribution, set here the pool weights Weight float64 // used for profile sorting on match - Conns DispatcherConns // dispatch to these connections + Hosts DispatcherHostProfiles // dispatch to these connections } func (dP *DispatcherProfile) TenantID() string { @@ -126,19 +125,13 @@ func (dps DispatcherProfiles) Sort() { sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight }) } -<<<<<<< HEAD -type DispatcherHostConn struct { - Address string - Transport string - TLS bool -} -======= -// DispatcherHost represents one virtual host with po ->>>>>>> DispatcherHost.GetRPCConnection + +// DispatcherHost represents one virtual host used by dispatcher +>>>>>>> Dispatcher with support for dynamic hosts type DispatcherHost struct { Tenant string ID string - Conns []*config.HaPoolConfig + Conns []*config.RemoteHost rpcConn rpcclient.RpcClientConnection } @@ -147,18 +140,9 @@ func (dH *DispatcherHost) TenantID() string { } // GetRPCConnection builds or returns the cached connection -func (dH *DispatcherHost) GetRPCConnection() (rpcConn rpcclient.RpcClientConnection, err error) { +func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply interface{}) error { if dH.rpcConn == nil { - cfg := config.CgrConfig() - if dH.rpcConn, err = NewRPCPool( - rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - dH.Conns, nil, time.Duration(0), false); err != nil { - return - } + return utils.ErrNotConnected } - return dH.rpcConn, nil + return dH.rpcConn.Call(serviceMethod, args, reply) } diff --git a/engine/dispatcherprfl_test.go b/engine/dispatcherprfl_test.go index bf5d03dfd..7e8998279 100644 --- a/engine/dispatcherprfl_test.go +++ b/engine/dispatcherprfl_test.go @@ -24,13 +24,13 @@ import ( "github.com/cgrates/cgrates/utils" ) -func TestDispatcherConnClone(t *testing.T) { - dConn := &DispatcherConn{ +func TestDispatcherHostProfileClone(t *testing.T) { + dConn := &DispatcherHostProfile{ ID: "DSP_1", Weight: 30, FilterIDs: []string{"*string:Usage:10"}, } - eConn := &DispatcherConn{ + eConn := &DispatcherHostProfile{ ID: "DSP_1", Weight: 30, FilterIDs: []string{"*string:Usage:10"}, @@ -42,13 +42,13 @@ func TestDispatcherConnClone(t *testing.T) { } } -func TestDispatcherConnsReorderFromIndex(t *testing.T) { - dConns := DispatcherConns{ +func TestDispatcherHostProfilesReorderFromIndex(t *testing.T) { + dConns := DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, } - eConns := DispatcherConns{ + eConns := DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, @@ -56,7 +56,7 @@ func TestDispatcherConnsReorderFromIndex(t *testing.T) { if dConns.ReorderFromIndex(0); !reflect.DeepEqual(eConns, dConns) { t.Errorf("expecting: %+v, received: %+v", eConns, dConns) } - dConns = DispatcherConns{ + dConns = DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, @@ -64,12 +64,12 @@ func TestDispatcherConnsReorderFromIndex(t *testing.T) { if dConns.ReorderFromIndex(3); !reflect.DeepEqual(eConns, dConns) { t.Errorf("expecting: %+v, received: %+v", eConns, dConns) } - dConns = DispatcherConns{ + dConns = DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, } - eConns = DispatcherConns{ + eConns = DispatcherHostProfiles{ {ID: "DSP_3", Weight: 10}, {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, @@ -77,12 +77,12 @@ func TestDispatcherConnsReorderFromIndex(t *testing.T) { if dConns.ReorderFromIndex(2); !reflect.DeepEqual(eConns, dConns) { t.Errorf("expecting: %+v, received: %+v", eConns, dConns) } - dConns = DispatcherConns{ + dConns = DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, } - eConns = DispatcherConns{ + eConns = DispatcherHostProfiles{ {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, {ID: "DSP_1", Weight: 30}, @@ -93,13 +93,13 @@ func TestDispatcherConnsReorderFromIndex(t *testing.T) { } } -func TestDispatcherConnsShuffle(t *testing.T) { - dConns := DispatcherConns{ +func TestDispatcherHostProfilesShuffle(t *testing.T) { + dConns := DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, } - oConns := DispatcherConns{ + oConns := DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, @@ -110,13 +110,13 @@ func TestDispatcherConnsShuffle(t *testing.T) { } } -func TestDispatcherConnsSort(t *testing.T) { - dConns := DispatcherConns{ +func TestDispatcherHostProfilesSort(t *testing.T) { + dConns := DispatcherHostProfiles{ {ID: "DSP_3", Weight: 10}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_1", Weight: 30}, } - eConns := DispatcherConns{ + eConns := DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10}, @@ -124,14 +124,14 @@ func TestDispatcherConnsSort(t *testing.T) { if dConns.Sort(); !reflect.DeepEqual(eConns, dConns) { t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eConns), utils.ToJSON(dConns)) } - dConns = DispatcherConns{ + dConns = DispatcherHostProfiles{ {ID: "DSP_3", Weight: 10}, {ID: "DSP_5", Weight: 50}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_4", Weight: 40}, {ID: "DSP_1", Weight: 30}, } - eConns = DispatcherConns{ + eConns = DispatcherHostProfiles{ {ID: "DSP_5", Weight: 50}, {ID: "DSP_4", Weight: 40}, {ID: "DSP_1", Weight: 30}, @@ -147,13 +147,13 @@ func TestDispatcherConnsSort(t *testing.T) { } } -func TestDispatcherConnsClone(t *testing.T) { - dConns := DispatcherConns{ +func TestDispatcherHostProfilesClone(t *testing.T) { + dConns := DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10, FilterIDs: []string{"*string:Usage:10"}}, } - eConns := DispatcherConns{ + eConns := DispatcherHostProfiles{ {ID: "DSP_1", Weight: 30}, {ID: "DSP_2", Weight: 20}, {ID: "DSP_3", Weight: 10, FilterIDs: []string{"*string:Usage:10"}}, @@ -165,8 +165,8 @@ func TestDispatcherConnsClone(t *testing.T) { } } -func TestDispatcherConnsConnIDs(t *testing.T) { - dConns := DispatcherConns{ +func TestDispatcherHostProfilesConnIDs(t *testing.T) { + dConns := DispatcherHostProfiles{ {ID: "DSP_5", Weight: 50}, {ID: "DSP_4", Weight: 40}, {ID: "DSP_1", Weight: 30}, @@ -174,7 +174,7 @@ func TestDispatcherConnsConnIDs(t *testing.T) { {ID: "DSP_3", Weight: 10}, } eConnIDs := []string{"DSP_5", "DSP_4", "DSP_1", "DSP_2", "DSP_3"} - if dConnIDs := dConns.ConnIDs(); !reflect.DeepEqual(eConnIDs, dConnIDs) { + if dConnIDs := dConns.HostIDs(); !reflect.DeepEqual(eConnIDs, dConnIDs) { t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eConnIDs), utils.ToJSON(dConnIDs)) } } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 497e67dd4..ec2d22dc1 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -1580,15 +1580,15 @@ func TestLoadDispatcherProfiles(t *testing.T) { }, Strategy: "*first", Weight: 20, - Conns: []*utils.TPDispatcherConns{ - &utils.TPDispatcherConns{ + Hosts: []*utils.TPDispatcherHostProfile{ + &utils.TPDispatcherHostProfile{ ID: "C1", FilterIDs: []string{"*gt:Usage:10"}, Weight: 10, Params: []interface{}{"192.168.56.203"}, Blocker: false, }, - &utils.TPDispatcherConns{ + &utils.TPDispatcherHostProfile{ ID: "C2", FilterIDs: []string{"*lt:Usage:10"}, Weight: 10, @@ -1608,15 +1608,15 @@ func TestLoadDispatcherProfiles(t *testing.T) { }, Strategy: "*first", Weight: 20, - Conns: []*utils.TPDispatcherConns{ - &utils.TPDispatcherConns{ + Hosts: []*utils.TPDispatcherHostProfile{ + &utils.TPDispatcherHostProfile{ ID: "C2", FilterIDs: []string{"*lt:Usage:10"}, Weight: 10, Params: []interface{}{"192.168.56.204"}, Blocker: false, }, - &utils.TPDispatcherConns{ + &utils.TPDispatcherHostProfile{ ID: "C1", FilterIDs: []string{"*gt:Usage:10"}, Weight: 10, diff --git a/engine/model_helpers.go b/engine/model_helpers.go index d4cc57a17..c0ff2255b 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2352,7 +2352,7 @@ func (tps TPDispatcherProfiles) AsTPDispatcherProfiles() (result []*utils.TPDisp mst := make(map[string]*utils.TPDispatcherProfile) filterMap := make(map[string]utils.StringMap) contextMap := make(map[string]utils.StringMap) - connsMap := make(map[string]map[string]utils.TPDispatcherConns) + connsMap := make(map[string]map[string]utils.TPDispatcherHostProfile) connsFilterMap := make(map[string]map[string]utils.StringMap) for _, tp := range tps { tenantID := (&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID() @@ -2402,11 +2402,11 @@ func (tps TPDispatcherProfiles) AsTPDispatcherProfiles() (result []*utils.TPDisp } if tp.ConnID != "" { if _, has := connsMap[tenantID]; !has { - connsMap[tenantID] = make(map[string]utils.TPDispatcherConns) + connsMap[tenantID] = make(map[string]utils.TPDispatcherHostProfile) } conn, has := connsMap[tenantID][tp.ConnID] if !has { - conn = utils.TPDispatcherConns{ + conn = utils.TPDispatcherHostProfile{ ID: tp.ConnID, Weight: tp.ConnWeight, Blocker: tp.ConnBlocker, @@ -2454,13 +2454,14 @@ func (tps TPDispatcherProfiles) AsTPDispatcherProfiles() (result []*utils.TPDisp conn.FilterIDs = append(conn.FilterIDs, filter) } } - result[i].Conns = append(result[i].Conns, &utils.TPDispatcherConns{ - ID: conn.ID, - FilterIDs: conn.FilterIDs, - Weight: conn.Weight, - Params: conn.Params, - Blocker: conn.Blocker, - }) + result[i].Hosts = append(result[i].Hosts, + &utils.TPDispatcherHostProfile{ + ID: conn.ID, + FilterIDs: conn.FilterIDs, + Weight: conn.Weight, + Params: conn.Params, + Blocker: conn.Blocker, + }) } i++ } @@ -2497,7 +2498,7 @@ func APItoModelTPDispatcherProfile(tpDPP *utils.TPDispatcherProfile) (mdls TPDis strategy := paramsToString(tpDPP.StrategyParams) - if len(tpDPP.Conns) == 0 { + if len(tpDPP.Hosts) == 0 { return append(mdls, &TPDispatcherProfile{ Tpid: tpDPP.TPid, Tenant: tpDPP.Tenant, @@ -2511,8 +2512,8 @@ func APItoModelTPDispatcherProfile(tpDPP *utils.TPDispatcherProfile) (mdls TPDis }) } - confilter := strings.Join(tpDPP.Conns[0].FilterIDs, utils.INFIELD_SEP) - conparam := paramsToString(tpDPP.Conns[0].Params) + confilter := strings.Join(tpDPP.Hosts[0].FilterIDs, utils.INFIELD_SEP) + conparam := paramsToString(tpDPP.Hosts[0].Params) mdls = append(mdls, &TPDispatcherProfile{ Tpid: tpDPP.TPid, @@ -2525,24 +2526,24 @@ func APItoModelTPDispatcherProfile(tpDPP *utils.TPDispatcherProfile) (mdls TPDis StrategyParameters: strategy, Weight: tpDPP.Weight, - ConnID: tpDPP.Conns[0].ID, + ConnID: tpDPP.Hosts[0].ID, ConnFilterIDs: confilter, - ConnWeight: tpDPP.Conns[0].Weight, - ConnBlocker: tpDPP.Conns[0].Blocker, + ConnWeight: tpDPP.Hosts[0].Weight, + ConnBlocker: tpDPP.Hosts[0].Blocker, ConnParameters: conparam, }) - for i := 1; i < len(tpDPP.Conns); i++ { - confilter = strings.Join(tpDPP.Conns[i].FilterIDs, utils.INFIELD_SEP) - conparam = paramsToString(tpDPP.Conns[i].Params) + for i := 1; i < len(tpDPP.Hosts); i++ { + confilter = strings.Join(tpDPP.Hosts[i].FilterIDs, utils.INFIELD_SEP) + conparam = paramsToString(tpDPP.Hosts[i].Params) mdls = append(mdls, &TPDispatcherProfile{ Tpid: tpDPP.TPid, Tenant: tpDPP.Tenant, ID: tpDPP.ID, - ConnID: tpDPP.Conns[i].ID, + ConnID: tpDPP.Hosts[i].ID, ConnFilterIDs: confilter, - ConnWeight: tpDPP.Conns[i].Weight, - ConnBlocker: tpDPP.Conns[i].Blocker, + ConnWeight: tpDPP.Hosts[i].Weight, + ConnBlocker: tpDPP.Hosts[i].Blocker, ConnParameters: conparam, }) } @@ -2558,7 +2559,7 @@ func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) ( FilterIDs: make([]string, len(tpDPP.FilterIDs)), Subsystems: make([]string, len(tpDPP.Subsystems)), StrategyParams: make(map[string]interface{}), - Conns: make(DispatcherConns, len(tpDPP.Conns)), + Hosts: make(DispatcherHostProfiles, len(tpDPP.Hosts)), } for i, fli := range tpDPP.FilterIDs { dpp.FilterIDs[i] = fli @@ -2571,8 +2572,8 @@ func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) ( dpp.StrategyParams[strconv.Itoa(i)] = param } } - for i, conn := range tpDPP.Conns { - dpp.Conns[i] = &DispatcherConn{ + for i, conn := range tpDPP.Hosts { + dpp.Hosts[i] = &DispatcherHostProfile{ ID: conn.ID, Weight: conn.Weight, Blocker: conn.Blocker, @@ -2580,11 +2581,11 @@ func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) ( Params: make(map[string]interface{}), } for j, fltr := range conn.FilterIDs { - dpp.Conns[i].FilterIDs[j] = fltr + dpp.Hosts[i].FilterIDs[j] = fltr } for j, param := range conn.Params { if param != "" { - dpp.Conns[i].Params[strconv.Itoa(j)] = param + dpp.Hosts[i].Params[strconv.Itoa(j)] = param } } } @@ -2661,10 +2662,10 @@ func APItoDispatcherHost(tpDPH *utils.TPDispatcherHost) (dpp *DispatcherHost) { dpp = &DispatcherHost{ Tenant: tpDPH.Tenant, ID: tpDPH.ID, - Conns: make([]*config.HaPoolConfig, len(tpDPH.Conns)), + Conns: make([]*config.RemoteHost, len(tpDPH.Conns)), } for i, conn := range tpDPH.Conns { - dpp.Conns[i] = &config.HaPoolConfig{ + dpp.Conns[i] = &config.RemoteHost{ Address: conn.Address, Transport: conn.Transport, TLS: conn.TLS, diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 4bdd7bed9..f6d5d9e4c 100644 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -1639,8 +1639,8 @@ func TestAPItoDispatcherProfile(t *testing.T) { }, StrategyParams: []interface{}{}, Weight: 20, - Conns: []*utils.TPDispatcherConns{ - &utils.TPDispatcherConns{ + Hosts: []*utils.TPDispatcherHostProfile{ + &utils.TPDispatcherHostProfile{ ID: "C1", FilterIDs: []string{}, Weight: 10, @@ -1661,8 +1661,8 @@ func TestAPItoDispatcherProfile(t *testing.T) { }, StrategyParams: map[string]interface{}{}, Weight: 20, - Conns: DispatcherConns{ - &DispatcherConn{ + Hosts: DispatcherHostProfiles{ + &DispatcherHostProfile{ ID: "C1", FilterIDs: []string{}, Weight: 10, @@ -1692,15 +1692,15 @@ func TestAPItoModelTPDispatcher(t *testing.T) { }, StrategyParams: []interface{}{}, Weight: 20, - Conns: []*utils.TPDispatcherConns{ - &utils.TPDispatcherConns{ + Hosts: []*utils.TPDispatcherHostProfile{ + &utils.TPDispatcherHostProfile{ ID: "C1", FilterIDs: []string{}, Weight: 10, Params: []interface{}{"192.168.54.203"}, Blocker: false, }, - &utils.TPDispatcherConns{ + &utils.TPDispatcherHostProfile{ ID: "C2", FilterIDs: []string{}, Weight: 10, diff --git a/loaders/loader_test.go b/loaders/loader_test.go index 67d44ad59..c494056cd 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -1152,14 +1152,14 @@ cgrates.org,EVENT1,,,,,,ALL,,10,,, Strategy: "*weight", StrategyParams: map[string]interface{}{}, Weight: 20, - Conns: engine.DispatcherConns{ - &engine.DispatcherConn{ + Hosts: engine.DispatcherHostProfiles{ + &engine.DispatcherHostProfile{ ID: "ALL2", FilterIDs: make([]string, 0), Weight: 20, Params: map[string]interface{}{}, }, - &engine.DispatcherConn{ + &engine.DispatcherHostProfile{ ID: "ALL", FilterIDs: make([]string, 0), Weight: 10, @@ -1173,8 +1173,8 @@ cgrates.org,EVENT1,,,,,,ALL,,10,,, if err != nil { t.Fatal(err) } - rcv.Conns.Sort() - eDisp.Conns.Sort() + rcv.Hosts.Sort() + eDisp.Hosts.Sort() if !reflect.DeepEqual(eDisp, rcv) { t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eDisp), utils.ToJSON(rcv)) } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index f67fc24b9..ffe8128db 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1182,15 +1182,6 @@ type TPTntID struct { ID string } -// TPDispatcherConns is used in TPDispatcherProfile -type TPDispatcherConns struct { - ID string - FilterIDs []string - Weight float64 // applied in case of multiple connections need to be ordered - Params []interface{} // additional parameters stored for a session - Blocker bool // no connection after this one -} - // TPDispatcherProfile is used in APIs to manage remotely offline DispatcherProfile type TPDispatcherProfile struct { TPid string @@ -1202,7 +1193,24 @@ type TPDispatcherProfile struct { Strategy string StrategyParams []interface{} // ie for distribution, set here the pool weights Weight float64 - Conns []*TPDispatcherConns + Hosts []*TPDispatcherHostProfile +} + +// TPDispatcherHostProfile is used in TPDispatcherProfile +type TPDispatcherHostProfile struct { + ID string + FilterIDs []string + Weight float64 // applied in case of multiple connections need to be ordered + Params []interface{} // additional parameters stored for a session + Blocker bool // no connection after this one +} + +// TPDispatcherHost is used in APIs to manage remotely offline DispatcherHost +type TPDispatcherHost struct { + TPid string + Tenant string + ID string + Conns []*TPDispatcherHostConn } // TPDispatcherHostConn is used in TPDispatcherHost @@ -1212,14 +1220,6 @@ type TPDispatcherHostConn struct { TLS bool } -// TPDispatcherHostTPDispatcherHost is used in APIs to manage remotely offline DispatcherHost -type TPDispatcherHost struct { - TPid string - Tenant string - ID string - Conns []*TPDispatcherHostConn -} - type UsageInterval struct { Min *time.Duration Max *time.Duration diff --git a/utils/errors.go b/utils/errors.go index bb8f3a5cd..e91142fae 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -74,6 +74,7 @@ var ( ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID") ErrCDRCNoInDir = errors.New("CDRC_PROFILE_WITHOUT_IN_DIR") ErrNotEnoughParameters = errors.New("NotEnoughParameters") + ErrNotConnected = errors.New("NOT_CONNECTED") RalsErrorPrfx = "RALS_ERROR" DispatcherErrorPrefix = "DISPATCHER_ERROR" )