From 4bde2fc07c282b216a874861fdad16dd665953c8 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 18 Feb 2026 18:54:35 +0200 Subject: [PATCH] janusagent: add caps for concurrency limiting --- agents/janusagent.go | 39 ++++++++++++++++++++++++++++++++++++- services/engine.go | 2 +- services/janusagent.go | 6 ++++-- services/janusagent_test.go | 2 +- 4 files changed, 44 insertions(+), 5 deletions(-) diff --git a/agents/janusagent.go b/agents/janusagent.go index cc956789b..4b0aa8d94 100644 --- a/agents/janusagent.go +++ b/agents/janusagent.go @@ -40,11 +40,12 @@ import ( // NewJanusAgent will construct a JanusAgent func NewJanusAgent(cgrCfg *config.CGRConfig, connMgr *engine.ConnManager, - filterS *engine.FilterS) (*JanusAgent, error) { + filterS *engine.FilterS, caps *engine.Caps) (*JanusAgent, error) { jsa := &JanusAgent{ cgrCfg: cgrCfg, connMgr: connMgr, filterS: filterS, + caps: caps, } srv, err := birpc.NewServiceWithMethodsRename(jsa, utils.AgentV1, true, func(oldFn string) (newFn string) { return strings.TrimPrefix(oldFn, "V1") @@ -61,6 +62,7 @@ type JanusAgent struct { cgrCfg *config.CGRConfig connMgr *engine.ConnManager filterS *engine.FilterS + caps *engine.Caps jnsConn *janus.Gateway adminWs *websocket.Conn ctx *context.Context @@ -99,6 +101,13 @@ func (ja *JanusAgent) CORSOptions(w http.ResponseWriter, req *http.Request) { // CreateSession will create a new session within janusgo func (ja *JanusAgent) CreateSession(w http.ResponseWriter, req *http.Request) { janusAccessControlHeaders(w, req) + if ja.caps.IsLimited() { + if err := ja.caps.Allocate(); err != nil { + http.Error(w, err.Error(), http.StatusTooManyRequests) + return + } + defer ja.caps.Deallocate() + } var msg janus.BaseMsg if err := json.NewDecoder(req.Body).Decode(&msg); err != nil { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) @@ -213,6 +222,13 @@ func (ja *JanusAgent) cdrSession(s *janus.Session) (err error) { // SessioNKeepalive sends keepalive once OPTIONS are coming for the session from HTTP func (ja *JanusAgent) SessionKeepalive(w http.ResponseWriter, r *http.Request) { janusAccessControlHeaders(w, r) + if ja.caps.IsLimited() { + if err := ja.caps.Allocate(); err != nil { + http.Error(w, err.Error(), http.StatusTooManyRequests) + return + } + defer ja.caps.Deallocate() + } sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64) if err != nil { http.Error(w, "Invalid session ID", http.StatusBadRequest) @@ -245,6 +261,13 @@ func (ja *JanusAgent) SessionKeepalive(w http.ResponseWriter, r *http.Request) { // PollSession will create a long-poll request to be notified about events and incoming messages from session func (ja *JanusAgent) PollSession(w http.ResponseWriter, req *http.Request) { janusAccessControlHeaders(w, req) + if ja.caps.IsLimited() { + if err := ja.caps.Allocate(); err != nil { + http.Error(w, err.Error(), http.StatusTooManyRequests) + return + } + defer ja.caps.Deallocate() + } sessionID, err := strconv.ParseUint(req.PathValue("sessionID"), 10, 64) if err != nil { http.Error(w, "Invalid session ID", http.StatusBadRequest) @@ -296,6 +319,13 @@ func (ja *JanusAgent) PollSession(w http.ResponseWriter, req *http.Request) { // AttachPlugin will attach a plugin to a session func (ja *JanusAgent) AttachPlugin(w http.ResponseWriter, r *http.Request) { janusAccessControlHeaders(w, r) + if ja.caps.IsLimited() { + if err := ja.caps.Allocate(); err != nil { + http.Error(w, err.Error(), http.StatusTooManyRequests) + return + } + defer ja.caps.Deallocate() + } sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64) if err != nil { http.Error(w, "Invalid session ID", http.StatusBadRequest) @@ -345,6 +375,13 @@ func (ja *JanusAgent) AttachPlugin(w http.ResponseWriter, r *http.Request) { // HandlePlugin will handle requests towards a plugin func (ja *JanusAgent) HandlePlugin(w http.ResponseWriter, r *http.Request) { janusAccessControlHeaders(w, r) + if ja.caps.IsLimited() { + if err := ja.caps.Allocate(); err != nil { + http.Error(w, err.Error(), http.StatusTooManyRequests) + return + } + defer ja.caps.Deallocate() + } sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64) if err != nil { http.Error(w, "Invalid session ID", http.StatusBadRequest) diff --git a/services/engine.go b/services/engine.go index 975e29208..e4e4fe940 100644 --- a/services/engine.go +++ b/services/engine.go @@ -630,7 +630,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) { NewEventReaderService(cfg, dmService, filterSChan, shdChan, connManager, server, internalERsChan, anz, srvDep), NewSIPAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), - NewJanusAgent(cfg, filterSChan, server, connManager, srvDep), + NewJanusAgent(cfg, filterSChan, server, connManager, caps, srvDep), ) srvManager.StartServices() // Start FilterS diff --git a/services/janusagent.go b/services/janusagent.go index f6e29dfad..410e7b901 100644 --- a/services/janusagent.go +++ b/services/janusagent.go @@ -32,13 +32,14 @@ import ( // NewJanusAgent returns the Janus Agent func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - server *cores.Server, connMgr *engine.ConnManager, + server *cores.Server, connMgr *engine.ConnManager, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &JanusAgent{ cfg: cfg, filterSChan: filterSChan, server: server, connMgr: connMgr, + caps: caps, srvDep: srvDep, } } @@ -55,6 +56,7 @@ type JanusAgent struct { // if we registerd the jandlers started bool connMgr *engine.ConnManager + caps *engine.Caps srvDep map[string]*sync.WaitGroup } @@ -68,7 +70,7 @@ func (ja *JanusAgent) Start() (err error) { ja.Unlock() return utils.ErrServiceAlreadyRunning } - ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, filterS) + ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, filterS, ja.caps) if err != nil { return } diff --git a/services/janusagent_test.go b/services/janusagent_test.go index aab4566ec..83dd7537d 100644 --- a/services/janusagent_test.go +++ b/services/janusagent_test.go @@ -38,7 +38,7 @@ func TestJanusAgentCoverage(t *testing.T) { srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} rpcInternal := map[string]chan birpc.ClientConnector{} cM := engine.NewConnManager(cfg, rpcInternal) - srv := NewJanusAgent(cfg, filterSChan, server, cM, srvDep) + srv := NewJanusAgent(cfg, filterSChan, server, cM, nil, srvDep) if srv == nil { t.Errorf("\nExpecting ,\n Received <%+v>", utils.ToJSON(srv)) }