mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated caps for birpc server
This commit is contained in:
committed by
Dan Christian Bogos
parent
0c53cc0993
commit
7a2d234108
@@ -80,3 +80,100 @@ func (c *AnalyzerServerCodec) WriteResponse(r *birpc.Response, x interface{}) er
|
||||
}
|
||||
|
||||
func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() }
|
||||
|
||||
func NewAnalyzerBiRPCCodec(sc birpc.BirpcCodec, aS *AnalyzerService, enc, from, to string) birpc.BirpcCodec {
|
||||
return &AnalyzerBiRPCCodec{
|
||||
sc: sc,
|
||||
reqs: make(map[uint64]*rpcAPI),
|
||||
reps: make(map[uint64]*rpcAPI),
|
||||
aS: aS,
|
||||
enc: enc,
|
||||
from: from,
|
||||
to: to,
|
||||
}
|
||||
}
|
||||
|
||||
type AnalyzerBiRPCCodec struct {
|
||||
sc birpc.BirpcCodec
|
||||
|
||||
// keep the API in memory because the write is async
|
||||
reqs map[uint64]*rpcAPI
|
||||
reqIdx uint64
|
||||
reqsLk sync.RWMutex
|
||||
reps map[uint64]*rpcAPI
|
||||
repIdx uint64
|
||||
repsLk sync.RWMutex
|
||||
|
||||
aS *AnalyzerService
|
||||
enc string
|
||||
from string
|
||||
to string
|
||||
}
|
||||
|
||||
// ReadHeader must read a message and populate either the request
|
||||
// or the response by inspecting the incoming message.
|
||||
func (c *AnalyzerBiRPCCodec) ReadHeader(req *birpc.Request, resp *birpc.Response) (err error) {
|
||||
err = c.sc.ReadHeader(req, resp)
|
||||
if req.ServiceMethod != "" {
|
||||
c.reqsLk.Lock()
|
||||
c.reqIdx = req.Seq
|
||||
c.reqs[c.reqIdx] = &rpcAPI{
|
||||
ID: req.Seq,
|
||||
Method: req.ServiceMethod,
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
c.reqsLk.Unlock()
|
||||
} else {
|
||||
c.repsLk.Lock()
|
||||
c.repIdx = resp.Seq
|
||||
c.reps[c.repIdx].Error = resp.Error
|
||||
c.repsLk.Unlock()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ReadRequestBody into args argument of handler function.
|
||||
func (c *AnalyzerBiRPCCodec) ReadRequestBody(x interface{}) (err error) {
|
||||
err = c.sc.ReadRequestBody(x)
|
||||
c.reqsLk.Lock()
|
||||
c.reqs[c.reqIdx].Params = x
|
||||
c.reqsLk.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// ReadResponseBody into reply argument of handler function.
|
||||
func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) {
|
||||
err = c.sc.ReadResponseBody(x)
|
||||
c.repsLk.Lock()
|
||||
api := c.reqs[c.repIdx]
|
||||
delete(c.reqs, c.repIdx)
|
||||
c.repsLk.Unlock()
|
||||
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, api.Error, c.enc, c.to, c.from, api.StartTime, time.Now())
|
||||
return
|
||||
}
|
||||
|
||||
// WriteRequest must be safe for concurrent use by multiple goroutines.
|
||||
func (c *AnalyzerBiRPCCodec) WriteRequest(req *birpc.Request, x interface{}) error {
|
||||
c.repsLk.Lock()
|
||||
c.reqIdx = req.Seq
|
||||
c.reqs[c.reqIdx] = &rpcAPI{
|
||||
ID: req.Seq,
|
||||
Method: req.ServiceMethod,
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
c.repsLk.Unlock()
|
||||
return c.sc.WriteRequest(req, x)
|
||||
}
|
||||
|
||||
// WriteResponse must be safe for concurrent use by multiple goroutines.
|
||||
func (c *AnalyzerBiRPCCodec) WriteResponse(r *birpc.Response, x interface{}) error {
|
||||
c.reqsLk.Lock()
|
||||
api := c.reqs[r.Seq]
|
||||
delete(c.reqs, r.Seq)
|
||||
c.reqsLk.Unlock()
|
||||
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.enc, c.from, c.to, api.StartTime, time.Now())
|
||||
return c.sc.WriteResponse(r, x)
|
||||
}
|
||||
|
||||
// Close is called when client/server finished with the connection.
|
||||
func (c *AnalyzerBiRPCCodec) Close() error { return c.sc.Close() }
|
||||
|
||||
@@ -82,6 +82,7 @@ type rpcAPI struct {
|
||||
ID uint64 `json:"id"`
|
||||
Method string `json:"method"`
|
||||
Params interface{} `json:"params"`
|
||||
Error string `json:"err,omitempty"`
|
||||
|
||||
StartTime time.Time
|
||||
}
|
||||
|
||||
@@ -622,7 +622,7 @@ func main() {
|
||||
cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan,
|
||||
connManager, anz, srvDep)
|
||||
|
||||
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, caps, anz, srvDep)
|
||||
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, anz, srvDep)
|
||||
|
||||
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server,
|
||||
internalLoaderSChan, connManager, anz, srvDep)
|
||||
|
||||
@@ -106,3 +106,88 @@ func (c *capsServerCodec) WriteResponse(r *birpc.Response, x interface{}) error
|
||||
return c.sc.WriteResponse(r, x)
|
||||
}
|
||||
func (c *capsServerCodec) Close() error { return c.sc.Close() }
|
||||
|
||||
func newCapsBiRPCGOBCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r birpc.BirpcCodec) {
|
||||
r = newCapsBiRPCCodec(birpc.NewGobBirpcCodec(conn), caps)
|
||||
if anz != nil {
|
||||
from := conn.RemoteAddr()
|
||||
var fromstr string
|
||||
if from != nil {
|
||||
fromstr = from.String()
|
||||
}
|
||||
to := conn.LocalAddr()
|
||||
var tostr string
|
||||
if to != nil {
|
||||
tostr = to.String()
|
||||
}
|
||||
return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaGOB, fromstr, tostr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func newCapsBiRPCJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r birpc.BirpcCodec) {
|
||||
r = newCapsBiRPCCodec(jsonrpc.NewJSONBirpcCodec(conn), caps)
|
||||
if anz != nil {
|
||||
from := conn.RemoteAddr()
|
||||
var fromstr string
|
||||
if from != nil {
|
||||
fromstr = from.String()
|
||||
}
|
||||
to := conn.LocalAddr()
|
||||
var tostr string
|
||||
if to != nil {
|
||||
tostr = to.String()
|
||||
}
|
||||
return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaJSON, fromstr, tostr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func newCapsBiRPCCodec(sc birpc.BirpcCodec, caps *engine.Caps) birpc.BirpcCodec {
|
||||
return &capsBiRPCCodec{
|
||||
sc: sc,
|
||||
caps: caps,
|
||||
}
|
||||
}
|
||||
|
||||
type capsBiRPCCodec struct {
|
||||
sc birpc.BirpcCodec
|
||||
caps *engine.Caps
|
||||
}
|
||||
|
||||
// ReadHeader must read a message and populate either the request
|
||||
// or the response by inspecting the incoming message.
|
||||
func (c *capsBiRPCCodec) ReadHeader(req *birpc.Request, resp *birpc.Response) (err error) {
|
||||
return c.sc.ReadHeader(req, resp)
|
||||
}
|
||||
|
||||
// ReadRequestBody into args argument of handler function.
|
||||
func (c *capsBiRPCCodec) ReadRequestBody(x interface{}) (err error) {
|
||||
if err = c.caps.Allocate(); err != nil {
|
||||
return
|
||||
}
|
||||
return c.sc.ReadRequestBody(x)
|
||||
}
|
||||
|
||||
// ReadResponseBody into reply argument of handler function.
|
||||
func (c *capsBiRPCCodec) ReadResponseBody(x interface{}) error {
|
||||
return c.sc.ReadResponseBody(x)
|
||||
}
|
||||
|
||||
// WriteRequest must be safe for concurrent use by multiple goroutines.
|
||||
func (c *capsBiRPCCodec) WriteRequest(req *birpc.Request, x interface{}) error {
|
||||
return c.sc.WriteRequest(req, x)
|
||||
}
|
||||
|
||||
// WriteResponse must be safe for concurrent use by multiple goroutines.
|
||||
func (c *capsBiRPCCodec) WriteResponse(r *birpc.Response, x interface{}) error {
|
||||
if r.Error == utils.ErrMaxConcurentRPCExceededNoCaps.Error() {
|
||||
r.Error = utils.ErrMaxConcurentRPCExceeded.Error()
|
||||
} else {
|
||||
defer c.caps.Deallocate()
|
||||
}
|
||||
return c.sc.WriteResponse(r, x)
|
||||
}
|
||||
|
||||
// Close is called when client/server finished with the connection.
|
||||
func (c *capsBiRPCCodec) Close() error { return c.sc.Close() }
|
||||
|
||||
@@ -35,7 +35,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/jsonrpc"
|
||||
"github.com/cgrates/cgrates/analyzers"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -263,14 +262,14 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientCo
|
||||
s.birpcSrv.OnDisconnect(onDis)
|
||||
if addrJSON != utils.EmptyString {
|
||||
var ljson net.Listener
|
||||
if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, jsonrpc.NewJSONBirpcCodec); err != nil {
|
||||
if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, newCapsBiRPCJSONCodec); err != nil {
|
||||
return
|
||||
}
|
||||
defer ljson.Close()
|
||||
}
|
||||
if addrGOB != utils.EmptyString {
|
||||
var lgob net.Listener
|
||||
if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, birpc.NewGobBirpcCodec); err != nil {
|
||||
if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, newCapsBiRPCGOBCodec); err != nil {
|
||||
return
|
||||
}
|
||||
defer lgob.Close()
|
||||
@@ -279,7 +278,7 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientCo
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(io.ReadWriteCloser) birpc.BirpcCodec) (lBiRPC net.Listener, err error) {
|
||||
func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) (lBiRPC net.Listener, err error) {
|
||||
if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil {
|
||||
log.Printf("ServeBi%s listen error: %s \n", codecName, err)
|
||||
return
|
||||
@@ -289,7 +288,7 @@ func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, new
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(io.ReadWriteCloser) birpc.BirpcCodec) {
|
||||
func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
@@ -300,7 +299,7 @@ func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName s
|
||||
utils.Logger.Crit(fmt.Sprintf("Stoped Bi%s server beacause %s", codecName, err))
|
||||
return // stop if we get Accept error
|
||||
}
|
||||
go srv.ServeCodec(newCodec(conn))
|
||||
go srv.ServeCodec(newCodec(conn, s.caps, s.anz))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ func TestServerIT(t *testing.T) {
|
||||
utils.Logger.SetLogLevel(7)
|
||||
for _, test := range sTestsServer {
|
||||
log.SetOutput(io.Discard)
|
||||
t.Run("Running IT serve tests", test)
|
||||
t.Run("TestServerIT", test)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -756,7 +756,7 @@ func testAcceptBiRPC(t *testing.T) {
|
||||
l := &mockListener{
|
||||
p1: p1,
|
||||
}
|
||||
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, jsonrpc.NewJSONBirpcCodec)
|
||||
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec)
|
||||
rpc := jsonrpc.NewClient(p2)
|
||||
var reply string
|
||||
expected := "birpc: can't find method AttributeSv1.Ping"
|
||||
@@ -777,14 +777,14 @@ func (mK *mockListenError) Accept() (net.Conn, error) {
|
||||
}
|
||||
|
||||
func testAcceptBiRPCError(t *testing.T) {
|
||||
caps := engine.NewCaps(0, utils.MetaBusy)
|
||||
caps := engine.NewCaps(10, utils.MetaBusy)
|
||||
server := NewServer(caps)
|
||||
server.RpcRegister(new(mockRegister))
|
||||
server.birpcSrv = birpc.NewBirpcServer()
|
||||
|
||||
//it will contain "use of closed network connection"
|
||||
l := new(mockListenError)
|
||||
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, jsonrpc.NewJSONBirpcCodec)
|
||||
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec)
|
||||
runtime.Gosched()
|
||||
}
|
||||
|
||||
|
||||
@@ -121,9 +121,8 @@ func testDspRPrfCostForEvent(t *testing.T) {
|
||||
Cost: utils.NewDecimal(12, 2),
|
||||
CostIntervals: []*utils.RateSIntervalCost{{
|
||||
Increments: []*utils.RateSIncrementCost{{
|
||||
Usage: utils.NewDecimal(int64(time.Minute), 0),
|
||||
IntervalRateIndex: 0,
|
||||
CompressFactor: 1,
|
||||
Usage: utils.NewDecimal(int64(time.Minute), 0),
|
||||
CompressFactor: 1,
|
||||
}},
|
||||
CompressFactor: 1,
|
||||
}},
|
||||
@@ -187,9 +186,8 @@ func testDspRPrfCostForEventWithoutFilters(t *testing.T) {
|
||||
Cost: utils.NewDecimal(25, 2),
|
||||
CostIntervals: []*utils.RateSIntervalCost{{
|
||||
Increments: []*utils.RateSIncrementCost{{
|
||||
Usage: utils.NewDecimal(int64(time.Minute), 0),
|
||||
IntervalRateIndex: 0,
|
||||
CompressFactor: 60,
|
||||
Usage: utils.NewDecimal(int64(time.Minute), 0),
|
||||
CompressFactor: 60,
|
||||
}},
|
||||
CompressFactor: 1,
|
||||
}},
|
||||
|
||||
@@ -159,6 +159,29 @@ func splitDynFltrValues(val, sep string) (vals []string) {
|
||||
return append(vals, valsEnd[1:]...)
|
||||
}
|
||||
|
||||
func splitInlineFilter(rule string) (splt []string) {
|
||||
var p, st int
|
||||
splt = make([]string, 0, 3)
|
||||
for i, b := range rule {
|
||||
switch byte(b) {
|
||||
case utils.InInFieldSep[0]:
|
||||
if p == 0 {
|
||||
splt = append(splt, rule[st:i])
|
||||
st = i + 1
|
||||
if len(splt) == 2 {
|
||||
splt = append(splt, rule[st:])
|
||||
return
|
||||
}
|
||||
}
|
||||
case utils.IdxStart[0]:
|
||||
p++
|
||||
case utils.IdxEnd[0]:
|
||||
p--
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// NewFilterFromInline parses an inline rule into a compiled Filter
|
||||
func NewFilterFromInline(tenant, inlnRule string) (f *Filter, err error) {
|
||||
ruleSplt := strings.SplitN(inlnRule, utils.InInFieldSep, 3)
|
||||
|
||||
@@ -55,7 +55,7 @@ func TestAsteriskAgentReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewAsteriskAgent(cfg, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
@@ -117,7 +117,7 @@ func TestAsteriskAgentReload2(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewAsteriskAgent(cfg, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
|
||||
@@ -50,7 +50,7 @@ func TestDiameterAgentReload1(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
|
||||
@@ -57,7 +57,7 @@ func TestDNSAgentReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
|
||||
@@ -65,7 +65,7 @@ func TestEventReaderSReload(t *testing.T) {
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep)
|
||||
erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(erS, sS,
|
||||
|
||||
@@ -57,7 +57,7 @@ func TestFreeSwitchAgentReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
|
||||
@@ -55,7 +55,7 @@ func TestHTTPAgentReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewHTTPAgent(cfg, filterSChan, server, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
|
||||
@@ -55,7 +55,7 @@ func TestKamailioAgentReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewKamailioAgent(cfg, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
|
||||
@@ -58,7 +58,7 @@ func TestRadiusAgentReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
@@ -122,7 +122,7 @@ func TestRadiusAgentReload2(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
|
||||
@@ -36,8 +36,7 @@ import (
|
||||
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
server *cores.Server, internalChan chan birpc.ClientConnector,
|
||||
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
|
||||
caps *engine.Caps, anz *AnalyzerService,
|
||||
srvDep map[string]*sync.WaitGroup) servmanager.Service {
|
||||
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
|
||||
return &SessionService{
|
||||
connChan: internalChan,
|
||||
cfg: cfg,
|
||||
@@ -45,7 +44,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
server: server,
|
||||
shdChan: shdChan,
|
||||
connMgr: connMgr,
|
||||
caps: caps,
|
||||
anz: anz,
|
||||
srvDep: srvDep,
|
||||
}
|
||||
@@ -68,7 +66,6 @@ type SessionService struct {
|
||||
// in order to stop the bircp server if necesary
|
||||
bircpEnabled bool
|
||||
connMgr *engine.ConnManager
|
||||
caps *engine.Caps
|
||||
anz *AnalyzerService
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
}
|
||||
|
||||
@@ -94,7 +94,7 @@ func TestSessionSReload1(t *testing.T) {
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): clientConect,
|
||||
})
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
srv := NewSessionService(cfg, new(DataDBService), server, make(chan birpc.ClientConnector, 1), shdChan, conMng, nil, anz, srvDep)
|
||||
srv := NewSessionService(cfg, new(DataDBService), server, make(chan birpc.ClientConnector, 1), shdChan, conMng, anz, srvDep)
|
||||
err := srv.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -154,7 +154,7 @@ func TestSessionSReload2(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
cfg.StorDbCfg().Type = utils.Internal
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
|
||||
srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
|
||||
srv.(*SessionService).sm = &sessions.SessionS{}
|
||||
@@ -203,13 +203,8 @@ func TestSessionSReload3(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
cfg.StorDbCfg().Type = utils.Internal
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
|
||||
srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
|
||||
srv.(*SessionService).sm = &sessions.SessionS{}
|
||||
if !srv.IsRunning() {
|
||||
t.Fatalf("\nExpecting service to be running")
|
||||
}
|
||||
err2 := srv.(*SessionService).start()
|
||||
if err2 != nil {
|
||||
t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err2)
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestSessionSCoverage(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
cfg.StorDbCfg().Type = utils.Internal
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
|
||||
srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
if srv.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
@@ -65,7 +65,6 @@ func TestSessionSCoverage(t *testing.T) {
|
||||
shdChan: shdChan,
|
||||
connChan: make(chan birpc.ClientConnector, 1),
|
||||
connMgr: nil,
|
||||
caps: nil,
|
||||
anz: anz,
|
||||
srvDep: srvDep,
|
||||
sm: &sessions.SessionS{},
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestSIPAgentReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, nil, anz, srvDep)
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
|
||||
Reference in New Issue
Block a user