From 421ef0e78dfe1529b0bcc6b6dc77cbb71419f8ee Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 18 Feb 2026 18:54:07 +0200 Subject: [PATCH] asteriskagent: add caps for concurrency limiting --- agents/astagent.go | 31 ++++++++++++++++++++++++++++++- agents/astagent_test.go | 4 ++-- services/asteriskagent.go | 6 ++++-- services/asteriskagent_it_test.go | 4 ++-- services/asteriskagent_test.go | 4 ++-- services/engine.go | 2 +- 6 files changed, 41 insertions(+), 10 deletions(-) diff --git a/agents/astagent.go b/agents/astagent.go index 458b3bfc1..47441888f 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -56,11 +56,12 @@ const ( // NewAsteriskAgent constructs a new Asterisk Agent func NewAsteriskAgent(cgrCfg *config.CGRConfig, astConnIdx int, - connMgr *engine.ConnManager) (*AsteriskAgent, error) { + connMgr *engine.ConnManager, caps *engine.Caps) (*AsteriskAgent, error) { sma := &AsteriskAgent{ cgrCfg: cgrCfg, astConnIdx: astConnIdx, connMgr: connMgr, + caps: caps, eventsCache: make(map[string]*utils.CGREvent), } srv, err := birpc.NewServiceWithMethodsRename(sma, utils.AgentV1, true, func(oldFn string) (newFn string) { @@ -82,6 +83,7 @@ type ARIConnector interface { type AsteriskAgent struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple connMgr *engine.ConnManager + caps *engine.Caps astConnIdx int astConn ARIConnector astEvChan chan map[string]any @@ -173,6 +175,15 @@ func (sma *AsteriskAgent) hangupChannel(channelID, warnMsg string) { } func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) { + if sma.caps.IsLimited() { + if err := sma.caps.Allocate(); err != nil { + sma.hangupChannel(ev.ChannelID(), + fmt.Sprintf("<%s> caps limit reached, rejecting channel %s: %v", + utils.AsteriskAgent, ev.ChannelID(), err)) + return + } + defer sma.caps.Deallocate() + } // Subscribe for channel updates even after we leave Stasis if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("applications/%s/subscription", CGRAuthAPP), map[string]string{"eventSource": fmt.Sprintf("channel:%s", ev.ChannelID())}, nil); err != nil { @@ -272,6 +283,15 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) { if ev.ChannelState() != channelUp { return } + if sma.caps.IsLimited() { + if err := sma.caps.Allocate(); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> caps limit reached, rejecting state change for channel %s: %v", + utils.AsteriskAgent, ev.ChannelID(), err)) + return + } + defer sma.caps.Deallocate() + } sma.evCacheMux.RLock() cgrEvDisp, hasIt := sma.eventsCache[ev.ChannelID()] sma.evCacheMux.RUnlock() @@ -313,6 +333,15 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) { // Channel disconnect func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) { + if sma.caps.IsLimited() { + if err := sma.caps.Allocate(); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> caps limit reached, rejecting destroy for channel %s: %v", + utils.AsteriskAgent, ev.ChannelID(), err)) + return + } + defer sma.caps.Deallocate() + } chID := ev.ChannelID() sma.evCacheMux.RLock() cgrEvDisp, hasIt := sma.eventsCache[chID] diff --git a/agents/astagent_test.go b/agents/astagent_test.go index b2dd19129..81bb4a7d1 100644 --- a/agents/astagent_test.go +++ b/agents/astagent_test.go @@ -45,7 +45,7 @@ func TestHandleChannelDestroyedFail(t *testing.T) { cM := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, }) - sma, err := NewAsteriskAgent(cfg, 1, cM) + sma, err := NewAsteriskAgent(cfg, 1, cM, new(engine.Caps)) if err != nil { t.Error(err) } @@ -132,7 +132,7 @@ func TestHandleChannelDestroyedCases(t *testing.T) { cM := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, }) - sma, err := NewAsteriskAgent(cfg, 1, cM) + sma, err := NewAsteriskAgent(cfg, 1, cM, new(engine.Caps)) if err != nil { t.Error(err) } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index c7cf5c383..f16d4896a 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -32,12 +32,13 @@ import ( // NewAsteriskAgent returns the Asterisk Agent func NewAsteriskAgent(cfg *config.CGRConfig, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, + shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &AsteriskAgent{ cfg: cfg, shdChan: shdChan, connMgr: connMgr, + caps: caps, srvDep: srvDep, } } @@ -51,6 +52,7 @@ type AsteriskAgent struct { smas []*agents.AsteriskAgent connMgr *engine.ConnManager + caps *engine.Caps srvDep map[string]*sync.WaitGroup } @@ -72,7 +74,7 @@ func (ast *AsteriskAgent) Start() (err error) { ast.stopChan = make(chan struct{}) ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns)) for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers - ast.smas[connIdx], err = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr) + ast.smas[connIdx], err = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr, ast.caps) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent for connection %d, error: %s!", utils.AsteriskAgent, connIdx, err)) return err diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index 3c80bf135..32e9764f9 100644 --- a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -70,7 +70,7 @@ func TestAsteriskAgentReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), cm, anz, srvDep) - astService := NewAsteriskAgent(cfg, shdChan, cm, srvDep) + astService := NewAsteriskAgent(cfg, shdChan, cm, nil, srvDep) srvMngr.AddServices(astService, sS, db) if err := srvMngr.StartServices(); err != nil { t.Fatal(err) @@ -142,7 +142,7 @@ func TestAsteriskAgentReload2(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), cm, anz, srvDep) - astSrv := NewAsteriskAgent(cfg, shdChan, cm, srvDep) + astSrv := NewAsteriskAgent(cfg, shdChan, cm, nil, srvDep) srvMngr.AddServices(astSrv, sS, db) if err := srvMngr.StartServices(); err != nil { t.Fatal(err) diff --git a/services/asteriskagent_test.go b/services/asteriskagent_test.go index 0d5ff5f35..181b07ae8 100644 --- a/services/asteriskagent_test.go +++ b/services/asteriskagent_test.go @@ -45,7 +45,7 @@ func TestAsteriskAgentCoverage(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - astSrv := NewAsteriskAgent(cfg, shdChan, nil, srvDep) + astSrv := NewAsteriskAgent(cfg, shdChan, nil, nil, srvDep) if astSrv == nil { t.Errorf("\nExpecting ,\n Received <%+v>", utils.ToJSON(astSrv)) } @@ -100,7 +100,7 @@ func TestAsteriskReload(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - astSrv := NewAsteriskAgent(cfg, shdChan, nil, srvDep) + astSrv := NewAsteriskAgent(cfg, shdChan, nil, nil, srvDep) if astSrv == nil { t.Errorf("\nExpecting ,\n Received <%+v>", utils.ToJSON(astSrv)) } diff --git a/services/engine.go b/services/engine.go index cd1b3c7f0..975e29208 100644 --- a/services/engine.go +++ b/services/engine.go @@ -619,7 +619,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) { NewDNSAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), NewFreeswitchAgent(cfg, shdChan, connManager, caps, srvDep), NewKamailioAgent(cfg, shdChan, connManager, caps, srvDep), - NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload + NewAsteriskAgent(cfg, shdChan, connManager, caps, srvDep), // partial reload NewRadiusAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload NewHTTPAgent(cfg, filterSChan, server, connManager, caps, srvDep), // no reload