mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added tests for dispatcher internal connection
This commit is contained in:
committed by
Dan Christian Bogos
parent
3dc6a4bf0d
commit
9f10cd8ef4
@@ -1125,29 +1125,34 @@ func startRpc(server *utils.Server, internalRaterChan,
|
||||
internalSMGChan, internalAnalyzerSChan chan rpcclient.RpcClientConnection,
|
||||
internalDispatcherSChan chan *dispatchers.DispatcherService,
|
||||
exitChan chan bool) {
|
||||
select { // Any of the rpc methods will unlock listening to rpc requests
|
||||
case resp := <-internalRaterChan:
|
||||
internalRaterChan <- resp
|
||||
case cdrs := <-internalCdrSChan:
|
||||
internalCdrSChan <- cdrs
|
||||
case smg := <-internalSMGChan:
|
||||
internalSMGChan <- smg
|
||||
case rls := <-internalRsChan:
|
||||
internalRsChan <- rls
|
||||
case statS := <-internalStatSChan:
|
||||
internalStatSChan <- statS
|
||||
case attrS := <-internalAttrSChan:
|
||||
internalAttrSChan <- attrS
|
||||
case chrgS := <-internalChargerSChan:
|
||||
internalChargerSChan <- chrgS
|
||||
case thS := <-internalThdSChan:
|
||||
internalThdSChan <- thS
|
||||
case splS := <-internalSuplSChan:
|
||||
internalSuplSChan <- splS
|
||||
case dispatcherS := <-internalDispatcherSChan:
|
||||
internalDispatcherSChan <- dispatcherS
|
||||
case analyzerS := <-internalAnalyzerSChan:
|
||||
internalAnalyzerSChan <- analyzerS
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
select { // Any of the rpc methods will unlock listening to rpc requests
|
||||
case resp := <-internalRaterChan:
|
||||
internalRaterChan <- resp
|
||||
case cdrs := <-internalCdrSChan:
|
||||
internalCdrSChan <- cdrs
|
||||
case smg := <-internalSMGChan:
|
||||
internalSMGChan <- smg
|
||||
case rls := <-internalRsChan:
|
||||
internalRsChan <- rls
|
||||
case statS := <-internalStatSChan:
|
||||
internalStatSChan <- statS
|
||||
case attrS := <-internalAttrSChan:
|
||||
internalAttrSChan <- attrS
|
||||
case chrgS := <-internalChargerSChan:
|
||||
internalChargerSChan <- chrgS
|
||||
case thS := <-internalThdSChan:
|
||||
internalThdSChan <- thS
|
||||
case splS := <-internalSuplSChan:
|
||||
internalSuplSChan <- splS
|
||||
case analyzerS := <-internalAnalyzerSChan:
|
||||
internalAnalyzerSChan <- analyzerS
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case dispatcherS := <-internalDispatcherSChan:
|
||||
internalDispatcherSChan <- dispatcherS
|
||||
}
|
||||
}
|
||||
|
||||
go server.ServeJSON(cfg.ListenCfg().RPCJSONListen)
|
||||
|
||||
@@ -190,10 +190,9 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2 chan rpcclie
|
||||
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(responder)
|
||||
server.RpcRegister(apierRpcV1)
|
||||
server.RpcRegister(apierRpcV2)
|
||||
}
|
||||
// ToDo: do not register when dispatcher is active
|
||||
server.RpcRegister(apierRpcV1)
|
||||
server.RpcRegister(apierRpcV2)
|
||||
|
||||
utils.RegisterRpcParams("", &v1.CDRsV1{})
|
||||
utils.RegisterRpcParams("", &v2.CDRsV2{})
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#Tenant,ID,Contexts,FilterIDs,ActivationInterval,AttributeFilterIDs,FieldName,Type,Value,Blocker,Weight
|
||||
cgrates.org,ATTR_1001_SIMPLEAUTH,simpleauth,*string:~Account:1001,,,Password,*constant,CGRateS.org,false,20
|
||||
cgrates.org,ATTR_1001_SIMPLEAUTH,*any,*string:~Account:1001,,,Password,*constant,CGRateS.org,false,20
|
||||
cgrates.org,ATTR_1003_SIMPLEAUTH,*any,*string:~Account:1003,,,Password,*constant,CGRateS.com,false,20
|
||||
cgrates.org,ATTR_API_ATTR_FAKE_AUTH,*auth,*string:~APIKey:12345,,,APIMethods,*constant,,false,20
|
||||
cgrates.org,ATTR_API_ATTR_AUTH,*auth,*string:~APIKey:attr12345,,,APIMethods,*constant,AttributeSv1.Ping&AttributeSv1.GetAttributeForEvent&AttributeSv1.ProcessEvent,false,20
|
||||
cgrates.org,ATTR_API_CHRG_AUTH,*auth,*string:~APIKey:chrg12345,,,APIMethods,*constant,ChargerSv1.Ping&ChargerSv1.GetChargersForEvent&ChargerSv1.ProcessEvent,false,20
|
||||
|
||||
|
@@ -41,6 +41,8 @@ var sTestsDspAttr = []func(t *testing.T){
|
||||
testDspAttrTestAuthKey,
|
||||
testDspAttrTestAuthKey2,
|
||||
testDspAttrTestAuthKey3,
|
||||
|
||||
testDspAttrGetAttrInternal,
|
||||
}
|
||||
|
||||
//Test start here
|
||||
@@ -454,3 +456,43 @@ func testDspAttrGetAttrRoundRobin(t *testing.T) {
|
||||
utils.ToJSON(eRply), utils.ToJSON(rplyEv))
|
||||
}
|
||||
}
|
||||
|
||||
func testDspAttrGetAttrInternal(t *testing.T) {
|
||||
args := &engine.AttrArgsProcessEvent{
|
||||
Context: utils.StringPointer("simpleauth"),
|
||||
CGREvent: utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "testAttributeSGetAttributeForEvent",
|
||||
Event: map[string]interface{}{
|
||||
utils.EVENT_NAME: "Internal",
|
||||
utils.Account: "1003",
|
||||
},
|
||||
},
|
||||
ArgDispatcher: &utils.ArgDispatcher{
|
||||
APIKey: utils.StringPointer("attr12345"),
|
||||
},
|
||||
}
|
||||
|
||||
eRply := &engine.AttrSProcessEventReply{
|
||||
MatchedProfiles: []string{"ATTR_1003_SIMPLEAUTH"},
|
||||
AlteredFields: []string{"Password"},
|
||||
CGREvent: &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "testAttributeSGetAttributeForEvent",
|
||||
Event: map[string]interface{}{
|
||||
utils.Account: "1003",
|
||||
utils.EVENT_NAME: "Internal",
|
||||
"Password": "CGRateS.com",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var rplyEv engine.AttrSProcessEventReply
|
||||
if err := dispEngine.RCP.Call(utils.AttributeSv1ProcessEvent,
|
||||
args, &rplyEv); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eRply, &rplyEv) {
|
||||
t.Errorf("Expecting: %s, received: %s",
|
||||
utils.ToJSON(eRply), utils.ToJSON(rplyEv))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,12 +107,34 @@ func (d *testDispatcher) loadData(t *testing.T, path string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *testDispatcher) loadData2(t *testing.T, path string) {
|
||||
wchan := make(chan struct{}, 1)
|
||||
go func() {
|
||||
loaderPath, err := exec.LookPath("cgr-loader")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
loader := exec.Command(loaderPath, "-config_path", d.CfgParh, "-path", path)
|
||||
|
||||
if err := loader.Start(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
loader.Wait()
|
||||
wchan <- struct{}{}
|
||||
}()
|
||||
select {
|
||||
case <-wchan:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Errorf("cgr-loader failed: ")
|
||||
}
|
||||
}
|
||||
|
||||
func testDsp(t *testing.T, tests []func(t *testing.T), testName, all, all2, disp, allTF, all2TF, attrTF string) {
|
||||
engine.KillEngine(0)
|
||||
allEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", all), true, true)
|
||||
allEngine2 = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", all2), true, true)
|
||||
dispEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", disp), true, true)
|
||||
dispEngine.loadData(t, path.Join(dspDataDir, "tariffplans", attrTF))
|
||||
dispEngine.loadData2(t, path.Join(dspDataDir, "tariffplans", attrTF))
|
||||
allEngine.loadData(t, path.Join(dspDataDir, "tariffplans", allTF))
|
||||
allEngine2.loadData(t, path.Join(dspDataDir, "tariffplans", all2TF))
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
Reference in New Issue
Block a user