From 48e10f9201ff1b2fe3047c8f3e5d78086b87fc1e Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 28 Nov 2018 20:36:49 +0100 Subject: [PATCH] DiameterAgent with max_active_requests limitation, proper return of diameter errors when processing requests --- agents/diamagent.go | 102 ++++++++++++++++--------------------- agents/libdiam.go | 72 ++++++++++++++++++++++++++ config/config_defaults.go | 9 ++++ config/config_json_test.go | 26 ++++++++-- config/config_test.go | 4 ++ config/diametercfg.go | 4 ++ config/libconfig_json.go | 23 +++++---- engine/poster.go | 6 +-- utils/consts.go | 1 + 9 files changed, 172 insertions(+), 75 deletions(-) diff --git a/agents/diamagent.go b/agents/diamagent.go index 6b724ae5e..43a38c294 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -21,7 +21,7 @@ package agents import ( "fmt" "reflect" - "strings" + "sync" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -66,6 +66,8 @@ type DiameterAgent struct { cgrCfg *config.CGRConfig filterS *engine.FilterS sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component + aReqs int + sync.RWMutex } // ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine @@ -98,7 +100,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> decoding app: %d, err: %s", utils.DiameterAgent, m.Header.ApplicationID, err.Error())) - writeOnConn(c, m.Answer(diam.NoCommonApplication)) + writeOnConn(c, diamBareErr(m, diam.NoCommonApplication)) return } dCmd, err := m.Dictionary().FindCommand( @@ -107,7 +109,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { if err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> decoding app: %d, command %d, err: %s", utils.DiameterAgent, m.Header.ApplicationID, m.Header.CommandCode, err.Error())) - writeOnConn(c, m.Answer(diam.CommandUnsupported)) + writeOnConn(c, diamBareErr(m, diam.CommandUnsupported)) return } reqVars := map[string]interface{}{ @@ -118,6 +120,36 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { utils.MetaAppID: dApp.ID, utils.MetaCmd: dCmd.Short + "R", } + // build the negative error answer + diamErr, err := diamErr(m, diam.UnableToComply, reqVars, + da.cgrCfg.DiameterAgentCfg().Templates[utils.MetaErr], + da.cgrCfg.GeneralCfg().DefaultTenant, + da.cgrCfg.GeneralCfg().DefaultTimezone, + da.filterS) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s building errDiam for message: %s", + utils.DiameterAgent, err.Error(), m)) + writeOnConn(c, diamBareErr(m, diam.CommandUnsupported)) + return + } + if da.cgrCfg.DiameterAgentCfg().MaxActiveReqs != -1 { + da.Lock() + if da.aReqs == da.cgrCfg.DiameterAgentCfg().MaxActiveReqs { + utils.Logger.Err( + fmt.Sprintf("<%s> denying request due to maximum active requests reached: %d, message: %s", + utils.DiameterAgent, da.cgrCfg.DiameterAgentCfg().MaxActiveReqs, m)) + writeOnConn(c, diamErr) + return + } + da.aReqs++ + da.Unlock() + defer func() { // schedule decrement when returning out of function + da.Lock() + da.aReqs-- + da.Unlock() + }() + } rply := config.NewNavigableMap(nil) // share it among different processors var processed bool @@ -128,7 +160,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { newDADataProvider(m), reqVars, rply, reqProcessor.Tenant, da.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, - config.CgrConfig().GeneralCfg().DefaultTimezone), + da.cgrCfg.GeneralCfg().DefaultTimezone), da.filterS)) if lclProcessed { processed = lclProcessed @@ -142,66 +174,22 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing message: %s", utils.DiameterAgent, err.Error(), m)) - writeOnConn(c, m.Answer(diam.UnableToComply)) + writeOnConn(c, diamErr) return } else if !processed { utils.Logger.Warning( fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s", utils.DiameterAgent, m, c.RemoteAddr())) - writeOnConn(c, m.Answer(diam.UnableToComply)) + writeOnConn(c, diamErr) return } - a := m.Answer(diam.Success) - // write reply into message - pathIdx := make(map[string]int) // group items for same path - for _, val := range rply.Values() { - nmItms, isNMItems := val.([]*config.NMItem) - if !isNMItems { - utils.Logger.Warning( - fmt.Sprintf("<%s> cannot encode reply field: %s, ignoring message %s from %s", - utils.DiameterAgent, utils.ToJSON(val), m, c.RemoteAddr())) - writeOnConn(c, m.Answer(diam.UnableToComply)) - return - } - // find out the first itm which is not an attribute - var itm *config.NMItem - if len(nmItms) == 1 { - itm = nmItms[0] - } else { // only for groups - for i, cfgItm := range nmItms { - itmPath := strings.Join(cfgItm.Path, utils.NestingSep) - if i == 0 { // path is common, increase it only once - pathIdx[itmPath] += 1 - } - if i == pathIdx[itmPath]-1 { // revert from multiple items to only one per config path - itm = cfgItm - break - } - } - } - if itm == nil { - continue // all attributes, not writable to diameter packet - } - itmStr, err := utils.IfaceAsString(itm.Data) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing reply item: %s for message: %s", - utils.DiameterAgent, err.Error(), utils.ToJSON(itm), m)) - writeOnConn(c, m.Answer(diam.UnableToComply)) - return - } - var newBranch bool - if itm.Config != nil && itm.Config.NewBranch { - newBranch = true - } - if err := messageSetAVPsWithPath(a, itm.Path, - itmStr, newBranch, da.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s setting reply item: %s for message: %s", - utils.DiameterAgent, err.Error(), utils.ToJSON(itm), m)) - writeOnConn(c, m.Answer(diam.UnableToComply)) - return - } + a, err := diamAnswer(m, diam.Success, false, + rply, da.cgrCfg.GeneralCfg().DefaultTimezone) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> err: %s, replying to message: %+v", + utils.DiameterAgent, err.Error(), m)) + } writeOnConn(c, a) } diff --git a/agents/libdiam.go b/agents/libdiam.go index d574189bf..22fe16ddf 100644 --- a/agents/libdiam.go +++ b/agents/libdiam.go @@ -29,6 +29,7 @@ import ( "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/fiorix/go-diameter/diam" "github.com/fiorix/go-diameter/diam/avp" @@ -389,3 +390,74 @@ func (dP *diameterDP) FieldAsInterface(fldPath []string) (data interface{}, err dP.cache.Set(fldPath, data, false, false) return } + +// diamAnswer builds up the answer to be sent back to the client +func diamAnswer(m *diam.Message, resCode uint32, errFlag bool, + rply *config.NavigableMap, tmz string) (a *diam.Message, err error) { + a = m.Answer(resCode) + if errFlag { + a.Header.CommandFlags = diam.ErrorFlag + } + // write reply into message + pathIdx := make(map[string]int) // group items for same path + for _, val := range rply.Values() { + nmItms, isNMItems := val.([]*config.NMItem) + if !isNMItems { + return nil, fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(val)) + } + // find out the first itm which is not an attribute + var itm *config.NMItem + if len(nmItms) == 1 { + itm = nmItms[0] + } else { // only for groups + for i, cfgItm := range nmItms { + itmPath := strings.Join(cfgItm.Path, utils.NestingSep) + if i == 0 { // path is common, increase it only once + pathIdx[itmPath] += 1 + } + if i == pathIdx[itmPath]-1 { // revert from multiple items to only one per config path + itm = cfgItm + break + } + } + } + if itm == nil { + continue // all attributes, not writable to diameter packet + } + itmStr, err := utils.IfaceAsString(itm.Data) + if err != nil { + return nil, fmt.Errorf("cannot convert data: %+v to string, err: %s", itm.Data, err) + } + var newBranch bool + if itm.Config != nil && itm.Config.NewBranch { + newBranch = true + } + if err = messageSetAVPsWithPath(a, itm.Path, + itmStr, newBranch, tmz); err != nil { + return nil, fmt.Errorf("setting item with path: %+v got err: %s", itm.Path, err.Error()) + } + } + return +} + +// negDiamAnswer is used to return the negative answer we need previous to +func diamErr(m *diam.Message, resCode uint32, + reqVars map[string]interface{}, + tpl []*config.FCTemplate, tnt, tmz string, + filterS *engine.FilterS) (a *diam.Message, err error) { + aReq := newAgentRequest( + newDADataProvider(m), reqVars, + config.NewNavigableMap(nil), + nil, tnt, tmz, filterS) + var rplyData *config.NavigableMap + if rplyData, err = aReq.AsNavigableMap(tpl); err != nil { + return + } + return diamAnswer(m, resCode, true, rplyData, tmz) +} + +func diamBareErr(m *diam.Message, resCode uint32) (a *diam.Message) { + a = m.Answer(resCode) + a.Header.CommandFlags = diam.ErrorFlag + return +} diff --git a/config/config_defaults.go b/config/config_defaults.go index 72aa889dc..3d55e6776 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -372,7 +372,16 @@ const CGRATES_CFG_JSON = ` "origin_realm": "cgrates.org", // diameter Origin-Realm AVP used in replies "vendor_id": 0, // diameter Vendor-Id AVP used in replies "product_name": "CGRateS", // diameter Product-Name AVP used in replies + "max_active_requests": -1, // limit the number of active requests processed by the server <-1|0-n> "templates":{ + "*err": [ + {"tag": "SessionId", "field_id": "Session-Id", "type": "*composed", + "value": "~*req.Session-Id", "mandatory": true}, + {"tag": "OriginHost", "field_id": "Origin-Host", "type": "*composed", + "value": "~*vars.OriginHost", "mandatory": true}, + {"tag": "OriginRealm", "field_id": "Origin-Realm", "type": "*composed", + "value": "~*vars.OriginRealm", "mandatory": true}, + ], "*cca": [ {"tag": "SessionId", "field_id": "Session-Id", "type": "*composed", "value": "~*req.Session-Id", "mandatory": true}, diff --git a/config/config_json_test.go b/config/config_json_test.go index 2897ec1b5..ccfcd1e80 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -580,11 +580,29 @@ func TestDiameterAgentJsonCfg(t *testing.T) { { Address: utils.StringPointer(utils.MetaInternal), }}, - Origin_host: utils.StringPointer("CGR-DA"), - Origin_realm: utils.StringPointer("cgrates.org"), - Vendor_id: utils.IntPointer(0), - Product_name: utils.StringPointer("CGRateS"), + Origin_host: utils.StringPointer("CGR-DA"), + Origin_realm: utils.StringPointer("cgrates.org"), + Vendor_id: utils.IntPointer(0), + Product_name: utils.StringPointer("CGRateS"), + Max_active_requests: utils.IntPointer(-1), Templates: map[string][]*FcTemplateJsonCfg{ + utils.MetaErr: { + {Tag: utils.StringPointer("SessionId"), + Field_id: utils.StringPointer("Session-Id"), + Type: utils.StringPointer(utils.META_COMPOSED), + Value: utils.StringPointer("~*req.Session-Id"), + Mandatory: utils.BoolPointer(true)}, + {Tag: utils.StringPointer("OriginHost"), + Field_id: utils.StringPointer("Origin-Host"), + Type: utils.StringPointer(utils.META_COMPOSED), + Value: utils.StringPointer("~*vars.OriginHost"), + Mandatory: utils.BoolPointer(true)}, + {Tag: utils.StringPointer("OriginRealm"), + Field_id: utils.StringPointer("Origin-Realm"), + Type: utils.StringPointer(utils.META_COMPOSED), + Value: utils.StringPointer("~*vars.OriginRealm"), + Mandatory: utils.BoolPointer(true)}, + }, utils.MetaCCA: { {Tag: utils.StringPointer("SessionId"), Field_id: utils.StringPointer("Session-Id"), diff --git a/config/config_test.go b/config/config_test.go index 04174e11a..c74eaaacd 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -914,6 +914,7 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) { OriginRealm: "cgrates.org", VendorId: 0, ProductName: "CGRateS", + MaxActiveReqs: -1, RequestProcessors: nil, } @@ -941,6 +942,9 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) { if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.ProductName, testDA.ProductName) { t.Errorf("received: %+v, expecting: %+v", cgrCfg.diameterAgentCfg.ProductName, testDA.ProductName) } + if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.MaxActiveReqs, testDA.MaxActiveReqs) { + t.Errorf("received: %+v, expecting: %+v", cgrCfg.diameterAgentCfg.MaxActiveReqs, testDA.MaxActiveReqs) + } if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.RequestProcessors, testDA.RequestProcessors) { t.Errorf("expecting: %+v, received: %+v", testDA.RequestProcessors, cgrCfg.diameterAgentCfg.RequestProcessors) } diff --git a/config/diametercfg.go b/config/diametercfg.go index 89be20cd8..ce5c429ac 100644 --- a/config/diametercfg.go +++ b/config/diametercfg.go @@ -32,6 +32,7 @@ type DiameterAgentCfg struct { OriginRealm string VendorId int ProductName string + MaxActiveReqs int // limit the maximum number of requests processed Templates map[string][]*FCTemplate RequestProcessors []*DARequestProcessor } @@ -71,6 +72,9 @@ func (da *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg, separa if jsnCfg.Product_name != nil { da.ProductName = *jsnCfg.Product_name } + if jsnCfg.Max_active_requests != nil { + da.MaxActiveReqs = *jsnCfg.Max_active_requests + } if jsnCfg.Templates != nil { if da.Templates == nil { da.Templates = make(map[string][]*FCTemplate) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 6ce5da6e8..a4bc99bb4 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -327,17 +327,18 @@ type OsipsConnJsonCfg struct { // DiameterAgent configuration type DiameterAgentJsonCfg struct { - Enabled *bool // enables the diameter agent: - Listen *string // address where to listen for diameter requests - Listen_net *string - Dictionaries_path *string // path towards additional dictionaries - Sessions_conns *[]*HaPoolJsonCfg // Connections towards SessionS - Origin_host *string - Origin_realm *string - Vendor_id *int - Product_name *string - Templates map[string][]*FcTemplateJsonCfg - Request_processors *[]*DARequestProcessorJsnCfg + Enabled *bool // enables the diameter agent: + Listen *string // address where to listen for diameter requests + Listen_net *string + Dictionaries_path *string // path towards additional dictionaries + Sessions_conns *[]*HaPoolJsonCfg // Connections towards SessionS + Origin_host *string + Origin_realm *string + Vendor_id *int + Product_name *string + Max_active_requests *int + Templates map[string][]*FcTemplateJsonCfg + Request_processors *[]*DARequestProcessorJsnCfg } // One Diameter request processor configuration diff --git a/engine/poster.go b/engine/poster.go index 534c185ab..b8fdacb00 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -145,11 +145,11 @@ type AMQPCachedPosters struct { // GetAMQPPoster creates a new poster only if not already cached // uses dialURL as cache key -func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (amqpPoster *AMQPPoster, err error) { +func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, + fallbackFileDir string) (amqpPoster *AMQPPoster, err error) { pc.Lock() defer pc.Unlock() - var hasIt bool - if _, hasIt = pc.cache[dialURL]; !hasIt { + if _, hasIt := pc.cache[dialURL]; !hasIt { if pstr, err := NewAMQPPoster(dialURL, attempts, fallbackFileDir); err != nil { return nil, err } else { diff --git a/utils/consts.go b/utils/consts.go index ce54d01a0..47fc5462c 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -528,6 +528,7 @@ const ( MetaEnv = "*env:" // use in config for describing enviormant variables MetaTemplate = "*template" MetaCCA = "*cca" + MetaErr = "*err" OriginRealm = "OriginRealm" ProductName = "ProductName" CGRSubsystems = "cgr_subsystems"