diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 85c374304..5e270917a 100644 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -25,6 +25,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -156,11 +157,28 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent, if tnt == utils.EmptyString { tnt = dS.cfg.GeneralCfg().DefaultTenant } + if ifAce, has := ev.APIOpts[utils.MetaDispatchers]; has { // special case when dispatching is disabled, directly proxy internally + var shouldDispatch bool + if shouldDispatch, err = utils.IfaceAsBool(ifAce); err != nil { + return utils.NewErrDispatcherS(err) + } + if !shouldDispatch { + return callDH(ctx, + newInternalHost(tnt), utils.EmptyString, nil, + dS.cfg, dS.connMgr.GetDispInternalChan(), + serviceMethod, args, reply) + } + } + var dR *DispatcherRoute var dPrfls engine.DispatcherProfiles routeID := utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]) if routeID != utils.EmptyString { // overwrite routeID with RouteID:Subsystem for subsystem correct routing routeID = utils.ConcatenatedKey(routeID, subsys) + guardID := utils.ConcatenatedKey(utils.DispatcherSv1, utils.OptsRouteID, routeID) + refID := guardian.Guardian.GuardIDs("", + dS.cfg.GeneralCfg().LockingTimeout, guardID) // lock the routeID so we can make sure we have time to execute only once before caching + defer guardian.Guardian.UnguardIDs(refID) // use previously discovered route if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, routeID); ok && x != nil { @@ -184,7 +202,6 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent, if len(dPrfls) == 0 { return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE")) } - fmt.Printf("dispatching event: %+v to profiles: %+v\n", ev, dPrfls) for _, dPrfl := range dPrfls { tntID := dPrfl.TenantID() // get or build the Dispatcher for the config diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 7904f249f..63ecf52a6 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -24,6 +24,7 @@ import ( "math/rand" "sort" "sync" + "time" "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" @@ -56,7 +57,6 @@ type Dispatcher interface { // newDispatcher constructs instances of Dispatcher func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { hosts := pfl.Hosts.Clone() - fmt.Printf("hosts: %+v, cloned: %+v\n", pfl.Hosts, hosts) hosts.Sort() // make sure the connections are sorted switch pfl.Strategy { case utils.MetaWeight: @@ -211,7 +211,6 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine. HostID: hostID, } } - fmt.Printf("Will dispatch to host with id: <%s>\n", hostID) if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm, cfg, iPRCCh, serviceMethod, args, reply); err == nil || (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors @@ -440,6 +439,12 @@ func callDH(ctx *context.Context, dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector, method string, args, reply interface{}) (err error) { + if routeID != utils.EmptyString { // cache the discovered route before dispatching + if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dR, + nil, true, utils.EmptyString); err != nil { + return + } + } var conn birpc.ClientConnector if conn, err = dh.GetConn(ctx, cfg, iPRCCh); err != nil { return @@ -447,12 +452,7 @@ func callDH(ctx *context.Context, if err = conn.Call(ctx, method, args, reply); err != nil { return } - if routeID != utils.EmptyString { // cache the discovered route - if err = engine.Cache.Set(ctx, utils.CacheDispatcherRoutes, routeID, dR, - nil, true, utils.EmptyString); err != nil { - return - } - } + return } @@ -468,3 +468,18 @@ type lazyDH struct { func (l *lazyDH) Call(ctx *context.Context, method string, args, reply interface{}) (err error) { return callDH(ctx, l.dh, l.routeID, l.dR, l.cfg, l.iPRCCh, method, args, reply) } + +// newInternalHost returns an internal host as needed for internal dispatching +func newInternalHost(tnt string) *engine.DispatcherHost { + return &engine.DispatcherHost{ + Tenant: tnt, + RemoteHost: &config.RemoteHost{ + ID: utils.MetaInternal, + Address: utils.MetaInternal, + ConnectAttempts: 1, + Reconnects: 1, + ConnectTimeout: time.Second, + ReplyTimeout: time.Second, + }, + } +} diff --git a/services/datadb.go b/services/datadb.go index 4742c4f10..e6a382bf2 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -71,7 +71,6 @@ func (db *DataDBService) Start(*context.Context, context.CancelFunc) (err error) } db.dm = engine.NewDataManager(d, db.cfg.CacheCfg(), db.connMgr) if err = engine.CheckVersions(db.dm.DataDB()); err != nil { - fmt.Println(err) return } db.dbchan <- db.dm