diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 4bf086f93..2224abaf1 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -84,7 +84,7 @@ func initCacheS(internalCacheSChan chan birpc.ClientConnector,
cpS *engine.CapsStats) (chS *engine.CacheS) {
chS = engine.NewCacheS(cfg, dm, cpS)
go func() {
- if err := chS.Precache(); err != nil {
+ if err := chS.Precache(context.TODO()); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error()))
shdChan.CloseOnce()
}
@@ -141,6 +141,7 @@ func initConfigSv1(internalConfigChan chan birpc.ClientConnector,
internalConfigChan <- rpc
}
+/*
func startRPC(server *cores.Server, internalAdminSChan,
internalCdrSChan, internalRsChan, internalStatSChan,
internalAttrSChan, internalChargerSChan, internalThdSChan, internalRouteSChan,
@@ -251,6 +252,7 @@ func startRPC(server *cores.Server, internalAdminSChan,
)
}
}
+*/
func writePid() {
utils.Logger.Info(*pidFile)
diff --git a/cores/libserver.go b/cores/libserver.go
new file mode 100644
index 000000000..c90abebfc
--- /dev/null
+++ b/cores/libserver.go
@@ -0,0 +1,178 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package cores
+
+import (
+ "bytes"
+ "crypto/tls"
+ "crypto/x509"
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/cgrates/birpc"
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/analyzers"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+// rpcRequest represents a RPC request.
+// rpcRequest implements the io.ReadWriteCloser interface.
+type rpcRequest struct {
+ r io.ReadCloser // holds the JSON formated RPC request
+ rw io.ReadWriter // holds the JSON formated RPC response
+ remoteAddr net.Addr
+ caps *engine.Caps
+ anzWarpper *analyzers.AnalyzerService
+}
+
+// newRPCRequest returns a new rpcRequest.
+func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *engine.Caps, anz *analyzers.AnalyzerService) *rpcRequest {
+ return &rpcRequest{
+ r: r,
+ rw: new(bytes.Buffer),
+ remoteAddr: remoteAddr,
+ caps: caps,
+ anzWarpper: anz,
+ }
+}
+
+func (r *rpcRequest) Read(p []byte) (n int, err error) {
+ return r.r.Read(p)
+}
+
+func (r *rpcRequest) Write(p []byte) (n int, err error) {
+ return r.rw.Write(p)
+}
+
+func (r *rpcRequest) LocalAddr() net.Addr {
+ return utils.LocalAddr()
+}
+func (r *rpcRequest) RemoteAddr() net.Addr {
+ return r.remoteAddr
+}
+
+func (r *rpcRequest) Close() error {
+ return r.r.Close()
+}
+
+// Call invokes the RPC request, waits for it to complete, and returns the results.
+func (r *rpcRequest) Call() io.Reader {
+ birpc.ServeCodec(newCapsJSONCodec(r, r.caps, r.anzWarpper))
+ return r.rw
+}
+
+func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int,
+ serverName string) (config *tls.Config, err error) {
+ cert, err := tls.LoadX509KeyPair(serverCrt, serverKey)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("Error: %s when load server keys", err))
+ return nil, err
+ }
+
+ rootCAs, err := x509.SystemCertPool()
+ //This will only happen on windows
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("Error: %s when load SystemCertPool", err))
+ return nil, err
+ }
+
+ if caCert != "" {
+ ca, err := os.ReadFile(caCert)
+ if err != nil {
+ utils.Logger.Crit(fmt.Sprintf("Error: %s when read CA", err))
+ return config, err
+ }
+
+ if ok := rootCAs.AppendCertsFromPEM(ca); !ok {
+ utils.Logger.Crit("Cannot append certificate authority")
+ return config, errors.New("Cannot append certificate authority")
+ }
+ }
+
+ config = &tls.Config{
+ Certificates: []tls.Certificate{cert},
+ ClientAuth: tls.ClientAuthType(serverPolicy),
+ ClientCAs: rootCAs,
+ }
+ if serverName != "" {
+ config.ServerName = serverName
+ }
+ return
+}
+
+func acceptRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
+ srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) {
+ var errCnt int
+ var lastErrorTime time.Time
+ for {
+ var conn net.Conn
+ if conn, err = l.Accept(); err != nil {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ utils.Logger.Err(fmt.Sprintf(" %s accept error: <%s>", codecName, err.Error()))
+ now := time.Now()
+ if now.Sub(lastErrorTime) > 5*time.Second {
+ errCnt = 0 // reset error count if last error was more than 5 seconds ago
+ }
+ lastErrorTime = time.Now()
+ errCnt++
+ if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably
+ shtdwnEngine()
+ return
+ }
+ continue
+ }
+ go birpc.ServeCodec(newCodec(conn))
+ }
+}
+
+func acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(conn conn) birpc.BirpcCodec, stopbiRPCServer chan struct{}) {
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
+ return
+ }
+ stopbiRPCServer <- struct{}{}
+ utils.Logger.Crit(fmt.Sprintf("Stopped Bi%s server beacause %s", codecName, err))
+ return // stop if we get Accept error
+ }
+ go srv.ServeCodec(newCodec(conn))
+ }
+}
+
+func listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(conn conn) birpc.BirpcCodec, stopbiRPCServer chan struct{}) (lBiRPC net.Listener, err error) {
+ if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil {
+ log.Printf("ServeBi%s listen error: %s \n", codecName, err)
+ return
+ }
+ utils.Logger.Info(fmt.Sprintf("Starting CGRateS Bi%s server at <%s>", codecName, addr))
+ go acceptBiRPC(srv, lBiRPC, codecName, newCodec, stopbiRPCServer)
+ return
+}
diff --git a/cores/server.go b/cores/server.go
index c4070bd96..a13f665d5 100644
--- a/cores/server.go
+++ b/cores/server.go
@@ -19,40 +19,42 @@ along with this program. If not, see
package cores
import (
- "bytes"
"crypto/tls"
- "crypto/x509"
- "errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/pprof"
- "os"
- "strings"
"sync"
- "time"
"github.com/cgrates/birpc"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/analyzers"
+ "github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"golang.org/x/net/websocket"
)
func NewServer(caps *engine.Caps) (s *Server) {
- return &Server{
+ s = &Server{
httpMux: http.NewServeMux(),
httpsMux: http.NewServeMux(),
stopbiRPCServer: make(chan struct{}, 1),
caps: caps,
+
+ rpcStarted: utils.NewSyncedChan(),
+ rpcServer: birpc.NewServer(),
+ birpcSrv: birpc.NewBirpcServer(),
}
+ s.httpServer = &http.Server{Handler: s.httpMux}
+ s.httpsServer = &http.Server{Handler: s.httpsMux}
+ return
}
type Server struct {
sync.RWMutex
- rpcEnabled bool
httpEnabled bool
birpcSrv *birpc.BirpcServer
stopbiRPCServer chan struct{} // used in order to fully stop the biRPC
@@ -60,6 +62,16 @@ type Server struct {
httpMux *http.ServeMux
caps *engine.Caps
anz *analyzers.AnalyzerService
+
+ rpcStarted *utils.SyncedChan
+ rpcServer *birpc.Server
+ rpcJSONl net.Listener
+ rpcGOBl net.Listener
+ rpcJSONlTLS net.Listener
+ rpcGOBlTLS net.Listener
+ httpServer *http.Server
+ httpsServer *http.Server
+ startSrv sync.Once
}
func (s *Server) SetAnalyzer(anz *analyzers.AnalyzerService) {
@@ -68,16 +80,10 @@ func (s *Server) SetAnalyzer(anz *analyzers.AnalyzerService) {
func (s *Server) RpcRegister(rcvr interface{}) {
birpc.Register(rcvr)
- s.Lock()
- s.rpcEnabled = true
- s.Unlock()
}
func (s *Server) RpcRegisterName(name string, rcvr interface{}) {
birpc.RegisterName(name, rcvr)
- s.Lock()
- s.rpcEnabled = true
- s.Unlock()
}
func (s *Server) RpcUnregisterName(name string) {
@@ -85,24 +91,16 @@ func (s *Server) RpcUnregisterName(name string) {
}
func (s *Server) RegisterHTTPFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
- if s.httpMux != nil {
- s.httpMux.HandleFunc(pattern, handler)
- }
- if s.httpsMux != nil {
- s.httpsMux.HandleFunc(pattern, handler)
- }
+ s.httpMux.HandleFunc(pattern, handler)
+ s.httpsMux.HandleFunc(pattern, handler)
s.Lock()
s.httpEnabled = true
s.Unlock()
}
func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) {
- if s.httpMux != nil {
- s.httpMux.Handle(pattern, handler)
- }
- if s.httpsMux != nil {
- s.httpsMux.Handle(pattern, handler)
- }
+ s.httpMux.Handle(pattern, handler)
+ s.httpsMux.Handle(pattern, handler)
s.Lock()
s.httpEnabled = true
s.Unlock()
@@ -110,77 +108,9 @@ func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) {
// Registers a new BiJsonRpc name
func (s *Server) BiRPCRegisterName(name string, rcv interface{}) {
- s.RLock()
- isNil := s.birpcSrv == nil
- s.RUnlock()
- if isNil {
- s.Lock()
- s.birpcSrv = birpc.NewBirpcServer()
- s.Unlock()
- }
s.birpcSrv.RegisterName(name, rcv)
}
-func (s *Server) serveCodec(addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec,
- shdChan *utils.SyncedChan) {
- s.RLock()
- enabled := s.rpcEnabled
- s.RUnlock()
- if !enabled {
- return
- }
-
- l, e := net.Listen(utils.TCP, addr)
- if e != nil {
- log.Printf("Serve%s listen error: %s", codecName, e)
- shdChan.CloseOnce()
- return
- }
- utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", codecName, addr))
- s.accept(l, codecName, newCodec, shdChan)
-}
-
-func (s *Server) accept(l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec,
- shdChan *utils.SyncedChan) {
- errCnt := 0
- var lastErrorTime time.Time
- for {
- conn, err := l.Accept()
- if err != nil {
- utils.Logger.Err(fmt.Sprintf(" %s accept error: <%s>", codecName, err.Error()))
- now := time.Now()
- if now.Sub(lastErrorTime) > 5*time.Second {
- errCnt = 0 // reset error count if last error was more than 5 seconds ago
- }
- lastErrorTime = time.Now()
- errCnt++
- if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably
- shdChan.CloseOnce()
- return
- }
- continue
- }
- go birpc.ServeCodec(newCodec(conn, s.caps, s.anz))
- }
-}
-
-func (s *Server) ServeJSON(addr string, shdChan *utils.SyncedChan) {
- s.serveCodec(addr, utils.JSONCaps, newCapsJSONCodec, shdChan)
-}
-
-func (s *Server) ServeGOB(addr string, shdChan *utils.SyncedChan) {
- s.serveCodec(addr, utils.GOBCaps, newCapsGOBCodec, shdChan)
-}
-
-func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- w.Header().Set("Content-Type", "application/json")
- rmtIP, _ := utils.GetRemoteIP(r)
- rmtAddr, _ := net.ResolveIPAddr(utils.EmptyString, rmtIP)
- res := newRPCRequest(r.Body, rmtAddr, s.caps, s.anz).Call()
- io.Copy(w, res)
-}
-
func registerProfiler(addr string, mux *http.ServeMux) {
mux.HandleFunc(addr, pprof.Index)
mux.HandleFunc(addr+"cmdline", pprof.Cmdline)
@@ -203,19 +133,53 @@ func (s *Server) RegisterProfiler(addr string) {
registerProfiler(addr, s.httpsMux)
}
-func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
- useBasicAuth bool, userList map[string]string, shdChan *utils.SyncedChan) {
- s.RLock()
- enabled := s.rpcEnabled
- s.RUnlock()
+func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ rmtIP, _ := utils.GetRemoteIP(r)
+ rmtAddr, _ := net.ResolveIPAddr(utils.EmptyString, rmtIP)
+ res := newRPCRequest(r.Body, rmtAddr, s.caps, s.anz).Call()
+ io.Copy(w, res)
+ r.Body.Close()
+}
+
+func (s *Server) handleWebSocket(ws *websocket.Conn) {
+ birpc.ServeCodec(newCapsJSONCodec(ws, s.caps, s.anz))
+}
+
+func (s *Server) ServeJSON(ctx *context.Context, shtdwnEngine context.CancelFunc, addr string) (err error) {
+ if s.rpcJSONl, err = net.Listen(utils.TCP, addr); err != nil {
+ log.Printf("Serve%s listen error: %s", utils.JSONCaps, err)
+ shtdwnEngine()
+ return
+ }
+ utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.JSONCaps, addr))
+ return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcJSONl, utils.JSONCaps, func(conn conn) birpc.ServerCodec {
+ return newCapsJSONCodec(conn, s.caps, s.anz)
+ })
+}
+
+func (s *Server) ServeGOB(ctx *context.Context, shtdwnEngine context.CancelFunc, addr string) (err error) {
+ if s.rpcGOBl, err = net.Listen(utils.TCP, addr); err != nil {
+ log.Printf("Serve%s listen error: %s", utils.GOBCaps, err)
+ shtdwnEngine()
+ return
+ }
+ utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.GOBCaps, addr))
+ return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcGOBl, utils.GOBCaps, func(conn conn) birpc.ServerCodec {
+ return newCapsGOBCodec(conn, s.caps, s.anz)
+ })
+}
+
+func (s *Server) ServeHTTP(shtdwnEngine context.CancelFunc, addr, jsonRPCURL, wsRPCURL string,
+ useBasicAuth bool, userList map[string]string) {
+ s.Lock()
+ s.httpEnabled = s.httpEnabled || jsonRPCURL != "" || wsRPCURL != ""
+ enabled := s.httpEnabled
+ s.Unlock()
if !enabled {
return
}
if jsonRPCURL != "" {
- s.Lock()
- s.httpEnabled = true
- s.Unlock()
-
utils.Logger.Info(" enabling handler for JSON-RPC")
if useBasicAuth {
s.httpMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList)))
@@ -224,9 +188,6 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
}
}
if wsRPCURL != "" {
- s.Lock()
- s.httpEnabled = true
- s.Unlock()
utils.Logger.Info(" enabling handler for WebSocket connections")
wsHandler := websocket.Handler(s.handleWebSocket)
if useBasicAuth {
@@ -235,40 +196,35 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
s.httpMux.Handle(wsRPCURL, wsHandler)
}
}
- if !s.httpEnabled {
- return
- }
if useBasicAuth {
utils.Logger.Info(" enabling basic auth")
}
utils.Logger.Info(fmt.Sprintf(" start listening at <%s>", addr))
- if err := http.ListenAndServe(addr, s.httpMux); err != nil {
+ s.httpServer.Addr = addr
+ if err := s.httpServer.ListenAndServe(); err != nil {
log.Println(fmt.Sprintf("Error: %s when listening ", err))
- shdChan.CloseOnce()
+ shtdwnEngine()
}
}
// ServeBiRPC create a goroutine to listen and serve as BiRPC server
-func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientConnector), onDis func(birpc.ClientConnector)) (err error) {
- s.RLock()
- isNil := s.birpcSrv == nil
- s.RUnlock()
- if isNil {
- return fmt.Errorf("BiRPCServer should not be nil")
- }
-
+func (s *Server) ServeBiRPC2(addrJSON, addrGOB string, onConn, onDis func(birpc.ClientConnector)) (err error) {
s.birpcSrv.OnConnect(onConn)
s.birpcSrv.OnDisconnect(onDis)
if addrJSON != utils.EmptyString {
var ljson net.Listener
- if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, newCapsBiRPCJSONCodec); err != nil {
+ if ljson, err = listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, func(conn conn) birpc.BirpcCodec {
+ return newCapsBiRPCJSONCodec(conn, s.caps, s.anz)
+ }, s.stopbiRPCServer); err != nil {
return
}
defer ljson.Close()
}
if addrGOB != utils.EmptyString {
var lgob net.Listener
- if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, newCapsBiRPCGOBCodec); err != nil {
+ if lgob, err = listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, func(conn conn) birpc.BirpcCodec {
+ return newCapsBiRPCGOBCodec(conn, s.caps, s.anz)
+ }, s.stopbiRPCServer); err != nil {
return
}
defer lgob.Close()
@@ -277,175 +233,58 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientCo
return
}
-func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) (lBiRPC net.Listener, err error) {
- if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil {
- log.Printf("ServeBi%s listen error: %s \n", codecName, err)
- return
- }
- utils.Logger.Info(fmt.Sprintf("Starting CGRateS Bi%s server at <%s>", codecName, addr))
- go s.acceptBiRPC(srv, lBiRPC, codecName, newCodec)
- return
-}
-
-func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) {
- for {
- conn, err := l.Accept()
- if err != nil {
- if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
- return
- }
- s.stopbiRPCServer <- struct{}{}
- utils.Logger.Crit(fmt.Sprintf("Stopped Bi%s server beacause %s", codecName, err))
- return // stop if we get Accept error
- }
- go srv.ServeCodec(newCodec(conn, s.caps, s.anz))
- }
-}
-
-// StopBiRPC stops the go routine create with ServeBiJSON
-func (s *Server) StopBiRPC() {
- s.stopbiRPCServer <- struct{}{}
- s.Lock()
- s.birpcSrv = nil
- s.Unlock()
-}
-
-// rpcRequest represents a RPC request.
-// rpcRequest implements the io.ReadWriteCloser interface.
-type rpcRequest struct {
- r io.ReadCloser // holds the JSON formated RPC request
- rw io.ReadWriter // holds the JSON formated RPC response
- remoteAddr net.Addr
- caps *engine.Caps
- anzWarpper *analyzers.AnalyzerService
-}
-
-// newRPCRequest returns a new rpcRequest.
-func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *engine.Caps, anz *analyzers.AnalyzerService) *rpcRequest {
- return &rpcRequest{
- r: r,
- rw: new(bytes.Buffer),
- remoteAddr: remoteAddr,
- caps: caps,
- anzWarpper: anz,
- }
-}
-
-func (r *rpcRequest) Read(p []byte) (n int, err error) {
- return r.r.Read(p)
-}
-
-func (r *rpcRequest) Write(p []byte) (n int, err error) {
- return r.rw.Write(p)
-}
-
-func (r *rpcRequest) LocalAddr() net.Addr {
- return utils.LocalAddr()
-}
-func (r *rpcRequest) RemoteAddr() net.Addr {
- return r.remoteAddr
-}
-
-func (r *rpcRequest) Close() error {
- return r.r.Close()
-}
-
-// Call invokes the RPC request, waits for it to complete, and returns the results.
-func (r *rpcRequest) Call() io.Reader {
- birpc.ServeCodec(newCapsJSONCodec(r, r.caps, r.anzWarpper))
- return r.rw
-}
-
-func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int,
- serverName string) (config *tls.Config, err error) {
- cert, err := tls.LoadX509KeyPair(serverCrt, serverKey)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("Error: %s when load server keys", err))
- return nil, err
- }
-
- rootCAs, err := x509.SystemCertPool()
- //This will only happen on windows
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("Error: %s when load SystemCertPool", err))
- return nil, err
- }
-
- if caCert != "" {
- ca, err := os.ReadFile(caCert)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf("Error: %s when read CA", err))
- return config, err
- }
-
- if ok := rootCAs.AppendCertsFromPEM(ca); !ok {
- utils.Logger.Crit("Cannot append certificate authority")
- return config, errors.New("Cannot append certificate authority")
- }
- }
-
- config = &tls.Config{
- Certificates: []tls.Certificate{cert},
- ClientAuth: tls.ClientAuthType(serverPolicy),
- ClientCAs: rootCAs,
- }
- if serverName != "" {
- config.ServerName = serverName
- }
- return
-}
-
-func (s *Server) serveCodecTLS(addr, codecName, serverCrt, serverKey, caCert string,
- serverPolicy int, serverName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec,
- shdChan *utils.SyncedChan) {
- s.RLock()
- enabled := s.rpcEnabled
- s.RUnlock()
- if !enabled {
- return
- }
+func (s *Server) ServeGOBTLS(ctx *context.Context, shtdwnEngine context.CancelFunc,
+ addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string) (err error) {
config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
if err != nil {
- shdChan.CloseOnce()
+ shtdwnEngine()
return
}
- listener, err := tls.Listen(utils.TCP, addr, config)
+ s.rpcGOBlTLS, err = tls.Listen(utils.TCP, addr, config)
if err != nil {
log.Println(fmt.Sprintf("Error: %s when listening", err))
- shdChan.CloseOnce()
+ shtdwnEngine()
return
}
- utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", codecName, addr))
- s.accept(listener, codecName+" "+utils.TLS, newCodec, shdChan)
+ utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.GOBCaps, addr))
+
+ return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcGOBlTLS, utils.GOBCaps, func(conn conn) birpc.ServerCodec {
+ return newCapsGOBCodec(conn, s.caps, s.anz)
+ })
}
-func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string,
- serverPolicy int, serverName string, shdChan *utils.SyncedChan) {
- s.serveCodecTLS(addr, utils.GOBCaps, serverCrt, serverKey, caCert, serverPolicy, serverName, newCapsGOBCodec, shdChan)
+func (s *Server) ServeJSONTLS(ctx *context.Context, shtdwnEngine context.CancelFunc,
+ addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string) (err error) {
+ config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
+ if err != nil {
+ shtdwnEngine()
+ return
+ }
+ s.rpcJSONlTLS, err = tls.Listen(utils.TCP, addr, config)
+ if err != nil {
+ log.Println(fmt.Sprintf("Error: %s when listening", err))
+ shtdwnEngine()
+ return
+ }
+ utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.JSONCaps, addr))
+
+ return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcJSONlTLS, utils.JSONCaps, func(conn conn) birpc.ServerCodec {
+ return newCapsGOBCodec(conn, s.caps, s.anz)
+ })
}
-func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string,
- serverPolicy int, serverName string, shdChan *utils.SyncedChan) {
- s.serveCodecTLS(addr, utils.JSONCaps, serverCrt, serverKey, caCert, serverPolicy, serverName, newCapsJSONCodec, shdChan)
-}
-
-func (s *Server) handleWebSocket(ws *websocket.Conn) {
- birpc.ServeCodec(newCapsJSONCodec(ws, s.caps, s.anz))
-}
-
-func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int,
+func (s *Server) ServeHTTPS(shtdwnEngine context.CancelFunc,
+ addr, serverCrt, serverKey, caCert string, serverPolicy int,
serverName string, jsonRPCURL string, wsRPCURL string,
- useBasicAuth bool, userList map[string]string, shdChan *utils.SyncedChan) {
- s.RLock()
- enabled := s.rpcEnabled
- s.RUnlock()
+ useBasicAuth bool, userList map[string]string) {
+ s.Lock()
+ s.httpEnabled = s.httpEnabled || jsonRPCURL != "" || wsRPCURL != ""
+ enabled := s.httpEnabled
+ s.Unlock()
if !enabled {
return
}
if jsonRPCURL != "" {
- s.Lock()
- s.httpEnabled = true
- s.Unlock()
utils.Logger.Info(" enabling handler for JSON-RPC")
if useBasicAuth {
s.httpsMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList)))
@@ -454,9 +293,6 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
}
}
if wsRPCURL != "" {
- s.Lock()
- s.httpEnabled = true
- s.Unlock()
utils.Logger.Info(" enabling handler for WebSocket connections")
wsHandler := websocket.Handler(s.handleWebSocket)
if useBasicAuth {
@@ -465,25 +301,96 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
s.httpsMux.Handle(wsRPCURL, wsHandler)
}
}
- if !s.httpEnabled {
- return
- }
if useBasicAuth {
utils.Logger.Info(" enabling basic auth")
}
config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
if err != nil {
- shdChan.CloseOnce()
+ shtdwnEngine()
return
}
- httpSrv := http.Server{
- Addr: addr,
- Handler: s.httpsMux,
- TLSConfig: config,
- }
+ s.httpsServer.Addr = addr
+ s.httpsServer.TLSConfig = config
utils.Logger.Info(fmt.Sprintf(" start listening at <%s>", addr))
- if err := httpSrv.ListenAndServeTLS(serverCrt, serverKey); err != nil {
+
+ if err := s.httpsServer.ListenAndServeTLS(serverCrt, serverKey); err != nil {
log.Println(fmt.Sprintf("Error: %s when listening ", err))
- shdChan.CloseOnce()
+ shtdwnEngine()
}
}
+
+func (s *Server) Stop() {
+ s.rpcJSONl.Close()
+ s.rpcGOBl.Close()
+ s.rpcJSONlTLS.Close()
+ s.rpcGOBlTLS.Close()
+ s.httpServer.Shutdown(context.Background())
+ s.httpsServer.Shutdown(context.Background())
+ s.StopBiRPC()
+}
+
+// StopBiRPC stops the go routine create with ServeBiJSON
+func (s *Server) StopBiRPC() {
+ s.stopbiRPCServer <- struct{}{}
+ s.birpcSrv = birpc.NewBirpcServer()
+}
+
+func (s *Server) StartServer(ctx *context.Context, shtdwnEngine context.CancelFunc, cfg *config.CGRConfig) {
+ s.startSrv.Do(func() {
+ go s.ServeJSON(ctx, shtdwnEngine, cfg.ListenCfg().RPCJSONListen)
+ go s.ServeGOB(ctx, shtdwnEngine, cfg.ListenCfg().RPCGOBListen)
+ go s.ServeHTTP(
+ shtdwnEngine,
+ cfg.ListenCfg().HTTPListen,
+ cfg.HTTPCfg().JsonRPCURL,
+ cfg.HTTPCfg().WSURL,
+ cfg.HTTPCfg().UseBasicAuth,
+ cfg.HTTPCfg().AuthUsers,
+ )
+ if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 ||
+ len(cfg.ListenCfg().RPCJSONTLSListen) != 0 ||
+ len(cfg.ListenCfg().HTTPTLSListen) != 0) &&
+ (len(cfg.TLSCfg().ServerCerificate) == 0 ||
+ len(cfg.TLSCfg().ServerKey) == 0) {
+ utils.Logger.Warning("WARNING: missing TLS certificate/key file!")
+ return
+ }
+ if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString {
+ go s.ServeGOBTLS(
+ ctx, shtdwnEngine,
+ cfg.ListenCfg().RPCGOBTLSListen,
+ cfg.TLSCfg().ServerCerificate,
+ cfg.TLSCfg().ServerKey,
+ cfg.TLSCfg().CaCertificate,
+ cfg.TLSCfg().ServerPolicy,
+ cfg.TLSCfg().ServerName,
+ )
+ }
+ if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString {
+ go s.ServeJSONTLS(
+ ctx, shtdwnEngine,
+ cfg.ListenCfg().RPCJSONTLSListen,
+ cfg.TLSCfg().ServerCerificate,
+ cfg.TLSCfg().ServerKey,
+ cfg.TLSCfg().CaCertificate,
+ cfg.TLSCfg().ServerPolicy,
+ cfg.TLSCfg().ServerName,
+ )
+ }
+ if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString {
+ go s.ServeHTTPS(
+ shtdwnEngine,
+ cfg.ListenCfg().HTTPTLSListen,
+ cfg.TLSCfg().ServerCerificate,
+ cfg.TLSCfg().ServerKey,
+ cfg.TLSCfg().CaCertificate,
+ cfg.TLSCfg().ServerPolicy,
+ cfg.TLSCfg().ServerName,
+ cfg.HTTPCfg().JsonRPCURL,
+ cfg.HTTPCfg().WSURL,
+ cfg.HTTPCfg().UseBasicAuth,
+ cfg.HTTPCfg().AuthUsers,
+ )
+ }
+ })
+}
diff --git a/engine/caches.go b/engine/caches.go
index 60fb20afe..558e99e0f 100644
--- a/engine/caches.go
+++ b/engine/caches.go
@@ -217,7 +217,7 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} {
}
// Precache loads data from DataDB into cache at engine start
-func (chS *CacheS) Precache() (err error) {
+func (chS *CacheS) Precache(ctx *context.Context) (err error) {
var wg sync.WaitGroup // wait for precache to finish
errChan := make(chan error)
doneChan := make(chan struct{})
@@ -228,7 +228,7 @@ func (chS *CacheS) Precache() (err error) {
}
wg.Add(1)
go func(cacheID string) {
- errCache := chS.dm.CacheDataFromDB(context.TODO(),
+ errCache := chS.dm.CacheDataFromDB(ctx,
utils.CacheInstanceToPrefix[cacheID],
[]string{utils.MetaAny},
false)
@@ -247,6 +247,8 @@ func (chS *CacheS) Precache() (err error) {
select {
case err = <-errChan:
case <-doneChan:
+ case <-ctx.Done():
+ err = ctx.Err()
}
return
}
diff --git a/services/cgr-engine.go b/services/cgr-engine.go
new file mode 100644
index 000000000..b33ce7873
--- /dev/null
+++ b/services/cgr-engine.go
@@ -0,0 +1,74 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package services
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/cgrates/birpc"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/servmanager"
+ "github.com/cgrates/cgrates/utils"
+)
+
+type CGREngine struct {
+ cfg *config.CGRConfig
+
+ srvManager *servmanager.ServiceManager
+ srvDep map[string]*sync.WaitGroup
+ cmConns map[string]chan birpc.ClientConnector
+}
+
+func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefix string,
+ iConnCh chan birpc.ClientConnector) {
+ cgr.srvManager.AddServices(service)
+ cgr.cmConns[utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)] = iConnCh
+ cgr.srvDep[service.ServiceName()] = new(sync.WaitGroup)
+ engine.IntRPC.AddInternalRPCClient(apiPrefix, iConnCh)
+}
+
+func (cgr *CGREngine) InitConfigFromPath(path string, nodeID string) (err error) {
+ // Init config
+ if cgr.cfg, err = config.NewCGRConfigFromPath(path); err != nil {
+ err = fmt.Errorf("could not parse config: <%s>", err)
+ return
+ }
+ if cgr.cfg.ConfigDBCfg().Type != utils.MetaInternal {
+ var d config.ConfigDB
+ if d, err = engine.NewDataDBConn(cgr.cfg.ConfigDBCfg().Type,
+ cgr.cfg.ConfigDBCfg().Host, cgr.cfg.ConfigDBCfg().Port,
+ cgr.cfg.ConfigDBCfg().Name, cgr.cfg.ConfigDBCfg().User,
+ cgr.cfg.ConfigDBCfg().Password, cgr.cfg.GeneralCfg().DBDataEncoding,
+ cgr.cfg.ConfigDBCfg().Opts); err != nil { // Cannot configure getter database, show stopper
+ err = fmt.Errorf("could not configure configDB: <%s>", err)
+ return
+ }
+ if err = cgr.cfg.LoadFromDB(d); err != nil {
+ err = fmt.Errorf("could not parse config from DB: <%s>", err)
+ return
+ }
+ }
+ if nodeID != utils.EmptyString {
+ cgr.cfg.GeneralCfg().NodeID = nodeID
+ }
+ config.SetCgrConfig(cgr.cfg) // Share the config object
+ return
+}
diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go
new file mode 100644
index 000000000..fea49c696
--- /dev/null
+++ b/services/libcgr-engine.go
@@ -0,0 +1,343 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package services
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "os/signal"
+ "strconv"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/cgrates/birpc"
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/apis"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/cores"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/loaders"
+ "github.com/cgrates/cgrates/servmanager"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func NewCGREngineFlags() *CGREngineFlags {
+ fs := flag.NewFlagSet(utils.CgrEngine, flag.ContinueOnError)
+ return &CGREngineFlags{
+ FlagSet: fs,
+ Cfgpath: fs.String(utils.CfgPathCgr, utils.ConfigPath, "Configuration directory path."),
+ Version: fs.Bool(utils.VersionCgr, false, "Prints the application version."),
+ Checkconfig: fs.Bool(utils.CheckCfgCgr, false, "Verify the config without starting the engine"),
+ Pidfile: fs.String(utils.PidCgr, utils.EmptyString, "Write pid file"),
+ Httppprofpath: fs.String(utils.HttpPrfPthCgr, utils.EmptyString, "http address used for program profiling"),
+ Cpuprofdir: fs.String(utils.CpuProfDirCgr, utils.EmptyString, "write cpu profile to files"),
+ Memprofdir: fs.String(utils.MemProfDirCgr, utils.EmptyString, "write memory profile to file"),
+ Memprofinterval: fs.Duration(utils.MemProfIntervalCgr, 5*time.Second, "Time between memory profile saves"),
+ Memprofnrfiles: fs.Int(utils.MemProfNrFilesCgr, 1, "Number of memory profile to write"),
+ Scheduledshutdown: fs.String(utils.ScheduledShutdownCgr, utils.EmptyString, "shutdown the engine after this duration"),
+ Singlecpu: fs.Bool(utils.SingleCpuCgr, false, "Run on single CPU core"),
+ Syslogger: fs.String(utils.LoggerCfg, utils.EmptyString, "logger <*syslog|*stdout>"),
+ Nodeid: fs.String(utils.NodeIDCfg, utils.EmptyString, "The node ID of the engine"),
+ Loglevel: fs.Int(utils.LogLevelCfg, -1, "Log level (0-emergency to 7-debug)"),
+ Preload: fs.String(utils.PreloadCgr, utils.EmptyString, "LoaderIDs used to load the data before the engine starts"),
+ }
+}
+
+type CGREngineFlags struct {
+ *flag.FlagSet
+
+ Cfgpath *string
+ Version *bool
+ Checkconfig *bool
+ Pidfile *string
+ Httppprofpath *string
+ Cpuprofdir *string
+ Memprofdir *string
+ Memprofinterval *time.Duration
+ Memprofnrfiles *int
+ Scheduledshutdown *string
+ Singlecpu *bool
+ Syslogger *string
+ Nodeid *string
+ Loglevel *int
+ Preload *string
+}
+
+func cgrSingnalHandler(ctx *context.Context, shutdown context.CancelFunc,
+ cfg *config.CGRConfig, shdWg *sync.WaitGroup) {
+ shutdownSignal := make(chan os.Signal, 1)
+ reloadSignal := make(chan os.Signal, 1)
+ signal.Notify(shutdownSignal, os.Interrupt,
+ syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
+ signal.Notify(reloadSignal, syscall.SIGHUP)
+ for {
+ select {
+ case <-ctx.Done():
+ shdWg.Done()
+ return
+ case <-shutdownSignal:
+ shutdown()
+ shdWg.Done()
+ return
+ case <-reloadSignal:
+ // do it in it's own goroutine in order to not block the signal handler with the reload functionality
+ go func() {
+ var reply string
+ if err := cfg.V1ReloadConfig(ctx,
+ new(config.ReloadArgs), &reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("Error reloading configuration: <%s>", err))
+ }
+ }()
+ }
+ }
+}
+
+func cgrWritePid(pidFile string) (err error) {
+ var f *os.File
+ if f, err = os.Create(pidFile); err != nil {
+ err = fmt.Errorf("could not create pid file: %s", err)
+ return
+ }
+ if _, err = f.WriteString(strconv.Itoa(os.Getpid())); err != nil {
+ f.Close()
+ err = fmt.Errorf("could not write pid file: %s", err)
+ return
+ }
+ if err = f.Close(); err != nil {
+ err = fmt.Errorf("could not close pid file: %s", err)
+ }
+ return
+}
+
+func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string,
+ loader *LoaderService, iLoaderSCh chan birpc.ClientConnector) (err error) {
+ if !cfg.LoaderCfg().Enabled() {
+ err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS)
+ return
+ }
+ select {
+ case ldrs := <-iLoaderSCh:
+ iLoaderSCh <- ldrs
+ case <-ctx.Done():
+ return
+ }
+
+ var reply string
+ for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) {
+ if err = loader.GetLoaderS().V1Load(ctx, &loaders.ArgsProcessFolder{
+ ForceLock: true, // force lock will unlock the file in case is locked and return error
+ LoaderID: loaderID,
+ StopOnError: true,
+ }, &reply); err != nil {
+ err = fmt.Errorf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err)
+ return
+ }
+ }
+ return
+}
+
+// cgrStartFilterService fires up the FilterS
+func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS,
+ cacheS *engine.CacheS, connMgr *engine.ConnManager,
+ cfg *config.CGRConfig, dm *engine.DataManager) {
+ select {
+ case <-cacheS.GetPrecacheChannel(utils.CacheFilters):
+ iFilterSCh <- engine.NewFilterS(cfg, connMgr, dm)
+ case <-ctx.Done():
+ }
+}
+
+// cgrInitCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns
+func cgrInitCacheS(ctx *context.Context, shutdown context.CancelFunc,
+ iCacheSCh chan birpc.ClientConnector, server *cores.Server,
+ cfg *config.CGRConfig, dm *engine.DataManager, anz *AnalyzerService,
+ cpS *engine.CapsStats) (chS *engine.CacheS) {
+ chS = engine.NewCacheS(cfg, dm, cpS)
+ go func() {
+ if err := chS.Precache(ctx); err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error()))
+ shutdown()
+ }
+ }()
+
+ chSv1, _ := birpc.NewService(apis.NewCacheSv1(chS), "", false)
+ if !cfg.DispatcherSCfg().Enabled {
+ server.RpcRegister(chSv1)
+ }
+ var rpc birpc.ClientConnector = chSv1
+ if anz.IsRunning() {
+ rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CacheS)
+ }
+ iCacheSCh <- rpc
+ return
+}
+
+func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector,
+ server *cores.Server, anz *AnalyzerService) {
+ // grdSv1 := v1.NewGuardianSv1()
+ // if !cfg.DispatcherSCfg().Enabled {
+ // server.RpcRegister(grdSv1)
+ // }
+ // var rpc birpc.ClientConnector = grdSv1
+ // if anz.IsRunning() {
+ // rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.GuardianS)
+ // }
+ // iGuardianSCh <- rpc
+}
+
+func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
+ srvMngr *servmanager.ServiceManager, server *cores.Server,
+ anz *AnalyzerService) {
+ // if !cfg.DispatcherSCfg().Enabled {
+ // server.RpcRegister(v1.NewServiceManagerV1(srvMngr))
+ // }
+ // var rpc birpc.ClientConnector = srvMngr
+ // if anz.IsRunning() {
+ // rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ServiceManager)
+ // }
+ // iServMngrCh <- rpc
+}
+
+func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
+ cfg *config.CGRConfig, server *cores.Server, anz *AnalyzerService) {
+ cfgSv1, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false)
+ if !cfg.DispatcherSCfg().Enabled {
+ server.RpcRegister(cfgSv1)
+ }
+ var rpc birpc.ClientConnector = cfgSv1
+ if anz.IsRunning() {
+ rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ConfigSv1)
+ }
+ iConfigCh <- rpc
+}
+
+func startRPC(server *cores.Server, internalAdminSChan,
+ internalCdrSChan, internalRsChan, internalStatSChan,
+ internalAttrSChan, internalChargerSChan, internalThdSChan, internalRouteSChan,
+ internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan,
+ internalLoaderSChan, internalCacheSChan,
+ internalEEsChan, internalRateSChan, internalActionSChan,
+ internalAccountSChan chan birpc.ClientConnector,
+ shdChan *utils.SyncedChan) {
+ if !cfg.DispatcherSCfg().Enabled {
+ select { // Any of the rpc methods will unlock listening to rpc requests
+ // case cdrs := <-internalCdrSChan:
+ // internalCdrSChan <- cdrs
+ case smg := <-internalSessionSChan:
+ internalSessionSChan <- smg
+ case rls := <-internalRsChan:
+ internalRsChan <- rls
+ case statS := <-internalStatSChan:
+ internalStatSChan <- statS
+ case admS := <-internalAdminSChan:
+ internalAdminSChan <- admS
+ case attrS := <-internalAttrSChan:
+ internalAttrSChan <- attrS
+ case chrgS := <-internalChargerSChan:
+ internalChargerSChan <- chrgS
+ case thS := <-internalThdSChan:
+ internalThdSChan <- thS
+ case rtS := <-internalRouteSChan:
+ internalRouteSChan <- rtS
+ // case analyzerS := <-internalAnalyzerSChan:
+ // internalAnalyzerSChan <- analyzerS
+ case loaderS := <-internalLoaderSChan:
+ internalLoaderSChan <- loaderS
+ case chS := <-internalCacheSChan: // added in order to start the RPC before precaching is done
+ internalCacheSChan <- chS
+ // case eeS := <-internalEEsChan:
+ // internalEEsChan <- eeS
+ case rateS := <-internalRateSChan:
+ internalRateSChan <- rateS
+ case actionS := <-internalActionSChan:
+ internalActionSChan <- actionS
+ case accountS := <-internalAccountSChan:
+ internalAccountSChan <- accountS
+ case <-shdChan.Done():
+ return
+ }
+ } else {
+ select {
+ case dispatcherS := <-internalDispatcherSChan:
+ internalDispatcherSChan <- dispatcherS
+ case <-shdChan.Done():
+ return
+ }
+ }
+
+ go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, shdChan)
+ go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, shdChan)
+ go server.ServeHTTP(
+ cfg.ListenCfg().HTTPListen,
+ cfg.HTTPCfg().JsonRPCURL,
+ cfg.HTTPCfg().WSURL,
+ cfg.HTTPCfg().UseBasicAuth,
+ cfg.HTTPCfg().AuthUsers,
+ shdChan,
+ )
+ if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 ||
+ len(cfg.ListenCfg().RPCJSONTLSListen) != 0 ||
+ len(cfg.ListenCfg().HTTPTLSListen) != 0) &&
+ (len(cfg.TLSCfg().ServerCerificate) == 0 ||
+ len(cfg.TLSCfg().ServerKey) == 0) {
+ utils.Logger.Warning("WARNING: missing TLS certificate/key file!")
+ return
+ }
+ if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString {
+ go server.ServeGOBTLS(
+ cfg.ListenCfg().RPCGOBTLSListen,
+ cfg.TLSCfg().ServerCerificate,
+ cfg.TLSCfg().ServerKey,
+ cfg.TLSCfg().CaCertificate,
+ cfg.TLSCfg().ServerPolicy,
+ cfg.TLSCfg().ServerName,
+ shdChan,
+ )
+ }
+ if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString {
+ go server.ServeJSONTLS(
+ cfg.ListenCfg().RPCJSONTLSListen,
+ cfg.TLSCfg().ServerCerificate,
+ cfg.TLSCfg().ServerKey,
+ cfg.TLSCfg().CaCertificate,
+ cfg.TLSCfg().ServerPolicy,
+ cfg.TLSCfg().ServerName,
+ shdChan,
+ )
+ }
+ if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString {
+ go server.ServeHTTPTLS(
+ cfg.ListenCfg().HTTPTLSListen,
+ cfg.TLSCfg().ServerCerificate,
+ cfg.TLSCfg().ServerKey,
+ cfg.TLSCfg().CaCertificate,
+ cfg.TLSCfg().ServerPolicy,
+ cfg.TLSCfg().ServerName,
+ cfg.HTTPCfg().JsonRPCURL,
+ cfg.HTTPCfg().WSURL,
+ cfg.HTTPCfg().UseBasicAuth,
+ cfg.HTTPCfg().AuthUsers,
+ shdChan,
+ )
+ }
+}