Consider radius requests when limiting caps

This commit is contained in:
ionutboangiu
2024-11-13 18:27:16 +02:00
committed by Dan Christian Bogos
parent fc6aff8484
commit 3a6d759bac
7 changed files with 190 additions and 79 deletions

View File

@@ -31,13 +31,14 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/radigo"
"github.com/fiorix/go-diameter/v4/diam"
"github.com/fiorix/go-diameter/v4/diam/avp"
"github.com/fiorix/go-diameter/v4/diam/datatype"
"github.com/fiorix/go-diameter/v4/diam/dict"
)
func TestDiameterAgentCapsIT(t *testing.T) {
func TestAgentCapsIT(t *testing.T) {
var dbCfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
@@ -67,6 +68,9 @@ func TestDiameterAgentCapsIT(t *testing.T) {
"diameter_agent": {
"enabled": true,
"synced_conn_requests": true
},
"radius_agent": {
"enabled": true
}
}`
@@ -75,78 +79,169 @@ func TestDiameterAgentCapsIT(t *testing.T) {
DBCfg: dbCfg,
}
client, cfg := ng.Run(t)
time.Sleep(10 * time.Millisecond) // wait for services to start
time.Sleep(10 * time.Millisecond) // wait for DiameterAgent service to start
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost",
cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet)
var i int
t.Run("DiameterAgent", func(t *testing.T) {
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost",
cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet)
if err != nil {
t.Fatal(err)
}
// There is currently no traffic. Expecting Result-Code 5012 (DIAMETER_UNABLE_TO_COMPLY),
// because there are no request processors enabled.
sendCCR(t, diamClient, &i, "5012")
// Caps limit is 2, therefore expecting the same result as in the scenario above.
doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendCCR(t, diamClient, &i, "5012")
<-doneCh
// With caps limit reached, Result-Code 3004 (DIAMETER_TOO_BUSY) is expected.
doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendCCR(t, diamClient, &i, "3004")
<-doneCh
// TODO: Check caps functionality with async diameter requests.
})
t.Run("RadiusAgent auth", func(t *testing.T) {
radClient, err := radigo.NewClient(utils.UDP, "127.0.0.1:1812", "CGRateS.org", dictRad, 1, nil, nil)
if err != nil {
t.Fatal(err)
}
// There is currently no traffic. Expecting nil reply because
// there are no request processors enabled.
sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessAccept)
// Caps limit is 2, therefore expecting the same result as in
// the scenario above.
doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessAccept)
<-doneCh
// With caps limit reached, Reply with Code 3 (AccessReject)
// and ReplyMessage with the caps error is expected.
doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessReject)
<-doneCh
})
t.Run("RadiusAgent acct", func(t *testing.T) {
radClient, err := radigo.NewClient(utils.UDP, "127.0.0.1:1813", "CGRateS.org", dictRad, 1, nil, nil)
if err != nil {
t.Fatal(err)
}
// There is currently no traffic. Expecting nil reply because
// there are no request processors enabled.
sendRadReq(t, radClient, radigo.AccountingRequest, &i, 0)
// Caps limit is 2, therefore expecting the same result as in
// the scenario above.
doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendRadReq(t, radClient, radigo.AccountingRequest, &i, 0)
<-doneCh
// With caps limit reached, Reply with Code 5 (AccountingResponse)
// and ReplyMessage with the caps error is expected.
doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendRadReq(t, radClient, radigo.AccountingRequest, &i, radigo.AccountingResponse)
<-doneCh
})
}
func sendCCR(t *testing.T, client *DiameterClient, reqIdx *int, wantResultCode string) {
*reqIdx++
ccr := diam.NewRequest(diam.CreditControl, 4, nil)
ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(fmt.Sprintf("session%d", reqIdx)))
ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA"))
ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4))
ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(1))
ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1))
if err := client.SendMessage(ccr); err != nil {
t.Errorf("failed to send diameter message: %v", err)
return
}
reply := client.ReceivedMessage(2 * time.Second)
if reply == nil {
t.Error("received empty reply")
return
}
avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID)
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
if len(avps) == 0 {
t.Error("missing AVPs in reply")
return
}
reqIdx := 0
sendCCR := func(t *testing.T, replyTimeout time.Duration, wg *sync.WaitGroup, wantResultCode string) {
if wg != nil {
defer wg.Done()
}
reqIdx++
ccr := diam.NewRequest(diam.CreditControl, 4, nil)
ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(fmt.Sprintf("session%d", reqIdx)))
ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA"))
ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4))
ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(1))
ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1))
resultCode, err := diamAVPAsString(avps[0])
if err != nil {
t.Error(err)
}
if resultCode != wantResultCode {
t.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode)
}
}
if err := diamClient.SendMessage(ccr); err != nil {
t.Errorf("failed to send diameter message: %v", err)
return
func sendRadReq(t *testing.T, client *radigo.Client, reqType radigo.PacketCode, reqIdx *int, wantReplyCode radigo.PacketCode) {
*reqIdx++
req := client.NewRequest(reqType, uint8(*reqIdx))
if err := req.AddAVPWithName("User-Name", "1001", ""); err != nil {
t.Error(err)
}
if err := req.AddAVPWithName("User-Password", "CGRateSPassword1", ""); err != nil {
t.Error(err)
}
// encode the password as required so we can decode it properly
req.AVPs[1].RawValue = radigo.EncodeUserPassword([]byte("CGRateSPassword1"), []byte("CGRateS.org"), req.Authenticator[:])
if err := req.AddAVPWithName("Service-Type", "SIP-Caller-AVPs", ""); err != nil {
t.Error(err)
}
if err := req.AddAVPWithName("Called-Station-Id", "1002", ""); err != nil {
t.Error(err)
}
if err := req.AddAVPWithName("Acct-Session-Id", fmt.Sprintf("session%d", reqIdx), ""); err != nil {
t.Error(err)
}
if err := req.AddAVPWithName("NAS-IP-Address", "127.0.0.1", ""); err != nil {
t.Error(err)
}
reply, err := client.SendRequest(req)
if err != nil && (wantReplyCode == radigo.AccessReject ||
wantReplyCode == radigo.AccountingResponse) {
t.Error(err)
}
if reply != nil && reply.Code != wantReplyCode {
t.Errorf("want non-nil negative reply, got: %s", utils.ToJSON(reply))
}
if reply != nil && reply.Code == wantReplyCode {
if len(reply.AVPs) != 1 {
t.Errorf("reply should have exactly 1 AVP, got: %s", utils.ToJSON(reply))
}
reply := diamClient.ReceivedMessage(replyTimeout)
if reply == nil {
t.Error("received empty reply")
return
}
avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID)
if err != nil {
t.Error(err)
return
}
if len(avps) == 0 {
t.Error("missing AVPs in reply")
return
}
resultCode, err := diamAVPAsString(avps[0])
if err != nil {
t.Error(err)
}
if resultCode != wantResultCode {
t.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode)
got := string(reply.AVPs[0].RawValue)
want := utils.ErrMaxConcurrentRPCExceededNoCaps.Error()
if got != want {
t.Errorf("ReplyMessage=%v, want %v", got, want)
}
}
// There is currently no traffic. Expecting Result-Code 5012 (DIAMETER_UNABLE_TO_COMPLY),
// because there are no request processors enabled.
diamReplyTimeout := 2 * time.Second
sendCCR(t, diamReplyTimeout, nil, "5012")
// Caps limit is 2, therefore expecting the same result as in the scenario above.
doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendCCR(t, diamReplyTimeout, nil, "5012")
<-doneCh
// With caps limit reached, Result-Code 3004 (DIAMETER_TOO_BUSY) is expected.
doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendCCR(t, diamReplyTimeout, nil, "3004")
<-doneCh
// TODO: Check caps functionality with async diameter requests.
}
func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} {

View File

@@ -47,12 +47,13 @@ const (
MSCHAP2SuccessAVP = "MS-CHAP2-Success"
)
func NewRadiusAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
connMgr *engine.ConnManager) (*RadiusAgent, error) {
func NewRadiusAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager,
caps *engine.Caps) (*RadiusAgent, error) {
radAgent := &RadiusAgent{
cgrCfg: cgrCfg,
filterS: filterS,
connMgr: connMgr,
caps: caps,
}
// Register RadiusAgent methods whose names start with "V1" under the "AgentV1" object name.
@@ -99,6 +100,7 @@ type RadiusAgent struct {
sync.RWMutex
cgrCfg *config.CGRConfig // reference for future config reloads
connMgr *engine.ConnManager
caps *engine.Caps
filterS *engine.FilterS
rsAuth map[string]*radigo.Server
rsAcct map[string]*radigo.Server
@@ -137,6 +139,12 @@ func newRadiusDAClientCfg(dicts *radigo.Dictionaries, secrets *radigo.Secrets,
// handleAuth handles RADIUS Authorization request
func (ra *RadiusAgent) handleAuth(reqPacket *radigo.Packet) (*radigo.Packet, error) {
if ra.caps.IsLimited() {
if err := ra.caps.Allocate(); err != nil {
return reqPacket, err
}
defer ra.caps.Deallocate()
}
reqPacket.SetAVPValues() // populate string values in AVPs
replyPacket := reqPacket.Reply()
replyPacket.Code = radigo.AccessAccept
@@ -188,6 +196,12 @@ func (ra *RadiusAgent) handleAuth(reqPacket *radigo.Packet) (*radigo.Packet, err
// handleAcct processes RADIUS Accounting requests and generates a reply.
// It supports Acct-Status-Type values: Start, Interim-Update, Stop.
func (ra *RadiusAgent) handleAcct(reqPacket *radigo.Packet) (*radigo.Packet, error) {
if ra.caps.IsLimited() {
if err := ra.caps.Allocate(); err != nil {
return nil, err
}
defer ra.caps.Deallocate()
}
reqPacket.SetAVPValues() // populate string values in AVPs
replyPacket := reqPacket.Reply()
replyPacket.Code = radigo.AccountingResponse

View File

@@ -40,7 +40,7 @@ func TestNewRadiusAgentFailDict(t *testing.T) {
"badpath": {"bad/path"},
}
exp := "stat bad/path: no such file or directory"
if _, err := NewRadiusAgent(cfg, nil, nil); err == nil || err.Error() != exp {
if _, err := NewRadiusAgent(cfg, nil, nil, nil); err == nil || err.Error() != exp {
t.Errorf("Expected error <%v>, received <%v>", exp, err)
}
}
@@ -51,7 +51,7 @@ func TestNewRadiusAgentOK(t *testing.T) {
exp := &RadiusAgent{
cgrCfg: cfg,
}
if rcv, err := NewRadiusAgent(cfg, nil, nil); err != nil {
if rcv, err := NewRadiusAgent(cfg, nil, nil, nil); err != nil {
if err.Error() == "stat /usr/share/cgrates/radius/dict/: no such file or directory" {
t.SkipNow() // skipping if running in gitactions
}

View File

@@ -621,7 +621,7 @@ func main() {
services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep),
services.NewKamailioAgent(cfg, shdChan, connManager, srvDep),
services.NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload
services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload
services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload
services.NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload
services.NewHTTPAgent(cfg, filterSChan, server, connManager, srvDep), // no reload
ldrs, anz, dspS, dspH, dmService, storDBService,

View File

@@ -31,13 +31,14 @@ import (
// NewRadiusAgent returns the Radius Agent
func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &RadiusAgent{
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
connMgr: connMgr,
caps: caps,
srvDep: srvDep,
}
}
@@ -52,6 +53,7 @@ type RadiusAgent struct {
rad *agents.RadiusAgent
connMgr *engine.ConnManager
caps *engine.Caps
srvDep map[string]*sync.WaitGroup
}
@@ -67,7 +69,7 @@ func (rad *RadiusAgent) Start() (err error) {
rad.Lock()
defer rad.Unlock()
if rad.rad, err = agents.NewRadiusAgent(rad.cfg, filterS, rad.connMgr); err != nil {
if rad.rad, err = agents.NewRadiusAgent(rad.cfg, filterS, rad.connMgr, rad.caps); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error()))
return
}

View File

@@ -60,7 +60,7 @@ func TestRadiusAgentReloadStartShut(t *testing.T) {
filterSChan <- nil
shdChan := utils.NewSyncedChan()
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
shdWg := new(sync.WaitGroup)
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil)
engine.NewConnManager(cfg, nil)
@@ -124,7 +124,7 @@ func TestRadiusAgentReload1(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
shdChan, nil, anz, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
@@ -195,7 +195,7 @@ func TestRadiusAgentReload2(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
shdChan, nil, anz, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
@@ -265,7 +265,7 @@ func TestRadiusAgentReload3(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}()
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
err := srv.Start()
if err == nil || err.Error() != "stat test: no such file or directory" {
t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "stat test: no such file or directory", err)
@@ -283,8 +283,8 @@ func TestRadiusAgentReload4(t *testing.T) {
filterSChan <- nil
shdChan := utils.NewSyncedChan()
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
r, err := agents.NewRadiusAgent(cfg, nil, nil)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
r, err := agents.NewRadiusAgent(cfg, nil, nil, nil)
if err != nil {
t.Fatal(err)
}

View File

@@ -45,7 +45,7 @@ func TestRadiusAgentCoverage(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}