Update rpcclient library to latest version

Replace all instances of rpcclient.ClientConnector with birpc.ClientConnector.

Pass context, maxReconnectInterval, delayFunc and birpcClient to rpcclient
constructors.

Remove redundant time.Duration conversions (e.g. time.Duration(1*time.Second)
now becomes time.Second.

Add context where needed (context.Background() for tests, context.TODO()
for places where it should be passed from somewhere else).

Implement that functionality of the SessionSv1.Sleep call, in sessions/sessions
instead of apier/v1.

Make changes in utils/server.go (replacing the old rpc2 library with github.com/cgrates/birpc).

Change the way we register birpc methods for sessions in services, using a helper function
defined in engine/libengine.go.
This commit is contained in:
ionutboangiu
2023-03-30 11:27:15 -04:00
committed by Dan Christian Bogos
parent 0560fa63f8
commit cb7ea790de
94 changed files with 767 additions and 687 deletions

View File

@@ -32,13 +32,12 @@ import (
"net/rpc"
"net/url"
"os"
"reflect"
"strings"
"sync"
"time"
"github.com/cenkalti/rpc2"
rpc2_jsonrpc "github.com/cenkalti/rpc2/jsonrpc"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/jsonrpc"
"golang.org/x/net/websocket"
)
@@ -51,6 +50,7 @@ func init() {
}
func NewServer() (s *Server) {
s = new(Server)
s.birpcSrv = birpc.NewBirpcServer()
s.httpMux = http.NewServeMux()
s.httpsMux = http.NewServeMux()
s.stopbiRPCServer = make(chan struct{}, 1)
@@ -61,7 +61,7 @@ type Server struct {
sync.RWMutex
rpcEnabled bool
httpEnabled bool
birpcSrv *rpc2.Server
birpcSrv *birpc.BirpcServer
stopbiRPCServer chan struct{} // used in order to fully stop the biRPC
httpsMux *http.ServeMux
httpMux *http.ServeMux
@@ -111,34 +111,12 @@ func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) {
}
// Registers a new BiJsonRpc name
func (s *Server) BiRPCRegisterName(method string, handlerFunc interface{}) {
s.RLock()
isNil := s.birpcSrv == nil
s.RUnlock()
if isNil {
s.Lock()
s.birpcSrv = rpc2.NewServer()
s.Unlock()
}
s.birpcSrv.Handle(method, handlerFunc)
func (s *Server) BiRPCRegisterName(name string, rcv interface{}) {
s.birpcSrv.RegisterName(name, rcv)
}
func (s *Server) BiRPCRegister(rcvr interface{}) {
s.RLock()
isNil := s.birpcSrv == nil
s.RUnlock()
if isNil {
s.Lock()
s.birpcSrv = rpc2.NewServer()
s.Unlock()
}
rcvType := reflect.TypeOf(rcvr)
for i := 0; i < rcvType.NumMethod(); i++ {
method := rcvType.Method(i)
if method.Name != "Call" {
s.birpcSrv.Handle("SMGenericV1."+method.Name, method.Func.Interface())
}
}
func (s *Server) BiRPCUnregisterName(name string) {
s.birpcSrv.UnregisterName(name)
}
func (s *Server) ServeJSON(addr string, exitChan chan bool) {
@@ -304,7 +282,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
}
// ServeBiJSON create a gorutine to listen and serve as BiRPC server
func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) (err error) {
func (s *Server) ServeBiJSON(addr string, onConn, onDis func(birpc.ClientConnector)) (err error) {
s.RLock()
isNil := s.birpcSrv == nil
s.RUnlock()
@@ -331,7 +309,7 @@ func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(
log.Fatal(err)
return // stop if we get Accept error
}
go s.birpcSrv.ServeCodec(rpc2_jsonrpc.NewJSONCodec(conn))
go s.birpcSrv.ServeCodec(jsonrpc.NewJSONBirpcCodec(conn))
}
}(lBiJSON)
<-s.stopbiRPCServer // wait until server is stoped to close the listener