kamailioagent: add caps for concurrency limiting

This commit is contained in:
ionutboangiu
2026-02-18 18:53:36 +02:00
committed by Dan Christian Bogos
parent 646557e412
commit 73bc9603da
5 changed files with 58 additions and 9 deletions

View File

@@ -45,11 +45,12 @@ var (
)
func NewKamailioAgent(kaCfg *config.KamAgentCfg,
connMgr *engine.ConnManager, timezone string) (*KamailioAgent, error) {
connMgr *engine.ConnManager, timezone string, caps *engine.Caps) (*KamailioAgent, error) {
ka := &KamailioAgent{
cfg: kaCfg,
connMgr: connMgr,
timezone: timezone,
caps: caps,
conns: make([]*kamevapi.KamEvapi, len(kaCfg.EvapiConns)),
activeSessionIDs: make(chan []*sessions.SessionID),
}
@@ -67,6 +68,7 @@ type KamailioAgent struct {
cfg *config.KamAgentCfg
connMgr *engine.ConnManager
timezone string
caps *engine.Caps
conns []*kamevapi.KamEvapi
activeSessionIDs chan []*sessions.SessionID
ctx *context.Context
@@ -118,6 +120,15 @@ func (self *KamailioAgent) Shutdown() (err error) {
// onCgrAuth is called when new event of type CGR_AUTH_REQUEST is coming
func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting auth request: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
@@ -165,6 +176,15 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) {
}
func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting call start: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
@@ -208,6 +228,15 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) {
}
func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting call end: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
@@ -281,6 +310,15 @@ func (ka *KamailioAgent) onDlgList(evData []byte, connIdx int) {
}
func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting process message: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
@@ -337,6 +375,15 @@ func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) {
}
func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting process CDR: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))

View File

@@ -618,7 +618,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) {
apiSv1, apiSv2, cdrS, smg, coreS,
NewDNSAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep),
NewFreeswitchAgent(cfg, shdChan, connManager, caps, srvDep),
NewKamailioAgent(cfg, shdChan, connManager, srvDep),
NewKamailioAgent(cfg, shdChan, connManager, caps, srvDep),
NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload
NewRadiusAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload
NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload

View File

@@ -33,12 +33,13 @@ import (
// NewKamailioAgent returns the Kamailio Agent
func NewKamailioAgent(cfg *config.CGRConfig,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &KamailioAgent{
cfg: cfg,
shdChan: shdChan,
connMgr: connMgr,
caps: caps,
srvDep: srvDep,
}
}
@@ -51,6 +52,7 @@ type KamailioAgent struct {
kam *agents.KamailioAgent
connMgr *engine.ConnManager
caps *engine.Caps
srvDep map[string]*sync.WaitGroup
}
@@ -65,7 +67,7 @@ func (kam *KamailioAgent) Start() error {
var err error
kam.kam, err = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), kam.connMgr,
utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone))
utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone), kam.caps)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent, error: %s", utils.KamailioAgent, err))
return err

View File

@@ -75,7 +75,7 @@ func TestKamailioAgentReload(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
cm, anz, srvDep)
srv := NewKamailioAgent(cfg, shdChan, cm, srvDep)
srv := NewKamailioAgent(cfg, shdChan, cm, nil, srvDep)
srvMngr.AddServices(srv, sS, db)
if err := srvMngr.StartServices(); err != nil {
t.Fatal(err)
@@ -106,7 +106,7 @@ func TestKamailioAgentReload(t *testing.T) {
Timezone: "Local",
}
srv.(*KamailioAgent).kam, err = agents.NewKamailioAgent(kaCfg, cm, "")
srv.(*KamailioAgent).kam, err = agents.NewKamailioAgent(kaCfg, cm, "", nil)
if err != nil {
t.Fatal(err)
}
@@ -128,7 +128,7 @@ func TestKamailioAgentReload2(t *testing.T) {
filterSChan <- nil
shdChan := utils.NewSyncedChan()
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewKamailioAgent(cfg, shdChan, nil, srvDep)
srv := NewKamailioAgent(cfg, shdChan, nil, nil, srvDep)
srvKam := &agents.KamailioAgent{}
if srv.IsRunning() {
t.Fatalf("Expected service to be down")
@@ -153,7 +153,7 @@ func TestKamailioAgentReload3(t *testing.T) {
filterSChan <- nil
shdChan := utils.NewSyncedChan()
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewKamailioAgent(cfg, shdChan, nil, srvDep)
srv := NewKamailioAgent(cfg, shdChan, nil, nil, srvDep)
srvKam := &agents.KamailioAgent{}
if srv.IsRunning() {
t.Fatalf("Expected service to be down")

View File

@@ -47,7 +47,7 @@ func TestKamailioAgentCoverage(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewKamailioAgent(cfg, shdChan, nil, srvDep)
srv := NewKamailioAgent(cfg, shdChan, nil, nil, srvDep)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}