diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go index b10bba647..7a0c24382 100644 --- a/apier/v1/concreqs_it_test.go +++ b/apier/v1/concreqs_it_test.go @@ -186,7 +186,7 @@ func testConcReqsOnHTTPBusy(t *testing.T) { return } resp.Body.Close() - if strings.Contains(string(contents), "denying request due to maximum active requests reached") { + if strings.Contains(string(contents), utils.ErrMaxConcurentRPCExceeded.Error()) { lock.Lock() fldAPIs++ lock.Unlock() diff --git a/engine/thresholds.go b/engine/thresholds.go index ce6cd71c8..6fcc42ee5 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -87,13 +87,10 @@ func (t *Threshold) TenantID() string { // ProcessEvent processes an ThresholdEvent // concurrentActions limits the number of simultaneous action sets executed func (t *Threshold) ProcessEvent(args *ThresholdsArgsProcessEvent, dm *DataManager) (err error) { - if t.Snooze.After(time.Now()) { // snoozed, not executing actions - return - } - if t.Hits < t.tPrfl.MinHits { // number of hits was not met, will not execute actions - return - } - if t.tPrfl.MaxHits != -1 && t.Hits > t.tPrfl.MaxHits { + if t.Snooze.After(time.Now()) || // snoozed, not executing actions + t.Hits < t.tPrfl.MinHits || // number of hits was not met, will not execute actions + (t.tPrfl.MaxHits != -1 && + t.Hits > t.tPrfl.MaxHits) { return } acnt, _ := args.FieldAsString(utils.Account) @@ -111,18 +108,14 @@ func (t *Threshold) ProcessEvent(args *ThresholdsArgsProcessEvent, dm *DataManag at.accountIDs = utils.NewStringMap(acntID) } if t.tPrfl.Async { - go func() { if errExec := at.Execute(nil, nil); errExec != nil { utils.Logger.Warning(fmt.Sprintf(" failed executing actions: %s, error: %s", actionSetID, errExec.Error())) } }() - - } else { - if errExec := at.Execute(nil, nil); errExec != nil { - utils.Logger.Warning(fmt.Sprintf(" failed executing actions: %s, error: %s", actionSetID, errExec.Error())) - err = utils.ErrPartiallyExecuted - } + } else if errExec := at.Execute(nil, nil); errExec != nil { + utils.Logger.Warning(fmt.Sprintf(" failed executing actions: %s, error: %s", actionSetID, errExec.Error())) + err = utils.ErrPartiallyExecuted } } return @@ -373,11 +366,10 @@ func (tS *ThresholdService) processEvent(args *ThresholdsArgsProcessEvent) (thre } t.Snooze = time.Now().Add(t.tPrfl.MinSleep) // recurrent threshold + *t.dirty = true // mark it to be saved if tS.cgrcfg.ThresholdSCfg().StoreInterval == -1 { - *t.dirty = true tS.StoreThreshold(t) } else { - *t.dirty = true // mark it to be saved tS.stMux.Lock() tS.storedTdIDs[t.TenantID()] = true tS.stMux.Unlock() diff --git a/utils/concureqs.go b/utils/concureqs.go index 8d41f7843..5a1081c25 100644 --- a/utils/concureqs.go +++ b/utils/concureqs.go @@ -19,7 +19,9 @@ along with this program. If not, see package utils import ( - "fmt" + "io" + "net/rpc" + "net/rpc/jsonrpc" ) var ConReqs *ConcReqs @@ -42,8 +44,7 @@ func NewConReqs(reqs int, strategy string) *ConcReqs { return cR } -var errDeny = fmt.Errorf("denying request due to maximum active requests reached") - +// Allocate will reserve a channel for the API call func (cR *ConcReqs) Allocate() (err error) { if cR.limit == 0 { return @@ -51,7 +52,7 @@ func (cR *ConcReqs) Allocate() (err error) { switch cR.strategy { case MetaBusy: if len(cR.aReqs) == 0 { - return errDeny + return ErrMaxConcurentRPCExceededNoCaps } fallthrough case MetaQueue: @@ -60,6 +61,7 @@ func (cR *ConcReqs) Allocate() (err error) { return } +// Deallocate will free a channel for the API call func (cR *ConcReqs) Deallocate() { if cR.limit == 0 { return @@ -67,3 +69,39 @@ func (cR *ConcReqs) Deallocate() { cR.aReqs <- struct{}{} return } + +func newConcReqsGOBCodec(conn io.ReadWriteCloser) rpc.ServerCodec { + return newConcReqsServerCodec(newGobServerCodec(conn)) +} + +func newConcReqsJSONCodec(conn io.ReadWriteCloser) rpc.ServerCodec { + return newConcReqsServerCodec(jsonrpc.NewServerCodec(conn)) +} + +func newConcReqsServerCodec(sc rpc.ServerCodec) rpc.ServerCodec { + return &concReqsServerCodec2{sc: sc} +} + +type concReqsServerCodec2 struct { + sc rpc.ServerCodec +} + +func (c *concReqsServerCodec2) ReadRequestHeader(r *rpc.Request) error { + return c.sc.ReadRequestHeader(r) +} + +func (c *concReqsServerCodec2) ReadRequestBody(x interface{}) error { + if err := ConReqs.Allocate(); err != nil { + return err + } + return c.sc.ReadRequestBody(x) +} +func (c *concReqsServerCodec2) WriteResponse(r *rpc.Response, x interface{}) error { + if r.Error == ErrMaxConcurentRPCExceededNoCaps.Error() { + r.Error = ErrMaxConcurentRPCExceeded.Error() + } else { + defer ConReqs.Deallocate() + } + return c.sc.WriteResponse(r, x) +} +func (c *concReqsServerCodec2) Close() error { return c.sc.Close() } diff --git a/utils/concureqs_json_codec.go b/utils/concureqs_json_codec.go deleted file mode 100644 index 125d661ac..000000000 --- a/utils/concureqs_json_codec.go +++ /dev/null @@ -1,133 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -// Most of the logic follows standard library implementation in this file -package utils - -import ( - "encoding/json" - "errors" - "io" - "net/rpc" - "sync" -) - -type concReqsServerCodec struct { - dec *json.Decoder // for reading JSON values - enc *json.Encoder // for writing JSON values - c io.Closer - - // temporary work space - req serverRequest - - // JSON-RPC clients can use arbitrary json values as request IDs. - // Package rpc expects uint64 request IDs. - // We assign uint64 sequence numbers to incoming requests - // but save the original request ID in the pending map. - // When rpc responds, we use the sequence number in - // the response to find the original request ID. - mutex sync.Mutex // protects seq, pending - seq uint64 - pending map[uint64]*json.RawMessage - - allocated bool // populated if we have allocated a channel for concurrent requests -} - -// NewConcReqsServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn. -func NewConcReqsServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { - return &concReqsServerCodec{ - dec: json.NewDecoder(conn), - enc: json.NewEncoder(conn), - c: conn, - pending: make(map[uint64]*json.RawMessage), - } -} - -func (c *concReqsServerCodec) ReadRequestHeader(r *rpc.Request) error { - c.req.reset() - if err := c.dec.Decode(&c.req); err != nil { - return err - } - r.ServiceMethod = c.req.Method - - // JSON request id can be any JSON value; - // RPC package expects uint64. Translate to - // internal uint64 and save JSON on the side. - c.mutex.Lock() - c.seq++ - c.pending[c.seq] = c.req.Id - c.req.Id = nil - r.Seq = c.seq - c.mutex.Unlock() - - return nil -} - -func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error { - if err := ConReqs.Allocate(); err != nil { - return err - } - c.allocated = true - if x == nil { - return nil - } - if c.req.Params == nil { - return errMissingParams - } - // JSON params is array value. - // RPC params is struct. - // Unmarshal into array containing struct for now. - // Should think about making RPC more general. - var params [1]interface{} - params[0] = x - return json.Unmarshal(*c.req.Params, ¶ms) -} - -func (c *concReqsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { - if c.allocated { - defer func() { - ConReqs.Deallocate() - c.allocated = false - }() - } - - c.mutex.Lock() - b, ok := c.pending[r.Seq] - if !ok { - c.mutex.Unlock() - return errors.New("invalid sequence number in response") - } - delete(c.pending, r.Seq) - c.mutex.Unlock() - - if b == nil { - // Invalid request so no id. Use JSON null. - b = &null - } - resp := serverResponse{Id: b} - if r.Error == "" { - resp.Result = x - } else { - resp.Error = r.Error - } - return c.enc.Encode(resp) -} - -func (c *concReqsServerCodec) Close() error { - return c.c.Close() -} diff --git a/utils/errors.go b/utils/errors.go index 9ebf580bc..eb84a021a 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -27,52 +27,54 @@ import ( ) var ( - ErrNoMoreData = errors.New("NO_MORE_DATA") - ErrNotImplemented = errors.New("NOT_IMPLEMENTED") - ErrNotFound = errors.New("NOT_FOUND") - ErrHostNotFound = errors.New("HOST_NOT_FOUND") - ErrTimedOut = errors.New("TIMED_OUT") - ErrServerError = errors.New("SERVER_ERROR") - ErrMaxRecursionDepth = errors.New("MAX_RECURSION_DEPTH") - ErrMandatoryIeMissing = errors.New("MANDATORY_IE_MISSING") - ErrExists = errors.New("EXISTS") - ErrBrokenReference = errors.New("BROKEN_REFERENCE") - ErrParserError = errors.New("PARSER_ERROR") - ErrInvalidPath = errors.New("INVALID_PATH") - ErrInvalidKey = errors.New("INVALID_KEY") - ErrUnauthorizedDestination = errors.New("UNAUTHORIZED_DESTINATION") - ErrRatingPlanNotFound = errors.New("RATING_PLAN_NOT_FOUND") - ErrAccountNotFound = errors.New("ACCOUNT_NOT_FOUND") - ErrAccountDisabled = errors.New("ACCOUNT_DISABLED") - ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT") - ErrNotConvertible = errors.New("NOT_CONVERTIBLE") - ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE") - ErrResourceUnauthorized = errors.New("RESOURCE_UNAUTHORIZED") - ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION") - ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED") - ErrMaxUsageExceeded = errors.New("MAX_USAGE_EXCEEDED") - ErrMaxCostExceeded = errors.New("MAX_COST_EXCEEDED") - ErrFilterNotPassingNoCaps = errors.New("filter not passing") - ErrNotConvertibleNoCaps = errors.New("not convertible") - ErrMandatoryIeMissingNoCaps = errors.New("mandatory information missing") - ErrUnauthorizedApi = errors.New("UNAUTHORIZED_API") - ErrUnknownApiKey = errors.New("UNKNOWN_API_KEY") - ErrReqUnsynchronized = errors.New("REQ_UNSYNCHRONIZED") - ErrUnsupporteServiceMethod = errors.New("UNSUPPORTED_SERVICE_METHOD") - ErrDisconnected = errors.New("DISCONNECTED") - ErrReplyTimeout = errors.New("REPLY_TIMEOUT") - ErrSessionNotFound = errors.New("SESSION_NOT_FOUND") - ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT") - ErrNotEnoughParameters = errors.New("NotEnoughParameters") - ErrNotConnected = errors.New("NOT_CONNECTED") - RalsErrorPrfx = "RALS_ERROR" - DispatcherErrorPrefix = "DISPATCHER_ERROR" - ErrUnsupportedFormat = errors.New("UNSUPPORTED_FORMAT") - ErrNoDatabaseConn = errors.New("NO_DATA_BASE_CONNECTION") - ErrMaxIncrementsExceeded = errors.New("MAX_INCREMENTS_EXCEEDED") - ErrIndexOutOfBounds = errors.New("INDEX_OUT_OF_BOUNDS") - ErrWrongPath = errors.New("WRONG_PATH") - ErrServiceAlreadyRunning = fmt.Errorf("service already running") + ErrNoMoreData = errors.New("NO_MORE_DATA") + ErrNotImplemented = errors.New("NOT_IMPLEMENTED") + ErrNotFound = errors.New("NOT_FOUND") + ErrHostNotFound = errors.New("HOST_NOT_FOUND") + ErrTimedOut = errors.New("TIMED_OUT") + ErrServerError = errors.New("SERVER_ERROR") + ErrMaxRecursionDepth = errors.New("MAX_RECURSION_DEPTH") + ErrMandatoryIeMissing = errors.New("MANDATORY_IE_MISSING") + ErrExists = errors.New("EXISTS") + ErrBrokenReference = errors.New("BROKEN_REFERENCE") + ErrParserError = errors.New("PARSER_ERROR") + ErrInvalidPath = errors.New("INVALID_PATH") + ErrInvalidKey = errors.New("INVALID_KEY") + ErrUnauthorizedDestination = errors.New("UNAUTHORIZED_DESTINATION") + ErrRatingPlanNotFound = errors.New("RATING_PLAN_NOT_FOUND") + ErrAccountNotFound = errors.New("ACCOUNT_NOT_FOUND") + ErrAccountDisabled = errors.New("ACCOUNT_DISABLED") + ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT") + ErrNotConvertible = errors.New("NOT_CONVERTIBLE") + ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE") + ErrResourceUnauthorized = errors.New("RESOURCE_UNAUTHORIZED") + ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION") + ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED") + ErrMaxUsageExceeded = errors.New("MAX_USAGE_EXCEEDED") + ErrMaxCostExceeded = errors.New("MAX_COST_EXCEEDED") + ErrFilterNotPassingNoCaps = errors.New("filter not passing") + ErrNotConvertibleNoCaps = errors.New("not convertible") + ErrMandatoryIeMissingNoCaps = errors.New("mandatory information missing") + ErrUnauthorizedApi = errors.New("UNAUTHORIZED_API") + ErrUnknownApiKey = errors.New("UNKNOWN_API_KEY") + ErrReqUnsynchronized = errors.New("REQ_UNSYNCHRONIZED") + ErrUnsupporteServiceMethod = errors.New("UNSUPPORTED_SERVICE_METHOD") + ErrDisconnected = errors.New("DISCONNECTED") + ErrReplyTimeout = errors.New("REPLY_TIMEOUT") + ErrSessionNotFound = errors.New("SESSION_NOT_FOUND") + ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT") + ErrNotEnoughParameters = errors.New("NotEnoughParameters") + ErrNotConnected = errors.New("NOT_CONNECTED") + RalsErrorPrfx = "RALS_ERROR" + DispatcherErrorPrefix = "DISPATCHER_ERROR" + ErrUnsupportedFormat = errors.New("UNSUPPORTED_FORMAT") + ErrNoDatabaseConn = errors.New("NO_DATA_BASE_CONNECTION") + ErrMaxIncrementsExceeded = errors.New("MAX_INCREMENTS_EXCEEDED") + ErrIndexOutOfBounds = errors.New("INDEX_OUT_OF_BOUNDS") + ErrWrongPath = errors.New("WRONG_PATH") + ErrServiceAlreadyRunning = fmt.Errorf("service already running") + ErrMaxConcurentRPCExceededNoCaps = errors.New("max concurent rpc exceeded") // on internal we return this error for concureq + ErrMaxConcurentRPCExceeded = errors.New("MAX_CONCURENT_RPC_EXCEEDED") // but the codec will rewrite it with this one to be sure that we corectly dealocate the request ErrMap = map[string]error{ ErrNoMoreData.Error(): ErrNoMoreData, diff --git a/utils/concureqs_gob_codec.go b/utils/gob_codec.go similarity index 66% rename from utils/concureqs_gob_codec.go rename to utils/gob_codec.go index 16a3b4690..087b57b3c 100644 --- a/utils/concureqs_gob_codec.go +++ b/utils/gob_codec.go @@ -16,7 +16,6 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -// Most of the logic follows standard library implementation in this file package utils import ( @@ -27,18 +26,12 @@ import ( "net/rpc" ) -type concReqsGobServerCodec struct { - rwc io.ReadWriteCloser - dec *gob.Decoder - enc *gob.Encoder - encBuf *bufio.Writer - closed bool - allocated bool // populated if we have allocated a channel for concurrent requests -} +// All of the logic follows standard library implementation in this file +// Only here to use as object -func NewConcReqsGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { +func newGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { buf := bufio.NewWriter(conn) - return &concReqsGobServerCodec{ + return &gobServerCodec{ rwc: conn, dec: gob.NewDecoder(conn), enc: gob.NewEncoder(buf), @@ -46,25 +39,23 @@ func NewConcReqsGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { } } -func (c *concReqsGobServerCodec) ReadRequestHeader(r *rpc.Request) error { +type gobServerCodec struct { + rwc io.ReadWriteCloser + dec *gob.Decoder + enc *gob.Encoder + encBuf *bufio.Writer + closed bool +} + +func (c *gobServerCodec) ReadRequestHeader(r *rpc.Request) error { return c.dec.Decode(r) } -func (c *concReqsGobServerCodec) ReadRequestBody(body interface{}) error { - if err := ConReqs.Allocate(); err != nil { - return err - } - c.allocated = true +func (c *gobServerCodec) ReadRequestBody(body interface{}) error { return c.dec.Decode(body) } -func (c *concReqsGobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) { - if c.allocated { - defer func() { - ConReqs.Deallocate() - c.allocated = false - }() - } +func (c *gobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { if c.encBuf.Flush() == nil { // Gob couldn't encode the header. Should not happen, so if it does, @@ -86,7 +77,7 @@ func (c *concReqsGobServerCodec) WriteResponse(r *rpc.Response, body interface{} return c.encBuf.Flush() } -func (c *concReqsGobServerCodec) Close() error { +func (c *gobServerCodec) Close() error { if c.closed { // Only call c.rwc.Close once; otherwise the semantics are undefined. return nil diff --git a/utils/json_codec.go b/utils/json_codec.go index 8534530a8..d8a149f9b 100644 --- a/utils/json_codec.go +++ b/utils/json_codec.go @@ -20,51 +20,9 @@ package utils import ( "encoding/json" - "errors" "io" - "net/rpc" - "strings" - "sync" ) -var errMissingParams = errors.New("jsonrpc: request body missing params") - -type MethodParameters struct { - Method string - Parameters interface{} -} - -type jsonServerCodec struct { - dec *json.Decoder // for reading JSON values - enc *json.Encoder // for writing JSON values - c io.Closer - - // temporary work space - req serverRequest - - // JSON-RPC clients can use arbitrary json values as request IDs. - // Package rpc expects uint64 request IDs. - // We assign uint64 sequence numbers to incoming requests - // but save the original request ID in the pending map. - // When rpc responds, we use the sequence number in - // the response to find the original request ID. - mutex sync.Mutex // protects seq, pending - seq uint64 - pending map[uint64]*json.RawMessage - - allocated bool // populated if we have allocated a channel for concurrent requests -} - -// NewCustomJSONServerCodec is used only when DispatcherS is active to handle APIer methods generically -func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { - return &jsonServerCodec{ - dec: json.NewDecoder(conn), - enc: json.NewEncoder(conn), - c: conn, - pending: make(map[uint64]*json.RawMessage), - } -} - func DecodeServerRequest(r io.Reader) (req *serverRequest, err error) { req = new(serverRequest) err = json.NewDecoder(r).Decode(req) @@ -81,16 +39,9 @@ func NewServerRequest(method string, params, id json.RawMessage) *serverRequest } type serverRequest struct { - Method string `json:"method"` - Params *json.RawMessage `json:"params"` - Id *json.RawMessage `json:"id"` - isApier bool -} - -func (r *serverRequest) reset() { - r.Method = "" - r.Params = nil - r.Id = nil + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + Id *json.RawMessage `json:"id"` } func WriteServerResponse(w io.Writer, id *json.RawMessage, result, err interface{}) error { @@ -107,94 +58,3 @@ type serverResponse struct { Result interface{} `json:"result"` Error interface{} `json:"error"` } - -func (c *jsonServerCodec) ReadRequestHeader(r *rpc.Request) error { - c.req.reset() - if err := c.dec.Decode(&c.req); err != nil { - return err - } - // in case we get a request with APIerSv1 or APIerSv2 we redirect - // to Dispatcher to send it according to ArgDispatcher - if c.req.isApier = strings.HasPrefix(c.req.Method, ApierV); c.req.isApier { - r.ServiceMethod = DispatcherSv1Apier - } else { - r.ServiceMethod = c.req.Method - } - - // JSON request id can be any JSON value; - // RPC package expects uint64. Translate to - // internal uint64 and save JSON on the side. - c.mutex.Lock() - c.seq++ - c.pending[c.seq] = c.req.Id - c.req.Id = nil - r.Seq = c.seq - c.mutex.Unlock() - - return nil -} - -func (c *jsonServerCodec) ReadRequestBody(x interface{}) error { - if err := ConReqs.Allocate(); err != nil { - return err - } - c.allocated = true - if x == nil { - return nil - } - if c.req.Params == nil { - return errMissingParams - } - // following example from ReadRequestHeader in case we get APIerSv1 - // or APIerSv2 we compose the parameters - if c.req.isApier { - cx := x.(*MethodParameters) - cx.Method = c.req.Method - var params [1]interface{} - params[0] = &cx.Parameters - return json.Unmarshal(*c.req.Params, ¶ms) - } - // JSON params is array value. - // RPC params is struct. - // Unmarshal into array containing struct for now. - // Should think about making RPC more general. - var params [1]interface{} - params[0] = x - return json.Unmarshal(*c.req.Params, ¶ms) - -} - -var null = json.RawMessage([]byte("null")) - -func (c *jsonServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { - if c.allocated { - defer func() { - ConReqs.Deallocate() - c.allocated = false - }() - } - c.mutex.Lock() - b, ok := c.pending[r.Seq] - if !ok { - c.mutex.Unlock() - return errors.New("invalid sequence number in response") - } - delete(c.pending, r.Seq) - c.mutex.Unlock() - - if b == nil { - // Invalid request so no id. Use JSON null. - b = &null - } - resp := serverResponse{Id: b} - if r.Error == "" { - resp.Result = x - } else { - resp.Error = r.Error - } - return c.enc.Encode(resp) -} - -func (c *jsonServerCodec) Close() error { - return c.c.Close() -} diff --git a/utils/server.go b/utils/server.go index eaca80b8f..682b7a96b 100644 --- a/utils/server.go +++ b/utils/server.go @@ -66,11 +66,6 @@ type Server struct { stopbiRPCServer chan struct{} // used in order to fully stop the biRPC httpsMux *http.ServeMux httpMux *http.ServeMux - isDispatched bool -} - -func (s *Server) SetDispatched() { - s.isDispatched = true } func (s *Server) RpcRegister(rcvr interface{}) { @@ -176,11 +171,7 @@ func (s *Server) ServeJSON(addr string, exitChan chan bool) { } continue } - if s.isDispatched { - go rpc.ServeCodec(NewCustomJSONServerCodec(conn)) - } else { - go rpc.ServeCodec(NewConcReqsServerCodec(conn)) - } + go rpc.ServeCodec(newConcReqsJSONCodec(conn)) } } @@ -216,7 +207,7 @@ func (s *Server) ServeGOB(addr string, exitChan chan bool) { } continue } - go rpc.ServeCodec(NewConcReqsGobServerCodec(conn)) + go rpc.ServeCodec(newConcReqsGOBCodec(conn)) } } @@ -276,11 +267,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, s.Unlock() Logger.Info(" enabling handler for WebSocket connections") wsHandler := websocket.Handler(func(ws *websocket.Conn) { - if s.isDispatched { - rpc.ServeCodec(NewCustomJSONServerCodec(ws)) - } else { - rpc.ServeCodec(NewConcReqsServerCodec(ws)) - } + rpc.ServeCodec(newConcReqsJSONCodec(ws)) }) if useBasicAuth { s.httpMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) { @@ -376,7 +363,7 @@ func (r *rpcRequest) Close() error { // Call invokes the RPC request, waits for it to complete, and returns the results. func (r *rpcRequest) Call() io.Reader { - go rpc.ServeCodec(NewConcReqsServerCodec(r)) + go rpc.ServeCodec(newConcReqsJSONCodec(r)) <-r.done return r.rw } @@ -458,7 +445,7 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, } continue } - go rpc.ServeCodec(NewConcReqsGobServerCodec(conn)) + go rpc.ServeCodec(newConcReqsGOBCodec(conn)) } } @@ -499,11 +486,7 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, } continue } - if s.isDispatched { - go rpc.ServeCodec(NewCustomJSONServerCodec(conn)) - } else { - go rpc.ServeCodec(NewConcReqsServerCodec(conn)) - } + go rpc.ServeCodec(newConcReqsJSONCodec(conn)) } } @@ -534,11 +517,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP s.Unlock() Logger.Info(" enabling handler for WebSocket connections") wsHandler := websocket.Handler(func(ws *websocket.Conn) { - if s.isDispatched { - rpc.ServeCodec(NewCustomJSONServerCodec(ws)) - } else { - rpc.ServeCodec(NewConcReqsServerCodec(ws)) - } + rpc.ServeCodec(newConcReqsJSONCodec(ws)) }) if useBasicAuth { s.httpsMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) {