From 5098d3e65f960fe1ff2eb887111d78222eec4269 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 2 Sep 2021 17:11:02 +0300 Subject: [PATCH] Started adding the structure for cgr-engine --- cmd/cgr-engine/cgr-engine.go | 4 +- cores/libserver.go | 178 +++++++++++++ cores/server.go | 481 ++++++++++++++--------------------- engine/caches.go | 6 +- services/cgr-engine.go | 74 ++++++ services/libcgr-engine.go | 343 +++++++++++++++++++++++++ 6 files changed, 796 insertions(+), 290 deletions(-) create mode 100644 cores/libserver.go create mode 100644 services/cgr-engine.go create mode 100644 services/libcgr-engine.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 4bf086f93..2224abaf1 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -84,7 +84,7 @@ func initCacheS(internalCacheSChan chan birpc.ClientConnector, cpS *engine.CapsStats) (chS *engine.CacheS) { chS = engine.NewCacheS(cfg, dm, cpS) go func() { - if err := chS.Precache(); err != nil { + if err := chS.Precache(context.TODO()); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error())) shdChan.CloseOnce() } @@ -141,6 +141,7 @@ func initConfigSv1(internalConfigChan chan birpc.ClientConnector, internalConfigChan <- rpc } +/* func startRPC(server *cores.Server, internalAdminSChan, internalCdrSChan, internalRsChan, internalStatSChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalRouteSChan, @@ -251,6 +252,7 @@ func startRPC(server *cores.Server, internalAdminSChan, ) } } +*/ func writePid() { utils.Logger.Info(*pidFile) diff --git a/cores/libserver.go b/cores/libserver.go new file mode 100644 index 000000000..c90abebfc --- /dev/null +++ b/cores/libserver.go @@ -0,0 +1,178 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cores + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "io" + "log" + "net" + "os" + "strings" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/analyzers" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// rpcRequest represents a RPC request. +// rpcRequest implements the io.ReadWriteCloser interface. +type rpcRequest struct { + r io.ReadCloser // holds the JSON formated RPC request + rw io.ReadWriter // holds the JSON formated RPC response + remoteAddr net.Addr + caps *engine.Caps + anzWarpper *analyzers.AnalyzerService +} + +// newRPCRequest returns a new rpcRequest. +func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *engine.Caps, anz *analyzers.AnalyzerService) *rpcRequest { + return &rpcRequest{ + r: r, + rw: new(bytes.Buffer), + remoteAddr: remoteAddr, + caps: caps, + anzWarpper: anz, + } +} + +func (r *rpcRequest) Read(p []byte) (n int, err error) { + return r.r.Read(p) +} + +func (r *rpcRequest) Write(p []byte) (n int, err error) { + return r.rw.Write(p) +} + +func (r *rpcRequest) LocalAddr() net.Addr { + return utils.LocalAddr() +} +func (r *rpcRequest) RemoteAddr() net.Addr { + return r.remoteAddr +} + +func (r *rpcRequest) Close() error { + return r.r.Close() +} + +// Call invokes the RPC request, waits for it to complete, and returns the results. +func (r *rpcRequest) Call() io.Reader { + birpc.ServeCodec(newCapsJSONCodec(r, r.caps, r.anzWarpper)) + return r.rw +} + +func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int, + serverName string) (config *tls.Config, err error) { + cert, err := tls.LoadX509KeyPair(serverCrt, serverKey) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("Error: %s when load server keys", err)) + return nil, err + } + + rootCAs, err := x509.SystemCertPool() + //This will only happen on windows + if err != nil { + utils.Logger.Crit(fmt.Sprintf("Error: %s when load SystemCertPool", err)) + return nil, err + } + + if caCert != "" { + ca, err := os.ReadFile(caCert) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("Error: %s when read CA", err)) + return config, err + } + + if ok := rootCAs.AppendCertsFromPEM(ca); !ok { + utils.Logger.Crit("Cannot append certificate authority") + return config, errors.New("Cannot append certificate authority") + } + } + + config = &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientAuth: tls.ClientAuthType(serverPolicy), + ClientCAs: rootCAs, + } + if serverName != "" { + config.ServerName = serverName + } + return +} + +func acceptRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, + srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) { + var errCnt int + var lastErrorTime time.Time + for { + var conn net.Conn + if conn, err = l.Accept(); err != nil { + select { + case <-ctx.Done(): + return + default: + } + utils.Logger.Err(fmt.Sprintf(" %s accept error: <%s>", codecName, err.Error())) + now := time.Now() + if now.Sub(lastErrorTime) > 5*time.Second { + errCnt = 0 // reset error count if last error was more than 5 seconds ago + } + lastErrorTime = time.Now() + errCnt++ + if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably + shtdwnEngine() + return + } + continue + } + go birpc.ServeCodec(newCodec(conn)) + } +} + +func acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(conn conn) birpc.BirpcCodec, stopbiRPCServer chan struct{}) { + for { + conn, err := l.Accept() + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log + return + } + stopbiRPCServer <- struct{}{} + utils.Logger.Crit(fmt.Sprintf("Stopped Bi%s server beacause %s", codecName, err)) + return // stop if we get Accept error + } + go srv.ServeCodec(newCodec(conn)) + } +} + +func listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(conn conn) birpc.BirpcCodec, stopbiRPCServer chan struct{}) (lBiRPC net.Listener, err error) { + if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil { + log.Printf("ServeBi%s listen error: %s \n", codecName, err) + return + } + utils.Logger.Info(fmt.Sprintf("Starting CGRateS Bi%s server at <%s>", codecName, addr)) + go acceptBiRPC(srv, lBiRPC, codecName, newCodec, stopbiRPCServer) + return +} diff --git a/cores/server.go b/cores/server.go index c4070bd96..a13f665d5 100644 --- a/cores/server.go +++ b/cores/server.go @@ -19,40 +19,42 @@ along with this program. If not, see package cores import ( - "bytes" "crypto/tls" - "crypto/x509" - "errors" "fmt" "io" "log" "net" "net/http" "net/http/pprof" - "os" - "strings" "sync" - "time" "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/analyzers" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "golang.org/x/net/websocket" ) func NewServer(caps *engine.Caps) (s *Server) { - return &Server{ + s = &Server{ httpMux: http.NewServeMux(), httpsMux: http.NewServeMux(), stopbiRPCServer: make(chan struct{}, 1), caps: caps, + + rpcStarted: utils.NewSyncedChan(), + rpcServer: birpc.NewServer(), + birpcSrv: birpc.NewBirpcServer(), } + s.httpServer = &http.Server{Handler: s.httpMux} + s.httpsServer = &http.Server{Handler: s.httpsMux} + return } type Server struct { sync.RWMutex - rpcEnabled bool httpEnabled bool birpcSrv *birpc.BirpcServer stopbiRPCServer chan struct{} // used in order to fully stop the biRPC @@ -60,6 +62,16 @@ type Server struct { httpMux *http.ServeMux caps *engine.Caps anz *analyzers.AnalyzerService + + rpcStarted *utils.SyncedChan + rpcServer *birpc.Server + rpcJSONl net.Listener + rpcGOBl net.Listener + rpcJSONlTLS net.Listener + rpcGOBlTLS net.Listener + httpServer *http.Server + httpsServer *http.Server + startSrv sync.Once } func (s *Server) SetAnalyzer(anz *analyzers.AnalyzerService) { @@ -68,16 +80,10 @@ func (s *Server) SetAnalyzer(anz *analyzers.AnalyzerService) { func (s *Server) RpcRegister(rcvr interface{}) { birpc.Register(rcvr) - s.Lock() - s.rpcEnabled = true - s.Unlock() } func (s *Server) RpcRegisterName(name string, rcvr interface{}) { birpc.RegisterName(name, rcvr) - s.Lock() - s.rpcEnabled = true - s.Unlock() } func (s *Server) RpcUnregisterName(name string) { @@ -85,24 +91,16 @@ func (s *Server) RpcUnregisterName(name string) { } func (s *Server) RegisterHTTPFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { - if s.httpMux != nil { - s.httpMux.HandleFunc(pattern, handler) - } - if s.httpsMux != nil { - s.httpsMux.HandleFunc(pattern, handler) - } + s.httpMux.HandleFunc(pattern, handler) + s.httpsMux.HandleFunc(pattern, handler) s.Lock() s.httpEnabled = true s.Unlock() } func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) { - if s.httpMux != nil { - s.httpMux.Handle(pattern, handler) - } - if s.httpsMux != nil { - s.httpsMux.Handle(pattern, handler) - } + s.httpMux.Handle(pattern, handler) + s.httpsMux.Handle(pattern, handler) s.Lock() s.httpEnabled = true s.Unlock() @@ -110,77 +108,9 @@ func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) { // Registers a new BiJsonRpc name func (s *Server) BiRPCRegisterName(name string, rcv interface{}) { - s.RLock() - isNil := s.birpcSrv == nil - s.RUnlock() - if isNil { - s.Lock() - s.birpcSrv = birpc.NewBirpcServer() - s.Unlock() - } s.birpcSrv.RegisterName(name, rcv) } -func (s *Server) serveCodec(addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec, - shdChan *utils.SyncedChan) { - s.RLock() - enabled := s.rpcEnabled - s.RUnlock() - if !enabled { - return - } - - l, e := net.Listen(utils.TCP, addr) - if e != nil { - log.Printf("Serve%s listen error: %s", codecName, e) - shdChan.CloseOnce() - return - } - utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", codecName, addr)) - s.accept(l, codecName, newCodec, shdChan) -} - -func (s *Server) accept(l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec, - shdChan *utils.SyncedChan) { - errCnt := 0 - var lastErrorTime time.Time - for { - conn, err := l.Accept() - if err != nil { - utils.Logger.Err(fmt.Sprintf(" %s accept error: <%s>", codecName, err.Error())) - now := time.Now() - if now.Sub(lastErrorTime) > 5*time.Second { - errCnt = 0 // reset error count if last error was more than 5 seconds ago - } - lastErrorTime = time.Now() - errCnt++ - if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably - shdChan.CloseOnce() - return - } - continue - } - go birpc.ServeCodec(newCodec(conn, s.caps, s.anz)) - } -} - -func (s *Server) ServeJSON(addr string, shdChan *utils.SyncedChan) { - s.serveCodec(addr, utils.JSONCaps, newCapsJSONCodec, shdChan) -} - -func (s *Server) ServeGOB(addr string, shdChan *utils.SyncedChan) { - s.serveCodec(addr, utils.GOBCaps, newCapsGOBCodec, shdChan) -} - -func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - w.Header().Set("Content-Type", "application/json") - rmtIP, _ := utils.GetRemoteIP(r) - rmtAddr, _ := net.ResolveIPAddr(utils.EmptyString, rmtIP) - res := newRPCRequest(r.Body, rmtAddr, s.caps, s.anz).Call() - io.Copy(w, res) -} - func registerProfiler(addr string, mux *http.ServeMux) { mux.HandleFunc(addr, pprof.Index) mux.HandleFunc(addr+"cmdline", pprof.Cmdline) @@ -203,19 +133,53 @@ func (s *Server) RegisterProfiler(addr string) { registerProfiler(addr, s.httpsMux) } -func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, - useBasicAuth bool, userList map[string]string, shdChan *utils.SyncedChan) { - s.RLock() - enabled := s.rpcEnabled - s.RUnlock() +func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + rmtIP, _ := utils.GetRemoteIP(r) + rmtAddr, _ := net.ResolveIPAddr(utils.EmptyString, rmtIP) + res := newRPCRequest(r.Body, rmtAddr, s.caps, s.anz).Call() + io.Copy(w, res) + r.Body.Close() +} + +func (s *Server) handleWebSocket(ws *websocket.Conn) { + birpc.ServeCodec(newCapsJSONCodec(ws, s.caps, s.anz)) +} + +func (s *Server) ServeJSON(ctx *context.Context, shtdwnEngine context.CancelFunc, addr string) (err error) { + if s.rpcJSONl, err = net.Listen(utils.TCP, addr); err != nil { + log.Printf("Serve%s listen error: %s", utils.JSONCaps, err) + shtdwnEngine() + return + } + utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.JSONCaps, addr)) + return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcJSONl, utils.JSONCaps, func(conn conn) birpc.ServerCodec { + return newCapsJSONCodec(conn, s.caps, s.anz) + }) +} + +func (s *Server) ServeGOB(ctx *context.Context, shtdwnEngine context.CancelFunc, addr string) (err error) { + if s.rpcGOBl, err = net.Listen(utils.TCP, addr); err != nil { + log.Printf("Serve%s listen error: %s", utils.GOBCaps, err) + shtdwnEngine() + return + } + utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.GOBCaps, addr)) + return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcGOBl, utils.GOBCaps, func(conn conn) birpc.ServerCodec { + return newCapsGOBCodec(conn, s.caps, s.anz) + }) +} + +func (s *Server) ServeHTTP(shtdwnEngine context.CancelFunc, addr, jsonRPCURL, wsRPCURL string, + useBasicAuth bool, userList map[string]string) { + s.Lock() + s.httpEnabled = s.httpEnabled || jsonRPCURL != "" || wsRPCURL != "" + enabled := s.httpEnabled + s.Unlock() if !enabled { return } if jsonRPCURL != "" { - s.Lock() - s.httpEnabled = true - s.Unlock() - utils.Logger.Info(" enabling handler for JSON-RPC") if useBasicAuth { s.httpMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList))) @@ -224,9 +188,6 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, } } if wsRPCURL != "" { - s.Lock() - s.httpEnabled = true - s.Unlock() utils.Logger.Info(" enabling handler for WebSocket connections") wsHandler := websocket.Handler(s.handleWebSocket) if useBasicAuth { @@ -235,40 +196,35 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, s.httpMux.Handle(wsRPCURL, wsHandler) } } - if !s.httpEnabled { - return - } if useBasicAuth { utils.Logger.Info(" enabling basic auth") } utils.Logger.Info(fmt.Sprintf(" start listening at <%s>", addr)) - if err := http.ListenAndServe(addr, s.httpMux); err != nil { + s.httpServer.Addr = addr + if err := s.httpServer.ListenAndServe(); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) - shdChan.CloseOnce() + shtdwnEngine() } } // ServeBiRPC create a goroutine to listen and serve as BiRPC server -func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientConnector), onDis func(birpc.ClientConnector)) (err error) { - s.RLock() - isNil := s.birpcSrv == nil - s.RUnlock() - if isNil { - return fmt.Errorf("BiRPCServer should not be nil") - } - +func (s *Server) ServeBiRPC2(addrJSON, addrGOB string, onConn, onDis func(birpc.ClientConnector)) (err error) { s.birpcSrv.OnConnect(onConn) s.birpcSrv.OnDisconnect(onDis) if addrJSON != utils.EmptyString { var ljson net.Listener - if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, newCapsBiRPCJSONCodec); err != nil { + if ljson, err = listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, func(conn conn) birpc.BirpcCodec { + return newCapsBiRPCJSONCodec(conn, s.caps, s.anz) + }, s.stopbiRPCServer); err != nil { return } defer ljson.Close() } if addrGOB != utils.EmptyString { var lgob net.Listener - if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, newCapsBiRPCGOBCodec); err != nil { + if lgob, err = listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, func(conn conn) birpc.BirpcCodec { + return newCapsBiRPCGOBCodec(conn, s.caps, s.anz) + }, s.stopbiRPCServer); err != nil { return } defer lgob.Close() @@ -277,175 +233,58 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientCo return } -func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) (lBiRPC net.Listener, err error) { - if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil { - log.Printf("ServeBi%s listen error: %s \n", codecName, err) - return - } - utils.Logger.Info(fmt.Sprintf("Starting CGRateS Bi%s server at <%s>", codecName, addr)) - go s.acceptBiRPC(srv, lBiRPC, codecName, newCodec) - return -} - -func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) { - for { - conn, err := l.Accept() - if err != nil { - if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log - return - } - s.stopbiRPCServer <- struct{}{} - utils.Logger.Crit(fmt.Sprintf("Stopped Bi%s server beacause %s", codecName, err)) - return // stop if we get Accept error - } - go srv.ServeCodec(newCodec(conn, s.caps, s.anz)) - } -} - -// StopBiRPC stops the go routine create with ServeBiJSON -func (s *Server) StopBiRPC() { - s.stopbiRPCServer <- struct{}{} - s.Lock() - s.birpcSrv = nil - s.Unlock() -} - -// rpcRequest represents a RPC request. -// rpcRequest implements the io.ReadWriteCloser interface. -type rpcRequest struct { - r io.ReadCloser // holds the JSON formated RPC request - rw io.ReadWriter // holds the JSON formated RPC response - remoteAddr net.Addr - caps *engine.Caps - anzWarpper *analyzers.AnalyzerService -} - -// newRPCRequest returns a new rpcRequest. -func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *engine.Caps, anz *analyzers.AnalyzerService) *rpcRequest { - return &rpcRequest{ - r: r, - rw: new(bytes.Buffer), - remoteAddr: remoteAddr, - caps: caps, - anzWarpper: anz, - } -} - -func (r *rpcRequest) Read(p []byte) (n int, err error) { - return r.r.Read(p) -} - -func (r *rpcRequest) Write(p []byte) (n int, err error) { - return r.rw.Write(p) -} - -func (r *rpcRequest) LocalAddr() net.Addr { - return utils.LocalAddr() -} -func (r *rpcRequest) RemoteAddr() net.Addr { - return r.remoteAddr -} - -func (r *rpcRequest) Close() error { - return r.r.Close() -} - -// Call invokes the RPC request, waits for it to complete, and returns the results. -func (r *rpcRequest) Call() io.Reader { - birpc.ServeCodec(newCapsJSONCodec(r, r.caps, r.anzWarpper)) - return r.rw -} - -func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int, - serverName string) (config *tls.Config, err error) { - cert, err := tls.LoadX509KeyPair(serverCrt, serverKey) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("Error: %s when load server keys", err)) - return nil, err - } - - rootCAs, err := x509.SystemCertPool() - //This will only happen on windows - if err != nil { - utils.Logger.Crit(fmt.Sprintf("Error: %s when load SystemCertPool", err)) - return nil, err - } - - if caCert != "" { - ca, err := os.ReadFile(caCert) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("Error: %s when read CA", err)) - return config, err - } - - if ok := rootCAs.AppendCertsFromPEM(ca); !ok { - utils.Logger.Crit("Cannot append certificate authority") - return config, errors.New("Cannot append certificate authority") - } - } - - config = &tls.Config{ - Certificates: []tls.Certificate{cert}, - ClientAuth: tls.ClientAuthType(serverPolicy), - ClientCAs: rootCAs, - } - if serverName != "" { - config.ServerName = serverName - } - return -} - -func (s *Server) serveCodecTLS(addr, codecName, serverCrt, serverKey, caCert string, - serverPolicy int, serverName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec, - shdChan *utils.SyncedChan) { - s.RLock() - enabled := s.rpcEnabled - s.RUnlock() - if !enabled { - return - } +func (s *Server) ServeGOBTLS(ctx *context.Context, shtdwnEngine context.CancelFunc, + addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string) (err error) { config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { - shdChan.CloseOnce() + shtdwnEngine() return } - listener, err := tls.Listen(utils.TCP, addr, config) + s.rpcGOBlTLS, err = tls.Listen(utils.TCP, addr, config) if err != nil { log.Println(fmt.Sprintf("Error: %s when listening", err)) - shdChan.CloseOnce() + shtdwnEngine() return } - utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", codecName, addr)) - s.accept(listener, codecName+" "+utils.TLS, newCodec, shdChan) + utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.GOBCaps, addr)) + + return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcGOBlTLS, utils.GOBCaps, func(conn conn) birpc.ServerCodec { + return newCapsGOBCodec(conn, s.caps, s.anz) + }) } -func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, - serverPolicy int, serverName string, shdChan *utils.SyncedChan) { - s.serveCodecTLS(addr, utils.GOBCaps, serverCrt, serverKey, caCert, serverPolicy, serverName, newCapsGOBCodec, shdChan) +func (s *Server) ServeJSONTLS(ctx *context.Context, shtdwnEngine context.CancelFunc, + addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string) (err error) { + config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) + if err != nil { + shtdwnEngine() + return + } + s.rpcJSONlTLS, err = tls.Listen(utils.TCP, addr, config) + if err != nil { + log.Println(fmt.Sprintf("Error: %s when listening", err)) + shtdwnEngine() + return + } + utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.JSONCaps, addr)) + + return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcJSONlTLS, utils.JSONCaps, func(conn conn) birpc.ServerCodec { + return newCapsGOBCodec(conn, s.caps, s.anz) + }) } -func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, - serverPolicy int, serverName string, shdChan *utils.SyncedChan) { - s.serveCodecTLS(addr, utils.JSONCaps, serverCrt, serverKey, caCert, serverPolicy, serverName, newCapsJSONCodec, shdChan) -} - -func (s *Server) handleWebSocket(ws *websocket.Conn) { - birpc.ServeCodec(newCapsJSONCodec(ws, s.caps, s.anz)) -} - -func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int, +func (s *Server) ServeHTTPS(shtdwnEngine context.CancelFunc, + addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string, jsonRPCURL string, wsRPCURL string, - useBasicAuth bool, userList map[string]string, shdChan *utils.SyncedChan) { - s.RLock() - enabled := s.rpcEnabled - s.RUnlock() + useBasicAuth bool, userList map[string]string) { + s.Lock() + s.httpEnabled = s.httpEnabled || jsonRPCURL != "" || wsRPCURL != "" + enabled := s.httpEnabled + s.Unlock() if !enabled { return } if jsonRPCURL != "" { - s.Lock() - s.httpEnabled = true - s.Unlock() utils.Logger.Info(" enabling handler for JSON-RPC") if useBasicAuth { s.httpsMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList))) @@ -454,9 +293,6 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP } } if wsRPCURL != "" { - s.Lock() - s.httpEnabled = true - s.Unlock() utils.Logger.Info(" enabling handler for WebSocket connections") wsHandler := websocket.Handler(s.handleWebSocket) if useBasicAuth { @@ -465,25 +301,96 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP s.httpsMux.Handle(wsRPCURL, wsHandler) } } - if !s.httpEnabled { - return - } if useBasicAuth { utils.Logger.Info(" enabling basic auth") } config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) if err != nil { - shdChan.CloseOnce() + shtdwnEngine() return } - httpSrv := http.Server{ - Addr: addr, - Handler: s.httpsMux, - TLSConfig: config, - } + s.httpsServer.Addr = addr + s.httpsServer.TLSConfig = config utils.Logger.Info(fmt.Sprintf(" start listening at <%s>", addr)) - if err := httpSrv.ListenAndServeTLS(serverCrt, serverKey); err != nil { + + if err := s.httpsServer.ListenAndServeTLS(serverCrt, serverKey); err != nil { log.Println(fmt.Sprintf("Error: %s when listening ", err)) - shdChan.CloseOnce() + shtdwnEngine() } } + +func (s *Server) Stop() { + s.rpcJSONl.Close() + s.rpcGOBl.Close() + s.rpcJSONlTLS.Close() + s.rpcGOBlTLS.Close() + s.httpServer.Shutdown(context.Background()) + s.httpsServer.Shutdown(context.Background()) + s.StopBiRPC() +} + +// StopBiRPC stops the go routine create with ServeBiJSON +func (s *Server) StopBiRPC() { + s.stopbiRPCServer <- struct{}{} + s.birpcSrv = birpc.NewBirpcServer() +} + +func (s *Server) StartServer(ctx *context.Context, shtdwnEngine context.CancelFunc, cfg *config.CGRConfig) { + s.startSrv.Do(func() { + go s.ServeJSON(ctx, shtdwnEngine, cfg.ListenCfg().RPCJSONListen) + go s.ServeGOB(ctx, shtdwnEngine, cfg.ListenCfg().RPCGOBListen) + go s.ServeHTTP( + shtdwnEngine, + cfg.ListenCfg().HTTPListen, + cfg.HTTPCfg().JsonRPCURL, + cfg.HTTPCfg().WSURL, + cfg.HTTPCfg().UseBasicAuth, + cfg.HTTPCfg().AuthUsers, + ) + if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 || + len(cfg.ListenCfg().RPCJSONTLSListen) != 0 || + len(cfg.ListenCfg().HTTPTLSListen) != 0) && + (len(cfg.TLSCfg().ServerCerificate) == 0 || + len(cfg.TLSCfg().ServerKey) == 0) { + utils.Logger.Warning("WARNING: missing TLS certificate/key file!") + return + } + if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString { + go s.ServeGOBTLS( + ctx, shtdwnEngine, + cfg.ListenCfg().RPCGOBTLSListen, + cfg.TLSCfg().ServerCerificate, + cfg.TLSCfg().ServerKey, + cfg.TLSCfg().CaCertificate, + cfg.TLSCfg().ServerPolicy, + cfg.TLSCfg().ServerName, + ) + } + if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString { + go s.ServeJSONTLS( + ctx, shtdwnEngine, + cfg.ListenCfg().RPCJSONTLSListen, + cfg.TLSCfg().ServerCerificate, + cfg.TLSCfg().ServerKey, + cfg.TLSCfg().CaCertificate, + cfg.TLSCfg().ServerPolicy, + cfg.TLSCfg().ServerName, + ) + } + if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString { + go s.ServeHTTPS( + shtdwnEngine, + cfg.ListenCfg().HTTPTLSListen, + cfg.TLSCfg().ServerCerificate, + cfg.TLSCfg().ServerKey, + cfg.TLSCfg().CaCertificate, + cfg.TLSCfg().ServerPolicy, + cfg.TLSCfg().ServerName, + cfg.HTTPCfg().JsonRPCURL, + cfg.HTTPCfg().WSURL, + cfg.HTTPCfg().UseBasicAuth, + cfg.HTTPCfg().AuthUsers, + ) + } + }) +} diff --git a/engine/caches.go b/engine/caches.go index 60fb20afe..558e99e0f 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -217,7 +217,7 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} { } // Precache loads data from DataDB into cache at engine start -func (chS *CacheS) Precache() (err error) { +func (chS *CacheS) Precache(ctx *context.Context) (err error) { var wg sync.WaitGroup // wait for precache to finish errChan := make(chan error) doneChan := make(chan struct{}) @@ -228,7 +228,7 @@ func (chS *CacheS) Precache() (err error) { } wg.Add(1) go func(cacheID string) { - errCache := chS.dm.CacheDataFromDB(context.TODO(), + errCache := chS.dm.CacheDataFromDB(ctx, utils.CacheInstanceToPrefix[cacheID], []string{utils.MetaAny}, false) @@ -247,6 +247,8 @@ func (chS *CacheS) Precache() (err error) { select { case err = <-errChan: case <-doneChan: + case <-ctx.Done(): + err = ctx.Err() } return } diff --git a/services/cgr-engine.go b/services/cgr-engine.go new file mode 100644 index 000000000..b33ce7873 --- /dev/null +++ b/services/cgr-engine.go @@ -0,0 +1,74 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/birpc" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +type CGREngine struct { + cfg *config.CGRConfig + + srvManager *servmanager.ServiceManager + srvDep map[string]*sync.WaitGroup + cmConns map[string]chan birpc.ClientConnector +} + +func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefix string, + iConnCh chan birpc.ClientConnector) { + cgr.srvManager.AddServices(service) + cgr.cmConns[utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)] = iConnCh + cgr.srvDep[service.ServiceName()] = new(sync.WaitGroup) + engine.IntRPC.AddInternalRPCClient(apiPrefix, iConnCh) +} + +func (cgr *CGREngine) InitConfigFromPath(path string, nodeID string) (err error) { + // Init config + if cgr.cfg, err = config.NewCGRConfigFromPath(path); err != nil { + err = fmt.Errorf("could not parse config: <%s>", err) + return + } + if cgr.cfg.ConfigDBCfg().Type != utils.MetaInternal { + var d config.ConfigDB + if d, err = engine.NewDataDBConn(cgr.cfg.ConfigDBCfg().Type, + cgr.cfg.ConfigDBCfg().Host, cgr.cfg.ConfigDBCfg().Port, + cgr.cfg.ConfigDBCfg().Name, cgr.cfg.ConfigDBCfg().User, + cgr.cfg.ConfigDBCfg().Password, cgr.cfg.GeneralCfg().DBDataEncoding, + cgr.cfg.ConfigDBCfg().Opts); err != nil { // Cannot configure getter database, show stopper + err = fmt.Errorf("could not configure configDB: <%s>", err) + return + } + if err = cgr.cfg.LoadFromDB(d); err != nil { + err = fmt.Errorf("could not parse config from DB: <%s>", err) + return + } + } + if nodeID != utils.EmptyString { + cgr.cfg.GeneralCfg().NodeID = nodeID + } + config.SetCgrConfig(cgr.cfg) // Share the config object + return +} diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go new file mode 100644 index 000000000..fea49c696 --- /dev/null +++ b/services/libcgr-engine.go @@ -0,0 +1,343 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "flag" + "fmt" + "os" + "os/signal" + "strconv" + "strings" + "sync" + "syscall" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/apis" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/loaders" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +func NewCGREngineFlags() *CGREngineFlags { + fs := flag.NewFlagSet(utils.CgrEngine, flag.ContinueOnError) + return &CGREngineFlags{ + FlagSet: fs, + Cfgpath: fs.String(utils.CfgPathCgr, utils.ConfigPath, "Configuration directory path."), + Version: fs.Bool(utils.VersionCgr, false, "Prints the application version."), + Checkconfig: fs.Bool(utils.CheckCfgCgr, false, "Verify the config without starting the engine"), + Pidfile: fs.String(utils.PidCgr, utils.EmptyString, "Write pid file"), + Httppprofpath: fs.String(utils.HttpPrfPthCgr, utils.EmptyString, "http address used for program profiling"), + Cpuprofdir: fs.String(utils.CpuProfDirCgr, utils.EmptyString, "write cpu profile to files"), + Memprofdir: fs.String(utils.MemProfDirCgr, utils.EmptyString, "write memory profile to file"), + Memprofinterval: fs.Duration(utils.MemProfIntervalCgr, 5*time.Second, "Time between memory profile saves"), + Memprofnrfiles: fs.Int(utils.MemProfNrFilesCgr, 1, "Number of memory profile to write"), + Scheduledshutdown: fs.String(utils.ScheduledShutdownCgr, utils.EmptyString, "shutdown the engine after this duration"), + Singlecpu: fs.Bool(utils.SingleCpuCgr, false, "Run on single CPU core"), + Syslogger: fs.String(utils.LoggerCfg, utils.EmptyString, "logger <*syslog|*stdout>"), + Nodeid: fs.String(utils.NodeIDCfg, utils.EmptyString, "The node ID of the engine"), + Loglevel: fs.Int(utils.LogLevelCfg, -1, "Log level (0-emergency to 7-debug)"), + Preload: fs.String(utils.PreloadCgr, utils.EmptyString, "LoaderIDs used to load the data before the engine starts"), + } +} + +type CGREngineFlags struct { + *flag.FlagSet + + Cfgpath *string + Version *bool + Checkconfig *bool + Pidfile *string + Httppprofpath *string + Cpuprofdir *string + Memprofdir *string + Memprofinterval *time.Duration + Memprofnrfiles *int + Scheduledshutdown *string + Singlecpu *bool + Syslogger *string + Nodeid *string + Loglevel *int + Preload *string +} + +func cgrSingnalHandler(ctx *context.Context, shutdown context.CancelFunc, + cfg *config.CGRConfig, shdWg *sync.WaitGroup) { + shutdownSignal := make(chan os.Signal, 1) + reloadSignal := make(chan os.Signal, 1) + signal.Notify(shutdownSignal, os.Interrupt, + syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + signal.Notify(reloadSignal, syscall.SIGHUP) + for { + select { + case <-ctx.Done(): + shdWg.Done() + return + case <-shutdownSignal: + shutdown() + shdWg.Done() + return + case <-reloadSignal: + // do it in it's own goroutine in order to not block the signal handler with the reload functionality + go func() { + var reply string + if err := cfg.V1ReloadConfig(ctx, + new(config.ReloadArgs), &reply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("Error reloading configuration: <%s>", err)) + } + }() + } + } +} + +func cgrWritePid(pidFile string) (err error) { + var f *os.File + if f, err = os.Create(pidFile); err != nil { + err = fmt.Errorf("could not create pid file: %s", err) + return + } + if _, err = f.WriteString(strconv.Itoa(os.Getpid())); err != nil { + f.Close() + err = fmt.Errorf("could not write pid file: %s", err) + return + } + if err = f.Close(); err != nil { + err = fmt.Errorf("could not close pid file: %s", err) + } + return +} + +func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string, + loader *LoaderService, iLoaderSCh chan birpc.ClientConnector) (err error) { + if !cfg.LoaderCfg().Enabled() { + err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS) + return + } + select { + case ldrs := <-iLoaderSCh: + iLoaderSCh <- ldrs + case <-ctx.Done(): + return + } + + var reply string + for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) { + if err = loader.GetLoaderS().V1Load(ctx, &loaders.ArgsProcessFolder{ + ForceLock: true, // force lock will unlock the file in case is locked and return error + LoaderID: loaderID, + StopOnError: true, + }, &reply); err != nil { + err = fmt.Errorf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err) + return + } + } + return +} + +// cgrStartFilterService fires up the FilterS +func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS, + cacheS *engine.CacheS, connMgr *engine.ConnManager, + cfg *config.CGRConfig, dm *engine.DataManager) { + select { + case <-cacheS.GetPrecacheChannel(utils.CacheFilters): + iFilterSCh <- engine.NewFilterS(cfg, connMgr, dm) + case <-ctx.Done(): + } +} + +// cgrInitCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns +func cgrInitCacheS(ctx *context.Context, shutdown context.CancelFunc, + iCacheSCh chan birpc.ClientConnector, server *cores.Server, + cfg *config.CGRConfig, dm *engine.DataManager, anz *AnalyzerService, + cpS *engine.CapsStats) (chS *engine.CacheS) { + chS = engine.NewCacheS(cfg, dm, cpS) + go func() { + if err := chS.Precache(ctx); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error())) + shutdown() + } + }() + + chSv1, _ := birpc.NewService(apis.NewCacheSv1(chS), "", false) + if !cfg.DispatcherSCfg().Enabled { + server.RpcRegister(chSv1) + } + var rpc birpc.ClientConnector = chSv1 + if anz.IsRunning() { + rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CacheS) + } + iCacheSCh <- rpc + return +} + +func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, + server *cores.Server, anz *AnalyzerService) { + // grdSv1 := v1.NewGuardianSv1() + // if !cfg.DispatcherSCfg().Enabled { + // server.RpcRegister(grdSv1) + // } + // var rpc birpc.ClientConnector = grdSv1 + // if anz.IsRunning() { + // rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.GuardianS) + // } + // iGuardianSCh <- rpc +} + +func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, + srvMngr *servmanager.ServiceManager, server *cores.Server, + anz *AnalyzerService) { + // if !cfg.DispatcherSCfg().Enabled { + // server.RpcRegister(v1.NewServiceManagerV1(srvMngr)) + // } + // var rpc birpc.ClientConnector = srvMngr + // if anz.IsRunning() { + // rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ServiceManager) + // } + // iServMngrCh <- rpc +} + +func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, + cfg *config.CGRConfig, server *cores.Server, anz *AnalyzerService) { + cfgSv1, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) + if !cfg.DispatcherSCfg().Enabled { + server.RpcRegister(cfgSv1) + } + var rpc birpc.ClientConnector = cfgSv1 + if anz.IsRunning() { + rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ConfigSv1) + } + iConfigCh <- rpc +} + +func startRPC(server *cores.Server, internalAdminSChan, + internalCdrSChan, internalRsChan, internalStatSChan, + internalAttrSChan, internalChargerSChan, internalThdSChan, internalRouteSChan, + internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, + internalLoaderSChan, internalCacheSChan, + internalEEsChan, internalRateSChan, internalActionSChan, + internalAccountSChan chan birpc.ClientConnector, + shdChan *utils.SyncedChan) { + if !cfg.DispatcherSCfg().Enabled { + select { // Any of the rpc methods will unlock listening to rpc requests + // case cdrs := <-internalCdrSChan: + // internalCdrSChan <- cdrs + case smg := <-internalSessionSChan: + internalSessionSChan <- smg + case rls := <-internalRsChan: + internalRsChan <- rls + case statS := <-internalStatSChan: + internalStatSChan <- statS + case admS := <-internalAdminSChan: + internalAdminSChan <- admS + case attrS := <-internalAttrSChan: + internalAttrSChan <- attrS + case chrgS := <-internalChargerSChan: + internalChargerSChan <- chrgS + case thS := <-internalThdSChan: + internalThdSChan <- thS + case rtS := <-internalRouteSChan: + internalRouteSChan <- rtS + // case analyzerS := <-internalAnalyzerSChan: + // internalAnalyzerSChan <- analyzerS + case loaderS := <-internalLoaderSChan: + internalLoaderSChan <- loaderS + case chS := <-internalCacheSChan: // added in order to start the RPC before precaching is done + internalCacheSChan <- chS + // case eeS := <-internalEEsChan: + // internalEEsChan <- eeS + case rateS := <-internalRateSChan: + internalRateSChan <- rateS + case actionS := <-internalActionSChan: + internalActionSChan <- actionS + case accountS := <-internalAccountSChan: + internalAccountSChan <- accountS + case <-shdChan.Done(): + return + } + } else { + select { + case dispatcherS := <-internalDispatcherSChan: + internalDispatcherSChan <- dispatcherS + case <-shdChan.Done(): + return + } + } + + go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, shdChan) + go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, shdChan) + go server.ServeHTTP( + cfg.ListenCfg().HTTPListen, + cfg.HTTPCfg().JsonRPCURL, + cfg.HTTPCfg().WSURL, + cfg.HTTPCfg().UseBasicAuth, + cfg.HTTPCfg().AuthUsers, + shdChan, + ) + if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 || + len(cfg.ListenCfg().RPCJSONTLSListen) != 0 || + len(cfg.ListenCfg().HTTPTLSListen) != 0) && + (len(cfg.TLSCfg().ServerCerificate) == 0 || + len(cfg.TLSCfg().ServerKey) == 0) { + utils.Logger.Warning("WARNING: missing TLS certificate/key file!") + return + } + if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString { + go server.ServeGOBTLS( + cfg.ListenCfg().RPCGOBTLSListen, + cfg.TLSCfg().ServerCerificate, + cfg.TLSCfg().ServerKey, + cfg.TLSCfg().CaCertificate, + cfg.TLSCfg().ServerPolicy, + cfg.TLSCfg().ServerName, + shdChan, + ) + } + if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString { + go server.ServeJSONTLS( + cfg.ListenCfg().RPCJSONTLSListen, + cfg.TLSCfg().ServerCerificate, + cfg.TLSCfg().ServerKey, + cfg.TLSCfg().CaCertificate, + cfg.TLSCfg().ServerPolicy, + cfg.TLSCfg().ServerName, + shdChan, + ) + } + if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString { + go server.ServeHTTPTLS( + cfg.ListenCfg().HTTPTLSListen, + cfg.TLSCfg().ServerCerificate, + cfg.TLSCfg().ServerKey, + cfg.TLSCfg().CaCertificate, + cfg.TLSCfg().ServerPolicy, + cfg.TLSCfg().ServerName, + cfg.HTTPCfg().JsonRPCURL, + cfg.HTTPCfg().WSURL, + cfg.HTTPCfg().UseBasicAuth, + cfg.HTTPCfg().AuthUsers, + shdChan, + ) + } +}