diff --git a/agents/agentreq.go b/agents/agentreq.go index 1bc58f0dc..ea2dfec9e 100644 --- a/agents/agentreq.go +++ b/agents/agentreq.go @@ -116,7 +116,7 @@ func (ar *AgentRequest) AsNavigableMap(tplFlds []*config.CfgCdrField) ( if err != nil { return nil, err } - nM.Set(strings.Split(tplFld.FieldId, utils.HIERARCHY_SEP), out, true) + nM.Set(strings.Split(tplFld.FieldId, utils.NestingSep), out, true) } return } @@ -159,7 +159,7 @@ func (ar *AgentRequest) composedField(outTpl utils.RSRFields) (outVal string) { } continue } - valStr, err := ar.FieldAsString(strings.Split(rsrTpl.Id, utils.HIERARCHY_SEP)) + valStr, err := ar.FieldAsString(strings.Split(rsrTpl.Id, utils.NestingSep)) if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> %s", @@ -169,7 +169,7 @@ func (ar *AgentRequest) composedField(outTpl utils.RSRFields) (outVal string) { if parsed, err := rsrTpl.Parse(valStr); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> %s", - utils.RadiusAgent, err.Error())) + utils.HTTPAgent, err.Error())) } else { outVal += parsed } diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index 0cc8fb576..13c60b871 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -61,42 +61,42 @@ func TestAgReqAsNavigableMap(t *testing.T) { Value: utils.ParseRSRFieldsMustCompile("^cgrates.org", utils.INFIELD_SEP)}, &config.CfgCdrField{Tag: "Account", FieldId: utils.Account, Type: utils.META_COMPOSED, - Value: utils.ParseRSRFieldsMustCompile("*cgrRequest>Account", utils.INFIELD_SEP)}, + Value: utils.ParseRSRFieldsMustCompile("*cgrRequest.Account", utils.INFIELD_SEP)}, &config.CfgCdrField{Tag: "Destination", FieldId: utils.Destination, Type: utils.META_COMPOSED, - Value: utils.ParseRSRFieldsMustCompile("*cgrRequest>Destination", utils.INFIELD_SEP)}, + Value: utils.ParseRSRFieldsMustCompile("*cgrRequest.Destination", utils.INFIELD_SEP)}, &config.CfgCdrField{Tag: "RequestedUsageVoice", FieldId: "RequestedUsage", Type: utils.META_COMPOSED, - Filters: []string{"*string:*cgrRequest>ToR:*voice"}, + Filters: []string{"*string:*cgrRequest.ToR:*voice"}, Value: utils.ParseRSRFieldsMustCompile( - "*cgrRequest>Usage{*duration_seconds}", utils.INFIELD_SEP)}, + "*cgrRequest.Usage{*duration_seconds}", utils.INFIELD_SEP)}, &config.CfgCdrField{Tag: "RequestedUsageData", FieldId: "RequestedUsage", Type: utils.META_COMPOSED, - Filters: []string{"*string:*cgrRequest>ToR:*data"}, + Filters: []string{"*string:*cgrRequest.ToR:*data"}, Value: utils.ParseRSRFieldsMustCompile( - "*cgrRequest>Usage{*duration_nanoseconds}", utils.INFIELD_SEP)}, + "*cgrRequest.Usage{*duration_nanoseconds}", utils.INFIELD_SEP)}, &config.CfgCdrField{Tag: "RequestedUsageSMS", FieldId: "RequestedUsage", Type: utils.META_COMPOSED, - Filters: []string{"*string:*cgrRequest>ToR:*sms"}, + Filters: []string{"*string:*cgrRequest.ToR:*sms"}, Value: utils.ParseRSRFieldsMustCompile( - "*cgrRequest>Usage{*duration_nanoseconds}", utils.INFIELD_SEP)}, + "*cgrRequest.Usage{*duration_nanoseconds}", utils.INFIELD_SEP)}, &config.CfgCdrField{Tag: "AttrPaypalAccount", FieldId: "PaypalAccount", Type: utils.META_COMPOSED, - Filters: []string{"*string:*cgrReply>Error:"}, + Filters: []string{"*string:*cgrReply.Error:"}, Value: utils.ParseRSRFieldsMustCompile( - "*cgrReply>Attributes>PaypalAccount", utils.INFIELD_SEP)}, + "*cgrReply.Attributes.PaypalAccount", utils.INFIELD_SEP)}, &config.CfgCdrField{Tag: "MaxUsage", FieldId: "MaxUsage", Type: utils.META_COMPOSED, - Filters: []string{"*string:*cgrReply>Error:"}, + Filters: []string{"*string:*cgrReply.Error:"}, Value: utils.ParseRSRFieldsMustCompile( - "*cgrReply>MaxUsage{*duration_seconds}", utils.INFIELD_SEP)}, + "*cgrReply.MaxUsage{*duration_seconds}", utils.INFIELD_SEP)}, &config.CfgCdrField{Tag: "Error", FieldId: "Error", Type: utils.META_COMPOSED, - Filters: []string{"*rsr::*cgrReply>Error(!^$)"}, + Filters: []string{"*rsr::*cgrReply.Error(!^$)"}, Value: utils.ParseRSRFieldsMustCompile( - "*cgrReply>Error", utils.INFIELD_SEP)}, + "*cgrReply.Error", utils.INFIELD_SEP)}, } eMp := engine.NewNavigableMap(nil) eMp.Set([]string{utils.Tenant}, "cgrates.org", true) diff --git a/agents/httpagent.go b/agents/httpagent.go index 5de44ac40..75eb91864 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -37,7 +37,7 @@ func NewHTTPAgent( filterS *engine.FilterS, tenantCfg utils.RSRFields, dfltTenant, timezone, reqPayload, rplyPayload string, reqProcessors []*config.HttpAgntProcCfg) *HTTPAgent { - return &HTTPAgent{sessionS: sessionS, + return &HTTPAgent{sessionS: sessionS, filterS: filterS, dfltTenant: dfltTenant, timezone: timezone, reqPayload: reqPayload, rplyPayload: rplyPayload, reqProcessors: reqProcessors} @@ -125,6 +125,7 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg, utils.MetaTerminate, utils.MetaEvent} { if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags reqType = typ + break } } switch reqType { @@ -206,8 +207,8 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg, return } } - if reqProcessor.Flags.HasKey(utils.MetaCDRs) && - utils.IsSliceMember([]string{utils.MetaTerminate, utils.MetaEvent}, reqType) { + // separate request so we can capture the Terminate/Event also here + if reqProcessor.Flags.HasKey(utils.MetaCDRs) { var rplyCDRs string if err = ha.sessionS.Call(utils.SessionSv1ProcessCDR, *cgrEv, &rplyCDRs); err != nil { @@ -224,5 +225,5 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg, fmt.Sprintf("<%s> DRY_RUN, HTTP reply: %s", utils.HTTPAgent, agReq.Reply)) } - return + return true, nil } diff --git a/agents/httpagent_it_test.go b/agents/httpagent_it_test.go new file mode 100644 index 000000000..8f4a0aea6 --- /dev/null +++ b/agents/httpagent_it_test.go @@ -0,0 +1,113 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package agents + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + haCfgPath string + haCfg *config.CGRConfig + haRPC *rpc.Client + httpC *http.Client // so we can cache the connection +) + +func TestHAitInitCfg(t *testing.T) { + haCfgPath = path.Join(*dataDir, "conf", "samples", "httpagent") + // Init config first + var err error + haCfg, err = config.NewCGRConfigFromFolder(haCfgPath) + if err != nil { + t.Error(err) + } + haCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(haCfg) + httpC = new(http.Client) +} + +// Remove data in both rating and accounting db +func TestHAitResetDataDb(t *testing.T) { + if err := engine.InitDataDb(haCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func TestHAitResetStorDb(t *testing.T) { + if err := engine.InitStorDb(haCfg); err != nil { + t.Fatal(err) + } +} + +/* +// Start CGR Engine +func TestHAitStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(haCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} +*/ + +// Connect rpc client to rater +func TestHAitApierRpcConn(t *testing.T) { + var err error + haRPC, err = jsonrpc.Dial("tcp", haCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +// Load the tariff plan, creating accounts and their balances +func TestHAitTPFromFolder(t *testing.T) { + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")} + var loadInst utils.LoadInstance + if err := haRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups +} + +func TestHAitAuth(t *testing.T) { + reqUrl := fmt.Sprintf("http://%s%s?request_type=OutboundAUTH&CallID=123456&Msisdn=497700056231&Imsi=2343000000000123&Destination=491239440004&MSRN=0102220233444488999&ProfileID=1&AgentID=176&GlobalMSISDN=497700056129&GlobalIMSI=214180000175129&ICCID=8923418450000089629&MCC=234&MNC=10&calltype=callback", + haCfg.HTTPListen, haCfg.HttpAgentCfg()[0].Url) + rply, err := httpC.Get(reqUrl) + if err != nil { + t.Error(err) + } + if body, err := ioutil.ReadAll(rply.Body); err != nil { + t.Error(err) + } else { + fmt.Printf("Got reply: %s\n", string(body)) + } + rply.Body.Close() +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 3835f54bf..d33171401 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -385,7 +385,6 @@ func startHTTPAgent(internalSMGChan chan rpcclient.RpcClientConnection, agntCfg.Timezone, agntCfg.RequestPayload, agntCfg.ReplyPayload, agntCfg.RequestProcessors)) } - exitChan <- true } func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, @@ -893,7 +892,8 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, - internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection) { + internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, + exitChan chan bool) { select { // Any of the rpc methods will unlock listening to rpc requests case resp := <-internalRaterChan: internalRaterChan <- resp @@ -925,6 +925,7 @@ func startRpc(server *utils.Server, internalRaterChan, cfg.HTTPWSURL, cfg.HTTPUseBasicAuth, cfg.HTTPAuthUsers, + exitChan, ) if cfg.RPCGOBTLSListen != "" { if cfg.TLSServerCerificate == "" || cfg.TLSServerKey == "" { @@ -1244,7 +1245,7 @@ func main() { // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, - internalStatSChan, internalSMGChan, internalDispatcherSChan) + internalStatSChan, internalSMGChan, internalDispatcherSChan, exitChan) <-exitChan if *memprofile != "" { diff --git a/config/config_test.go b/config/config_test.go index 6cefdc743..90628520d 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -246,6 +246,7 @@ func TestHttpAgentCfg(t *testing.T) { { "http_agent": [ { + "id": "conecto1", "url": "/conecto", // relative URL for requests coming in "sessions_conns": [ {"address": "*internal"} // connection towards SessionService @@ -262,6 +263,7 @@ func TestHttpAgentCfg(t *testing.T) { eCgrCfg, _ := NewDefaultCGRConfig() eCgrCfg.httpAgentCfg = []*HttpAgentCfg{ &HttpAgentCfg{ + ID: "conecto1", Url: "/conecto", Tenant: utils.ParseRSRFieldsMustCompile("^cgrates.org", utils.INFIELD_SEP), diff --git a/config/httpagntcfg.go b/config/httpagntcfg.go index 4dccecb57..3ff141299 100644 --- a/config/httpagntcfg.go +++ b/config/httpagntcfg.go @@ -23,6 +23,7 @@ import ( ) type HttpAgentCfg struct { + ID string // identifier for the agent, so we can update it's processors Url string SessionSConns []*HaPoolConfig Tenant utils.RSRFields @@ -36,6 +37,9 @@ func (ca *HttpAgentCfg) loadFromJsonCfg(jsnCfg *HttpAgentJsonCfg) (err error) { if jsnCfg == nil { return nil } + if jsnCfg.Id != nil { + ca.ID = *jsnCfg.Id + } if jsnCfg.Url != nil { ca.Url = *jsnCfg.Url } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 15968e154..6927e0ac7 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -379,6 +379,7 @@ type RAReqProcessorJsnCfg struct { // Conecto Agent configuration section type HttpAgentJsonCfg struct { + Id *string Url *string Sessions_conns *[]*HaPoolJsonCfg Tenant *string diff --git a/data/conf/samples/httpagent/cgrates.json b/data/conf/samples/httpagent/cgrates.json new file mode 100644 index 000000000..0b67df79c --- /dev/null +++ b/data/conf/samples/httpagent/cgrates.json @@ -0,0 +1,54 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "log_level": 7, +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"rals": { + "enabled": true, +}, + + +"scheduler": { + "enabled": true, +}, + + +"cdrs": { + "enabled": true, +}, + + +"attributes": { + "enabled": true, +}, + + +"sessions": { + "enabled": true, + "attributes_conns": [ + {"address": "*internal"} + ], + "rals_conns": [ + {"address": "*internal"} + ], +}, + + +} diff --git a/data/conf/samples/httpagent/httpagent.json b/data/conf/samples/httpagent/httpagent.json new file mode 100644 index 000000000..cfc8e314d --- /dev/null +++ b/data/conf/samples/httpagent/httpagent.json @@ -0,0 +1,75 @@ +{ + + +"http_agent": [ + { + "id": "conecto1", + "url": "/conecto", + "sessions_conns": [ + {"address": "*internal"} + ], + "tenant": "^cgrates.org", + "timezone": "", + "request_payload": "*url", + "reply_payload": "*xml", + "request_processors": [ + { + "id": "OutboundAUTH", + "filters": ["*string:*request.request_type:OutboundAUTH"], + "flags": ["*dryrun", "*auth", "*accounts", "*attributes"], + "continue_on_success": false, + "request_fields":[ + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", + "value": "*pseudoprepaid", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", + "value": "*request.CallID", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", + "value": "*request.Msisdn", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", + "value": "*request.Destination", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*constant", + "value": "*now", "mandatory": true}, + ], + "reply_fields":[ + {"tag": "Allow", "field_id": "response.Allow", "type": "*constant", + "value": "1", "mandatory": true}, + //{"tag": "MaxDuration", "field_id": "response.MaxDuration", "type": "*composed", + // "value": "*cgrReply.MaxUsage{*duration_seconds}", "mandatory": true}, + {"tag": "MaxDuration", "field_id": "response.MaxDuration", "type": "*constant", + "value": "1200", "mandatory": true}, + ], + }, + { + "id": "mtcall_cdr", + "filters": ["*string:*request.request_type:MTCALL_CDR"], + "flags": ["*dryrun", "*cdrs"], + "continue_on_success": false, + "request_fields":[ + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", + "value": "*pseudoprepaid", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", + "value": "*request.CDR_ID", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", + "value": "*request.msisdn", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", + "value": "*request.destination", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", + "value": "*request.timestamp", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "SetupTime", "type": "*composed", + "value": "*request.timestamp", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", + "value": "*request.leg_duration;^s", "mandatory": true}, + ], + "reply_fields":[ + {"tag": "CDR_ID", "field_id": "CDR_RESPONSE.CDR_ID", "type": "*composed", + "value": "*request.CDR_ID", "mandatory": true}, + {"tag": "CDR_STATUS", "field_id": "CDR_RESPONSE.CDR_STATUS", "type": "*constant", + "value": "1", "mandatory": true}, + ], + } + ], + }, +], + + +} \ No newline at end of file diff --git a/engine/filters.go b/engine/filters.go index 958ef053f..65f3dcce8 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -238,7 +238,7 @@ func (fltr *FilterRule) Pass(dP DataProvider, rpcClnt rpcclient.RpcClientConnect } func (fltr *FilterRule) passString(dP DataProvider) (bool, error) { - strVal, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.HIERARCHY_SEP)) + strVal, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.NestingSep)) if err != nil { if err == utils.ErrNotFound { return false, nil @@ -254,7 +254,7 @@ func (fltr *FilterRule) passString(dP DataProvider) (bool, error) { } func (fltr *FilterRule) passStringPrefix(dP DataProvider) (bool, error) { - strVal, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.HIERARCHY_SEP)) + strVal, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.NestingSep)) if err != nil { if err == utils.ErrNotFound { return false, nil @@ -275,7 +275,7 @@ func (fltr *FilterRule) passTimings(dP DataProvider) (bool, error) { } func (fltr *FilterRule) passDestinations(dP DataProvider) (bool, error) { - dst, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.HIERARCHY_SEP)) + dst, err := dP.FieldAsString(strings.Split(fltr.FieldName, utils.NestingSep)) if err != nil { if err == utils.ErrNotFound { return false, nil @@ -298,7 +298,7 @@ func (fltr *FilterRule) passDestinations(dP DataProvider) (bool, error) { func (fltr *FilterRule) passRSR(dP DataProvider) (bool, error) { for _, rsrFld := range fltr.rsrFields { - fldIface, err := dP.FieldAsInterface(strings.Split(rsrFld.Id, utils.HIERARCHY_SEP)) + fldIface, err := dP.FieldAsInterface(strings.Split(rsrFld.Id, utils.NestingSep)) if err != nil { if err == utils.ErrNotFound { return false, nil @@ -338,7 +338,7 @@ func (fltr *FilterRule) passStatS(dP DataProvider, } func (fltr *FilterRule) passGreaterThan(dP DataProvider) (bool, error) { - fldIf, err := dP.FieldAsInterface(strings.Split(fltr.FieldName, utils.HIERARCHY_SEP)) + fldIf, err := dP.FieldAsInterface(strings.Split(fltr.FieldName, utils.NestingSep)) if err != nil { if err == utils.ErrNotFound { return false, nil diff --git a/utils/consts.go b/utils/consts.go index 73fbcdd80..53f03e03d 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -654,6 +654,7 @@ const ( MetaAuth = "*auth" APIKey = "APIKey" APIMethods = "APIMethods" + NestingSep = "." ) // MetaFilterIndexesAPIs diff --git a/utils/server.go b/utils/server.go index d2b85a81d..668b0981c 100644 --- a/utils/server.go +++ b/utils/server.go @@ -185,7 +185,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request) { } func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, - useBasicAuth bool, userList map[string]string) { + useBasicAuth bool, userList map[string]string, exitChan chan bool) { s.RLock() enabled := s.rpcEnabled s.RUnlock() @@ -228,6 +228,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, } Logger.Info(fmt.Sprintf(" start listening at <%s>", addr)) http.ListenAndServe(addr, nil) + exitChan <- true } func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) {