diff --git a/analyzers/codec.go b/analyzers/codec.go index f13cfdd20..e9a1395df 100644 --- a/analyzers/codec.go +++ b/analyzers/codec.go @@ -146,8 +146,8 @@ func (c *AnalyzerBiRPCCodec) ReadRequestBody(x interface{}) (err error) { func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) { err = c.sc.ReadResponseBody(x) c.repsLk.Lock() - api := c.reqs[c.repIdx] - delete(c.reqs, c.repIdx) + api := c.reps[c.repIdx] + delete(c.reps, c.repIdx) c.repsLk.Unlock() go c.aS.logTrafic(api.ID, api.Method, api.Params, x, api.Error, c.enc, c.to, c.from, api.StartTime, time.Now()) return @@ -157,7 +157,7 @@ func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) { func (c *AnalyzerBiRPCCodec) WriteRequest(req *rpc2.Request, x interface{}) error { c.repsLk.Lock() c.reqIdx = req.Seq - c.reqs[c.reqIdx] = &rpcAPI{ + c.reps[c.reqIdx] = &rpcAPI{ ID: req.Seq, Method: req.Method, StartTime: time.Now(), diff --git a/analyzers/codec_test.go b/analyzers/codec_test.go index 106ecd7e5..3ce4c078e 100644 --- a/analyzers/codec_test.go +++ b/analyzers/codec_test.go @@ -26,8 +26,10 @@ import ( "testing" "time" + "github.com/cenkalti/rpc2" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) type mockServerCodec struct{} @@ -97,3 +99,129 @@ func TestNewServerCodec(t *testing.T) { t.Fatal(err) } } + +type mockBiRPCCodec struct{} + +func (mockBiRPCCodec) ReadHeader(r *rpc2.Request, _ *rpc2.Response) error { + r.Seq = 0 + r.Method = utils.CoreSv1Ping + return nil +} +func (mockBiRPCCodec) ReadRequestBody(interface{}) error { return nil } +func (mockBiRPCCodec) ReadResponseBody(interface{}) error { return nil } +func (mockBiRPCCodec) WriteRequest(*rpc2.Request, interface{}) error { return nil } +func (mockBiRPCCodec) WriteResponse(*rpc2.Response, interface{}) error { return nil } +func (mockBiRPCCodec) Close() error { return nil } + +func TestNewBiRPCCodec(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + cfg.AnalyzerSCfg().TTL = 30 * time.Minute + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err := os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + + codec := NewAnalyzerBiRPCCodec(new(mockBiRPCCodec), anz, rpcclient.BiRPCJSON, "127.0.0.1:5565", "127.0.0.1:2012") + r := new(rpc2.Request) + expR := &rpc2.Request{ + Seq: 0, + Method: utils.CoreSv1Ping, + } + if err = codec.ReadHeader(r, nil); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(r, expR) { + t.Errorf("Expected: %v ,received:%v", expR, r) + } + + if err = codec.ReadRequestBody("args"); err != nil { + t.Fatal(err) + } + if err = codec.WriteResponse(&rpc2.Response{ + Error: "error", + Seq: 0, + }, "reply"); err != nil { + t.Fatal(err) + } + if err = codec.Close(); err != nil { + t.Fatal(err) + } + time.Sleep(200 * time.Millisecond) + runtime.Gosched() + if cnt, err := anz.db.DocCount(); err != nil { + t.Fatal(err) + } else if cnt != 1 { + t.Errorf("Expected only one document received:%v", cnt) + } + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } +} + +type mockBiRPCCodec2 struct{} + +func (mockBiRPCCodec2) ReadHeader(_ *rpc2.Request, r *rpc2.Response) error { + r.Seq = 0 + r.Error = "error" + return nil +} +func (mockBiRPCCodec2) ReadRequestBody(interface{}) error { return nil } +func (mockBiRPCCodec2) ReadResponseBody(interface{}) error { return nil } +func (mockBiRPCCodec2) WriteRequest(*rpc2.Request, interface{}) error { return nil } +func (mockBiRPCCodec2) WriteResponse(*rpc2.Response, interface{}) error { return nil } +func (mockBiRPCCodec2) Close() error { return nil } + +func TestNewBiRPCCodec2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + cfg.AnalyzerSCfg().TTL = 30 * time.Minute + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err := os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + + codec := NewAnalyzerBiRPCCodec(new(mockBiRPCCodec2), anz, rpcclient.BiRPCJSON, "127.0.0.1:5565", "127.0.0.1:2012") + if err = codec.WriteRequest(&rpc2.Request{Seq: 0, Method: utils.CoreSv1Ping}, "args"); err != nil { + t.Fatal(err) + } + r := new(rpc2.Response) + expR := &rpc2.Response{ + Seq: 0, + Error: "error", + } + + if err = codec.ReadHeader(&rpc2.Request{}, r); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(r, expR) { + t.Errorf("Expected: %v ,received:%v", expR, r) + } + + if err = codec.ReadResponseBody("args"); err != nil { + t.Fatal(err) + } + if err = codec.Close(); err != nil { + t.Fatal(err) + } + time.Sleep(200 * time.Millisecond) + runtime.Gosched() + if cnt, err := anz.db.DocCount(); err != nil { + t.Fatal(err) + } else if cnt != 1 { + t.Errorf("Expected only one document received:%v", cnt) + } + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } +} diff --git a/analyzers/connector_test.go b/analyzers/connector_test.go index 565aafb8d..be90c1811 100644 --- a/analyzers/connector_test.go +++ b/analyzers/connector_test.go @@ -21,12 +21,14 @@ package analyzers import ( "errors" "os" + "reflect" "runtime" "testing" "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) type mockConnector struct{} @@ -63,3 +65,72 @@ func TestNewAnalyzeConnector(t *testing.T) { t.Fatal(err) } } + +func (c *mockConnector) CallBiRPC(cl rpcclient.ClientConnector, serviceMethod string, args, reply interface{}) (err error) { + return c.Call(serviceMethod, args, reply) +} +func (c *mockConnector) Handlers() map[string]interface{} { return make(map[string]interface{}) } +func TestNewAnalyzeBiRPCConnector1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err := os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + rpc := anz.NewAnalyzerBiRPCConnector(new(mockConnector), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012") + if err = rpc.Call(utils.CoreSv1Ping, "args", "reply"); err == nil || err.Error() != "error" { + t.Errorf("Expected 'error' received %v", err) + } + time.Sleep(100 * time.Millisecond) + runtime.Gosched() + if cnt, err := anz.db.DocCount(); err != nil { + t.Fatal(err) + } else if cnt != 1 { + t.Errorf("Expected only one document received:%v", cnt) + } + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } +} + +func TestNewAnalyzeBiRPCConnector2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err := os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + rpc := anz.NewAnalyzerBiRPCConnector(new(mockConnector), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012") + if err = rpc.CallBiRPC(nil, utils.CoreSv1Ping, "args", "reply"); err == nil || err.Error() != "error" { + t.Errorf("Expected 'error' received %v", err) + } + time.Sleep(100 * time.Millisecond) + runtime.Gosched() + if cnt, err := anz.db.DocCount(); err != nil { + t.Fatal(err) + } else if cnt != 1 { + t.Errorf("Expected only one document received:%v", cnt) + } + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + + exp := make(map[string]interface{}) + if rply := rpc.Handlers(); !reflect.DeepEqual(rply, exp) { + t.Errorf("Expected: %v ,received:%v", exp, rply) + } +} diff --git a/cores/caps.go b/cores/caps.go index d308a0350..32ce98483 100644 --- a/cores/caps.go +++ b/cores/caps.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) type conn interface { @@ -122,7 +123,7 @@ func newCapsBiRPCGOBCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerS if to != nil { tostr = to.String() } - return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaGOB, fromstr, tostr) + return analyzers.NewAnalyzerBiRPCCodec(r, anz, rpcclient.BiRPCGOB, fromstr, tostr) } return } @@ -140,7 +141,7 @@ func newCapsBiRPCJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.Analyzer if to != nil { tostr = to.String() } - return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaJSON, fromstr, tostr) + return analyzers.NewAnalyzerBiRPCCodec(r, anz, rpcclient.BiRPCJSON, fromstr, tostr) } return } diff --git a/cores/caps_test.go b/cores/caps_test.go index 85e89c028..935dc51e0 100644 --- a/cores/caps_test.go +++ b/cores/caps_test.go @@ -25,9 +25,12 @@ import ( "syscall" "testing" + "github.com/cenkalti/rpc2" + jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc" "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) type mockServerCodec struct{} @@ -136,3 +139,108 @@ func TestNewCapsJSONCodec(t *testing.T) { t.Errorf("Expected: %v ,received:%v", exp, r) } } + +type mockBiRPCCodec struct{} + +func (mockBiRPCCodec) ReadHeader(r *rpc2.Request, _ *rpc2.Response) error { + r.Seq = 0 + r.Method = utils.CoreSv1Ping + return nil +} +func (mockBiRPCCodec) ReadRequestBody(interface{}) error { return utils.ErrNotImplemented } +func (mockBiRPCCodec) ReadResponseBody(interface{}) error { return nil } +func (mockBiRPCCodec) WriteRequest(*rpc2.Request, interface{}) error { return nil } +func (mockBiRPCCodec) WriteResponse(*rpc2.Response, interface{}) error { return nil } +func (mockBiRPCCodec) Close() error { return nil } + +func TestNewCapsBiRPCCodec(t *testing.T) { + mk := new(mockBiRPCCodec) + cr := engine.NewCaps(0, utils.MetaBusy) + if r := newCapsBiRPCCodec(mk, cr); !reflect.DeepEqual(mk, r) { + t.Errorf("Expected: %v ,received:%v", mk, r) + } + cr = engine.NewCaps(1, utils.MetaBusy) + exp := &capsBiRPCCodec{ + sc: mk, + caps: cr, + } + codec := newCapsBiRPCCodec(mk, cr) + if !reflect.DeepEqual(exp, codec) { + t.Errorf("Expected: %v ,received:%v", exp, codec) + } + var err error + r := new(rpc2.Request) + expR := &rpc2.Request{ + Seq: 0, + Method: utils.CoreSv1Ping, + } + if err = codec.ReadHeader(r, nil); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(r, expR) { + t.Errorf("Expected: %v ,received:%v", expR, r) + } + + if err = codec.ReadRequestBody("args"); err == nil || err != utils.ErrNotImplemented { + t.Fatal(err) + } + + if err = codec.ReadRequestBody("args"); err != utils.ErrMaxConcurentRPCExceededNoCaps { + t.Errorf("Expected error: %v ,received: %v ", utils.ErrMaxConcurentRPCExceededNoCaps, err) + } + + if err = codec.WriteResponse(&rpc2.Response{ + Error: "error", + Seq: 0, + }, "reply"); err != nil { + t.Fatal(err) + } + + if err = codec.ReadResponseBody(nil); err != nil { + t.Fatal(err) + } + + if err = codec.WriteRequest(&rpc2.Request{ + Seq: 0, + Method: utils.CoreSv1Ping, + }, "reply"); err != nil { + t.Fatal(err) + } + + if err = codec.WriteResponse(&rpc2.Response{ + Error: utils.ErrMaxConcurentRPCExceededNoCaps.Error(), + Seq: 0, + }, "reply"); err != nil { + t.Fatal(err) + } + if err = codec.Close(); err != nil { + t.Fatal(err) + } +} + +func TestNewCapsGOBBiRPCCodec(t *testing.T) { + conn := new(mockConn) + cr := engine.NewCaps(0, utils.MetaBusy) + anz := &analyzers.AnalyzerService{} + exp := rpc2.NewGobCodec(conn) + if r := newCapsBiRPCGOBCodec(conn, cr, nil); !reflect.DeepEqual(r, exp) { + t.Errorf("Expected: %v ,received:%v", exp, r) + } + exp = analyzers.NewAnalyzerBiRPCCodec(rpc2.NewGobCodec(conn), anz, rpcclient.BiRPCGOB, utils.Local, utils.Local) + if r := newCapsBiRPCGOBCodec(conn, cr, anz); !reflect.DeepEqual(r, exp) { + t.Errorf("Expected: %v ,received:%v", exp, r) + } +} + +func TestNewCapsJSONBiRPCCodec(t *testing.T) { + conn := new(mockConn) + cr := engine.NewCaps(0, utils.MetaBusy) + anz := &analyzers.AnalyzerService{} + exp := jsonrpc2.NewJSONCodec(conn) + if r := newCapsBiRPCJSONCodec(conn, cr, nil); !reflect.DeepEqual(r, exp) { + t.Errorf("Expected: %v ,received:%v", exp, r) + } + exp = analyzers.NewAnalyzerBiRPCCodec(jsonrpc2.NewJSONCodec(conn), anz, rpcclient.BiRPCJSON, utils.Local, utils.Local) + if r := newCapsBiRPCJSONCodec(conn, cr, anz); !reflect.DeepEqual(r, exp) { + t.Errorf("Expected: %v ,received:%v", exp, r) + } +}