Begin HTTPAgent integration tests

This commit is contained in:
DanB
2018-07-05 19:27:50 +02:00
parent ffbfcbba83
commit efc810e70d
13 changed files with 283 additions and 30 deletions

View File

@@ -116,7 +116,7 @@ func (ar *AgentRequest) AsNavigableMap(tplFlds []*config.CfgCdrField) (
if err != nil {
return nil, err
}
nM.Set(strings.Split(tplFld.FieldId, utils.HIERARCHY_SEP), out, true)
nM.Set(strings.Split(tplFld.FieldId, utils.NestingSep), out, true)
}
return
}
@@ -159,7 +159,7 @@ func (ar *AgentRequest) composedField(outTpl utils.RSRFields) (outVal string) {
}
continue
}
valStr, err := ar.FieldAsString(strings.Split(rsrTpl.Id, utils.HIERARCHY_SEP))
valStr, err := ar.FieldAsString(strings.Split(rsrTpl.Id, utils.NestingSep))
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> %s",
@@ -169,7 +169,7 @@ func (ar *AgentRequest) composedField(outTpl utils.RSRFields) (outVal string) {
if parsed, err := rsrTpl.Parse(valStr); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> %s",
utils.RadiusAgent, err.Error()))
utils.HTTPAgent, err.Error()))
} else {
outVal += parsed
}

View File

@@ -61,42 +61,42 @@ func TestAgReqAsNavigableMap(t *testing.T) {
Value: utils.ParseRSRFieldsMustCompile("^cgrates.org", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "Account",
FieldId: utils.Account, Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("*cgrRequest>Account", utils.INFIELD_SEP)},
Value: utils.ParseRSRFieldsMustCompile("*cgrRequest.Account", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "Destination",
FieldId: utils.Destination, Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("*cgrRequest>Destination", utils.INFIELD_SEP)},
Value: utils.ParseRSRFieldsMustCompile("*cgrRequest.Destination", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "RequestedUsageVoice",
FieldId: "RequestedUsage", Type: utils.META_COMPOSED,
Filters: []string{"*string:*cgrRequest>ToR:*voice"},
Filters: []string{"*string:*cgrRequest.ToR:*voice"},
Value: utils.ParseRSRFieldsMustCompile(
"*cgrRequest>Usage{*duration_seconds}", utils.INFIELD_SEP)},
"*cgrRequest.Usage{*duration_seconds}", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "RequestedUsageData",
FieldId: "RequestedUsage", Type: utils.META_COMPOSED,
Filters: []string{"*string:*cgrRequest>ToR:*data"},
Filters: []string{"*string:*cgrRequest.ToR:*data"},
Value: utils.ParseRSRFieldsMustCompile(
"*cgrRequest>Usage{*duration_nanoseconds}", utils.INFIELD_SEP)},
"*cgrRequest.Usage{*duration_nanoseconds}", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "RequestedUsageSMS",
FieldId: "RequestedUsage", Type: utils.META_COMPOSED,
Filters: []string{"*string:*cgrRequest>ToR:*sms"},
Filters: []string{"*string:*cgrRequest.ToR:*sms"},
Value: utils.ParseRSRFieldsMustCompile(
"*cgrRequest>Usage{*duration_nanoseconds}", utils.INFIELD_SEP)},
"*cgrRequest.Usage{*duration_nanoseconds}", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "AttrPaypalAccount",
FieldId: "PaypalAccount", Type: utils.META_COMPOSED,
Filters: []string{"*string:*cgrReply>Error:"},
Filters: []string{"*string:*cgrReply.Error:"},
Value: utils.ParseRSRFieldsMustCompile(
"*cgrReply>Attributes>PaypalAccount", utils.INFIELD_SEP)},
"*cgrReply.Attributes.PaypalAccount", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "MaxUsage",
FieldId: "MaxUsage", Type: utils.META_COMPOSED,
Filters: []string{"*string:*cgrReply>Error:"},
Filters: []string{"*string:*cgrReply.Error:"},
Value: utils.ParseRSRFieldsMustCompile(
"*cgrReply>MaxUsage{*duration_seconds}", utils.INFIELD_SEP)},
"*cgrReply.MaxUsage{*duration_seconds}", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "Error",
FieldId: "Error", Type: utils.META_COMPOSED,
Filters: []string{"*rsr::*cgrReply>Error(!^$)"},
Filters: []string{"*rsr::*cgrReply.Error(!^$)"},
Value: utils.ParseRSRFieldsMustCompile(
"*cgrReply>Error", utils.INFIELD_SEP)},
"*cgrReply.Error", utils.INFIELD_SEP)},
}
eMp := engine.NewNavigableMap(nil)
eMp.Set([]string{utils.Tenant}, "cgrates.org", true)

View File

@@ -37,7 +37,7 @@ func NewHTTPAgent(
filterS *engine.FilterS, tenantCfg utils.RSRFields,
dfltTenant, timezone, reqPayload, rplyPayload string,
reqProcessors []*config.HttpAgntProcCfg) *HTTPAgent {
return &HTTPAgent{sessionS: sessionS,
return &HTTPAgent{sessionS: sessionS, filterS: filterS,
dfltTenant: dfltTenant, timezone: timezone,
reqPayload: reqPayload, rplyPayload: rplyPayload,
reqProcessors: reqProcessors}
@@ -125,6 +125,7 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg,
utils.MetaTerminate, utils.MetaEvent} {
if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags
reqType = typ
break
}
}
switch reqType {
@@ -206,8 +207,8 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg,
return
}
}
if reqProcessor.Flags.HasKey(utils.MetaCDRs) &&
utils.IsSliceMember([]string{utils.MetaTerminate, utils.MetaEvent}, reqType) {
// separate request so we can capture the Terminate/Event also here
if reqProcessor.Flags.HasKey(utils.MetaCDRs) {
var rplyCDRs string
if err = ha.sessionS.Call(utils.SessionSv1ProcessCDR,
*cgrEv, &rplyCDRs); err != nil {
@@ -224,5 +225,5 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg,
fmt.Sprintf("<%s> DRY_RUN, HTTP reply: %s",
utils.HTTPAgent, agReq.Reply))
}
return
return true, nil
}

113
agents/httpagent_it_test.go Normal file
View File

@@ -0,0 +1,113 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"fmt"
"io/ioutil"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
haCfgPath string
haCfg *config.CGRConfig
haRPC *rpc.Client
httpC *http.Client // so we can cache the connection
)
func TestHAitInitCfg(t *testing.T) {
haCfgPath = path.Join(*dataDir, "conf", "samples", "httpagent")
// Init config first
var err error
haCfg, err = config.NewCGRConfigFromFolder(haCfgPath)
if err != nil {
t.Error(err)
}
haCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(haCfg)
httpC = new(http.Client)
}
// Remove data in both rating and accounting db
func TestHAitResetDataDb(t *testing.T) {
if err := engine.InitDataDb(haCfg); err != nil {
t.Fatal(err)
}
}
// Wipe out the cdr database
func TestHAitResetStorDb(t *testing.T) {
if err := engine.InitStorDb(haCfg); err != nil {
t.Fatal(err)
}
}
/*
// Start CGR Engine
func TestHAitStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(haCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
*/
// Connect rpc client to rater
func TestHAitApierRpcConn(t *testing.T) {
var err error
haRPC, err = jsonrpc.Dial("tcp", haCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
// Load the tariff plan, creating accounts and their balances
func TestHAitTPFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")}
var loadInst utils.LoadInstance
if err := haRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
func TestHAitAuth(t *testing.T) {
reqUrl := fmt.Sprintf("http://%s%s?request_type=OutboundAUTH&CallID=123456&Msisdn=497700056231&Imsi=2343000000000123&Destination=491239440004&MSRN=0102220233444488999&ProfileID=1&AgentID=176&GlobalMSISDN=497700056129&GlobalIMSI=214180000175129&ICCID=8923418450000089629&MCC=234&MNC=10&calltype=callback",
haCfg.HTTPListen, haCfg.HttpAgentCfg()[0].Url)
rply, err := httpC.Get(reqUrl)
if err != nil {
t.Error(err)
}
if body, err := ioutil.ReadAll(rply.Body); err != nil {
t.Error(err)
} else {
fmt.Printf("Got reply: %s\n", string(body))
}
rply.Body.Close()
}

View File

@@ -385,7 +385,6 @@ func startHTTPAgent(internalSMGChan chan rpcclient.RpcClientConnection,
agntCfg.Timezone, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}
exitChan <- true
}
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
@@ -893,7 +892,8 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRsChan, internalStatSChan,
internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection) {
internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
exitChan chan bool) {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
internalRaterChan <- resp
@@ -925,6 +925,7 @@ func startRpc(server *utils.Server, internalRaterChan,
cfg.HTTPWSURL,
cfg.HTTPUseBasicAuth,
cfg.HTTPAuthUsers,
exitChan,
)
if cfg.RPCGOBTLSListen != "" {
if cfg.TLSServerCerificate == "" || cfg.TLSServerKey == "" {
@@ -1244,7 +1245,7 @@ func main() {
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan,
internalStatSChan, internalSMGChan, internalDispatcherSChan)
internalStatSChan, internalSMGChan, internalDispatcherSChan, exitChan)
<-exitChan
if *memprofile != "" {

View File

@@ -246,6 +246,7 @@ func TestHttpAgentCfg(t *testing.T) {
{
"http_agent": [
{
"id": "conecto1",
"url": "/conecto", // relative URL for requests coming in
"sessions_conns": [
{"address": "*internal"} // connection towards SessionService
@@ -262,6 +263,7 @@ func TestHttpAgentCfg(t *testing.T) {
eCgrCfg, _ := NewDefaultCGRConfig()
eCgrCfg.httpAgentCfg = []*HttpAgentCfg{
&HttpAgentCfg{
ID: "conecto1",
Url: "/conecto",
Tenant: utils.ParseRSRFieldsMustCompile("^cgrates.org",
utils.INFIELD_SEP),

View File

@@ -23,6 +23,7 @@ import (
)
type HttpAgentCfg struct {
ID string // identifier for the agent, so we can update it's processors
Url string
SessionSConns []*HaPoolConfig
Tenant utils.RSRFields
@@ -36,6 +37,9 @@ func (ca *HttpAgentCfg) loadFromJsonCfg(jsnCfg *HttpAgentJsonCfg) (err error) {
if jsnCfg == nil {
return nil
}
if jsnCfg.Id != nil {
ca.ID = *jsnCfg.Id
}
if jsnCfg.Url != nil {
ca.Url = *jsnCfg.Url
}

View File

@@ -379,6 +379,7 @@ type RAReqProcessorJsnCfg struct {
// Conecto Agent configuration section
type HttpAgentJsonCfg struct {
Id *string
Url *string
Sessions_conns *[]*HaPoolJsonCfg
Tenant *string

View File

@@ -0,0 +1,54 @@
{
// CGRateS Configuration file
//
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rals": {
"enabled": true,
},
"scheduler": {
"enabled": true,
},
"cdrs": {
"enabled": true,
},
"attributes": {
"enabled": true,
},
"sessions": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"}
],
"rals_conns": [
{"address": "*internal"}
],
},
}

View File

@@ -0,0 +1,75 @@
{
"http_agent": [
{
"id": "conecto1",
"url": "/conecto",
"sessions_conns": [
{"address": "*internal"}
],
"tenant": "^cgrates.org",
"timezone": "",
"request_payload": "*url",
"reply_payload": "*xml",
"request_processors": [
{
"id": "OutboundAUTH",
"filters": ["*string:*request.request_type:OutboundAUTH"],
"flags": ["*dryrun", "*auth", "*accounts", "*attributes"],
"continue_on_success": false,
"request_fields":[
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant",
"value": "*pseudoprepaid", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed",
"value": "*request.CallID", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*composed",
"value": "*request.Msisdn", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*composed",
"value": "*request.Destination", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*constant",
"value": "*now", "mandatory": true},
],
"reply_fields":[
{"tag": "Allow", "field_id": "response.Allow", "type": "*constant",
"value": "1", "mandatory": true},
//{"tag": "MaxDuration", "field_id": "response.MaxDuration", "type": "*composed",
// "value": "*cgrReply.MaxUsage{*duration_seconds}", "mandatory": true},
{"tag": "MaxDuration", "field_id": "response.MaxDuration", "type": "*constant",
"value": "1200", "mandatory": true},
],
},
{
"id": "mtcall_cdr",
"filters": ["*string:*request.request_type:MTCALL_CDR"],
"flags": ["*dryrun", "*cdrs"],
"continue_on_success": false,
"request_fields":[
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant",
"value": "*pseudoprepaid", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed",
"value": "*request.CDR_ID", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*composed",
"value": "*request.msisdn", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*composed",
"value": "*request.destination", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed",
"value": "*request.timestamp", "mandatory": true},
{"tag": "AnswerTime", "field_id": "SetupTime", "type": "*composed",
"value": "*request.timestamp", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*composed",
"value": "*request.leg_duration;^s", "mandatory": true},
],
"reply_fields":[
{"tag": "CDR_ID", "field_id": "CDR_RESPONSE.CDR_ID", "type": "*composed",
"value": "*request.CDR_ID", "mandatory": true},
{"tag": "CDR_STATUS", "field_id": "CDR_RESPONSE.CDR_STATUS", "type": "*constant",
"value": "1", "mandatory": true},
],
}
],
},
],
}

View File

@@ -238,7 +238,7 @@ func (fltr *FilterRule) Pass(dP DataProvider, rpcClnt rpcclient.RpcClientConnect
}
func (fltr *FilterRule) passString(dP DataProvider) (bool, error) {
strVal, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.HIERARCHY_SEP))
strVal, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
return false, nil
@@ -254,7 +254,7 @@ func (fltr *FilterRule) passString(dP DataProvider) (bool, error) {
}
func (fltr *FilterRule) passStringPrefix(dP DataProvider) (bool, error) {
strVal, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.HIERARCHY_SEP))
strVal, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
return false, nil
@@ -275,7 +275,7 @@ func (fltr *FilterRule) passTimings(dP DataProvider) (bool, error) {
}
func (fltr *FilterRule) passDestinations(dP DataProvider) (bool, error) {
dst, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.HIERARCHY_SEP))
dst, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
return false, nil
@@ -298,7 +298,7 @@ func (fltr *FilterRule) passDestinations(dP DataProvider) (bool, error) {
func (fltr *FilterRule) passRSR(dP DataProvider) (bool, error) {
for _, rsrFld := range fltr.rsrFields {
fldIface, err := dP.FieldAsInterface(strings.Split(rsrFld.Id, utils.HIERARCHY_SEP))
fldIface, err := dP.FieldAsInterface(strings.Split(rsrFld.Id, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
return false, nil
@@ -338,7 +338,7 @@ func (fltr *FilterRule) passStatS(dP DataProvider,
}
func (fltr *FilterRule) passGreaterThan(dP DataProvider) (bool, error) {
fldIf, err := dP.FieldAsInterface(strings.Split(fltr.FieldName, utils.HIERARCHY_SEP))
fldIf, err := dP.FieldAsInterface(strings.Split(fltr.FieldName, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
return false, nil

View File

@@ -654,6 +654,7 @@ const (
MetaAuth = "*auth"
APIKey = "APIKey"
APIMethods = "APIMethods"
NestingSep = "."
)
// MetaFilterIndexesAPIs

View File

@@ -185,7 +185,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
useBasicAuth bool, userList map[string]string) {
useBasicAuth bool, userList map[string]string, exitChan chan bool) {
s.RLock()
enabled := s.rpcEnabled
s.RUnlock()
@@ -228,6 +228,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
}
Logger.Info(fmt.Sprintf("<HTTP> start listening at <%s>", addr))
http.ListenAndServe(addr, nil)
exitChan <- true
}
func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) {