mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 13:49:53 +05:00
AgentRequest with ParseField method
This commit is contained in:
@@ -20,25 +20,36 @@ package agents
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func newAgentRequest(req engine.DataProvider) *AgentRequest {
|
||||
return &AgentRequest{
|
||||
func newAgentRequest(req engine.DataProvider, tntTpl utils.RSRFields,
|
||||
dfltTenant string) (ar *AgentRequest) {
|
||||
ar = &AgentRequest{
|
||||
Request: req,
|
||||
Vars: engine.NewNavigableMap(nil),
|
||||
CGRReply: engine.NewNavigableMap(nil),
|
||||
Reply: engine.NewNavigableMap(nil),
|
||||
}
|
||||
|
||||
// populate tenant
|
||||
if tntIf, err := ar.ParseField(
|
||||
&config.CfgCdrField{Type: utils.META_COMPOSED,
|
||||
Value: tntTpl}); err == nil && tntIf.(string) != "" {
|
||||
ar.Tenant = tntIf.(string)
|
||||
} else {
|
||||
ar.Tenant = dfltTenant
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// AgentRequest represents data related to one request towards agent
|
||||
// implements engine.DataProvider so we can pass it to filters
|
||||
type AgentRequest struct {
|
||||
Tenant string
|
||||
Request engine.DataProvider // request
|
||||
Vars *engine.NavigableMap // shared data
|
||||
CGRReply *engine.NavigableMap
|
||||
@@ -87,3 +98,59 @@ func (ar *AgentRequest) AsNavigableMap([]*config.CfgCdrField) (
|
||||
nM *engine.NavigableMap, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// parseField outputs the value based on the template item
|
||||
func (aReq *AgentRequest) ParseField(
|
||||
cfgFld *config.CfgCdrField) (out interface{}, err error) {
|
||||
var isString bool
|
||||
switch cfgFld.Type {
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported type: <%s>", cfgFld.Type)
|
||||
case utils.META_FILLER:
|
||||
out = cfgFld.Value.Id()
|
||||
cfgFld.Padding = "right"
|
||||
isString = true
|
||||
case utils.META_CONSTANT:
|
||||
out = cfgFld.Value.Id()
|
||||
isString = true
|
||||
case utils.META_COMPOSED:
|
||||
out = aReq.composedField(cfgFld.Value)
|
||||
isString = true
|
||||
}
|
||||
if isString { // format the string additionally with fmtFieldWidth
|
||||
out, err = utils.FmtFieldWidth(cfgFld.Tag, out.(string), cfgFld.Width,
|
||||
cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// composedField is a subset of ParseField
|
||||
func (ar *AgentRequest) composedField(outTpl utils.RSRFields) (outVal string) {
|
||||
for _, rsrTpl := range outTpl {
|
||||
if rsrTpl.IsStatic() {
|
||||
if parsed, err := rsrTpl.Parse(""); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> %s",
|
||||
utils.HTTPAgent, err.Error()))
|
||||
} else {
|
||||
outVal += parsed
|
||||
}
|
||||
continue
|
||||
}
|
||||
valStr, err := ar.FieldAsString(strings.Split(rsrTpl.Id, utils.CONCATENATED_KEY_SEP))
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> %s",
|
||||
utils.HTTPAgent, err.Error()))
|
||||
continue
|
||||
}
|
||||
if parsed, err := rsrTpl.Parse(valStr); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> %s",
|
||||
utils.RadiusAgent, err.Error()))
|
||||
} else {
|
||||
outVal += parsed
|
||||
}
|
||||
}
|
||||
return outVal
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ package agents
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -29,19 +30,23 @@ import (
|
||||
)
|
||||
|
||||
// NewHttpAgent will construct a HTTPAgent
|
||||
func NewHTTPAgent(sessionS rpcclient.RpcClientConnection,
|
||||
filterS *engine.FilterS,
|
||||
timezone, reqPayload, rplyPayload string,
|
||||
func NewHTTPAgent(
|
||||
sessionS rpcclient.RpcClientConnection,
|
||||
filterS *engine.FilterS, tenantCfg utils.RSRFields,
|
||||
dfltTenant, timezone, reqPayload, rplyPayload string,
|
||||
reqProcessors []*config.HttpAgntProcCfg) *HTTPAgent {
|
||||
return &HTTPAgent{sessionS: sessionS, timezone: timezone,
|
||||
return &HTTPAgent{sessionS: sessionS,
|
||||
dfltTenant: dfltTenant, timezone: timezone,
|
||||
reqPayload: reqPayload, rplyPayload: rplyPayload,
|
||||
reqProcessors: reqProcessors}
|
||||
}
|
||||
|
||||
// HTTPAgent is a handler for HTTP requests
|
||||
type HTTPAgent struct {
|
||||
sessionS rpcclient.RpcClientConnection
|
||||
filterS *engine.FilterS
|
||||
sessionS rpcclient.RpcClientConnection
|
||||
filterS *engine.FilterS
|
||||
tenantCfg utils.RSRFields
|
||||
dfltTenant,
|
||||
timezone,
|
||||
reqPayload,
|
||||
rplyPayload string
|
||||
@@ -57,7 +62,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
utils.HTTPAgent, err.Error()))
|
||||
return
|
||||
}
|
||||
agReq := newAgentRequest(dcdr)
|
||||
agReq := newAgentRequest(dcdr, ha.tenantCfg, ha.dfltTenant)
|
||||
var processed bool
|
||||
for _, reqProcessor := range ha.reqProcessors {
|
||||
var lclProcessed bool
|
||||
@@ -98,18 +103,29 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// processRequest represents one processor processing the request
|
||||
func (ha *HTTPAgent) processRequest(reqProcessor *config.HttpAgntProcCfg,
|
||||
agReq *AgentRequest) (processed bool, err error) {
|
||||
tnt, err := agReq.Request.FieldAsString([]string{utils.Tenant})
|
||||
if pass, err := ha.filterS.Pass(agReq.Tenant,
|
||||
reqProcessor.Filters, agReq); err != nil || !pass {
|
||||
return pass, err
|
||||
}
|
||||
reqEv, err := agReq.AsNavigableMap(reqProcessor.RequestFields)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if pass, err := ha.filterS.Pass(tnt, reqProcessor.Filters, agReq); err != nil {
|
||||
return false, err
|
||||
} else if !pass {
|
||||
return false, nil
|
||||
cgrEv := &utils.CGREvent{
|
||||
Tenant: agReq.Tenant,
|
||||
ID: utils.UUIDSha1Prefix(),
|
||||
Time: utils.TimePointer(time.Now()),
|
||||
Event: reqEv.AsMapStringInterface(),
|
||||
}
|
||||
if reqProcessor.DryRun {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> DRY_RUN, HTTP request: %s", utils.HTTPAgent, agReq))
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> DRY_RUN, processorID: %s, HTTP request: %s",
|
||||
utils.HTTPAgent, reqProcessor.Id, utils.ToJSON(agReq.Request)))
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s",
|
||||
utils.HTTPAgent, reqProcessor.Id, agReq))
|
||||
}
|
||||
fmt.Printf("cgrEv: %+v", cgrEv)
|
||||
/*
|
||||
ev, err := radReqAsCGREvent(req, procVars, reqProcessor.Flags, reqProcessor.RequestFields)
|
||||
if err != nil {
|
||||
|
||||
@@ -352,7 +352,8 @@ func startKamAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan
|
||||
}
|
||||
|
||||
func startHTTPAgent(internalSMGChan chan rpcclient.RpcClientConnection,
|
||||
exitChan chan bool, server *utils.Server, filterSChan chan *engine.FilterS) {
|
||||
exitChan chan bool, server *utils.Server,
|
||||
filterSChan chan *engine.FilterS, dfltTenant string) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
utils.Logger.Info("Starting HTTP agent")
|
||||
@@ -372,7 +373,8 @@ func startHTTPAgent(internalSMGChan chan rpcclient.RpcClientConnection,
|
||||
}
|
||||
}
|
||||
server.RegisterHttpHandler(agntCfg.Url,
|
||||
agents.NewHTTPAgent(sSConn, filterS, agntCfg.Timezone, agntCfg.RequestPayload,
|
||||
agents.NewHTTPAgent(sSConn, filterS, agntCfg.Tenant, dfltTenant,
|
||||
agntCfg.Timezone, agntCfg.RequestPayload,
|
||||
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
|
||||
}
|
||||
exitChan <- true
|
||||
@@ -1177,7 +1179,7 @@ func main() {
|
||||
}
|
||||
|
||||
if len(cfg.HttpAgentCfg()) != 0 {
|
||||
go startHTTPAgent(internalSMGChan, exitChan, server, filterSChan)
|
||||
go startHTTPAgent(internalSMGChan, exitChan, server, filterSChan, cfg.DefaultTenant)
|
||||
}
|
||||
|
||||
// Start PubSubS service
|
||||
|
||||
@@ -250,6 +250,7 @@ func TestHttpAgentCfg(t *testing.T) {
|
||||
"sessions_conns": [
|
||||
{"address": "*internal"} // connection towards SessionService
|
||||
],
|
||||
"tenant": "^cgrates.org",
|
||||
"timezone": "", // timezone for timestamps where not specified, empty for general defaults <""|UTC|Local|$IANA_TZ_DB>
|
||||
"request_payload": "*url", // source of input data <*url>
|
||||
"reply_payload": "*xml", // type of output data <*xml>
|
||||
@@ -261,7 +262,9 @@ func TestHttpAgentCfg(t *testing.T) {
|
||||
eCgrCfg, _ := NewDefaultCGRConfig()
|
||||
eCgrCfg.httpAgentCfg = []*HttpAgentCfg{
|
||||
&HttpAgentCfg{
|
||||
Url: "/conecto",
|
||||
Url: "/conecto",
|
||||
Tenant: utils.ParseRSRFieldsMustCompile("^cgrates.org",
|
||||
utils.INFIELD_SEP),
|
||||
Timezone: "",
|
||||
RequestPayload: utils.MetaUrl,
|
||||
ReplyPayload: utils.MetaXml,
|
||||
|
||||
@@ -25,13 +25,14 @@ import (
|
||||
type HttpAgentCfg struct {
|
||||
Url string
|
||||
SessionSConns []*HaPoolConfig
|
||||
Tenant utils.RSRFields
|
||||
Timezone string
|
||||
RequestPayload string
|
||||
ReplyPayload string
|
||||
RequestProcessors []*HttpAgntProcCfg
|
||||
}
|
||||
|
||||
func (ca *HttpAgentCfg) loadFromJsonCfg(jsnCfg *HttpAgentJsonCfg) error {
|
||||
func (ca *HttpAgentCfg) loadFromJsonCfg(jsnCfg *HttpAgentJsonCfg) (err error) {
|
||||
if jsnCfg == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -45,6 +46,12 @@ func (ca *HttpAgentCfg) loadFromJsonCfg(jsnCfg *HttpAgentJsonCfg) error {
|
||||
ca.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
}
|
||||
}
|
||||
if jsnCfg.Tenant != nil {
|
||||
if ca.Tenant, err = utils.ParseRSRFields(*jsnCfg.Tenant,
|
||||
utils.INFIELD_SEP); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if jsnCfg.Timezone != nil {
|
||||
ca.Timezone = *jsnCfg.Timezone
|
||||
}
|
||||
|
||||
@@ -376,6 +376,7 @@ type RAReqProcessorJsnCfg struct {
|
||||
type HttpAgentJsonCfg struct {
|
||||
Url *string
|
||||
Sessions_conns *[]*HaPoolJsonCfg
|
||||
Tenant *string
|
||||
Timezone *string
|
||||
Request_payload *string
|
||||
Reply_payload *string
|
||||
|
||||
Reference in New Issue
Block a user