From 5d40409848d7f9a64997be75b07eede15ca943e1 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 26 Sep 2018 19:49:23 +0200 Subject: [PATCH] Diameter writing reply items on connection --- agents/agentreq.go | 9 +++++++-- agents/agentreq_test.go | 2 +- agents/dmtagent.go | 40 +++++++++++++++++++++++++++------------- agents/httpagent.go | 2 +- agents/librad_test.go | 6 +++--- agents/radagent.go | 4 ++-- config/navigablemap.go | 5 +++++ 7 files changed, 46 insertions(+), 22 deletions(-) diff --git a/agents/agentreq.go b/agents/agentreq.go index 1ff54b4fe..0c6224d07 100644 --- a/agents/agentreq.go +++ b/agents/agentreq.go @@ -27,15 +27,20 @@ import ( "github.com/cgrates/cgrates/utils" ) -func newAgentRequest(req config.DataProvider, vars map[string]interface{}, +func newAgentRequest(req config.DataProvider, + vars map[string]interface{}, + rply *config.NavigableMap, tntTpl config.RSRParsers, dfltTenant string, filterS *engine.FilterS) (ar *AgentRequest) { + if rply == nil { + rply = config.NewNavigableMap(nil) + } ar = &AgentRequest{ Request: req, Vars: config.NewNavigableMap(vars), CGRRequest: config.NewNavigableMap(nil), CGRReply: config.NewNavigableMap(nil), - Reply: config.NewNavigableMap(nil), + Reply: rply, filterS: filterS, } // populate tenant diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index 45608d215..f4818c55a 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -33,7 +33,7 @@ func TestAgReqAsNavigableMap(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, dm) - agReq := newAgentRequest(nil, nil, nil, "cgrates.org", filterS) + agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), false) diff --git a/agents/dmtagent.go b/agents/dmtagent.go index df4fc1bd0..67da3a885 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -21,7 +21,6 @@ package agents import ( "fmt" "reflect" - "sync" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -38,9 +37,7 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, if sessionS != nil && reflect.ValueOf(sessionS).IsNil() { sessionS = nil } - da := &DiameterAgent{ - cgrCfg: cgrCfg, filterS: filterS, - sessionS: sessionS, connMux: new(sync.Mutex)} + da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, sessionS: sessionS} dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath if len(dictsPath) != 0 { if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil { @@ -64,7 +61,6 @@ type DiameterAgent struct { cgrCfg *config.CGRConfig filterS *engine.FilterS sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component - connMux *sync.Mutex // Protect connection for read/write } // ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine @@ -113,17 +109,15 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { utils.MetaApp: dApp.Name, utils.MetaCmd: dCmd.Short + "R", } + rply := config.NewNavigableMap(nil) // share it among different processors var processed bool - var rply *diam.Message for _, reqProcessor := range da.cgrCfg.DiameterAgentCfg().RequestProcessors { var lclProcessed bool - lclProcessed, err = da.processRequest( - reqProcessor, + lclProcessed, err = da.processRequest(reqProcessor, newAgentRequest( - newDADataProvider(m), reqVars, + newDADataProvider(m), reqVars, rply, reqProcessor.Tenant, - da.cgrCfg.DefaultTenant, da.filterS), - rply) + da.cgrCfg.DefaultTenant, da.filterS)) if lclProcessed { processed = lclProcessed } @@ -145,10 +139,31 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { writeOnConn(c, m.Answer(diam.UnableToDeliver)) return } + a := m.Answer(diam.Success) + // write reply into message + for _, nmItm := range rply.Items() { + itmStr, err := utils.IfaceAsString(nmItm.Data) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing reply item: %v for message: %s", + utils.DiameterAgent, err.Error(), nmItm, m)) + writeOnConn(c, m.Answer(diam.UnableToDeliver)) + return + } + if err := messageSetAVPsWithPath(a, nmItm.Path, + itmStr, false, da.cgrCfg.DefaultTimezone); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s setting reply item: %v for message: %s", + utils.DiameterAgent, err.Error(), nmItm, m)) + writeOnConn(c, m.Answer(diam.UnableToDeliver)) + return + } + } + writeOnConn(c, m.Answer(diam.UnableToDeliver)) } func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, - agReq *AgentRequest, rply *diam.Message) (processed bool, err error) { + agReq *AgentRequest) (processed bool, err error) { if pass, err := da.filterS.Pass(agReq.Tenant, reqProcessor.Filters, agReq); err != nil || !pass { return pass, err @@ -268,5 +283,4 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, utils.DiameterAgent, agReq.Reply)) } return true, nil - return } diff --git a/agents/httpagent.go b/agents/httpagent.go index 50c769e8d..961cdfa80 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -62,7 +62,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { utils.HTTPAgent, err.Error())) return } - agReq := newAgentRequest(dcdr, nil, ha.tenantCfg, ha.dfltTenant, ha.filterS) + agReq := newAgentRequest(dcdr, nil, nil, ha.tenantCfg, ha.dfltTenant, ha.filterS) var processed bool for _, reqProcessor := range ha.reqProcessors { var lclProcessed bool diff --git a/agents/librad_test.go b/agents/librad_test.go index 523d71cea..8b871cda5 100644 --- a/agents/librad_test.go +++ b/agents/librad_test.go @@ -97,7 +97,7 @@ func TestRadComposedFieldValue(t *testing.T) { if err := pkt.AddAVPWithName("Cisco-NAS-Port", "CGR1", "Cisco"); err != nil { t.Error(err) } - agReq := newAgentRequest(nil, nil, nil, "cgrates.org", nil) + agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", nil) agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false) agReq.Vars.Set([]string{"Cisco"}, "CGR1", false) agReq.Vars.Set([]string{"User-Name"}, "flopsy", false) @@ -117,7 +117,7 @@ func TestRadFieldOutVal(t *testing.T) { t.Error(err) } eOut := fmt.Sprintf("%s|flopsy|CGR1", MetaRadAcctStart) - agReq := newAgentRequest(nil, nil, nil, "cgrates.org", nil) + agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", nil) agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false) agReq.Vars.Set([]string{"Cisco"}, "CGR1", false) agReq.Vars.Set([]string{"User-Name"}, "flopsy", false) @@ -139,7 +139,7 @@ func TestRadReplyAppendAttributes(t *testing.T) { &config.FCTemplate{Tag: "Acct-Session-Time", FieldId: "Acct-Session-Time", Type: utils.META_COMPOSED, Value: config.NewRSRParsersMustCompile("~*cgrep.MaxUsage{*duration_seconds}", true)}, } - agReq := newAgentRequest(nil, nil, nil, "cgrates.org", nil) + agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", nil) agReq.CGRReply.Set([]string{utils.CapMaxUsage}, time.Duration(time.Hour), false) agReq.CGRReply.Set([]string{utils.CapAttributes, "RadReply"}, "AccessAccept", false) agReq.CGRReply.Set([]string{utils.CapAttributes, utils.Account}, "1001", false) diff --git a/agents/radagent.go b/agents/radagent.go index 9f8724158..bd0cc115a 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -83,7 +83,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e rpl.Code = radigo.AccessAccept var processed bool for _, reqProcessor := range ra.cgrCfg.RadiusAgentCfg().RequestProcessors { - agReq := newAgentRequest(dcdr, nil, reqProcessor.Tenant, + agReq := newAgentRequest(dcdr, nil, nil, reqProcessor.Tenant, ra.cgrCfg.DefaultTenant, ra.filterS) agReq.Vars.Set([]string{MetaRadReqType}, utils.StringToInterface(MetaRadAuth), true) var lclProcessed bool @@ -122,7 +122,7 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e rpl.Code = radigo.AccountingResponse var processed bool for _, reqProcessor := range ra.cgrCfg.RadiusAgentCfg().RequestProcessors { - agReq := newAgentRequest(dcdr, nil, reqProcessor.Tenant, + agReq := newAgentRequest(dcdr, nil, nil, reqProcessor.Tenant, ra.cgrCfg.DefaultTenant, ra.filterS) var lclProcessed bool if lclProcessed, err = ra.processRequest(reqProcessor, agReq, rpl); lclProcessed { diff --git a/config/navigablemap.go b/config/navigablemap.go index 52f7784a1..c2a784b74 100644 --- a/config/navigablemap.go +++ b/config/navigablemap.go @@ -143,6 +143,11 @@ func indexMapElements(mp map[string]interface{}, path []string, vals *[]interfac } } +// Items returns the ordered list of items in the NavigableMap +func (nM *NavigableMap) Items() (itms []*NMItem) { + return +} + // Values returns the values in map, ordered by order information func (nM *NavigableMap) Values() (vals []interface{}) { if len(nM.data) == 0 {