mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Diameter writing reply items on connection
This commit is contained in:
@@ -27,15 +27,20 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func newAgentRequest(req config.DataProvider, vars map[string]interface{},
|
||||
func newAgentRequest(req config.DataProvider,
|
||||
vars map[string]interface{},
|
||||
rply *config.NavigableMap,
|
||||
tntTpl config.RSRParsers,
|
||||
dfltTenant string, filterS *engine.FilterS) (ar *AgentRequest) {
|
||||
if rply == nil {
|
||||
rply = config.NewNavigableMap(nil)
|
||||
}
|
||||
ar = &AgentRequest{
|
||||
Request: req,
|
||||
Vars: config.NewNavigableMap(vars),
|
||||
CGRRequest: config.NewNavigableMap(nil),
|
||||
CGRReply: config.NewNavigableMap(nil),
|
||||
Reply: config.NewNavigableMap(nil),
|
||||
Reply: rply,
|
||||
filterS: filterS,
|
||||
}
|
||||
// populate tenant
|
||||
|
||||
@@ -33,7 +33,7 @@ func TestAgReqAsNavigableMap(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, "cgrates.org", filterS)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
agReq.CGRRequest.Set([]string{utils.CGRID},
|
||||
utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), false)
|
||||
|
||||
@@ -21,7 +21,6 @@ package agents
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -38,9 +37,7 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
if sessionS != nil && reflect.ValueOf(sessionS).IsNil() {
|
||||
sessionS = nil
|
||||
}
|
||||
da := &DiameterAgent{
|
||||
cgrCfg: cgrCfg, filterS: filterS,
|
||||
sessionS: sessionS, connMux: new(sync.Mutex)}
|
||||
da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, sessionS: sessionS}
|
||||
dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath
|
||||
if len(dictsPath) != 0 {
|
||||
if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil {
|
||||
@@ -64,7 +61,6 @@ type DiameterAgent struct {
|
||||
cgrCfg *config.CGRConfig
|
||||
filterS *engine.FilterS
|
||||
sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component
|
||||
connMux *sync.Mutex // Protect connection for read/write
|
||||
}
|
||||
|
||||
// ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine
|
||||
@@ -113,17 +109,15 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
|
||||
utils.MetaApp: dApp.Name,
|
||||
utils.MetaCmd: dCmd.Short + "R",
|
||||
}
|
||||
rply := config.NewNavigableMap(nil) // share it among different processors
|
||||
var processed bool
|
||||
var rply *diam.Message
|
||||
for _, reqProcessor := range da.cgrCfg.DiameterAgentCfg().RequestProcessors {
|
||||
var lclProcessed bool
|
||||
lclProcessed, err = da.processRequest(
|
||||
reqProcessor,
|
||||
lclProcessed, err = da.processRequest(reqProcessor,
|
||||
newAgentRequest(
|
||||
newDADataProvider(m), reqVars,
|
||||
newDADataProvider(m), reqVars, rply,
|
||||
reqProcessor.Tenant,
|
||||
da.cgrCfg.DefaultTenant, da.filterS),
|
||||
rply)
|
||||
da.cgrCfg.DefaultTenant, da.filterS))
|
||||
if lclProcessed {
|
||||
processed = lclProcessed
|
||||
}
|
||||
@@ -145,10 +139,31 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
|
||||
writeOnConn(c, m.Answer(diam.UnableToDeliver))
|
||||
return
|
||||
}
|
||||
a := m.Answer(diam.Success)
|
||||
// write reply into message
|
||||
for _, nmItm := range rply.Items() {
|
||||
itmStr, err := utils.IfaceAsString(nmItm.Data)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing reply item: %v for message: %s",
|
||||
utils.DiameterAgent, err.Error(), nmItm, m))
|
||||
writeOnConn(c, m.Answer(diam.UnableToDeliver))
|
||||
return
|
||||
}
|
||||
if err := messageSetAVPsWithPath(a, nmItm.Path,
|
||||
itmStr, false, da.cgrCfg.DefaultTimezone); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s setting reply item: %v for message: %s",
|
||||
utils.DiameterAgent, err.Error(), nmItm, m))
|
||||
writeOnConn(c, m.Answer(diam.UnableToDeliver))
|
||||
return
|
||||
}
|
||||
}
|
||||
writeOnConn(c, m.Answer(diam.UnableToDeliver))
|
||||
}
|
||||
|
||||
func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
|
||||
agReq *AgentRequest, rply *diam.Message) (processed bool, err error) {
|
||||
agReq *AgentRequest) (processed bool, err error) {
|
||||
if pass, err := da.filterS.Pass(agReq.Tenant,
|
||||
reqProcessor.Filters, agReq); err != nil || !pass {
|
||||
return pass, err
|
||||
@@ -268,5 +283,4 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
|
||||
utils.DiameterAgent, agReq.Reply))
|
||||
}
|
||||
return true, nil
|
||||
return
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
utils.HTTPAgent, err.Error()))
|
||||
return
|
||||
}
|
||||
agReq := newAgentRequest(dcdr, nil, ha.tenantCfg, ha.dfltTenant, ha.filterS)
|
||||
agReq := newAgentRequest(dcdr, nil, nil, ha.tenantCfg, ha.dfltTenant, ha.filterS)
|
||||
var processed bool
|
||||
for _, reqProcessor := range ha.reqProcessors {
|
||||
var lclProcessed bool
|
||||
|
||||
@@ -97,7 +97,7 @@ func TestRadComposedFieldValue(t *testing.T) {
|
||||
if err := pkt.AddAVPWithName("Cisco-NAS-Port", "CGR1", "Cisco"); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
agReq := newAgentRequest(nil, nil, nil, "cgrates.org", nil)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", nil)
|
||||
agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false)
|
||||
agReq.Vars.Set([]string{"Cisco"}, "CGR1", false)
|
||||
agReq.Vars.Set([]string{"User-Name"}, "flopsy", false)
|
||||
@@ -117,7 +117,7 @@ func TestRadFieldOutVal(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
eOut := fmt.Sprintf("%s|flopsy|CGR1", MetaRadAcctStart)
|
||||
agReq := newAgentRequest(nil, nil, nil, "cgrates.org", nil)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", nil)
|
||||
agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false)
|
||||
agReq.Vars.Set([]string{"Cisco"}, "CGR1", false)
|
||||
agReq.Vars.Set([]string{"User-Name"}, "flopsy", false)
|
||||
@@ -139,7 +139,7 @@ func TestRadReplyAppendAttributes(t *testing.T) {
|
||||
&config.FCTemplate{Tag: "Acct-Session-Time", FieldId: "Acct-Session-Time", Type: utils.META_COMPOSED,
|
||||
Value: config.NewRSRParsersMustCompile("~*cgrep.MaxUsage{*duration_seconds}", true)},
|
||||
}
|
||||
agReq := newAgentRequest(nil, nil, nil, "cgrates.org", nil)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", nil)
|
||||
agReq.CGRReply.Set([]string{utils.CapMaxUsage}, time.Duration(time.Hour), false)
|
||||
agReq.CGRReply.Set([]string{utils.CapAttributes, "RadReply"}, "AccessAccept", false)
|
||||
agReq.CGRReply.Set([]string{utils.CapAttributes, utils.Account}, "1001", false)
|
||||
|
||||
@@ -83,7 +83,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e
|
||||
rpl.Code = radigo.AccessAccept
|
||||
var processed bool
|
||||
for _, reqProcessor := range ra.cgrCfg.RadiusAgentCfg().RequestProcessors {
|
||||
agReq := newAgentRequest(dcdr, nil, reqProcessor.Tenant,
|
||||
agReq := newAgentRequest(dcdr, nil, nil, reqProcessor.Tenant,
|
||||
ra.cgrCfg.DefaultTenant, ra.filterS)
|
||||
agReq.Vars.Set([]string{MetaRadReqType}, utils.StringToInterface(MetaRadAuth), true)
|
||||
var lclProcessed bool
|
||||
@@ -122,7 +122,7 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e
|
||||
rpl.Code = radigo.AccountingResponse
|
||||
var processed bool
|
||||
for _, reqProcessor := range ra.cgrCfg.RadiusAgentCfg().RequestProcessors {
|
||||
agReq := newAgentRequest(dcdr, nil, reqProcessor.Tenant,
|
||||
agReq := newAgentRequest(dcdr, nil, nil, reqProcessor.Tenant,
|
||||
ra.cgrCfg.DefaultTenant, ra.filterS)
|
||||
var lclProcessed bool
|
||||
if lclProcessed, err = ra.processRequest(reqProcessor, agReq, rpl); lclProcessed {
|
||||
|
||||
@@ -143,6 +143,11 @@ func indexMapElements(mp map[string]interface{}, path []string, vals *[]interfac
|
||||
}
|
||||
}
|
||||
|
||||
// Items returns the ordered list of items in the NavigableMap
|
||||
func (nM *NavigableMap) Items() (itms []*NMItem) {
|
||||
return
|
||||
}
|
||||
|
||||
// Values returns the values in map, ordered by order information
|
||||
func (nM *NavigableMap) Values() (vals []interface{}) {
|
||||
if len(nM.data) == 0 {
|
||||
|
||||
Reference in New Issue
Block a user