From 6b487537b8361db7fd01fd82e812802d266b3da7 Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Fri, 5 Apr 2019 20:30:15 +0300 Subject: [PATCH] Started adding dispatcher internal connections --- apier/v1/guardian.go | 6 +++ apier/v1/schedulers.go | 6 +++ apier/v1/sessions.go | 7 ++- cmd/cgr-engine/cgr-engine.go | 48 +++++++++++++++--- cmd/cgr-engine/rater.go | 6 ++- config/config.go | 5 -- .../dispatchers/attributes/cgrates.json | 40 --------------- .../dispatchers/attributes_mongo/cgrates.json | 49 ------------------- .../dispatchers/dispatchers/cgrates.json | 14 +++--- .../dispatchers_mongo/cgrates.json | 8 +-- .../dispatchers/DispatcherHosts.csv | 2 +- dispatchers/attributes_it_test.go | 4 +- dispatchers/caches_it_test.go | 4 +- dispatchers/chargers_it_test.go | 4 +- dispatchers/guardian_it_test.go | 2 +- dispatchers/libtest.go | 9 ++-- dispatchers/resources_it_test.go | 4 +- dispatchers/responder_it_test.go | 7 ++- dispatchers/scheduler_it_test.go | 2 +- dispatchers/sessions_it_test.go | 4 +- dispatchers/stats_it_test.go | 4 +- dispatchers/suppliers_it_test.go | 4 +- dispatchers/thresholds_it_test.go | 4 +- engine/datamanager.go | 3 +- engine/libengine.go | 42 ++++++++++++++++ utils/consts.go | 2 + 26 files changed, 147 insertions(+), 143 deletions(-) delete mode 100644 data/conf/samples/dispatchers/attributes/cgrates.json delete mode 100644 data/conf/samples/dispatchers/attributes_mongo/cgrates.json diff --git a/apier/v1/guardian.go b/apier/v1/guardian.go index e22e54e6f..a58477d29 100644 --- a/apier/v1/guardian.go +++ b/apier/v1/guardian.go @@ -46,3 +46,9 @@ func (self *GuardianSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *strin *reply = utils.Pong return nil } + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (self *GuardianSv1) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(self, serviceMethod, args, reply) +} diff --git a/apier/v1/schedulers.go b/apier/v1/schedulers.go index fe25d46d2..66cea0754 100644 --- a/apier/v1/schedulers.go +++ b/apier/v1/schedulers.go @@ -41,3 +41,9 @@ func (schdSv1 *SchedulerSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *s *reply = utils.Pong return nil } + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (schdSv1 *SchedulerSv1) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(schdSv1, serviceMethod, args, reply) +} diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index 4e5a48c6a..0ba6cd071 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -113,5 +113,10 @@ func (ssv1 *SessionSv1) ReplicateSessions(args sessions.ArgsReplicateSessions, r func (ssv1 *SessionSv1) SetPassiveSession(args *sessions.Session, reply *string) error { return ssv1.Ss.BiRPCv1SetPassiveSession(nil, args, reply) - +} + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (ssv1 *SessionSv1) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(ssv1, serviceMethod, args, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e42c60831..8f7053416 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -299,7 +299,11 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in server.RpcRegister(smgRpc) ssv1 := v1.NewSessionSv1(sm) // methods with multiple options - server.RpcRegister(ssv1) + if !config.CgrConfig().DispatcherSCfg().Enabled { + server.RpcRegister(ssv1) + } else { + engine.IntRPC.AddConnection(utils.SessionSv1, ssv1) + } // Register BiRpc handlers if cfg.SessionSCfg().ListenBijson != "" { for method, handler := range smgRpc.Handlers() { @@ -674,7 +678,11 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec return }() aSv1 := v1.NewAttributeSv1(aS) - server.RpcRegister(aSv1) + if !config.CgrConfig().DispatcherSCfg().Enabled { + server.RpcRegister(aSv1) + } else { + engine.IntRPC.AddConnection(utils.AttributeSv1, aSv1) + } internalAttributeSChan <- aSv1 } @@ -723,7 +731,11 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection return }() cSv1 := v1.NewChargerSv1(cS) - server.RpcRegister(cSv1) + if !config.CgrConfig().DispatcherSCfg().Enabled { + server.RpcRegister(cSv1) + } else { + engine.IntRPC.AddConnection(utils.ChargerSv1, cSv1) + } internalChargerSChan <- cSv1 } @@ -770,7 +782,11 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac return }() rsV1 := v1.NewResourceSv1(rS) - server.RpcRegister(rsV1) + if !config.CgrConfig().DispatcherSCfg().Enabled { + server.RpcRegister(rsV1) + } else { + engine.IntRPC.AddConnection(utils.ResourceSv1, rsV1) + } internalRsChan <- rsV1 } @@ -817,7 +833,11 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach return }() stsV1 := v1.NewStatSv1(sS) - server.RpcRegister(stsV1) + if !config.CgrConfig().DispatcherSCfg().Enabled { + server.RpcRegister(stsV1) + } else { + engine.IntRPC.AddConnection(utils.StatSv1, stsV1) + } internalStatSChan <- stsV1 } @@ -848,7 +868,11 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec return }() tSv1 := v1.NewThresholdSv1(tS) - server.RpcRegister(tSv1) + if !config.CgrConfig().DispatcherSCfg().Enabled { + server.RpcRegister(tSv1) + } else { + engine.IntRPC.AddConnection(utils.ThresholdSv1, tSv1) + } internalThresholdSChan <- tSv1 } @@ -929,7 +953,11 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti return }() splV1 := v1.NewSupplierSv1(splS) - server.RpcRegister(splV1) + if !config.CgrConfig().DispatcherSCfg().Enabled { + server.RpcRegister(splV1) + } else { + engine.IntRPC.AddConnection(utils.SupplierSv1, splV1) + } internalSupplierSChan <- splV1 } @@ -1080,6 +1108,8 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection, if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(v1.NewCacheSv1(chS)) + } else { + engine.IntRPC.AddConnection(utils.CacheSv1, v1.NewCacheSv1(chS)) } internalCacheSChan <- chS return @@ -1088,6 +1118,8 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection, func initGuardianSv1(server *utils.Server) { if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(v1.NewGuardianSv1()) + } else { + engine.IntRPC.AddConnection(utils.GuardianSv1, v1.NewGuardianSv1()) } } @@ -1096,6 +1128,8 @@ func initSchedulerS(internalCacheSChan chan rpcclient.RpcClientConnection, schdS := servmanager.NewSchedulerS(srvMngr) if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(v1.NewSchedulerSv1(schdS)) + } else { + engine.IntRPC.AddConnection(utils.SchedulerSv1, v1.NewSchedulerSv1(schdS)) } internalCacheSChan <- schdS } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index e2e0fb0fe..1a800e5cc 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -188,7 +188,11 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en apierRpcV2 := &v2.ApierV2{ ApierV1: *apierRpcV1} - server.RpcRegister(responder) + if !cfg.DispatcherSCfg().Enabled { + server.RpcRegister(responder) + } else { + engine.IntRPC.AddConnection(utils.Responder, responder) + } server.RpcRegister(apierRpcV1) server.RpcRegister(apierRpcV2) diff --git a/config/config.go b/config/config.go index 79465a6b3..7d6a190ba 100755 --- a/config/config.go +++ b/config/config.go @@ -692,11 +692,6 @@ func (self *CGRConfig) checkConfigSanity() error { } } } - if self.dispatcherSCfg.Enabled { - if self.attributeSCfg.Enabled { - return fmt.Errorf("<%s> cannot start in tandem with <%s>", utils.DispatcherS, utils.AttributeS) - } - } // Scheduler check connection with CDR Server if !self.cdrsCfg.CDRSEnabled { for _, connCfg := range self.schedulerCfg.CDRsConns { diff --git a/data/conf/samples/dispatchers/attributes/cgrates.json b/data/conf/samples/dispatchers/attributes/cgrates.json deleted file mode 100644 index efbb2c11b..000000000 --- a/data/conf/samples/dispatchers/attributes/cgrates.json +++ /dev/null @@ -1,40 +0,0 @@ -{ -// CGRateS Configuration file -// - - -"general": { - "node_id": "AttributeS1", - "log_level": 7 -}, - -"listen": { - "rpc_json": ":5012", - "rpc_gob": ":5013", - "http": ":5080", -}, - -"stor_db": { - "db_type":"*internal", -}, - -"attributes": { - "enabled": true -}, - -"rals": { - "enabled": true, -}, - -"scheduler": { - "enabled": true, -}, - -"apier": { - "caches_conns":[ // connections to CacheS for reloads - {"address": "127.0.0.1:5012", "transport": "*json"}, - ], -}, - -} - \ No newline at end of file diff --git a/data/conf/samples/dispatchers/attributes_mongo/cgrates.json b/data/conf/samples/dispatchers/attributes_mongo/cgrates.json deleted file mode 100644 index 3427bc812..000000000 --- a/data/conf/samples/dispatchers/attributes_mongo/cgrates.json +++ /dev/null @@ -1,49 +0,0 @@ -{ -// CGRateS Configuration file -// - - -"general": { - "node_id": "AttributeS1", - "log_level": 7 -}, - -"listen": { - "rpc_json": ":5012", - "rpc_gob": ":5013", - "http": ":5080", -}, - -"data_db": { - "db_type": "mongo", - "db_name": "10", - "db_port": 27017, -}, - - -"stor_db": { - "db_type": "mongo", - "db_name": "cgrates", - "db_port": 27017, -}, - -"scheduler": { - "enabled": true, -}, - -"attributes": { - "enabled": true -}, - -"rals": { - "enabled": true, -}, - -"apier": { - "caches_conns":[ // connections to CacheS for reloads - {"address": "127.0.0.1:5012", "transport": "*json"}, - ], -}, - -} - \ No newline at end of file diff --git a/data/conf/samples/dispatchers/dispatchers/cgrates.json b/data/conf/samples/dispatchers/dispatchers/cgrates.json index a340155ce..8f4cc9171 100755 --- a/data/conf/samples/dispatchers/dispatchers/cgrates.json +++ b/data/conf/samples/dispatchers/dispatchers/cgrates.json @@ -29,24 +29,22 @@ "attributes": { - "enabled": false + "enabled": true }, "scheduler": { "enabled": true, }, +"rals": { + "enabled": true, +}, + "dispatchers":{ "enabled": true, "attributes_conns": [ - {"address": "127.0.0.1:5012", "transport": "*json"}, + {"address": "*internal"}, ], }, -"apier": { - "caches_conns":[ // connections to CacheS for reloads - {"address": "127.0.0.1:2012", "transport": "*json"}, - ], -}, - } \ No newline at end of file diff --git a/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json b/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json index 17c260645..a8fd1eabb 100644 --- a/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json +++ b/data/conf/samples/dispatchers/dispatchers_mongo/cgrates.json @@ -41,16 +41,18 @@ }, "attributes": { - "enabled": false + "enabled": true }, +"rals": { + "enabled": true, +}, "dispatchers":{ "enabled": true, "attributes_conns": [ - {"address": "127.0.0.1:5012", "transport": "*json"}, + {"address": "*internal"}, ], }, - } \ No newline at end of file diff --git a/data/tariffplans/dispatchers/DispatcherHosts.csv b/data/tariffplans/dispatchers/DispatcherHosts.csv index e18649da3..f142aece6 100644 --- a/data/tariffplans/dispatchers/DispatcherHosts.csv +++ b/data/tariffplans/dispatchers/DispatcherHosts.csv @@ -1,4 +1,4 @@ #Tenant[0],ID[1],Address[2],Transport[3],TLS[4] -cgrates.org,AttributeS1,127.0.0.1:5012,*json,false +cgrates.org,SELF,*internal,, cgrates.org,ALL,127.0.0.1:6012,*json,false cgrates.org,ALL2,127.0.0.1:7012,*json,false diff --git a/dispatchers/attributes_it_test.go b/dispatchers/attributes_it_test.go index 301d79346..28abdbe2c 100755 --- a/dispatchers/attributes_it_test.go +++ b/dispatchers/attributes_it_test.go @@ -45,11 +45,11 @@ var sTestsDspAttr = []func(t *testing.T){ //Test start here func TestDspAttributeSTMySQL(t *testing.T) { - testDsp(t, sTestsDspAttr, "TestDspAttributeS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspAttr, "TestDspAttributeS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func TestDspAttributeSMongo(t *testing.T) { - testDsp(t, sTestsDspAttr, "TestDspAttributeS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspAttr, "TestDspAttributeS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") } func testDspAttrPingFailover(t *testing.T) { diff --git a/dispatchers/caches_it_test.go b/dispatchers/caches_it_test.go index 2400b9a2e..c78d61c0a 100644 --- a/dispatchers/caches_it_test.go +++ b/dispatchers/caches_it_test.go @@ -45,11 +45,11 @@ var sTestsDspChc = []func(t *testing.T){ //Test start here func TestDspCacheSv1TMySQL(t *testing.T) { - testDsp(t, sTestsDspChc, "TestDspCacheSv1", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspChc, "TestDspCacheSv1", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func TestDspCacheSv1Mongo(t *testing.T) { - testDsp(t, sTestsDspChc, "TestDspCacheSv1", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspChc, "TestDspCacheSv1", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") } func testDspChcPing(t *testing.T) { diff --git a/dispatchers/chargers_it_test.go b/dispatchers/chargers_it_test.go index 95d21f607..67fbdc19b 100755 --- a/dispatchers/chargers_it_test.go +++ b/dispatchers/chargers_it_test.go @@ -40,11 +40,11 @@ var sTestsDspCpp = []func(t *testing.T){ //Test start here func TestDspChargerSTMySQL(t *testing.T) { - testDsp(t, sTestsDspCpp, "TestDspChargerS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspCpp, "TestDspChargerS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func TestDspChargerSMongo(t *testing.T) { - testDsp(t, sTestsDspCpp, "TestDspChargerS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspCpp, "TestDspChargerS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") } func testDspCppPingFailover(t *testing.T) { diff --git a/dispatchers/guardian_it_test.go b/dispatchers/guardian_it_test.go index 83a805770..60ecf5bea 100644 --- a/dispatchers/guardian_it_test.go +++ b/dispatchers/guardian_it_test.go @@ -35,7 +35,7 @@ var sTestsDspGrd = []func(t *testing.T){ //Test start here func TestDspGuardianSTMySQL(t *testing.T) { - testDsp(t, sTestsDspGrd, "TestDspGuardianS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspGrd, "TestDspGuardianS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func testDspGrdPing(t *testing.T) { diff --git a/dispatchers/libtest.go b/dispatchers/libtest.go index 933844442..71d75183b 100644 --- a/dispatchers/libtest.go +++ b/dispatchers/libtest.go @@ -33,7 +33,6 @@ import ( ) var ( - attrEngine *testDispatcher dispEngine *testDispatcher allEngine *testDispatcher allEngine2 *testDispatcher @@ -104,24 +103,22 @@ func (d *testDispatcher) loadData(t *testing.T, path string) { var reply string attrs := &utils.AttrLoadTpFromFolder{FolderPath: path} if err := d.RCP.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { - t.Errorf("Error at loading data from folder:%v", err) + t.Errorf("<%s>Error at loading data from folder :%v", d.CfgParh, err) } } -func testDsp(t *testing.T, tests []func(t *testing.T), testName, all, all2, attr, disp, allTF, all2TF, attrTF string) { +func testDsp(t *testing.T, tests []func(t *testing.T), testName, all, all2, disp, allTF, all2TF, attrTF string) { engine.KillEngine(0) allEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", all), true, true) allEngine2 = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", all2), true, true) - attrEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", attr), true, true) dispEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", disp), true, true) + dispEngine.loadData(t, path.Join(dspDataDir, "tariffplans", attrTF)) allEngine.loadData(t, path.Join(dspDataDir, "tariffplans", allTF)) allEngine2.loadData(t, path.Join(dspDataDir, "tariffplans", all2TF)) - attrEngine.loadData(t, path.Join(dspDataDir, "tariffplans", attrTF)) time.Sleep(500 * time.Millisecond) for _, stest := range tests { t.Run(testName, stest) } - attrEngine.stopEngine(t) dispEngine.stopEngine(t) allEngine.stopEngine(t) allEngine2.stopEngine(t) diff --git a/dispatchers/resources_it_test.go b/dispatchers/resources_it_test.go index 9b9c852b4..fcb1540b6 100755 --- a/dispatchers/resources_it_test.go +++ b/dispatchers/resources_it_test.go @@ -39,11 +39,11 @@ var sTestsDspRes = []func(t *testing.T){ //Test start here func TestDspResourceSTMySQL(t *testing.T) { - testDsp(t, sTestsDspRes, "TestDspResourceS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspRes, "TestDspResourceS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func TestDspResourceSMongo(t *testing.T) { - testDsp(t, sTestsDspRes, "TestDspResourceS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspRes, "TestDspResourceS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") } func testDspResPingFailover(t *testing.T) { diff --git a/dispatchers/responder_it_test.go b/dispatchers/responder_it_test.go index daf9255dd..3a71966f4 100644 --- a/dispatchers/responder_it_test.go +++ b/dispatchers/responder_it_test.go @@ -37,11 +37,11 @@ var sTestsDspRsp = []func(t *testing.T){ //Test start here func TestDspResponderTMySQL(t *testing.T) { - testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func TestDspResponderMongo(t *testing.T) { - testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") } func testDspResponderStatus(t *testing.T) { @@ -106,6 +106,9 @@ func getNodeWithRoute(route string, t *testing.T) string { if err := dispEngine.RCP.Call(utils.ResponderStatus, &ev, &reply); err != nil { t.Error(err) } + if reply[utils.NodeID] == nil { + return "" + } return reply[utils.NodeID].(string) } diff --git a/dispatchers/scheduler_it_test.go b/dispatchers/scheduler_it_test.go index 6409d7a35..0dff38932 100644 --- a/dispatchers/scheduler_it_test.go +++ b/dispatchers/scheduler_it_test.go @@ -32,7 +32,7 @@ var sTestsDspSched = []func(t *testing.T){ //Test start here func TestDspSchedulerSTMySQL(t *testing.T) { - testDsp(t, sTestsDspSched, "TestDspSchedulerSTMySQL", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspSched, "TestDspSchedulerSTMySQL", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func testDspSchedPing(t *testing.T) { diff --git a/dispatchers/sessions_it_test.go b/dispatchers/sessions_it_test.go index aa89becd1..1a19a047a 100755 --- a/dispatchers/sessions_it_test.go +++ b/dispatchers/sessions_it_test.go @@ -56,11 +56,11 @@ var sTestsDspSession = []func(t *testing.T){ //Test start here func TestDspSessionSTMySQL(t *testing.T) { - testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "attributes", "dispatchers", "testit", "tutorial", "dispatchers") + testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers", "testit", "tutorial", "dispatchers") } func TestDspSessionSMongo(t *testing.T) { - testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "testit", "tutorial", "dispatchers") + testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers_mongo", "testit", "tutorial", "dispatchers") } func testDspSessionAddBalacne(t *testing.T) { diff --git a/dispatchers/stats_it_test.go b/dispatchers/stats_it_test.go index dcec3d51b..4bd88a790 100755 --- a/dispatchers/stats_it_test.go +++ b/dispatchers/stats_it_test.go @@ -42,11 +42,11 @@ var sTestsDspSts = []func(t *testing.T){ //Test start here func TestDspStatSTMySQL(t *testing.T) { - testDsp(t, sTestsDspSts, "TestDspStatS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspSts, "TestDspStatS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func TestDspStatSMongo(t *testing.T) { - testDsp(t, sTestsDspSts, "TestDspStatS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspSts, "TestDspStatS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") } func testDspStsPingFailover(t *testing.T) { diff --git a/dispatchers/suppliers_it_test.go b/dispatchers/suppliers_it_test.go index 8b066beeb..f4e91fb08 100755 --- a/dispatchers/suppliers_it_test.go +++ b/dispatchers/suppliers_it_test.go @@ -41,11 +41,11 @@ var sTestsDspSup = []func(t *testing.T){ //Test start here func TestDspSupplierSTMySQL(t *testing.T) { - testDsp(t, sTestsDspSup, "TestDspSupplierS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspSup, "TestDspSupplierS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func TestDspSupplierSMongo(t *testing.T) { - testDsp(t, sTestsDspSup, "TestDspSupplierS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspSup, "TestDspSupplierS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") } func testDspSupPing(t *testing.T) { diff --git a/dispatchers/thresholds_it_test.go b/dispatchers/thresholds_it_test.go index 6add30604..1d09e1b48 100755 --- a/dispatchers/thresholds_it_test.go +++ b/dispatchers/thresholds_it_test.go @@ -42,11 +42,11 @@ var sTestsDspTh = []func(t *testing.T){ //Test start here func TestDspThresholdSTMySQL(t *testing.T) { - testDsp(t, sTestsDspTh, "TestDspThresholdS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspTh, "TestDspThresholdS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers") } func TestDspThresholdSMongo(t *testing.T) { - testDsp(t, sTestsDspTh, "TestDspThresholdS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspTh, "TestDspThresholdS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") } func testDspThPingFailover(t *testing.T) { diff --git a/engine/datamanager.go b/engine/datamanager.go index 2ae12e80f..2241fe741 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -18,7 +18,6 @@ package engine import ( "fmt" "strings" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -1334,7 +1333,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - dH.Conns, nil, time.Duration(0), false); err != nil { + dH.Conns, IntRPC.GetConnChan(), cfg.GeneralCfg().InternalTtl, false); err != nil { return nil, err } if cacheWrite { diff --git a/engine/libengine.go b/engine/libengine.go index 89157bdeb..cb932f294 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -21,6 +21,8 @@ package engine import ( "errors" "fmt" + "strings" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -66,3 +68,43 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA } return rpcPool, err } + +var IntRPC *InternalRPC + +func init() { + IntRPC = &InternalRPC{subsystems: make(map[string]rpcclient.RpcClientConnection)} +} + +type InternalRPC struct { + sync.Mutex + subsystems map[string]rpcclient.RpcClientConnection +} + +func (irpc *InternalRPC) AddConnection(name string, conn rpcclient.RpcClientConnection) { + if conn == nil { + return + } + irpc.Lock() + irpc.subsystems[name] = conn + irpc.Unlock() +} + +func (irpc *InternalRPC) Call(method string, args interface{}, reply interface{}) error { + methodSplit := strings.Split(method, ".") + if len(methodSplit) != 2 { + return rpcclient.ErrUnsupporteServiceMethod + } + irpc.Lock() + defer irpc.Unlock() + conn, has := irpc.subsystems[methodSplit[0]] + if !has { + return rpcclient.ErrUnsupporteServiceMethod + } + return conn.Call(method, args, reply) +} + +func (irpc *InternalRPC) GetConnChan() (connChan chan rpcclient.RpcClientConnection) { + connChan = make(chan rpcclient.RpcClientConnection, 1) + connChan <- irpc + return +} diff --git a/utils/consts.go b/utils/consts.go index 61b6f0716..48c1afe38 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -703,6 +703,7 @@ const ( // ApierV1 APIs const ( + ApierV1 = "ApierV1" ApierV1ComputeFilterIndexes = "ApierV1.ComputeFilterIndexes" ApierV1Ping = "ApierV1.Ping" ApierV1SetDispatcherProfile = "ApierV1.SetDispatcherProfile" @@ -716,6 +717,7 @@ const ( ) const ( + ApierV2 = "ApierV2" ApierV2LoadTariffPlanFromFolder = "ApierV2.LoadTariffPlanFromFolder" ApierV2GetCDRs = "ApierV2.GetCDRs" ApierV2GetAccount = "ApierV2.GetAccount"