mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-25 09:08:45 +05:00
httpagent: add caps for concurrency limiting
This commit is contained in:
committed by
Dan Christian Bogos
parent
7eb4d0a846
commit
fc92654ad4
@@ -32,7 +32,7 @@ import (
|
||||
func NewHTTPAgent(connMgr *engine.ConnManager,
|
||||
sessionConns, statsConns, thresholdsConns []string,
|
||||
filterS *engine.FilterS, dfltTenant, reqPayload, rplyPayload string,
|
||||
reqProcessors []*config.RequestProcessor) *HTTPAgent {
|
||||
reqProcessors []*config.RequestProcessor, caps *engine.Caps) *HTTPAgent {
|
||||
return &HTTPAgent{
|
||||
connMgr: connMgr,
|
||||
filterS: filterS,
|
||||
@@ -43,6 +43,7 @@ func NewHTTPAgent(connMgr *engine.ConnManager,
|
||||
sessionConns: sessionConns,
|
||||
statsConns: statsConns,
|
||||
thresholdsConns: thresholdsConns,
|
||||
caps: caps,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,10 +58,18 @@ type HTTPAgent struct {
|
||||
sessionConns []string
|
||||
statsConns []string
|
||||
thresholdsConns []string
|
||||
caps *engine.Caps
|
||||
}
|
||||
|
||||
// ServeHTTP implements http.Handler interface
|
||||
func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if ha.caps.IsLimited() {
|
||||
if err := ha.caps.Allocate(); err != nil {
|
||||
w.WriteHeader(http.StatusTooManyRequests)
|
||||
return
|
||||
}
|
||||
defer ha.caps.Deallocate()
|
||||
}
|
||||
dcdr, err := newHADataProvider(ha.reqPayload, req) // dcdr will provide information from request
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
|
||||
@@ -54,6 +54,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg
|
||||
utils.CommonListenerS,
|
||||
utils.ConnManager,
|
||||
utils.FilterS,
|
||||
utils.CapS,
|
||||
},
|
||||
registry, ha.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
@@ -62,6 +63,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg
|
||||
cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
|
||||
cm := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
|
||||
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
|
||||
caps := srvDeps[utils.CapS].(*CapService).Caps()
|
||||
|
||||
ha.mu.Lock()
|
||||
defer ha.mu.Unlock()
|
||||
@@ -71,7 +73,7 @@ func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceReg
|
||||
cl.RegisterHttpHandler(agntCfg.URL,
|
||||
agents.NewHTTPAgent(cm, agntCfg.SessionSConns, agntCfg.StatSConns, agntCfg.ThresholdSConns,
|
||||
fs, ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, agntCfg.ReplyPayload,
|
||||
agntCfg.RequestProcessors))
|
||||
agntCfg.RequestProcessors, caps))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user