diff --git a/config/config_defaults.go b/config/config_defaults.go index 97a63f097..65edc8fc4 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -1087,6 +1087,7 @@ const CGRATES_CFG_JSON = ` "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) "attributes_conns": [], // connections to AttributeS for API authorization, empty to disable auth functionality: <""|*internal|$rpc_conns_id> "any_subsystem": true, // if we match the *any subsystem + "prevent_loops": false, // }, diff --git a/config/dispatcherscfg.go b/config/dispatcherscfg.go index c3ef99612..d2cd5d0b6 100644 --- a/config/dispatcherscfg.go +++ b/config/dispatcherscfg.go @@ -32,6 +32,7 @@ type DispatcherSCfg struct { AttributeSConns []string NestedFields bool AnySubsystem bool + PreventLoop bool } func (dps *DispatcherSCfg) loadFromJSONCfg(jsnCfg *DispatcherSJsonCfg) (err error) { @@ -81,37 +82,41 @@ func (dps *DispatcherSCfg) loadFromJSONCfg(jsnCfg *DispatcherSJsonCfg) (err erro if jsnCfg.Any_subsystem != nil { dps.AnySubsystem = *jsnCfg.Any_subsystem } + if jsnCfg.Prevent_loop != nil { + dps.PreventLoop = *jsnCfg.Prevent_loop + } return nil } // AsMapInterface returns the config as a map[string]interface{} -func (dps *DispatcherSCfg) AsMapInterface() (initialMP map[string]interface{}) { - initialMP = map[string]interface{}{ +func (dps *DispatcherSCfg) AsMapInterface() (mp map[string]interface{}) { + mp = map[string]interface{}{ utils.EnabledCfg: dps.Enabled, utils.IndexedSelectsCfg: dps.IndexedSelects, utils.NestedFieldsCfg: dps.NestedFields, utils.AnySubsystemCfg: dps.AnySubsystem, + utils.PreventLoopCfg: dps.PreventLoop, } if dps.StringIndexedFields != nil { stringIndexedFields := make([]string, len(*dps.StringIndexedFields)) for i, item := range *dps.StringIndexedFields { stringIndexedFields[i] = item } - initialMP[utils.StringIndexedFieldsCfg] = stringIndexedFields + mp[utils.StringIndexedFieldsCfg] = stringIndexedFields } if dps.PrefixIndexedFields != nil { prefixIndexedFields := make([]string, len(*dps.PrefixIndexedFields)) for i, item := range *dps.PrefixIndexedFields { prefixIndexedFields[i] = item } - initialMP[utils.PrefixIndexedFieldsCfg] = prefixIndexedFields + mp[utils.PrefixIndexedFieldsCfg] = prefixIndexedFields } if dps.SuffixIndexedFields != nil { suffixIndexedFields := make([]string, len(*dps.SuffixIndexedFields)) for i, item := range *dps.SuffixIndexedFields { suffixIndexedFields[i] = item } - initialMP[utils.SuffixIndexedFieldsCfg] = suffixIndexedFields + mp[utils.SuffixIndexedFieldsCfg] = suffixIndexedFields } if dps.AttributeSConns != nil { attributeSConns := make([]string, len(dps.AttributeSConns)) @@ -121,7 +126,7 @@ func (dps *DispatcherSCfg) AsMapInterface() (initialMP map[string]interface{}) { attributeSConns[i] = utils.MetaInternal } } - initialMP[utils.AttributeSConnsCfg] = attributeSConns + mp[utils.AttributeSConnsCfg] = attributeSConns } return } @@ -133,6 +138,7 @@ func (dps DispatcherSCfg) Clone() (cln *DispatcherSCfg) { IndexedSelects: dps.IndexedSelects, NestedFields: dps.NestedFields, AnySubsystem: dps.AnySubsystem, + PreventLoop: dps.PreventLoop, } if dps.AttributeSConns != nil { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index b5e5ff819..eeda5f106 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -739,6 +739,7 @@ type DispatcherSJsonCfg struct { Nested_fields *bool // applies when indexed fields is not defined Attributes_conns *[]string Any_subsystem *bool + Prevent_loop *bool } type RegistrarCJsonCfg struct { diff --git a/data/conf/samples/dispatcher_opts/cgrates.json b/data/conf/samples/dispatcher_opts/cgrates.json index 65debab34..114c8beac 100644 --- a/data/conf/samples/dispatcher_opts/cgrates.json +++ b/data/conf/samples/dispatcher_opts/cgrates.json @@ -1,7 +1,7 @@ { "general": { - "node_id": "DispatcherOpts", + "node_id": "HOST1", "log_level": 7 }, @@ -27,10 +27,15 @@ "dispatchers":{ - "enabled": true + "enabled": true, + "prevent_loop": true }, "caches":{ + "partitions": { + "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, // control dispatcher routes caching + "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} // control dispatcher interface + }, "remote_conns": ["gob_cache"] }, diff --git a/data/conf/samples/dispatcher_opts_apier/cgrates.json b/data/conf/samples/dispatcher_opts_apier/cgrates.json index aebba42fb..dea68ffb0 100644 --- a/data/conf/samples/dispatcher_opts_apier/cgrates.json +++ b/data/conf/samples/dispatcher_opts_apier/cgrates.json @@ -1,7 +1,7 @@ { "general": { - "node_id": "DispatcherOpts_APIer", + "node_id": "HOST2", "log_level": 7 }, @@ -24,20 +24,31 @@ "dispatchers":{ - "enabled": true + "enabled": true, + "prevent_loop": true }, "caches":{ - //"remote_conns": ["*internal"], + "partitions": { + "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, // control dispatcher routes caching + "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} // control dispatcher interface + }, + "remote_conns": ["gob_cache"] }, "apiers": { - "enabled": true, + "enabled": true // "caches_conns":["broadcast_cache"] }, - // "rpc_conns": { + "rpc_conns": { + "gob_cache": { + "strategy": "*first", + "conns": [ + {"address": "127.0.0.1:4013", "transport":"*gob"} + ] + } // "broadcast_cache": { // "strategy": "*broadcast", // "conns": [ @@ -46,6 +57,6 @@ // {"address": "127.0.0.1:6012", "transport":"*json"} // ] // } - // } + } } \ No newline at end of file diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 46bd454ce..acd7a17d1 100644 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -102,6 +102,21 @@ func (dS *DispatcherService) authorize(method, tenant string, apiKey string, evT // or utils.ErrNotFound if none present func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CGREvent, evNm utils.MapStorage, subsys string) (dPrlfs engine.DispatcherProfiles, err error) { + // make sure dispatching is allowed + var shouldDispatch bool + if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.OptsDispatchers); err != nil { + return + } else { + var subsys string + if subsys, err = evNm.FieldAsString([]string{utils.MetaOpts, utils.MetaSubsys}); err != nil { + return + } + if !shouldDispatch || (dS.cfg.DispatcherSCfg().PreventLoop && + subsys == utils.MetaDispatchers) { + return engine.DispatcherProfiles{ + &engine.DispatcherProfile{Tenant: utils.MetaInternal, ID: utils.MetaInternal}}, nil + } + } // find out the matching profiles anyIdxPrfx := utils.ConcatenatedKey(tnt, utils.MetaAny) idxKeyPrfx := anyIdxPrfx @@ -184,18 +199,43 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG // Dispatch is the method forwarding the request towards the right connection func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, serviceMethod string, args interface{}, reply interface{}) (err error) { + // fix missing tenant tnt := ev.Tenant if tnt == utils.EmptyString { tnt = dS.cfg.GeneralCfg().DefaultTenant } + if ev.APIOpts == nil { + ev.APIOpts = make(map[string]interface{}) + } + evNm := utils.MapStorage{ + utils.MetaReq: ev.Event, + utils.MetaOpts: ev.APIOpts, + utils.MetaVars: utils.MapStorage{ + utils.MetaSubsys: subsys, + utils.MetaMethod: serviceMethod, + }, + } + dspLoopAPIOpts := map[string]interface{}{ + utils.MetaSubsys: utils.MetaDispatchers, + utils.MetaNodeID: dS.cfg.GeneralCfg().NodeID, + } + // avoid further processing if the request is internal var shouldDispatch bool if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.OptsDispatchers); err != nil { return utils.NewErrDispatcherS(err) - } else if !shouldDispatch { - return callDH( - newInternalHost(tnt), utils.EmptyString, nil, - serviceMethod, args, reply) + } else { + var subsys string + if subsys, err = evNm.FieldAsString([]string{utils.MetaOpts, utils.MetaSubsys}); err != nil && + err != utils.ErrNotFound { + return + } + if !shouldDispatch || (dS.cfg.DispatcherSCfg().PreventLoop && + subsys == utils.MetaDispatchers) { + return callDH(newInternalHost(tnt), utils.EmptyString, nil, + serviceMethod, args, reply) + } } + // in case of routeID, route based on previously discovered profile var dR *DispatcherRoute var dPrfls engine.DispatcherProfiles routeID := utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID]) @@ -207,44 +247,45 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, defer guardian.Guardian.UnguardIDs(refID) // use previously discovered route argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: ev.Tenant, - APIOpts: map[string]interface{}{ - utils.MetaSubsys: utils.MetaDispatchers, - utils.MetaNodeID: dS.cfg.GeneralCfg().NodeID, - }, + Tenant: ev.Tenant, + APIOpts: dspLoopAPIOpts, ArgsGetCacheItem: utils.ArgsGetCacheItem{ CacheID: utils.CacheDispatcherRoutes, ItemID: routeID, }} - // item var itmRemote interface{} - if err = dS.connMgr.Call(dS.cfg.CacheCfg().RemoteConns, nil, - utils.CacheSv1GetItem, argsCache, &itmRemote); err != nil && - err.Error() != utils.ErrNotFound.Error() { - return utils.NewErrDispatcherS(err) - } else if err == nil { // not found - dR = itmRemote.(*DispatcherRoute) - routeID = utils.EmptyString // cancel cache replication + if itmRemote, err = engine.Cache.GetWithRemote(argsCache); err == nil && itmRemote != nil { + var canCast bool + if dR, canCast = itmRemote.(*DispatcherRoute); !canCast { + err = utils.ErrCastFailed + } else { + var d Dispatcher + if d, err = getDispatcherWithCache( + &engine.DispatcherProfile{Tenant: dR.Tenant, ID: dR.ProfileID}, + dS.dm); err == nil { + for k, v := range dspLoopAPIOpts { + ev.APIOpts[k] = v // dispatcher loop protection opts + } + if err = d.Dispatch(dS.dm, dS.fltrS, evNm, tnt, utils.EmptyString, dR, + serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { + return // dispatch success or specific error coming from upstream + } + } + } + } + if err != nil { + // did not dispatch properly, fail-back to standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> using cached routing for dR %+v, continuing with normal dispatching", + utils.DispatcherS, err.Error(), dR)) } } - if dR != nil { - dPrfls = engine.DispatcherProfiles{ - &engine.DispatcherProfile{Tenant: dR.Tenant, ID: dR.ProfileID}} // will be used bellow to retrieve the dispatcher - } - evNm := utils.MapStorage{ - utils.MetaReq: ev.Event, - utils.MetaOpts: ev.APIOpts, - utils.MetaVars: utils.MapStorage{ - utils.MetaMethod: serviceMethod, - }, - } - if dPrfls == nil { // did not discover it yet - if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, evNm, subsys); err != nil { - return utils.NewErrDispatcherS(err) - } - } - if len(dPrfls) == 0 { + if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, evNm, subsys); err != nil { + return utils.NewErrDispatcherS(err) + } else if len(dPrfls) == 0 { // no profiles matched return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE")) + } else if isInternalDispatcherProfile(dPrfls[0]) { // dispatcherS was disabled + return callDH(newInternalHost(tnt), utils.EmptyString, nil, + serviceMethod, args, reply) } if ev.APIOpts == nil { ev.APIOpts = make(map[string]interface{}) @@ -252,45 +293,20 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, ev.APIOpts[utils.MetaSubsys] = utils.MetaDispatchers // inject into args ev.APIOpts[utils.MetaNodeID] = dS.cfg.GeneralCfg().NodeID for _, dPrfl := range dPrfls { - tntID := dPrfl.TenantID() // get or build the Dispatcher for the config var d Dispatcher - if x, ok := engine.Cache.Get(utils.CacheDispatchers, - tntID); ok && x != nil { - d = x.(Dispatcher) - } else { // dispatcher is not cached, build it here - if dPrfl.Hosts == nil { // dispatcher profile was not retrieved but built artificially above, try retrieving - if dPrfl, err = dS.dm.GetDispatcherProfile(dPrfl.Tenant, dPrfl.ID, - true, true, utils.NonTransactional); err != nil { - if err != utils.ErrNotFound { - return - } - // profile was not found - utils.Logger.Warning(fmt.Sprintf("<%s> could not find profile with tenant: <%s> and ID <%s> for routeID: <%s>", - utils.DispatcherS, dR.Tenant, dR.ProfileID, routeID)) - if len(dPrfls) == 1 { // the only profile set does not exist anymore - return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE")) - } - continue - } - } - if d, err = newDispatcher(dPrfl); err != nil { - return utils.NewErrDispatcherS(err) - } else if err = engine.Cache.Set(utils.CacheDispatchers, tntID, d, // cache the built Dispatcher - nil, true, utils.EmptyString); err != nil { - return utils.NewErrDispatcherS(err) + if d, err = getDispatcherWithCache(dPrfl, dS.dm); err == nil { + if err = d.Dispatch(dS.dm, dS.fltrS, evNm, tnt, routeID, + &DispatcherRoute{ + Tenant: dPrfl.Tenant, + ProfileID: dPrfl.ID, + }, + serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { + return } } - if routeID != utils.EmptyString && dR == nil { // first time we cache the route - dR = &DispatcherRoute{ - Tenant: dPrfl.Tenant, - ProfileID: dPrfl.ID, - } - } - if err = d.Dispatch(dS.dm, dS.fltrS, evNm, tnt, routeID, dR, - serviceMethod, args, reply); !rpcclient.IsNetworkError(err) { - return - } + utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching with the profile: <%+v>", + utils.DispatcherS, err.Error(), dPrfl)) } return // return the last error } diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index c6b9d945d..c76519164 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -32,16 +32,48 @@ import ( "github.com/cgrates/rpcclient" ) +var ( + internalDispatcher = &engine.DispatcherProfile{Tenant: utils.MetaInternal, ID: utils.MetaInternal} +) + func init() { gob.Register(new(LoadMetrics)) gob.Register(new(DispatcherRoute)) } +// isInternalDispatcherProfile compares the profile to the internal one +func isInternalDispatcherProfile(d *engine.DispatcherProfile) bool { + return d.Tenant == internalDispatcher.Tenant && d.ID == internalDispatcher.ID +} + // DispatcherRoute is bounded to a routeID type DispatcherRoute struct { Tenant, ProfileID, HostID string } +// getDispatcherWithCache +func getDispatcherWithCache(dPrfl *engine.DispatcherProfile, dm *engine.DataManager) (d Dispatcher, err error) { + tntID := dPrfl.TenantID() + if x, ok := engine.Cache.Get(utils.CacheDispatchers, + tntID); ok && x != nil { + d = x.(Dispatcher) + return + } + if dPrfl.Hosts == nil { // dispatcher profile was not retrieved but built artificially above, try retrieving + if dPrfl, err = dm.GetDispatcherProfile(dPrfl.Tenant, dPrfl.ID, + true, true, utils.NonTransactional); err != nil { + return + } + } + if d, err = newDispatcher(dPrfl); err != nil { + return + } else if err = engine.Cache.Set(utils.CacheDispatchers, tntID, d, // cache the built Dispatcher + nil, true, utils.EmptyString); err != nil { + return + } + return +} + // Dispatcher is responsible for routing requests to pool of connections // there will be different implementations based on strategy type Dispatcher interface { @@ -176,15 +208,9 @@ type singleResultDispatcher struct { func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute, serviceMethod string, args interface{}, reply interface{}) (err error) { - if routeID != utils.EmptyString && dR.HostID != utils.EmptyString { // route to previously discovered route - if err = callDHwithID(tnt, dR.HostID, routeID, dR, dm, - serviceMethod, args, reply); err == nil || - (err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors - return - } - // not found or network errors will continue - utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with identity <%q>", - utils.DispatcherS, err.Error(), dR.HostID)) + if dR != nil && dR.HostID != utils.EmptyString { // route to previously discovered route + return callDHwithID(tnt, dR.HostID, routeID, dR, dm, + serviceMethod, args, reply) } var hostIDs []string if hostIDs, err = sd.sorter.Sort(flts, ev, tnt, sd.hosts); err != nil { @@ -282,7 +308,7 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS, } else if lM, err = newLoadMetrics(ld.hosts, ld.defaultRatio); err != nil { return } - if routeID != utils.EmptyString && dR.HostID != utils.EmptyString { // route to previously discovered route + if dR != nil && dR.HostID != utils.EmptyString { // route to previously discovered route lM.incrementLoad(dR.HostID, ld.tntID) err = callDHwithID(tnt, dR.HostID, routeID, dR, dm, serviceMethod, args, reply) @@ -421,12 +447,18 @@ func callDH(dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute, utils.MetaSubsys: utils.MetaDispatchers, utils.MetaNodeID: config.CgrConfig().GeneralCfg().NodeID, }, - CacheID: utils.CacheDispatcherRoutes, - ItemID: routeID, - Value: dR, + CacheID: utils.CacheDispatcherRoutes, + ItemID: routeID, + Value: dR, + GroupIDs: []string{utils.ConcatenatedKey(utils.CacheDispatcherProfiles, dR.Tenant, dR.ProfileID)}, } if err = engine.Cache.SetWithReplicate(argsCache); err != nil { - return + if !rpcclient.IsNetworkError(err) { + return + } + // did not dispatch properly, fail-back to standard dispatching + utils.Logger.Warning(fmt.Sprintf("<%s> ignoring cache network error <%s> setting route dR %+v", + utils.DispatcherS, err.Error(), dR)) } } if err = dh.Call(method, args, reply); err != nil { diff --git a/engine/caches.go b/engine/caches.go index 0177d7424..f23996915 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -209,9 +209,8 @@ func (chS *CacheS) GetWithRemote(args *utils.ArgsGetCacheItemWithAPIOpts) (itm i return } // item was not found locally, query from remote - var itmRemote interface{} if err = connMgr.Call(chS.cfg.CacheCfg().RemoteConns, nil, - utils.CacheSv1GetItem, args, &itmRemote); err != nil && + utils.CacheSv1GetItem, args, &itm); err != nil && err.Error() == utils.ErrNotFound.Error() { return nil, utils.ErrNotFound // correct the error coming as string type } diff --git a/general_tests/dispatcher_opts_it_test.go b/general_tests/dispatcher_opts_it_test.go index 3bd280549..1906d24be 100644 --- a/general_tests/dispatcher_opts_it_test.go +++ b/general_tests/dispatcher_opts_it_test.go @@ -21,6 +21,7 @@ package general_tests import ( "net/rpc" "path" + "reflect" "testing" "time" @@ -30,53 +31,46 @@ import ( ) var ( - setterCfgPath string - setterCfg *config.CGRConfig - setterRPC *rpc.Client - dspOptsCfgPath string apierCfgPath string + setterCfgPath string dspOptsCfg *config.CGRConfig apierCfg *config.CGRConfig + setterCfg *config.CGRConfig dspOptsRPC *rpc.Client apierRPC *rpc.Client + setterRPC *rpc.Client dspOptsConfigDIR string dpsOptsTest = []func(t *testing.T){ testDispatcherOptsSetterInitCfg, testDispatcherOptsSetterInitDataDb, testDispatcherOptsSetterStartEngine, testDispatcherOptsSetterRPCConn, + // Start engine without Dispatcher on engine 4012 testDispatcherOptsAPIerInitCfg, testDispatcherOptsAPIerInitDataDb, testDispatcherOptsAPIerStartEngine, testDispatcherOptsAPIerRPCConn, - testDispatcherOptsSetterSetDispatcherProfile, - // testDispatcherOptsAPIerSetDispatcherProfile, + testDispatcherOptsAPIerSetDispatcherProfile, // Start engine without Dispatcher on engine 2012 with profiles in database (*dispatchers:false) testDispatcherOptsDSPInitCfg, testDispatcherOptsDSPStartEngine, testDispatcherOptsDSPRPCConn, - testDispatcherOptsCoreStatus, // self localhost(:2012) CoresV1Status + testDispatcherOptsCoreStatus, // localhost(:2012) CoresV1Status + + testDispatcherGetItemBothEngines1, + testDispatcherAPIerCoreStatus, + + testDispatcherOptsAPIerSetDispatcherHost4012, + testDispatcherCacheClearBothEngines, - testDispatcherOptsSetterSetDispatcherHost4012, - // testDispatcherOptsAPIerSetDispatcherHost4012, testDispatcherOptsCoreStatusHost4012, - - testDispatcherOptsSetterSetDispatcherProfileDoubleHost, - // testDispatcherOptsAPIerSetDispatcherProfileDoubleHost, - testDispatcherOptsCoreStatusWithRouteID, - - testDispatcherOptsSetterSetDispatcherHostInexistent, - // testDispatcherOptsAPIerSetDispatcherHostInexistent, - testDispatcherOptsCoreStatusWithRouteID2, - - testDispatcherOptsCoreStatusWithoutRouteID, + testDispatcherGetItemBothEngines2, testDispatcherOptsDSPStopEngine, testDispatcherOptsAPIerStopEngine, - // testDispatcherOptsSetterStopEngine, } ) @@ -124,8 +118,8 @@ func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) { DispatcherHost: &engine.DispatcherHost{ Tenant: "cgrates.org", RemoteHost: &config.RemoteHost{ - ID: "SELF_ENGINE", - Address: "127.0.0.1:4012", + ID: "HOST1", + Address: "127.0.0.1:2012", Transport: "*json", ConnectAttempts: 1, Reconnects: 3, @@ -137,7 +131,7 @@ func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) { utils.OptsDispatchers: false, }, } - if err := apierRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { + if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err) } else if replyStr != utils.OK { t.Error("Unexpected reply returned", replyStr) @@ -153,7 +147,7 @@ func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) { Weight: 10, Hosts: engine.DispatcherHostProfiles{ { - ID: "SELF_ENGINE", + ID: "HOST1", Weight: 5, }, }, @@ -162,7 +156,7 @@ func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) { utils.OptsDispatchers: false, }, } - if err := apierRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { + if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err) } else if replyStr != utils.OK { t.Error("Unexpected reply returned", replyStr) @@ -195,7 +189,56 @@ func testDispatcherOptsDSPRPCConn(t *testing.T) { } func testDispatcherOptsCoreStatus(t *testing.T) { - //SELF_ENGINE HOST + // HOST1 host matched + var reply map[string]interface{} + ev := utils.TenantWithAPIOpts{ + Tenant: "cgrates.org", + APIOpts: map[string]interface{}{ + "*host": "HOST2", + utils.OptsRouteID: "account#dan.bogos", + }, + } + if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + t.Error(err) + } else if reply[utils.NodeID] != "HOST1" { + t.Errorf("Expected HOST1, received %v", reply[utils.NodeID]) + } +} + +func testDispatcherGetItemBothEngines1(t *testing.T) { + argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ + Tenant: "cgrates.org", + APIOpts: map[string]interface{}{ + utils.OptsDispatchers: false, + }, + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: utils.CacheDispatcherRoutes, + ItemID: "account#dan.bogos:*core", + }, + } + var reply interface{} + if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + &reply); err != nil { + t.Error(err) + } else { + expected := map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ProfileID: "DSP1", + "HostID": "HOST1", + } + if !reflect.DeepEqual(expected, reply) { + t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) + } + } + + if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testDispatcherAPIerCoreStatus(t *testing.T) { + // HOST2 host matched because it was called from engine with port :4012 -> host2 var reply map[string]interface{} ev := utils.TenantWithAPIOpts{ Tenant: "cgrates.org", @@ -203,12 +246,10 @@ func testDispatcherOptsCoreStatus(t *testing.T) { utils.OptsDispatchers: false, }, } - if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { t.Error(err) - } else { - /* - t.Errorf("Received: %s", utils.ToJSON(reply)) - */ + } else if reply[utils.NodeID] != "HOST2" { + t.Errorf("Expected HOST2, received %v", reply[utils.NodeID]) } } @@ -219,7 +260,7 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) { DispatcherHost: &engine.DispatcherHost{ Tenant: "cgrates.org", RemoteHost: &config.RemoteHost{ - ID: "HOST4012", + ID: "HOST2", Address: "127.0.0.1:4012", Transport: "*json", ConnectAttempts: 1, @@ -232,7 +273,7 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) { utils.OptsDispatchers: false, }, } - if err := apierRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { + if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err) } else if replyStr != utils.OK { t.Error("Unexpected reply returned", replyStr) @@ -248,7 +289,7 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) { Weight: 10, Hosts: engine.DispatcherHostProfiles{ { - ID: "HOST4012", + ID: "HOST2", Weight: 10, }, }, @@ -257,7 +298,7 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) { utils.OptsDispatchers: false, }, } - if err := apierRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { + if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err) } else if replyStr != utils.OK { t.Error("Unexpected reply returned", replyStr) @@ -267,152 +308,72 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) { func testDispatcherOptsCoreStatusHost4012(t *testing.T) { // status just for HOST4012 var reply map[string]interface{} - ev := utils.TenantWithAPIOpts{ - Tenant: "cgrates.org", - } - if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { - t.Error(err) - } else { - /* - t.Errorf("Received: %s", utils.ToJSON(reply)) - */ - } -} - -func testDispatcherOptsAPIerSetDispatcherProfileDoubleHost(t *testing.T) { - // Set DispatcherProfile with both engines - setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{ - DispatcherProfile: &engine.DispatcherProfile{ - Tenant: "cgrates.org", - ID: "DSP1", - Strategy: "*weight", - Subsystems: []string{utils.MetaAny}, - Weight: 10, - Hosts: engine.DispatcherHostProfiles{ - { - ID: "SELF_ENGINE", - Weight: 5, - }, - { - ID: "HOST4012", - Weight: 10, - }, - }, - }, - APIOpts: map[string]interface{}{ - utils.OptsDispatchers: false, - }, - } - var replyStr string - if err := apierRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { - t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err) - } else if replyStr != utils.OK { - t.Error("Unexpected reply returned", replyStr) - } -} - -func testDispatcherOptsCoreStatusWithRouteID(t *testing.T) { - // now it will dispatch in both engines - var reply map[string]interface{} ev := utils.TenantWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ + "*host": "HOST2", utils.OptsRouteID: "account#dan.bogos", }, } if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { t.Error(err) - } else { - /* - t.Errorf("Received: %s", utils.ToJSON(reply)) - */ + } else if reply[utils.NodeID] != "HOST2" { + t.Errorf("Expected HOST1, received %v", reply[utils.NodeID]) } } -func testDispatcherOptsAPIerSetDispatcherHostInexistent(t *testing.T) { - // Set DispatcherHost on 4012 host - var replyStr string - setDispatcherHost := &engine.DispatcherHostWithAPIOpts{ - DispatcherHost: &engine.DispatcherHost{ - Tenant: "cgrates.org", - RemoteHost: &config.RemoteHost{ - ID: "INEXISTENT", - Address: "127.0.0.1:1223", - Transport: "*json", - ConnectAttempts: 1, - Reconnects: 3, - ConnectTimeout: time.Minute, - ReplyTimeout: 2 * time.Minute, - }, - }, +func testDispatcherCacheClearBothEngines(t *testing.T) { + var reply string + if err := dspOptsRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ APIOpts: map[string]interface{}{ utils.OptsDispatchers: false, }, - } - if err := apierRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { - t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err) - } else if replyStr != utils.OK { - t.Error("Unexpected reply returned", replyStr) + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply returned") } - // Set DispatcherProfile Different with an inexistent engine opened, but with a bigger weight(this should match now) - setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{ - DispatcherProfile: &engine.DispatcherProfile{ - Tenant: "cgrates.org", - ID: "DSP1", - Strategy: "*weight", - Subsystems: []string{utils.MetaAny}, - Weight: 20, - Hosts: engine.DispatcherHostProfiles{ - { - ID: "INEXISTENT", - Weight: 10, - }, - }, - }, + if err := apierRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ APIOpts: map[string]interface{}{ utils.OptsDispatchers: false, }, - } - if err := apierRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { - t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err) - } else if replyStr != utils.OK { - t.Error("Unexpected reply returned", replyStr) + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply returned") } } -func testDispatcherOptsCoreStatusWithRouteID2(t *testing.T) { - // because we have the routeID it will match DSP1 and last host matched, host4012 - // so again, both engines will match - var reply map[string]interface{} - ev := utils.TenantWithAPIOpts{ +func testDispatcherGetItemBothEngines2(t *testing.T) { + argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{ - utils.OptsRouteID: "account#dan.bogos", + utils.OptsDispatchers: false, + }, + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: utils.CacheDispatcherRoutes, + ItemID: "account#dan.bogos:*core", }, } - if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + var reply interface{} + if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + &reply); err != nil { t.Error(err) } else { - /* - t.Errorf("Received: %s", utils.ToJSON(reply)) - */ + expected := map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ProfileID: "DSP1", + "HostID": "HOST2", + } + if !reflect.DeepEqual(expected, reply) { + t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) + } } -} -func testDispatcherOptsCoreStatusWithoutRouteID(t *testing.T) { - // because we have the routeID it will match DSP1 and last host matched, host4012 - // so again, both engines will match - var reply map[string]interface{} - ev := utils.TenantWithAPIOpts{ - Tenant: "cgrates.org", - } - if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil { + if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache, + &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } else { - /* - t.Errorf("Received: %s", utils.ToJSON(reply)) - */ } } diff --git a/utils/consts.go b/utils/consts.go index aa625e978..a898bd409 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2276,6 +2276,7 @@ const ( // DispatcherSCfg AnySubsystemCfg = "any_subsystem" + PreventLoopCfg = "prevent_loop" ) // FC Template diff --git a/utils/errors.go b/utils/errors.go index d1716bc9b..b1baad919 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -76,6 +76,7 @@ var ( ErrMaxConcurentRPCExceeded = errors.New("MAX_CONCURENT_RPC_EXCEEDED") // but the codec will rewrite it with this one to be sure that we corectly dealocate the request ErrMaxIterationsReached = errors.New("maximum iterations reached") ErrNegative = errors.New("NEGATIVE") + ErrCastFailed = errors.New("CAST_FAILED") ErrMap = map[string]error{ ErrNoMoreData.Error(): ErrNoMoreData,