Correct using a connID with two connections one of it *internal

This commit is contained in:
TeoV
2020-02-27 17:34:17 +02:00
committed by Dan Christian Bogos
parent fe62e40369
commit 87095fa53b
4 changed files with 98 additions and 25 deletions

View File

@@ -523,31 +523,29 @@ func main() {
initServiceManagerV1(internalServeManagerChan, srvManager, server)
// init internalRPCSet
// init internalRPCSet because we can have double connections in rpc_conns and one of it could be *internal
engine.IntRPC = engine.NewRPCClientSet()
if cfg.DispatcherSCfg().Enabled {
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, APIerSv1.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.APIerSv2, APIerSv2.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan)
engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, ldrs.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, smg.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, tS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan)
engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan)
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
engine.IntRPC.AddInternalRPCClient(utils.RALsV1, rals.GetIntenternalChan())
}
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, APIerSv1.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.APIerSv2, APIerSv2.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan)
engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, ldrs.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, smg.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, tS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan)
engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan)
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
engine.IntRPC.AddInternalRPCClient(utils.RALsV1, rals.GetIntenternalChan())
initConfigSv1(internalConfigChan, server)

View File

@@ -29,4 +29,9 @@
},
"apiers": {
"enabled": true,
},
}

View File

@@ -59,6 +59,12 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
isBiRPCCLient = true
} else {
connCfg = cM.cfg.RPCConns()[connID]
for _, rpcConn := range connCfg.Conns {
if rpcConn.Address == utils.MetaInternal {
intChan = IntRPC.GetInternalChanel()
break
}
}
}
switch {
case biRPCClient != nil && isBiRPCCLient: // special handling for SessionS BiJSONRPCClient

View File

@@ -53,6 +53,7 @@ var sTestsTwoEnginesIT = []func(t *testing.T){
testTwoEnginesCheckCacheBeforeSet,
testTwoEnginesSetThreshold,
testTwoEnginesCheckCacheAfterSet,
testTwoEnginesUpdateThreshold,
testTwoEnginesKillEngines,
}
@@ -138,6 +139,12 @@ func testTwoEnginesCheckCacheBeforeSet(t *testing.T) {
func testTwoEnginesSetThreshold(t *testing.T) {
var reply *engine.ThresholdProfile
// enforce caching with nil on engine2 so CacheSv1.ReloadCache load correctly the threshold
if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile,
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &reply); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
var result string
tPrfl := &engine.ThresholdWithCache{
ThresholdProfile: &engine.ThresholdProfile{
@@ -151,7 +158,6 @@ func testTwoEnginesSetThreshold(t *testing.T) {
ActionIDs: []string{"ACT_1"},
Async: true,
},
Cache: utils.StringPointer(utils.MetaLoad),
}
if err := engineOneRpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil {
t.Error(err)
@@ -200,6 +206,64 @@ func testTwoEnginesCheckCacheAfterSet(t *testing.T) {
} else if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
// after we verify the cache make sure it was set correctly there
tPrfl := &engine.ThresholdWithCache{
ThresholdProfile: &engine.ThresholdProfile{
Tenant: "cgrates.org",
ID: "THD_TwoEnginesTest",
FilterIDs: []string{"*string:~*req.Account:1001"},
MaxHits: -1,
MinSleep: time.Duration(5 * time.Minute),
Blocker: false,
Weight: 20.0,
ActionIDs: []string{"ACT_1"},
Async: true,
},
}
var rplTh *engine.ThresholdProfile
if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile,
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) {
t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh)
}
}
func testTwoEnginesUpdateThreshold(t *testing.T) {
var rplTh *engine.ThresholdProfile
var result string
tPrfl := &engine.ThresholdWithCache{
ThresholdProfile: &engine.ThresholdProfile{
Tenant: "cgrates.org",
ID: "THD_TwoEnginesTest",
FilterIDs: []string{"*string:~*req.Account:10"},
MaxHits: -1,
MinSleep: time.Duration(1 * time.Minute),
Blocker: false,
Weight: 50.0,
ActionIDs: []string{"ACT_1.1"},
Async: true,
},
Cache: utils.StringPointer(utils.MetaReload),
}
if err := engineOneRpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
time.Sleep(50 * time.Millisecond)
if err := engineOneRpc.Call(utils.APIerSv1GetThresholdProfile,
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) {
t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh)
}
if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile,
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) {
t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh)
}
}
func testTwoEnginesKillEngines(t *testing.T) {