Finish porting dispatcher updates from 1.0

This commit is contained in:
ionutboangiu
2022-09-23 21:24:54 +03:00
committed by Dan Christian Bogos
parent 895c140209
commit 3dfdf103ad
11 changed files with 283 additions and 249 deletions

View File

@@ -1087,6 +1087,7 @@ const CGRATES_CFG_JSON = `
"nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
"attributes_conns": [], // connections to AttributeS for API authorization, empty to disable auth functionality: <""|*internal|$rpc_conns_id>
"any_subsystem": true, // if we match the *any subsystem
"prevent_loops": false, //
},

View File

@@ -32,6 +32,7 @@ type DispatcherSCfg struct {
AttributeSConns []string
NestedFields bool
AnySubsystem bool
PreventLoop bool
}
func (dps *DispatcherSCfg) loadFromJSONCfg(jsnCfg *DispatcherSJsonCfg) (err error) {
@@ -81,37 +82,41 @@ func (dps *DispatcherSCfg) loadFromJSONCfg(jsnCfg *DispatcherSJsonCfg) (err erro
if jsnCfg.Any_subsystem != nil {
dps.AnySubsystem = *jsnCfg.Any_subsystem
}
if jsnCfg.Prevent_loop != nil {
dps.PreventLoop = *jsnCfg.Prevent_loop
}
return nil
}
// AsMapInterface returns the config as a map[string]interface{}
func (dps *DispatcherSCfg) AsMapInterface() (initialMP map[string]interface{}) {
initialMP = map[string]interface{}{
func (dps *DispatcherSCfg) AsMapInterface() (mp map[string]interface{}) {
mp = map[string]interface{}{
utils.EnabledCfg: dps.Enabled,
utils.IndexedSelectsCfg: dps.IndexedSelects,
utils.NestedFieldsCfg: dps.NestedFields,
utils.AnySubsystemCfg: dps.AnySubsystem,
utils.PreventLoopCfg: dps.PreventLoop,
}
if dps.StringIndexedFields != nil {
stringIndexedFields := make([]string, len(*dps.StringIndexedFields))
for i, item := range *dps.StringIndexedFields {
stringIndexedFields[i] = item
}
initialMP[utils.StringIndexedFieldsCfg] = stringIndexedFields
mp[utils.StringIndexedFieldsCfg] = stringIndexedFields
}
if dps.PrefixIndexedFields != nil {
prefixIndexedFields := make([]string, len(*dps.PrefixIndexedFields))
for i, item := range *dps.PrefixIndexedFields {
prefixIndexedFields[i] = item
}
initialMP[utils.PrefixIndexedFieldsCfg] = prefixIndexedFields
mp[utils.PrefixIndexedFieldsCfg] = prefixIndexedFields
}
if dps.SuffixIndexedFields != nil {
suffixIndexedFields := make([]string, len(*dps.SuffixIndexedFields))
for i, item := range *dps.SuffixIndexedFields {
suffixIndexedFields[i] = item
}
initialMP[utils.SuffixIndexedFieldsCfg] = suffixIndexedFields
mp[utils.SuffixIndexedFieldsCfg] = suffixIndexedFields
}
if dps.AttributeSConns != nil {
attributeSConns := make([]string, len(dps.AttributeSConns))
@@ -121,7 +126,7 @@ func (dps *DispatcherSCfg) AsMapInterface() (initialMP map[string]interface{}) {
attributeSConns[i] = utils.MetaInternal
}
}
initialMP[utils.AttributeSConnsCfg] = attributeSConns
mp[utils.AttributeSConnsCfg] = attributeSConns
}
return
}
@@ -133,6 +138,7 @@ func (dps DispatcherSCfg) Clone() (cln *DispatcherSCfg) {
IndexedSelects: dps.IndexedSelects,
NestedFields: dps.NestedFields,
AnySubsystem: dps.AnySubsystem,
PreventLoop: dps.PreventLoop,
}
if dps.AttributeSConns != nil {

View File

@@ -739,6 +739,7 @@ type DispatcherSJsonCfg struct {
Nested_fields *bool // applies when indexed fields is not defined
Attributes_conns *[]string
Any_subsystem *bool
Prevent_loop *bool
}
type RegistrarCJsonCfg struct {

View File

@@ -1,7 +1,7 @@
{
"general": {
"node_id": "DispatcherOpts",
"node_id": "HOST1",
"log_level": 7
},
@@ -27,10 +27,15 @@
"dispatchers":{
"enabled": true
"enabled": true,
"prevent_loop": true
},
"caches":{
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, // control dispatcher routes caching
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} // control dispatcher interface
},
"remote_conns": ["gob_cache"]
},

View File

@@ -1,7 +1,7 @@
{
"general": {
"node_id": "DispatcherOpts_APIer",
"node_id": "HOST2",
"log_level": 7
},
@@ -24,20 +24,31 @@
"dispatchers":{
"enabled": true
"enabled": true,
"prevent_loop": true
},
"caches":{
//"remote_conns": ["*internal"],
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, // control dispatcher routes caching
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} // control dispatcher interface
},
"remote_conns": ["gob_cache"]
},
"apiers": {
"enabled": true,
"enabled": true
// "caches_conns":["broadcast_cache"]
},
// "rpc_conns": {
"rpc_conns": {
"gob_cache": {
"strategy": "*first",
"conns": [
{"address": "127.0.0.1:4013", "transport":"*gob"}
]
}
// "broadcast_cache": {
// "strategy": "*broadcast",
// "conns": [
@@ -46,6 +57,6 @@
// {"address": "127.0.0.1:6012", "transport":"*json"}
// ]
// }
// }
}
}

View File

@@ -102,6 +102,21 @@ func (dS *DispatcherService) authorize(method, tenant string, apiKey string, evT
// or utils.ErrNotFound if none present
func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CGREvent,
evNm utils.MapStorage, subsys string) (dPrlfs engine.DispatcherProfiles, err error) {
// make sure dispatching is allowed
var shouldDispatch bool
if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.OptsDispatchers); err != nil {
return
} else {
var subsys string
if subsys, err = evNm.FieldAsString([]string{utils.MetaOpts, utils.MetaSubsys}); err != nil {
return
}
if !shouldDispatch || (dS.cfg.DispatcherSCfg().PreventLoop &&
subsys == utils.MetaDispatchers) {
return engine.DispatcherProfiles{
&engine.DispatcherProfile{Tenant: utils.MetaInternal, ID: utils.MetaInternal}}, nil
}
}
// find out the matching profiles
anyIdxPrfx := utils.ConcatenatedKey(tnt, utils.MetaAny)
idxKeyPrfx := anyIdxPrfx
@@ -184,18 +199,43 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG
// Dispatch is the method forwarding the request towards the right connection
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
// fix missing tenant
tnt := ev.Tenant
if tnt == utils.EmptyString {
tnt = dS.cfg.GeneralCfg().DefaultTenant
}
if ev.APIOpts == nil {
ev.APIOpts = make(map[string]interface{})
}
evNm := utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
utils.MetaSubsys: subsys,
utils.MetaMethod: serviceMethod,
},
}
dspLoopAPIOpts := map[string]interface{}{
utils.MetaSubsys: utils.MetaDispatchers,
utils.MetaNodeID: dS.cfg.GeneralCfg().NodeID,
}
// avoid further processing if the request is internal
var shouldDispatch bool
if shouldDispatch, err = utils.GetBoolOpts(ev, true, utils.OptsDispatchers); err != nil {
return utils.NewErrDispatcherS(err)
} else if !shouldDispatch {
return callDH(
newInternalHost(tnt), utils.EmptyString, nil,
serviceMethod, args, reply)
} else {
var subsys string
if subsys, err = evNm.FieldAsString([]string{utils.MetaOpts, utils.MetaSubsys}); err != nil &&
err != utils.ErrNotFound {
return
}
if !shouldDispatch || (dS.cfg.DispatcherSCfg().PreventLoop &&
subsys == utils.MetaDispatchers) {
return callDH(newInternalHost(tnt), utils.EmptyString, nil,
serviceMethod, args, reply)
}
}
// in case of routeID, route based on previously discovered profile
var dR *DispatcherRoute
var dPrfls engine.DispatcherProfiles
routeID := utils.IfaceAsString(ev.APIOpts[utils.OptsRouteID])
@@ -207,44 +247,45 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
defer guardian.Guardian.UnguardIDs(refID)
// use previously discovered route
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: ev.Tenant,
APIOpts: map[string]interface{}{
utils.MetaSubsys: utils.MetaDispatchers,
utils.MetaNodeID: dS.cfg.GeneralCfg().NodeID,
},
Tenant: ev.Tenant,
APIOpts: dspLoopAPIOpts,
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherRoutes,
ItemID: routeID,
}}
// item
var itmRemote interface{}
if err = dS.connMgr.Call(dS.cfg.CacheCfg().RemoteConns, nil,
utils.CacheSv1GetItem, argsCache, &itmRemote); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
return utils.NewErrDispatcherS(err)
} else if err == nil { // not found
dR = itmRemote.(*DispatcherRoute)
routeID = utils.EmptyString // cancel cache replication
if itmRemote, err = engine.Cache.GetWithRemote(argsCache); err == nil && itmRemote != nil {
var canCast bool
if dR, canCast = itmRemote.(*DispatcherRoute); !canCast {
err = utils.ErrCastFailed
} else {
var d Dispatcher
if d, err = getDispatcherWithCache(
&engine.DispatcherProfile{Tenant: dR.Tenant, ID: dR.ProfileID},
dS.dm); err == nil {
for k, v := range dspLoopAPIOpts {
ev.APIOpts[k] = v // dispatcher loop protection opts
}
if err = d.Dispatch(dS.dm, dS.fltrS, evNm, tnt, utils.EmptyString, dR,
serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
return // dispatch success or specific error coming from upstream
}
}
}
}
if err != nil {
// did not dispatch properly, fail-back to standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> using cached routing for dR %+v, continuing with normal dispatching",
utils.DispatcherS, err.Error(), dR))
}
}
if dR != nil {
dPrfls = engine.DispatcherProfiles{
&engine.DispatcherProfile{Tenant: dR.Tenant, ID: dR.ProfileID}} // will be used bellow to retrieve the dispatcher
}
evNm := utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
utils.MetaMethod: serviceMethod,
},
}
if dPrfls == nil { // did not discover it yet
if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, evNm, subsys); err != nil {
return utils.NewErrDispatcherS(err)
}
}
if len(dPrfls) == 0 {
if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, evNm, subsys); err != nil {
return utils.NewErrDispatcherS(err)
} else if len(dPrfls) == 0 { // no profiles matched
return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE"))
} else if isInternalDispatcherProfile(dPrfls[0]) { // dispatcherS was disabled
return callDH(newInternalHost(tnt), utils.EmptyString, nil,
serviceMethod, args, reply)
}
if ev.APIOpts == nil {
ev.APIOpts = make(map[string]interface{})
@@ -252,45 +293,20 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
ev.APIOpts[utils.MetaSubsys] = utils.MetaDispatchers // inject into args
ev.APIOpts[utils.MetaNodeID] = dS.cfg.GeneralCfg().NodeID
for _, dPrfl := range dPrfls {
tntID := dPrfl.TenantID()
// get or build the Dispatcher for the config
var d Dispatcher
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
tntID); ok && x != nil {
d = x.(Dispatcher)
} else { // dispatcher is not cached, build it here
if dPrfl.Hosts == nil { // dispatcher profile was not retrieved but built artificially above, try retrieving
if dPrfl, err = dS.dm.GetDispatcherProfile(dPrfl.Tenant, dPrfl.ID,
true, true, utils.NonTransactional); err != nil {
if err != utils.ErrNotFound {
return
}
// profile was not found
utils.Logger.Warning(fmt.Sprintf("<%s> could not find profile with tenant: <%s> and ID <%s> for routeID: <%s>",
utils.DispatcherS, dR.Tenant, dR.ProfileID, routeID))
if len(dPrfls) == 1 { // the only profile set does not exist anymore
return utils.NewErrDispatcherS(utils.ErrPrefixNotFound("PROFILE"))
}
continue
}
}
if d, err = newDispatcher(dPrfl); err != nil {
return utils.NewErrDispatcherS(err)
} else if err = engine.Cache.Set(utils.CacheDispatchers, tntID, d, // cache the built Dispatcher
nil, true, utils.EmptyString); err != nil {
return utils.NewErrDispatcherS(err)
if d, err = getDispatcherWithCache(dPrfl, dS.dm); err == nil {
if err = d.Dispatch(dS.dm, dS.fltrS, evNm, tnt, routeID,
&DispatcherRoute{
Tenant: dPrfl.Tenant,
ProfileID: dPrfl.ID,
},
serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
return
}
}
if routeID != utils.EmptyString && dR == nil { // first time we cache the route
dR = &DispatcherRoute{
Tenant: dPrfl.Tenant,
ProfileID: dPrfl.ID,
}
}
if err = d.Dispatch(dS.dm, dS.fltrS, evNm, tnt, routeID, dR,
serviceMethod, args, reply); !rpcclient.IsNetworkError(err) {
return
}
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching with the profile: <%+v>",
utils.DispatcherS, err.Error(), dPrfl))
}
return // return the last error
}

View File

@@ -32,16 +32,48 @@ import (
"github.com/cgrates/rpcclient"
)
var (
internalDispatcher = &engine.DispatcherProfile{Tenant: utils.MetaInternal, ID: utils.MetaInternal}
)
func init() {
gob.Register(new(LoadMetrics))
gob.Register(new(DispatcherRoute))
}
// isInternalDispatcherProfile compares the profile to the internal one
func isInternalDispatcherProfile(d *engine.DispatcherProfile) bool {
return d.Tenant == internalDispatcher.Tenant && d.ID == internalDispatcher.ID
}
// DispatcherRoute is bounded to a routeID
type DispatcherRoute struct {
Tenant, ProfileID, HostID string
}
// getDispatcherWithCache
func getDispatcherWithCache(dPrfl *engine.DispatcherProfile, dm *engine.DataManager) (d Dispatcher, err error) {
tntID := dPrfl.TenantID()
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
tntID); ok && x != nil {
d = x.(Dispatcher)
return
}
if dPrfl.Hosts == nil { // dispatcher profile was not retrieved but built artificially above, try retrieving
if dPrfl, err = dm.GetDispatcherProfile(dPrfl.Tenant, dPrfl.ID,
true, true, utils.NonTransactional); err != nil {
return
}
}
if d, err = newDispatcher(dPrfl); err != nil {
return
} else if err = engine.Cache.Set(utils.CacheDispatchers, tntID, d, // cache the built Dispatcher
nil, true, utils.EmptyString); err != nil {
return
}
return
}
// Dispatcher is responsible for routing requests to pool of connections
// there will be different implementations based on strategy
type Dispatcher interface {
@@ -176,15 +208,9 @@ type singleResultDispatcher struct {
func (sd *singleResultDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
ev utils.DataProvider, tnt, routeID string, dR *DispatcherRoute,
serviceMethod string, args interface{}, reply interface{}) (err error) {
if routeID != utils.EmptyString && dR.HostID != utils.EmptyString { // route to previously discovered route
if err = callDHwithID(tnt, dR.HostID, routeID, dR, dm,
serviceMethod, args, reply); err == nil ||
(err != utils.ErrNotFound && !rpcclient.IsNetworkError(err)) { // successful dispatch with normal errors
return
}
// not found or network errors will continue
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> dispatching to host with identity <%q>",
utils.DispatcherS, err.Error(), dR.HostID))
if dR != nil && dR.HostID != utils.EmptyString { // route to previously discovered route
return callDHwithID(tnt, dR.HostID, routeID, dR, dm,
serviceMethod, args, reply)
}
var hostIDs []string
if hostIDs, err = sd.sorter.Sort(flts, ev, tnt, sd.hosts); err != nil {
@@ -282,7 +308,7 @@ func (ld *loadDispatcher) Dispatch(dm *engine.DataManager, flts *engine.FilterS,
} else if lM, err = newLoadMetrics(ld.hosts, ld.defaultRatio); err != nil {
return
}
if routeID != utils.EmptyString && dR.HostID != utils.EmptyString { // route to previously discovered route
if dR != nil && dR.HostID != utils.EmptyString { // route to previously discovered route
lM.incrementLoad(dR.HostID, ld.tntID)
err = callDHwithID(tnt, dR.HostID, routeID, dR, dm,
serviceMethod, args, reply)
@@ -421,12 +447,18 @@ func callDH(dh *engine.DispatcherHost, routeID string, dR *DispatcherRoute,
utils.MetaSubsys: utils.MetaDispatchers,
utils.MetaNodeID: config.CgrConfig().GeneralCfg().NodeID,
},
CacheID: utils.CacheDispatcherRoutes,
ItemID: routeID,
Value: dR,
CacheID: utils.CacheDispatcherRoutes,
ItemID: routeID,
Value: dR,
GroupIDs: []string{utils.ConcatenatedKey(utils.CacheDispatcherProfiles, dR.Tenant, dR.ProfileID)},
}
if err = engine.Cache.SetWithReplicate(argsCache); err != nil {
return
if !rpcclient.IsNetworkError(err) {
return
}
// did not dispatch properly, fail-back to standard dispatching
utils.Logger.Warning(fmt.Sprintf("<%s> ignoring cache network error <%s> setting route dR %+v",
utils.DispatcherS, err.Error(), dR))
}
}
if err = dh.Call(method, args, reply); err != nil {

View File

@@ -209,9 +209,8 @@ func (chS *CacheS) GetWithRemote(args *utils.ArgsGetCacheItemWithAPIOpts) (itm i
return
}
// item was not found locally, query from remote
var itmRemote interface{}
if err = connMgr.Call(chS.cfg.CacheCfg().RemoteConns, nil,
utils.CacheSv1GetItem, args, &itmRemote); err != nil &&
utils.CacheSv1GetItem, args, &itm); err != nil &&
err.Error() == utils.ErrNotFound.Error() {
return nil, utils.ErrNotFound // correct the error coming as string type
}

View File

@@ -21,6 +21,7 @@ package general_tests
import (
"net/rpc"
"path"
"reflect"
"testing"
"time"
@@ -30,53 +31,46 @@ import (
)
var (
setterCfgPath string
setterCfg *config.CGRConfig
setterRPC *rpc.Client
dspOptsCfgPath string
apierCfgPath string
setterCfgPath string
dspOptsCfg *config.CGRConfig
apierCfg *config.CGRConfig
setterCfg *config.CGRConfig
dspOptsRPC *rpc.Client
apierRPC *rpc.Client
setterRPC *rpc.Client
dspOptsConfigDIR string
dpsOptsTest = []func(t *testing.T){
testDispatcherOptsSetterInitCfg,
testDispatcherOptsSetterInitDataDb,
testDispatcherOptsSetterStartEngine,
testDispatcherOptsSetterRPCConn,
// Start engine without Dispatcher on engine 4012
testDispatcherOptsAPIerInitCfg,
testDispatcherOptsAPIerInitDataDb,
testDispatcherOptsAPIerStartEngine,
testDispatcherOptsAPIerRPCConn,
testDispatcherOptsSetterSetDispatcherProfile,
// testDispatcherOptsAPIerSetDispatcherProfile,
testDispatcherOptsAPIerSetDispatcherProfile,
// Start engine without Dispatcher on engine 2012 with profiles in database (*dispatchers:false)
testDispatcherOptsDSPInitCfg,
testDispatcherOptsDSPStartEngine,
testDispatcherOptsDSPRPCConn,
testDispatcherOptsCoreStatus, // self localhost(:2012) CoresV1Status
testDispatcherOptsCoreStatus, // localhost(:2012) CoresV1Status
testDispatcherGetItemBothEngines1,
testDispatcherAPIerCoreStatus,
testDispatcherOptsAPIerSetDispatcherHost4012,
testDispatcherCacheClearBothEngines,
testDispatcherOptsSetterSetDispatcherHost4012,
// testDispatcherOptsAPIerSetDispatcherHost4012,
testDispatcherOptsCoreStatusHost4012,
testDispatcherOptsSetterSetDispatcherProfileDoubleHost,
// testDispatcherOptsAPIerSetDispatcherProfileDoubleHost,
testDispatcherOptsCoreStatusWithRouteID,
testDispatcherOptsSetterSetDispatcherHostInexistent,
// testDispatcherOptsAPIerSetDispatcherHostInexistent,
testDispatcherOptsCoreStatusWithRouteID2,
testDispatcherOptsCoreStatusWithoutRouteID,
testDispatcherGetItemBothEngines2,
testDispatcherOptsDSPStopEngine,
testDispatcherOptsAPIerStopEngine,
// testDispatcherOptsSetterStopEngine,
}
)
@@ -124,8 +118,8 @@ func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) {
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "SELF_ENGINE",
Address: "127.0.0.1:4012",
ID: "HOST1",
Address: "127.0.0.1:2012",
Transport: "*json",
ConnectAttempts: 1,
Reconnects: 3,
@@ -137,7 +131,7 @@ func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) {
utils.OptsDispatchers: false,
},
}
if err := apierRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
@@ -153,7 +147,7 @@ func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) {
Weight: 10,
Hosts: engine.DispatcherHostProfiles{
{
ID: "SELF_ENGINE",
ID: "HOST1",
Weight: 5,
},
},
@@ -162,7 +156,7 @@ func testDispatcherOptsAPIerSetDispatcherProfile(t *testing.T) {
utils.OptsDispatchers: false,
},
}
if err := apierRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
@@ -195,7 +189,56 @@ func testDispatcherOptsDSPRPCConn(t *testing.T) {
}
func testDispatcherOptsCoreStatus(t *testing.T) {
//SELF_ENGINE HOST
// HOST1 host matched
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
"*host": "HOST2",
utils.OptsRouteID: "account#dan.bogos",
},
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "HOST1" {
t.Errorf("Expected HOST1, received %v", reply[utils.NodeID])
}
}
func testDispatcherGetItemBothEngines1(t *testing.T) {
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherRoutes,
ItemID: "account#dan.bogos:*core",
},
}
var reply interface{}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err != nil {
t.Error(err)
} else {
expected := map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.ProfileID: "DSP1",
"HostID": "HOST1",
}
if !reflect.DeepEqual(expected, reply) {
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
}
}
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func testDispatcherAPIerCoreStatus(t *testing.T) {
// HOST2 host matched because it was called from engine with port :4012 -> host2
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
@@ -203,12 +246,10 @@ func testDispatcherOptsCoreStatus(t *testing.T) {
utils.OptsDispatchers: false,
},
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
if err := apierRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else {
/*
t.Errorf("Received: %s", utils.ToJSON(reply))
*/
} else if reply[utils.NodeID] != "HOST2" {
t.Errorf("Expected HOST2, received %v", reply[utils.NodeID])
}
}
@@ -219,7 +260,7 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) {
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "HOST4012",
ID: "HOST2",
Address: "127.0.0.1:4012",
Transport: "*json",
ConnectAttempts: 1,
@@ -232,7 +273,7 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) {
utils.OptsDispatchers: false,
},
}
if err := apierRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
if err := setterRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
@@ -248,7 +289,7 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) {
Weight: 10,
Hosts: engine.DispatcherHostProfiles{
{
ID: "HOST4012",
ID: "HOST2",
Weight: 10,
},
},
@@ -257,7 +298,7 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) {
utils.OptsDispatchers: false,
},
}
if err := apierRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
if err := setterRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
@@ -267,152 +308,72 @@ func testDispatcherOptsAPIerSetDispatcherHost4012(t *testing.T) {
func testDispatcherOptsCoreStatusHost4012(t *testing.T) {
// status just for HOST4012
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else {
/*
t.Errorf("Received: %s", utils.ToJSON(reply))
*/
}
}
func testDispatcherOptsAPIerSetDispatcherProfileDoubleHost(t *testing.T) {
// Set DispatcherProfile with both engines
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
DispatcherProfile: &engine.DispatcherProfile{
Tenant: "cgrates.org",
ID: "DSP1",
Strategy: "*weight",
Subsystems: []string{utils.MetaAny},
Weight: 10,
Hosts: engine.DispatcherHostProfiles{
{
ID: "SELF_ENGINE",
Weight: 5,
},
{
ID: "HOST4012",
Weight: 10,
},
},
},
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
var replyStr string
if err := apierRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}
}
func testDispatcherOptsCoreStatusWithRouteID(t *testing.T) {
// now it will dispatch in both engines
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
"*host": "HOST2",
utils.OptsRouteID: "account#dan.bogos",
},
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else {
/*
t.Errorf("Received: %s", utils.ToJSON(reply))
*/
} else if reply[utils.NodeID] != "HOST2" {
t.Errorf("Expected HOST1, received %v", reply[utils.NodeID])
}
}
func testDispatcherOptsAPIerSetDispatcherHostInexistent(t *testing.T) {
// Set DispatcherHost on 4012 host
var replyStr string
setDispatcherHost := &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "INEXISTENT",
Address: "127.0.0.1:1223",
Transport: "*json",
ConnectAttempts: 1,
Reconnects: 3,
ConnectTimeout: time.Minute,
ReplyTimeout: 2 * time.Minute,
},
},
func testDispatcherCacheClearBothEngines(t *testing.T) {
var reply string
if err := dspOptsRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := apierRPC.Call(utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}, &reply); err != nil {
t.Fatal(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply returned")
}
// Set DispatcherProfile Different with an inexistent engine opened, but with a bigger weight(this should match now)
setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{
DispatcherProfile: &engine.DispatcherProfile{
Tenant: "cgrates.org",
ID: "DSP1",
Strategy: "*weight",
Subsystems: []string{utils.MetaAny},
Weight: 20,
Hosts: engine.DispatcherHostProfiles{
{
ID: "INEXISTENT",
Weight: 10,
},
},
},
if err := apierRPC.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
APIOpts: map[string]interface{}{
utils.OptsDispatchers: false,
},
}
if err := apierRPC.Call(utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil {
t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err)
} else if replyStr != utils.OK {
t.Error("Unexpected reply returned", replyStr)
}, &reply); err != nil {
t.Fatal(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply returned")
}
}
func testDispatcherOptsCoreStatusWithRouteID2(t *testing.T) {
// because we have the routeID it will match DSP1 and last host matched, host4012
// so again, both engines will match
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
func testDispatcherGetItemBothEngines2(t *testing.T) {
argsCache := &utils.ArgsGetCacheItemWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.OptsRouteID: "account#dan.bogos",
utils.OptsDispatchers: false,
},
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: utils.CacheDispatcherRoutes,
ItemID: "account#dan.bogos:*core",
},
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
var reply interface{}
if err := dspOptsRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err != nil {
t.Error(err)
} else {
/*
t.Errorf("Received: %s", utils.ToJSON(reply))
*/
expected := map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.ProfileID: "DSP1",
"HostID": "HOST2",
}
if !reflect.DeepEqual(expected, reply) {
t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply))
}
}
}
func testDispatcherOptsCoreStatusWithoutRouteID(t *testing.T) {
// because we have the routeID it will match DSP1 and last host matched, host4012
// so again, both engines will match
var reply map[string]interface{}
ev := utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
}
if err := dspOptsRPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
if err := apierRPC.Call(utils.CacheSv1GetItemWithRemote, argsCache,
&reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
} else {
/*
t.Errorf("Received: %s", utils.ToJSON(reply))
*/
}
}

View File

@@ -2276,6 +2276,7 @@ const (
// DispatcherSCfg
AnySubsystemCfg = "any_subsystem"
PreventLoopCfg = "prevent_loop"
)
// FC Template

View File

@@ -76,6 +76,7 @@ var (
ErrMaxConcurentRPCExceeded = errors.New("MAX_CONCURENT_RPC_EXCEEDED") // but the codec will rewrite it with this one to be sure that we corectly dealocate the request
ErrMaxIterationsReached = errors.New("maximum iterations reached")
ErrNegative = errors.New("NEGATIVE")
ErrCastFailed = errors.New("CAST_FAILED")
ErrMap = map[string]error{
ErrNoMoreData.Error(): ErrNoMoreData,