From e66a5398d79595fba8c3bcd5bd1254abb8254a98 Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Mon, 8 Apr 2019 19:09:19 +0300 Subject: [PATCH] Added test for internal connection in dispatcher --- apier/v1/apier.go | 6 +++ apier/v2/apier.go | 6 +++ cmd/cgr-engine/cgr-engine.go | 14 ++++--- cmd/cgr-engine/rater.go | 5 ++- .../dispatchers/DispatcherProfiles.csv | 1 + dispatchers/responder_it_test.go | 42 +++++++++++++++++++ 6 files changed, 67 insertions(+), 7 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index f4766d930..f9da2ee9c 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -54,6 +54,12 @@ type ApierV1 struct { SchedulerS rpcclient.RpcClientConnection } +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (self *ApierV1) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(self, serviceMethod, args, reply) +} + func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error { if dst, err := self.DataManager.DataDB().GetDestination(dstId, false, utils.NonTransactional); err != nil { return utils.ErrNotFound diff --git a/apier/v2/apier.go b/apier/v2/apier.go index 8bede9d58..6b86a5240 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -39,6 +39,12 @@ type ApierV2 struct { v1.ApierV1 } +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (self *ApierV2) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(self, serviceMethod, args, reply) +} + type AttrLoadRatingProfile struct { TPid string RatingProfileId string diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 275243ac9..1f3b03283 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -1098,7 +1098,7 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection, if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(chSv1) } - internalCacheSChan <- chS //v1 + internalCacheSChan <- chS return } @@ -1437,17 +1437,19 @@ func main() { internalSchedSChan := make(chan rpcclient.RpcClientConnection, 1) internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1) internalLoaderSChan := make(chan rpcclient.RpcClientConnection, 1) + internalApierV1Chan := make(chan rpcclient.RpcClientConnection, 1) + internalApierV2Chan := make(chan rpcclient.RpcClientConnection, 1) // init internalRPCSet engine.IntRPC = engine.NewRPCClientSet() if cfg.DispatcherSCfg().Enabled { engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, internalAnalyzerSChan) - // engine.IntRPC.AddInternalRPCClient(utils.ApierV1, internalApierV1Chan) - // engine.IntRPC.AddInternalRPCClient(utils.ApierV2, internalApierV2Chan) + engine.IntRPC.AddInternalRPCClient(utils.ApierV1, internalApierV1Chan) + engine.IntRPC.AddInternalRPCClient(utils.ApierV2, internalApierV2Chan) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, internalAttributeSChan) engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier - // engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCdrSChan) - // engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCdrSChan) + engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCdrSChan) + engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCdrSChan) engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, internalChargerSChan) engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan) engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan) @@ -1479,7 +1481,7 @@ func main() { // Start RALs if cfg.RalsCfg().RALsEnabled { - go startRater(internalRaterChan, cacheS, internalThresholdSChan, + go startRater(internalRaterChan, internalApierV1Chan, internalApierV2Chan, cacheS, internalThresholdSChan, internalStatSChan, srvManager, server, dm, loadDb, cdrDb, &stopHandled, exitChan, cacheS, filterSChan, internalCacheSChan, internalSchedSChan) } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 368c92799..5e83624c4 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -31,7 +31,7 @@ import ( ) // Starts rater and reports on chan -func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, +func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, internalThdSChan, internalStatSChan chan rpcclient.RpcClientConnection, serviceManager *servmanager.ServiceManager, server *utils.Server, dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, @@ -201,5 +201,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en utils.RegisterRpcParams("", apierRpcV1) utils.RegisterRpcParams("", apierRpcV2) utils.GetRpcParams("") + + internalApierv1 <- apierRpcV1 + internalApierv2 <- apierRpcV2 internalRaterChan <- responder // Rater done } diff --git a/data/tariffplans/dispatchers/DispatcherProfiles.csv b/data/tariffplans/dispatchers/DispatcherProfiles.csv index 72b5de1e3..4fa8926bd 100644 --- a/data/tariffplans/dispatchers/DispatcherProfiles.csv +++ b/data/tariffplans/dispatchers/DispatcherProfiles.csv @@ -9,3 +9,4 @@ cgrates.org,EVENT3,*any,*string:~EventName:Random,,*random,,ALL2,,20,false,,20 cgrates.org,EVENT3,,,,,,ALL,,10,,, cgrates.org,EVENT4,*any,*string:~EventName:Broadcast,,*broadcast,,ALL2,,20,false,,20 cgrates.org,EVENT4,,,,,,ALL,,10,,, +cgrates.org,EVENT5,*any,*string:~EventName:Internal,,*weight,,SELF,,20,false,,20 diff --git a/dispatchers/responder_it_test.go b/dispatchers/responder_it_test.go index 3a71966f4..be762008d 100644 --- a/dispatchers/responder_it_test.go +++ b/dispatchers/responder_it_test.go @@ -33,6 +33,7 @@ var sTestsDspRsp = []func(t *testing.T){ testDspResponderRandom, testDspResponderBroadcast, + testDspResponderInternal, } //Test start here @@ -186,3 +187,44 @@ func testDspResponderBroadcast(t *testing.T) { allEngine.startEngine(t) allEngine2.startEngine(t) } + +func testDspResponderInternal(t *testing.T) { + var reply map[string]interface{} + var pingReply string + route := "internal" + pingEv := utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.EVENT_NAME: "Internal", + }, + }, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer("rsp12345"), + RouteID: &route, + }, + } + ev := utils.TenantWithArgDispatcher{ + TenantArg: &utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer("rsp12345"), + RouteID: &route, + }, + } + if err := dispEngine.RCP.Call(utils.ResponderPing, pingEv, &pingReply); err != nil { + t.Error(err) + } else if pingReply != utils.Pong { + t.Errorf("Received: %s", pingReply) + } + if err := dispEngine.RCP.Call(utils.ResponderStatus, &ev, &reply); err != nil { + t.Error(err) + } + if reply[utils.NodeID] == nil { + return + } + if strRply := reply[utils.NodeID].(string); strRply != "DispatcherS1" { + t.Errorf("Expected: DispatcherS1 , received: %s", strRply) + } +}