mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Started adding the structure for cgr-engine
This commit is contained in:
committed by
Dan Christian Bogos
parent
4c71c77cdb
commit
5098d3e65f
@@ -84,7 +84,7 @@ func initCacheS(internalCacheSChan chan birpc.ClientConnector,
|
||||
cpS *engine.CapsStats) (chS *engine.CacheS) {
|
||||
chS = engine.NewCacheS(cfg, dm, cpS)
|
||||
go func() {
|
||||
if err := chS.Precache(); err != nil {
|
||||
if err := chS.Precache(context.TODO()); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error()))
|
||||
shdChan.CloseOnce()
|
||||
}
|
||||
@@ -141,6 +141,7 @@ func initConfigSv1(internalConfigChan chan birpc.ClientConnector,
|
||||
internalConfigChan <- rpc
|
||||
}
|
||||
|
||||
/*
|
||||
func startRPC(server *cores.Server, internalAdminSChan,
|
||||
internalCdrSChan, internalRsChan, internalStatSChan,
|
||||
internalAttrSChan, internalChargerSChan, internalThdSChan, internalRouteSChan,
|
||||
@@ -251,6 +252,7 @@ func startRPC(server *cores.Server, internalAdminSChan,
|
||||
)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func writePid() {
|
||||
utils.Logger.Info(*pidFile)
|
||||
|
||||
178
cores/libserver.go
Normal file
178
cores/libserver.go
Normal file
@@ -0,0 +1,178 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package cores
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/analyzers"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// rpcRequest represents a RPC request.
|
||||
// rpcRequest implements the io.ReadWriteCloser interface.
|
||||
type rpcRequest struct {
|
||||
r io.ReadCloser // holds the JSON formated RPC request
|
||||
rw io.ReadWriter // holds the JSON formated RPC response
|
||||
remoteAddr net.Addr
|
||||
caps *engine.Caps
|
||||
anzWarpper *analyzers.AnalyzerService
|
||||
}
|
||||
|
||||
// newRPCRequest returns a new rpcRequest.
|
||||
func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *engine.Caps, anz *analyzers.AnalyzerService) *rpcRequest {
|
||||
return &rpcRequest{
|
||||
r: r,
|
||||
rw: new(bytes.Buffer),
|
||||
remoteAddr: remoteAddr,
|
||||
caps: caps,
|
||||
anzWarpper: anz,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Read(p []byte) (n int, err error) {
|
||||
return r.r.Read(p)
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Write(p []byte) (n int, err error) {
|
||||
return r.rw.Write(p)
|
||||
}
|
||||
|
||||
func (r *rpcRequest) LocalAddr() net.Addr {
|
||||
return utils.LocalAddr()
|
||||
}
|
||||
func (r *rpcRequest) RemoteAddr() net.Addr {
|
||||
return r.remoteAddr
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Close() error {
|
||||
return r.r.Close()
|
||||
}
|
||||
|
||||
// Call invokes the RPC request, waits for it to complete, and returns the results.
|
||||
func (r *rpcRequest) Call() io.Reader {
|
||||
birpc.ServeCodec(newCapsJSONCodec(r, r.caps, r.anzWarpper))
|
||||
return r.rw
|
||||
}
|
||||
|
||||
func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
serverName string) (config *tls.Config, err error) {
|
||||
cert, err := tls.LoadX509KeyPair(serverCrt, serverKey)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Error: %s when load server keys", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rootCAs, err := x509.SystemCertPool()
|
||||
//This will only happen on windows
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Error: %s when load SystemCertPool", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if caCert != "" {
|
||||
ca, err := os.ReadFile(caCert)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Error: %s when read CA", err))
|
||||
return config, err
|
||||
}
|
||||
|
||||
if ok := rootCAs.AppendCertsFromPEM(ca); !ok {
|
||||
utils.Logger.Crit("Cannot append certificate authority")
|
||||
return config, errors.New("Cannot append certificate authority")
|
||||
}
|
||||
}
|
||||
|
||||
config = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
ClientAuth: tls.ClientAuthType(serverPolicy),
|
||||
ClientCAs: rootCAs,
|
||||
}
|
||||
if serverName != "" {
|
||||
config.ServerName = serverName
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func acceptRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
|
||||
srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) {
|
||||
var errCnt int
|
||||
var lastErrorTime time.Time
|
||||
for {
|
||||
var conn net.Conn
|
||||
if conn, err = l.Accept(); err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
utils.Logger.Err(fmt.Sprintf("<CGRServer> %s accept error: <%s>", codecName, err.Error()))
|
||||
now := time.Now()
|
||||
if now.Sub(lastErrorTime) > 5*time.Second {
|
||||
errCnt = 0 // reset error count if last error was more than 5 seconds ago
|
||||
}
|
||||
lastErrorTime = time.Now()
|
||||
errCnt++
|
||||
if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably
|
||||
shtdwnEngine()
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
go birpc.ServeCodec(newCodec(conn))
|
||||
}
|
||||
}
|
||||
|
||||
func acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(conn conn) birpc.BirpcCodec, stopbiRPCServer chan struct{}) {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
|
||||
return
|
||||
}
|
||||
stopbiRPCServer <- struct{}{}
|
||||
utils.Logger.Crit(fmt.Sprintf("Stopped Bi%s server beacause %s", codecName, err))
|
||||
return // stop if we get Accept error
|
||||
}
|
||||
go srv.ServeCodec(newCodec(conn))
|
||||
}
|
||||
}
|
||||
|
||||
func listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(conn conn) birpc.BirpcCodec, stopbiRPCServer chan struct{}) (lBiRPC net.Listener, err error) {
|
||||
if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil {
|
||||
log.Printf("ServeBi%s listen error: %s \n", codecName, err)
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS Bi%s server at <%s>", codecName, addr))
|
||||
go acceptBiRPC(srv, lBiRPC, codecName, newCodec, stopbiRPCServer)
|
||||
return
|
||||
}
|
||||
481
cores/server.go
481
cores/server.go
@@ -19,40 +19,42 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package cores
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/analyzers"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
func NewServer(caps *engine.Caps) (s *Server) {
|
||||
return &Server{
|
||||
s = &Server{
|
||||
httpMux: http.NewServeMux(),
|
||||
httpsMux: http.NewServeMux(),
|
||||
stopbiRPCServer: make(chan struct{}, 1),
|
||||
caps: caps,
|
||||
|
||||
rpcStarted: utils.NewSyncedChan(),
|
||||
rpcServer: birpc.NewServer(),
|
||||
birpcSrv: birpc.NewBirpcServer(),
|
||||
}
|
||||
s.httpServer = &http.Server{Handler: s.httpMux}
|
||||
s.httpsServer = &http.Server{Handler: s.httpsMux}
|
||||
return
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
sync.RWMutex
|
||||
rpcEnabled bool
|
||||
httpEnabled bool
|
||||
birpcSrv *birpc.BirpcServer
|
||||
stopbiRPCServer chan struct{} // used in order to fully stop the biRPC
|
||||
@@ -60,6 +62,16 @@ type Server struct {
|
||||
httpMux *http.ServeMux
|
||||
caps *engine.Caps
|
||||
anz *analyzers.AnalyzerService
|
||||
|
||||
rpcStarted *utils.SyncedChan
|
||||
rpcServer *birpc.Server
|
||||
rpcJSONl net.Listener
|
||||
rpcGOBl net.Listener
|
||||
rpcJSONlTLS net.Listener
|
||||
rpcGOBlTLS net.Listener
|
||||
httpServer *http.Server
|
||||
httpsServer *http.Server
|
||||
startSrv sync.Once
|
||||
}
|
||||
|
||||
func (s *Server) SetAnalyzer(anz *analyzers.AnalyzerService) {
|
||||
@@ -68,16 +80,10 @@ func (s *Server) SetAnalyzer(anz *analyzers.AnalyzerService) {
|
||||
|
||||
func (s *Server) RpcRegister(rcvr interface{}) {
|
||||
birpc.Register(rcvr)
|
||||
s.Lock()
|
||||
s.rpcEnabled = true
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *Server) RpcRegisterName(name string, rcvr interface{}) {
|
||||
birpc.RegisterName(name, rcvr)
|
||||
s.Lock()
|
||||
s.rpcEnabled = true
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *Server) RpcUnregisterName(name string) {
|
||||
@@ -85,24 +91,16 @@ func (s *Server) RpcUnregisterName(name string) {
|
||||
}
|
||||
|
||||
func (s *Server) RegisterHTTPFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
|
||||
if s.httpMux != nil {
|
||||
s.httpMux.HandleFunc(pattern, handler)
|
||||
}
|
||||
if s.httpsMux != nil {
|
||||
s.httpsMux.HandleFunc(pattern, handler)
|
||||
}
|
||||
s.httpMux.HandleFunc(pattern, handler)
|
||||
s.httpsMux.HandleFunc(pattern, handler)
|
||||
s.Lock()
|
||||
s.httpEnabled = true
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) {
|
||||
if s.httpMux != nil {
|
||||
s.httpMux.Handle(pattern, handler)
|
||||
}
|
||||
if s.httpsMux != nil {
|
||||
s.httpsMux.Handle(pattern, handler)
|
||||
}
|
||||
s.httpMux.Handle(pattern, handler)
|
||||
s.httpsMux.Handle(pattern, handler)
|
||||
s.Lock()
|
||||
s.httpEnabled = true
|
||||
s.Unlock()
|
||||
@@ -110,77 +108,9 @@ func (s *Server) RegisterHttpHandler(pattern string, handler http.Handler) {
|
||||
|
||||
// Registers a new BiJsonRpc name
|
||||
func (s *Server) BiRPCRegisterName(name string, rcv interface{}) {
|
||||
s.RLock()
|
||||
isNil := s.birpcSrv == nil
|
||||
s.RUnlock()
|
||||
if isNil {
|
||||
s.Lock()
|
||||
s.birpcSrv = birpc.NewBirpcServer()
|
||||
s.Unlock()
|
||||
}
|
||||
s.birpcSrv.RegisterName(name, rcv)
|
||||
}
|
||||
|
||||
func (s *Server) serveCodec(addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec,
|
||||
shdChan *utils.SyncedChan) {
|
||||
s.RLock()
|
||||
enabled := s.rpcEnabled
|
||||
s.RUnlock()
|
||||
if !enabled {
|
||||
return
|
||||
}
|
||||
|
||||
l, e := net.Listen(utils.TCP, addr)
|
||||
if e != nil {
|
||||
log.Printf("Serve%s listen error: %s", codecName, e)
|
||||
shdChan.CloseOnce()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", codecName, addr))
|
||||
s.accept(l, codecName, newCodec, shdChan)
|
||||
}
|
||||
|
||||
func (s *Server) accept(l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec,
|
||||
shdChan *utils.SyncedChan) {
|
||||
errCnt := 0
|
||||
var lastErrorTime time.Time
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CGRServer> %s accept error: <%s>", codecName, err.Error()))
|
||||
now := time.Now()
|
||||
if now.Sub(lastErrorTime) > 5*time.Second {
|
||||
errCnt = 0 // reset error count if last error was more than 5 seconds ago
|
||||
}
|
||||
lastErrorTime = time.Now()
|
||||
errCnt++
|
||||
if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably
|
||||
shdChan.CloseOnce()
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
go birpc.ServeCodec(newCodec(conn, s.caps, s.anz))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) ServeJSON(addr string, shdChan *utils.SyncedChan) {
|
||||
s.serveCodec(addr, utils.JSONCaps, newCapsJSONCodec, shdChan)
|
||||
}
|
||||
|
||||
func (s *Server) ServeGOB(addr string, shdChan *utils.SyncedChan) {
|
||||
s.serveCodec(addr, utils.GOBCaps, newCapsGOBCodec, shdChan)
|
||||
}
|
||||
|
||||
func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
rmtIP, _ := utils.GetRemoteIP(r)
|
||||
rmtAddr, _ := net.ResolveIPAddr(utils.EmptyString, rmtIP)
|
||||
res := newRPCRequest(r.Body, rmtAddr, s.caps, s.anz).Call()
|
||||
io.Copy(w, res)
|
||||
}
|
||||
|
||||
func registerProfiler(addr string, mux *http.ServeMux) {
|
||||
mux.HandleFunc(addr, pprof.Index)
|
||||
mux.HandleFunc(addr+"cmdline", pprof.Cmdline)
|
||||
@@ -203,19 +133,53 @@ func (s *Server) RegisterProfiler(addr string) {
|
||||
registerProfiler(addr, s.httpsMux)
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
|
||||
useBasicAuth bool, userList map[string]string, shdChan *utils.SyncedChan) {
|
||||
s.RLock()
|
||||
enabled := s.rpcEnabled
|
||||
s.RUnlock()
|
||||
func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
rmtIP, _ := utils.GetRemoteIP(r)
|
||||
rmtAddr, _ := net.ResolveIPAddr(utils.EmptyString, rmtIP)
|
||||
res := newRPCRequest(r.Body, rmtAddr, s.caps, s.anz).Call()
|
||||
io.Copy(w, res)
|
||||
r.Body.Close()
|
||||
}
|
||||
|
||||
func (s *Server) handleWebSocket(ws *websocket.Conn) {
|
||||
birpc.ServeCodec(newCapsJSONCodec(ws, s.caps, s.anz))
|
||||
}
|
||||
|
||||
func (s *Server) ServeJSON(ctx *context.Context, shtdwnEngine context.CancelFunc, addr string) (err error) {
|
||||
if s.rpcJSONl, err = net.Listen(utils.TCP, addr); err != nil {
|
||||
log.Printf("Serve%s listen error: %s", utils.JSONCaps, err)
|
||||
shtdwnEngine()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.JSONCaps, addr))
|
||||
return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcJSONl, utils.JSONCaps, func(conn conn) birpc.ServerCodec {
|
||||
return newCapsJSONCodec(conn, s.caps, s.anz)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) ServeGOB(ctx *context.Context, shtdwnEngine context.CancelFunc, addr string) (err error) {
|
||||
if s.rpcGOBl, err = net.Listen(utils.TCP, addr); err != nil {
|
||||
log.Printf("Serve%s listen error: %s", utils.GOBCaps, err)
|
||||
shtdwnEngine()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.GOBCaps, addr))
|
||||
return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcGOBl, utils.GOBCaps, func(conn conn) birpc.ServerCodec {
|
||||
return newCapsGOBCodec(conn, s.caps, s.anz)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(shtdwnEngine context.CancelFunc, addr, jsonRPCURL, wsRPCURL string,
|
||||
useBasicAuth bool, userList map[string]string) {
|
||||
s.Lock()
|
||||
s.httpEnabled = s.httpEnabled || jsonRPCURL != "" || wsRPCURL != ""
|
||||
enabled := s.httpEnabled
|
||||
s.Unlock()
|
||||
if !enabled {
|
||||
return
|
||||
}
|
||||
if jsonRPCURL != "" {
|
||||
s.Lock()
|
||||
s.httpEnabled = true
|
||||
s.Unlock()
|
||||
|
||||
utils.Logger.Info("<HTTP> enabling handler for JSON-RPC")
|
||||
if useBasicAuth {
|
||||
s.httpMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList)))
|
||||
@@ -224,9 +188,6 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
|
||||
}
|
||||
}
|
||||
if wsRPCURL != "" {
|
||||
s.Lock()
|
||||
s.httpEnabled = true
|
||||
s.Unlock()
|
||||
utils.Logger.Info("<HTTP> enabling handler for WebSocket connections")
|
||||
wsHandler := websocket.Handler(s.handleWebSocket)
|
||||
if useBasicAuth {
|
||||
@@ -235,40 +196,35 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
|
||||
s.httpMux.Handle(wsRPCURL, wsHandler)
|
||||
}
|
||||
}
|
||||
if !s.httpEnabled {
|
||||
return
|
||||
}
|
||||
if useBasicAuth {
|
||||
utils.Logger.Info("<HTTP> enabling basic auth")
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<HTTP> start listening at <%s>", addr))
|
||||
if err := http.ListenAndServe(addr, s.httpMux); err != nil {
|
||||
s.httpServer.Addr = addr
|
||||
if err := s.httpServer.ListenAndServe(); err != nil {
|
||||
log.Println(fmt.Sprintf("<HTTP>Error: %s when listening ", err))
|
||||
shdChan.CloseOnce()
|
||||
shtdwnEngine()
|
||||
}
|
||||
}
|
||||
|
||||
// ServeBiRPC create a goroutine to listen and serve as BiRPC server
|
||||
func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientConnector), onDis func(birpc.ClientConnector)) (err error) {
|
||||
s.RLock()
|
||||
isNil := s.birpcSrv == nil
|
||||
s.RUnlock()
|
||||
if isNil {
|
||||
return fmt.Errorf("BiRPCServer should not be nil")
|
||||
}
|
||||
|
||||
func (s *Server) ServeBiRPC2(addrJSON, addrGOB string, onConn, onDis func(birpc.ClientConnector)) (err error) {
|
||||
s.birpcSrv.OnConnect(onConn)
|
||||
s.birpcSrv.OnDisconnect(onDis)
|
||||
if addrJSON != utils.EmptyString {
|
||||
var ljson net.Listener
|
||||
if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, newCapsBiRPCJSONCodec); err != nil {
|
||||
if ljson, err = listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, func(conn conn) birpc.BirpcCodec {
|
||||
return newCapsBiRPCJSONCodec(conn, s.caps, s.anz)
|
||||
}, s.stopbiRPCServer); err != nil {
|
||||
return
|
||||
}
|
||||
defer ljson.Close()
|
||||
}
|
||||
if addrGOB != utils.EmptyString {
|
||||
var lgob net.Listener
|
||||
if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, newCapsBiRPCGOBCodec); err != nil {
|
||||
if lgob, err = listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, func(conn conn) birpc.BirpcCodec {
|
||||
return newCapsBiRPCGOBCodec(conn, s.caps, s.anz)
|
||||
}, s.stopbiRPCServer); err != nil {
|
||||
return
|
||||
}
|
||||
defer lgob.Close()
|
||||
@@ -277,175 +233,58 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientCo
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) (lBiRPC net.Listener, err error) {
|
||||
if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil {
|
||||
log.Printf("ServeBi%s listen error: %s \n", codecName, err)
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS Bi%s server at <%s>", codecName, addr))
|
||||
go s.acceptBiRPC(srv, lBiRPC, codecName, newCodec)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
|
||||
return
|
||||
}
|
||||
s.stopbiRPCServer <- struct{}{}
|
||||
utils.Logger.Crit(fmt.Sprintf("Stopped Bi%s server beacause %s", codecName, err))
|
||||
return // stop if we get Accept error
|
||||
}
|
||||
go srv.ServeCodec(newCodec(conn, s.caps, s.anz))
|
||||
}
|
||||
}
|
||||
|
||||
// StopBiRPC stops the go routine create with ServeBiJSON
|
||||
func (s *Server) StopBiRPC() {
|
||||
s.stopbiRPCServer <- struct{}{}
|
||||
s.Lock()
|
||||
s.birpcSrv = nil
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
// rpcRequest represents a RPC request.
|
||||
// rpcRequest implements the io.ReadWriteCloser interface.
|
||||
type rpcRequest struct {
|
||||
r io.ReadCloser // holds the JSON formated RPC request
|
||||
rw io.ReadWriter // holds the JSON formated RPC response
|
||||
remoteAddr net.Addr
|
||||
caps *engine.Caps
|
||||
anzWarpper *analyzers.AnalyzerService
|
||||
}
|
||||
|
||||
// newRPCRequest returns a new rpcRequest.
|
||||
func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *engine.Caps, anz *analyzers.AnalyzerService) *rpcRequest {
|
||||
return &rpcRequest{
|
||||
r: r,
|
||||
rw: new(bytes.Buffer),
|
||||
remoteAddr: remoteAddr,
|
||||
caps: caps,
|
||||
anzWarpper: anz,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Read(p []byte) (n int, err error) {
|
||||
return r.r.Read(p)
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Write(p []byte) (n int, err error) {
|
||||
return r.rw.Write(p)
|
||||
}
|
||||
|
||||
func (r *rpcRequest) LocalAddr() net.Addr {
|
||||
return utils.LocalAddr()
|
||||
}
|
||||
func (r *rpcRequest) RemoteAddr() net.Addr {
|
||||
return r.remoteAddr
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Close() error {
|
||||
return r.r.Close()
|
||||
}
|
||||
|
||||
// Call invokes the RPC request, waits for it to complete, and returns the results.
|
||||
func (r *rpcRequest) Call() io.Reader {
|
||||
birpc.ServeCodec(newCapsJSONCodec(r, r.caps, r.anzWarpper))
|
||||
return r.rw
|
||||
}
|
||||
|
||||
func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
serverName string) (config *tls.Config, err error) {
|
||||
cert, err := tls.LoadX509KeyPair(serverCrt, serverKey)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Error: %s when load server keys", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rootCAs, err := x509.SystemCertPool()
|
||||
//This will only happen on windows
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Error: %s when load SystemCertPool", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if caCert != "" {
|
||||
ca, err := os.ReadFile(caCert)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Error: %s when read CA", err))
|
||||
return config, err
|
||||
}
|
||||
|
||||
if ok := rootCAs.AppendCertsFromPEM(ca); !ok {
|
||||
utils.Logger.Crit("Cannot append certificate authority")
|
||||
return config, errors.New("Cannot append certificate authority")
|
||||
}
|
||||
}
|
||||
|
||||
config = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
ClientAuth: tls.ClientAuthType(serverPolicy),
|
||||
ClientCAs: rootCAs,
|
||||
}
|
||||
if serverName != "" {
|
||||
config.ServerName = serverName
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) serveCodecTLS(addr, codecName, serverCrt, serverKey, caCert string,
|
||||
serverPolicy int, serverName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.ServerCodec,
|
||||
shdChan *utils.SyncedChan) {
|
||||
s.RLock()
|
||||
enabled := s.rpcEnabled
|
||||
s.RUnlock()
|
||||
if !enabled {
|
||||
return
|
||||
}
|
||||
func (s *Server) ServeGOBTLS(ctx *context.Context, shtdwnEngine context.CancelFunc,
|
||||
addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string) (err error) {
|
||||
config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
|
||||
if err != nil {
|
||||
shdChan.CloseOnce()
|
||||
shtdwnEngine()
|
||||
return
|
||||
}
|
||||
listener, err := tls.Listen(utils.TCP, addr, config)
|
||||
s.rpcGOBlTLS, err = tls.Listen(utils.TCP, addr, config)
|
||||
if err != nil {
|
||||
log.Println(fmt.Sprintf("Error: %s when listening", err))
|
||||
shdChan.CloseOnce()
|
||||
shtdwnEngine()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", codecName, addr))
|
||||
s.accept(listener, codecName+" "+utils.TLS, newCodec, shdChan)
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.GOBCaps, addr))
|
||||
|
||||
return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcGOBlTLS, utils.GOBCaps, func(conn conn) birpc.ServerCodec {
|
||||
return newCapsGOBCodec(conn, s.caps, s.anz)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string,
|
||||
serverPolicy int, serverName string, shdChan *utils.SyncedChan) {
|
||||
s.serveCodecTLS(addr, utils.GOBCaps, serverCrt, serverKey, caCert, serverPolicy, serverName, newCapsGOBCodec, shdChan)
|
||||
func (s *Server) ServeJSONTLS(ctx *context.Context, shtdwnEngine context.CancelFunc,
|
||||
addr, serverCrt, serverKey, caCert string, serverPolicy int, serverName string) (err error) {
|
||||
config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
|
||||
if err != nil {
|
||||
shtdwnEngine()
|
||||
return
|
||||
}
|
||||
s.rpcJSONlTLS, err = tls.Listen(utils.TCP, addr, config)
|
||||
if err != nil {
|
||||
log.Println(fmt.Sprintf("Error: %s when listening", err))
|
||||
shtdwnEngine()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.JSONCaps, addr))
|
||||
|
||||
return acceptRPC(ctx, shtdwnEngine, s.rpcServer, s.rpcJSONlTLS, utils.JSONCaps, func(conn conn) birpc.ServerCodec {
|
||||
return newCapsGOBCodec(conn, s.caps, s.anz)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string,
|
||||
serverPolicy int, serverName string, shdChan *utils.SyncedChan) {
|
||||
s.serveCodecTLS(addr, utils.JSONCaps, serverCrt, serverKey, caCert, serverPolicy, serverName, newCapsJSONCodec, shdChan)
|
||||
}
|
||||
|
||||
func (s *Server) handleWebSocket(ws *websocket.Conn) {
|
||||
birpc.ServeCodec(newCapsJSONCodec(ws, s.caps, s.anz))
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
func (s *Server) ServeHTTPS(shtdwnEngine context.CancelFunc,
|
||||
addr, serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
serverName string, jsonRPCURL string, wsRPCURL string,
|
||||
useBasicAuth bool, userList map[string]string, shdChan *utils.SyncedChan) {
|
||||
s.RLock()
|
||||
enabled := s.rpcEnabled
|
||||
s.RUnlock()
|
||||
useBasicAuth bool, userList map[string]string) {
|
||||
s.Lock()
|
||||
s.httpEnabled = s.httpEnabled || jsonRPCURL != "" || wsRPCURL != ""
|
||||
enabled := s.httpEnabled
|
||||
s.Unlock()
|
||||
if !enabled {
|
||||
return
|
||||
}
|
||||
if jsonRPCURL != "" {
|
||||
s.Lock()
|
||||
s.httpEnabled = true
|
||||
s.Unlock()
|
||||
utils.Logger.Info("<HTTPS> enabling handler for JSON-RPC")
|
||||
if useBasicAuth {
|
||||
s.httpsMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList)))
|
||||
@@ -454,9 +293,6 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
|
||||
}
|
||||
}
|
||||
if wsRPCURL != "" {
|
||||
s.Lock()
|
||||
s.httpEnabled = true
|
||||
s.Unlock()
|
||||
utils.Logger.Info("<HTTPS> enabling handler for WebSocket connections")
|
||||
wsHandler := websocket.Handler(s.handleWebSocket)
|
||||
if useBasicAuth {
|
||||
@@ -465,25 +301,96 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
|
||||
s.httpsMux.Handle(wsRPCURL, wsHandler)
|
||||
}
|
||||
}
|
||||
if !s.httpEnabled {
|
||||
return
|
||||
}
|
||||
if useBasicAuth {
|
||||
utils.Logger.Info("<HTTPS> enabling basic auth")
|
||||
}
|
||||
config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
|
||||
if err != nil {
|
||||
shdChan.CloseOnce()
|
||||
shtdwnEngine()
|
||||
return
|
||||
}
|
||||
httpSrv := http.Server{
|
||||
Addr: addr,
|
||||
Handler: s.httpsMux,
|
||||
TLSConfig: config,
|
||||
}
|
||||
s.httpsServer.Addr = addr
|
||||
s.httpsServer.TLSConfig = config
|
||||
utils.Logger.Info(fmt.Sprintf("<HTTPS> start listening at <%s>", addr))
|
||||
if err := httpSrv.ListenAndServeTLS(serverCrt, serverKey); err != nil {
|
||||
|
||||
if err := s.httpsServer.ListenAndServeTLS(serverCrt, serverKey); err != nil {
|
||||
log.Println(fmt.Sprintf("<HTTPS>Error: %s when listening ", err))
|
||||
shdChan.CloseOnce()
|
||||
shtdwnEngine()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Stop() {
|
||||
s.rpcJSONl.Close()
|
||||
s.rpcGOBl.Close()
|
||||
s.rpcJSONlTLS.Close()
|
||||
s.rpcGOBlTLS.Close()
|
||||
s.httpServer.Shutdown(context.Background())
|
||||
s.httpsServer.Shutdown(context.Background())
|
||||
s.StopBiRPC()
|
||||
}
|
||||
|
||||
// StopBiRPC stops the go routine create with ServeBiJSON
|
||||
func (s *Server) StopBiRPC() {
|
||||
s.stopbiRPCServer <- struct{}{}
|
||||
s.birpcSrv = birpc.NewBirpcServer()
|
||||
}
|
||||
|
||||
func (s *Server) StartServer(ctx *context.Context, shtdwnEngine context.CancelFunc, cfg *config.CGRConfig) {
|
||||
s.startSrv.Do(func() {
|
||||
go s.ServeJSON(ctx, shtdwnEngine, cfg.ListenCfg().RPCJSONListen)
|
||||
go s.ServeGOB(ctx, shtdwnEngine, cfg.ListenCfg().RPCGOBListen)
|
||||
go s.ServeHTTP(
|
||||
shtdwnEngine,
|
||||
cfg.ListenCfg().HTTPListen,
|
||||
cfg.HTTPCfg().JsonRPCURL,
|
||||
cfg.HTTPCfg().WSURL,
|
||||
cfg.HTTPCfg().UseBasicAuth,
|
||||
cfg.HTTPCfg().AuthUsers,
|
||||
)
|
||||
if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 ||
|
||||
len(cfg.ListenCfg().RPCJSONTLSListen) != 0 ||
|
||||
len(cfg.ListenCfg().HTTPTLSListen) != 0) &&
|
||||
(len(cfg.TLSCfg().ServerCerificate) == 0 ||
|
||||
len(cfg.TLSCfg().ServerKey) == 0) {
|
||||
utils.Logger.Warning("WARNING: missing TLS certificate/key file!")
|
||||
return
|
||||
}
|
||||
if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString {
|
||||
go s.ServeGOBTLS(
|
||||
ctx, shtdwnEngine,
|
||||
cfg.ListenCfg().RPCGOBTLSListen,
|
||||
cfg.TLSCfg().ServerCerificate,
|
||||
cfg.TLSCfg().ServerKey,
|
||||
cfg.TLSCfg().CaCertificate,
|
||||
cfg.TLSCfg().ServerPolicy,
|
||||
cfg.TLSCfg().ServerName,
|
||||
)
|
||||
}
|
||||
if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString {
|
||||
go s.ServeJSONTLS(
|
||||
ctx, shtdwnEngine,
|
||||
cfg.ListenCfg().RPCJSONTLSListen,
|
||||
cfg.TLSCfg().ServerCerificate,
|
||||
cfg.TLSCfg().ServerKey,
|
||||
cfg.TLSCfg().CaCertificate,
|
||||
cfg.TLSCfg().ServerPolicy,
|
||||
cfg.TLSCfg().ServerName,
|
||||
)
|
||||
}
|
||||
if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString {
|
||||
go s.ServeHTTPS(
|
||||
shtdwnEngine,
|
||||
cfg.ListenCfg().HTTPTLSListen,
|
||||
cfg.TLSCfg().ServerCerificate,
|
||||
cfg.TLSCfg().ServerKey,
|
||||
cfg.TLSCfg().CaCertificate,
|
||||
cfg.TLSCfg().ServerPolicy,
|
||||
cfg.TLSCfg().ServerName,
|
||||
cfg.HTTPCfg().JsonRPCURL,
|
||||
cfg.HTTPCfg().WSURL,
|
||||
cfg.HTTPCfg().UseBasicAuth,
|
||||
cfg.HTTPCfg().AuthUsers,
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -217,7 +217,7 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} {
|
||||
}
|
||||
|
||||
// Precache loads data from DataDB into cache at engine start
|
||||
func (chS *CacheS) Precache() (err error) {
|
||||
func (chS *CacheS) Precache(ctx *context.Context) (err error) {
|
||||
var wg sync.WaitGroup // wait for precache to finish
|
||||
errChan := make(chan error)
|
||||
doneChan := make(chan struct{})
|
||||
@@ -228,7 +228,7 @@ func (chS *CacheS) Precache() (err error) {
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(cacheID string) {
|
||||
errCache := chS.dm.CacheDataFromDB(context.TODO(),
|
||||
errCache := chS.dm.CacheDataFromDB(ctx,
|
||||
utils.CacheInstanceToPrefix[cacheID],
|
||||
[]string{utils.MetaAny},
|
||||
false)
|
||||
@@ -247,6 +247,8 @@ func (chS *CacheS) Precache() (err error) {
|
||||
select {
|
||||
case err = <-errChan:
|
||||
case <-doneChan:
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
74
services/cgr-engine.go
Normal file
74
services/cgr-engine.go
Normal file
@@ -0,0 +1,74 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type CGREngine struct {
|
||||
cfg *config.CGRConfig
|
||||
|
||||
srvManager *servmanager.ServiceManager
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
cmConns map[string]chan birpc.ClientConnector
|
||||
}
|
||||
|
||||
func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefix string,
|
||||
iConnCh chan birpc.ClientConnector) {
|
||||
cgr.srvManager.AddServices(service)
|
||||
cgr.cmConns[utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)] = iConnCh
|
||||
cgr.srvDep[service.ServiceName()] = new(sync.WaitGroup)
|
||||
engine.IntRPC.AddInternalRPCClient(apiPrefix, iConnCh)
|
||||
}
|
||||
|
||||
func (cgr *CGREngine) InitConfigFromPath(path string, nodeID string) (err error) {
|
||||
// Init config
|
||||
if cgr.cfg, err = config.NewCGRConfigFromPath(path); err != nil {
|
||||
err = fmt.Errorf("could not parse config: <%s>", err)
|
||||
return
|
||||
}
|
||||
if cgr.cfg.ConfigDBCfg().Type != utils.MetaInternal {
|
||||
var d config.ConfigDB
|
||||
if d, err = engine.NewDataDBConn(cgr.cfg.ConfigDBCfg().Type,
|
||||
cgr.cfg.ConfigDBCfg().Host, cgr.cfg.ConfigDBCfg().Port,
|
||||
cgr.cfg.ConfigDBCfg().Name, cgr.cfg.ConfigDBCfg().User,
|
||||
cgr.cfg.ConfigDBCfg().Password, cgr.cfg.GeneralCfg().DBDataEncoding,
|
||||
cgr.cfg.ConfigDBCfg().Opts); err != nil { // Cannot configure getter database, show stopper
|
||||
err = fmt.Errorf("could not configure configDB: <%s>", err)
|
||||
return
|
||||
}
|
||||
if err = cgr.cfg.LoadFromDB(d); err != nil {
|
||||
err = fmt.Errorf("could not parse config from DB: <%s>", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if nodeID != utils.EmptyString {
|
||||
cgr.cfg.GeneralCfg().NodeID = nodeID
|
||||
}
|
||||
config.SetCgrConfig(cgr.cfg) // Share the config object
|
||||
return
|
||||
}
|
||||
343
services/libcgr-engine.go
Normal file
343
services/libcgr-engine.go
Normal file
@@ -0,0 +1,343 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/loaders"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewCGREngineFlags() *CGREngineFlags {
|
||||
fs := flag.NewFlagSet(utils.CgrEngine, flag.ContinueOnError)
|
||||
return &CGREngineFlags{
|
||||
FlagSet: fs,
|
||||
Cfgpath: fs.String(utils.CfgPathCgr, utils.ConfigPath, "Configuration directory path."),
|
||||
Version: fs.Bool(utils.VersionCgr, false, "Prints the application version."),
|
||||
Checkconfig: fs.Bool(utils.CheckCfgCgr, false, "Verify the config without starting the engine"),
|
||||
Pidfile: fs.String(utils.PidCgr, utils.EmptyString, "Write pid file"),
|
||||
Httppprofpath: fs.String(utils.HttpPrfPthCgr, utils.EmptyString, "http address used for program profiling"),
|
||||
Cpuprofdir: fs.String(utils.CpuProfDirCgr, utils.EmptyString, "write cpu profile to files"),
|
||||
Memprofdir: fs.String(utils.MemProfDirCgr, utils.EmptyString, "write memory profile to file"),
|
||||
Memprofinterval: fs.Duration(utils.MemProfIntervalCgr, 5*time.Second, "Time between memory profile saves"),
|
||||
Memprofnrfiles: fs.Int(utils.MemProfNrFilesCgr, 1, "Number of memory profile to write"),
|
||||
Scheduledshutdown: fs.String(utils.ScheduledShutdownCgr, utils.EmptyString, "shutdown the engine after this duration"),
|
||||
Singlecpu: fs.Bool(utils.SingleCpuCgr, false, "Run on single CPU core"),
|
||||
Syslogger: fs.String(utils.LoggerCfg, utils.EmptyString, "logger <*syslog|*stdout>"),
|
||||
Nodeid: fs.String(utils.NodeIDCfg, utils.EmptyString, "The node ID of the engine"),
|
||||
Loglevel: fs.Int(utils.LogLevelCfg, -1, "Log level (0-emergency to 7-debug)"),
|
||||
Preload: fs.String(utils.PreloadCgr, utils.EmptyString, "LoaderIDs used to load the data before the engine starts"),
|
||||
}
|
||||
}
|
||||
|
||||
type CGREngineFlags struct {
|
||||
*flag.FlagSet
|
||||
|
||||
Cfgpath *string
|
||||
Version *bool
|
||||
Checkconfig *bool
|
||||
Pidfile *string
|
||||
Httppprofpath *string
|
||||
Cpuprofdir *string
|
||||
Memprofdir *string
|
||||
Memprofinterval *time.Duration
|
||||
Memprofnrfiles *int
|
||||
Scheduledshutdown *string
|
||||
Singlecpu *bool
|
||||
Syslogger *string
|
||||
Nodeid *string
|
||||
Loglevel *int
|
||||
Preload *string
|
||||
}
|
||||
|
||||
func cgrSingnalHandler(ctx *context.Context, shutdown context.CancelFunc,
|
||||
cfg *config.CGRConfig, shdWg *sync.WaitGroup) {
|
||||
shutdownSignal := make(chan os.Signal, 1)
|
||||
reloadSignal := make(chan os.Signal, 1)
|
||||
signal.Notify(shutdownSignal, os.Interrupt,
|
||||
syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
|
||||
signal.Notify(reloadSignal, syscall.SIGHUP)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
shdWg.Done()
|
||||
return
|
||||
case <-shutdownSignal:
|
||||
shutdown()
|
||||
shdWg.Done()
|
||||
return
|
||||
case <-reloadSignal:
|
||||
// do it in it's own goroutine in order to not block the signal handler with the reload functionality
|
||||
go func() {
|
||||
var reply string
|
||||
if err := cfg.V1ReloadConfig(ctx,
|
||||
new(config.ReloadArgs), &reply); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("Error reloading configuration: <%s>", err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func cgrWritePid(pidFile string) (err error) {
|
||||
var f *os.File
|
||||
if f, err = os.Create(pidFile); err != nil {
|
||||
err = fmt.Errorf("could not create pid file: %s", err)
|
||||
return
|
||||
}
|
||||
if _, err = f.WriteString(strconv.Itoa(os.Getpid())); err != nil {
|
||||
f.Close()
|
||||
err = fmt.Errorf("could not write pid file: %s", err)
|
||||
return
|
||||
}
|
||||
if err = f.Close(); err != nil {
|
||||
err = fmt.Errorf("could not close pid file: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string,
|
||||
loader *LoaderService, iLoaderSCh chan birpc.ClientConnector) (err error) {
|
||||
if !cfg.LoaderCfg().Enabled() {
|
||||
err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case ldrs := <-iLoaderSCh:
|
||||
iLoaderSCh <- ldrs
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
var reply string
|
||||
for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) {
|
||||
if err = loader.GetLoaderS().V1Load(ctx, &loaders.ArgsProcessFolder{
|
||||
ForceLock: true, // force lock will unlock the file in case is locked and return error
|
||||
LoaderID: loaderID,
|
||||
StopOnError: true,
|
||||
}, &reply); err != nil {
|
||||
err = fmt.Errorf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// cgrStartFilterService fires up the FilterS
|
||||
func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS,
|
||||
cacheS *engine.CacheS, connMgr *engine.ConnManager,
|
||||
cfg *config.CGRConfig, dm *engine.DataManager) {
|
||||
select {
|
||||
case <-cacheS.GetPrecacheChannel(utils.CacheFilters):
|
||||
iFilterSCh <- engine.NewFilterS(cfg, connMgr, dm)
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
// cgrInitCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns
|
||||
func cgrInitCacheS(ctx *context.Context, shutdown context.CancelFunc,
|
||||
iCacheSCh chan birpc.ClientConnector, server *cores.Server,
|
||||
cfg *config.CGRConfig, dm *engine.DataManager, anz *AnalyzerService,
|
||||
cpS *engine.CapsStats) (chS *engine.CacheS) {
|
||||
chS = engine.NewCacheS(cfg, dm, cpS)
|
||||
go func() {
|
||||
if err := chS.Precache(ctx); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error()))
|
||||
shutdown()
|
||||
}
|
||||
}()
|
||||
|
||||
chSv1, _ := birpc.NewService(apis.NewCacheSv1(chS), "", false)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(chSv1)
|
||||
}
|
||||
var rpc birpc.ClientConnector = chSv1
|
||||
if anz.IsRunning() {
|
||||
rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CacheS)
|
||||
}
|
||||
iCacheSCh <- rpc
|
||||
return
|
||||
}
|
||||
|
||||
func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector,
|
||||
server *cores.Server, anz *AnalyzerService) {
|
||||
// grdSv1 := v1.NewGuardianSv1()
|
||||
// if !cfg.DispatcherSCfg().Enabled {
|
||||
// server.RpcRegister(grdSv1)
|
||||
// }
|
||||
// var rpc birpc.ClientConnector = grdSv1
|
||||
// if anz.IsRunning() {
|
||||
// rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.GuardianS)
|
||||
// }
|
||||
// iGuardianSCh <- rpc
|
||||
}
|
||||
|
||||
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
|
||||
srvMngr *servmanager.ServiceManager, server *cores.Server,
|
||||
anz *AnalyzerService) {
|
||||
// if !cfg.DispatcherSCfg().Enabled {
|
||||
// server.RpcRegister(v1.NewServiceManagerV1(srvMngr))
|
||||
// }
|
||||
// var rpc birpc.ClientConnector = srvMngr
|
||||
// if anz.IsRunning() {
|
||||
// rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ServiceManager)
|
||||
// }
|
||||
// iServMngrCh <- rpc
|
||||
}
|
||||
|
||||
func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
|
||||
cfg *config.CGRConfig, server *cores.Server, anz *AnalyzerService) {
|
||||
cfgSv1, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(cfgSv1)
|
||||
}
|
||||
var rpc birpc.ClientConnector = cfgSv1
|
||||
if anz.IsRunning() {
|
||||
rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ConfigSv1)
|
||||
}
|
||||
iConfigCh <- rpc
|
||||
}
|
||||
|
||||
func startRPC(server *cores.Server, internalAdminSChan,
|
||||
internalCdrSChan, internalRsChan, internalStatSChan,
|
||||
internalAttrSChan, internalChargerSChan, internalThdSChan, internalRouteSChan,
|
||||
internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan,
|
||||
internalLoaderSChan, internalCacheSChan,
|
||||
internalEEsChan, internalRateSChan, internalActionSChan,
|
||||
internalAccountSChan chan birpc.ClientConnector,
|
||||
shdChan *utils.SyncedChan) {
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
select { // Any of the rpc methods will unlock listening to rpc requests
|
||||
// case cdrs := <-internalCdrSChan:
|
||||
// internalCdrSChan <- cdrs
|
||||
case smg := <-internalSessionSChan:
|
||||
internalSessionSChan <- smg
|
||||
case rls := <-internalRsChan:
|
||||
internalRsChan <- rls
|
||||
case statS := <-internalStatSChan:
|
||||
internalStatSChan <- statS
|
||||
case admS := <-internalAdminSChan:
|
||||
internalAdminSChan <- admS
|
||||
case attrS := <-internalAttrSChan:
|
||||
internalAttrSChan <- attrS
|
||||
case chrgS := <-internalChargerSChan:
|
||||
internalChargerSChan <- chrgS
|
||||
case thS := <-internalThdSChan:
|
||||
internalThdSChan <- thS
|
||||
case rtS := <-internalRouteSChan:
|
||||
internalRouteSChan <- rtS
|
||||
// case analyzerS := <-internalAnalyzerSChan:
|
||||
// internalAnalyzerSChan <- analyzerS
|
||||
case loaderS := <-internalLoaderSChan:
|
||||
internalLoaderSChan <- loaderS
|
||||
case chS := <-internalCacheSChan: // added in order to start the RPC before precaching is done
|
||||
internalCacheSChan <- chS
|
||||
// case eeS := <-internalEEsChan:
|
||||
// internalEEsChan <- eeS
|
||||
case rateS := <-internalRateSChan:
|
||||
internalRateSChan <- rateS
|
||||
case actionS := <-internalActionSChan:
|
||||
internalActionSChan <- actionS
|
||||
case accountS := <-internalAccountSChan:
|
||||
internalAccountSChan <- accountS
|
||||
case <-shdChan.Done():
|
||||
return
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case dispatcherS := <-internalDispatcherSChan:
|
||||
internalDispatcherSChan <- dispatcherS
|
||||
case <-shdChan.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, shdChan)
|
||||
go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, shdChan)
|
||||
go server.ServeHTTP(
|
||||
cfg.ListenCfg().HTTPListen,
|
||||
cfg.HTTPCfg().JsonRPCURL,
|
||||
cfg.HTTPCfg().WSURL,
|
||||
cfg.HTTPCfg().UseBasicAuth,
|
||||
cfg.HTTPCfg().AuthUsers,
|
||||
shdChan,
|
||||
)
|
||||
if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 ||
|
||||
len(cfg.ListenCfg().RPCJSONTLSListen) != 0 ||
|
||||
len(cfg.ListenCfg().HTTPTLSListen) != 0) &&
|
||||
(len(cfg.TLSCfg().ServerCerificate) == 0 ||
|
||||
len(cfg.TLSCfg().ServerKey) == 0) {
|
||||
utils.Logger.Warning("WARNING: missing TLS certificate/key file!")
|
||||
return
|
||||
}
|
||||
if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString {
|
||||
go server.ServeGOBTLS(
|
||||
cfg.ListenCfg().RPCGOBTLSListen,
|
||||
cfg.TLSCfg().ServerCerificate,
|
||||
cfg.TLSCfg().ServerKey,
|
||||
cfg.TLSCfg().CaCertificate,
|
||||
cfg.TLSCfg().ServerPolicy,
|
||||
cfg.TLSCfg().ServerName,
|
||||
shdChan,
|
||||
)
|
||||
}
|
||||
if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString {
|
||||
go server.ServeJSONTLS(
|
||||
cfg.ListenCfg().RPCJSONTLSListen,
|
||||
cfg.TLSCfg().ServerCerificate,
|
||||
cfg.TLSCfg().ServerKey,
|
||||
cfg.TLSCfg().CaCertificate,
|
||||
cfg.TLSCfg().ServerPolicy,
|
||||
cfg.TLSCfg().ServerName,
|
||||
shdChan,
|
||||
)
|
||||
}
|
||||
if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString {
|
||||
go server.ServeHTTPTLS(
|
||||
cfg.ListenCfg().HTTPTLSListen,
|
||||
cfg.TLSCfg().ServerCerificate,
|
||||
cfg.TLSCfg().ServerKey,
|
||||
cfg.TLSCfg().CaCertificate,
|
||||
cfg.TLSCfg().ServerPolicy,
|
||||
cfg.TLSCfg().ServerName,
|
||||
cfg.HTTPCfg().JsonRPCURL,
|
||||
cfg.HTTPCfg().WSURL,
|
||||
cfg.HTTPCfg().UseBasicAuth,
|
||||
cfg.HTTPCfg().AuthUsers,
|
||||
shdChan,
|
||||
)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user