From cd81ab944931b0952a665280831d51d601b2e762 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 9 Jul 2020 11:08:29 +0300 Subject: [PATCH 1/2] Update concurrent requests mechanism to work with BiJSON server --- apier/v1/concreqs_it_test.go | 1 - apier/v1/sessionsbirpc.go | 297 +++++++++++++++++++++++++++++++++ apier/v1/smgbirpc.go | 56 +++++++ utils/concreqs_bijson_codec.go | 210 ----------------------- utils/concureqs.go | 1 + utils/server.go | 4 +- 6 files changed, 357 insertions(+), 212 deletions(-) delete mode 100644 utils/concreqs_bijson_codec.go diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go index 9d56f5724..6ecff5add 100644 --- a/apier/v1/concreqs_it_test.go +++ b/apier/v1/concreqs_it_test.go @@ -252,7 +252,6 @@ func testConcReqsOnBiJSONBusy(t *testing.T) { if err := concReqsBiRPC.Call(utils.SessionSv1Sleep, &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { - fmt.Println(err) lock.Lock() failedAPIs++ lock.Unlock() diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index a288166b9..56714588a 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -67,137 +67,434 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReply) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1AuthorizeEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReplyWithDigest) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, rply *sessions.V1InitSessionReply) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1InitiateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, rply *sessions.V1InitReplyWithDigest) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt *rpc2.Client, args *sessions.V1UpdateSessionArgs, rply *sessions.V1UpdateSessionReply) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1UpdateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt *rpc2.Client, args *utils.TenantWithArgDispatcher, rply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1SyncSessions(clnt, &utils.TenantWithArgDispatcher{}, rply) } func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt *rpc2.Client, args *sessions.V1TerminateSessionArgs, rply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, rply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt *rpc2.Client, args *sessions.V1ProcessMessageArgs, rply *sessions.V1ProcessMessageReply) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1ProcessMessage(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, rply *sessions.V1ProcessEventReply) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetCost(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, rply *sessions.V1GetCostReply) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1GetCost(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, rply *int) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1GetActiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1GetPassiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, rply *int) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt *rpc2.Client, args *utils.SessionFilter, rply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1ForceDisconnect(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, args string, rply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREventWithArgDispatcher, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ping(ign, reply) } func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt *rpc2.Client, args sessions.ArgsReplicateSessions, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt *rpc2.Client, args *sessions.Session, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1SetPassiveSession(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt *rpc2.Client, args *utils.SessionIDsWithArgsDispatcher, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1ActivateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt *rpc2.Client, args *utils.SessionIDsWithArgsDispatcher, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply) } // BiRPCV1ReAuthorize sends the RAR for filterd sessions func (ssv1 *SessionSv1) BiRPCV1ReAuthorize(clnt *rpc2.Client, args *utils.SessionFilter, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1ReAuthorize(clnt, args, reply) } // BiRPCV1DisconnectPeer sends the DPR for the OriginHost and OriginRealm func (ssv1 *SessionSv1) BiRPCV1DisconnectPeer(clnt *rpc2.Client, args *utils.DPRArgs, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1DisconnectPeer(clnt, args, reply) } // BiRPCV1STIRAuthenticate checks the identity using STIR/SHAKEN func (ssv1 *SessionSv1) BiRPCV1STIRAuthenticate(clnt *rpc2.Client, args *sessions.V1STIRAuthenticateArgs, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1STIRAuthenticate(clnt, args, reply) } // BiRPCV1STIRIdentity creates the identity for STIR/SHAKEN func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client, args *sessions.V1STIRIdentityArgs, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return ssv1.Ss.BiRPCv1STIRIdentity(nil, args, reply) } func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() time.Sleep(arg.DurationTime) *reply = utils.OK return nil diff --git a/apier/v1/smgbirpc.go b/apier/v1/smgbirpc.go index 7cd2ed611..1e2bfc389 100644 --- a/apier/v1/smgbirpc.go +++ b/apier/v1/smgbirpc.go @@ -20,6 +20,7 @@ package v1 import ( "github.com/cenkalti/rpc2" + "github.com/cgrates/cgrates/utils" ) // Publishes methods exported by SMGenericV1 as SMGenericV1 (so we can handle standard RPC methods via birpc socket) @@ -36,29 +37,84 @@ func (smgv1 *SMGenericV1) Handlers() map[string]interface{} { /// Returns MaxUsage (for calls in seconds), -1 for no limit func (smgv1 *SMGenericV1) BiRPCV1GetMaxUsage(clnt *rpc2.Client, ev map[string]interface{}, maxUsage *float64) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return smgv1.Ss.BiRPCV1GetMaxUsage(clnt, ev, maxUsage) } // Called on session start, returns the maximum number of seconds the session can last func (smgv1 *SMGenericV1) BiRPCV1InitiateSession(clnt *rpc2.Client, ev map[string]interface{}, maxUsage *float64) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return smgv1.Ss.BiRPCV1InitiateSession(clnt, ev, maxUsage) } // Interim updates, returns remaining duration from the rater func (smgv1 *SMGenericV1) BiRPCV1UpdateSession(clnt *rpc2.Client, ev map[string]interface{}, maxUsage *float64) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return smgv1.Ss.BiRPCV1UpdateSession(clnt, ev, maxUsage) } // Called on session end, should stop debit loop func (smgv1 *SMGenericV1) BiRPCV1TerminateSession(clnt *rpc2.Client, ev map[string]interface{}, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return smgv1.Ss.BiRPCV1TerminateSession(clnt, ev, reply) } // Called on session end, should send the CDR to CDRS func (smgv1 *SMGenericV1) BiRPCV1ProcessCDR(clnt *rpc2.Client, ev map[string]interface{}, reply *string) error { + err := utils.ConReqs.Allocate() + if err != nil { + return err + } + defer func() { + if err != nil { + utils.ConReqs.Deallocate(err.Error()) + } else { + utils.ConReqs.Deallocate(utils.EmptyString) + } + }() return smgv1.Ss.BiRPCV1ProcessCDR(clnt, ev, reply) } diff --git a/utils/concreqs_bijson_codec.go b/utils/concreqs_bijson_codec.go deleted file mode 100644 index f4730c5ef..000000000 --- a/utils/concreqs_bijson_codec.go +++ /dev/null @@ -1,210 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -// Most of the logic follows standard library implementation in this file -package utils - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "sync" - - "github.com/cenkalti/rpc2" -) - -type concReqsBiJSONCoded struct { - dec *json.Decoder // for reading JSON values - enc *json.Encoder // for writing JSON values - c io.Closer - - // temporary work space - msg message - serverRequest serverRequest - clientResponse clientResponse - - // JSON-RPC clients can use arbitrary json values as request IDs. - // Package rpc expects uint64 request IDs. - // We assign uint64 sequence numbers to incoming requests - // but save the original request ID in the pending map. - // When rpc responds, we use the sequence number in - // the response to find the original request ID. - mutex sync.Mutex // protects seq, pending - pending map[uint64]*json.RawMessage - seq uint64 -} - -// NewConcReqsBiJSONCoded returns a new rpc2.Codec using JSON-RPC on conn. -func NewConcReqsBiJSONCoded(conn io.ReadWriteCloser) rpc2.Codec { - return &concReqsBiJSONCoded{ - dec: json.NewDecoder(conn), - enc: json.NewEncoder(conn), - c: conn, - pending: make(map[uint64]*json.RawMessage), - } -} - -// serverRequest and clientResponse combined -type message struct { - Method string `json:"method"` - Params *json.RawMessage `json:"params"` - Id *json.RawMessage `json:"id"` - Result *json.RawMessage `json:"result"` - Error interface{} `json:"error"` -} - -type clientResponse struct { - Id uint64 `json:"id"` - Result *json.RawMessage `json:"result"` - Error interface{} `json:"error"` -} - -type clientRequest struct { - Method string `json:"method"` - Params []interface{} `json:"params"` - Id *uint64 `json:"id"` -} - -func (c *concReqsBiJSONCoded) ReadHeader(req *rpc2.Request, resp *rpc2.Response) error { - c.msg = message{} - if err := c.dec.Decode(&c.msg); err != nil { - return err - } - - if c.msg.Method != "" { - // request comes to server - c.serverRequest.Id = c.msg.Id - c.serverRequest.Method = c.msg.Method - c.serverRequest.Params = c.msg.Params - - req.Method = c.serverRequest.Method - - // JSON request id can be any JSON value; - // RPC package expects uint64. Translate to - // internal uint64 and save JSON on the side. - if c.serverRequest.Id == nil { - // Notification - } else { - c.mutex.Lock() - c.seq++ - c.pending[c.seq] = c.serverRequest.Id - c.serverRequest.Id = nil - req.Seq = c.seq - c.mutex.Unlock() - } - } else { - // response comes to client - err := json.Unmarshal(*c.msg.Id, &c.clientResponse.Id) - if err != nil { - return err - } - c.clientResponse.Result = c.msg.Result - c.clientResponse.Error = c.msg.Error - - resp.Error = "" - resp.Seq = c.clientResponse.Id - if c.clientResponse.Error != nil || c.clientResponse.Result == nil { - x, ok := c.clientResponse.Error.(string) - if !ok { - return fmt.Errorf("invalid error %v", c.clientResponse.Error) - } - if x == "" { - x = "unspecified error" - } - resp.Error = x - } - } - return nil -} - -func (c *concReqsBiJSONCoded) ReadRequestBody(x interface{}) error { - if err := ConReqs.Allocate(); err != nil { - return err - } - if x == nil { - return nil - } - if c.serverRequest.Params == nil { - return errMissingParams - } - var params *[]interface{} - switch x := x.(type) { - case *[]interface{}: - params = x - default: - params = &[]interface{}{x} - } - return json.Unmarshal(*c.serverRequest.Params, params) -} - -func (c *concReqsBiJSONCoded) ReadResponseBody(x interface{}) error { - if err := ConReqs.Allocate(); err != nil { - return err - } - if x == nil { - return nil - } - return json.Unmarshal(*c.clientResponse.Result, x) -} - -func (c *concReqsBiJSONCoded) WriteRequest(r *rpc2.Request, param interface{}) error { - req := &clientRequest{Method: r.Method} - switch param := param.(type) { - case []interface{}: - req.Params = param - default: - req.Params = []interface{}{param} - } - if r.Seq == 0 { - // Notification - req.Id = nil - } else { - seq := r.Seq - req.Id = &seq - } - return c.enc.Encode(req) -} - -func (c *concReqsBiJSONCoded) WriteResponse(r *rpc2.Response, x interface{}) error { - defer ConReqs.Deallocate(r.Error) - c.mutex.Lock() - b, ok := c.pending[r.Seq] - if !ok { - c.mutex.Unlock() - return errors.New("invalid sequence number in response") - } - delete(c.pending, r.Seq) - c.mutex.Unlock() - - if b == nil { - // Invalid request so no id. Use JSON null. - b = &null - } - resp := serverResponse{Id: b} - if r.Error == "" { - resp.Result = x - } else { - resp.Error = r.Error - } - return c.enc.Encode(resp) -} - -func (c *concReqsBiJSONCoded) Close() error { - return c.c.Close() -} diff --git a/utils/concureqs.go b/utils/concureqs.go index 8f2029853..9d2258e0b 100644 --- a/utils/concureqs.go +++ b/utils/concureqs.go @@ -45,6 +45,7 @@ func NewConReqs(reqs int, strategy string) *ConcReqs { var errDeny = fmt.Errorf("denying request due to maximum active requests reached") func (cR *ConcReqs) Allocate() (err error) { + fmt.Println("ENTER IN ALLOCATE ") if cR.nAReqs == 0 { return } diff --git a/utils/server.go b/utils/server.go index ea11e7b49..433e86f4b 100644 --- a/utils/server.go +++ b/utils/server.go @@ -37,6 +37,8 @@ import ( "sync" "time" + rpc2_jsonrpc "github.com/cenkalti/rpc2/jsonrpc" + "github.com/cenkalti/rpc2" "golang.org/x/net/websocket" ) @@ -327,7 +329,7 @@ func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func( log.Fatal(err) return // stop if we get Accept error } - go s.birpcSrv.ServeCodec(NewConcReqsBiJSONCoded(conn)) + go s.birpcSrv.ServeCodec(rpc2_jsonrpc.NewJSONCodec(conn)) } }(lBiJSON) <-s.stopbiRPCServer // wait until server is stoped to close the listener From aed9c2b47fe2414067349fb76e7ced319e87fee0 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 9 Jul 2020 11:50:13 +0300 Subject: [PATCH 2/2] Improve concurrent request mechanish --- apier/v1/sessionsbirpc.go | 408 +++++++++------------------------- apier/v1/smgbirpc.go | 75 ++----- utils/concureqs.go | 15 +- utils/concureqs_gob_codec.go | 19 +- utils/concureqs_json_codec.go | 11 +- utils/json_codec.go | 10 +- 6 files changed, 170 insertions(+), 368 deletions(-) diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index 56714588a..3556cdbf8 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -66,435 +66,249 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { } func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, - rply *sessions.V1AuthorizeReply) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *sessions.V1AuthorizeReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1AuthorizeEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, - rply *sessions.V1AuthorizeReplyWithDigest) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *sessions.V1AuthorizeReplyWithDigest) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, - rply *sessions.V1InitSessionReply) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *sessions.V1InitSessionReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1InitiateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, - rply *sessions.V1InitReplyWithDigest) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *sessions.V1InitReplyWithDigest) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt *rpc2.Client, args *sessions.V1UpdateSessionArgs, - rply *sessions.V1UpdateSessionReply) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *sessions.V1UpdateSessionReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1UpdateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt *rpc2.Client, args *utils.TenantWithArgDispatcher, - rply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1SyncSessions(clnt, &utils.TenantWithArgDispatcher{}, rply) } func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt *rpc2.Client, args *sessions.V1TerminateSessionArgs, - rply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, rply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err +func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt *rpc2.Client, args *sessions.V1ProcessMessageArgs, - rply *sessions.V1ProcessMessageReply) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *sessions.V1ProcessMessageReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessMessage(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, - rply *sessions.V1ProcessEventReply) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *sessions.V1ProcessEventReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetCost(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, - rply *sessions.V1GetCostReply) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *sessions.V1GetCostReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetCost(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, - rply *[]*sessions.ExternalSession) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *[]*sessions.ExternalSession) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, - rply *int) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *int) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetActiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, - rply *[]*sessions.ExternalSession) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *[]*sessions.ExternalSession) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetPassiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, - rply *int) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *int) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt *rpc2.Client, args *utils.SessionFilter, - rply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ForceDisconnect(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, args string, - rply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply) } -func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREventWithArgDispatcher, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err +func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREventWithArgDispatcher, + reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ping(ign, reply) } func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt *rpc2.Client, - args sessions.ArgsReplicateSessions, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + args sessions.ArgsReplicateSessions, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt *rpc2.Client, - args *sessions.Session, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + args *sessions.Session, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1SetPassiveSession(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt *rpc2.Client, - args *utils.SessionIDsWithArgsDispatcher, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + args *utils.SessionIDsWithArgsDispatcher, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ActivateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt *rpc2.Client, - args *utils.SessionIDsWithArgsDispatcher, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + args *utils.SessionIDsWithArgsDispatcher, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply) } // BiRPCV1ReAuthorize sends the RAR for filterd sessions func (ssv1 *SessionSv1) BiRPCV1ReAuthorize(clnt *rpc2.Client, - args *utils.SessionFilter, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + args *utils.SessionFilter, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ReAuthorize(clnt, args, reply) } // BiRPCV1DisconnectPeer sends the DPR for the OriginHost and OriginRealm func (ssv1 *SessionSv1) BiRPCV1DisconnectPeer(clnt *rpc2.Client, - args *utils.DPRArgs, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + args *utils.DPRArgs, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1DisconnectPeer(clnt, args, reply) } // BiRPCV1STIRAuthenticate checks the identity using STIR/SHAKEN func (ssv1 *SessionSv1) BiRPCV1STIRAuthenticate(clnt *rpc2.Client, - args *sessions.V1STIRAuthenticateArgs, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + args *sessions.V1STIRAuthenticateArgs, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1STIRAuthenticate(clnt, args, reply) } // BiRPCV1STIRIdentity creates the identity for STIR/SHAKEN func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client, - args *sessions.V1STIRIdentityArgs, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + args *sessions.V1STIRIdentityArgs, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1STIRIdentity(nil, args, reply) } -func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err +func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, + reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() time.Sleep(arg.DurationTime) *reply = utils.OK return nil diff --git a/apier/v1/smgbirpc.go b/apier/v1/smgbirpc.go index 1e2bfc389..f2824d898 100644 --- a/apier/v1/smgbirpc.go +++ b/apier/v1/smgbirpc.go @@ -36,85 +36,50 @@ func (smgv1 *SMGenericV1) Handlers() map[string]interface{} { /// Returns MaxUsage (for calls in seconds), -1 for no limit func (smgv1 *SMGenericV1) BiRPCV1GetMaxUsage(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1GetMaxUsage(clnt, ev, maxUsage) } // Called on session start, returns the maximum number of seconds the session can last func (smgv1 *SMGenericV1) BiRPCV1InitiateSession(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1InitiateSession(clnt, ev, maxUsage) } // Interim updates, returns remaining duration from the rater func (smgv1 *SMGenericV1) BiRPCV1UpdateSession(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1UpdateSession(clnt, ev, maxUsage) } // Called on session end, should stop debit loop func (smgv1 *SMGenericV1) BiRPCV1TerminateSession(clnt *rpc2.Client, - ev map[string]interface{}, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + ev map[string]interface{}, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1TerminateSession(clnt, ev, reply) } // Called on session end, should send the CDR to CDRS func (smgv1 *SMGenericV1) BiRPCV1ProcessCDR(clnt *rpc2.Client, - ev map[string]interface{}, reply *string) error { - err := utils.ConReqs.Allocate() - if err != nil { - return err + ev map[string]interface{}, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return } - defer func() { - if err != nil { - utils.ConReqs.Deallocate(err.Error()) - } else { - utils.ConReqs.Deallocate(utils.EmptyString) - } - }() + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1ProcessCDR(clnt, ev, reply) } diff --git a/utils/concureqs.go b/utils/concureqs.go index 9d2258e0b..8d41f7843 100644 --- a/utils/concureqs.go +++ b/utils/concureqs.go @@ -25,16 +25,16 @@ import ( var ConReqs *ConcReqs type ConcReqs struct { - aReqs chan struct{} - nAReqs int + limit int strategy string + aReqs chan struct{} } func NewConReqs(reqs int, strategy string) *ConcReqs { cR := &ConcReqs{ - aReqs: make(chan struct{}, reqs), - nAReqs: reqs, + limit: reqs, strategy: strategy, + aReqs: make(chan struct{}, reqs), } for i := 0; i < reqs; i++ { cR.aReqs <- struct{}{} @@ -45,8 +45,7 @@ func NewConReqs(reqs int, strategy string) *ConcReqs { var errDeny = fmt.Errorf("denying request due to maximum active requests reached") func (cR *ConcReqs) Allocate() (err error) { - fmt.Println("ENTER IN ALLOCATE ") - if cR.nAReqs == 0 { + if cR.limit == 0 { return } switch cR.strategy { @@ -61,8 +60,8 @@ func (cR *ConcReqs) Allocate() (err error) { return } -func (cR *ConcReqs) Deallocate(errStr string) { - if cR.nAReqs == 0 || errStr == errDeny.Error() { // in case we receive denying request we don't need to put back the slot on channel because we returned error without getting it +func (cR *ConcReqs) Deallocate() { + if cR.limit == 0 { return } cR.aReqs <- struct{}{} diff --git a/utils/concureqs_gob_codec.go b/utils/concureqs_gob_codec.go index 2ceb66a76..16a3b4690 100644 --- a/utils/concureqs_gob_codec.go +++ b/utils/concureqs_gob_codec.go @@ -28,11 +28,12 @@ import ( ) type concReqsGobServerCodec struct { - rwc io.ReadWriteCloser - dec *gob.Decoder - enc *gob.Encoder - encBuf *bufio.Writer - closed bool + rwc io.ReadWriteCloser + dec *gob.Decoder + enc *gob.Encoder + encBuf *bufio.Writer + closed bool + allocated bool // populated if we have allocated a channel for concurrent requests } func NewConcReqsGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { @@ -53,11 +54,17 @@ func (c *concReqsGobServerCodec) ReadRequestBody(body interface{}) error { if err := ConReqs.Allocate(); err != nil { return err } + c.allocated = true return c.dec.Decode(body) } func (c *concReqsGobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) { - defer ConReqs.Deallocate(r.Error) + if c.allocated { + defer func() { + ConReqs.Deallocate() + c.allocated = false + }() + } if err = c.enc.Encode(r); err != nil { if c.encBuf.Flush() == nil { // Gob couldn't encode the header. Should not happen, so if it does, diff --git a/utils/concureqs_json_codec.go b/utils/concureqs_json_codec.go index c85e7cd0c..125d661ac 100644 --- a/utils/concureqs_json_codec.go +++ b/utils/concureqs_json_codec.go @@ -44,6 +44,8 @@ type concReqsServerCodec struct { mutex sync.Mutex // protects seq, pending seq uint64 pending map[uint64]*json.RawMessage + + allocated bool // populated if we have allocated a channel for concurrent requests } // NewConcReqsServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn. @@ -80,6 +82,7 @@ func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error { if err := ConReqs.Allocate(); err != nil { return err } + c.allocated = true if x == nil { return nil } @@ -96,7 +99,13 @@ func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error { } func (c *concReqsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { - defer ConReqs.Deallocate(r.Error) + if c.allocated { + defer func() { + ConReqs.Deallocate() + c.allocated = false + }() + } + c.mutex.Lock() b, ok := c.pending[r.Seq] if !ok { diff --git a/utils/json_codec.go b/utils/json_codec.go index 882ee2a4b..fad261653 100644 --- a/utils/json_codec.go +++ b/utils/json_codec.go @@ -51,6 +51,8 @@ type jsonServerCodec struct { mutex sync.Mutex // protects seq, pending seq uint64 pending map[uint64]*json.RawMessage + + allocated bool // populated if we have allocated a channel for concurrent requests } // NewCustomJSONServerCodec is used only when DispatcherS is active to handle APIer methods generically @@ -112,6 +114,7 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error { if err := ConReqs.Allocate(); err != nil { return err } + c.allocated = true if x == nil { return nil } @@ -140,7 +143,12 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error { var null = json.RawMessage([]byte("null")) func (c *jsonServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { - defer ConReqs.Deallocate(r.Error) + if c.allocated { + defer func() { + ConReqs.Deallocate() + c.allocated = false + }() + } c.mutex.Lock() b, ok := c.pending[r.Seq] if !ok {