diff --git a/agents/kamagent.go b/agents/kamagent.go index c7a6412fe..87a3901b1 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -45,11 +45,12 @@ var ( ) func NewKamailioAgent(kaCfg *config.KamAgentCfg, - connMgr *engine.ConnManager, timezone string) (*KamailioAgent, error) { + connMgr *engine.ConnManager, timezone string, caps *engine.Caps) (*KamailioAgent, error) { ka := &KamailioAgent{ cfg: kaCfg, connMgr: connMgr, timezone: timezone, + caps: caps, conns: make([]*kamevapi.KamEvapi, len(kaCfg.EvapiConns)), activeSessionIDs: make(chan []*sessions.SessionID), } @@ -67,6 +68,7 @@ type KamailioAgent struct { cfg *config.KamAgentCfg connMgr *engine.ConnManager timezone string + caps *engine.Caps conns []*kamevapi.KamEvapi activeSessionIDs chan []*sessions.SessionID ctx *context.Context @@ -118,6 +120,15 @@ func (self *KamailioAgent) Shutdown() (err error) { // onCgrAuth is called when new event of type CGR_AUTH_REQUEST is coming func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) { + if ka.caps.IsLimited() { + if err := ka.caps.Allocate(); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> caps limit reached, rejecting auth request: %v", + utils.KamailioAgent, err)) + return + } + defer ka.caps.Deallocate() + } if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) @@ -165,6 +176,15 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) { } func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) { + if ka.caps.IsLimited() { + if err := ka.caps.Allocate(); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> caps limit reached, rejecting call start: %v", + utils.KamailioAgent, err)) + return + } + defer ka.caps.Deallocate() + } if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) @@ -208,6 +228,15 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) { } func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) { + if ka.caps.IsLimited() { + if err := ka.caps.Allocate(); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> caps limit reached, rejecting call end: %v", + utils.KamailioAgent, err)) + return + } + defer ka.caps.Deallocate() + } if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) @@ -281,6 +310,15 @@ func (ka *KamailioAgent) onDlgList(evData []byte, connIdx int) { } func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) { + if ka.caps.IsLimited() { + if err := ka.caps.Allocate(); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> caps limit reached, rejecting process message: %v", + utils.KamailioAgent, err)) + return + } + defer ka.caps.Deallocate() + } if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) @@ -337,6 +375,15 @@ func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) { } func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) { + if ka.caps.IsLimited() { + if err := ka.caps.Allocate(); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> caps limit reached, rejecting process CDR: %v", + utils.KamailioAgent, err)) + return + } + defer ka.caps.Deallocate() + } if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) diff --git a/services/engine.go b/services/engine.go index 14f7ee527..cd1b3c7f0 100644 --- a/services/engine.go +++ b/services/engine.go @@ -618,7 +618,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) { apiSv1, apiSv2, cdrS, smg, coreS, NewDNSAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), NewFreeswitchAgent(cfg, shdChan, connManager, caps, srvDep), - NewKamailioAgent(cfg, shdChan, connManager, srvDep), + NewKamailioAgent(cfg, shdChan, connManager, caps, srvDep), NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload NewRadiusAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload diff --git a/services/kamailioagent.go b/services/kamailioagent.go index ac0e002b9..7a508a2a5 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -33,12 +33,13 @@ import ( // NewKamailioAgent returns the Kamailio Agent func NewKamailioAgent(cfg *config.CGRConfig, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, + shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &KamailioAgent{ cfg: cfg, shdChan: shdChan, connMgr: connMgr, + caps: caps, srvDep: srvDep, } } @@ -51,6 +52,7 @@ type KamailioAgent struct { kam *agents.KamailioAgent connMgr *engine.ConnManager + caps *engine.Caps srvDep map[string]*sync.WaitGroup } @@ -65,7 +67,7 @@ func (kam *KamailioAgent) Start() error { var err error kam.kam, err = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), kam.connMgr, - utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone)) + utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone), kam.caps) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent, error: %s", utils.KamailioAgent, err)) return err diff --git a/services/kamailioagent_it_test.go b/services/kamailioagent_it_test.go index 4b1477944..3fc1feff5 100644 --- a/services/kamailioagent_it_test.go +++ b/services/kamailioagent_it_test.go @@ -75,7 +75,7 @@ func TestKamailioAgentReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), cm, anz, srvDep) - srv := NewKamailioAgent(cfg, shdChan, cm, srvDep) + srv := NewKamailioAgent(cfg, shdChan, cm, nil, srvDep) srvMngr.AddServices(srv, sS, db) if err := srvMngr.StartServices(); err != nil { t.Fatal(err) @@ -106,7 +106,7 @@ func TestKamailioAgentReload(t *testing.T) { Timezone: "Local", } - srv.(*KamailioAgent).kam, err = agents.NewKamailioAgent(kaCfg, cm, "") + srv.(*KamailioAgent).kam, err = agents.NewKamailioAgent(kaCfg, cm, "", nil) if err != nil { t.Fatal(err) } @@ -128,7 +128,7 @@ func TestKamailioAgentReload2(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewKamailioAgent(cfg, shdChan, nil, srvDep) + srv := NewKamailioAgent(cfg, shdChan, nil, nil, srvDep) srvKam := &agents.KamailioAgent{} if srv.IsRunning() { t.Fatalf("Expected service to be down") @@ -153,7 +153,7 @@ func TestKamailioAgentReload3(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewKamailioAgent(cfg, shdChan, nil, srvDep) + srv := NewKamailioAgent(cfg, shdChan, nil, nil, srvDep) srvKam := &agents.KamailioAgent{} if srv.IsRunning() { t.Fatalf("Expected service to be down") diff --git a/services/kamailioagent_test.go b/services/kamailioagent_test.go index b54d76272..cc3b3091f 100644 --- a/services/kamailioagent_test.go +++ b/services/kamailioagent_test.go @@ -47,7 +47,7 @@ func TestKamailioAgentCoverage(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewKamailioAgent(cfg, shdChan, nil, srvDep) + srv := NewKamailioAgent(cfg, shdChan, nil, nil, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") }