DiameterAgent with max_active_requests limitation, proper return of diameter errors when processing requests

This commit is contained in:
DanB
2018-11-28 20:36:49 +01:00
parent ba837faee0
commit 48e10f9201
9 changed files with 172 additions and 75 deletions

View File

@@ -21,7 +21,7 @@ package agents
import (
"fmt"
"reflect"
"strings"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -66,6 +66,8 @@ type DiameterAgent struct {
cgrCfg *config.CGRConfig
filterS *engine.FilterS
sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component
aReqs int
sync.RWMutex
}
// ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine
@@ -98,7 +100,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> decoding app: %d, err: %s",
utils.DiameterAgent, m.Header.ApplicationID, err.Error()))
writeOnConn(c, m.Answer(diam.NoCommonApplication))
writeOnConn(c, diamBareErr(m, diam.NoCommonApplication))
return
}
dCmd, err := m.Dictionary().FindCommand(
@@ -107,7 +109,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> decoding app: %d, command %d, err: %s",
utils.DiameterAgent, m.Header.ApplicationID, m.Header.CommandCode, err.Error()))
writeOnConn(c, m.Answer(diam.CommandUnsupported))
writeOnConn(c, diamBareErr(m, diam.CommandUnsupported))
return
}
reqVars := map[string]interface{}{
@@ -118,6 +120,36 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.MetaAppID: dApp.ID,
utils.MetaCmd: dCmd.Short + "R",
}
// build the negative error answer
diamErr, err := diamErr(m, diam.UnableToComply, reqVars,
da.cgrCfg.DiameterAgentCfg().Templates[utils.MetaErr],
da.cgrCfg.GeneralCfg().DefaultTenant,
da.cgrCfg.GeneralCfg().DefaultTimezone,
da.filterS)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s building errDiam for message: %s",
utils.DiameterAgent, err.Error(), m))
writeOnConn(c, diamBareErr(m, diam.CommandUnsupported))
return
}
if da.cgrCfg.DiameterAgentCfg().MaxActiveReqs != -1 { // -1 disables the limit
	da.Lock()
	if da.aReqs == da.cgrCfg.DiameterAgentCfg().MaxActiveReqs {
		utils.Logger.Err(
			fmt.Sprintf("<%s> denying request due to maximum active requests reached: %d, message: %s",
				utils.DiameterAgent, da.cgrCfg.DiameterAgentCfg().MaxActiveReqs, m))
		// BUG FIX: release the mutex before returning on the denial path;
		// previously the function returned while still holding da.Lock(),
		// deadlocking every subsequent request once the limit was hit.
		da.Unlock()
		writeOnConn(c, diamErr)
		return
	}
	da.aReqs++
	da.Unlock()
	defer func() { // schedule decrement when returning out of function
		da.Lock()
		da.aReqs--
		da.Unlock()
	}()
}
rply := config.NewNavigableMap(nil) // share it among different processors
var processed bool
@@ -128,7 +160,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
newDADataProvider(m), reqVars, rply,
reqProcessor.Tenant, da.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(reqProcessor.Timezone,
config.CgrConfig().GeneralCfg().DefaultTimezone),
da.cgrCfg.GeneralCfg().DefaultTimezone),
da.filterS))
if lclProcessed {
processed = lclProcessed
@@ -142,66 +174,22 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing message: %s",
utils.DiameterAgent, err.Error(), m))
writeOnConn(c, m.Answer(diam.UnableToComply))
writeOnConn(c, diamErr)
return
} else if !processed {
utils.Logger.Warning(
fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s",
utils.DiameterAgent, m, c.RemoteAddr()))
writeOnConn(c, m.Answer(diam.UnableToComply))
writeOnConn(c, diamErr)
return
}
a := m.Answer(diam.Success)
// write reply into message
pathIdx := make(map[string]int) // group items for same path
for _, val := range rply.Values() {
nmItms, isNMItems := val.([]*config.NMItem)
if !isNMItems {
utils.Logger.Warning(
fmt.Sprintf("<%s> cannot encode reply field: %s, ignoring message %s from %s",
utils.DiameterAgent, utils.ToJSON(val), m, c.RemoteAddr()))
writeOnConn(c, m.Answer(diam.UnableToComply))
return
}
// find out the first itm which is not an attribute
var itm *config.NMItem
if len(nmItms) == 1 {
itm = nmItms[0]
} else { // only for groups
for i, cfgItm := range nmItms {
itmPath := strings.Join(cfgItm.Path, utils.NestingSep)
if i == 0 { // path is common, increase it only once
pathIdx[itmPath] += 1
}
if i == pathIdx[itmPath]-1 { // revert from multiple items to only one per config path
itm = cfgItm
break
}
}
}
if itm == nil {
continue // all attributes, not writable to diameter packet
}
itmStr, err := utils.IfaceAsString(itm.Data)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing reply item: %s for message: %s",
utils.DiameterAgent, err.Error(), utils.ToJSON(itm), m))
writeOnConn(c, m.Answer(diam.UnableToComply))
return
}
var newBranch bool
if itm.Config != nil && itm.Config.NewBranch {
newBranch = true
}
if err := messageSetAVPsWithPath(a, itm.Path,
itmStr, newBranch, da.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s setting reply item: %s for message: %s",
utils.DiameterAgent, err.Error(), utils.ToJSON(itm), m))
writeOnConn(c, m.Answer(diam.UnableToComply))
return
}
a, err := diamAnswer(m, diam.Success, false,
rply, da.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> err: %s, replying to message: %+v",
utils.DiameterAgent, err.Error(), m))
}
writeOnConn(c, a)
}

View File

@@ -29,6 +29,7 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/fiorix/go-diameter/diam"
"github.com/fiorix/go-diameter/diam/avp"
@@ -389,3 +390,74 @@ func (dP *diameterDP) FieldAsInterface(fldPath []string) (data interface{}, err
dP.cache.Set(fldPath, data, false, false)
return
}
// diamAnswer builds up the answer to be sent back to the client.
// It creates an answer out of the original message m with the given
// Diameter Result-Code, optionally flags it as an error answer, then
// encodes the fields found in rply as AVPs. tmz is the timezone passed
// down to the AVP encoder. Returns the built answer or an error when a
// reply value cannot be encoded.
func diamAnswer(m *diam.Message, resCode uint32, errFlag bool,
rply *config.NavigableMap, tmz string) (a *diam.Message, err error) {
a = m.Answer(resCode)
if errFlag {
a.Header.CommandFlags = diam.ErrorFlag // mark the answer with the 'E' (error) command flag
}
// write reply into message
pathIdx := make(map[string]int) // group items for same path
for _, val := range rply.Values() {
nmItms, isNMItems := val.([]*config.NMItem)
if !isNMItems {
return nil, fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(val))
}
// find out the first itm which is not an attribute
var itm *config.NMItem
if len(nmItms) == 1 {
itm = nmItms[0]
} else { // only for groups
for i, cfgItm := range nmItms {
itmPath := strings.Join(cfgItm.Path, utils.NestingSep)
if i == 0 { // path is common, increase it only once
pathIdx[itmPath] += 1
}
if i == pathIdx[itmPath]-1 { // revert from multiple items to only one per config path
itm = cfgItm
break
}
}
}
if itm == nil {
continue // all attributes, not writable to diameter packet
}
itmStr, err := utils.IfaceAsString(itm.Data)
if err != nil {
return nil, fmt.Errorf("cannot convert data: %+v to string, err: %s", itm.Data, err)
}
var newBranch bool
if itm.Config != nil && itm.Config.NewBranch {
newBranch = true // the item requests its AVPs be written into a new group branch
}
if err = messageSetAVPsWithPath(a, itm.Path,
itmStr, newBranch, tmz); err != nil {
return nil, fmt.Errorf("setting item with path: %+v got err: %s", itm.Path, err.Error())
}
}
return
}
// diamErr builds the negative (error-flagged) Diameter answer for m with
// the given Result-Code, populating its AVPs from the *err template tpl.
// reqVars supplies the request variables, tnt/tmz the default tenant and
// timezone and filterS the filter service used by the agent request.
func diamErr(m *diam.Message, resCode uint32,
reqVars map[string]interface{},
tpl []*config.FCTemplate, tnt, tmz string,
filterS *engine.FilterS) (a *diam.Message, err error) {
agReq := newAgentRequest(
newDADataProvider(m), reqVars,
config.NewNavigableMap(nil),
nil, tnt, tmz, filterS)
rplyData, err := agReq.AsNavigableMap(tpl)
if err != nil {
return nil, err
}
return diamAnswer(m, resCode, true, rplyData, tmz)
}
// diamBareErr builds a minimal error answer (result code only, no
// template fields), used when even the templated error answer cannot
// be constructed.
func diamBareErr(m *diam.Message, resCode uint32) (a *diam.Message) {
ans := m.Answer(resCode)
ans.Header.CommandFlags = diam.ErrorFlag // flag the reply as an error answer
return ans
}

View File

@@ -372,7 +372,16 @@ const CGRATES_CFG_JSON = `
"origin_realm": "cgrates.org", // diameter Origin-Realm AVP used in replies
"vendor_id": 0, // diameter Vendor-Id AVP used in replies
"product_name": "CGRateS", // diameter Product-Name AVP used in replies
"max_active_requests": -1, // limit the number of active requests processed by the server <-1|0-n>
"templates":{
"*err": [
{"tag": "SessionId", "field_id": "Session-Id", "type": "*composed",
"value": "~*req.Session-Id", "mandatory": true},
{"tag": "OriginHost", "field_id": "Origin-Host", "type": "*composed",
"value": "~*vars.OriginHost", "mandatory": true},
{"tag": "OriginRealm", "field_id": "Origin-Realm", "type": "*composed",
"value": "~*vars.OriginRealm", "mandatory": true},
],
"*cca": [
{"tag": "SessionId", "field_id": "Session-Id", "type": "*composed",
"value": "~*req.Session-Id", "mandatory": true},

View File

@@ -580,11 +580,29 @@ func TestDiameterAgentJsonCfg(t *testing.T) {
{
Address: utils.StringPointer(utils.MetaInternal),
}},
Origin_host: utils.StringPointer("CGR-DA"),
Origin_realm: utils.StringPointer("cgrates.org"),
Vendor_id: utils.IntPointer(0),
Product_name: utils.StringPointer("CGRateS"),
Origin_host: utils.StringPointer("CGR-DA"),
Origin_realm: utils.StringPointer("cgrates.org"),
Vendor_id: utils.IntPointer(0),
Product_name: utils.StringPointer("CGRateS"),
Max_active_requests: utils.IntPointer(-1),
Templates: map[string][]*FcTemplateJsonCfg{
utils.MetaErr: {
{Tag: utils.StringPointer("SessionId"),
Field_id: utils.StringPointer("Session-Id"),
Type: utils.StringPointer(utils.META_COMPOSED),
Value: utils.StringPointer("~*req.Session-Id"),
Mandatory: utils.BoolPointer(true)},
{Tag: utils.StringPointer("OriginHost"),
Field_id: utils.StringPointer("Origin-Host"),
Type: utils.StringPointer(utils.META_COMPOSED),
Value: utils.StringPointer("~*vars.OriginHost"),
Mandatory: utils.BoolPointer(true)},
{Tag: utils.StringPointer("OriginRealm"),
Field_id: utils.StringPointer("Origin-Realm"),
Type: utils.StringPointer(utils.META_COMPOSED),
Value: utils.StringPointer("~*vars.OriginRealm"),
Mandatory: utils.BoolPointer(true)},
},
utils.MetaCCA: {
{Tag: utils.StringPointer("SessionId"),
Field_id: utils.StringPointer("Session-Id"),

View File

@@ -914,6 +914,7 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) {
OriginRealm: "cgrates.org",
VendorId: 0,
ProductName: "CGRateS",
MaxActiveReqs: -1,
RequestProcessors: nil,
}
@@ -941,6 +942,9 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) {
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.ProductName, testDA.ProductName) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.diameterAgentCfg.ProductName, testDA.ProductName)
}
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.MaxActiveReqs, testDA.MaxActiveReqs) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.diameterAgentCfg.MaxActiveReqs, testDA.MaxActiveReqs)
}
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.RequestProcessors, testDA.RequestProcessors) {
t.Errorf("expecting: %+v, received: %+v", testDA.RequestProcessors, cgrCfg.diameterAgentCfg.RequestProcessors)
}

View File

@@ -32,6 +32,7 @@ type DiameterAgentCfg struct {
OriginRealm string
VendorId int
ProductName string
MaxActiveReqs int // limit the maximum number of requests processed
Templates map[string][]*FCTemplate
RequestProcessors []*DARequestProcessor
}
@@ -71,6 +72,9 @@ func (da *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg, separa
if jsnCfg.Product_name != nil {
da.ProductName = *jsnCfg.Product_name
}
if jsnCfg.Max_active_requests != nil {
da.MaxActiveReqs = *jsnCfg.Max_active_requests
}
if jsnCfg.Templates != nil {
if da.Templates == nil {
da.Templates = make(map[string][]*FCTemplate)

View File

@@ -327,17 +327,18 @@ type OsipsConnJsonCfg struct {
// DiameterAgent configuration
type DiameterAgentJsonCfg struct {
Enabled *bool // enables the diameter agent: <true|false>
Listen *string // address where to listen for diameter requests <x.y.z.y:1234>
Listen_net *string
Dictionaries_path *string // path towards additional dictionaries
Sessions_conns *[]*HaPoolJsonCfg // Connections towards SessionS
Origin_host *string
Origin_realm *string
Vendor_id *int
Product_name *string
Templates map[string][]*FcTemplateJsonCfg
Request_processors *[]*DARequestProcessorJsnCfg
Enabled *bool // enables the diameter agent: <true|false>
Listen *string // address where to listen for diameter requests <x.y.z.y:1234>
Listen_net *string
Dictionaries_path *string // path towards additional dictionaries
Sessions_conns *[]*HaPoolJsonCfg // Connections towards SessionS
Origin_host *string
Origin_realm *string
Vendor_id *int
Product_name *string
Max_active_requests *int
Templates map[string][]*FcTemplateJsonCfg
Request_processors *[]*DARequestProcessorJsnCfg
}
// One Diameter request processor configuration

View File

@@ -145,11 +145,11 @@ type AMQPCachedPosters struct {
// GetAMQPPoster creates a new poster only if not already cached
// uses dialURL as cache key
func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (amqpPoster *AMQPPoster, err error) {
func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int,
fallbackFileDir string) (amqpPoster *AMQPPoster, err error) {
pc.Lock()
defer pc.Unlock()
var hasIt bool
if _, hasIt = pc.cache[dialURL]; !hasIt {
if _, hasIt := pc.cache[dialURL]; !hasIt {
if pstr, err := NewAMQPPoster(dialURL, attempts, fallbackFileDir); err != nil {
return nil, err
} else {

View File

@@ -528,6 +528,7 @@ const (
MetaEnv = "*env:" // use in config for describing enviormant variables
MetaTemplate = "*template"
MetaCCA = "*cca"
MetaErr = "*err"
OriginRealm = "OriginRealm"
ProductName = "ProductName"
CGRSubsystems = "cgr_subsystems"