kamailioagent: add caps for concurrency limiting

This commit is contained in:
ionutboangiu
2026-02-23 21:29:13 +02:00
committed by Dan Christian Bogos
parent 94577c06c5
commit 2840a56e69
2 changed files with 55 additions and 4 deletions

View File

@@ -45,11 +45,12 @@ var (
)
func NewKamailioAgent(kaCfg *config.KamAgentCfg,
connMgr *engine.ConnManager, timezone string) (ka *KamailioAgent) {
connMgr *engine.ConnManager, timezone string, caps *engine.Caps) (ka *KamailioAgent) {
ka = &KamailioAgent{
cfg: kaCfg,
connMgr: connMgr,
timezone: timezone,
caps: caps,
conns: make([]*kamevapi.KamEvapi, len(kaCfg.EvapiConns)),
activeSessionIDs: make(chan []*sessions.SessionID),
}
@@ -62,6 +63,7 @@ type KamailioAgent struct {
cfg *config.KamAgentCfg
connMgr *engine.ConnManager
timezone string
caps *engine.Caps
conns []*kamevapi.KamEvapi
activeSessionIDs chan []*sessions.SessionID
ctx *context.Context
@@ -108,6 +110,15 @@ func (self *KamailioAgent) Shutdown() (err error) {
// onCgrAuth is called when new event of type CGR_AUTH_REQUEST is coming
func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting auth request: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
@@ -153,6 +164,15 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) {
}
func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting call start: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
@@ -193,6 +213,15 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) {
}
func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting call end: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
@@ -262,6 +291,15 @@ func (ka *KamailioAgent) onDlgList(evData []byte, connIdx int) {
}
func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting process message: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
@@ -316,6 +354,15 @@ func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) {
}
func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) {
if ka.caps.IsLimited() {
if err := ka.caps.Allocate(); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> caps limit reached, rejecting process CDR: %v",
utils.KamailioAgent, err))
return
}
defer ka.caps.Deallocate()
}
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))

View File

@@ -47,16 +47,20 @@ type KamailioAgent struct {
// Start should handle the sercive start
func (kam *KamailioAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
cms, err := WaitForServiceState(utils.StateServiceUP, utils.ConnManager, registry, kam.cfg.GeneralCfg().ConnectTimeout)
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{utils.ConnManager, utils.CapS},
registry, kam.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return
}
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
caps := srvDeps[utils.CapS].(*CapService).Caps()
kam.Lock()
defer kam.Unlock()
kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), cms.(*ConnManagerService).ConnManager(),
utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone))
kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), cm,
utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone), caps)
go kam.connect(kam.kam, shutdown)
return