From 7a2d2341083b8d477f5afa88b861b6ea13f4b41c Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 13 Jul 2021 17:15:49 +0300 Subject: [PATCH] Updated caps for birpc server --- analyzers/codec.go | 97 +++++++++++++++++++++++++++++ analyzers/libanalyzers.go | 1 + cmd/cgr-engine/cgr-engine.go | 2 +- cores/caps.go | 85 +++++++++++++++++++++++++ cores/server.go | 11 ++-- cores/server_it_test.go | 8 +-- dispatchers/rates_it_test.go | 10 ++- engine/filters.go | 23 +++++++ services/asteriskagent_it_test.go | 4 +- services/diameteragent_it_test.go | 2 +- services/dnsagent_it_test.go | 2 +- services/ers_it_test.go | 2 +- services/freeswitchagent_it_test.go | 2 +- services/httpagent_it_test.go | 2 +- services/kamailioagent_it_test.go | 2 +- services/radiusagent_it_test.go | 4 +- services/sessions.go | 5 +- services/sessions_it_test.go | 11 +--- services/sessions_test.go | 3 +- services/sipagent_it_test.go | 2 +- 20 files changed, 236 insertions(+), 42 deletions(-) diff --git a/analyzers/codec.go b/analyzers/codec.go index 832315e6b..4e3cdfaea 100644 --- a/analyzers/codec.go +++ b/analyzers/codec.go @@ -80,3 +80,100 @@ func (c *AnalyzerServerCodec) WriteResponse(r *birpc.Response, x interface{}) er } func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() } + +func NewAnalyzerBiRPCCodec(sc birpc.BirpcCodec, aS *AnalyzerService, enc, from, to string) birpc.BirpcCodec { + return &AnalyzerBiRPCCodec{ + sc: sc, + reqs: make(map[uint64]*rpcAPI), + reps: make(map[uint64]*rpcAPI), + aS: aS, + enc: enc, + from: from, + to: to, + } +} + +type AnalyzerBiRPCCodec struct { + sc birpc.BirpcCodec + + // keep the API in memory because the write is async + reqs map[uint64]*rpcAPI + reqIdx uint64 + reqsLk sync.RWMutex + reps map[uint64]*rpcAPI + repIdx uint64 + repsLk sync.RWMutex + + aS *AnalyzerService + enc string + from string + to string +} + +// ReadHeader must read a message and populate either the request +// or the response by inspecting the incoming message. +func (c *AnalyzerBiRPCCodec) ReadHeader(req *birpc.Request, resp *birpc.Response) (err error) { + err = c.sc.ReadHeader(req, resp) + if req.ServiceMethod != "" { + c.reqsLk.Lock() + c.reqIdx = req.Seq + c.reqs[c.reqIdx] = &rpcAPI{ + ID: req.Seq, + Method: req.ServiceMethod, + StartTime: time.Now(), + } + c.reqsLk.Unlock() + } else { + c.repsLk.Lock() + c.repIdx = resp.Seq + c.reps[c.repIdx].Error = resp.Error + c.repsLk.Unlock() + } + return +} + +// ReadRequestBody into args argument of handler function. +func (c *AnalyzerBiRPCCodec) ReadRequestBody(x interface{}) (err error) { + err = c.sc.ReadRequestBody(x) + c.reqsLk.Lock() + c.reqs[c.reqIdx].Params = x + c.reqsLk.Unlock() + return +} + +// ReadResponseBody into reply argument of handler function. +func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) { + err = c.sc.ReadResponseBody(x) + c.repsLk.Lock() + api := c.reqs[c.repIdx] + delete(c.reqs, c.repIdx) + c.repsLk.Unlock() + go c.aS.logTrafic(api.ID, api.Method, api.Params, x, api.Error, c.enc, c.to, c.from, api.StartTime, time.Now()) + return +} + +// WriteRequest must be safe for concurrent use by multiple goroutines. +func (c *AnalyzerBiRPCCodec) WriteRequest(req *birpc.Request, x interface{}) error { + c.repsLk.Lock() + c.reqIdx = req.Seq + c.reqs[c.reqIdx] = &rpcAPI{ + ID: req.Seq, + Method: req.ServiceMethod, + StartTime: time.Now(), + } + c.repsLk.Unlock() + return c.sc.WriteRequest(req, x) +} + +// WriteResponse must be safe for concurrent use by multiple goroutines. +func (c *AnalyzerBiRPCCodec) WriteResponse(r *birpc.Response, x interface{}) error { + c.reqsLk.Lock() + api := c.reqs[r.Seq] + delete(c.reqs, r.Seq) + c.reqsLk.Unlock() + go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.enc, c.from, c.to, api.StartTime, time.Now()) + return c.sc.WriteResponse(r, x) +} + +// Close is called when client/server finished with the connection. +func (c *AnalyzerBiRPCCodec) Close() error { return c.sc.Close() } diff --git a/analyzers/libanalyzers.go b/analyzers/libanalyzers.go index 143476936..d99246ad9 100644 --- a/analyzers/libanalyzers.go +++ b/analyzers/libanalyzers.go @@ -82,6 +82,7 @@ type rpcAPI struct { ID uint64 `json:"id"` Method string `json:"method"` Params interface{} `json:"params"` + Error string `json:"err,omitempty"` StartTime time.Time } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 8e3d0c67f..a031336de 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -622,7 +622,7 @@ func main() { cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan, connManager, anz, srvDep) - smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, caps, anz, srvDep) + smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, anz, srvDep) ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalLoaderSChan, connManager, anz, srvDep) diff --git a/cores/caps.go b/cores/caps.go index 551fbdc43..78bbe90ef 100644 --- a/cores/caps.go +++ b/cores/caps.go @@ -106,3 +106,88 @@ func (c *capsServerCodec) WriteResponse(r *birpc.Response, x interface{}) error return c.sc.WriteResponse(r, x) } func (c *capsServerCodec) Close() error { return c.sc.Close() } + +func newCapsBiRPCGOBCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r birpc.BirpcCodec) { + r = newCapsBiRPCCodec(birpc.NewGobBirpcCodec(conn), caps) + if anz != nil { + from := conn.RemoteAddr() + var fromstr string + if from != nil { + fromstr = from.String() + } + to := conn.LocalAddr() + var tostr string + if to != nil { + tostr = to.String() + } + return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaGOB, fromstr, tostr) + } + return +} + +func newCapsBiRPCJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r birpc.BirpcCodec) { + r = newCapsBiRPCCodec(jsonrpc.NewJSONBirpcCodec(conn), caps) + if anz != nil { + from := conn.RemoteAddr() + var fromstr string + if from != nil { + fromstr = from.String() + } + to := conn.LocalAddr() + var tostr string + if to != nil { + tostr = to.String() + } + return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaJSON, fromstr, tostr) + } + return +} + +func newCapsBiRPCCodec(sc birpc.BirpcCodec, caps *engine.Caps) birpc.BirpcCodec { + return &capsBiRPCCodec{ + sc: sc, + caps: caps, + } +} + +type capsBiRPCCodec struct { + sc birpc.BirpcCodec + caps *engine.Caps +} + +// ReadHeader must read a message and populate either the request +// or the response by inspecting the incoming message. +func (c *capsBiRPCCodec) ReadHeader(req *birpc.Request, resp *birpc.Response) (err error) { + return c.sc.ReadHeader(req, resp) +} + +// ReadRequestBody into args argument of handler function. +func (c *capsBiRPCCodec) ReadRequestBody(x interface{}) (err error) { + if err = c.caps.Allocate(); err != nil { + return + } + return c.sc.ReadRequestBody(x) +} + +// ReadResponseBody into reply argument of handler function. +func (c *capsBiRPCCodec) ReadResponseBody(x interface{}) error { + return c.sc.ReadResponseBody(x) +} + +// WriteRequest must be safe for concurrent use by multiple goroutines. +func (c *capsBiRPCCodec) WriteRequest(req *birpc.Request, x interface{}) error { + return c.sc.WriteRequest(req, x) +} + +// WriteResponse must be safe for concurrent use by multiple goroutines. +func (c *capsBiRPCCodec) WriteResponse(r *birpc.Response, x interface{}) error { + if r.Error == utils.ErrMaxConcurentRPCExceededNoCaps.Error() { + r.Error = utils.ErrMaxConcurentRPCExceeded.Error() + } else { + defer c.caps.Deallocate() + } + return c.sc.WriteResponse(r, x) +} + +// Close is called when client/server finished with the connection. +func (c *capsBiRPCCodec) Close() error { return c.sc.Close() } diff --git a/cores/server.go b/cores/server.go index 29277c554..89a1a2ef4 100644 --- a/cores/server.go +++ b/cores/server.go @@ -35,7 +35,6 @@ import ( "time" "github.com/cgrates/birpc" - "github.com/cgrates/birpc/jsonrpc" "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -263,14 +262,14 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientCo s.birpcSrv.OnDisconnect(onDis) if addrJSON != utils.EmptyString { var ljson net.Listener - if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, jsonrpc.NewJSONBirpcCodec); err != nil { + if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, newCapsBiRPCJSONCodec); err != nil { return } defer ljson.Close() } if addrGOB != utils.EmptyString { var lgob net.Listener - if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, birpc.NewGobBirpcCodec); err != nil { + if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, newCapsBiRPCGOBCodec); err != nil { return } defer lgob.Close() @@ -279,7 +278,7 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(birpc.ClientCo return } -func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(io.ReadWriteCloser) birpc.BirpcCodec) (lBiRPC net.Listener, err error) { +func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) (lBiRPC net.Listener, err error) { if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil { log.Printf("ServeBi%s listen error: %s \n", codecName, err) return @@ -289,7 +288,7 @@ func (s *Server) listenBiRPC(srv *birpc.BirpcServer, addr, codecName string, new return } -func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(io.ReadWriteCloser) birpc.BirpcCodec) { +func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) birpc.BirpcCodec) { for { conn, err := l.Accept() if err != nil { @@ -300,7 +299,7 @@ func (s *Server) acceptBiRPC(srv *birpc.BirpcServer, l net.Listener, codecName s utils.Logger.Crit(fmt.Sprintf("Stoped Bi%s server beacause %s", codecName, err)) return // stop if we get Accept error } - go srv.ServeCodec(newCodec(conn)) + go srv.ServeCodec(newCodec(conn, s.caps, s.anz)) } } diff --git a/cores/server_it_test.go b/cores/server_it_test.go index 4c96181b1..4ba04882b 100644 --- a/cores/server_it_test.go +++ b/cores/server_it_test.go @@ -92,7 +92,7 @@ func TestServerIT(t *testing.T) { utils.Logger.SetLogLevel(7) for _, test := range sTestsServer { log.SetOutput(io.Discard) - t.Run("Running IT serve tests", test) + t.Run("TestServerIT", test) } } @@ -756,7 +756,7 @@ func testAcceptBiRPC(t *testing.T) { l := &mockListener{ p1: p1, } - go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, jsonrpc.NewJSONBirpcCodec) + go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec) rpc := jsonrpc.NewClient(p2) var reply string expected := "birpc: can't find method AttributeSv1.Ping" @@ -777,14 +777,14 @@ func (mK *mockListenError) Accept() (net.Conn, error) { } func testAcceptBiRPCError(t *testing.T) { - caps := engine.NewCaps(0, utils.MetaBusy) + caps := engine.NewCaps(10, utils.MetaBusy) server := NewServer(caps) server.RpcRegister(new(mockRegister)) server.birpcSrv = birpc.NewBirpcServer() //it will contain "use of closed network connection" l := new(mockListenError) - go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, jsonrpc.NewJSONBirpcCodec) + go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec) runtime.Gosched() } diff --git a/dispatchers/rates_it_test.go b/dispatchers/rates_it_test.go index ee91c8b2e..48d01ae5a 100644 --- a/dispatchers/rates_it_test.go +++ b/dispatchers/rates_it_test.go @@ -121,9 +121,8 @@ func testDspRPrfCostForEvent(t *testing.T) { Cost: utils.NewDecimal(12, 2), CostIntervals: []*utils.RateSIntervalCost{{ Increments: []*utils.RateSIncrementCost{{ - Usage: utils.NewDecimal(int64(time.Minute), 0), - IntervalRateIndex: 0, - CompressFactor: 1, + Usage: utils.NewDecimal(int64(time.Minute), 0), + CompressFactor: 1, }}, CompressFactor: 1, }}, @@ -187,9 +186,8 @@ func testDspRPrfCostForEventWithoutFilters(t *testing.T) { Cost: utils.NewDecimal(25, 2), CostIntervals: []*utils.RateSIntervalCost{{ Increments: []*utils.RateSIncrementCost{{ - Usage: utils.NewDecimal(int64(time.Minute), 0), - IntervalRateIndex: 0, - CompressFactor: 60, + Usage: utils.NewDecimal(int64(time.Minute), 0), + CompressFactor: 60, }}, CompressFactor: 1, }}, diff --git a/engine/filters.go b/engine/filters.go index 7ada25855..389ffc10e 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -159,6 +159,29 @@ func splitDynFltrValues(val, sep string) (vals []string) { return append(vals, valsEnd[1:]...) } +func splitInlineFilter(rule string) (splt []string) { + var p, st int + splt = make([]string, 0, 3) + for i, b := range rule { + switch byte(b) { + case utils.InInFieldSep[0]: + if p == 0 { + splt = append(splt, rule[st:i]) + st = i + 1 + if len(splt) == 2 { + splt = append(splt, rule[st:]) + return + } + } + case utils.IdxStart[0]: + p++ + case utils.IdxEnd[0]: + p-- + } + } + return +} + // NewFilterFromInline parses an inline rule into a compiled Filter func NewFilterFromInline(tenant, inlnRule string) (f *Filter, err error) { ruleSplt := strings.SplitN(inlnRule, utils.InInFieldSep, 3) diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index 96b0df3c7..cf60d365d 100644 --- a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -55,7 +55,7 @@ func TestAsteriskAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewAsteriskAgent(cfg, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, @@ -117,7 +117,7 @@ func TestAsteriskAgentReload2(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewAsteriskAgent(cfg, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index b96d8d87e..45c2a1e11 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -50,7 +50,7 @@ func TestDiameterAgentReload1(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index e3b2dd327..2e11f44c0 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -57,7 +57,7 @@ func TestDNSAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 2787f0991..74d738b8a 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -65,7 +65,7 @@ func TestEventReaderSReload(t *testing.T) { srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) - sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(erS, sS, diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index ace8809d6..003c95dca 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -57,7 +57,7 @@ func TestFreeSwitchAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/httpagent_it_test.go b/services/httpagent_it_test.go index 27dee0cdd..deec43520 100644 --- a/services/httpagent_it_test.go +++ b/services/httpagent_it_test.go @@ -55,7 +55,7 @@ func TestHTTPAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewHTTPAgent(cfg, filterSChan, server, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/kamailioagent_it_test.go b/services/kamailioagent_it_test.go index 0f0adce25..9de2abc72 100644 --- a/services/kamailioagent_it_test.go +++ b/services/kamailioagent_it_test.go @@ -55,7 +55,7 @@ func TestKamailioAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewKamailioAgent(cfg, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/radiusagent_it_test.go b/services/radiusagent_it_test.go index 07fc59a29..24db12210 100644 --- a/services/radiusagent_it_test.go +++ b/services/radiusagent_it_test.go @@ -58,7 +58,7 @@ func TestRadiusAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, @@ -122,7 +122,7 @@ func TestRadiusAgentReload2(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/sessions.go b/services/sessions.go index 8d5c1336e..9f8b2d7fb 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -36,8 +36,7 @@ import ( func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *cores.Server, internalChan chan birpc.ClientConnector, shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - caps *engine.Caps, anz *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &SessionService{ connChan: internalChan, cfg: cfg, @@ -45,7 +44,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server: server, shdChan: shdChan, connMgr: connMgr, - caps: caps, anz: anz, srvDep: srvDep, } @@ -68,7 +66,6 @@ type SessionService struct { // in order to stop the bircp server if necesary bircpEnabled bool connMgr *engine.ConnManager - caps *engine.Caps anz *AnalyzerService srvDep map[string]*sync.WaitGroup } diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index 6de678ee2..a4818c5df 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -94,7 +94,7 @@ func TestSessionSReload1(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): clientConect, }) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) - srv := NewSessionService(cfg, new(DataDBService), server, make(chan birpc.ClientConnector, 1), shdChan, conMng, nil, anz, srvDep) + srv := NewSessionService(cfg, new(DataDBService), server, make(chan birpc.ClientConnector, 1), shdChan, conMng, anz, srvDep) err := srv.Start() if err != nil { t.Fatal(err) @@ -154,7 +154,7 @@ func TestSessionSReload2(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) cfg.StorDbCfg().Type = utils.Internal anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) - srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) engine.NewConnManager(cfg, nil) srv.(*SessionService).sm = &sessions.SessionS{} @@ -203,13 +203,8 @@ func TestSessionSReload3(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) cfg.StorDbCfg().Type = utils.Internal anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) - srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) engine.NewConnManager(cfg, nil) - - srv.(*SessionService).sm = &sessions.SessionS{} - if !srv.IsRunning() { - t.Fatalf("\nExpecting service to be running") - } err2 := srv.(*SessionService).start() if err2 != nil { t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err2) diff --git a/services/sessions_test.go b/services/sessions_test.go index c28320f8f..3766b396e 100644 --- a/services/sessions_test.go +++ b/services/sessions_test.go @@ -53,7 +53,7 @@ func TestSessionSCoverage(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) cfg.StorDbCfg().Type = utils.Internal anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) - srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + srv := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) engine.NewConnManager(cfg, nil) if srv.IsRunning() { t.Errorf("Expected service to be down") @@ -65,7 +65,6 @@ func TestSessionSCoverage(t *testing.T) { shdChan: shdChan, connChan: make(chan birpc.ClientConnector, 1), connMgr: nil, - caps: nil, anz: anz, srvDep: srvDep, sm: &sessions.SessionS{}, diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index a97012661..0f1fb531b 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -53,7 +53,7 @@ func TestSIPAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS,