diff --git a/utils/consts.go b/utils/consts.go
index b54d28b3d..fcf922e3d 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -886,8 +886,9 @@ const (
// DispatcherS APIs
const (
- DispatcherSv1Ping = "DispatcherSv1.Ping"
- DispatcherSv1GetProfileForEvent = "DispatcherSv1.GetProfileForEvent"
+ DispatcherSv1Ping = "DispatcherSv1.Ping"
+ DispatcherSv1GetProfileForEvent = "DispatcherSv1.GetProfileForEvent"
+ DispatcherSv1SwitchApierVRequest = "DispatcherSv1.SwitchApierVRequest"
)
// AnalyzerS APIs
diff --git a/utils/json_codec.go b/utils/json_codec.go
new file mode 100644
index 000000000..21b019b62
--- /dev/null
+++ b/utils/json_codec.go
@@ -0,0 +1,163 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package utils
+
+import (
+ "encoding/json"
+ "errors"
+ "io"
+ "net/rpc"
+ "strings"
+ "sync"
+)
+
+var errMissingParams = errors.New("jsonrpc: request body missing params")
+
+type MethodParameters struct {
+ Method string
+ Parameters interface{}
+}
+
+type jsonServerCodec struct {
+ dec *json.Decoder // for reading JSON values
+ enc *json.Encoder // for writing JSON values
+ c io.Closer
+
+ // temporary work space
+ req serverRequest
+
+ // JSON-RPC clients can use arbitrary json values as request IDs.
+ // Package rpc expects uint64 request IDs.
+ // We assign uint64 sequence numbers to incoming requests
+ // but save the original request ID in the pending map.
+ // When rpc responds, we use the sequence number in
+ // the response to find the original request ID.
+ mutex sync.Mutex // protects seq, pending
+ seq uint64
+ pending map[uint64]*json.RawMessage
+}
+
+// NewServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn.
+func NewCustomJSONServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
+ return &jsonServerCodec{
+ dec: json.NewDecoder(conn),
+ enc: json.NewEncoder(conn),
+ c: conn,
+ pending: make(map[uint64]*json.RawMessage),
+ }
+}
+
+type serverRequest struct {
+ Method string `json:"method"`
+ Params *json.RawMessage `json:"params"`
+ Id *json.RawMessage `json:"id"`
+}
+
+func (r *serverRequest) reset() {
+ r.Method = ""
+ r.Params = nil
+ r.Id = nil
+}
+
+type serverResponse struct {
+ Id *json.RawMessage `json:"id"`
+ Result interface{} `json:"result"`
+ Error interface{} `json:"error"`
+}
+
+func (c *jsonServerCodec) ReadRequestHeader(r *rpc.Request) error {
+ c.req.reset()
+ if err := c.dec.Decode(&c.req); err != nil {
+ return err
+ }
+ // in case we get a request with ApierV1 or ApierV2 we redirect
+ // to Dispatcher to send it according to ArgDispatcher
+ if strings.HasPrefix(c.req.Method, "ApierV") {
+ r.ServiceMethod = DispatcherSv1SwitchApierVRequest
+ } else {
+ r.ServiceMethod = c.req.Method
+ }
+
+ // JSON request id can be any JSON value;
+ // RPC package expects uint64. Translate to
+ // internal uint64 and save JSON on the side.
+ c.mutex.Lock()
+ c.seq++
+ c.pending[c.seq] = c.req.Id
+ c.req.Id = nil
+ r.Seq = c.seq
+ c.mutex.Unlock()
+
+ return nil
+}
+
+func (c *jsonServerCodec) ReadRequestBody(x interface{}) error {
+ if x == nil {
+ return nil
+ }
+ if c.req.Params == nil {
+ return errMissingParams
+ }
+ // following example from ReadRequestHeader in case we get ApierV1
+ // or ApierV2 we compose the parameters
+ if strings.HasPrefix(c.req.Method, "ApierV") {
+ cx := x.(*MethodParameters)
+ cx.Method = c.req.Method
+ var params [1]interface{}
+ params[0] = &cx.Parameters
+ return json.Unmarshal(*c.req.Params, ¶ms)
+ }
+ // JSON params is array value.
+ // RPC params is struct.
+ // Unmarshal into array containing struct for now.
+ // Should think about making RPC more general.
+ var params [1]interface{}
+ params[0] = x
+ return json.Unmarshal(*c.req.Params, ¶ms)
+
+}
+
+var null = json.RawMessage([]byte("null"))
+
+func (c *jsonServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
+ c.mutex.Lock()
+ b, ok := c.pending[r.Seq]
+ if !ok {
+ c.mutex.Unlock()
+ return errors.New("invalid sequence number in response")
+ }
+ delete(c.pending, r.Seq)
+ c.mutex.Unlock()
+
+ if b == nil {
+ // Invalid request so no id. Use JSON null.
+ b = &null
+ }
+ resp := serverResponse{Id: b}
+ if r.Error == "" {
+ resp.Result = x
+ } else {
+ resp.Error = r.Error
+ }
+ return c.enc.Encode(resp)
+}
+
+func (c *jsonServerCodec) Close() error {
+ return c.c.Close()
+}
diff --git a/utils/server.go b/utils/server.go
index 8818e2bb9..39a5222e0 100644
--- a/utils/server.go
+++ b/utils/server.go
@@ -52,8 +52,13 @@ type Server struct {
httpEnabled bool
birpcSrv *rpc2.Server
sync.RWMutex
- httpsMux *http.ServeMux
- httpMux *http.ServeMux
+ httpsMux *http.ServeMux
+ httpMux *http.ServeMux
+ isDispatched bool
+}
+
+func (s *Server) SetDispatched() {
+ s.isDispatched = true
}
func (s *Server) RpcRegister(rcvr interface{}) {
@@ -156,7 +161,12 @@ func (s *Server) ServeJSON(addr string) {
continue
}
//utils.Logger.Info(fmt.Sprintf(" New incoming connection: %v", conn.RemoteAddr()))
- go jsonrpc.ServeConn(conn)
+ if s.isDispatched {
+ go rpc.ServeCodec(NewCustomJSONServerCodec(conn))
+ } else {
+ go jsonrpc.ServeConn(conn)
+ }
+
}
}