mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Started adding dispatcher internal connections
This commit is contained in:
committed by
Dan Christian Bogos
parent
edb2f6ab97
commit
6b487537b8
@@ -46,3 +46,9 @@ func (self *GuardianSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *strin
|
||||
*reply = utils.Pong
|
||||
return nil
|
||||
}
|
||||
|
||||
// Call implements rpcclient.RpcClientConnection interface for internal RPC
|
||||
func (self *GuardianSv1) Call(serviceMethod string,
|
||||
args interface{}, reply interface{}) error {
|
||||
return utils.APIerRPCCall(self, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
@@ -41,3 +41,9 @@ func (schdSv1 *SchedulerSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *s
|
||||
*reply = utils.Pong
|
||||
return nil
|
||||
}
|
||||
|
||||
// Call implements rpcclient.RpcClientConnection interface for internal RPC
|
||||
func (schdSv1 *SchedulerSv1) Call(serviceMethod string,
|
||||
args interface{}, reply interface{}) error {
|
||||
return utils.APIerRPCCall(schdSv1, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
@@ -113,5 +113,10 @@ func (ssv1 *SessionSv1) ReplicateSessions(args sessions.ArgsReplicateSessions, r
|
||||
func (ssv1 *SessionSv1) SetPassiveSession(args *sessions.Session,
|
||||
reply *string) error {
|
||||
return ssv1.Ss.BiRPCv1SetPassiveSession(nil, args, reply)
|
||||
|
||||
}
|
||||
|
||||
// Call implements rpcclient.RpcClientConnection interface for internal RPC
|
||||
func (ssv1 *SessionSv1) Call(serviceMethod string,
|
||||
args interface{}, reply interface{}) error {
|
||||
return utils.APIerRPCCall(ssv1, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
@@ -299,7 +299,11 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
|
||||
server.RpcRegister(smgRpc)
|
||||
|
||||
ssv1 := v1.NewSessionSv1(sm) // methods with multiple options
|
||||
server.RpcRegister(ssv1)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(ssv1)
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.SessionSv1, ssv1)
|
||||
}
|
||||
// Register BiRpc handlers
|
||||
if cfg.SessionSCfg().ListenBijson != "" {
|
||||
for method, handler := range smgRpc.Handlers() {
|
||||
@@ -674,7 +678,11 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec
|
||||
return
|
||||
}()
|
||||
aSv1 := v1.NewAttributeSv1(aS)
|
||||
server.RpcRegister(aSv1)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(aSv1)
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.AttributeSv1, aSv1)
|
||||
}
|
||||
internalAttributeSChan <- aSv1
|
||||
}
|
||||
|
||||
@@ -723,7 +731,11 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection
|
||||
return
|
||||
}()
|
||||
cSv1 := v1.NewChargerSv1(cS)
|
||||
server.RpcRegister(cSv1)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(cSv1)
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.ChargerSv1, cSv1)
|
||||
}
|
||||
internalChargerSChan <- cSv1
|
||||
}
|
||||
|
||||
@@ -770,7 +782,11 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac
|
||||
return
|
||||
}()
|
||||
rsV1 := v1.NewResourceSv1(rS)
|
||||
server.RpcRegister(rsV1)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(rsV1)
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.ResourceSv1, rsV1)
|
||||
}
|
||||
internalRsChan <- rsV1
|
||||
}
|
||||
|
||||
@@ -817,7 +833,11 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach
|
||||
return
|
||||
}()
|
||||
stsV1 := v1.NewStatSv1(sS)
|
||||
server.RpcRegister(stsV1)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(stsV1)
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.StatSv1, stsV1)
|
||||
}
|
||||
internalStatSChan <- stsV1
|
||||
}
|
||||
|
||||
@@ -848,7 +868,11 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
|
||||
return
|
||||
}()
|
||||
tSv1 := v1.NewThresholdSv1(tS)
|
||||
server.RpcRegister(tSv1)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(tSv1)
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.ThresholdSv1, tSv1)
|
||||
}
|
||||
internalThresholdSChan <- tSv1
|
||||
}
|
||||
|
||||
@@ -929,7 +953,11 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
|
||||
return
|
||||
}()
|
||||
splV1 := v1.NewSupplierSv1(splS)
|
||||
server.RpcRegister(splV1)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(splV1)
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.SupplierSv1, splV1)
|
||||
}
|
||||
internalSupplierSChan <- splV1
|
||||
}
|
||||
|
||||
@@ -1080,6 +1108,8 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection,
|
||||
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(v1.NewCacheSv1(chS))
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.CacheSv1, v1.NewCacheSv1(chS))
|
||||
}
|
||||
internalCacheSChan <- chS
|
||||
return
|
||||
@@ -1088,6 +1118,8 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection,
|
||||
func initGuardianSv1(server *utils.Server) {
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(v1.NewGuardianSv1())
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.GuardianSv1, v1.NewGuardianSv1())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1096,6 +1128,8 @@ func initSchedulerS(internalCacheSChan chan rpcclient.RpcClientConnection,
|
||||
schdS := servmanager.NewSchedulerS(srvMngr)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(v1.NewSchedulerSv1(schdS))
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.SchedulerSv1, v1.NewSchedulerSv1(schdS))
|
||||
}
|
||||
internalCacheSChan <- schdS
|
||||
}
|
||||
|
||||
@@ -188,7 +188,11 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
|
||||
apierRpcV2 := &v2.ApierV2{
|
||||
ApierV1: *apierRpcV1}
|
||||
|
||||
server.RpcRegister(responder)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(responder)
|
||||
} else {
|
||||
engine.IntRPC.AddConnection(utils.Responder, responder)
|
||||
}
|
||||
server.RpcRegister(apierRpcV1)
|
||||
server.RpcRegister(apierRpcV2)
|
||||
|
||||
|
||||
@@ -692,11 +692,6 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
}
|
||||
if self.dispatcherSCfg.Enabled {
|
||||
if self.attributeSCfg.Enabled {
|
||||
return fmt.Errorf("<%s> cannot start in tandem with <%s>", utils.DispatcherS, utils.AttributeS)
|
||||
}
|
||||
}
|
||||
// Scheduler check connection with CDR Server
|
||||
if !self.cdrsCfg.CDRSEnabled {
|
||||
for _, connCfg := range self.schedulerCfg.CDRsConns {
|
||||
|
||||
@@ -1,40 +0,0 @@
|
||||
{
|
||||
// CGRateS Configuration file
|
||||
//
|
||||
|
||||
|
||||
"general": {
|
||||
"node_id": "AttributeS1",
|
||||
"log_level": 7
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":5012",
|
||||
"rpc_gob": ":5013",
|
||||
"http": ":5080",
|
||||
},
|
||||
|
||||
"stor_db": {
|
||||
"db_type":"*internal",
|
||||
},
|
||||
|
||||
"attributes": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
"rals": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
"scheduler": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
"apier": {
|
||||
"caches_conns":[ // connections to CacheS for reloads
|
||||
{"address": "127.0.0.1:5012", "transport": "*json"},
|
||||
],
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
@@ -1,49 +0,0 @@
|
||||
{
|
||||
// CGRateS Configuration file
|
||||
//
|
||||
|
||||
|
||||
"general": {
|
||||
"node_id": "AttributeS1",
|
||||
"log_level": 7
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":5012",
|
||||
"rpc_gob": ":5013",
|
||||
"http": ":5080",
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "mongo",
|
||||
"db_name": "10",
|
||||
"db_port": 27017,
|
||||
},
|
||||
|
||||
|
||||
"stor_db": {
|
||||
"db_type": "mongo",
|
||||
"db_name": "cgrates",
|
||||
"db_port": 27017,
|
||||
},
|
||||
|
||||
"scheduler": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
"attributes": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
"rals": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
"apier": {
|
||||
"caches_conns":[ // connections to CacheS for reloads
|
||||
{"address": "127.0.0.1:5012", "transport": "*json"},
|
||||
],
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
@@ -29,24 +29,22 @@
|
||||
|
||||
|
||||
"attributes": {
|
||||
"enabled": false
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
"scheduler": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
"rals": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
"dispatchers":{
|
||||
"enabled": true,
|
||||
"attributes_conns": [
|
||||
{"address": "127.0.0.1:5012", "transport": "*json"},
|
||||
{"address": "*internal"},
|
||||
],
|
||||
},
|
||||
|
||||
"apier": {
|
||||
"caches_conns":[ // connections to CacheS for reloads
|
||||
{"address": "127.0.0.1:2012", "transport": "*json"},
|
||||
],
|
||||
},
|
||||
|
||||
}
|
||||
@@ -41,16 +41,18 @@
|
||||
},
|
||||
|
||||
"attributes": {
|
||||
"enabled": false
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
"rals": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
"dispatchers":{
|
||||
"enabled": true,
|
||||
"attributes_conns": [
|
||||
{"address": "127.0.0.1:5012", "transport": "*json"},
|
||||
{"address": "*internal"},
|
||||
],
|
||||
},
|
||||
|
||||
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
#Tenant[0],ID[1],Address[2],Transport[3],TLS[4]
|
||||
cgrates.org,AttributeS1,127.0.0.1:5012,*json,false
|
||||
cgrates.org,SELF,*internal,,
|
||||
cgrates.org,ALL,127.0.0.1:6012,*json,false
|
||||
cgrates.org,ALL2,127.0.0.1:7012,*json,false
|
||||
|
||||
|
@@ -45,11 +45,11 @@ var sTestsDspAttr = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspAttributeSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspAttr, "TestDspAttributeS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspAttr, "TestDspAttributeS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspAttributeSMongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspAttr, "TestDspAttributeS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspAttr, "TestDspAttributeS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspAttrPingFailover(t *testing.T) {
|
||||
|
||||
@@ -45,11 +45,11 @@ var sTestsDspChc = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspCacheSv1TMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspChc, "TestDspCacheSv1", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspChc, "TestDspCacheSv1", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspCacheSv1Mongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspChc, "TestDspCacheSv1", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspChc, "TestDspCacheSv1", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspChcPing(t *testing.T) {
|
||||
|
||||
@@ -40,11 +40,11 @@ var sTestsDspCpp = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspChargerSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspCpp, "TestDspChargerS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspCpp, "TestDspChargerS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspChargerSMongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspCpp, "TestDspChargerS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspCpp, "TestDspChargerS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspCppPingFailover(t *testing.T) {
|
||||
|
||||
@@ -35,7 +35,7 @@ var sTestsDspGrd = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspGuardianSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspGrd, "TestDspGuardianS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspGrd, "TestDspGuardianS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspGrdPing(t *testing.T) {
|
||||
|
||||
@@ -33,7 +33,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
attrEngine *testDispatcher
|
||||
dispEngine *testDispatcher
|
||||
allEngine *testDispatcher
|
||||
allEngine2 *testDispatcher
|
||||
@@ -104,24 +103,22 @@ func (d *testDispatcher) loadData(t *testing.T, path string) {
|
||||
var reply string
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path}
|
||||
if err := d.RCP.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
|
||||
t.Errorf("Error at loading data from folder:%v", err)
|
||||
t.Errorf("<%s>Error at loading data from folder :%v", d.CfgParh, err)
|
||||
}
|
||||
}
|
||||
|
||||
func testDsp(t *testing.T, tests []func(t *testing.T), testName, all, all2, attr, disp, allTF, all2TF, attrTF string) {
|
||||
func testDsp(t *testing.T, tests []func(t *testing.T), testName, all, all2, disp, allTF, all2TF, attrTF string) {
|
||||
engine.KillEngine(0)
|
||||
allEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", all), true, true)
|
||||
allEngine2 = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", all2), true, true)
|
||||
attrEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", attr), true, true)
|
||||
dispEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", disp), true, true)
|
||||
dispEngine.loadData(t, path.Join(dspDataDir, "tariffplans", attrTF))
|
||||
allEngine.loadData(t, path.Join(dspDataDir, "tariffplans", allTF))
|
||||
allEngine2.loadData(t, path.Join(dspDataDir, "tariffplans", all2TF))
|
||||
attrEngine.loadData(t, path.Join(dspDataDir, "tariffplans", attrTF))
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
for _, stest := range tests {
|
||||
t.Run(testName, stest)
|
||||
}
|
||||
attrEngine.stopEngine(t)
|
||||
dispEngine.stopEngine(t)
|
||||
allEngine.stopEngine(t)
|
||||
allEngine2.stopEngine(t)
|
||||
|
||||
@@ -39,11 +39,11 @@ var sTestsDspRes = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspResourceSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspRes, "TestDspResourceS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspRes, "TestDspResourceS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspResourceSMongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspRes, "TestDspResourceS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspRes, "TestDspResourceS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspResPingFailover(t *testing.T) {
|
||||
|
||||
@@ -37,11 +37,11 @@ var sTestsDspRsp = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspResponderTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspResponderMongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspResponderStatus(t *testing.T) {
|
||||
@@ -106,6 +106,9 @@ func getNodeWithRoute(route string, t *testing.T) string {
|
||||
if err := dispEngine.RCP.Call(utils.ResponderStatus, &ev, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if reply[utils.NodeID] == nil {
|
||||
return ""
|
||||
}
|
||||
return reply[utils.NodeID].(string)
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ var sTestsDspSched = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspSchedulerSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspSched, "TestDspSchedulerSTMySQL", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspSched, "TestDspSchedulerSTMySQL", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspSchedPing(t *testing.T) {
|
||||
|
||||
@@ -56,11 +56,11 @@ var sTestsDspSession = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspSessionSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "attributes", "dispatchers", "testit", "tutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers", "testit", "tutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspSessionSMongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "testit", "tutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers_mongo", "testit", "tutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspSessionAddBalacne(t *testing.T) {
|
||||
|
||||
@@ -42,11 +42,11 @@ var sTestsDspSts = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspStatSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspSts, "TestDspStatS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspSts, "TestDspStatS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspStatSMongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspSts, "TestDspStatS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspSts, "TestDspStatS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspStsPingFailover(t *testing.T) {
|
||||
|
||||
@@ -41,11 +41,11 @@ var sTestsDspSup = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspSupplierSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspSup, "TestDspSupplierS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspSup, "TestDspSupplierS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspSupplierSMongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspSup, "TestDspSupplierS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspSup, "TestDspSupplierS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspSupPing(t *testing.T) {
|
||||
|
||||
@@ -42,11 +42,11 @@ var sTestsDspTh = []func(t *testing.T){
|
||||
|
||||
//Test start here
|
||||
func TestDspThresholdSTMySQL(t *testing.T) {
|
||||
testDsp(t, sTestsDspTh, "TestDspThresholdS", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspTh, "TestDspThresholdS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func TestDspThresholdSMongo(t *testing.T) {
|
||||
testDsp(t, sTestsDspTh, "TestDspThresholdS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
testDsp(t, sTestsDspTh, "TestDspThresholdS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
|
||||
}
|
||||
|
||||
func testDspThPingFailover(t *testing.T) {
|
||||
|
||||
@@ -18,7 +18,6 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -1334,7 +1333,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
dH.Conns, nil, time.Duration(0), false); err != nil {
|
||||
dH.Conns, IntRPC.GetConnChan(), cfg.GeneralCfg().InternalTtl, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cacheWrite {
|
||||
|
||||
@@ -21,6 +21,8 @@ package engine
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -66,3 +68,43 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA
|
||||
}
|
||||
return rpcPool, err
|
||||
}
|
||||
|
||||
var IntRPC *InternalRPC
|
||||
|
||||
func init() {
|
||||
IntRPC = &InternalRPC{subsystems: make(map[string]rpcclient.RpcClientConnection)}
|
||||
}
|
||||
|
||||
type InternalRPC struct {
|
||||
sync.Mutex
|
||||
subsystems map[string]rpcclient.RpcClientConnection
|
||||
}
|
||||
|
||||
func (irpc *InternalRPC) AddConnection(name string, conn rpcclient.RpcClientConnection) {
|
||||
if conn == nil {
|
||||
return
|
||||
}
|
||||
irpc.Lock()
|
||||
irpc.subsystems[name] = conn
|
||||
irpc.Unlock()
|
||||
}
|
||||
|
||||
func (irpc *InternalRPC) Call(method string, args interface{}, reply interface{}) error {
|
||||
methodSplit := strings.Split(method, ".")
|
||||
if len(methodSplit) != 2 {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
irpc.Lock()
|
||||
defer irpc.Unlock()
|
||||
conn, has := irpc.subsystems[methodSplit[0]]
|
||||
if !has {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
return conn.Call(method, args, reply)
|
||||
}
|
||||
|
||||
func (irpc *InternalRPC) GetConnChan() (connChan chan rpcclient.RpcClientConnection) {
|
||||
connChan = make(chan rpcclient.RpcClientConnection, 1)
|
||||
connChan <- irpc
|
||||
return
|
||||
}
|
||||
|
||||
@@ -703,6 +703,7 @@ const (
|
||||
|
||||
// ApierV1 APIs
|
||||
const (
|
||||
ApierV1 = "ApierV1"
|
||||
ApierV1ComputeFilterIndexes = "ApierV1.ComputeFilterIndexes"
|
||||
ApierV1Ping = "ApierV1.Ping"
|
||||
ApierV1SetDispatcherProfile = "ApierV1.SetDispatcherProfile"
|
||||
@@ -716,6 +717,7 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
ApierV2 = "ApierV2"
|
||||
ApierV2LoadTariffPlanFromFolder = "ApierV2.LoadTariffPlanFromFolder"
|
||||
ApierV2GetCDRs = "ApierV2.GetCDRs"
|
||||
ApierV2GetAccount = "ApierV2.GetAccount"
|
||||
|
||||
Reference in New Issue
Block a user