From 3c06f49246d41656f1a92ef97ada32ddbc1ae555 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 20 Nov 2020 13:26:00 +0200 Subject: [PATCH] Updated DispatcherS *broadcast, *broadcast_sync and *broadcast_async to behave similar to RPCPool --- .../dispatchers/DispatcherProfiles.csv | 2 +- .../dispatchers_gob/DispatcherProfiles.csv | 2 +- .../gocs/dsp_site/DispatcherProfiles.csv | 2 +- .../precache/DispatcherProfiles.csv | 2 +- dispatchers/libdispatcher.go | 50 ++++++++----------- dispatchers/responder_it_test.go | 3 +- engine/connmanager.go | 2 +- go.mod | 2 +- go.sum | 8 +-- packages/debian/changelog | 1 + utils/consts.go | 1 - utils/errors.go | 22 -------- utils/errors_test.go | 33 ------------ 13 files changed, 33 insertions(+), 97 deletions(-) diff --git a/data/tariffplans/dispatchers/DispatcherProfiles.csv b/data/tariffplans/dispatchers/DispatcherProfiles.csv index 0bb8bf867..ff9d57357 100644 --- a/data/tariffplans/dispatchers/DispatcherProfiles.csv +++ b/data/tariffplans/dispatchers/DispatcherProfiles.csv @@ -9,7 +9,7 @@ cgrates.org,EVENT2,*any,*string:~*req.EventName:RoundRobin,,*round_robin,,ALL2,, cgrates.org,EVENT2,,,,,,ALL,,10,,, cgrates.org,EVENT3,*any,*string:~*req.EventName:Random,,*random,,ALL2,,20,false,,20 cgrates.org,EVENT3,,,,,,ALL,,10,,, -cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20,false,,20 +cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast_sync,,ALL2,,20,false,,20 cgrates.org,EVENT4,,,,,,ALL,,10,,, cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20 cgrates.org,EVENT6,*any,*string:~*opts.*method:DispatcherSv1.GetProfileForEvent,,*weight,,SELF,,20,false,,20 diff --git a/data/tariffplans/dispatchers_gob/DispatcherProfiles.csv b/data/tariffplans/dispatchers_gob/DispatcherProfiles.csv index 0bb8bf867..ff9d57357 100644 --- a/data/tariffplans/dispatchers_gob/DispatcherProfiles.csv +++ b/data/tariffplans/dispatchers_gob/DispatcherProfiles.csv @@ -9,7 +9,7 @@ cgrates.org,EVENT2,*any,*string:~*req.EventName:RoundRobin,,*round_robin,,ALL2,, cgrates.org,EVENT2,,,,,,ALL,,10,,, cgrates.org,EVENT3,*any,*string:~*req.EventName:Random,,*random,,ALL2,,20,false,,20 cgrates.org,EVENT3,,,,,,ALL,,10,,, -cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20,false,,20 +cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast_sync,,ALL2,,20,false,,20 cgrates.org,EVENT4,,,,,,ALL,,10,,, cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20 cgrates.org,EVENT6,*any,*string:~*opts.*method:DispatcherSv1.GetProfileForEvent,,*weight,,SELF,,20,false,,20 diff --git a/data/tariffplans/gocs/dsp_site/DispatcherProfiles.csv b/data/tariffplans/gocs/dsp_site/DispatcherProfiles.csv index da1647dee..5dc084040 100644 --- a/data/tariffplans/gocs/dsp_site/DispatcherProfiles.csv +++ b/data/tariffplans/gocs/dsp_site/DispatcherProfiles.csv @@ -1,4 +1,4 @@ #Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight -cgrates.org,BROADCAST,*sessions,,,*broadcast,,AU_SITE,,20,false,,30 +cgrates.org,BROADCAST,*sessions,,,*broadcast_sync,,AU_SITE,,20,false,,30 cgrates.org,BROADCAST,,,,,,US_SITE,,20,,, cgrates.org,SELF,*any,,,*weight,,SELF,,20,false,,10 \ No newline at end of file diff --git a/data/tariffplans/precache/DispatcherProfiles.csv b/data/tariffplans/precache/DispatcherProfiles.csv index 38223d7e9..05c93d2be 100644 --- a/data/tariffplans/precache/DispatcherProfiles.csv +++ b/data/tariffplans/precache/DispatcherProfiles.csv @@ -7,6 +7,6 @@ cgrates.org,EVENT2,*any,*string:~*req.EventName:RoundRobin,,*round_robin,,ALL2,, cgrates.org,EVENT2,,,,,,ALL,,10,,, cgrates.org,EVENT3,*any,*string:~*req.EventName:Random,,*random,,ALL2,,20,false,,20 cgrates.org,EVENT3,,,,,,ALL,,10,,, -cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast,,ALL2,,20,false,,20 +cgrates.org,EVENT4,*any,*string:~*req.EventName:Broadcast,,*broadcast_sync,,ALL2,,20,false,,20 cgrates.org,EVENT4,,,,,,ALL,,10,,, cgrates.org,EVENT5,*any,*string:~*req.EventName:Internal,,*weight,,SELF,,20,false,,20 diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index dfe40c359..44317e270 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -24,8 +24,10 @@ import ( "sort" "sync" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func init() { @@ -93,12 +95,14 @@ func newDispatcher(dm *engine.DataManager, pfl *engine.DispatcherProfile) (d Dis hosts: hosts, strategy: strDsp, } - case utils.MetaBroadcast: + case rpcclient.PoolBroadcast, + rpcclient.PoolBroadcastSync, + rpcclient.PoolBroadcastAsync: d = &WeightDispatcher{ dm: dm, tnt: pfl.Tenant, hosts: hosts, - strategy: new(brodcastStrategyDispatcher), + strategy: &broadcastStrategyDispatcher{strategy: pfl.Strategy}, } default: err = fmt.Errorf("unsupported dispatch strategy: <%s>", pfl.Strategy) @@ -223,7 +227,7 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, routeID); ok && x != nil { dH = x.(*engine.DispatcherHost) - if err = dH.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { + if err = dH.Call(serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { return } } @@ -241,7 +245,7 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID return } called = true - if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { + if err = dH.Call(serviceMethod, args, reply); rpcclient.IsNetworkError(err) { continue } if routeID != utils.EmptyString { // cache the discovered route @@ -259,12 +263,14 @@ func (*singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeID return } -type brodcastStrategyDispatcher struct{} +type broadcastStrategyDispatcher struct { + strategy string +} -func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string, +func (b *broadcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID string, subsystem, tnt string, hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) { - var hasErrors bool - var called bool + var hasHosts bool + pool := rpcclient.NewRPCPool(b.strategy, config.CgrConfig().GeneralCfg().ReplyTimeout) for _, hostID := range hostIDs { var dH *engine.DispatcherHost if dH, err = dm.GetDispatcherHost(tnt, hostID, true, true, utils.NonTransactional); err != nil { @@ -274,27 +280,15 @@ func (*brodcastStrategyDispatcher) dispatch(dm *engine.DataManager, routeID stri err = nil continue } - err = utils.NewErrDispatcherS(err) - return - } - called = true - if err = dH.Call(serviceMethod, args, reply); utils.IsNetworkError(err) { - utils.Logger.Err(fmt.Sprintf("<%s> network error: <%s> at %s strategy for hostID %q", - utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID)) - hasErrors = true - } else if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s> at %s strategy for hostID %q", - utils.DispatcherS, err.Error(), utils.MetaBroadcast, hostID)) - hasErrors = true + return utils.NewErrDispatcherS(err) } + hasHosts = true + pool.AddClient(dH) } - if hasErrors { // rewrite err if not all call were succesfull - return utils.ErrPartiallyExecuted - } else if !called { // in case we do not match any host - err = utils.ErrHostNotFound - return + if !hasHosts { // in case we do not match any host + return utils.ErrHostNotFound } - return + return pool.Call(serviceMethod, args, reply) } func newSingleStrategyDispatcher(hosts engine.DispatcherHostProfiles, params map[string]interface{}, tntID string) (ls strategyDispatcher, err error) { @@ -374,7 +368,7 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin lM.incrementLoad(dH.ID, ld.tntID) err = dH.Call(serviceMethod, args, reply) lM.decrementLoad(dH.ID, ld.tntID) // call ended - if !utils.IsNetworkError(err) { + if !rpcclient.IsNetworkError(err) { return } } @@ -395,7 +389,7 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID strin lM.incrementLoad(hostID, ld.tntID) err = dH.Call(serviceMethod, args, reply) lM.decrementLoad(hostID, ld.tntID) // call ended - if utils.IsNetworkError(err) { + if rpcclient.IsNetworkError(err) { continue } if routeID != utils.EmptyString { // cache the discovered route diff --git a/dispatchers/responder_it_test.go b/dispatchers/responder_it_test.go index 0bc8d9ed4..5a84ea9b5 100644 --- a/dispatchers/responder_it_test.go +++ b/dispatchers/responder_it_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) var sTestsDspRsp = []func(t *testing.T){ @@ -193,7 +194,7 @@ func testDspResponderBroadcast(t *testing.T) { allEngine.stopEngine(t) pingReply = "" if err := dispEngine.RPC.Call(utils.ResponderPing, pingEv, &pingReply); err == nil || - err.Error() != utils.ErrPartiallyExecuted.Error() { + !rpcclient.IsNetworkError(err) { t.Errorf("Expected error: %s received error: %v and reply %q", utils.ErrPartiallyExecuted.Error(), err, pingReply) } allEngine.startEngine(t) diff --git a/engine/connmanager.go b/engine/connmanager.go index 0da1345b8..d03a07b65 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -136,7 +136,7 @@ func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.ClientConnec if conn, err = cM.getConn(connID, biRPCClient); err != nil { continue } - if err = conn.Call(method, arg, reply); utils.IsNetworkError(err) { + if err = conn.Call(method, arg, reply); rpcclient.IsNetworkError(err) { continue } else { return diff --git a/go.mod b/go.mod index 1d7f6313e..e0965f58f 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817 github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72 - github.com/cgrates/rpcclient v0.0.0-20201106122207-1023f17a2b43 + github.com/cgrates/rpcclient v0.0.0-20201120095908-1c0f9f4bb06e github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60 github.com/creack/pty v1.1.11 diff --git a/go.sum b/go.sum index 7855e94ad..142a6818f 100644 --- a/go.sum +++ b/go.sum @@ -87,8 +87,6 @@ github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0 github.com/cenkalti/rpc2 v0.0.0-20200203073230-5ce2854ce0fd h1:rGgK+sDUQPC/R+Uc1y6uFnsFpwZyqpFBg+0AtFORFjk= github.com/cenkalti/rpc2 v0.0.0-20200203073230-5ce2854ce0fd/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cgrates/aringo v0.0.0-20191121125609-d85002bd1667 h1:eNku7bwLtSTpn6FQUUTnqohuGWC8jij1KNqDZ2QkGRE= -github.com/cgrates/aringo v0.0.0-20191121125609-d85002bd1667/go.mod h1:l0xi5JUVqxL4P7RZ9YitbSCiOtjMY2j7JBOCJIysRWk= github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d h1:1PLz/t3XZy5KF8EY/ShzBZoVLaY50+tnAbE1wu8rCfg= github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d/go.mod h1:mMAzSIjK11XfRMrOIa7DXYl64REdPldRCbAgzKB47XQ= github.com/cgrates/baningo v0.0.0-20201105145354-6e3173f6a91b h1:9IX5Z3Tw7n2QrY7GLGGpqjjC/NVSJvQ7nxLkC2JP4vw= @@ -101,12 +99,10 @@ github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817 h1:1Tdv6H/usqmkQV github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817/go.mod h1:pgHqPlPcEDIQbbs9wyBk7YZTcaVdxMqf3v04XU+mngI= github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca h1:Ejj4m0Ccl8dMMVnoHk4nQMlbR3w24llqQDy66DO9E0A= github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca/go.mod h1:q7c996DUu8OrJRnewVSQzM+y/bRcxZAHoo+zCD8bFBo= -github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1 h1:QvA6Nbwq9kTd7hsvu1xo5kwia68cLwnp0sYCq1u40TU= -github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1/go.mod h1:HZbsg3Y+xw4lsfCqX9rzj429wrg0XOug6pFT3B6VHZY= github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72 h1:cTAWQEbab3gKkDSeaxkTaoiP/cNFx+7/kC96wYckk3g= github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72/go.mod h1:3IDSbfIqU5VsYKjrwa3HhuAK1jlI65wa1coHetoaN20= -github.com/cgrates/rpcclient v0.0.0-20201106122207-1023f17a2b43 h1:DEqW3NSJkL5au9kr7knI+ExPqKtn79oKE+20CftDteo= -github.com/cgrates/rpcclient v0.0.0-20201106122207-1023f17a2b43/go.mod h1:+QZt2Af6g8UScM5NWjAwn0CyvLjgVgkZPJyAJqQgXoQ= +github.com/cgrates/rpcclient v0.0.0-20201120095908-1c0f9f4bb06e h1:t1oSABmZYacJiB+JrJQVGFqkv+7ekgOo/Jj/KdoM5mI= +github.com/cgrates/rpcclient v0.0.0-20201120095908-1c0f9f4bb06e/go.mod h1:+QZt2Af6g8UScM5NWjAwn0CyvLjgVgkZPJyAJqQgXoQ= github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e h1:izFjZB83/XRXInc+gMIssUxdbleGsGIuGCPj2u7RQo0= github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e/go.mod h1:0f2+3dq5Iiv3VlcuY83VPJ0QzqRlzDG1Cr8okogQE3g= github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60 h1:TQDg+HGB17LU8FitLiLvYazYSy62GQ1lO3lGKI3xUrU= diff --git a/packages/debian/changelog b/packages/debian/changelog index 68df2d905..6120d78a9 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -131,6 +131,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [CDRs] Replaced RSRField with RSRParser * [RouteS] Add new field RouteRateProfileIDs in RateProfiles.csv * [DispatcherS] Removed connection pool from DispatcherHost structure + * [DispatcherS] Updated *broadcast, *broadcast_sync and *broadcast_async to behave similar to RPCPool -- DanB Wed, 19 Feb 2020 13:25:52 +0200 diff --git a/utils/consts.go b/utils/consts.go index 786d496d5..ca2ad5401 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1070,7 +1070,6 @@ const ( const ( MetaFirst = "*first" MetaRandom = "*random" - MetaBroadcast = "*broadcast" MetaRoundRobin = "*round_robin" MetaRatio = "*ratio" MetaDefaultRatio = "*default_ratio" diff --git a/utils/errors.go b/utils/errors.go index 6b9bc71b1..539da27cd 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -21,8 +21,6 @@ package utils import ( "errors" "fmt" - "net" - "net/rpc" "strings" ) @@ -251,26 +249,6 @@ func ErrEnvNotFound(key string) error { return ErrPrefix(ErrNotFound, "ENV_VAR:"+key) } -// IsNetworkError will decide if an error is network generated or RPC one -// used by Dispatcher to figure out whether it should try another connection -func IsNetworkError(err error) bool { - if err == nil { - return false - } - if _, isNetError := err.(*net.OpError); isNetError { // connection reset - return true - } - if _, isDNSError := err.(*net.DNSError); isDNSError { - return true - } - return err.Error() == rpc.ErrShutdown.Error() || - err.Error() == ErrReqUnsynchronized.Error() || - err.Error() == ErrDisconnected.Error() || - err.Error() == ErrReplyTimeout.Error() || - err.Error() == ErrSessionNotFound.Error() || - strings.HasPrefix(err.Error(), "rpc: can't find service") -} - func ErrPathNotReachable(path string) error { return fmt.Errorf("path:%+q is not reachable", path) } diff --git a/utils/errors_test.go b/utils/errors_test.go index 11e3b80d6..c9542fc0a 100644 --- a/utils/errors_test.go +++ b/utils/errors_test.go @@ -18,10 +18,7 @@ along with this program. If not, see package utils import ( - "fmt" - "net" "reflect" - "syscall" "testing" ) @@ -193,36 +190,6 @@ func TestErrEnvNotFound(t *testing.T) { } } -func TestIsNetworkError(t *testing.T) { - if IsNetworkError(nil) { - t.Errorf("Expecting: false, received: true") - } - if !IsNetworkError(ErrReqUnsynchronized) { - t.Errorf("Expecting: true, received: false") - } - var err error - if IsNetworkError(err) { - t.Errorf("Nill error should not be consider a network error") - } - err = &net.OpError{Err: syscall.ECONNRESET} - if !IsNetworkError(err) { - t.Errorf("syscall.ECONNRESET should be consider a network error") - } - err = &net.DNSError{Err: "DNSError"} - if !IsNetworkError(err) { - t.Errorf("DNSError should be consider a network error") - } - err = fmt.Errorf("NOT_FOUND") - if IsNetworkError(err) { - t.Errorf("%s error should not be consider a network error", err) - } - err = ErrDisconnected - if !IsNetworkError(err) { - t.Errorf("%s error should be consider a network error", err) - } - -} - func TestErrPathNotReachable(t *testing.T) { if rcv := ErrPathNotReachable("test/path"); rcv.Error() != `path:"test/path" is not reachable` { t.Errorf("Expecting: path:'test/path' is not reachable, received: %+v", rcv)