mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Correct using a connID with two connections one of it *internal
This commit is contained in:
committed by
Dan Christian Bogos
parent
f0127e2a77
commit
ee6fcc4425
@@ -523,31 +523,29 @@ func main() {
|
||||
|
||||
initServiceManagerV1(internalServeManagerChan, srvManager, server)
|
||||
|
||||
// init internalRPCSet
|
||||
// init internalRPCSet because we can have double connections in rpc_conns and one of it could be *internal
|
||||
engine.IntRPC = engine.NewRPCClientSet()
|
||||
if cfg.DispatcherSCfg().Enabled {
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, APIerSv1.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.APIerSv2, APIerSv2.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, ldrs.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, smg.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, tS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.RALsV1, rals.GetIntenternalChan())
|
||||
}
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, APIerSv1.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.APIerSv2, APIerSv2.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, ldrs.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, smg.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, tS.GetIntenternalChan())
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.RALsV1, rals.GetIntenternalChan())
|
||||
|
||||
initConfigSv1(internalConfigChan, server)
|
||||
|
||||
|
||||
@@ -29,4 +29,9 @@
|
||||
},
|
||||
|
||||
|
||||
"apiers": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -59,6 +59,12 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
|
||||
isBiRPCCLient = true
|
||||
} else {
|
||||
connCfg = cM.cfg.RPCConns()[connID]
|
||||
for _, rpcConn := range connCfg.Conns {
|
||||
if rpcConn.Address == utils.MetaInternal {
|
||||
intChan = IntRPC.GetInternalChanel()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
switch {
|
||||
case biRPCClient != nil && isBiRPCCLient: // special handling for SessionS BiJSONRPCClient
|
||||
|
||||
@@ -53,6 +53,7 @@ var sTestsTwoEnginesIT = []func(t *testing.T){
|
||||
testTwoEnginesCheckCacheBeforeSet,
|
||||
testTwoEnginesSetThreshold,
|
||||
testTwoEnginesCheckCacheAfterSet,
|
||||
testTwoEnginesUpdateThreshold,
|
||||
testTwoEnginesKillEngines,
|
||||
}
|
||||
|
||||
@@ -138,6 +139,12 @@ func testTwoEnginesCheckCacheBeforeSet(t *testing.T) {
|
||||
|
||||
func testTwoEnginesSetThreshold(t *testing.T) {
|
||||
var reply *engine.ThresholdProfile
|
||||
// enforce caching with nil on engine2 so CacheSv1.ReloadCache load correctly the threshold
|
||||
if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &reply); err == nil ||
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
var result string
|
||||
tPrfl := &engine.ThresholdWithCache{
|
||||
ThresholdProfile: &engine.ThresholdProfile{
|
||||
@@ -151,7 +158,6 @@ func testTwoEnginesSetThreshold(t *testing.T) {
|
||||
ActionIDs: []string{"ACT_1"},
|
||||
Async: true,
|
||||
},
|
||||
Cache: utils.StringPointer(utils.MetaLoad),
|
||||
}
|
||||
if err := engineOneRpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil {
|
||||
t.Error(err)
|
||||
@@ -200,6 +206,64 @@ func testTwoEnginesCheckCacheAfterSet(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(expKeys, rcvKeys) {
|
||||
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
|
||||
}
|
||||
// after we verify the cache make sure it was set correctly there
|
||||
tPrfl := &engine.ThresholdWithCache{
|
||||
ThresholdProfile: &engine.ThresholdProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "THD_TwoEnginesTest",
|
||||
FilterIDs: []string{"*string:~*req.Account:1001"},
|
||||
MaxHits: -1,
|
||||
MinSleep: time.Duration(5 * time.Minute),
|
||||
Blocker: false,
|
||||
Weight: 20.0,
|
||||
ActionIDs: []string{"ACT_1"},
|
||||
Async: true,
|
||||
},
|
||||
}
|
||||
var rplTh *engine.ThresholdProfile
|
||||
if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh)
|
||||
}
|
||||
}
|
||||
|
||||
func testTwoEnginesUpdateThreshold(t *testing.T) {
|
||||
var rplTh *engine.ThresholdProfile
|
||||
var result string
|
||||
tPrfl := &engine.ThresholdWithCache{
|
||||
ThresholdProfile: &engine.ThresholdProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "THD_TwoEnginesTest",
|
||||
FilterIDs: []string{"*string:~*req.Account:10"},
|
||||
MaxHits: -1,
|
||||
MinSleep: time.Duration(1 * time.Minute),
|
||||
Blocker: false,
|
||||
Weight: 50.0,
|
||||
ActionIDs: []string{"ACT_1.1"},
|
||||
Async: true,
|
||||
},
|
||||
Cache: utils.StringPointer(utils.MetaReload),
|
||||
}
|
||||
if err := engineOneRpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
if err := engineOneRpc.Call(utils.APIerSv1GetThresholdProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh)
|
||||
}
|
||||
if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile,
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh)
|
||||
}
|
||||
}
|
||||
|
||||
func testTwoEnginesKillEngines(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user