janusagent: add caps for concurrency limiting

This commit is contained in:
ionutboangiu
2026-02-18 18:54:35 +02:00
committed by Dan Christian Bogos
parent 421ef0e78d
commit 4bde2fc07c
4 changed files with 44 additions and 5 deletions

View File

@@ -40,11 +40,12 @@ import (
// NewJanusAgent will construct a JanusAgent
func NewJanusAgent(cgrCfg *config.CGRConfig,
connMgr *engine.ConnManager,
filterS *engine.FilterS) (*JanusAgent, error) {
filterS *engine.FilterS, caps *engine.Caps) (*JanusAgent, error) {
jsa := &JanusAgent{
cgrCfg: cgrCfg,
connMgr: connMgr,
filterS: filterS,
caps: caps,
}
srv, err := birpc.NewServiceWithMethodsRename(jsa, utils.AgentV1, true, func(oldFn string) (newFn string) {
return strings.TrimPrefix(oldFn, "V1")
@@ -61,6 +62,7 @@ type JanusAgent struct {
cgrCfg *config.CGRConfig
connMgr *engine.ConnManager
filterS *engine.FilterS
caps *engine.Caps
jnsConn *janus.Gateway
adminWs *websocket.Conn
ctx *context.Context
@@ -99,6 +101,13 @@ func (ja *JanusAgent) CORSOptions(w http.ResponseWriter, req *http.Request) {
// CreateSession will create a new session within janusgo
func (ja *JanusAgent) CreateSession(w http.ResponseWriter, req *http.Request) {
janusAccessControlHeaders(w, req)
if ja.caps.IsLimited() {
if err := ja.caps.Allocate(); err != nil {
http.Error(w, err.Error(), http.StatusTooManyRequests)
return
}
defer ja.caps.Deallocate()
}
var msg janus.BaseMsg
if err := json.NewDecoder(req.Body).Decode(&msg); err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
@@ -213,6 +222,13 @@ func (ja *JanusAgent) cdrSession(s *janus.Session) (err error) {
// SessioNKeepalive sends keepalive once OPTIONS are coming for the session from HTTP
func (ja *JanusAgent) SessionKeepalive(w http.ResponseWriter, r *http.Request) {
janusAccessControlHeaders(w, r)
if ja.caps.IsLimited() {
if err := ja.caps.Allocate(); err != nil {
http.Error(w, err.Error(), http.StatusTooManyRequests)
return
}
defer ja.caps.Deallocate()
}
sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64)
if err != nil {
http.Error(w, "Invalid session ID", http.StatusBadRequest)
@@ -245,6 +261,13 @@ func (ja *JanusAgent) SessionKeepalive(w http.ResponseWriter, r *http.Request) {
// PollSession will create a long-poll request to be notified about events and incoming messages from session
func (ja *JanusAgent) PollSession(w http.ResponseWriter, req *http.Request) {
janusAccessControlHeaders(w, req)
if ja.caps.IsLimited() {
if err := ja.caps.Allocate(); err != nil {
http.Error(w, err.Error(), http.StatusTooManyRequests)
return
}
defer ja.caps.Deallocate()
}
sessionID, err := strconv.ParseUint(req.PathValue("sessionID"), 10, 64)
if err != nil {
http.Error(w, "Invalid session ID", http.StatusBadRequest)
@@ -296,6 +319,13 @@ func (ja *JanusAgent) PollSession(w http.ResponseWriter, req *http.Request) {
// AttachPlugin will attach a plugin to a session
func (ja *JanusAgent) AttachPlugin(w http.ResponseWriter, r *http.Request) {
janusAccessControlHeaders(w, r)
if ja.caps.IsLimited() {
if err := ja.caps.Allocate(); err != nil {
http.Error(w, err.Error(), http.StatusTooManyRequests)
return
}
defer ja.caps.Deallocate()
}
sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64)
if err != nil {
http.Error(w, "Invalid session ID", http.StatusBadRequest)
@@ -345,6 +375,13 @@ func (ja *JanusAgent) AttachPlugin(w http.ResponseWriter, r *http.Request) {
// HandlePlugin will handle requests towards a plugin
func (ja *JanusAgent) HandlePlugin(w http.ResponseWriter, r *http.Request) {
janusAccessControlHeaders(w, r)
if ja.caps.IsLimited() {
if err := ja.caps.Allocate(); err != nil {
http.Error(w, err.Error(), http.StatusTooManyRequests)
return
}
defer ja.caps.Deallocate()
}
sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64)
if err != nil {
http.Error(w, "Invalid session ID", http.StatusBadRequest)

View File

@@ -630,7 +630,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) {
NewEventReaderService(cfg, dmService, filterSChan,
shdChan, connManager, server, internalERsChan, anz, srvDep),
NewSIPAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep),
NewJanusAgent(cfg, filterSChan, server, connManager, srvDep),
NewJanusAgent(cfg, filterSChan, server, connManager, caps, srvDep),
)
srvManager.StartServices()
// Start FilterS

View File

@@ -32,13 +32,14 @@ import (
// NewJanusAgent returns the Janus Agent
func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
server *cores.Server, connMgr *engine.ConnManager,
server *cores.Server, connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &JanusAgent{
cfg: cfg,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
caps: caps,
srvDep: srvDep,
}
}
@@ -55,6 +56,7 @@ type JanusAgent struct {
// if we registerd the jandlers
started bool
connMgr *engine.ConnManager
caps *engine.Caps
srvDep map[string]*sync.WaitGroup
}
@@ -68,7 +70,7 @@ func (ja *JanusAgent) Start() (err error) {
ja.Unlock()
return utils.ErrServiceAlreadyRunning
}
ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, filterS)
ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, filterS, ja.caps)
if err != nil {
return
}

View File

@@ -38,7 +38,7 @@ func TestJanusAgentCoverage(t *testing.T) {
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
rpcInternal := map[string]chan birpc.ClientConnector{}
cM := engine.NewConnManager(cfg, rpcInternal)
srv := NewJanusAgent(cfg, filterSChan, server, cM, srvDep)
srv := NewJanusAgent(cfg, filterSChan, server, cM, nil, srvDep)
if srv == nil {
t.Errorf("\nExpecting <nil>,\n Received <%+v>", utils.ToJSON(srv))
}