sipagent: add caps for concurrency limiting

This commit is contained in:
ionutboangiu
2026-02-18 18:50:16 +02:00
committed by Dan Christian Bogos
parent a022521922
commit 676cfe4c29
6 changed files with 82 additions and 9 deletions

View File

@@ -23,6 +23,7 @@ package agents
import (
"fmt"
"io"
"net"
"net/http"
"sync"
"testing"
@@ -38,6 +39,7 @@ import (
"github.com/cgrates/go-diameter/diam/datatype"
"github.com/cgrates/go-diameter/diam/dict"
"github.com/cgrates/radigo"
"github.com/cgrates/sipingo"
"github.com/miekg/dns"
)
@@ -84,7 +86,14 @@ func TestAgentCapsIT(t *testing.T) {
"reply_payload": "*xml",
"request_processors": []
}
]
],
"sip_agent": {
"enabled": true,
"listen": "127.0.0.1:5099",
"listen_net": "udp",
"sessions_conns": ["*internal"],
"request_processors": []
}
}`
ng := engine.TestEngine{
@@ -218,6 +227,26 @@ func TestAgentCapsIT(t *testing.T) {
sendHTTPReq(t, httpURL, http.StatusTooManyRequests)
<-doneCh
})
t.Run("SIPAgent", func(t *testing.T) {
sipAddr := cfg.SIPAgentCfg().Listen
// There is currently no traffic. Expecting 500 Internal Server
// Error because there are no request processors enabled.
sendSIPReq(t, sipAddr, "SIP/2.0 500 Internal Server Error")
// Caps limit is 2, therefore expecting the same result.
doneCh := simulateCapsTraffic(t, conn, 1, *cfg.CoreSCfg())
time.Sleep(time.Millisecond)
sendSIPReq(t, sipAddr, "SIP/2.0 500 Internal Server Error")
<-doneCh
// With caps limit reached, 503 Service Unavailable is expected.
doneCh = simulateCapsTraffic(t, conn, 2, *cfg.CoreSCfg())
time.Sleep(time.Millisecond)
sendSIPReq(t, sipAddr, "SIP/2.0 503 Service Unavailable")
<-doneCh
})
}
func sendCCR(t *testing.T, client *DiameterClient, reqIdx *int, wantResultCode string) {
@@ -329,6 +358,40 @@ func sendHTTPReq(t *testing.T, url string, wantStatus int) {
}
}
func sendSIPReq(t *testing.T, addr, wantStatus string) {
t.Helper()
conn, err := net.Dial("udp", addr)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
invite := "INVITE sip:1002@cgrates.org SIP/2.0\r\n" +
"Call-ID: caps-test-" + fmt.Sprint(time.Now().UnixNano()) + "\r\n" +
"CSeq: 1 INVITE\r\n" +
"From: \"1001\" <sip:1001@cgrates.org>;tag=caps1\r\n" +
"To: <sip:1002@cgrates.org>\r\n" +
"Via: SIP/2.0/UDP 127.0.0.1:9999;branch=z9hG4bK-caps-test\r\n" +
"Max-Forwards: 70\r\n" +
"Content-Length: 0\r\n\r\n"
if _, err = conn.Write([]byte(invite)); err != nil {
t.Fatal(err)
}
buf := make([]byte, bufferSize)
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
n, err := conn.Read(buf)
if err != nil {
t.Fatal(err)
}
received, err := sipingo.NewMessage(string(buf[:n]))
if err != nil {
t.Fatal(err)
}
if received[requestHeader] != wantStatus {
t.Errorf("SIP status=%q, want %q", received[requestHeader], wantStatus)
}
}
func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} {
t.Helper()
var wg sync.WaitGroup

View File

@@ -52,11 +52,12 @@ var (
// NewSIPAgent will construct a SIPAgent
func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig,
filterS *engine.FilterS) (sa *SIPAgent, err error) {
filterS *engine.FilterS, caps *engine.Caps) (sa *SIPAgent, err error) {
sa = &SIPAgent{
connMgr: connMgr,
filterS: filterS,
cfg: cfg,
caps: caps,
ackMap: make(map[string]chan struct{}),
stopChan: make(chan struct{}),
}
@@ -82,6 +83,7 @@ type SIPAgent struct {
connMgr *engine.ConnManager
filterS *engine.FilterS
cfg *config.CGRConfig
caps *engine.Caps
stopChan chan struct{}
ackMap map[string]chan struct{}
ackLocks sync.RWMutex
@@ -296,6 +298,12 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte
}
func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) (sipAnswer sipingo.Message) {
if sa.caps.IsLimited() {
if err := sa.caps.Allocate(); err != nil {
return bareSipErr(sipMessage, "SIP/2.0 503 Service Unavailable")
}
defer sa.caps.Deallocate()
}
if sipMessage[userAgentHeader] != "" {
sipMessage[userAgentHeader] = fmt.Sprintf("%s@%s", utils.CGRateS, utils.Version)
}

View File

@@ -629,7 +629,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) {
connManager, server, internalEEsChan, anz, srvDep),
NewEventReaderService(cfg, dmService, filterSChan,
shdChan, connManager, server, internalERsChan, anz, srvDep),
NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep),
NewSIPAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep),
NewJanusAgent(cfg, filterSChan, server, connManager, srvDep),
)
srvManager.StartServices()

View File

@@ -31,13 +31,14 @@ import (
// NewSIPAgent returns the sip Agent
func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &SIPAgent{
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
connMgr: connMgr,
caps: caps,
srvDep: srvDep,
}
}
@@ -51,6 +52,7 @@ type SIPAgent struct {
sip *agents.SIPAgent
connMgr *engine.ConnManager
caps *engine.Caps
srvDep map[string]*sync.WaitGroup
oldListen string
@@ -68,7 +70,7 @@ func (sip *SIPAgent) Start() (err error) {
sip.Lock()
defer sip.Unlock()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS)
sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS, sip.caps)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.SIPAgent, err))

View File

@@ -62,7 +62,7 @@ func TestSIPAgentReload(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
nil, anz, srvDep)
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS, db)
if err := srvMngr.StartServices(); err != nil {
@@ -123,7 +123,7 @@ func TestSIPAgentReload2(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
if srv.IsRunning() {
t.Fatalf("Expected service to be down")
}
@@ -160,7 +160,7 @@ func TestSIPAgentReload3(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
if srv.IsRunning() {
t.Fatalf("Expected service to be down")
}

View File

@@ -45,7 +45,7 @@ func TestSIPAgentCoverage(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}