Updated analyzers index key

This commit is contained in:
Trial97
2021-07-15 12:52:34 +03:00
committed by Dan Christian Bogos
parent c2821394d0
commit 079ca76c0c
4 changed files with 35 additions and 3 deletions

View File

@@ -126,7 +126,7 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string,
if strings.HasPrefix(method, utils.AnalyzerSv1) {
return nil
}
return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)),
return aS.db.Index(utils.ConcatenatedKey(enc, from, to, method, strconv.FormatInt(sTime.Unix(), 10)),
NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime))
}

View File

@@ -23,6 +23,7 @@ package analyzers
import (
"errors"
"flag"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"os"
@@ -32,8 +33,11 @@ import (
"testing"
"time"
"github.com/cenkalti/rpc2"
jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -41,6 +45,7 @@ var (
anzCfgPath string
anzCfg *config.CGRConfig
anzRPC *rpc.Client
anzBiRPC *rpc2.Client
sTestsAlsPrf = []func(t *testing.T){
testAnalyzerSInitCfg,
@@ -53,6 +58,7 @@ var (
testAnalyzerSV1Search,
testAnalyzerSV1Search2,
testAnalyzerSV1SearchWithContentFilters,
testAnalyzerSV1BirPCSession,
testAnalyzerSKillEngine,
}
)
@@ -123,6 +129,13 @@ func testAnalyzerSRPCConn(t *testing.T) {
if err != nil {
t.Fatal(err)
}
conn, err := net.Dial(utils.TCP, anzCfg.SessionSCfg().ListenBijson)
if err != nil {
t.Fatal(err)
}
anzBiRPC = rpc2.NewClientWithCodec(jsonrpc2.NewJSONCodec(conn))
anzBiRPC.Handle(utils.SessionSv1DisconnectPeer, func(clnt *rpc2.Client, args interface{}, rply *string) (err error) { return utils.ErrNotFound })
go anzBiRPC.Run()
}
func testAnalyzerSLoadTarrifPlans(t *testing.T) {
@@ -227,6 +240,22 @@ func testAnalyzerSV1SearchWithContentFilters(t *testing.T) {
}
}
func testAnalyzerSV1BirPCSession(t *testing.T) {
var rply string
anzBiRPC.Call(utils.SessionSv1STIRIdentity,
&sessions.V1STIRIdentityArgs{}, &rply) // only call to register the birpc
if err := anzRPC.Call(utils.SessionSv1DisconnectPeer, &utils.DPRArgs{}, &rply); err == nil ||
err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Fatal(err)
}
time.Sleep(10 * time.Second)
var result []map[string]interface{}
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*birpc_json +RequestMethod:"SessionSv1.DisconnectPeer"`}, &result); err != nil {
t.Error(err)
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))
}
}
func testAnalyzerSKillEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)

View File

@@ -156,8 +156,8 @@ func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) {
// WriteRequest must be safe for concurrent use by multiple goroutines.
func (c *AnalyzerBiRPCCodec) WriteRequest(req *rpc2.Request, x interface{}) error {
c.repsLk.Lock()
c.reqIdx = req.Seq
c.reps[c.reqIdx] = &rpcAPI{
c.repIdx = req.Seq
c.reps[c.repIdx] = &rpcAPI{
ID: req.Seq,
Method: req.Method,
StartTime: time.Now(),

View File

@@ -4064,6 +4064,9 @@ func (sS *SessionS) BiRPCv1STIRAuthenticate(clnt rpcclient.ClientConnector,
// BiRPCv1STIRIdentity the API for STIR header creation
func (sS *SessionS) BiRPCv1STIRIdentity(clnt rpcclient.ClientConnector,
args *V1STIRIdentityArgs, identity *string) (err error) {
if args == nil || args.Payload == nil {
return utils.NewErrMandatoryIeMissing("Payload")
}
if args.Payload.ATTest == utils.EmptyString {
args.Payload.ATTest = sS.cgrCfg.SessionSCfg().STIRCfg.DefaultAttest
}