Added *load as dispatcher strategy

This commit is contained in:
Trial97
2019-08-06 14:12:44 +03:00
committed by Dan Christian Bogos
parent 8a236165f4
commit 8b0cef1b93

View File

@@ -20,6 +20,7 @@ package dispatchers
import (
"fmt"
"sort"
"sync"
"github.com/cgrates/cgrates/engine"
@@ -77,6 +78,13 @@ func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dis
hosts: pfl.Hosts.Clone(),
strategy: new(brodcastStrategyDispatcher),
}
case utils.MetaLoad:
d = &WeightDispatcher{
dm: dm,
tnt: pfl.Tenant,
hosts: pfl.Hosts.Clone(),
strategy: &loadStrategyDispatcher{hostsLoad: make(map[string]int64)},
}
default:
err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
}
@@ -269,3 +277,67 @@ func (_ *brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *s
}
return
}
type loadStrategyDispatcher struct {
sync.RWMutex
hostsLoad map[string]int64
}
func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *string, subsystem, tnt string, hostIDs []string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var dH *engine.DispatcherHost
if routeID != nil && *routeID != "" {
// overwrite routeID with RouteID:Subsystem
*routeID = utils.ConcatenatedKey(*routeID, subsystem)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
dH = x.(*engine.DispatcherHost)
ld.incrementLoad(dH.ID)
err = dH.Call(serviceMethod, args, reply)
ld.decrementLoad(dH.ID) // call ended
if !utils.IsNetworkError(err) {
return
}
}
}
for _, hostID := range ld.getHosts(hostIDs) {
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
err = utils.NewErrDispatcherS(err)
return
}
ld.incrementLoad(hostID)
err = dH.Call(serviceMethod, args, reply)
ld.decrementLoad(hostID) // call ended
if utils.IsNetworkError(err) {
continue
}
if routeID != nil && *routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
nil, true, utils.EmptyString)
}
break
}
return
}
func (ld *loadStrategyDispatcher) getHosts(hostIDs []string) []string {
ld.RLock()
sort.Slice(hostIDs, func(i, j int) bool {
return ld.hostsLoad[hostIDs[i]] < ld.hostsLoad[hostIDs[j]]
})
ld.RUnlock()
return hostIDs
}
func (ld *loadStrategyDispatcher) incrementLoad(hostID string) {
ld.Lock()
ld.hostsLoad[hostID] += 1
ld.Unlock()
}
func (ld *loadStrategyDispatcher) decrementLoad(hostID string) {
ld.Lock()
ld.hostsLoad[hostID] -= 1
ld.Unlock()
}