httpagent: add caps for concurrency limiting

This commit is contained in:
ionutboangiu
2026-02-18 18:48:56 +02:00
committed by Dan Christian Bogos
parent 92f8e8aed7
commit a022521922
7 changed files with 67 additions and 8 deletions

View File

@@ -22,6 +22,8 @@ package agents
import (
"fmt"
"io"
"net/http"
"sync"
"testing"
"time"
@@ -72,7 +74,17 @@ func TestAgentCapsIT(t *testing.T) {
"network": "udp"
}
]
}
},
"http_agent": [
{
"id": "caps_test",
"url": "/caps_test",
"sessions_conns": ["*internal"],
"request_payload": "*url",
"reply_payload": "*xml",
"request_processors": []
}
]
}`
ng := engine.TestEngine{
@@ -186,6 +198,26 @@ func TestAgentCapsIT(t *testing.T) {
<-doneCh
})
t.Run("HTTPAgent", func(t *testing.T) {
httpURL := fmt.Sprintf("http://%s/caps_test", cfg.ListenCfg().HTTPListen)
// There is currently no traffic. Expecting 200 OK because
// there are no request processors enabled (empty reply).
sendHTTPReq(t, httpURL, http.StatusOK)
// Caps limit is 2, therefore expecting the same result.
doneCh := simulateCapsTraffic(t, conn, 1, *cfg.CoreSCfg())
time.Sleep(time.Millisecond)
sendHTTPReq(t, httpURL, http.StatusOK)
<-doneCh
// With caps limit reached, 429 Too Many Requests is expected.
doneCh = simulateCapsTraffic(t, conn, 2, *cfg.CoreSCfg())
time.Sleep(time.Millisecond)
sendHTTPReq(t, httpURL, http.StatusTooManyRequests)
<-doneCh
})
}
func sendCCR(t *testing.T, client *DiameterClient, reqIdx *int, wantResultCode string) {
@@ -284,6 +316,19 @@ func writeDNSMsg(t *testing.T, conn *dns.Conn, wantRcode int) {
}
}
func sendHTTPReq(t *testing.T, url string, wantStatus int) {
t.Helper()
resp, err := http.Get(url)
if err != nil {
t.Fatal(err)
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode != wantStatus {
t.Errorf("HTTP status=%d, want %d", resp.StatusCode, wantStatus)
}
}
func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} {
t.Helper()
var wg sync.WaitGroup

View File

@@ -32,7 +32,7 @@ import (
func NewHTTPAgent(connMgr *engine.ConnManager,
sessionConns, statsConns, thresholdsConns []string,
filterS *engine.FilterS, dfltTenant, reqPayload, rplyPayload string,
reqProcessors []*config.RequestProcessor) *HTTPAgent {
reqProcessors []*config.RequestProcessor, caps *engine.Caps) *HTTPAgent {
return &HTTPAgent{
connMgr: connMgr,
filterS: filterS,
@@ -43,6 +43,7 @@ func NewHTTPAgent(connMgr *engine.ConnManager,
sessionConns: sessionConns,
statsConns: statsConns,
thresholdsConns: thresholdsConns,
caps: caps,
}
}
@@ -57,10 +58,18 @@ type HTTPAgent struct {
sessionConns []string
statsConns []string
thresholdsConns []string
caps *engine.Caps
}
// ServeHTTP implements http.Handler interface
func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if ha.caps.IsLimited() {
if err := ha.caps.Allocate(); err != nil {
w.WriteHeader(http.StatusTooManyRequests)
return
}
defer ha.caps.Deallocate()
}
dcdr, err := newHADataProvider(ha.reqPayload, req) // dcdr will provide information from request
if err != nil {
utils.Logger.Warning(

View File

@@ -49,6 +49,7 @@ func TestNewHTTPAgent(t *testing.T) {
reqPayload,
rplyPayload,
reqProcessors,
nil,
)
if agent.connMgr != connMgr {
@@ -95,7 +96,9 @@ func TestNewHTTPAgent(t *testing.T) {
}
func TestHTTPAgentServeHTTP(t *testing.T) {
agent := &HTTPAgent{}
agent := &HTTPAgent{
caps: engine.NewCaps(0, ""),
}
req := httptest.NewRequest("GET", "http://cgrates.org", nil)
rr := httptest.NewRecorder()
agent.ServeHTTP(rr, req)

View File

@@ -622,7 +622,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) {
NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload
NewRadiusAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload
NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload
NewHTTPAgent(cfg, filterSChan, server, connManager, srvDep), // no reload
NewHTTPAgent(cfg, filterSChan, server, connManager, caps, srvDep), // no reload
NewPrometheusAgent(cfg, connManager, server, srvDep),
anz, dspS, dspH, dmService, storDBService,
NewEventExporterService(cfg, filterSChan,

View File

@@ -32,13 +32,14 @@ import (
// NewHTTPAgent returns the HTTP Agent
func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
server *cores.Server, connMgr *engine.ConnManager,
server *cores.Server, connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &HTTPAgent{
cfg: cfg,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
caps: caps,
srvDep: srvDep,
}
}
@@ -54,6 +55,7 @@ type HTTPAgent struct {
// if we registerd the handlers
started bool
connMgr *engine.ConnManager
caps *engine.Caps
srvDep map[string]*sync.WaitGroup
}
@@ -74,7 +76,7 @@ func (ha *HTTPAgent) Start() (err error) {
agents.NewHTTPAgent(ha.connMgr,
agntCfg.SessionSConns, agntCfg.StatSConns, agntCfg.ThresholdSConns,
filterS, ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
agntCfg.ReplyPayload, agntCfg.RequestProcessors, ha.caps))
}
ha.Unlock()
return

View File

@@ -64,7 +64,7 @@ func TestHTTPAgentReload(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
nil, anz, srvDep)
srv := NewHTTPAgent(cfg, filterSChan, server, nil, srvDep)
srv := NewHTTPAgent(cfg, filterSChan, server, nil, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS, db)
if err := srvMngr.StartServices(); err != nil {

View File

@@ -38,7 +38,7 @@ func TestHTTPAgentCoverage(t *testing.T) {
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
rpcInternal := map[string]chan birpc.ClientConnector{}
cM := engine.NewConnManager(cfg, rpcInternal)
srv := NewHTTPAgent(cfg, filterSChan, server, cM, srvDep)
srv := NewHTTPAgent(cfg, filterSChan, server, cM, nil, srvDep)
if srv == nil {
t.Errorf("\nExpecting <nil>,\n Received <%+v>", utils.ToJSON(srv))
}