Updated DispatcherS *broadcast, *broadcast_sync and *broadcast_async to behave similar to RPCPool

This commit is contained in:
Trial97
2020-11-20 13:26:00 +02:00
committed by Dan Christian Bogos
parent 9acf0582d2
commit 3c06f49246
13 changed files with 33 additions and 97 deletions

View File

@@ -9,7 +9,7 @@ cgrates.org,EVENT2,*any,*string:~*req.EventName:RoundRobin,,*round_robin,,ALL2,,
cgrates.org,EVENT2,,,,,,ALL,,10,,,
cgrates.org,EVENT3,*any,*string:~*req.EventName:Random,,*random,,ALL2,,20,false,,20
cgrates.org,EVENT3,,,,,,ALL,,10,,,
cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20,false,,20
cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast_sync,,ALL2,,20,false,,20
cgrates.org,EVENT4,,,,,,ALL,,10,,,
cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20
cgrates.org,EVENT6,*any,*string:~*opts.*method:DispatcherSv1.GetProfileForEvent,,*weight,,SELF,,20,false,,20
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
9 cgrates.org EVENT2 ALL 10
10 cgrates.org EVENT3 *any *string:~*req.EventName:Random *random ALL2 20 false 20
11 cgrates.org EVENT3 ALL 10
12 cgrates.org EVENT4 *any *string:~*req.EventName:Broadcast *broadcast *broadcast_sync ALL2 20 false 20
13 cgrates.org EVENT4 ALL 10
14 cgrates.org EVENT5 *any *string:~*req.EventName:Internal *weight SELF 20 false 20
15 cgrates.org EVENT6 *any *string:~*opts.*method:DispatcherSv1.GetProfileForEvent *weight SELF 20 false 20

View File

@@ -9,7 +9,7 @@ cgrates.org,EVENT2,*any,*string:~*req.EventName:RoundRobin,,*round_robin,,ALL2,,
cgrates.org,EVENT2,,,,,,ALL,,10,,,
cgrates.org,EVENT3,*any,*string:~*req.EventName:Random,,*random,,ALL2,,20,false,,20
cgrates.org,EVENT3,,,,,,ALL,,10,,,
cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20,false,,20
cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast_sync,,ALL2,,20,false,,20
cgrates.org,EVENT4,,,,,,ALL,,10,,,
cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20
cgrates.org,EVENT6,*any,*string:~*opts.*method:DispatcherSv1.GetProfileForEvent,,*weight,,SELF,,20,false,,20
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
9 cgrates.org EVENT2 ALL 10
10 cgrates.org EVENT3 *any *string:~*req.EventName:Random *random ALL2 20 false 20
11 cgrates.org EVENT3 ALL 10
12 cgrates.org EVENT4 *any *string:~*req.EventName:Broadcast *broadcast *broadcast_sync ALL2 20 false 20
13 cgrates.org EVENT4 ALL 10
14 cgrates.org EVENT5 *any *string:~*req.EventName:Internal *weight SELF 20 false 20
15 cgrates.org EVENT6 *any *string:~*opts.*method:DispatcherSv1.GetProfileForEvent *weight SELF 20 false 20

View File

@@ -1,4 +1,4 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,BROADCAST,*sessions,,,*broadcast,,AU_SITE,,20,false,,30
cgrates.org,BROADCAST,*sessions,,,*broadcast_sync,,AU_SITE,,20,false,,30
cgrates.org,BROADCAST,,,,,,US_SITE,,20,,,
cgrates.org,SELF,*any,,,*weight,,SELF,,20,false,,10
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
2 cgrates.org BROADCAST *sessions *broadcast *broadcast_sync AU_SITE 20 false 30
3 cgrates.org BROADCAST US_SITE 20
4 cgrates.org SELF *any *weight SELF 20 false 10

View File

@@ -7,6 +7,6 @@ cgrates.org,EVENT2,*any,*string:~*req.EventName:RoundRobin,,*round_robin,,ALL2,,
cgrates.org,EVENT2,,,,,,ALL,,10,,,
cgrates.org,EVENT3,*any,*string:~*req.EventName:Random,,*random,,ALL2,,20,false,,20
cgrates.org,EVENT3,,,,,,ALL,,10,,,
cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20,false,,20
cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast_sync,,ALL2,,20,false,,20
cgrates.org,EVENT4,,,,,,ALL,,10,,,
cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
7 cgrates.org EVENT2 ALL 10
8 cgrates.org EVENT3 *any *string:~*req.EventName:Random *random ALL2 20 false 20
9 cgrates.org EVENT3 ALL 10
10 cgrates.org EVENT4 *any *string:~*req.EventName:Broadcast *broadcast *broadcast_sync ALL2 20 false 20
11 cgrates.org EVENT4 ALL 10
12 cgrates.org EVENT5 *any *string:~*req.EventName:Internal *weight SELF 20 false 20

View File

@@ -24,8 +24,10 @@ import (
"sort"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func init() {
@@ -93,12 +95,14 @@ func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dis
hosts: hosts,
strategy: strDsp,
}
case utils.MetaBroadcast:
case rpcclient.PoolBroadcast,
rpcclient.PoolBroadcastSync,
rpcclient.PoolBroadcastAsync:
d = &WeightDispatcher{
dm: dm,
tnt: pfl.Tenant,
hosts: hosts,
strategy: new(brodcastStrategyDispatcher),
strategy: &broadcastStrategyDispatcher{strategy: pfl.Strategy},
}
default:
err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy)
@@ -223,7 +227,7 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
routeID); ok && x != nil {
dH = x.(*engine.DispatcherHost)
if err = dH.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
if err = dH.Call(serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
return
}
}
@@ -241,7 +245,7 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
return
}
called = true
if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
if err = dH.Call(serviceMethod, args, reply); rpcclient.IsNetworkError(err) {
continue
}
if routeID != utils.EmptyString { // cache the discovered route
@@ -259,12 +263,14 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID
return
}
type brodcastStrategyDispatcher struct{}
type broadcastStrategyDispatcher struct {
strategy string
}
func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
func (b *broadcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
var hasErrors bool
var called bool
var hasHosts bool
pool := rpcclient.NewRPCPool(b.strategy, config.CgrConfig().GeneralCfg().ReplyTimeout)
for _, hostID := range hostIDs {
var dH *engine.DispatcherHost
if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil {
@@ -274,27 +280,15 @@ func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID stri
err = nil
continue
}
err = utils.NewErrDispatcherS(err)
return
}
called = true
if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
utils.Logger.Err(fmt.Sprintf("<%s> network error: <%s> at %s strategy for hostID %q",
utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID))
hasErrors = true
} else if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s> at %s strategy for hostID %q",
utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID))
hasErrors = true
return utils.NewErrDispatcherS(err)
}
hasHosts = true
pool.AddClient(dH)
}
if hasErrors { // rewrite err if not all call were succesfull
return utils.ErrPartiallyExecuted
} else if !called { // in case we do not match any host
err = utils.ErrHostNotFound
return
if !hasHosts { // in case we do not match any host
return utils.ErrHostNotFound
}
return
return pool.Call(serviceMethod, args, reply)
}
func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string) (ls strategyDispatcher, err error) {
@@ -374,7 +368,7 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
lM.incrementLoad(dH.ID, ld.tntID)
err = dH.Call(serviceMethod, args, reply)
lM.decrementLoad(dH.ID, ld.tntID) // call ended
if !utils.IsNetworkError(err) {
if !rpcclient.IsNetworkError(err) {
return
}
}
@@ -395,7 +389,7 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin
lM.incrementLoad(hostID, ld.tntID)
err = dH.Call(serviceMethod, args, reply)
lM.decrementLoad(hostID, ld.tntID) // call ended
if utils.IsNetworkError(err) {
if rpcclient.IsNetworkError(err) {
continue
}
if routeID != utils.EmptyString { // cache the discovered route

View File

@@ -25,6 +25,7 @@ import (
"testing"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
var sTestsDspRsp = []func(t *testing.T){
@@ -193,7 +194,7 @@ func testDspResponderBroadcast(t *testing.T) {
allEngine.stopEngine(t)
pingReply = ""
if err := dispEngine.RPC.Call(utils.ResponderPing, pingEv, &pingReply); err == nil ||
err.Error() != utils.ErrPartiallyExecuted.Error() {
!rpcclient.IsNetworkError(err) {
t.Errorf("Expected error: %s received error: %v and reply %q", utils.ErrPartiallyExecuted.Error(), err, pingReply)
}
allEngine.startEngine(t)

View File

@@ -136,7 +136,7 @@ func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.ClientConnec
if conn, err = cM.getConn(connID, biRPCClient); err != nil {
continue
}
if err = conn.Call(method, arg, reply); utils.IsNetworkError(err) {
if err = conn.Call(method, arg, reply); rpcclient.IsNetworkError(err) {
continue
} else {
return

2
go.mod
View File

@@ -25,7 +25,7 @@ require (
github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817
github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca
github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72
github.com/cgrates/rpcclient v0.0.0-20201106122207-1023f17a2b43
github.com/cgrates/rpcclient v0.0.0-20201120095908-1c0f9f4bb06e
github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e
github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60
github.com/creack/pty v1.1.11

8
go.sum
View File

@@ -87,8 +87,6 @@ github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0
github.com/cenkalti/rpc2 v0.0.0-20200203073230-5ce2854ce0fd h1:rGgK+sDUQPC/R+Uc1y6uFnsFpwZyqpFBg+0AtFORFjk=
github.com/cenkalti/rpc2 v0.0.0-20200203073230-5ce2854ce0fd/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cgrates/aringo v0.0.0-20191121125609-d85002bd1667 h1:eNku7bwLtSTpn6FQUUTnqohuGWC8jij1KNqDZ2QkGRE=
github.com/cgrates/aringo v0.0.0-20191121125609-d85002bd1667/go.mod h1:l0xi5JUVqxL4P7RZ9YitbSCiOtjMY2j7JBOCJIysRWk=
github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d h1:1PLz/t3XZy5KF8EY/ShzBZoVLaY50+tnAbE1wu8rCfg=
github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d/go.mod h1:mMAzSIjK11XfRMrOIa7DXYl64REdPldRCbAgzKB47XQ=
github.com/cgrates/baningo v0.0.0-20201105145354-6e3173f6a91b h1:9IX5Z3Tw7n2QrY7GLGGpqjjC/NVSJvQ7nxLkC2JP4vw=
@@ -101,12 +99,10 @@ github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817 h1:1Tdv6H/usqmkQV
github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817/go.mod h1:pgHqPlPcEDIQbbs9wyBk7YZTcaVdxMqf3v04XU+mngI=
github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca h1:Ejj4m0Ccl8dMMVnoHk4nQMlbR3w24llqQDy66DO9E0A=
github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca/go.mod h1:q7c996DUu8OrJRnewVSQzM+y/bRcxZAHoo+zCD8bFBo=
github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1 h1:QvA6Nbwq9kTd7hsvu1xo5kwia68cLwnp0sYCq1u40TU=
github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1/go.mod h1:HZbsg3Y+xw4lsfCqX9rzj429wrg0XOug6pFT3B6VHZY=
github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72 h1:cTAWQEbab3gKkDSeaxkTaoiP/cNFx+7/kC96wYckk3g=
github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72/go.mod h1:3IDSbfIqU5VsYKjrwa3HhuAK1jlI65wa1coHetoaN20=
github.com/cgrates/rpcclient v0.0.0-20201106122207-1023f17a2b43 h1:DEqW3NSJkL5au9kr7knI+ExPqKtn79oKE+20CftDteo=
github.com/cgrates/rpcclient v0.0.0-20201106122207-1023f17a2b43/go.mod h1:+QZt2Af6g8UScM5NWjAwn0CyvLjgVgkZPJyAJqQgXoQ=
github.com/cgrates/rpcclient v0.0.0-20201120095908-1c0f9f4bb06e h1:t1oSABmZYacJiB+JrJQVGFqkv+7ekgOo/Jj/KdoM5mI=
github.com/cgrates/rpcclient v0.0.0-20201120095908-1c0f9f4bb06e/go.mod h1:+QZt2Af6g8UScM5NWjAwn0CyvLjgVgkZPJyAJqQgXoQ=
github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e h1:izFjZB83/XRXInc+gMIssUxdbleGsGIuGCPj2u7RQo0=
github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e/go.mod h1:0f2+3dq5Iiv3VlcuY83VPJ0QzqRlzDG1Cr8okogQE3g=
github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60 h1:TQDg+HGB17LU8FitLiLvYazYSy62GQ1lO3lGKI3xUrU=

View File

@@ -131,6 +131,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
* [CDRs] Replaced RSRField with RSRParser
* [RouteS] Add new field RouteRateProfileIDs in RateProfiles.csv
* [DispatcherS] Removed connection pool from DispatcherHost structure
* [DispatcherS] Updated *broadcast, *broadcast_sync and *broadcast_async to behave similar to RPCPool
-- DanB <danb@cgrates.org> Wed, 19 Feb 2020 13:25:52 +0200

View File

@@ -1070,7 +1070,6 @@ const (
const (
MetaFirst = "*first"
MetaRandom = "*random"
MetaBroadcast = "*broadcast"
MetaRoundRobin = "*round_robin"
MetaRatio = "*ratio"
MetaDefaultRatio = "*default_ratio"

View File

@@ -21,8 +21,6 @@ package utils
import (
"errors"
"fmt"
"net"
"net/rpc"
"strings"
)
@@ -251,26 +249,6 @@ func ErrEnvNotFound(key string) error {
return ErrPrefix(ErrNotFound, "ENV_VAR:"+key)
}
// IsNetworkError will decide if an error is network generated or RPC one
// used by Dispatcher to figure out whether it should try another connection
func IsNetworkError(err error) bool {
if err == nil {
return false
}
if _, isNetError := err.(*net.OpError); isNetError { // connection reset
return true
}
if _, isDNSError := err.(*net.DNSError); isDNSError {
return true
}
return err.Error() == rpc.ErrShutdown.Error() ||
err.Error() == ErrReqUnsynchronized.Error() ||
err.Error() == ErrDisconnected.Error() ||
err.Error() == ErrReplyTimeout.Error() ||
err.Error() == ErrSessionNotFound.Error() ||
strings.HasPrefix(err.Error(), "rpc: can't find service")
}
func ErrPathNotReachable(path string) error {
return fmt.Errorf("path:%+q is not reachable", path)
}

View File

@@ -18,10 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"fmt"
"net"
"reflect"
"syscall"
"testing"
)
@@ -193,36 +190,6 @@ func TestErrEnvNotFound(t *testing.T) {
}
}
func TestIsNetworkError(t *testing.T) {
if IsNetworkError(nil) {
t.Errorf("Expecting: false, received: true")
}
if !IsNetworkError(ErrReqUnsynchronized) {
t.Errorf("Expecting: true, received: false")
}
var err error
if IsNetworkError(err) {
t.Errorf("Nill error should not be consider a network error")
}
err = &net.OpError{Err: syscall.ECONNRESET}
if !IsNetworkError(err) {
t.Errorf("syscall.ECONNRESET should be consider a network error")
}
err = &net.DNSError{Err: "DNSError"}
if !IsNetworkError(err) {
t.Errorf("DNSError should be consider a network error")
}
err = fmt.Errorf("NOT_FOUND")
if IsNetworkError(err) {
t.Errorf("%s error should not be consider a network error", err)
}
err = ErrDisconnected
if !IsNetworkError(err) {
t.Errorf("%s error should be consider a network error", err)
}
}
func TestErrPathNotReachable(t *testing.T) {
if rcv := ErrPathNotReachable("test/path"); rcv.Error() != `path:"test/path" is not reachable` {
t.Errorf("Expecting: path:'test/path' is not reachable, received: %+v", rcv)