Added tests for analyzers over birpc

This commit is contained in:
Trial97
2021-07-14 17:11:25 +03:00
committed by Dan Christian Bogos
parent 8ed0a145f9
commit c2821394d0
5 changed files with 313 additions and 5 deletions

View File

@@ -146,8 +146,8 @@ func (c *AnalyzerBiRPCCodec) ReadRequestBody(x interface{}) (err error) {
func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) {
err = c.sc.ReadResponseBody(x)
c.repsLk.Lock()
api := c.reqs[c.repIdx]
delete(c.reqs, c.repIdx)
api := c.reps[c.repIdx]
delete(c.reps, c.repIdx)
c.repsLk.Unlock()
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, api.Error, c.enc, c.to, c.from, api.StartTime, time.Now())
return
@@ -157,7 +157,7 @@ func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) {
func (c *AnalyzerBiRPCCodec) WriteRequest(req *rpc2.Request, x interface{}) error {
c.repsLk.Lock()
c.reqIdx = req.Seq
c.reqs[c.reqIdx] = &rpcAPI{
c.reps[c.reqIdx] = &rpcAPI{
ID: req.Seq,
Method: req.Method,
StartTime: time.Now(),

View File

@@ -26,8 +26,10 @@ import (
"testing"
"time"
"github.com/cenkalti/rpc2"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type mockServerCodec struct{}
@@ -97,3 +99,129 @@ func TestNewServerCodec(t *testing.T) {
t.Fatal(err)
}
}
type mockBiRPCCodec struct{}
func (mockBiRPCCodec) ReadHeader(r *rpc2.Request, _ *rpc2.Response) error {
r.Seq = 0
r.Method = utils.CoreSv1Ping
return nil
}
func (mockBiRPCCodec) ReadRequestBody(interface{}) error { return nil }
func (mockBiRPCCodec) ReadResponseBody(interface{}) error { return nil }
func (mockBiRPCCodec) WriteRequest(*rpc2.Request, interface{}) error { return nil }
func (mockBiRPCCodec) WriteResponse(*rpc2.Response, interface{}) error { return nil }
func (mockBiRPCCodec) Close() error { return nil }
func TestNewBiRPCCodec(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
cfg.AnalyzerSCfg().TTL = 30 * time.Minute
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err := os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
codec := NewAnalyzerBiRPCCodec(new(mockBiRPCCodec), anz, rpcclient.BiRPCJSON, "127.0.0.1:5565", "127.0.0.1:2012")
r := new(rpc2.Request)
expR := &rpc2.Request{
Seq: 0,
Method: utils.CoreSv1Ping,
}
if err = codec.ReadHeader(r, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(r, expR) {
t.Errorf("Expected: %v ,received:%v", expR, r)
}
if err = codec.ReadRequestBody("args"); err != nil {
t.Fatal(err)
}
if err = codec.WriteResponse(&rpc2.Response{
Error: "error",
Seq: 0,
}, "reply"); err != nil {
t.Fatal(err)
}
if err = codec.Close(); err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
runtime.Gosched()
if cnt, err := anz.db.DocCount(); err != nil {
t.Fatal(err)
} else if cnt != 1 {
t.Errorf("Expected only one document received:%v", cnt)
}
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
}
type mockBiRPCCodec2 struct{}
func (mockBiRPCCodec2) ReadHeader(_ *rpc2.Request, r *rpc2.Response) error {
r.Seq = 0
r.Error = "error"
return nil
}
func (mockBiRPCCodec2) ReadRequestBody(interface{}) error { return nil }
func (mockBiRPCCodec2) ReadResponseBody(interface{}) error { return nil }
func (mockBiRPCCodec2) WriteRequest(*rpc2.Request, interface{}) error { return nil }
func (mockBiRPCCodec2) WriteResponse(*rpc2.Response, interface{}) error { return nil }
func (mockBiRPCCodec2) Close() error { return nil }
func TestNewBiRPCCodec2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
cfg.AnalyzerSCfg().TTL = 30 * time.Minute
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err := os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
codec := NewAnalyzerBiRPCCodec(new(mockBiRPCCodec2), anz, rpcclient.BiRPCJSON, "127.0.0.1:5565", "127.0.0.1:2012")
if err = codec.WriteRequest(&rpc2.Request{Seq: 0, Method: utils.CoreSv1Ping}, "args"); err != nil {
t.Fatal(err)
}
r := new(rpc2.Response)
expR := &rpc2.Response{
Seq: 0,
Error: "error",
}
if err = codec.ReadHeader(&rpc2.Request{}, r); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(r, expR) {
t.Errorf("Expected: %v ,received:%v", expR, r)
}
if err = codec.ReadResponseBody("args"); err != nil {
t.Fatal(err)
}
if err = codec.Close(); err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
runtime.Gosched()
if cnt, err := anz.db.DocCount(); err != nil {
t.Fatal(err)
} else if cnt != 1 {
t.Errorf("Expected only one document received:%v", cnt)
}
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
}

View File

@@ -21,12 +21,14 @@ package analyzers
import (
"errors"
"os"
"reflect"
"runtime"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type mockConnector struct{}
@@ -63,3 +65,72 @@ func TestNewAnalyzeConnector(t *testing.T) {
t.Fatal(err)
}
}
func (c *mockConnector) CallBiRPC(cl rpcclient.ClientConnector, serviceMethod string, args, reply interface{}) (err error) {
return c.Call(serviceMethod, args, reply)
}
func (c *mockConnector) Handlers() map[string]interface{} { return make(map[string]interface{}) }
func TestNewAnalyzeBiRPCConnector1(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err := os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
rpc := anz.NewAnalyzerBiRPCConnector(new(mockConnector), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012")
if err = rpc.Call(utils.CoreSv1Ping, "args", "reply"); err == nil || err.Error() != "error" {
t.Errorf("Expected 'error' received %v", err)
}
time.Sleep(100 * time.Millisecond)
runtime.Gosched()
if cnt, err := anz.db.DocCount(); err != nil {
t.Fatal(err)
} else if cnt != 1 {
t.Errorf("Expected only one document received:%v", cnt)
}
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
}
func TestNewAnalyzeBiRPCConnector2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err := os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
rpc := anz.NewAnalyzerBiRPCConnector(new(mockConnector), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012")
if err = rpc.CallBiRPC(nil, utils.CoreSv1Ping, "args", "reply"); err == nil || err.Error() != "error" {
t.Errorf("Expected 'error' received %v", err)
}
time.Sleep(100 * time.Millisecond)
runtime.Gosched()
if cnt, err := anz.db.DocCount(); err != nil {
t.Fatal(err)
} else if cnt != 1 {
t.Errorf("Expected only one document received:%v", cnt)
}
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
exp := make(map[string]interface{})
if rply := rpc.Handlers(); !reflect.DeepEqual(rply, exp) {
t.Errorf("Expected: %v ,received:%v", exp, rply)
}
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/cgrates/cgrates/analyzers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type conn interface {
@@ -122,7 +123,7 @@ func newCapsBiRPCGOBCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerS
if to != nil {
tostr = to.String()
}
return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaGOB, fromstr, tostr)
return analyzers.NewAnalyzerBiRPCCodec(r, anz, rpcclient.BiRPCGOB, fromstr, tostr)
}
return
}
@@ -140,7 +141,7 @@ func newCapsBiRPCJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.Analyzer
if to != nil {
tostr = to.String()
}
return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaJSON, fromstr, tostr)
return analyzers.NewAnalyzerBiRPCCodec(r, anz, rpcclient.BiRPCJSON, fromstr, tostr)
}
return
}

View File

@@ -25,9 +25,12 @@ import (
"syscall"
"testing"
"github.com/cenkalti/rpc2"
jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc"
"github.com/cgrates/cgrates/analyzers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type mockServerCodec struct{}
@@ -136,3 +139,108 @@ func TestNewCapsJSONCodec(t *testing.T) {
t.Errorf("Expected: %v ,received:%v", exp, r)
}
}
type mockBiRPCCodec struct{}
func (mockBiRPCCodec) ReadHeader(r *rpc2.Request, _ *rpc2.Response) error {
r.Seq = 0
r.Method = utils.CoreSv1Ping
return nil
}
func (mockBiRPCCodec) ReadRequestBody(interface{}) error { return utils.ErrNotImplemented }
func (mockBiRPCCodec) ReadResponseBody(interface{}) error { return nil }
func (mockBiRPCCodec) WriteRequest(*rpc2.Request, interface{}) error { return nil }
func (mockBiRPCCodec) WriteResponse(*rpc2.Response, interface{}) error { return nil }
func (mockBiRPCCodec) Close() error { return nil }
func TestNewCapsBiRPCCodec(t *testing.T) {
mk := new(mockBiRPCCodec)
cr := engine.NewCaps(0, utils.MetaBusy)
if r := newCapsBiRPCCodec(mk, cr); !reflect.DeepEqual(mk, r) {
t.Errorf("Expected: %v ,received:%v", mk, r)
}
cr = engine.NewCaps(1, utils.MetaBusy)
exp := &capsBiRPCCodec{
sc: mk,
caps: cr,
}
codec := newCapsBiRPCCodec(mk, cr)
if !reflect.DeepEqual(exp, codec) {
t.Errorf("Expected: %v ,received:%v", exp, codec)
}
var err error
r := new(rpc2.Request)
expR := &rpc2.Request{
Seq: 0,
Method: utils.CoreSv1Ping,
}
if err = codec.ReadHeader(r, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(r, expR) {
t.Errorf("Expected: %v ,received:%v", expR, r)
}
if err = codec.ReadRequestBody("args"); err == nil || err != utils.ErrNotImplemented {
t.Fatal(err)
}
if err = codec.ReadRequestBody("args"); err != utils.ErrMaxConcurentRPCExceededNoCaps {
t.Errorf("Expected error: %v ,received: %v ", utils.ErrMaxConcurentRPCExceededNoCaps, err)
}
if err = codec.WriteResponse(&rpc2.Response{
Error: "error",
Seq: 0,
}, "reply"); err != nil {
t.Fatal(err)
}
if err = codec.ReadResponseBody(nil); err != nil {
t.Fatal(err)
}
if err = codec.WriteRequest(&rpc2.Request{
Seq: 0,
Method: utils.CoreSv1Ping,
}, "reply"); err != nil {
t.Fatal(err)
}
if err = codec.WriteResponse(&rpc2.Response{
Error: utils.ErrMaxConcurentRPCExceededNoCaps.Error(),
Seq: 0,
}, "reply"); err != nil {
t.Fatal(err)
}
if err = codec.Close(); err != nil {
t.Fatal(err)
}
}
func TestNewCapsGOBBiRPCCodec(t *testing.T) {
conn := new(mockConn)
cr := engine.NewCaps(0, utils.MetaBusy)
anz := &analyzers.AnalyzerService{}
exp := rpc2.NewGobCodec(conn)
if r := newCapsBiRPCGOBCodec(conn, cr, nil); !reflect.DeepEqual(r, exp) {
t.Errorf("Expected: %v ,received:%v", exp, r)
}
exp = analyzers.NewAnalyzerBiRPCCodec(rpc2.NewGobCodec(conn), anz, rpcclient.BiRPCGOB, utils.Local, utils.Local)
if r := newCapsBiRPCGOBCodec(conn, cr, anz); !reflect.DeepEqual(r, exp) {
t.Errorf("Expected: %v ,received:%v", exp, r)
}
}
func TestNewCapsJSONBiRPCCodec(t *testing.T) {
conn := new(mockConn)
cr := engine.NewCaps(0, utils.MetaBusy)
anz := &analyzers.AnalyzerService{}
exp := jsonrpc2.NewJSONCodec(conn)
if r := newCapsBiRPCJSONCodec(conn, cr, nil); !reflect.DeepEqual(r, exp) {
t.Errorf("Expected: %v ,received:%v", exp, r)
}
exp = analyzers.NewAnalyzerBiRPCCodec(jsonrpc2.NewJSONCodec(conn), anz, rpcclient.BiRPCJSON, utils.Local, utils.Local)
if r := newCapsBiRPCJSONCodec(conn, cr, anz); !reflect.DeepEqual(r, exp) {
t.Errorf("Expected: %v ,received:%v", exp, r)
}
}