asteriskagent: add caps for concurrency limiting

This commit is contained in:
ionutboangiu
2026-02-18 18:54:07 +02:00
committed by Dan Christian Bogos
parent 73bc9603da
commit 421ef0e78d
6 changed files with 41 additions and 10 deletions

View File

@@ -56,11 +56,12 @@ const (
// NewAsteriskAgent constructs a new Asterisk Agent
func NewAsteriskAgent(cgrCfg *config.CGRConfig, astConnIdx int,
connMgr *engine.ConnManager) (*AsteriskAgent, error) {
connMgr *engine.ConnManager, caps *engine.Caps) (*AsteriskAgent, error) {
sma := &AsteriskAgent{
cgrCfg: cgrCfg,
astConnIdx: astConnIdx,
connMgr: connMgr,
caps: caps,
eventsCache: make(map[string]*utils.CGREvent),
}
srv, err := birpc.NewServiceWithMethodsRename(sma, utils.AgentV1, true, func(oldFn string) (newFn string) {
@@ -82,6 +83,7 @@ type ARIConnector interface {
type AsteriskAgent struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
connMgr *engine.ConnManager
caps *engine.Caps
astConnIdx int
astConn ARIConnector
astEvChan chan map[string]any
@@ -173,6 +175,15 @@ func (sma *AsteriskAgent) hangupChannel(channelID, warnMsg string) {
}
func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) {
if sma.caps.IsLimited() {
if err := sma.caps.Allocate(); err != nil {
sma.hangupChannel(ev.ChannelID(),
fmt.Sprintf("<%s> caps limit reached, rejecting channel %s: %v",
utils.AsteriskAgent, ev.ChannelID(), err))
return
}
defer sma.caps.Deallocate()
}
// Subscribe for channel updates even after we leave Stasis
if _, err := sma.astConn.Call(aringo.HTTP_POST,
fmt.Sprintf("applications/%s/subscription", CGRAuthAPP), map[string]string{"eventSource": fmt.Sprintf("channel:%s", ev.ChannelID())}, nil); err != nil {
@@ -272,6 +283,15 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) {
if ev.ChannelState() != channelUp {
return
}
if sma.caps.IsLimited() {
if err := sma.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting state change for channel %s: %v",
utils.AsteriskAgent, ev.ChannelID(), err))
return
}
defer sma.caps.Deallocate()
}
sma.evCacheMux.RLock()
cgrEvDisp, hasIt := sma.eventsCache[ev.ChannelID()]
sma.evCacheMux.RUnlock()
@@ -313,6 +333,15 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) {
// Channel disconnect
func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) {
if sma.caps.IsLimited() {
if err := sma.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting destroy for channel %s: %v",
utils.AsteriskAgent, ev.ChannelID(), err))
return
}
defer sma.caps.Deallocate()
}
chID := ev.ChannelID()
sma.evCacheMux.RLock()
cgrEvDisp, hasIt := sma.eventsCache[chID]

View File

@@ -45,7 +45,7 @@ func TestHandleChannelDestroyedFail(t *testing.T) {
cM := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
sma, err := NewAsteriskAgent(cfg, 1, cM)
sma, err := NewAsteriskAgent(cfg, 1, cM, new(engine.Caps))
if err != nil {
t.Error(err)
}
@@ -132,7 +132,7 @@ func TestHandleChannelDestroyedCases(t *testing.T) {
cM := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
sma, err := NewAsteriskAgent(cfg, 1, cM)
sma, err := NewAsteriskAgent(cfg, 1, cM, new(engine.Caps))
if err != nil {
t.Error(err)
}

View File

@@ -32,12 +32,13 @@ import (
// NewAsteriskAgent returns the Asterisk Agent
func NewAsteriskAgent(cfg *config.CGRConfig,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &AsteriskAgent{
cfg: cfg,
shdChan: shdChan,
connMgr: connMgr,
caps: caps,
srvDep: srvDep,
}
}
@@ -51,6 +52,7 @@ type AsteriskAgent struct {
smas []*agents.AsteriskAgent
connMgr *engine.ConnManager
caps *engine.Caps
srvDep map[string]*sync.WaitGroup
}
@@ -72,7 +74,7 @@ func (ast *AsteriskAgent) Start() (err error) {
ast.stopChan = make(chan struct{})
ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns))
for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers
ast.smas[connIdx], err = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr)
ast.smas[connIdx], err = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr, ast.caps)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent for connection %d, error: %s!", utils.AsteriskAgent, connIdx, err))
return err

View File

@@ -70,7 +70,7 @@ func TestAsteriskAgentReload(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
cm, anz, srvDep)
astService := NewAsteriskAgent(cfg, shdChan, cm, srvDep)
astService := NewAsteriskAgent(cfg, shdChan, cm, nil, srvDep)
srvMngr.AddServices(astService, sS, db)
if err := srvMngr.StartServices(); err != nil {
t.Fatal(err)
@@ -142,7 +142,7 @@ func TestAsteriskAgentReload2(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
cm, anz, srvDep)
astSrv := NewAsteriskAgent(cfg, shdChan, cm, srvDep)
astSrv := NewAsteriskAgent(cfg, shdChan, cm, nil, srvDep)
srvMngr.AddServices(astSrv, sS, db)
if err := srvMngr.StartServices(); err != nil {
t.Fatal(err)

View File

@@ -45,7 +45,7 @@ func TestAsteriskAgentCoverage(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
astSrv := NewAsteriskAgent(cfg, shdChan, nil, srvDep)
astSrv := NewAsteriskAgent(cfg, shdChan, nil, nil, srvDep)
if astSrv == nil {
t.Errorf("\nExpecting <nil>,\n Received <%+v>", utils.ToJSON(astSrv))
}
@@ -100,7 +100,7 @@ func TestAsteriskReload(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
astSrv := NewAsteriskAgent(cfg, shdChan, nil, srvDep)
astSrv := NewAsteriskAgent(cfg, shdChan, nil, nil, srvDep)
if astSrv == nil {
t.Errorf("\nExpecting <nil>,\n Received <%+v>", utils.ToJSON(astSrv))
}

View File

@@ -619,7 +619,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) {
NewDNSAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep),
NewFreeswitchAgent(cfg, shdChan, connManager, caps, srvDep),
NewKamailioAgent(cfg, shdChan, connManager, caps, srvDep),
NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload
NewAsteriskAgent(cfg, shdChan, connManager, caps, srvDep), // partial reload
NewRadiusAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload
NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload
NewHTTPAgent(cfg, filterSChan, server, connManager, caps, srvDep), // no reload