Added test for internal connection in dispatcher

This commit is contained in:
Tripon Alexandru-Ionut
2019-04-08 19:09:19 +03:00
committed by Dan Christian Bogos
parent 7d95f1220a
commit e66a5398d7
6 changed files with 67 additions and 7 deletions

View File

@@ -54,6 +54,12 @@ type ApierV1 struct {
SchedulerS rpcclient.RpcClientConnection
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (self *ApierV1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(self, serviceMethod, args, reply)
}
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {
if dst, err := self.DataManager.DataDB().GetDestination(dstId, false, utils.NonTransactional); err != nil {
return utils.ErrNotFound

View File

@@ -39,6 +39,12 @@ type ApierV2 struct {
v1.ApierV1
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (self *ApierV2) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(self, serviceMethod, args, reply)
}
type AttrLoadRatingProfile struct {
TPid string
RatingProfileId string

View File

@@ -1098,7 +1098,7 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection,
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(chSv1)
}
internalCacheSChan <- chS //v1
internalCacheSChan <- chS
return
}
@@ -1437,17 +1437,19 @@ func main() {
internalSchedSChan := make(chan rpcclient.RpcClientConnection, 1)
internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1)
internalLoaderSChan := make(chan rpcclient.RpcClientConnection, 1)
internalApierV1Chan := make(chan rpcclient.RpcClientConnection, 1)
internalApierV2Chan := make(chan rpcclient.RpcClientConnection, 1)
// init internalRPCSet
engine.IntRPC = engine.NewRPCClientSet()
if cfg.DispatcherSCfg().Enabled {
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, internalAnalyzerSChan)
// engine.IntRPC.AddInternalRPCClient(utils.ApierV1, internalApierV1Chan)
// engine.IntRPC.AddInternalRPCClient(utils.ApierV2, internalApierV2Chan)
engine.IntRPC.AddInternalRPCClient(utils.ApierV1, internalApierV1Chan)
engine.IntRPC.AddInternalRPCClient(utils.ApierV2, internalApierV2Chan)
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, internalAttributeSChan)
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCdrSChan)
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCdrSChan)
engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCdrSChan)
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCdrSChan)
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, internalChargerSChan)
engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan)
@@ -1479,7 +1481,7 @@ func main() {
// Start RALs
if cfg.RalsCfg().RALsEnabled {
go startRater(internalRaterChan, cacheS, internalThresholdSChan,
go startRater(internalRaterChan, internalApierV1Chan, internalApierV2Chan, cacheS, internalThresholdSChan,
internalStatSChan, srvManager, server, dm, loadDb, cdrDb,
&stopHandled, exitChan, cacheS, filterSChan, internalCacheSChan, internalSchedSChan)
}

View File

@@ -31,7 +31,7 @@ import (
)
// Starts rater and reports on chan
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
internalThdSChan, internalStatSChan chan rpcclient.RpcClientConnection,
serviceManager *servmanager.ServiceManager, server *utils.Server,
dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage,
@@ -201,5 +201,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
utils.RegisterRpcParams("", apierRpcV1)
utils.RegisterRpcParams("", apierRpcV2)
utils.GetRpcParams("")
internalApierv1 <- apierRpcV1
internalApierv2 <- apierRpcV2
internalRaterChan <- responder // Rater done
}

View File

@@ -9,3 +9,4 @@ cgrates.org,EVENT3,*any,*string:~EventName:Random,,*random,,ALL2,,20,false,,20
cgrates.org,EVENT3,,,,,,ALL,,10,,,
cgrates.org,EVENT4,*any,*string:~EventName:Broadcast,,*broadcast,,ALL2,,20,false,,20
cgrates.org,EVENT4,,,,,,ALL,,10,,,
cgrates.org,EVENT5,*any,*string:~EventName:Internal,,*weight,,SELF,,20,false,,20
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
9 cgrates.org EVENT3 ALL 10
10 cgrates.org EVENT4 *any *string:~EventName:Broadcast *broadcast ALL2 20 false 20
11 cgrates.org EVENT4 ALL 10
12 cgrates.org EVENT5 *any *string:~EventName:Internal *weight SELF 20 false 20

View File

@@ -33,6 +33,7 @@ var sTestsDspRsp = []func(t *testing.T){
testDspResponderRandom,
testDspResponderBroadcast,
testDspResponderInternal,
}
//Test start here
@@ -186,3 +187,44 @@ func testDspResponderBroadcast(t *testing.T) {
allEngine.startEngine(t)
allEngine2.startEngine(t)
}
func testDspResponderInternal(t *testing.T) {
var reply map[string]interface{}
var pingReply string
route := "internal"
pingEv := utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.EVENT_NAME: "Internal",
},
},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer("rsp12345"),
RouteID: &route,
},
}
ev := utils.TenantWithArgDispatcher{
TenantArg: &utils.TenantArg{
Tenant: "cgrates.org",
},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer("rsp12345"),
RouteID: &route,
},
}
if err := dispEngine.RCP.Call(utils.ResponderPing, pingEv, &pingReply); err != nil {
t.Error(err)
} else if pingReply != utils.Pong {
t.Errorf("Received: %s", pingReply)
}
if err := dispEngine.RCP.Call(utils.ResponderStatus, &ev, &reply); err != nil {
t.Error(err)
}
if reply[utils.NodeID] == nil {
return
}
if strRply := reply[utils.NodeID].(string); strRply != "DispatcherS1" {
t.Errorf("Expected: DispatcherS1 , received: %s", strRply)
}
}