DispatcherS - ability to disable routing from API opts

This commit is contained in:
DanB
2022-09-08 17:29:38 +02:00
parent 534f46addf
commit 6287e1d34c
3 changed files with 41 additions and 10 deletions

View File

@@ -25,6 +25,7 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -156,11 +157,28 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent,
if tnt == utils.EmptyString {
tnt = dS.cfg.GeneralCfg().DefaultTenant
}
if ifAce, has := ev.APIOpts[utils.MetaDispatchers]; has { // special case when dispatching is disabled, directly proxy internally
var shouldDispatch bool
if shouldDispatch, err = utils.IfaceAsBool(ifAce); err != nil {
return utils.NewErrDispatcherS(err)
}
if !shouldDispatch {
return callDH(ctx,
newInternalHost(tnt), utils.EmptyString, nil,
dS.cfg, dS.connMgr.GetDispInternalChan(),
serviceMethod, args, reply)
}
}
var dR *DispatcherRoute
var dPrfls engine.DispatcherProfiles
routeID := utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID])
if routeID != utils.EmptyString { // overwrite routeID with RouteID:Subsystem for subsystem correct routing
routeID = utils.ConcatenatedKey(routeID, subsys)
guardID := utils.ConcatenatedKey(utils.DispatcherSv1, utils.OptsRouteID, routeID)
refID := guardian.Guardian.GuardIDs("",
dS.cfg.GeneralCfg().LockingTimeout, guardID) // lock the routeID so we can make sure we have time to execute only once before caching
defer guardian.Guardian.UnguardIDs(refID)
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
routeID); ok && x != nil {
@@ -184,7 +202,6 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent,
if len(dPrfls) == 0 {
return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE"))
}
fmt.Printf("dispatching event: %+v to profiles: %+v\n", ev, dPrfls)
for _, dPrfl := range dPrfls {
tntID := dPrfl.TenantID()
// get or build the Dispatcher for the config

View File

@@ -24,6 +24,7 @@ import (
"math/rand"
"sort"
"sync"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
@@ -56,7 +57,6 @@ type Dispatcher interface {
// newDispatcher constructs instances of Dispatcher
func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) {
hosts := pfl.Hosts.Clone()
fmt.Printf("hosts: %+v, cloned: %+v\n", pfl.Hosts, hosts)
hosts.Sort() // make sure the connections are sorted
switch pfl.Strategy {
case utils.MetaWeight:
@@ -211,7 +211,6 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.
HostID: hostID,
}
}
fmt.Printf("Will dispatch to host with id: <%s>\n", hostID)
if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm,
cfg, iPRCCh, serviceMethod, args, reply); err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
@@ -440,6 +439,12 @@ func callDH(ctx *context.Context,
dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute,
cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector,
method string, args, reply interface{}) (err error) {
if routeID != utils.EmptyString { // cache the discovered route before dispatching
if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dR,
nil, true, utils.EmptyString); err != nil {
return
}
}
var conn birpc.ClientConnector
if conn, err = dh.GetConn(ctx, cfg, iPRCCh); err != nil {
return
@@ -447,12 +452,7 @@ func callDH(ctx *context.Context,
if err = conn.Call(ctx, method, args, reply); err != nil {
return
}
if routeID != utils.EmptyString { // cache the discovered route
if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dR,
nil, true, utils.EmptyString); err != nil {
return
}
}
return
}
@@ -468,3 +468,18 @@ type lazyDH struct {
func (l *lazyDH) Call(ctx *context.Context, method string, args, reply interface{}) (err error) {
return callDH(ctx, l.dh, l.routeID, l.dR, l.cfg, l.iPRCCh, method, args, reply)
}
// newInternalHost returns an internal host as needed for internal dispatching
func newInternalHost(tnt string) *engine.DispatcherHost {
return &engine.DispatcherHost{
Tenant: tnt,
RemoteHost: &config.RemoteHost{
ID: utils.MetaInternal,
Address: utils.MetaInternal,
ConnectAttempts: 1,
Reconnects: 1,
ConnectTimeout: time.Second,
ReplyTimeout: time.Second,
},
}
}

View File

@@ -71,7 +71,6 @@ func (db *DataDBService) Start(*context.Context, context.CancelFunc) (err error)
}
db.dm = engine.NewDataManager(d, db.cfg.CacheCfg(), db.connMgr)
if err = engine.CheckVersions(db.dm.DataDB()); err != nil {
fmt.Println(err)
return
}
db.dbchan <- db.dm