diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 813ce9f24..05a209825 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -523,31 +523,29 @@ func main() { initServiceManagerV1(internalServeManagerChan, srvManager, server) - // init internalRPCSet + // init internalRPCSet because we can have double connections in rpc_conns and one of it could be *internal engine.IntRPC = engine.NewRPCClientSet() - if cfg.DispatcherSCfg().Enabled { - engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, APIerSv1.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.APIerSv2, APIerSv2.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) - engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan) - engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, ldrs.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, smg.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, tS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan) - engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan) - engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan) - engine.IntRPC.AddInternalRPCClient(utils.RALsV1, rals.GetIntenternalChan()) - } + engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, APIerSv1.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.APIerSv2, APIerSv2.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) + engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan) + engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, ldrs.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, smg.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, tS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan) + engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan) + engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan) + engine.IntRPC.AddInternalRPCClient(utils.RALsV1, rals.GetIntenternalChan()) initConfigSv1(internalConfigChan, server) diff --git a/data/conf/samples/twoengines/engine2/cgrates.json b/data/conf/samples/twoengines/engine2/cgrates.json index 7750a7e8c..485db6c9c 100644 --- a/data/conf/samples/twoengines/engine2/cgrates.json +++ b/data/conf/samples/twoengines/engine2/cgrates.json @@ -29,4 +29,9 @@ }, +"apiers": { + "enabled": true, +}, + + } diff --git a/engine/connmanager.go b/engine/connmanager.go index acd277975..bbe8f272c 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -59,6 +59,12 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec isBiRPCCLient = true } else { connCfg = cM.cfg.RPCConns()[connID] + for _, rpcConn := range connCfg.Conns { + if rpcConn.Address == utils.MetaInternal { + intChan = IntRPC.GetInternalChanel() + break + } + } } switch { case biRPCClient != nil && isBiRPCCLient: // special handling for SessionS BiJSONRPCClient diff --git a/general_tests/twoengines_it_test.go b/general_tests/twoengines_it_test.go index 93fedfadc..4a75a7515 100644 --- a/general_tests/twoengines_it_test.go +++ b/general_tests/twoengines_it_test.go @@ -53,6 +53,7 @@ var sTestsTwoEnginesIT = []func(t *testing.T){ testTwoEnginesCheckCacheBeforeSet, testTwoEnginesSetThreshold, testTwoEnginesCheckCacheAfterSet, + testTwoEnginesUpdateThreshold, testTwoEnginesKillEngines, } @@ -138,6 +139,12 @@ func testTwoEnginesCheckCacheBeforeSet(t *testing.T) { func testTwoEnginesSetThreshold(t *testing.T) { var reply *engine.ThresholdProfile + // enforce caching with nil on engine2 so CacheSv1.ReloadCache load correctly the threshold + if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &reply); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } var result string tPrfl := &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ @@ -151,7 +158,6 @@ func testTwoEnginesSetThreshold(t *testing.T) { ActionIDs: []string{"ACT_1"}, Async: true, }, - Cache: utils.StringPointer(utils.MetaLoad), } if err := engineOneRpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil { t.Error(err) @@ -200,6 +206,64 @@ func testTwoEnginesCheckCacheAfterSet(t *testing.T) { } else if !reflect.DeepEqual(expKeys, rcvKeys) { t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys) } + // after we verify the cache make sure it was set correctly there + tPrfl := &engine.ThresholdWithCache{ + ThresholdProfile: &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "THD_TwoEnginesTest", + FilterIDs: []string{"*string:~*req.Account:1001"}, + MaxHits: -1, + MinSleep: time.Duration(5 * time.Minute), + Blocker: false, + Weight: 20.0, + ActionIDs: []string{"ACT_1"}, + Async: true, + }, + } + var rplTh *engine.ThresholdProfile + if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh) + } +} + +func testTwoEnginesUpdateThreshold(t *testing.T) { + var rplTh *engine.ThresholdProfile + var result string + tPrfl := &engine.ThresholdWithCache{ + ThresholdProfile: &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "THD_TwoEnginesTest", + FilterIDs: []string{"*string:~*req.Account:10"}, + MaxHits: -1, + MinSleep: time.Duration(1 * time.Minute), + Blocker: false, + Weight: 50.0, + ActionIDs: []string{"ACT_1.1"}, + Async: true, + }, + Cache: utils.StringPointer(utils.MetaReload), + } + if err := engineOneRpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + time.Sleep(50 * time.Millisecond) + if err := engineOneRpc.Call(utils.APIerSv1GetThresholdProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh) + } + if err := engineTwoRpc.Call(utils.APIerSv1GetThresholdProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "THD_TwoEnginesTest"}, &rplTh); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rplTh) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rplTh) + } } func testTwoEnginesKillEngines(t *testing.T) {