asteriskagent: add caps for concurrency limiting

This commit is contained in:
ionutboangiu
2026-02-23 21:30:12 +02:00
committed by Dan Christian Bogos
parent 2840a56e69
commit 45b2a14093
2 changed files with 36 additions and 3 deletions

View File

@@ -57,11 +57,12 @@ const (
// NewAsteriskAgent constructs a new Asterisk Agent
func NewAsteriskAgent(cgrCfg *config.CGRConfig, astConnIdx int,
connMgr *engine.ConnManager) *AsteriskAgent {
connMgr *engine.ConnManager, caps *engine.Caps) *AsteriskAgent {
sma := &AsteriskAgent{
cgrCfg: cgrCfg,
astConnIdx: astConnIdx,
connMgr: connMgr,
caps: caps,
eventsCache: make(map[string]*utils.CGREvent),
}
srv, _ := birpc.NewService(sma, "", false)
@@ -73,6 +74,7 @@ func NewAsteriskAgent(cgrCfg *config.CGRConfig, astConnIdx int,
type AsteriskAgent struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
connMgr *engine.ConnManager
caps *engine.Caps
astConnIdx int
astConn *aringo.ARInGO
astEvChan chan map[string]any
@@ -154,6 +156,15 @@ func (sma *AsteriskAgent) hangupChannel(channelID, warnMsg string) {
}
func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) {
if sma.caps.IsLimited() {
if err := sma.caps.Allocate(); err != nil {
sma.hangupChannel(ev.ChannelID(),
fmt.Sprintf("<%s> caps limit reached, rejecting channel %s: %v",
utils.AsteriskAgent, ev.ChannelID(), err))
return
}
defer sma.caps.Deallocate()
}
// Subscribe for channel updates even after we leave Stasis
if _, err := sma.astConn.Call(aringo.HTTP_POST,
fmt.Sprintf("http://%s/ari/applications/%s/subscription?eventSource=channel:%s",
@@ -244,6 +255,15 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) {
if ev.ChannelState() != channelUp {
return
}
if sma.caps.IsLimited() {
if err := sma.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting state change for channel %s: %v",
utils.AsteriskAgent, ev.ChannelID(), err))
return
}
defer sma.caps.Deallocate()
}
sma.evCacheMux.RLock()
cgrEvDisp, hasIt := sma.eventsCache[ev.ChannelID()]
sma.evCacheMux.RUnlock()
@@ -284,6 +304,15 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) {
// Channel disconnect
func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) {
if sma.caps.IsLimited() {
if err := sma.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting destroy for channel %s: %v",
utils.AsteriskAgent, ev.ChannelID(), err))
return
}
defer sma.caps.Deallocate()
}
chID := ev.ChannelID()
sma.evCacheMux.RLock()
cgrEvDisp, hasIt := sma.eventsCache[chID]

View File

@@ -47,10 +47,14 @@ type AsteriskAgent struct {
// Start should handle the sercive start
func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, ast.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{utils.ConnManager, utils.CapS},
registry, ast.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
caps := srvDeps[utils.CapS].(*CapService).Caps()
ast.mu.Lock()
defer ast.mu.Unlock()
@@ -64,7 +68,7 @@ func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, registry *servmanage
ast.stopChan = make(chan struct{})
ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns))
for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers
ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, cms.(*ConnManagerService).ConnManager())
ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, cm, caps)
go listenAndServe(ast.smas[connIdx], ast.stopChan)
}
return