diff --git a/config/config_it_test.go b/config/config_it_test.go index 3acb499e1..d7243b56f 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -354,6 +354,7 @@ func TestCGRConfigReloadRALs(t *testing.T) { Transport: utils.MetaJSONrpc, }, }, + MaxIncrements: 1000000, } if !reflect.DeepEqual(expAttr, cfg.RalsCfg()) { t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.RalsCfg())) diff --git a/services/sessions.go b/services/sessions.go index edb13e236..3c4612502 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -43,6 +43,10 @@ type SessionService struct { rpc *v1.SMGenericV1 rpcv1 *v1.SessionSv1 connChan chan rpcclient.RpcClientConnection + + // in order to stop the bircp server if necesary + bircpEnabled bool + server *utils.Server } // Start should handle the sercive start @@ -133,6 +137,8 @@ func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool) } // Register BiRpc handlers if sp.GetConfig().SessionSCfg().ListenBijson != "" { + smg.bircpEnabled = true + smg.server = sp.GetServer() for method, handler := range smg.rpc.Handlers() { sp.GetServer().BiRPCRegisterName(method, handler) } @@ -140,7 +146,14 @@ func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool) sp.GetServer().BiRPCRegisterName(method, handler) } // run this in it's own gorutine - go sp.GetServer().ServeBiJSON(sp.GetConfig().SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect) + go func() { + if err := sp.GetServer().ServeBiJSON(sp.GetConfig().SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err)) + smg.Lock() + smg.bircpEnabled = false + smg.Unlock() + } + }() } return } @@ -231,6 +244,10 @@ func (smg *SessionService) Shutdown() (err error) { if err = smg.sm.Shutdown(); err != nil { return } + if smg.bircpEnabled { + smg.server.StopBiRPC() + smg.bircpEnabled = false + } smg.sm = nil smg.rpc = nil smg.rpcv1 = nil diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 0eadffbd5..630b52e32 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -301,6 +301,15 @@ func (srvMngr *ServiceManager) handleReload() { for { select { case ext := <-srvMngr.engineShutdown: + for srviceName, srv := range srvMngr.subsystems { + if !srv.IsRunning() { + continue + } + if err := srv.Shutdown(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to shutdown subsystem <%s> because: %s", + utils.ServiceManager, srviceName, err)) + } + } srvMngr.engineShutdown <- ext return case <-srvMngr.GetConfig().GetReloadChan(config.ATTRIBUTE_JSN): diff --git a/utils/server.go b/utils/server.go index 43f061aba..9cdee040a 100644 --- a/utils/server.go +++ b/utils/server.go @@ -32,6 +32,7 @@ import ( "net/rpc" "net/rpc/jsonrpc" "reflect" + "strings" "sync" "time" @@ -44,17 +45,19 @@ func NewServer() (s *Server) { s = new(Server) s.httpMux = http.NewServeMux() s.httpsMux = http.NewServeMux() + s.stopbiRPCServer = make(chan struct{}, 1) return s } type Server struct { - rpcEnabled bool - httpEnabled bool - birpcSrv *rpc2.Server sync.RWMutex - httpsMux *http.ServeMux - httpMux *http.ServeMux - isDispatched bool + rpcEnabled bool + httpEnabled bool + birpcSrv *rpc2.Server + stopbiRPCServer chan struct{} // used in order to fully stop the biRPC + httpsMux *http.ServeMux + httpMux *http.ServeMux + isDispatched bool } func (s *Server) SetDispatched() { @@ -287,27 +290,45 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, exitChan <- true } -func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) { +// ServeBiJSON create a gorutine to listen and serve as BiRPC server +func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) (err error) { s.RLock() isNil := s.birpcSrv == nil s.RUnlock() if isNil { - return + return fmt.Errorf("BiRPCServer should not be nil") } - lBiJSON, e := net.Listen("tcp", addr) - if e != nil { - log.Fatal("ServeBiJSON listen error:", e) + var lBiJSON net.Listener + lBiJSON, err = net.Listen("tcp", addr) + if err != nil { + log.Fatal("ServeBiJSON listen error:", err) + return } s.birpcSrv.OnConnect(onConn) s.birpcSrv.OnDisconnect(onDis) Logger.Info(fmt.Sprintf("Starting CGRateS BiJSON server at <%s>", addr)) - for { - conn, err := lBiJSON.Accept() - if err != nil { - log.Fatal(err) + go func(l net.Listener) { + for { + conn, err := l.Accept() + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log + return + } + s.stopbiRPCServer <- struct{}{} + log.Fatal(err) + return // stop if we get Accept error + } + go s.birpcSrv.ServeCodec(rpc2_jsonrpc.NewJSONCodec(conn)) } - go s.birpcSrv.ServeCodec(rpc2_jsonrpc.NewJSONCodec(conn)) - } + }(lBiJSON) + <-s.stopbiRPCServer // wait until server is stoped to close the listener + lBiJSON.Close() + return +} + +// StopBiRPC stops the go rutine create with ServeBiJSON +func (s *Server) StopBiRPC() { + s.stopbiRPCServer <- struct{}{} } // rpcRequest represents a RPC request.