diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index 16ccb5797..2cc13ea0c 100644 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -126,7 +126,7 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string, if strings.HasPrefix(method, utils.AnalyzerSv1) { return nil } - return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)), + return aS.db.Index(utils.ConcatenatedKey(enc, from, to, method, strconv.FormatInt(sTime.Unix(), 10)), NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime)) } diff --git a/analyzers/analyzers_it_test.go b/analyzers/analyzers_it_test.go index 763d8ec53..6c3f21de3 100644 --- a/analyzers/analyzers_it_test.go +++ b/analyzers/analyzers_it_test.go @@ -23,6 +23,7 @@ package analyzers import ( "errors" "flag" + "net" "net/rpc" "net/rpc/jsonrpc" "os" @@ -32,8 +33,11 @@ import ( "testing" "time" + "github.com/cenkalti/rpc2" + jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" ) @@ -41,6 +45,7 @@ var ( anzCfgPath string anzCfg *config.CGRConfig anzRPC *rpc.Client + anzBiRPC *rpc2.Client sTestsAlsPrf = []func(t *testing.T){ testAnalyzerSInitCfg, @@ -53,6 +58,7 @@ var ( testAnalyzerSV1Search, testAnalyzerSV1Search2, testAnalyzerSV1SearchWithContentFilters, + testAnalyzerSV1BirPCSession, testAnalyzerSKillEngine, } ) @@ -123,6 +129,13 @@ func testAnalyzerSRPCConn(t *testing.T) { if err != nil { t.Fatal(err) } + conn, err := net.Dial(utils.TCP, anzCfg.SessionSCfg().ListenBijson) + if err != nil { + t.Fatal(err) + } + anzBiRPC = rpc2.NewClientWithCodec(jsonrpc2.NewJSONCodec(conn)) + anzBiRPC.Handle(utils.SessionSv1DisconnectPeer, func(clnt *rpc2.Client, args interface{}, rply *string) (err error) { return utils.ErrNotFound }) + go anzBiRPC.Run() } func testAnalyzerSLoadTarrifPlans(t *testing.T) { @@ -227,6 +240,22 @@ func testAnalyzerSV1SearchWithContentFilters(t *testing.T) { } } +func testAnalyzerSV1BirPCSession(t *testing.T) { + var rply string + anzBiRPC.Call(utils.SessionSv1STIRIdentity, + &sessions.V1STIRIdentityArgs{}, &rply) // only call to register the birpc + if err := anzRPC.Call(utils.SessionSv1DisconnectPeer, &utils.DPRArgs{}, &rply); err == nil || + err.Error() != utils.ErrPartiallyExecuted.Error() { + t.Fatal(err) + } + time.Sleep(10 * time.Second) + var result []map[string]interface{} + if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*birpc_json +RequestMethod:"SessionSv1.DisconnectPeer"`}, &result); err != nil { + t.Error(err) + } else if len(result) != 1 { + t.Errorf("Unexpected result: %s", utils.ToJSON(result)) + } +} func testAnalyzerSKillEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) diff --git a/analyzers/codec.go b/analyzers/codec.go index e9a1395df..aa50914ef 100644 --- a/analyzers/codec.go +++ b/analyzers/codec.go @@ -156,8 +156,8 @@ func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) { // WriteRequest must be safe for concurrent use by multiple goroutines. func (c *AnalyzerBiRPCCodec) WriteRequest(req *rpc2.Request, x interface{}) error { c.repsLk.Lock() - c.reqIdx = req.Seq - c.reps[c.reqIdx] = &rpcAPI{ + c.repIdx = req.Seq + c.reps[c.repIdx] = &rpcAPI{ ID: req.Seq, Method: req.Method, StartTime: time.Now(), diff --git a/sessions/sessions.go b/sessions/sessions.go index 2627aac53..528f9863e 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -4064,6 +4064,9 @@ func (sS *SessionS) BiRPCv1STIRAuthenticate(clnt rpcclient.ClientConnector, // BiRPCv1STIRIdentity the API for STIR header creation func (sS *SessionS) BiRPCv1STIRIdentity(clnt rpcclient.ClientConnector, args *V1STIRIdentityArgs, identity *string) (err error) { + if args == nil || args.Payload == nil { + return utils.NewErrMandatoryIeMissing("Payload") + } if args.Payload.ATTest == utils.EmptyString { args.Payload.ATTest = sS.cgrCfg.SessionSCfg().STIRCfg.DefaultAttest }