diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 33e10dfc3..85c374304 100644 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -165,19 +165,7 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent, if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, routeID); ok && x != nil { dR = x.(*DispatcherRoute) - // query the profile out of cached route - var dPrfl *engine.DispatcherProfile - if dPrfl, err = dS.dm.GetDispatcherProfile(ctx, dR.Tenant, dR.ProfileID, - true, true, utils.NonTransactional); err != nil { - if err != utils.ErrNotFound { - return - } - // profile was not found - utils.Logger.Warning(fmt.Sprintf("<%s> could not find profile with tenant: <%s> and ID <%s> for routeID: <%s>", - utils.DispatcherS, dR.Tenant, dR.ProfileID, routeID)) - } else { - dPrfls = engine.DispatcherProfiles{dPrfl} // will be used as the list of routes - } + dPrfls = engine.DispatcherProfiles{&engine.DispatcherProfile{Tenant: dR.Tenant, ID: dR.ProfileID}} // will be used bellow to retrieve the dispatcher } } evNm := utils.MapStorage{ @@ -196,6 +184,7 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent, if len(dPrfls) == 0 { return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE")) } + fmt.Printf("dispatching event: %+v to profiles: %+v\n", ev, dPrfls) for _, dPrfl := range dPrfls { tntID := dPrfl.TenantID() // get or build the Dispatcher for the config @@ -203,11 +192,34 @@ func (dS *DispatcherService) Dispatch(ctx *context.Context, ev *utils.CGREvent, if x, ok := engine.Cache.Get(utils.CacheDispatchers, tntID); ok && x != nil { d = x.(Dispatcher) - } else if d, err = newDispatcher(dPrfl); err != nil { - return utils.NewErrDispatcherS(err) - } else if err = engine.Cache.Set(ctx, utils.CacheDispatchers, tntID, d, // cache the built Dispatcher - nil, true, utils.EmptyString); err != nil { - return utils.NewErrDispatcherS(err) + } else { // dispatcher is not cached, build it here + if dPrfl.Hosts == nil { // dispatcher profile was not retrieved but built artificially above, try retrieving + if dPrfl, err = dS.dm.GetDispatcherProfile(ctx, dPrfl.Tenant, dPrfl.ID, + true, true, utils.NonTransactional); err != nil { + if err != utils.ErrNotFound { + return + } + // profile was not found + utils.Logger.Warning(fmt.Sprintf("<%s> could not find profile with tenant: <%s> and ID <%s> for routeID: <%s>", + utils.DispatcherS, dR.Tenant, dR.ProfileID, routeID)) + if len(dPrfls) == 1 { // the only profile set does not exist anymore + return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE")) + } + continue + } + } + if d, err = newDispatcher(dPrfl); err != nil { + return utils.NewErrDispatcherS(err) + } else if err = engine.Cache.Set(ctx, utils.CacheDispatchers, tntID, d, // cache the built Dispatcher + nil, true, utils.EmptyString); err != nil { + return utils.NewErrDispatcherS(err) + } + } + if routeID != utils.EmptyString && dR == nil { // first time we cache the route + dR = &DispatcherRoute{ + Tenant: dPrfl.Tenant, + ProfileID: dPrfl.ID, + } } if err = d.Dispatch(dS.dm, dS.fltrS, dS.cfg, ctx, dS.connMgr.GetDispInternalChan(), evNm, tnt, routeID, dR, diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 25653ef8d..b32e41525 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -56,6 +56,7 @@ type Dispatcher interface { // newDispatcher constructs instances of Dispatcher func newDispatcher(pfl *engine.DispatcherProfile) (d Dispatcher, err error) { hosts := pfl.Hosts.Clone() + fmt.Printf("hosts: %+v, cloned: %+v\n", pfl.Hosts, hosts) hosts.Sort() // make sure the connections are sorted switch pfl.Strategy { case utils.MetaWeight: @@ -185,14 +186,14 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine. ctx *context.Context, iPRCCh chan birpc.ClientConnector, ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute, serviceMethod string, args interface{}, reply interface{}) (err error) { - if routeID != utils.EmptyString { // route to previously discovered route + if dR != nil { // route to previously discovered route if err = callDHwithID(ctx, tnt, dR.HostID, routeID, dR, dm, cfg, iPRCCh, serviceMethod, args, reply); err == nil || (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors return } // not found or network errors will continue - utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>", + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with idnt <%q>", utils.DispatcherS, err.Error(), dR.HostID)) } var hostIDs []string @@ -202,10 +203,15 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine. return utils.ErrHostNotFound } for _, hostID := range hostIDs { - dRh := dR + var dRh *DispatcherRoute if routeID != utils.EmptyString { - dRh.HostID = hostID + dRh = &DispatcherRoute{ + Tenant: dR.Tenant, + ProfileID: dR.ProfileID, + HostID: hostID, + } } + fmt.Printf("Will dispatch to host with id: <%s>\n", hostID) if err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm, cfg, iPRCCh, serviceMethod, args, reply); err == nil || (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors @@ -213,7 +219,7 @@ func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine. } if err != nil { // not found or network errors will continue with standard dispatching - utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with id <%q>", + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with identity <%s>", utils.DispatcherS, err.Error(), hostID)) } return @@ -250,10 +256,20 @@ func (b *broadcastDispatcher) Dispatch(dm *engine.DataManager, flts *engine.Filt return utils.NewErrDispatcherS(err) } hasHosts = true + var dRh *DispatcherRoute + if routeID != utils.EmptyString { + dRh = &DispatcherRoute{ + Tenant: dR.Tenant, + ProfileID: dR.ProfileID, + HostID: hostID, + } + } pool.AddClient(&lazyDH{ - dh: dH, - cfg: cfg, - iPRCCh: iPRCCh, + dh: dH, + cfg: cfg, + iPRCCh: iPRCCh, + routeID: routeID, + dR: dRh, }) } if !hasHosts { // in case we do not match any host @@ -283,7 +299,7 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, } else if lM, err = newLoadMetrics(ld.hosts, ld.defaultRatio); err != nil { return } - if routeID != utils.EmptyString { // route to previously discovered route + if dR != nil { // route to previously discovered route lM.incrementLoad(ctx, dR.HostID, ld.tntID) err = callDHwithID(ctx, tnt, dR.HostID, routeID, dR, dm, cfg, iPRCCh, serviceMethod, args, reply) @@ -303,8 +319,16 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, return utils.ErrHostNotFound } for _, hostID := range hostIDs { + var dRh *DispatcherRoute + if routeID != utils.EmptyString { + dRh = &DispatcherRoute{ + Tenant: dR.Tenant, + ProfileID: dR.ProfileID, + HostID: hostID, + } + } lM.incrementLoad(ctx, hostID, ld.tntID) - err = callDHwithID(ctx, tnt, hostID, routeID, dR, dm, + err = callDHwithID(ctx, tnt, hostID, routeID, dRh, dm, cfg, iPRCCh, serviceMethod, args, reply) lM.decrementLoad(ctx, hostID, ld.tntID) // call ended if err == nil || @@ -405,7 +429,22 @@ func callDHwithID(ctx *context.Context, tnt, hostID, routeID string, dR *Dispatc if dH, err = dm.GetDispatcherHost(ctx, tnt, hostID, true, true, utils.NonTransactional); err != nil { return } - if err = callDH(ctx, dH, cfg, iPRCCh, serviceMethod, args, reply); err != nil { + if err = callDH(ctx, dH, routeID, dR, cfg, iPRCCh, serviceMethod, args, reply); err != nil { + return + } + + return +} + +func callDH(ctx *context.Context, + dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute, + cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector, + method string, args, reply interface{}) (err error) { + var conn birpc.ClientConnector + if conn, err = dh.GetConn(ctx, cfg, iPRCCh); err != nil { + return + } + if err = conn.Call(ctx, method, args, reply); err != nil { return } if routeID != utils.EmptyString { // cache the discovered route @@ -417,23 +456,15 @@ func callDHwithID(ctx *context.Context, tnt, hostID, routeID string, dR *Dispatc return } -func callDH(ctx *context.Context, - dh *engine.DispatcherHost, cfg *config.CGRConfig, iPRCCh chan birpc.ClientConnector, - method string, args, reply interface{}) (err error) { - var conn birpc.ClientConnector - if conn, err = dh.GetConn(ctx, cfg, iPRCCh); err != nil { - return - } - return conn.Call(ctx, method, args, reply) -} - // lazyDH is created for the broadcast strategy so we can make sure host exists during setup phase type lazyDH struct { - dh *engine.DispatcherHost - cfg *config.CGRConfig - iPRCCh chan birpc.ClientConnector + dh *engine.DispatcherHost + cfg *config.CGRConfig + iPRCCh chan birpc.ClientConnector + routeID string + dR *DispatcherRoute } -func (l *lazyDH) Call(ctx *context.Context, method string, args, reply interface{}) error { - return callDH(ctx, l.dh, l.cfg, l.iPRCCh, method, args, reply) +func (l *lazyDH) Call(ctx *context.Context, method string, args, reply interface{}) (err error) { + return callDH(ctx, l.dh, l.routeID, l.dR, l.cfg, l.iPRCCh, method, args, reply) }