freeswitchagent: add caps for concurrency limiting

This commit is contained in:
ionutboangiu
2026-02-23 21:28:08 +02:00
committed by Dan Christian Bogos
parent 52419c493a
commit 94577c06c5
2 changed files with 35 additions and 2 deletions

View File

@@ -34,7 +34,7 @@ import (
)
func NewFSsessions(cgrcfg *config.CGRConfig, filterS *engine.FilterS,
timezone string, connMgr *engine.ConnManager) (*FSsessions, error) {
timezone string, connMgr *engine.ConnManager, caps *engine.Caps) (*FSsessions, error) {
fsAgentConfig := cgrcfg.FsAgentCfg()
fsa := &FSsessions{
cgrcfg: cgrcfg,
@@ -43,6 +43,7 @@ func NewFSsessions(cgrcfg *config.CGRConfig, filterS *engine.FilterS,
senderPools: make([]*fsock.FSockPool, len(fsAgentConfig.EventSocketConns)),
timezone: timezone,
connMgr: connMgr,
caps: caps,
filterS: filterS,
}
srv, _ := birpc.NewService(fsa, "", false)
@@ -74,6 +75,7 @@ type FSsessions struct {
senderPools []*fsock.FSockPool // Keep sender pools here
timezone string
connMgr *engine.ConnManager
caps *engine.Caps
ctx *context.Context
filterS *engine.FilterS
}
@@ -164,6 +166,17 @@ func (fsa *FSsessions) unparkCall(uuid string, connIdx int, callDestNb, notify s
}
func (fsa *FSsessions) onChannelPark(fsev FSEvent, connIdx int) {
if fsa.caps.IsLimited() {
if err := fsa.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting park for channel %s: %v",
utils.FreeSWITCHAgent, fsev.GetUUID(), err))
fsa.unparkCall(fsev.GetUUID(), connIdx,
fsev.GetCallDestNr(utils.MetaDefault), err.Error())
return
}
defer fsa.caps.Deallocate()
}
if connIdx >= len(fsa.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(fsa.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
@@ -248,6 +261,15 @@ func (fsa *FSsessions) onChannelPark(fsev FSEvent, connIdx int) {
}
func (fsa *FSsessions) onChannelAnswer(fsev FSEvent, connIdx int) {
if fsa.caps.IsLimited() {
if err := fsa.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting answer for channel %s: %v",
utils.FreeSWITCHAgent, fsev.GetUUID(), err))
return
}
defer fsa.caps.Deallocate()
}
if connIdx >= len(fsa.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(fsa.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
@@ -301,6 +323,15 @@ func (fsa *FSsessions) onChannelAnswer(fsev FSEvent, connIdx int) {
}
func (fsa *FSsessions) onChannelHangupComplete(fsev FSEvent, connIdx int) {
if fsa.caps.IsLimited() {
if err := fsa.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting hangup for channel %s: %v",
utils.FreeSWITCHAgent, fsev.GetUUID(), err))
return
}
defer fsa.caps.Deallocate()
}
if connIdx >= len(fsa.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(fsa.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))

View File

@@ -50,6 +50,7 @@ func (fS *FreeswitchAgent) Start(shutdown *utils.SyncedChan, registry *servmanag
[]string{
utils.ConnManager,
utils.FilterS,
utils.CapS,
},
registry, fS.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
@@ -57,11 +58,12 @@ func (fS *FreeswitchAgent) Start(shutdown *utils.SyncedChan, registry *servmanag
}
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
caps := srvDeps[utils.CapS].(*CapService).Caps()
fS.Lock()
defer fS.Unlock()
fS.fS, err = agents.NewFSsessions(fs.cfg, fs.FilterS(), fS.cfg.GeneralCfg().DefaultTimezone, cms.ConnManager())
fS.fS, err = agents.NewFSsessions(fs.cfg, fs.FilterS(), fS.cfg.GeneralCfg().DefaultTimezone, cms.ConnManager(), caps)
if err != nil {
return
}