mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-21 15:18:44 +05:00
Updated SessionS service shutdown
This commit is contained in:
committed by
Dan Christian Bogos
parent
146feb0ca5
commit
f6c1801368
@@ -354,6 +354,7 @@ func TestCGRConfigReloadRALs(t *testing.T) {
|
||||
Transport: utils.MetaJSONrpc,
|
||||
},
|
||||
},
|
||||
MaxIncrements: 1000000,
|
||||
}
|
||||
if !reflect.DeepEqual(expAttr, cfg.RalsCfg()) {
|
||||
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.RalsCfg()))
|
||||
|
||||
@@ -43,6 +43,10 @@ type SessionService struct {
|
||||
rpc *v1.SMGenericV1
|
||||
rpcv1 *v1.SessionSv1
|
||||
connChan chan rpcclient.RpcClientConnection
|
||||
|
||||
// in order to stop the bircp server if necesary
|
||||
bircpEnabled bool
|
||||
server *utils.Server
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
@@ -133,6 +137,8 @@ func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool)
|
||||
}
|
||||
// Register BiRpc handlers
|
||||
if sp.GetConfig().SessionSCfg().ListenBijson != "" {
|
||||
smg.bircpEnabled = true
|
||||
smg.server = sp.GetServer()
|
||||
for method, handler := range smg.rpc.Handlers() {
|
||||
sp.GetServer().BiRPCRegisterName(method, handler)
|
||||
}
|
||||
@@ -140,7 +146,14 @@ func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool)
|
||||
sp.GetServer().BiRPCRegisterName(method, handler)
|
||||
}
|
||||
// run this in it's own gorutine
|
||||
go sp.GetServer().ServeBiJSON(sp.GetConfig().SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect)
|
||||
go func() {
|
||||
if err := sp.GetServer().ServeBiJSON(sp.GetConfig().SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err))
|
||||
smg.Lock()
|
||||
smg.bircpEnabled = false
|
||||
smg.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -231,6 +244,10 @@ func (smg *SessionService) Shutdown() (err error) {
|
||||
if err = smg.sm.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
if smg.bircpEnabled {
|
||||
smg.server.StopBiRPC()
|
||||
smg.bircpEnabled = false
|
||||
}
|
||||
smg.sm = nil
|
||||
smg.rpc = nil
|
||||
smg.rpcv1 = nil
|
||||
|
||||
@@ -301,6 +301,15 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
for {
|
||||
select {
|
||||
case ext := <-srvMngr.engineShutdown:
|
||||
for srviceName, srv := range srvMngr.subsystems {
|
||||
if !srv.IsRunning() {
|
||||
continue
|
||||
}
|
||||
if err := srv.Shutdown(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown subsystem <%s> because: %s",
|
||||
utils.ServiceManager, srviceName, err))
|
||||
}
|
||||
}
|
||||
srvMngr.engineShutdown <- ext
|
||||
return
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.ATTRIBUTE_JSN):
|
||||
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -44,17 +45,19 @@ func NewServer() (s *Server) {
|
||||
s = new(Server)
|
||||
s.httpMux = http.NewServeMux()
|
||||
s.httpsMux = http.NewServeMux()
|
||||
s.stopbiRPCServer = make(chan struct{}, 1)
|
||||
return s
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
rpcEnabled bool
|
||||
httpEnabled bool
|
||||
birpcSrv *rpc2.Server
|
||||
sync.RWMutex
|
||||
httpsMux *http.ServeMux
|
||||
httpMux *http.ServeMux
|
||||
isDispatched bool
|
||||
rpcEnabled bool
|
||||
httpEnabled bool
|
||||
birpcSrv *rpc2.Server
|
||||
stopbiRPCServer chan struct{} // used in order to fully stop the biRPC
|
||||
httpsMux *http.ServeMux
|
||||
httpMux *http.ServeMux
|
||||
isDispatched bool
|
||||
}
|
||||
|
||||
func (s *Server) SetDispatched() {
|
||||
@@ -287,27 +290,45 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) {
|
||||
// ServeBiJSON create a gorutine to listen and serve as BiRPC server
|
||||
func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) (err error) {
|
||||
s.RLock()
|
||||
isNil := s.birpcSrv == nil
|
||||
s.RUnlock()
|
||||
if isNil {
|
||||
return
|
||||
return fmt.Errorf("BiRPCServer should not be nil")
|
||||
}
|
||||
lBiJSON, e := net.Listen("tcp", addr)
|
||||
if e != nil {
|
||||
log.Fatal("ServeBiJSON listen error:", e)
|
||||
var lBiJSON net.Listener
|
||||
lBiJSON, err = net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
log.Fatal("ServeBiJSON listen error:", err)
|
||||
return
|
||||
}
|
||||
s.birpcSrv.OnConnect(onConn)
|
||||
s.birpcSrv.OnDisconnect(onDis)
|
||||
Logger.Info(fmt.Sprintf("Starting CGRateS BiJSON server at <%s>", addr))
|
||||
for {
|
||||
conn, err := lBiJSON.Accept()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
go func(l net.Listener) {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
|
||||
return
|
||||
}
|
||||
s.stopbiRPCServer <- struct{}{}
|
||||
log.Fatal(err)
|
||||
return // stop if we get Accept error
|
||||
}
|
||||
go s.birpcSrv.ServeCodec(rpc2_jsonrpc.NewJSONCodec(conn))
|
||||
}
|
||||
go s.birpcSrv.ServeCodec(rpc2_jsonrpc.NewJSONCodec(conn))
|
||||
}
|
||||
}(lBiJSON)
|
||||
<-s.stopbiRPCServer // wait until server is stoped to close the listener
|
||||
lBiJSON.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// StopBiRPC stops the go rutine create with ServeBiJSON
|
||||
func (s *Server) StopBiRPC() {
|
||||
s.stopbiRPCServer <- struct{}{}
|
||||
}
|
||||
|
||||
// rpcRequest represents a RPC request.
|
||||
|
||||
Reference in New Issue
Block a user