diff --git a/agents/agents_caps_it_test.go b/agents/agents_caps_it_test.go index c171beb8c..d69ce9fe4 100644 --- a/agents/agents_caps_it_test.go +++ b/agents/agents_caps_it_test.go @@ -22,6 +22,8 @@ package agents import ( "fmt" + "io" + "net/http" "sync" "testing" "time" @@ -72,7 +74,17 @@ func TestAgentCapsIT(t *testing.T) { "network": "udp" } ] -} +}, +"http_agent": [ + { + "id": "caps_test", + "url": "/caps_test", + "sessions_conns": ["*internal"], + "request_payload": "*url", + "reply_payload": "*xml", + "request_processors": [] + } +] }` ng := engine.TestEngine{ @@ -186,6 +198,26 @@ func TestAgentCapsIT(t *testing.T) { <-doneCh }) + + t.Run("HTTPAgent", func(t *testing.T) { + httpURL := fmt.Sprintf("http://%s/caps_test", cfg.ListenCfg().HTTPListen) + + // There is currently no traffic. Expecting 200 OK because + // there are no request processors enabled (empty reply). + sendHTTPReq(t, httpURL, http.StatusOK) + + // Caps limit is 2, therefore expecting the same result. + doneCh := simulateCapsTraffic(t, conn, 1, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) + sendHTTPReq(t, httpURL, http.StatusOK) + <-doneCh + + // With caps limit reached, 429 Too Many Requests is expected. + doneCh = simulateCapsTraffic(t, conn, 2, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) + sendHTTPReq(t, httpURL, http.StatusTooManyRequests) + <-doneCh + }) } func sendCCR(t *testing.T, client *DiameterClient, reqIdx *int, wantResultCode string) { @@ -284,6 +316,19 @@ func writeDNSMsg(t *testing.T, conn *dns.Conn, wantRcode int) { } } +func sendHTTPReq(t *testing.T, url string, wantStatus int) { + t.Helper() + resp, err := http.Get(url) + if err != nil { + t.Fatal(err) + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + if resp.StatusCode != wantStatus { + t.Errorf("HTTP status=%d, want %d", resp.StatusCode, wantStatus) + } +} + func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} { t.Helper() var wg sync.WaitGroup diff --git a/agents/httpagent.go b/agents/httpagent.go index be5bc2eae..d8d85f5cd 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -32,7 +32,7 @@ import ( func NewHTTPAgent(connMgr *engine.ConnManager, sessionConns, statsConns, thresholdsConns []string, filterS *engine.FilterS, dfltTenant, reqPayload, rplyPayload string, - reqProcessors []*config.RequestProcessor) *HTTPAgent { + reqProcessors []*config.RequestProcessor, caps *engine.Caps) *HTTPAgent { return &HTTPAgent{ connMgr: connMgr, filterS: filterS, @@ -43,6 +43,7 @@ func NewHTTPAgent(connMgr *engine.ConnManager, sessionConns: sessionConns, statsConns: statsConns, thresholdsConns: thresholdsConns, + caps: caps, } } @@ -57,10 +58,18 @@ type HTTPAgent struct { sessionConns []string statsConns []string thresholdsConns []string + caps *engine.Caps } // ServeHTTP implements http.Handler interface func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if ha.caps.IsLimited() { + if err := ha.caps.Allocate(); err != nil { + w.WriteHeader(http.StatusTooManyRequests) + return + } + defer ha.caps.Deallocate() + } dcdr, err := newHADataProvider(ha.reqPayload, req) // dcdr will provide information from request if err != nil { utils.Logger.Warning( diff --git a/agents/httpagent_test.go b/agents/httpagent_test.go index 2eb33ec25..8c259c8a3 100644 --- a/agents/httpagent_test.go +++ b/agents/httpagent_test.go @@ -49,6 +49,7 @@ func TestNewHTTPAgent(t *testing.T) { reqPayload, rplyPayload, reqProcessors, + nil, ) if agent.connMgr != connMgr { @@ -95,7 +96,9 @@ func TestNewHTTPAgent(t *testing.T) { } func TestHTTPAgentServeHTTP(t *testing.T) { - agent := &HTTPAgent{} + agent := &HTTPAgent{ + caps: engine.NewCaps(0, ""), + } req := httptest.NewRequest("GET", "http://cgrates.org", nil) rr := httptest.NewRecorder() agent.ServeHTTP(rr, req) diff --git a/services/engine.go b/services/engine.go index bcafff278..77b2581e7 100644 --- a/services/engine.go +++ b/services/engine.go @@ -622,7 +622,7 @@ func RunCGREngine(args []string, hooks ...func(*config.CGRConfig) error) { NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload NewRadiusAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload - NewHTTPAgent(cfg, filterSChan, server, connManager, srvDep), // no reload + NewHTTPAgent(cfg, filterSChan, server, connManager, caps, srvDep), // no reload NewPrometheusAgent(cfg, connManager, server, srvDep), anz, dspS, dspH, dmService, storDBService, NewEventExporterService(cfg, filterSChan, diff --git a/services/httpagent.go b/services/httpagent.go index 02a3e003e..e6a5a8f4e 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -32,13 +32,14 @@ import ( // NewHTTPAgent returns the HTTP Agent func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - server *cores.Server, connMgr *engine.ConnManager, + server *cores.Server, connMgr *engine.ConnManager, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &HTTPAgent{ cfg: cfg, filterSChan: filterSChan, server: server, connMgr: connMgr, + caps: caps, srvDep: srvDep, } } @@ -54,6 +55,7 @@ type HTTPAgent struct { // if we registerd the handlers started bool connMgr *engine.ConnManager + caps *engine.Caps srvDep map[string]*sync.WaitGroup } @@ -74,7 +76,7 @@ func (ha *HTTPAgent) Start() (err error) { agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, agntCfg.StatSConns, agntCfg.ThresholdSConns, filterS, ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, - agntCfg.ReplyPayload, agntCfg.RequestProcessors)) + agntCfg.ReplyPayload, agntCfg.RequestProcessors, ha.caps)) } ha.Unlock() return diff --git a/services/httpagent_it_test.go b/services/httpagent_it_test.go index a4b5d63d9..3a3119a4a 100644 --- a/services/httpagent_it_test.go +++ b/services/httpagent_it_test.go @@ -64,7 +64,7 @@ func TestHTTPAgentReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep) - srv := NewHTTPAgent(cfg, filterSChan, server, nil, srvDep) + srv := NewHTTPAgent(cfg, filterSChan, server, nil, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, db) if err := srvMngr.StartServices(); err != nil { diff --git a/services/httpagent_test.go b/services/httpagent_test.go index 98e753cae..4d3257efe 100644 --- a/services/httpagent_test.go +++ b/services/httpagent_test.go @@ -38,7 +38,7 @@ func TestHTTPAgentCoverage(t *testing.T) { srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} rpcInternal := map[string]chan birpc.ClientConnector{} cM := engine.NewConnManager(cfg, rpcInternal) - srv := NewHTTPAgent(cfg, filterSChan, server, cM, srvDep) + srv := NewHTTPAgent(cfg, filterSChan, server, cM, nil, srvDep) if srv == nil { t.Errorf("\nExpecting ,\n Received <%+v>", utils.ToJSON(srv)) }