diff --git a/agents/agents_caps_it_test.go b/agents/agents_caps_it_test.go index 71dff9645..0a4271e4a 100644 --- a/agents/agents_caps_it_test.go +++ b/agents/agents_caps_it_test.go @@ -31,13 +31,14 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/radigo" "github.com/fiorix/go-diameter/v4/diam" "github.com/fiorix/go-diameter/v4/diam/avp" "github.com/fiorix/go-diameter/v4/diam/datatype" "github.com/fiorix/go-diameter/v4/diam/dict" ) -func TestDiameterAgentCapsIT(t *testing.T) { +func TestAgentCapsIT(t *testing.T) { var dbCfg engine.DBCfg switch *utils.DBType { case utils.MetaInternal: @@ -67,6 +68,9 @@ func TestDiameterAgentCapsIT(t *testing.T) { "diameter_agent": { "enabled": true, "synced_conn_requests": true +}, +"radius_agent": { + "enabled": true } }` @@ -75,78 +79,169 @@ func TestDiameterAgentCapsIT(t *testing.T) { DBCfg: dbCfg, } client, cfg := ng.Run(t) + time.Sleep(10 * time.Millisecond) // wait for services to start - time.Sleep(10 * time.Millisecond) // wait for DiameterAgent service to start - diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost", - cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID, - cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision, - cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet) + var i int + + t.Run("DiameterAgent", func(t *testing.T) { + diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost", + cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID, + cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision, + cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet) + if err != nil { + t.Fatal(err) + } + + // There is currently no traffic. Expecting Result-Code 5012 (DIAMETER_UNABLE_TO_COMPLY), + // because there are no request processors enabled. + sendCCR(t, diamClient, &i, "5012") + + // Caps limit is 2, therefore expecting the same result as in the scenario above. + doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + sendCCR(t, diamClient, &i, "5012") + <-doneCh + + // With caps limit reached, Result-Code 3004 (DIAMETER_TOO_BUSY) is expected. + doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + sendCCR(t, diamClient, &i, "3004") + <-doneCh + + // TODO: Check caps functionality with async diameter requests. + }) + + t.Run("RadiusAgent auth", func(t *testing.T) { + radClient, err := radigo.NewClient(utils.UDP, "127.0.0.1:1812", "CGRateS.org", dictRad, 1, nil, nil) + if err != nil { + t.Fatal(err) + } + + // There is currently no traffic. Expecting nil reply because + // there are no request processors enabled. + sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessAccept) + // Caps limit is 2, therefore expecting the same result as in + // the scenario above. + doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessAccept) + <-doneCh + + // With caps limit reached, Reply with Code 3 (AccessReject) + // and ReplyMessage with the caps error is expected. + doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessReject) + <-doneCh + }) + + t.Run("RadiusAgent acct", func(t *testing.T) { + radClient, err := radigo.NewClient(utils.UDP, "127.0.0.1:1813", "CGRateS.org", dictRad, 1, nil, nil) + if err != nil { + t.Fatal(err) + } + + // There is currently no traffic. Expecting nil reply because + // there are no request processors enabled. + sendRadReq(t, radClient, radigo.AccountingRequest, &i, 0) + + // Caps limit is 2, therefore expecting the same result as in + // the scenario above. + doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + sendRadReq(t, radClient, radigo.AccountingRequest, &i, 0) + <-doneCh + + // With caps limit reached, Reply with Code 5 (AccountingResponse) + // and ReplyMessage with the caps error is expected. + doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + sendRadReq(t, radClient, radigo.AccountingRequest, &i, radigo.AccountingResponse) + <-doneCh + }) +} + +func sendCCR(t *testing.T, client *DiameterClient, reqIdx *int, wantResultCode string) { + *reqIdx++ + ccr := diam.NewRequest(diam.CreditControl, 4, nil) + ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(fmt.Sprintf("session%d", reqIdx))) + ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) + ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(1)) + ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1)) + + if err := client.SendMessage(ccr); err != nil { + t.Errorf("failed to send diameter message: %v", err) + return + } + + reply := client.ReceivedMessage(2 * time.Second) + if reply == nil { + t.Error("received empty reply") + return + } + + avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID) if err != nil { - t.Fatal(err) + t.Error(err) + return + } + if len(avps) == 0 { + t.Error("missing AVPs in reply") + return } - reqIdx := 0 - sendCCR := func(t *testing.T, replyTimeout time.Duration, wg *sync.WaitGroup, wantResultCode string) { - if wg != nil { - defer wg.Done() - } - reqIdx++ - ccr := diam.NewRequest(diam.CreditControl, 4, nil) - ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(fmt.Sprintf("session%d", reqIdx))) - ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) - ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) - ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) - ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(1)) - ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1)) + resultCode, err := diamAVPAsString(avps[0]) + if err != nil { + t.Error(err) + } + if resultCode != wantResultCode { + t.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode) + } +} - if err := diamClient.SendMessage(ccr); err != nil { - t.Errorf("failed to send diameter message: %v", err) - return +func sendRadReq(t *testing.T, client *radigo.Client, reqType radigo.PacketCode, reqIdx *int, wantReplyCode radigo.PacketCode) { + *reqIdx++ + req := client.NewRequest(reqType, uint8(*reqIdx)) + if err := req.AddAVPWithName("User-Name", "1001", ""); err != nil { + t.Error(err) + } + if err := req.AddAVPWithName("User-Password", "CGRateSPassword1", ""); err != nil { + t.Error(err) + } + // encode the password as required so we can decode it properly + req.AVPs[1].RawValue = radigo.EncodeUserPassword([]byte("CGRateSPassword1"), []byte("CGRateS.org"), req.Authenticator[:]) + if err := req.AddAVPWithName("Service-Type", "SIP-Caller-AVPs", ""); err != nil { + t.Error(err) + } + if err := req.AddAVPWithName("Called-Station-Id", "1002", ""); err != nil { + t.Error(err) + } + if err := req.AddAVPWithName("Acct-Session-Id", fmt.Sprintf("session%d", reqIdx), ""); err != nil { + t.Error(err) + } + if err := req.AddAVPWithName("NAS-IP-Address", "127.0.0.1", ""); err != nil { + t.Error(err) + } + reply, err := client.SendRequest(req) + if err != nil && (wantReplyCode == radigo.AccessReject || + wantReplyCode == radigo.AccountingResponse) { + t.Error(err) + } + if reply != nil && reply.Code != wantReplyCode { + t.Errorf("want non-nil negative reply, got: %s", utils.ToJSON(reply)) + } + if reply != nil && reply.Code == wantReplyCode { + if len(reply.AVPs) != 1 { + t.Errorf("reply should have exactly 1 AVP, got: %s", utils.ToJSON(reply)) } - - reply := diamClient.ReceivedMessage(replyTimeout) - if reply == nil { - t.Error("received empty reply") - return - } - - avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID) - if err != nil { - t.Error(err) - return - } - if len(avps) == 0 { - t.Error("missing AVPs in reply") - return - } - - resultCode, err := diamAVPAsString(avps[0]) - if err != nil { - t.Error(err) - } - if resultCode != wantResultCode { - t.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode) + got := string(reply.AVPs[0].RawValue) + want := utils.ErrMaxConcurrentRPCExceededNoCaps.Error() + if got != want { + t.Errorf("ReplyMessage=%v, want %v", got, want) } } - - // There is currently no traffic. Expecting Result-Code 5012 (DIAMETER_UNABLE_TO_COMPLY), - // because there are no request processors enabled. - diamReplyTimeout := 2 * time.Second - sendCCR(t, diamReplyTimeout, nil, "5012") - - // Caps limit is 2, therefore expecting the same result as in the scenario above. - doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg()) - time.Sleep(time.Millisecond) // ensure traffic requests have been sent - sendCCR(t, diamReplyTimeout, nil, "5012") - <-doneCh - - // With caps limit reached, Result-Code 3004 (DIAMETER_TOO_BUSY) is expected. - doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg()) - time.Sleep(time.Millisecond) // ensure traffic requests have been sent - sendCCR(t, diamReplyTimeout, nil, "3004") - <-doneCh - - // TODO: Check caps functionality with async diameter requests. } func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} { diff --git a/agents/radagent.go b/agents/radagent.go index a378e6e4e..7439ae772 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -47,12 +47,13 @@ const ( MSCHAP2SuccessAVP = "MS-CHAP2-Success" ) -func NewRadiusAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, - connMgr *engine.ConnManager) (*RadiusAgent, error) { +func NewRadiusAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager, + caps *engine.Caps) (*RadiusAgent, error) { radAgent := &RadiusAgent{ cgrCfg: cgrCfg, filterS: filterS, connMgr: connMgr, + caps: caps, } // Register RadiusAgent methods whose names start with "V1" under the "AgentV1" object name. @@ -99,6 +100,7 @@ type RadiusAgent struct { sync.RWMutex cgrCfg *config.CGRConfig // reference for future config reloads connMgr *engine.ConnManager + caps *engine.Caps filterS *engine.FilterS rsAuth map[string]*radigo.Server rsAcct map[string]*radigo.Server @@ -137,6 +139,12 @@ func newRadiusDAClientCfg(dicts *radigo.Dictionaries, secrets *radigo.Secrets, // handleAuth handles RADIUS Authorization request func (ra *RadiusAgent) handleAuth(reqPacket *radigo.Packet) (*radigo.Packet, error) { + if ra.caps.IsLimited() { + if err := ra.caps.Allocate(); err != nil { + return reqPacket, err + } + defer ra.caps.Deallocate() + } reqPacket.SetAVPValues() // populate string values in AVPs replyPacket := reqPacket.Reply() replyPacket.Code = radigo.AccessAccept @@ -188,6 +196,12 @@ func (ra *RadiusAgent) handleAuth(reqPacket *radigo.Packet) (*radigo.Packet, err // handleAcct processes RADIUS Accounting requests and generates a reply. // It supports Acct-Status-Type values: Start, Interim-Update, Stop. func (ra *RadiusAgent) handleAcct(reqPacket *radigo.Packet) (*radigo.Packet, error) { + if ra.caps.IsLimited() { + if err := ra.caps.Allocate(); err != nil { + return nil, err + } + defer ra.caps.Deallocate() + } reqPacket.SetAVPValues() // populate string values in AVPs replyPacket := reqPacket.Reply() replyPacket.Code = radigo.AccountingResponse diff --git a/agents/radagent_test.go b/agents/radagent_test.go index 60a30bd38..3b1cccdef 100644 --- a/agents/radagent_test.go +++ b/agents/radagent_test.go @@ -40,7 +40,7 @@ func TestNewRadiusAgentFailDict(t *testing.T) { "badpath": {"bad/path"}, } exp := "stat bad/path: no such file or directory" - if _, err := NewRadiusAgent(cfg, nil, nil); err == nil || err.Error() != exp { + if _, err := NewRadiusAgent(cfg, nil, nil, nil); err == nil || err.Error() != exp { t.Errorf("Expected error <%v>, received <%v>", exp, err) } } @@ -51,7 +51,7 @@ func TestNewRadiusAgentOK(t *testing.T) { exp := &RadiusAgent{ cgrCfg: cfg, } - if rcv, err := NewRadiusAgent(cfg, nil, nil); err != nil { + if rcv, err := NewRadiusAgent(cfg, nil, nil, nil); err != nil { if err.Error() == "stat /usr/share/cgrates/radius/dict/: no such file or directory" { t.SkipNow() // skipping if running in gitactions } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 9321c70bb..860b7e249 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -621,7 +621,7 @@ func main() { services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep), services.NewKamailioAgent(cfg, shdChan, connManager, srvDep), services.NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload - services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload + services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload services.NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload services.NewHTTPAgent(cfg, filterSChan, server, connManager, srvDep), // no reload ldrs, anz, dspS, dspH, dmService, storDBService, diff --git a/services/radiusagent.go b/services/radiusagent.go index 7e2ecd854..abed97321 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -31,13 +31,14 @@ import ( // NewRadiusAgent returns the Radius Agent func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, + shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &RadiusAgent{ cfg: cfg, filterSChan: filterSChan, shdChan: shdChan, connMgr: connMgr, + caps: caps, srvDep: srvDep, } } @@ -52,6 +53,7 @@ type RadiusAgent struct { rad *agents.RadiusAgent connMgr *engine.ConnManager + caps *engine.Caps srvDep map[string]*sync.WaitGroup } @@ -67,7 +69,7 @@ func (rad *RadiusAgent) Start() (err error) { rad.Lock() defer rad.Unlock() - if rad.rad, err = agents.NewRadiusAgent(rad.cfg, filterS, rad.connMgr); err != nil { + if rad.rad, err = agents.NewRadiusAgent(rad.cfg, filterS, rad.connMgr, rad.caps); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) return } diff --git a/services/radiusagent_it_test.go b/services/radiusagent_it_test.go index 117bf1fc5..17286a9f0 100644 --- a/services/radiusagent_it_test.go +++ b/services/radiusagent_it_test.go @@ -60,7 +60,7 @@ func TestRadiusAgentReloadStartShut(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) shdWg := new(sync.WaitGroup) srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil) engine.NewConnManager(cfg, nil) @@ -124,7 +124,7 @@ func TestRadiusAgentReload1(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) - srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) @@ -195,7 +195,7 @@ func TestRadiusAgentReload2(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) - srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) @@ -265,7 +265,7 @@ func TestRadiusAgentReload3(t *testing.T) { time.Sleep(10 * time.Millisecond) }() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) err := srv.Start() if err == nil || err.Error() != "stat test: no such file or directory" { t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "stat test: no such file or directory", err) @@ -283,8 +283,8 @@ func TestRadiusAgentReload4(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) - r, err := agents.NewRadiusAgent(cfg, nil, nil) + srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) + r, err := agents.NewRadiusAgent(cfg, nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/services/radiusagent_test.go b/services/radiusagent_test.go index 6ff24df22..acbed1648 100644 --- a/services/radiusagent_test.go +++ b/services/radiusagent_test.go @@ -45,7 +45,7 @@ func TestRadiusAgentCoverage(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") }