mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-20 22:58:44 +05:00
Improve concurrent request mechanish
This commit is contained in:
@@ -25,16 +25,16 @@ import (
|
||||
var ConReqs *ConcReqs
|
||||
|
||||
type ConcReqs struct {
|
||||
aReqs chan struct{}
|
||||
nAReqs int
|
||||
limit int
|
||||
strategy string
|
||||
aReqs chan struct{}
|
||||
}
|
||||
|
||||
func NewConReqs(reqs int, strategy string) *ConcReqs {
|
||||
cR := &ConcReqs{
|
||||
aReqs: make(chan struct{}, reqs),
|
||||
nAReqs: reqs,
|
||||
limit: reqs,
|
||||
strategy: strategy,
|
||||
aReqs: make(chan struct{}, reqs),
|
||||
}
|
||||
for i := 0; i < reqs; i++ {
|
||||
cR.aReqs <- struct{}{}
|
||||
@@ -45,8 +45,7 @@ func NewConReqs(reqs int, strategy string) *ConcReqs {
|
||||
var errDeny = fmt.Errorf("denying request due to maximum active requests reached")
|
||||
|
||||
func (cR *ConcReqs) Allocate() (err error) {
|
||||
fmt.Println("ENTER IN ALLOCATE ")
|
||||
if cR.nAReqs == 0 {
|
||||
if cR.limit == 0 {
|
||||
return
|
||||
}
|
||||
switch cR.strategy {
|
||||
@@ -61,8 +60,8 @@ func (cR *ConcReqs) Allocate() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (cR *ConcReqs) Deallocate(errStr string) {
|
||||
if cR.nAReqs == 0 || errStr == errDeny.Error() { // in case we receive denying request we don't need to put back the slot on channel because we returned error without getting it
|
||||
func (cR *ConcReqs) Deallocate() {
|
||||
if cR.limit == 0 {
|
||||
return
|
||||
}
|
||||
cR.aReqs <- struct{}{}
|
||||
|
||||
@@ -28,11 +28,12 @@ import (
|
||||
)
|
||||
|
||||
type concReqsGobServerCodec struct {
|
||||
rwc io.ReadWriteCloser
|
||||
dec *gob.Decoder
|
||||
enc *gob.Encoder
|
||||
encBuf *bufio.Writer
|
||||
closed bool
|
||||
rwc io.ReadWriteCloser
|
||||
dec *gob.Decoder
|
||||
enc *gob.Encoder
|
||||
encBuf *bufio.Writer
|
||||
closed bool
|
||||
allocated bool // populated if we have allocated a channel for concurrent requests
|
||||
}
|
||||
|
||||
func NewConcReqsGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
|
||||
@@ -53,11 +54,17 @@ func (c *concReqsGobServerCodec) ReadRequestBody(body interface{}) error {
|
||||
if err := ConReqs.Allocate(); err != nil {
|
||||
return err
|
||||
}
|
||||
c.allocated = true
|
||||
return c.dec.Decode(body)
|
||||
}
|
||||
|
||||
func (c *concReqsGobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
|
||||
defer ConReqs.Deallocate(r.Error)
|
||||
if c.allocated {
|
||||
defer func() {
|
||||
ConReqs.Deallocate()
|
||||
c.allocated = false
|
||||
}()
|
||||
}
|
||||
if err = c.enc.Encode(r); err != nil {
|
||||
if c.encBuf.Flush() == nil {
|
||||
// Gob couldn't encode the header. Should not happen, so if it does,
|
||||
|
||||
@@ -44,6 +44,8 @@ type concReqsServerCodec struct {
|
||||
mutex sync.Mutex // protects seq, pending
|
||||
seq uint64
|
||||
pending map[uint64]*json.RawMessage
|
||||
|
||||
allocated bool // populated if we have allocated a channel for concurrent requests
|
||||
}
|
||||
|
||||
// NewConcReqsServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn.
|
||||
@@ -80,6 +82,7 @@ func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error {
|
||||
if err := ConReqs.Allocate(); err != nil {
|
||||
return err
|
||||
}
|
||||
c.allocated = true
|
||||
if x == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -96,7 +99,13 @@ func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error {
|
||||
}
|
||||
|
||||
func (c *concReqsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
|
||||
defer ConReqs.Deallocate(r.Error)
|
||||
if c.allocated {
|
||||
defer func() {
|
||||
ConReqs.Deallocate()
|
||||
c.allocated = false
|
||||
}()
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
b, ok := c.pending[r.Seq]
|
||||
if !ok {
|
||||
|
||||
@@ -51,6 +51,8 @@ type jsonServerCodec struct {
|
||||
mutex sync.Mutex // protects seq, pending
|
||||
seq uint64
|
||||
pending map[uint64]*json.RawMessage
|
||||
|
||||
allocated bool // populated if we have allocated a channel for concurrent requests
|
||||
}
|
||||
|
||||
// NewCustomJSONServerCodec is used only when DispatcherS is active to handle APIer methods generically
|
||||
@@ -112,6 +114,7 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error {
|
||||
if err := ConReqs.Allocate(); err != nil {
|
||||
return err
|
||||
}
|
||||
c.allocated = true
|
||||
if x == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -140,7 +143,12 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error {
|
||||
var null = json.RawMessage([]byte("null"))
|
||||
|
||||
func (c *jsonServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
|
||||
defer ConReqs.Deallocate(r.Error)
|
||||
if c.allocated {
|
||||
defer func() {
|
||||
ConReqs.Deallocate()
|
||||
c.allocated = false
|
||||
}()
|
||||
}
|
||||
c.mutex.Lock()
|
||||
b, ok := c.pending[r.Seq]
|
||||
if !ok {
|
||||
|
||||
Reference in New Issue
Block a user