Consider diameter requests when limiting caps

- add the possibility to pass a custom Error-Message AVP to negative
  Diameter answers (see the client-side sketch below)
- the negative answer is now built only when an error occurs
- remove tests that exercised the behaviour with max concurrent requests set to 0
- add an integration test specific to this change
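
For context, negative answers may now carry an Error-Message AVP. Below is a minimal go-diameter sketch of reading it on the client side; describeAnswer is a hypothetical helper, and the construction in main mirrors what diamErrMsg in this commit produces:

package main

import (
	"fmt"

	"github.com/fiorix/go-diameter/v4/diam"
	"github.com/fiorix/go-diameter/v4/diam/avp"
	"github.com/fiorix/go-diameter/v4/diam/datatype"
	"github.com/fiorix/go-diameter/v4/diam/dict"
)

// describeAnswer extracts the Result-Code and, when present, the optional
// Error-Message AVP from a Diameter answer.
func describeAnswer(m *diam.Message) (string, error) {
	rc, err := m.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID)
	if err != nil || len(rc) == 0 {
		return "", fmt.Errorf("missing Result-Code: %v", err)
	}
	desc := fmt.Sprintf("Result-Code=%v", rc[0].Data)
	// Error-Message is optional; it is attached only when one was supplied.
	if em, err := m.FindAVPsWithPath([]any{"Error-Message"}, dict.UndefinedVendorID); err == nil && len(em) > 0 {
		desc += fmt.Sprintf(", Error-Message=%v", em[0].Data)
	}
	return desc, nil
}

func main() {
	ans := diam.NewRequest(diam.CreditControl, 4, nil).Answer(diam.TooBusy)
	ans.NewAVP(avp.ErrorMessage, 0, 0, datatype.UTF8String("max concurrent requests reached"))
	desc, err := describeAnswer(ans)
	if err != nil {
		panic(err)
	}
	fmt.Println(desc)
}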
ionutboangiu
2024-11-04 19:19:44 +02:00
committed by Dan Christian Bogos
parent ad104573e9
commit 66c119dba7
6 changed files with 260 additions and 64 deletions

View File

@@ -0,0 +1,183 @@
//go:build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"fmt"
"sync"
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/fiorix/go-diameter/v4/diam"
"github.com/fiorix/go-diameter/v4/diam/avp"
"github.com/fiorix/go-diameter/v4/diam/datatype"
"github.com/fiorix/go-diameter/v4/diam/dict"
)
func TestDiameterAgentCapsIT(t *testing.T) {
var dbCfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
dbCfg = engine.DBCfg{
DataDB: &engine.DBParams{
Type: utils.StringPointer(utils.MetaInternal),
},
StorDB: &engine.DBParams{
Type: utils.StringPointer(utils.MetaInternal),
},
}
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("unsupported dbtype value")
}
jsonCfg := `{
"cores": {
"caps": 2,
"caps_strategy": "*busy",
"shutdown_timeout": "5ms"
},
"sessions":{
"enabled": true
},
"diameter_agent": {
"enabled": true,
"synced_conn_requests": true
}
}`
ng := engine.TestEngine{
ConfigJSON: jsonCfg,
DBCfg: dbCfg,
Encoding: *utils.Encoding,
}
client, cfg := ng.Run(t)
time.Sleep(10 * time.Millisecond) // wait for DiameterAgent service to start
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost",
cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet)
if err != nil {
t.Fatal(err)
}
reqIdx := 0
sendCCR := func(t *testing.T, replyTimeout time.Duration, wg *sync.WaitGroup, wantResultCode string) {
if wg != nil {
defer wg.Done()
}
reqIdx++
ccr := diam.NewRequest(diam.CreditControl, 4, nil)
ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(fmt.Sprintf("session%d", reqIdx)))
ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA"))
ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4))
ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(1))
ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1))
if err := diamClient.SendMessage(ccr); err != nil {
t.Errorf("failed to send diameter message: %v", err)
return
}
reply := diamClient.ReceivedMessage(replyTimeout)
if reply == nil {
t.Error("received empty reply")
return
}
avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID)
if err != nil {
t.Error(err)
return
}
if len(avps) == 0 {
t.Error("missing AVPs in reply")
return
}
resultCode, err := diamAVPAsString(avps[0])
if err != nil {
t.Error(err)
}
if resultCode != wantResultCode {
t.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode)
}
}
// There is currently no traffic. Expecting Result-Code 5012 (DIAMETER_UNABLE_TO_COMPLY),
// because there are no request processors enabled.
diamReplyTimeout := 2 * time.Second
sendCCR(t, diamReplyTimeout, nil, "5012")
// Caps limit is 2, so a single concurrent request still leaves a free slot;
// expecting the same result as in the scenario above.
doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendCCR(t, diamReplyTimeout, nil, "5012")
<-doneCh
// With caps limit reached, Result-Code 3004 (DIAMETER_TOO_BUSY) is expected.
doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendCCR(t, diamReplyTimeout, nil, "3004")
<-doneCh
// TODO: Check caps functionality with async diameter requests.
}
func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} {
t.Helper()
var wg sync.WaitGroup
var reply string
for i := range amount {
wg.Add(1)
go func() {
t.Helper()
defer wg.Done()
if err := client.Call(context.Background(), utils.CoreSv1Sleep,
&utils.DurationArgs{
// Use the ShutdownTimeout CoreS setting
// instead of having to pass an extra
// variable to the function.
Duration: coresCfg.ShutdownTimeout,
}, &reply); err != nil {
if coresCfg.CapsStrategy == utils.MetaBusy && i >= coresCfg.Caps {
return // no need to handle errors for this scenario
}
t.Errorf("CoreSv1.Sleep unexpected error: %v", err)
}
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
return done
}
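
As a usage note, the //go:build integration tag above means this test only runs when that tag is passed explicitly, along these lines (the -dbtype flag name is an assumption inferred from the *utils.DBType reference; adjust to the repository's actual test flags):

go test ./agents -tags integration -run TestDiameterAgentCapsIT -dbtype=*internal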

View File

@@ -48,11 +48,12 @@ const (
// NewDiameterAgent initializes a new DiameterAgent
func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
-connMgr *engine.ConnManager) (*DiameterAgent, error) {
+connMgr *engine.ConnManager, caps *engine.Caps) (*DiameterAgent, error) {
da := &DiameterAgent{
cgrCfg: cgrCfg,
filterS: filterS,
connMgr: connMgr,
+caps: caps,
raa: make(map[string]chan *diam.Message),
dpa: make(map[string]chan *diam.Message),
peers: make(map[string]diam.Conn),
@@ -87,6 +88,7 @@ type DiameterAgent struct {
cgrCfg *config.CGRConfig
filterS *engine.FilterS
connMgr *engine.ConnManager
+caps *engine.Caps
raaLck sync.RWMutex
raa map[string]chan *diam.Message
@@ -163,7 +165,7 @@ func (da *DiameterAgent) handlers() diam.Handler {
dSM.HandleFunc(raa, da.handleRAA)
dSM.HandleFunc(dpa, da.handleDPA)
} else {
-dSM.HandleFunc(all, da.handleMessageAsync)
+dSM.HandleFunc(all, func(c diam.Conn, m *diam.Message) { go da.handleMessage(c, m) })
dSM.HandleFunc(raa, func(c diam.Conn, m *diam.Message) { go da.handleRAA(c, m) })
dSM.HandleFunc(dpa, func(c diam.Conn, m *diam.Message) { go da.handleDPA(c, m) })
}
@@ -176,18 +178,13 @@ func (da *DiameterAgent) handlers() diam.Handler {
return dSM
}
-// handleMessageAsync will dispatch the message into it's own goroutine
-func (da *DiameterAgent) handleMessageAsync(c diam.Conn, m *diam.Message) {
-go da.handleMessage(c, m)
-}
// handleALL is the handler of all messages coming in via Diameter
func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
dApp, err := m.Dictionary().App(m.Header.ApplicationID)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> decoding app: %d, err: %s",
utils.DiameterAgent, m.Header.ApplicationID, err.Error()))
-writeOnConn(c, diamBareErr(m, diam.NoCommonApplication))
+writeOnConn(c, diamErrMsg(m, diam.NoCommonApplication, err.Error()))
return
}
dCmd, err := m.Dictionary().FindCommand(
@@ -196,7 +193,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> decoding app: %d, command %d, err: %s",
utils.DiameterAgent, m.Header.ApplicationID, m.Header.CommandCode, err.Error()))
-writeOnConn(c, diamBareErr(m, diam.CommandUnsupported))
+writeOnConn(c, diamErrMsg(m, diam.CommandUnsupported, err.Error()))
return
}
diamDP := newDADataProvider(c, m)
@@ -212,20 +209,14 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.RemoteHost: utils.NewLeafNode(c.RemoteAddr().String()),
},
}
-// build the negative error answer
-diamErr, err := diamErr(
-m, diam.UnableToComply, reqVars,
-da.cgrCfg.TemplatesCfg()[utils.MetaErr],
-da.cgrCfg.GeneralCfg().DefaultTenant,
-da.cgrCfg.GeneralCfg().DefaultTimezone,
-da.filterS)
-if err != nil {
-utils.Logger.Warning(
-fmt.Sprintf("<%s> error: %s building errDiam for message: %s",
-utils.DiameterAgent, err.Error(), m))
-writeOnConn(c, diamBareErr(m, diam.CommandUnsupported))
-return
+if da.caps.IsLimited() {
+if err := da.caps.Allocate(); err != nil {
+diamErr(c, m, diam.TooBusy, reqVars, da.cgrCfg, da.filterS)
+return
+}
+defer da.caps.Deallocate()
}
// cache message for ASR
if da.cgrCfg.DiameterAgentCfg().ASRTemplate != "" ||
da.cgrCfg.DiameterAgentCfg().RARTemplate != "" {
@@ -234,16 +225,18 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed retrieving Session-Id err: %s, message: %s",
utils.DiameterAgent, err.Error(), m))
-writeOnConn(c, diamErr)
+diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS)
return
}
// cache message data needed for building up the ASR
if errCh := engine.Cache.Set(context.TODO(), utils.CacheDiameterMessages, sessID, &diamMsgData{c, m, reqVars},
nil, true, utils.NonTransactional); errCh != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> failed message: %s to set Cache: %s", utils.DiameterAgent, m, errCh.Error()))
-writeOnConn(c, diamErr)
+diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS)
return
}
}
cgrRplyNM := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{}}
opts := utils.MapStorage{}
rply := utils.NewOrderedNavigableMap() // share it among different processors
@@ -274,14 +267,14 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing message: %s",
utils.DiameterAgent, err.Error(), m))
-writeOnConn(c, diamErr)
+diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS)
return
}
if !processed {
utils.Logger.Warning(
fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s",
utils.DiameterAgent, m, c.RemoteAddr()))
-writeOnConn(c, diamErr)
+diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS)
return
}
a, err := diamAnswer(m, 0, false,
@@ -290,7 +283,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(
fmt.Sprintf("<%s> err: %s, replying to message: %+v",
utils.DiameterAgent, err.Error(), m))
-writeOnConn(c, diamErr)
+diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS)
return
}
writeOnConn(c, a)
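
The caps gate introduced above (IsLimited/Allocate/Deallocate) admits a message only while free slots remain and frees the slot once handling returns. As a rough mental model only, assuming the *busy strategy used by the integration test (the real engine.Caps lives in CGRateS' engine package and also supports a *queue strategy), the gate behaves like a counting semaphore:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// capsGate is an illustrative stand-in for engine.Caps under *busy:
// Allocate fails as soon as all configured slots are in use.
type capsGate struct {
	mu    sync.Mutex
	used  int
	limit int
}

func (c *capsGate) IsLimited() bool { return c.limit > 0 }

func (c *capsGate) Allocate() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.used >= c.limit {
		return errors.New("max concurrent requests reached")
	}
	c.used++
	return nil
}

func (c *capsGate) Deallocate() {
	c.mu.Lock()
	c.used--
	c.mu.Unlock()
}

func main() {
	g := &capsGate{limit: 2} // mirrors "caps": 2 from the test config
	for i := 1; i <= 3; i++ {
		if err := g.Allocate(); err != nil {
			fmt.Printf("request %d rejected: %v\n", i, err) // the 3004 (DIAMETER_TOO_BUSY) path
			continue
		}
		fmt.Printf("request %d allowed\n", i)
	}
}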

View File

@@ -454,25 +454,42 @@ func diamAnswer(m *diam.Message, resCode uint32, errFlag bool,
return
}
-// negDiamAnswer is used to return the negative answer we need previous to
-func diamErr(m *diam.Message, resCode uint32,
-reqVars *utils.DataNode,
-tpl []*config.FCTemplate, tnt, tmz string,
-filterS *engine.FilterS) (a *diam.Message, err error) {
-aReq := NewAgentRequest(
-newDADataProvider(nil, m), reqVars,
-nil, nil, nil, nil,
-tnt, tmz, filterS, nil)
-if err = aReq.SetFields(tpl); err != nil {
+// diamErr handles Diameter error scenarios by attempting to build a customized error answer
+// based on the *err template.
+func diamErr(c diam.Conn, m *diam.Message, resCode uint32, reqVars *utils.DataNode, cfg *config.CGRConfig,
+filterS *engine.FilterS) {
+tnt := cfg.GeneralCfg().DefaultTenant
+tmz := cfg.GeneralCfg().DefaultTimezone
+aReq := NewAgentRequest(newDADataProvider(nil, m), reqVars,
+nil, nil, nil, nil, tnt, tmz, filterS, nil)
+if err := aReq.SetFields(cfg.TemplatesCfg()[utils.MetaErr]); err != nil {
+utils.Logger.Warning(fmt.Sprintf(
+"<%s> message: %s - failed to parse *err template: %v",
+utils.DiameterAgent, m, err))
+writeOnConn(c, diamErrMsg(m, diam.UnableToComply,
+fmt.Sprintf("failed to parse *err template: %v", err)))
return
}
-return diamAnswer(m, resCode, true, aReq.Reply, tmz)
+diamAns, err := diamAnswer(m, resCode, true, aReq.Reply, tmz)
+if err != nil {
+utils.Logger.Warning(fmt.Sprintf(
+"<%s> message: %s - failed to build error answer: %v",
+utils.DiameterAgent, m, err))
+writeOnConn(c, diamErrMsg(m, diam.UnableToComply,
+fmt.Sprintf("failed to build error answer: %v", err)))
+return
+}
+writeOnConn(c, diamAns)
}
-func diamBareErr(m *diam.Message, resCode uint32) (a *diam.Message) {
-a = m.Answer(resCode)
-a.Header.CommandFlags = diam.ErrorFlag
-return
+// diamErrMsg creates a Diameter error answer with the given result code and optional error message.
+func diamErrMsg(m *diam.Message, resCode uint32, msg string) *diam.Message {
+ans := m.Answer(resCode)
+ans.Header.CommandFlags = diam.ErrorFlag
+if msg != "" {
+ans.NewAVP(avp.ErrorMessage, 0, 0, datatype.UTF8String(msg))
+}
+return ans
}
func disectDiamListen(addrs string) (ipAddrs []net.IP) {
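
diamErr renders the customized negative answer from the *err template (utils.MetaErr). A hedged illustration of what such a template can look like in the CGRateS JSON configuration, modeled on the shipped defaults and worth verifying against them:

"templates": {
	"*err": [
		{"tag": "SessionId", "type": "*variable", "mandatory": true,
			"path": "*rep.Session-Id", "value": "~*req.Session-Id"},
		{"tag": "OriginHost", "type": "*variable", "mandatory": true,
			"path": "*rep.Origin-Host", "value": "~*vars.OriginHost"},
		{"tag": "OriginRealm", "type": "*variable", "mandatory": true,
			"path": "*rep.Origin-Realm", "value": "~*vars.OriginRealm"}
	]
}

If SetFields fails on the template, the fallback above still reaches the peer via diamErrMsg, with the failure reason carried in Error-Message.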

View File

@@ -245,10 +245,10 @@ func (cgr *CGREngine) InitServices(setVersions bool, cpuPrfFl *os.File) {
NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep),
-NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload
-NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload
-NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload
-NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), // no reload
+NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload
+NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload
+NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep), // partial reload
+NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), // no reload
NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewEventExporterService(cgr.cfg, cgr.iFilterSCh,

View File

@@ -32,12 +32,13 @@ import (
// NewDiameterAgent returns the Diameter Agent
func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
-connMgr *engine.ConnManager,
+connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &DiameterAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
+caps: caps,
srvDep: srvDep,
}
}
@@ -51,6 +52,7 @@ type DiameterAgent struct {
da *agents.DiameterAgent
connMgr *engine.ConnManager
+caps *engine.Caps
lnet string
laddr string
@@ -59,38 +61,39 @@ }
}
// Start should handle the service start
-func (da *DiameterAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err error) {
+func (da *DiameterAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) error {
if da.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
-var filterS *engine.FilterS
-if filterS, err = waitForFilterS(ctx, da.filterSChan); err != nil {
-return
+filterS, err := waitForFilterS(ctx, da.filterSChan)
+if err != nil {
+return err
}
da.Lock()
defer da.Unlock()
-return da.start(filterS, shtDwn)
+return da.start(filterS, shtDwn, da.caps)
}
-func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFunc) (err error) {
-da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr)
+func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFunc, caps *engine.Caps) error {
+var err error
+da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr, caps)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent: %v",
utils.DiameterAgent, err))
-return
+return err
}
da.lnet = da.cfg.DiameterAgentCfg().ListenNet
da.laddr = da.cfg.DiameterAgentCfg().Listen
da.stopChan = make(chan struct{})
go func(d *agents.DiameterAgent) {
-if err = d.ListenAndServe(da.stopChan); err != nil {
+if err := d.ListenAndServe(da.stopChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.DiameterAgent, err))
shtDwn()
}
}(da.da)
-return
+return nil
}
// Reload handles the change of config
@@ -106,7 +109,7 @@ func (da *DiameterAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc)
if filterS, err = waitForFilterS(ctx, da.filterSChan); err != nil {
return
}
-return da.start(filterS, shtDwn)
+return da.start(filterS, shtDwn, da.caps)
}
// Shutdown stops the service
@@ -122,7 +125,7 @@ func (da *DiameterAgent) Shutdown() (err error) {
func (da *DiameterAgent) IsRunning() bool {
da.RLock()
defer da.RUnlock()
-return da != nil && da.da != nil
+return da.da != nil
}
// ServiceName returns the service name

View File

@@ -49,7 +49,7 @@ func TestDiameterAgentReload1(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1),
nil, anz, srvDep)
-srv := NewDiameterAgent(cfg, filterSChan, nil, srvDep)
+srv := NewDiameterAgent(cfg, filterSChan, nil, nil, srvDep)
engine.NewConnManager(cfg)
srvMngr.AddServices(srv, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
@@ -102,7 +102,7 @@ func TestDiameterAgentReload2(t *testing.T) {
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
-srv := NewDiameterAgent(cfg, filterSChan, nil, srvDep)
+srv := NewDiameterAgent(cfg, filterSChan, nil, nil, srvDep)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
@@ -119,12 +119,12 @@ func TestDiameterAgentReload3(t *testing.T) {
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
-srv := NewDiameterAgent(cfg, filterSChan, nil, srvDep)
+srv := NewDiameterAgent(cfg, filterSChan, nil, nil, srvDep)
cfg.DiameterAgentCfg().ListenNet = "bad"
cfg.DiameterAgentCfg().DictionariesPath = ""
-err := srv.(*DiameterAgent).start(nil, func() {})
+err := srv.(*DiameterAgent).start(nil, func() {}, nil)
if err != nil {
t.Fatal(err)
}