diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index a288166b9..3556cdbf8 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -66,138 +66,249 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { } func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, - rply *sessions.V1AuthorizeReply) error { + rply *sessions.V1AuthorizeReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1AuthorizeEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, - rply *sessions.V1AuthorizeReplyWithDigest) error { + rply *sessions.V1AuthorizeReplyWithDigest) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, - rply *sessions.V1InitSessionReply) error { + rply *sessions.V1InitSessionReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1InitiateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, - rply *sessions.V1InitReplyWithDigest) error { + rply *sessions.V1InitReplyWithDigest) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt *rpc2.Client, args *sessions.V1UpdateSessionArgs, - rply *sessions.V1UpdateSessionReply) error { + rply *sessions.V1UpdateSessionReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1UpdateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt *rpc2.Client, args *utils.TenantWithArgDispatcher, - rply *string) error { + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1SyncSessions(clnt, &utils.TenantWithArgDispatcher{}, rply) } func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt *rpc2.Client, args *sessions.V1TerminateSessionArgs, - rply *string) error { + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, rply *string) error { +func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt *rpc2.Client, args *sessions.V1ProcessMessageArgs, - rply *sessions.V1ProcessMessageReply) error { + rply *sessions.V1ProcessMessageReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessMessage(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, - rply *sessions.V1ProcessEventReply) error { + rply *sessions.V1ProcessEventReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetCost(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, - rply *sessions.V1GetCostReply) error { + rply *sessions.V1GetCostReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetCost(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, - rply *[]*sessions.ExternalSession) error { + rply *[]*sessions.ExternalSession) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, - rply *int) error { + rply *int) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetActiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, - rply *[]*sessions.ExternalSession) error { + rply *[]*sessions.ExternalSession) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetPassiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, - rply *int) error { + rply *int) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt *rpc2.Client, args *utils.SessionFilter, - rply *string) error { + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ForceDisconnect(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, args string, - rply *string) error { + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply) } -func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREventWithArgDispatcher, reply *string) error { +func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREventWithArgDispatcher, + reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ping(ign, reply) } func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt *rpc2.Client, - args sessions.ArgsReplicateSessions, reply *string) error { + args sessions.ArgsReplicateSessions, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt *rpc2.Client, - args *sessions.Session, reply *string) error { + args *sessions.Session, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1SetPassiveSession(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt *rpc2.Client, - args *utils.SessionIDsWithArgsDispatcher, reply *string) error { + args *utils.SessionIDsWithArgsDispatcher, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ActivateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt *rpc2.Client, - args *utils.SessionIDsWithArgsDispatcher, reply *string) error { + args *utils.SessionIDsWithArgsDispatcher, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply) } // BiRPCV1ReAuthorize sends the RAR for filterd sessions func (ssv1 *SessionSv1) BiRPCV1ReAuthorize(clnt *rpc2.Client, - args *utils.SessionFilter, reply *string) error { + args *utils.SessionFilter, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ReAuthorize(clnt, args, reply) } // BiRPCV1DisconnectPeer sends the DPR for the OriginHost and OriginRealm func (ssv1 *SessionSv1) BiRPCV1DisconnectPeer(clnt *rpc2.Client, - args *utils.DPRArgs, reply *string) error { + args *utils.DPRArgs, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1DisconnectPeer(clnt, args, reply) } // BiRPCV1STIRAuthenticate checks the identity using STIR/SHAKEN func (ssv1 *SessionSv1) BiRPCV1STIRAuthenticate(clnt *rpc2.Client, - args *sessions.V1STIRAuthenticateArgs, reply *string) error { + args *sessions.V1STIRAuthenticateArgs, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1STIRAuthenticate(clnt, args, reply) } // BiRPCV1STIRIdentity creates the identity for STIR/SHAKEN func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client, - args *sessions.V1STIRIdentityArgs, reply *string) error { + args *sessions.V1STIRIdentityArgs, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1STIRIdentity(nil, args, reply) } -func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, reply *string) error { +func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, + reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() time.Sleep(arg.DurationTime) *reply = utils.OK return nil diff --git a/apier/v1/smgbirpc.go b/apier/v1/smgbirpc.go index 7cd2ed611..f2824d898 100644 --- a/apier/v1/smgbirpc.go +++ b/apier/v1/smgbirpc.go @@ -20,6 +20,7 @@ package v1 import ( "github.com/cenkalti/rpc2" + "github.com/cgrates/cgrates/utils" ) // Publishes methods exported by SMGenericV1 as SMGenericV1 (so we can handle standard RPC methods via birpc socket) @@ -35,30 +36,50 @@ func (smgv1 *SMGenericV1) Handlers() map[string]interface{} { /// Returns MaxUsage (for calls in seconds), -1 for no limit func (smgv1 *SMGenericV1) BiRPCV1GetMaxUsage(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1GetMaxUsage(clnt, ev, maxUsage) } // Called on session start, returns the maximum number of seconds the session can last func (smgv1 *SMGenericV1) BiRPCV1InitiateSession(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1InitiateSession(clnt, ev, maxUsage) } // Interim updates, returns remaining duration from the rater func (smgv1 *SMGenericV1) BiRPCV1UpdateSession(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1UpdateSession(clnt, ev, maxUsage) } // Called on session end, should stop debit loop func (smgv1 *SMGenericV1) BiRPCV1TerminateSession(clnt *rpc2.Client, - ev map[string]interface{}, reply *string) error { + ev map[string]interface{}, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1TerminateSession(clnt, ev, reply) } // Called on session end, should send the CDR to CDRS func (smgv1 *SMGenericV1) BiRPCV1ProcessCDR(clnt *rpc2.Client, - ev map[string]interface{}, reply *string) error { + ev map[string]interface{}, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1ProcessCDR(clnt, ev, reply) } diff --git a/utils/concreqs_bijson_codec.go b/utils/concreqs_bijson_codec.go deleted file mode 100644 index f4730c5ef..000000000 --- a/utils/concreqs_bijson_codec.go +++ /dev/null @@ -1,210 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -// Most of the logic follows standard library implementation in this file -package utils - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "sync" - - "github.com/cenkalti/rpc2" -) - -type concReqsBiJSONCoded struct { - dec *json.Decoder // for reading JSON values - enc *json.Encoder // for writing JSON values - c io.Closer - - // temporary work space - msg message - serverRequest serverRequest - clientResponse clientResponse - - // JSON-RPC clients can use arbitrary json values as request IDs. - // Package rpc expects uint64 request IDs. - // We assign uint64 sequence numbers to incoming requests - // but save the original request ID in the pending map. - // When rpc responds, we use the sequence number in - // the response to find the original request ID. - mutex sync.Mutex // protects seq, pending - pending map[uint64]*json.RawMessage - seq uint64 -} - -// NewConcReqsBiJSONCoded returns a new rpc2.Codec using JSON-RPC on conn. -func NewConcReqsBiJSONCoded(conn io.ReadWriteCloser) rpc2.Codec { - return &concReqsBiJSONCoded{ - dec: json.NewDecoder(conn), - enc: json.NewEncoder(conn), - c: conn, - pending: make(map[uint64]*json.RawMessage), - } -} - -// serverRequest and clientResponse combined -type message struct { - Method string `json:"method"` - Params *json.RawMessage `json:"params"` - Id *json.RawMessage `json:"id"` - Result *json.RawMessage `json:"result"` - Error interface{} `json:"error"` -} - -type clientResponse struct { - Id uint64 `json:"id"` - Result *json.RawMessage `json:"result"` - Error interface{} `json:"error"` -} - -type clientRequest struct { - Method string `json:"method"` - Params []interface{} `json:"params"` - Id *uint64 `json:"id"` -} - -func (c *concReqsBiJSONCoded) ReadHeader(req *rpc2.Request, resp *rpc2.Response) error { - c.msg = message{} - if err := c.dec.Decode(&c.msg); err != nil { - return err - } - - if c.msg.Method != "" { - // request comes to server - c.serverRequest.Id = c.msg.Id - c.serverRequest.Method = c.msg.Method - c.serverRequest.Params = c.msg.Params - - req.Method = c.serverRequest.Method - - // JSON request id can be any JSON value; - // RPC package expects uint64. Translate to - // internal uint64 and save JSON on the side. - if c.serverRequest.Id == nil { - // Notification - } else { - c.mutex.Lock() - c.seq++ - c.pending[c.seq] = c.serverRequest.Id - c.serverRequest.Id = nil - req.Seq = c.seq - c.mutex.Unlock() - } - } else { - // response comes to client - err := json.Unmarshal(*c.msg.Id, &c.clientResponse.Id) - if err != nil { - return err - } - c.clientResponse.Result = c.msg.Result - c.clientResponse.Error = c.msg.Error - - resp.Error = "" - resp.Seq = c.clientResponse.Id - if c.clientResponse.Error != nil || c.clientResponse.Result == nil { - x, ok := c.clientResponse.Error.(string) - if !ok { - return fmt.Errorf("invalid error %v", c.clientResponse.Error) - } - if x == "" { - x = "unspecified error" - } - resp.Error = x - } - } - return nil -} - -func (c *concReqsBiJSONCoded) ReadRequestBody(x interface{}) error { - if err := ConReqs.Allocate(); err != nil { - return err - } - if x == nil { - return nil - } - if c.serverRequest.Params == nil { - return errMissingParams - } - var params *[]interface{} - switch x := x.(type) { - case *[]interface{}: - params = x - default: - params = &[]interface{}{x} - } - return json.Unmarshal(*c.serverRequest.Params, params) -} - -func (c *concReqsBiJSONCoded) ReadResponseBody(x interface{}) error { - if err := ConReqs.Allocate(); err != nil { - return err - } - if x == nil { - return nil - } - return json.Unmarshal(*c.clientResponse.Result, x) -} - -func (c *concReqsBiJSONCoded) WriteRequest(r *rpc2.Request, param interface{}) error { - req := &clientRequest{Method: r.Method} - switch param := param.(type) { - case []interface{}: - req.Params = param - default: - req.Params = []interface{}{param} - } - if r.Seq == 0 { - // Notification - req.Id = nil - } else { - seq := r.Seq - req.Id = &seq - } - return c.enc.Encode(req) -} - -func (c *concReqsBiJSONCoded) WriteResponse(r *rpc2.Response, x interface{}) error { - defer ConReqs.Deallocate(r.Error) - c.mutex.Lock() - b, ok := c.pending[r.Seq] - if !ok { - c.mutex.Unlock() - return errors.New("invalid sequence number in response") - } - delete(c.pending, r.Seq) - c.mutex.Unlock() - - if b == nil { - // Invalid request so no id. Use JSON null. - b = &null - } - resp := serverResponse{Id: b} - if r.Error == "" { - resp.Result = x - } else { - resp.Error = r.Error - } - return c.enc.Encode(resp) -} - -func (c *concReqsBiJSONCoded) Close() error { - return c.c.Close() -} diff --git a/utils/concureqs.go b/utils/concureqs.go index 8f2029853..8d41f7843 100644 --- a/utils/concureqs.go +++ b/utils/concureqs.go @@ -25,16 +25,16 @@ import ( var ConReqs *ConcReqs type ConcReqs struct { - aReqs chan struct{} - nAReqs int + limit int strategy string + aReqs chan struct{} } func NewConReqs(reqs int, strategy string) *ConcReqs { cR := &ConcReqs{ - aReqs: make(chan struct{}, reqs), - nAReqs: reqs, + limit: reqs, strategy: strategy, + aReqs: make(chan struct{}, reqs), } for i := 0; i < reqs; i++ { cR.aReqs <- struct{}{} @@ -45,7 +45,7 @@ func NewConReqs(reqs int, strategy string) *ConcReqs { var errDeny = fmt.Errorf("denying request due to maximum active requests reached") func (cR *ConcReqs) Allocate() (err error) { - if cR.nAReqs == 0 { + if cR.limit == 0 { return } switch cR.strategy { @@ -60,8 +60,8 @@ func (cR *ConcReqs) Allocate() (err error) { return } -func (cR *ConcReqs) Deallocate(errStr string) { - if cR.nAReqs == 0 || errStr == errDeny.Error() { // in case we receive denying request we don't need to put back the slot on channel because we returned error without getting it +func (cR *ConcReqs) Deallocate() { + if cR.limit == 0 { return } cR.aReqs <- struct{}{} diff --git a/utils/concureqs_gob_codec.go b/utils/concureqs_gob_codec.go index 2ceb66a76..16a3b4690 100644 --- a/utils/concureqs_gob_codec.go +++ b/utils/concureqs_gob_codec.go @@ -28,11 +28,12 @@ import ( ) type concReqsGobServerCodec struct { - rwc io.ReadWriteCloser - dec *gob.Decoder - enc *gob.Encoder - encBuf *bufio.Writer - closed bool + rwc io.ReadWriteCloser + dec *gob.Decoder + enc *gob.Encoder + encBuf *bufio.Writer + closed bool + allocated bool // populated if we have allocated a channel for concurrent requests } func NewConcReqsGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { @@ -53,11 +54,17 @@ func (c *concReqsGobServerCodec) ReadRequestBody(body interface{}) error { if err := ConReqs.Allocate(); err != nil { return err } + c.allocated = true return c.dec.Decode(body) } func (c *concReqsGobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) { - defer ConReqs.Deallocate(r.Error) + if c.allocated { + defer func() { + ConReqs.Deallocate() + c.allocated = false + }() + } if err = c.enc.Encode(r); err != nil { if c.encBuf.Flush() == nil { // Gob couldn't encode the header. Should not happen, so if it does, diff --git a/utils/concureqs_json_codec.go b/utils/concureqs_json_codec.go index c85e7cd0c..125d661ac 100644 --- a/utils/concureqs_json_codec.go +++ b/utils/concureqs_json_codec.go @@ -44,6 +44,8 @@ type concReqsServerCodec struct { mutex sync.Mutex // protects seq, pending seq uint64 pending map[uint64]*json.RawMessage + + allocated bool // populated if we have allocated a channel for concurrent requests } // NewConcReqsServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn. @@ -80,6 +82,7 @@ func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error { if err := ConReqs.Allocate(); err != nil { return err } + c.allocated = true if x == nil { return nil } @@ -96,7 +99,13 @@ func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error { } func (c *concReqsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { - defer ConReqs.Deallocate(r.Error) + if c.allocated { + defer func() { + ConReqs.Deallocate() + c.allocated = false + }() + } + c.mutex.Lock() b, ok := c.pending[r.Seq] if !ok { diff --git a/utils/json_codec.go b/utils/json_codec.go index 882ee2a4b..fad261653 100644 --- a/utils/json_codec.go +++ b/utils/json_codec.go @@ -51,6 +51,8 @@ type jsonServerCodec struct { mutex sync.Mutex // protects seq, pending seq uint64 pending map[uint64]*json.RawMessage + + allocated bool // populated if we have allocated a channel for concurrent requests } // NewCustomJSONServerCodec is used only when DispatcherS is active to handle APIer methods generically @@ -112,6 +114,7 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error { if err := ConReqs.Allocate(); err != nil { return err } + c.allocated = true if x == nil { return nil } @@ -140,7 +143,12 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error { var null = json.RawMessage([]byte("null")) func (c *jsonServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { - defer ConReqs.Deallocate(r.Error) + if c.allocated { + defer func() { + ConReqs.Deallocate() + c.allocated = false + }() + } c.mutex.Lock() b, ok := c.pending[r.Seq] if !ok { diff --git a/utils/server.go b/utils/server.go index 3349c0559..64f797899 100644 --- a/utils/server.go +++ b/utils/server.go @@ -37,6 +37,8 @@ import ( "sync" "time" + rpc2_jsonrpc "github.com/cenkalti/rpc2/jsonrpc" + "github.com/cenkalti/rpc2" "golang.org/x/net/websocket" ) @@ -327,7 +329,7 @@ func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func( log.Fatal(err) return // stop if we get Accept error } - go s.birpcSrv.ServeCodec(NewConcReqsBiJSONCoded(conn)) + go s.birpcSrv.ServeCodec(rpc2_jsonrpc.NewJSONCodec(conn)) } }(lBiJSON) <-s.stopbiRPCServer // wait until server is stoped to close the listener