From 676cfe4c2986ae951448b3d0ba54145d6f48c26f Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 18 Feb 2026 18:50:16 +0200 Subject: [PATCH] sipagent: add caps for concurrency limiting --- agents/agents_caps_it_test.go | 65 ++++++++++++++++++++++++++++++++++- agents/sipagent.go | 10 +++++- services/engine.go | 2 +- services/sipagent.go | 6 ++-- services/sipagent_it_test.go | 6 ++-- services/sipagent_test.go | 2 +- 6 files changed, 82 insertions(+), 9 deletions(-) diff --git a/agents/agents_caps_it_test.go b/agents/agents_caps_it_test.go index d69ce9fe4..0292dfd4a 100644 --- a/agents/agents_caps_it_test.go +++ b/agents/agents_caps_it_test.go @@ -23,6 +23,7 @@ package agents import ( "fmt" "io" + "net" "net/http" "sync" "testing" @@ -38,6 +39,7 @@ import ( "github.com/cgrates/go-diameter/diam/datatype" "github.com/cgrates/go-diameter/diam/dict" "github.com/cgrates/radigo" + "github.com/cgrates/sipingo" "github.com/miekg/dns" ) @@ -84,7 +86,14 @@ func TestAgentCapsIT(t *testing.T) { "reply_payload": "*xml", "request_processors": [] } -] +], +"sip_agent": { + "enabled": true, + "listen": "127.0.0.1:5099", + "listen_net": "udp", + "sessions_conns": ["*internal"], + "request_processors": [] +} }` ng := engine.TestEngine{ @@ -218,6 +227,26 @@ func TestAgentCapsIT(t *testing.T) { sendHTTPReq(t, httpURL, http.StatusTooManyRequests) <-doneCh }) + + t.Run("SIPAgent", func(t *testing.T) { + sipAddr := cfg.SIPAgentCfg().Listen + + // There is currently no traffic. Expecting 500 Internal Server + // Error because there are no request processors enabled. + sendSIPReq(t, sipAddr, "SIP/2.0 500 Internal Server Error") + + // Caps limit is 2, therefore expecting the same result. + doneCh := simulateCapsTraffic(t, conn, 1, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) + sendSIPReq(t, sipAddr, "SIP/2.0 500 Internal Server Error") + <-doneCh + + // With caps limit reached, 503 Service Unavailable is expected. + doneCh = simulateCapsTraffic(t, conn, 2, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) + sendSIPReq(t, sipAddr, "SIP/2.0 503 Service Unavailable") + <-doneCh + }) } func sendCCR(t *testing.T, client *DiameterClient, reqIdx *int, wantResultCode string) { @@ -329,6 +358,40 @@ func sendHTTPReq(t *testing.T, url string, wantStatus int) { } } +func sendSIPReq(t *testing.T, addr, wantStatus string) { + t.Helper() + conn, err := net.Dial("udp", addr) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + invite := "INVITE sip:1002@cgrates.org SIP/2.0\r\n" + + "Call-ID: caps-test-" + fmt.Sprint(time.Now().UnixNano()) + "\r\n" + + "CSeq: 1 INVITE\r\n" + + "From: \"1001\" ;tag=caps1\r\n" + + "To: \r\n" + + "Via: SIP/2.0/UDP 127.0.0.1:9999;branch=z9hG4bK-caps-test\r\n" + + "Max-Forwards: 70\r\n" + + "Content-Length: 0\r\n\r\n" + if _, err = conn.Write([]byte(invite)); err != nil { + t.Fatal(err) + } + buf := make([]byte, bufferSize) + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, err := conn.Read(buf) + if err != nil { + t.Fatal(err) + } + received, err := sipingo.NewMessage(string(buf[:n])) + if err != nil { + t.Fatal(err) + } + if received[requestHeader] != wantStatus { + t.Errorf("SIP status=%q, want %q", received[requestHeader], wantStatus) + } +} + func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} { t.Helper() var wg sync.WaitGroup diff --git a/agents/sipagent.go b/agents/sipagent.go index 2985bd78a..cc48928b5 100644 --- a/agents/sipagent.go +++ b/agents/sipagent.go @@ -52,11 +52,12 @@ var ( // NewSIPAgent will construct a SIPAgent func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig, - filterS *engine.FilterS) (sa *SIPAgent, err error) { + filterS *engine.FilterS, caps *engine.Caps) (sa *SIPAgent, err error) { sa = &SIPAgent{ connMgr: connMgr, filterS: filterS, cfg: cfg, + caps: caps, ackMap: make(map[string]chan struct{}), stopChan: make(chan struct{}), } @@ -82,6 +83,7 @@ type SIPAgent struct { connMgr *engine.ConnManager filterS *engine.FilterS cfg *config.CGRConfig + caps *engine.Caps stopChan chan struct{} ackMap map[string]chan struct{} ackLocks sync.RWMutex @@ -296,6 +298,12 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte } func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) (sipAnswer sipingo.Message) { + if sa.caps.IsLimited() { + if err := sa.caps.Allocate(); err != nil { + return bareSipErr(sipMessage, "SIP/2.0 503 Service Unavailable") + } + defer sa.caps.Deallocate() + } if sipMessage[userAgentHeader] != "" { sipMessage[userAgentHeader] = fmt.Sprintf("%s@%s", utils.CGRateS, utils.Version) } diff --git a/services/engine.go b/services/engine.go index 77b2581e7..431aeba44 100644 --- a/services/engine.go +++ b/services/engine.go @@ -629,7 +629,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) { connManager, server, internalEEsChan, anz, srvDep), NewEventReaderService(cfg, dmService, filterSChan, shdChan, connManager, server, internalERsChan, anz, srvDep), - NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep), + NewSIPAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), NewJanusAgent(cfg, filterSChan, server, connManager, srvDep), ) srvManager.StartServices() diff --git a/services/sipagent.go b/services/sipagent.go index 511d1ef49..3df376f74 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -31,13 +31,14 @@ import ( // NewSIPAgent returns the sip Agent func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, + shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &SIPAgent{ cfg: cfg, filterSChan: filterSChan, shdChan: shdChan, connMgr: connMgr, + caps: caps, srvDep: srvDep, } } @@ -51,6 +52,7 @@ type SIPAgent struct { sip *agents.SIPAgent connMgr *engine.ConnManager + caps *engine.Caps srvDep map[string]*sync.WaitGroup oldListen string @@ -68,7 +70,7 @@ func (sip *SIPAgent) Start() (err error) { sip.Lock() defer sip.Unlock() sip.oldListen = sip.cfg.SIPAgentCfg().Listen - sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS) + sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS, sip.caps) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SIPAgent, err)) diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index 94c3fbe01..48e71ed42 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -62,7 +62,7 @@ func TestSIPAgentReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep) - srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, db) if err := srvMngr.StartServices(); err != nil { @@ -123,7 +123,7 @@ func TestSIPAgentReload2(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) if srv.IsRunning() { t.Fatalf("Expected service to be down") } @@ -160,7 +160,7 @@ func TestSIPAgentReload3(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) if srv.IsRunning() { t.Fatalf("Expected service to be down") } diff --git a/services/sipagent_test.go b/services/sipagent_test.go index d72561e72..4a5802ae5 100644 --- a/services/sipagent_test.go +++ b/services/sipagent_test.go @@ -45,7 +45,7 @@ func TestSIPAgentCoverage(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") }