Dispatcher with support for dynamic hosts

This commit is contained in:
DanB
2019-04-01 18:32:38 +02:00
parent 8b2b5dece0
commit fccaf489a4
11 changed files with 238 additions and 226 deletions

View File

@@ -128,7 +128,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
d = x.(Dispatcher)
return
}
if d, err = newDispatcher(matchedPrlf); err != nil {
if d, err = newDispatcher(dS.dm, matchedPrlf); err != nil {
return
}
engine.Cache.Set(utils.CacheDispatchers, tntID, d, nil,
@@ -143,7 +143,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, routeID
if errDsp != nil {
return utils.NewErrDispatcherS(errDsp)
}
return d.Dispatch(dS.conns, routeID, serviceMethod, args, reply)
return d.Dispatch(routeID, serviceMethod, args, reply)
}
func (dS *DispatcherService) authorizeEvent(ev *utils.CGREvent,

View File

@@ -24,7 +24,6 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// Dispatcher is responsible for routing requests to pool of connections
@@ -33,44 +32,49 @@ type Dispatcher interface {
// SetProfile is used to update the configuration information within dispatcher
// to make sure we take decisions based on latest config
SetProfile(pfl *engine.DispatcherProfile)
// ConnIDs returns the ordered list of hosts IDs
ConnIDs() (conns []string)
// HostIDs returns the ordered list of host IDs
HostIDs() (hostIDs []string)
// Dispatch is used to send the method over the connections given
Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
Dispatch(routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error)
}
type StrategyDispatcher interface {
// Dispatch is used to send the method over the connections given
Dispatch(connIDs []string, conns map[string]*rpcclient.RpcClientPool, routeID *string,
type strategyDispatcher interface {
// dispatch is used to send the method over the connections given
dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string,
serviceMethod string, args interface{}, reply interface{}) (err error)
}
type singleResultStrategyDispatcher struct{}
type brodcastStrategyDispatcher struct{}
// newDispatcher constructs instances of Dispatcher
func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
pfl.Conns.Sort() // make sure the connections are sorted
func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
pfl.Hosts.Sort() // make sure the connections are sorted
switch pfl.Strategy {
case utils.MetaWeight:
d = &WeightDispatcher{
conns: pfl.Conns.Clone(),
strategy: new(singleResultStrategyDispatcher),
dm: dm,
tnt: pfl.Tenant,
hosts: pfl.Hosts.Clone(),
strategy: new(singleResultstrategyDispatcher),
}
case utils.MetaRandom:
d = &RandomDispatcher{
conns: pfl.Conns.Clone(),
strategy: new(singleResultStrategyDispatcher),
dm: dm,
tnt: pfl.Tenant,
hosts: pfl.Hosts.Clone(),
strategy: new(singleResultstrategyDispatcher),
}
case utils.MetaRoundRobin:
d = &RoundRobinDispatcher{
conns: pfl.Conns.Clone(),
strategy: new(singleResultStrategyDispatcher),
dm: dm,
tnt: pfl.Tenant,
hosts: pfl.Hosts.Clone(),
strategy: new(singleResultstrategyDispatcher),
}
case utils.MetaBroadcast:
d = &BroadcastDispatcher{
conns: pfl.Conns.Clone(),
dm: dm,
tnt: pfl.Tenant,
hosts: pfl.Hosts.Clone(),
strategy: new(brodcastStrategyDispatcher),
}
default:
@@ -82,144 +86,156 @@ func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
// WeightDispatcher selects the next connection based on weight
type WeightDispatcher struct {
sync.RWMutex
conns engine.DispatcherConns
strategy StrategyDispatcher
dm *engine.DataManager
tnt string
hosts engine.DispatcherHostProfiles
strategy strategyDispatcher
}
func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
wd.Lock()
pfl.Conns.Sort()
wd.conns = pfl.Conns.Clone() // avoid concurrency on profile
pfl.Hosts.Sort()
wd.hosts = pfl.Hosts.Clone() // avoid concurrency on profile
wd.Unlock()
return
}
func (wd *WeightDispatcher) ConnIDs() (connIDs []string) {
func (wd *WeightDispatcher) HostIDs() (hostIDs []string) {
wd.RLock()
connIDs = wd.conns.ConnIDs()
hostIDs = wd.hosts.HostIDs()
wd.RUnlock()
return
}
func (wd *WeightDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
func (wd *WeightDispatcher) Dispatch(routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return wd.strategy.Dispatch(wd.ConnIDs(), conns, routeID, serviceMethod, args, reply)
return wd.strategy.dispatch(wd.dm, routeID, wd.tnt, wd.HostIDs(),
serviceMethod, args, reply)
}
// RandomDispatcher selects the next connection randomly
// together with RouteID can serve as load-balancer
type RandomDispatcher struct {
sync.RWMutex
conns engine.DispatcherConns
strategy StrategyDispatcher
dm *engine.DataManager
tnt string
hosts engine.DispatcherHostProfiles
strategy strategyDispatcher
}
func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
d.Lock()
d.conns = pfl.Conns.Clone()
d.hosts = pfl.Hosts.Clone()
d.Unlock()
return
}
func (d *RandomDispatcher) ConnIDs() (connIDs []string) {
func (d *RandomDispatcher) HostIDs() (hostIDs []string) {
d.RLock()
conns := d.conns.Clone()
hosts := d.hosts.Clone()
d.RUnlock()
conns.Shuffle() // randomize the connections
return conns.ConnIDs()
hosts.Shuffle() // randomize the connections
return hosts.HostIDs()
}
func (d *RandomDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
func (d *RandomDispatcher) Dispatch(routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return d.strategy.Dispatch(d.ConnIDs(), conns, routeID, serviceMethod, args, reply)
return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(),
serviceMethod, args, reply)
}
// RoundRobinDispatcher selects the next connection in round-robin fashion
type RoundRobinDispatcher struct {
sync.RWMutex
conns engine.DispatcherConns
connIdx int // used for the next connection
strategy StrategyDispatcher
dm *engine.DataManager
tnt string
hosts engine.DispatcherHostProfiles
hostIdx int // used for the next connection
strategy strategyDispatcher
}
func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
d.Lock()
d.conns = pfl.Conns.Clone()
d.hosts = pfl.Hosts.Clone()
d.Unlock()
return
}
func (d *RoundRobinDispatcher) ConnIDs() (connIDs []string) {
func (d *RoundRobinDispatcher) HostIDs() (hostIDs []string) {
d.RLock()
conns := d.conns.Clone()
conns.ReorderFromIndex(d.connIdx)
d.connIdx++
if d.connIdx >= len(d.conns) {
d.connIdx = 0
hosts := d.hosts.Clone()
hosts.ReorderFromIndex(d.hostIdx)
d.hostIdx++
if d.hostIdx >= len(d.hosts) {
d.hostIdx = 0
}
d.RUnlock()
return conns.ConnIDs()
return hosts.HostIDs()
}
func (d *RoundRobinDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
func (d *RoundRobinDispatcher) Dispatch(routeID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
return d.strategy.Dispatch(d.ConnIDs(), conns, routeID, serviceMethod, args, reply)
return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(),
serviceMethod, args, reply)
}
// RoundRobinDispatcher selects the next connection in round-robin fashion
// BroadcastDispatcher will send the request to multiple hosts simultaneously
type BroadcastDispatcher struct {
sync.RWMutex
conns engine.DispatcherConns
strategy StrategyDispatcher
dm *engine.DataManager
tnt string
hosts engine.DispatcherHostProfiles
strategy strategyDispatcher
}
func (d *BroadcastDispatcher) SetProfile(pfl *engine.DispatcherProfile) {
d.Lock()
pfl.Conns.Sort()
d.conns = pfl.Conns.Clone() // avoid concurrency on profile
pfl.Hosts.Sort()
d.hosts = pfl.Hosts.Clone()
d.Unlock()
return
}
func (d *BroadcastDispatcher) ConnIDs() (connIDs []string) {
func (d *BroadcastDispatcher) HostIDs() (hostIDs []string) {
d.RLock()
connIDs = d.conns.ConnIDs()
hostIDs = d.hosts.HostIDs()
d.RUnlock()
return
}
func (d *BroadcastDispatcher) Dispatch(conns map[string]*rpcclient.RpcClientPool, routeID *string,
func (d *BroadcastDispatcher) Dispatch(routeID *string,
serviceMethod string, args interface{}, reply interface{}) (lastErr error) { // no cache needed for this strategy because we need to call all connections
return d.strategy.Dispatch(d.ConnIDs(), conns, routeID, serviceMethod, args, reply)
return d.strategy.dispatch(d.dm, routeID, d.tnt, d.HostIDs(),
serviceMethod, args, reply)
}
func (ign *singleResultStrategyDispatcher) Dispatch(connIDs []string, conns map[string]*rpcclient.RpcClientPool, routeID *string,
type singleResultstrategyDispatcher struct{}
func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var connID string
var dH *engine.DispatcherHost
if routeID != nil &&
*routeID != "" {
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
connID = x.(string)
if err = conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
dH = x.(*engine.DispatcherHost)
if err = dH.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
return
}
}
}
for _, connID = range connIDs {
conn, has := conns[connID]
if !has {
err = utils.NewErrDispatcherS(
fmt.Errorf("no connection with id: <%s>", connID))
continue
for _, hostID := range hostIDs {
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
err = utils.NewErrDispatcherS(err)
return
}
if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
continue
}
if routeID != nil &&
*routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, connID,
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
nil, true, utils.EmptyString)
}
break
@@ -227,26 +243,24 @@ func (ign *singleResultStrategyDispatcher) Dispatch(connIDs []string, conns map[
return
}
func (ign *brodcastStrategyDispatcher) Dispatch(connIDs []string, conns map[string]*rpcclient.RpcClientPool, routeID *string,
type brodcastStrategyDispatcher struct{}
func (_ *brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, tnt string, hostIDs []string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var hasErrors bool
for _, connID := range connIDs {
conn, has := conns[connID]
if !has {
err = utils.NewErrDispatcherS(
fmt.Errorf("no connection with id: <%s>", connID))
utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s",
utils.DispatcherS, utils.MetaBroadcast, connID, err.Error()))
hasErrors = true
continue
for _, hostID := range hostIDs {
var dH *engine.DispatcherHost
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
err = utils.NewErrDispatcherS(err)
return
}
if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
utils.Logger.Err(fmt.Sprintf("<%s> Network Error at %s strategy for connID %q : %s",
utils.DispatcherS, utils.MetaBroadcast, connID, err.Error()))
if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
utils.Logger.Err(fmt.Sprintf("<%s> network error: <%s> at %s strategy for hostID %q",
utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID))
hasErrors = true
} else if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error at %s strategy for connID %q : %s",
utils.DispatcherS, utils.MetaBroadcast, connID, err.Error()))
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s> at %s strategy for hostID %q",
utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID))
hasErrors = true
}
}

View File

@@ -18,10 +18,12 @@ package engine
import (
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
"github.com/cgrates/rpcclient"
)
var (
@@ -1307,7 +1309,7 @@ func (dm *DataManager) RemoveDispatcherProfile(tenant, id string,
}
func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrite bool,
transactionID string) (dpp *DispatcherHost, err error) {
transactionID string) (dH *DispatcherHost, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheDispatcherHosts, tntID); ok {
@@ -1317,7 +1319,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit
return x.(*DispatcherHost), nil
}
}
dpp, err = dm.dataDB.GetDispatcherHostDrv(tenant, id)
dH, err = dm.dataDB.GetDispatcherHostDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheDispatcherHosts, tntID, nil, nil,
@@ -1325,8 +1327,18 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit
}
return nil, err
}
cfg := config.CgrConfig()
if dH.rpcConn, err = NewRPCPool(
rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
dH.Conns, nil, time.Duration(0), false); err != nil {
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheDispatcherHosts, tntID, dpp, nil,
Cache.Set(utils.CacheDispatcherHosts, tntID, dH, nil,
cacheCommit(transactionID), transactionID)
}
return

View File

@@ -21,14 +21,13 @@ package engine
import (
"math/rand"
"sort"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type DispatcherConn struct {
type DispatcherHostProfile struct {
ID string
FilterIDs []string
Weight float64 // applied in case of multiple connections need to be ordered
@@ -36,8 +35,8 @@ type DispatcherConn struct {
Blocker bool // no connection after this one
}
func (dC *DispatcherConn) Clone() (cln *DispatcherConn) {
cln = &DispatcherConn{
func (dC *DispatcherHostProfile) Clone() (cln *DispatcherHostProfile) {
cln = &DispatcherHostProfile{
ID: dC.ID,
Weight: dC.Weight,
Blocker: dC.Blocker,
@@ -57,46 +56,46 @@ func (dC *DispatcherConn) Clone() (cln *DispatcherConn) {
return
}
type DispatcherConns []*DispatcherConn
type DispatcherHostProfiles []*DispatcherHostProfile
// Sort is part of sort interface, sort based on Weight
func (dConns DispatcherConns) Sort() {
sort.Slice(dConns, func(i, j int) bool { return dConns[i].Weight > dConns[j].Weight })
func (dHPrfls DispatcherHostProfiles) Sort() {
sort.Slice(dHPrfls, func(i, j int) bool { return dHPrfls[i].Weight > dHPrfls[j].Weight })
}
// ReorderFromIndex will consider idx as starting point for the reordered slice
func (dConns DispatcherConns) ReorderFromIndex(idx int) {
initConns := dConns.Clone()
for i := 0; i < len(dConns); i++ {
if idx > len(dConns)-1 {
func (dHPrfls DispatcherHostProfiles) ReorderFromIndex(idx int) {
initConns := dHPrfls.Clone()
for i := 0; i < len(dHPrfls); i++ {
if idx > len(dHPrfls)-1 {
idx = 0
}
dConns[i] = initConns[idx]
dHPrfls[i] = initConns[idx]
idx++
}
return
}
// Shuffle will mix the connections in place
func (dConns DispatcherConns) Shuffle() {
rand.Shuffle(len(dConns), func(i, j int) {
dConns[i], dConns[j] = dConns[j], dConns[i]
func (dHPrfls DispatcherHostProfiles) Shuffle() {
rand.Shuffle(len(dHPrfls), func(i, j int) {
dHPrfls[i], dHPrfls[j] = dHPrfls[j], dHPrfls[i]
})
return
}
func (dConns DispatcherConns) Clone() (cln DispatcherConns) {
cln = make(DispatcherConns, len(dConns))
for i, dConn := range dConns {
cln[i] = dConn.Clone()
func (dHPrfls DispatcherHostProfiles) Clone() (cln DispatcherHostProfiles) {
cln = make(DispatcherHostProfiles, len(dHPrfls))
for i, dHPrfl := range dHPrfls {
cln[i] = dHPrfl.Clone()
}
return
}
func (dConns DispatcherConns) ConnIDs() (connIDs []string) {
connIDs = make([]string, len(dConns))
for i, conn := range dConns {
connIDs[i] = conn.ID
func (dHPrfls DispatcherHostProfiles) HostIDs() (hostIDs []string) {
hostIDs = make([]string, len(dHPrfls))
for i, hostPrfl := range dHPrfls {
hostIDs[i] = hostPrfl.ID
}
return
}
@@ -111,7 +110,7 @@ type DispatcherProfile struct {
Strategy string
StrategyParams map[string]interface{} // ie for distribution, set here the pool weights
Weight float64 // used for profile sorting on match
Conns DispatcherConns // dispatch to these connections
Hosts DispatcherHostProfiles // dispatch to these connections
}
func (dP *DispatcherProfile) TenantID() string {
@@ -126,19 +125,13 @@ func (dps DispatcherProfiles) Sort() {
sort.Slice(dps, func(i, j int) bool { return dps[i].Weight > dps[j].Weight })
}
<<<<<<< HEAD
type DispatcherHostConn struct {
Address string
Transport string
TLS bool
}
=======
// DispatcherHost represents one virtual host with po
>>>>>>> DispatcherHost.GetRPCConnection
// DispatcherHost represents one virtual host used by dispatcher
>>>>>>> Dispatcher with support for dynamic hosts
type DispatcherHost struct {
Tenant string
ID string
Conns []*config.HaPoolConfig
Conns []*config.RemoteHost
rpcConn rpcclient.RpcClientConnection
}
@@ -147,18 +140,9 @@ func (dH *DispatcherHost) TenantID() string {
}
// GetRPCConnection builds or returns the cached connection
func (dH *DispatcherHost) GetRPCConnection() (rpcConn rpcclient.RpcClientConnection, err error) {
func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply interface{}) error {
if dH.rpcConn == nil {
cfg := config.CgrConfig()
if dH.rpcConn, err = NewRPCPool(
rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
dH.Conns, nil, time.Duration(0), false); err != nil {
return
}
return utils.ErrNotConnected
}
return dH.rpcConn, nil
return dH.rpcConn.Call(serviceMethod, args, reply)
}

View File

@@ -24,13 +24,13 @@ import (
"github.com/cgrates/cgrates/utils"
)
func TestDispatcherConnClone(t *testing.T) {
dConn := &DispatcherConn{
func TestDispatcherHostProfileClone(t *testing.T) {
dConn := &DispatcherHostProfile{
ID: "DSP_1",
Weight: 30,
FilterIDs: []string{"*string:Usage:10"},
}
eConn := &DispatcherConn{
eConn := &DispatcherHostProfile{
ID: "DSP_1",
Weight: 30,
FilterIDs: []string{"*string:Usage:10"},
@@ -42,13 +42,13 @@ func TestDispatcherConnClone(t *testing.T) {
}
}
func TestDispatcherConnsReorderFromIndex(t *testing.T) {
dConns := DispatcherConns{
func TestDispatcherHostProfilesReorderFromIndex(t *testing.T) {
dConns := DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
eConns := DispatcherConns{
eConns := DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
@@ -56,7 +56,7 @@ func TestDispatcherConnsReorderFromIndex(t *testing.T) {
if dConns.ReorderFromIndex(0); !reflect.DeepEqual(eConns, dConns) {
t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
}
dConns = DispatcherConns{
dConns = DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
@@ -64,12 +64,12 @@ func TestDispatcherConnsReorderFromIndex(t *testing.T) {
if dConns.ReorderFromIndex(3); !reflect.DeepEqual(eConns, dConns) {
t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
}
dConns = DispatcherConns{
dConns = DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
eConns = DispatcherConns{
eConns = DispatcherHostProfiles{
{ID: "DSP_3", Weight: 10},
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
@@ -77,12 +77,12 @@ func TestDispatcherConnsReorderFromIndex(t *testing.T) {
if dConns.ReorderFromIndex(2); !reflect.DeepEqual(eConns, dConns) {
t.Errorf("expecting: %+v, received: %+v", eConns, dConns)
}
dConns = DispatcherConns{
dConns = DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
eConns = DispatcherConns{
eConns = DispatcherHostProfiles{
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
{ID: "DSP_1", Weight: 30},
@@ -93,13 +93,13 @@ func TestDispatcherConnsReorderFromIndex(t *testing.T) {
}
}
func TestDispatcherConnsShuffle(t *testing.T) {
dConns := DispatcherConns{
func TestDispatcherHostProfilesShuffle(t *testing.T) {
dConns := DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
}
oConns := DispatcherConns{
oConns := DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
@@ -110,13 +110,13 @@ func TestDispatcherConnsShuffle(t *testing.T) {
}
}
func TestDispatcherConnsSort(t *testing.T) {
dConns := DispatcherConns{
func TestDispatcherHostProfilesSort(t *testing.T) {
dConns := DispatcherHostProfiles{
{ID: "DSP_3", Weight: 10},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_1", Weight: 30},
}
eConns := DispatcherConns{
eConns := DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10},
@@ -124,14 +124,14 @@ func TestDispatcherConnsSort(t *testing.T) {
if dConns.Sort(); !reflect.DeepEqual(eConns, dConns) {
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eConns), utils.ToJSON(dConns))
}
dConns = DispatcherConns{
dConns = DispatcherHostProfiles{
{ID: "DSP_3", Weight: 10},
{ID: "DSP_5", Weight: 50},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_4", Weight: 40},
{ID: "DSP_1", Weight: 30},
}
eConns = DispatcherConns{
eConns = DispatcherHostProfiles{
{ID: "DSP_5", Weight: 50},
{ID: "DSP_4", Weight: 40},
{ID: "DSP_1", Weight: 30},
@@ -147,13 +147,13 @@ func TestDispatcherConnsSort(t *testing.T) {
}
}
func TestDispatcherConnsClone(t *testing.T) {
dConns := DispatcherConns{
func TestDispatcherHostProfilesClone(t *testing.T) {
dConns := DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10, FilterIDs: []string{"*string:Usage:10"}},
}
eConns := DispatcherConns{
eConns := DispatcherHostProfiles{
{ID: "DSP_1", Weight: 30},
{ID: "DSP_2", Weight: 20},
{ID: "DSP_3", Weight: 10, FilterIDs: []string{"*string:Usage:10"}},
@@ -165,8 +165,8 @@ func TestDispatcherConnsClone(t *testing.T) {
}
}
func TestDispatcherConnsConnIDs(t *testing.T) {
dConns := DispatcherConns{
func TestDispatcherHostProfilesConnIDs(t *testing.T) {
dConns := DispatcherHostProfiles{
{ID: "DSP_5", Weight: 50},
{ID: "DSP_4", Weight: 40},
{ID: "DSP_1", Weight: 30},
@@ -174,7 +174,7 @@ func TestDispatcherConnsConnIDs(t *testing.T) {
{ID: "DSP_3", Weight: 10},
}
eConnIDs := []string{"DSP_5", "DSP_4", "DSP_1", "DSP_2", "DSP_3"}
if dConnIDs := dConns.ConnIDs(); !reflect.DeepEqual(eConnIDs, dConnIDs) {
if dConnIDs := dConns.HostIDs(); !reflect.DeepEqual(eConnIDs, dConnIDs) {
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eConnIDs), utils.ToJSON(dConnIDs))
}
}

View File

@@ -1580,15 +1580,15 @@ func TestLoadDispatcherProfiles(t *testing.T) {
},
Strategy: "*first",
Weight: 20,
Conns: []*utils.TPDispatcherConns{
&utils.TPDispatcherConns{
Hosts: []*utils.TPDispatcherHostProfile{
&utils.TPDispatcherHostProfile{
ID: "C1",
FilterIDs: []string{"*gt:Usage:10"},
Weight: 10,
Params: []interface{}{"192.168.56.203"},
Blocker: false,
},
&utils.TPDispatcherConns{
&utils.TPDispatcherHostProfile{
ID: "C2",
FilterIDs: []string{"*lt:Usage:10"},
Weight: 10,
@@ -1608,15 +1608,15 @@ func TestLoadDispatcherProfiles(t *testing.T) {
},
Strategy: "*first",
Weight: 20,
Conns: []*utils.TPDispatcherConns{
&utils.TPDispatcherConns{
Hosts: []*utils.TPDispatcherHostProfile{
&utils.TPDispatcherHostProfile{
ID: "C2",
FilterIDs: []string{"*lt:Usage:10"},
Weight: 10,
Params: []interface{}{"192.168.56.204"},
Blocker: false,
},
&utils.TPDispatcherConns{
&utils.TPDispatcherHostProfile{
ID: "C1",
FilterIDs: []string{"*gt:Usage:10"},
Weight: 10,

View File

@@ -2352,7 +2352,7 @@ func (tps TPDispatcherProfiles) AsTPDispatcherProfiles() (result []*utils.TPDisp
mst := make(map[string]*utils.TPDispatcherProfile)
filterMap := make(map[string]utils.StringMap)
contextMap := make(map[string]utils.StringMap)
connsMap := make(map[string]map[string]utils.TPDispatcherConns)
connsMap := make(map[string]map[string]utils.TPDispatcherHostProfile)
connsFilterMap := make(map[string]map[string]utils.StringMap)
for _, tp := range tps {
tenantID := (&utils.TenantID{Tenant: tp.Tenant, ID: tp.ID}).TenantID()
@@ -2402,11 +2402,11 @@ func (tps TPDispatcherProfiles) AsTPDispatcherProfiles() (result []*utils.TPDisp
}
if tp.ConnID != "" {
if _, has := connsMap[tenantID]; !has {
connsMap[tenantID] = make(map[string]utils.TPDispatcherConns)
connsMap[tenantID] = make(map[string]utils.TPDispatcherHostProfile)
}
conn, has := connsMap[tenantID][tp.ConnID]
if !has {
conn = utils.TPDispatcherConns{
conn = utils.TPDispatcherHostProfile{
ID: tp.ConnID,
Weight: tp.ConnWeight,
Blocker: tp.ConnBlocker,
@@ -2454,13 +2454,14 @@ func (tps TPDispatcherProfiles) AsTPDispatcherProfiles() (result []*utils.TPDisp
conn.FilterIDs = append(conn.FilterIDs, filter)
}
}
result[i].Conns = append(result[i].Conns, &utils.TPDispatcherConns{
ID: conn.ID,
FilterIDs: conn.FilterIDs,
Weight: conn.Weight,
Params: conn.Params,
Blocker: conn.Blocker,
})
result[i].Hosts = append(result[i].Hosts,
&utils.TPDispatcherHostProfile{
ID: conn.ID,
FilterIDs: conn.FilterIDs,
Weight: conn.Weight,
Params: conn.Params,
Blocker: conn.Blocker,
})
}
i++
}
@@ -2497,7 +2498,7 @@ func APItoModelTPDispatcherProfile(tpDPP *utils.TPDispatcherProfile) (mdls TPDis
strategy := paramsToString(tpDPP.StrategyParams)
if len(tpDPP.Conns) == 0 {
if len(tpDPP.Hosts) == 0 {
return append(mdls, &TPDispatcherProfile{
Tpid: tpDPP.TPid,
Tenant: tpDPP.Tenant,
@@ -2511,8 +2512,8 @@ func APItoModelTPDispatcherProfile(tpDPP *utils.TPDispatcherProfile) (mdls TPDis
})
}
confilter := strings.Join(tpDPP.Conns[0].FilterIDs, utils.INFIELD_SEP)
conparam := paramsToString(tpDPP.Conns[0].Params)
confilter := strings.Join(tpDPP.Hosts[0].FilterIDs, utils.INFIELD_SEP)
conparam := paramsToString(tpDPP.Hosts[0].Params)
mdls = append(mdls, &TPDispatcherProfile{
Tpid: tpDPP.TPid,
@@ -2525,24 +2526,24 @@ func APItoModelTPDispatcherProfile(tpDPP *utils.TPDispatcherProfile) (mdls TPDis
StrategyParameters: strategy,
Weight: tpDPP.Weight,
ConnID: tpDPP.Conns[0].ID,
ConnID: tpDPP.Hosts[0].ID,
ConnFilterIDs: confilter,
ConnWeight: tpDPP.Conns[0].Weight,
ConnBlocker: tpDPP.Conns[0].Blocker,
ConnWeight: tpDPP.Hosts[0].Weight,
ConnBlocker: tpDPP.Hosts[0].Blocker,
ConnParameters: conparam,
})
for i := 1; i < len(tpDPP.Conns); i++ {
confilter = strings.Join(tpDPP.Conns[i].FilterIDs, utils.INFIELD_SEP)
conparam = paramsToString(tpDPP.Conns[i].Params)
for i := 1; i < len(tpDPP.Hosts); i++ {
confilter = strings.Join(tpDPP.Hosts[i].FilterIDs, utils.INFIELD_SEP)
conparam = paramsToString(tpDPP.Hosts[i].Params)
mdls = append(mdls, &TPDispatcherProfile{
Tpid: tpDPP.TPid,
Tenant: tpDPP.Tenant,
ID: tpDPP.ID,
ConnID: tpDPP.Conns[i].ID,
ConnID: tpDPP.Hosts[i].ID,
ConnFilterIDs: confilter,
ConnWeight: tpDPP.Conns[i].Weight,
ConnBlocker: tpDPP.Conns[i].Blocker,
ConnWeight: tpDPP.Hosts[i].Weight,
ConnBlocker: tpDPP.Hosts[i].Blocker,
ConnParameters: conparam,
})
}
@@ -2558,7 +2559,7 @@ func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) (
FilterIDs: make([]string, len(tpDPP.FilterIDs)),
Subsystems: make([]string, len(tpDPP.Subsystems)),
StrategyParams: make(map[string]interface{}),
Conns: make(DispatcherConns, len(tpDPP.Conns)),
Hosts: make(DispatcherHostProfiles, len(tpDPP.Hosts)),
}
for i, fli := range tpDPP.FilterIDs {
dpp.FilterIDs[i] = fli
@@ -2571,8 +2572,8 @@ func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) (
dpp.StrategyParams[strconv.Itoa(i)] = param
}
}
for i, conn := range tpDPP.Conns {
dpp.Conns[i] = &DispatcherConn{
for i, conn := range tpDPP.Hosts {
dpp.Hosts[i] = &DispatcherHostProfile{
ID: conn.ID,
Weight: conn.Weight,
Blocker: conn.Blocker,
@@ -2580,11 +2581,11 @@ func APItoDispatcherProfile(tpDPP *utils.TPDispatcherProfile, timezone string) (
Params: make(map[string]interface{}),
}
for j, fltr := range conn.FilterIDs {
dpp.Conns[i].FilterIDs[j] = fltr
dpp.Hosts[i].FilterIDs[j] = fltr
}
for j, param := range conn.Params {
if param != "" {
dpp.Conns[i].Params[strconv.Itoa(j)] = param
dpp.Hosts[i].Params[strconv.Itoa(j)] = param
}
}
}
@@ -2661,10 +2662,10 @@ func APItoDispatcherHost(tpDPH *utils.TPDispatcherHost) (dpp *DispatcherHost) {
dpp = &DispatcherHost{
Tenant: tpDPH.Tenant,
ID: tpDPH.ID,
Conns: make([]*config.HaPoolConfig, len(tpDPH.Conns)),
Conns: make([]*config.RemoteHost, len(tpDPH.Conns)),
}
for i, conn := range tpDPH.Conns {
dpp.Conns[i] = &config.HaPoolConfig{
dpp.Conns[i] = &config.RemoteHost{
Address: conn.Address,
Transport: conn.Transport,
TLS: conn.TLS,

View File

@@ -1639,8 +1639,8 @@ func TestAPItoDispatcherProfile(t *testing.T) {
},
StrategyParams: []interface{}{},
Weight: 20,
Conns: []*utils.TPDispatcherConns{
&utils.TPDispatcherConns{
Hosts: []*utils.TPDispatcherHostProfile{
&utils.TPDispatcherHostProfile{
ID: "C1",
FilterIDs: []string{},
Weight: 10,
@@ -1661,8 +1661,8 @@ func TestAPItoDispatcherProfile(t *testing.T) {
},
StrategyParams: map[string]interface{}{},
Weight: 20,
Conns: DispatcherConns{
&DispatcherConn{
Hosts: DispatcherHostProfiles{
&DispatcherHostProfile{
ID: "C1",
FilterIDs: []string{},
Weight: 10,
@@ -1692,15 +1692,15 @@ func TestAPItoModelTPDispatcher(t *testing.T) {
},
StrategyParams: []interface{}{},
Weight: 20,
Conns: []*utils.TPDispatcherConns{
&utils.TPDispatcherConns{
Hosts: []*utils.TPDispatcherHostProfile{
&utils.TPDispatcherHostProfile{
ID: "C1",
FilterIDs: []string{},
Weight: 10,
Params: []interface{}{"192.168.54.203"},
Blocker: false,
},
&utils.TPDispatcherConns{
&utils.TPDispatcherHostProfile{
ID: "C2",
FilterIDs: []string{},
Weight: 10,

View File

@@ -1152,14 +1152,14 @@ cgrates.org,EVENT1,,,,,,ALL,,10,,,
Strategy: "*weight",
StrategyParams: map[string]interface{}{},
Weight: 20,
Conns: engine.DispatcherConns{
&engine.DispatcherConn{
Hosts: engine.DispatcherHostProfiles{
&engine.DispatcherHostProfile{
ID: "ALL2",
FilterIDs: make([]string, 0),
Weight: 20,
Params: map[string]interface{}{},
},
&engine.DispatcherConn{
&engine.DispatcherHostProfile{
ID: "ALL",
FilterIDs: make([]string, 0),
Weight: 10,
@@ -1173,8 +1173,8 @@ cgrates.org,EVENT1,,,,,,ALL,,10,,,
if err != nil {
t.Fatal(err)
}
rcv.Conns.Sort()
eDisp.Conns.Sort()
rcv.Hosts.Sort()
eDisp.Hosts.Sort()
if !reflect.DeepEqual(eDisp, rcv) {
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eDisp), utils.ToJSON(rcv))
}

View File

@@ -1182,15 +1182,6 @@ type TPTntID struct {
ID string
}
// TPDispatcherConns is used in TPDispatcherProfile
type TPDispatcherConns struct {
ID string
FilterIDs []string
Weight float64 // applied in case of multiple connections need to be ordered
Params []interface{} // additional parameters stored for a session
Blocker bool // no connection after this one
}
// TPDispatcherProfile is used in APIs to manage remotely offline DispatcherProfile
type TPDispatcherProfile struct {
TPid string
@@ -1202,7 +1193,24 @@ type TPDispatcherProfile struct {
Strategy string
StrategyParams []interface{} // ie for distribution, set here the pool weights
Weight float64
Conns []*TPDispatcherConns
Hosts []*TPDispatcherHostProfile
}
// TPDispatcherHostProfile is used in TPDispatcherProfile
type TPDispatcherHostProfile struct {
ID string
FilterIDs []string
Weight float64 // applied in case of multiple connections need to be ordered
Params []interface{} // additional parameters stored for a session
Blocker bool // no connection after this one
}
// TPDispatcherHost is used in APIs to manage remotely offline DispatcherHost
type TPDispatcherHost struct {
TPid string
Tenant string
ID string
Conns []*TPDispatcherHostConn
}
// TPDispatcherHostConn is used in TPDispatcherHost
@@ -1212,14 +1220,6 @@ type TPDispatcherHostConn struct {
TLS bool
}
// TPDispatcherHostTPDispatcherHost is used in APIs to manage remotely offline DispatcherHost
type TPDispatcherHost struct {
TPid string
Tenant string
ID string
Conns []*TPDispatcherHostConn
}
type UsageInterval struct {
Min *time.Duration
Max *time.Duration

View File

@@ -74,6 +74,7 @@ var (
ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID")
ErrCDRCNoInDir = errors.New("CDRC_PROFILE_WITHOUT_IN_DIR")
ErrNotEnoughParameters = errors.New("NotEnoughParameters")
ErrNotConnected = errors.New("NOT_CONNECTED")
RalsErrorPrfx = "RALS_ERROR"
DispatcherErrorPrefix = "DISPATCHER_ERROR"
)