mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Cover funcs in engine/connmanager.go
This commit is contained in:
committed by
Dan Christian Bogos
parent
16c429f479
commit
4b7b9fa247
@@ -225,57 +225,387 @@ func TestCMgetConnWithConfigUnsupportedCodec(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// func TestCMgetConnWithConfigEmptyTransport(t *testing.T) {
|
||||
// tmp := Cache
|
||||
// defer func() {
|
||||
// Cache = tmp
|
||||
// }()
|
||||
func TestCMgetConnWithConfigEmptyTransport(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
// connID := "connID"
|
||||
// defaultCfg := config.NewDefaultCGRConfig()
|
||||
// defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
// defaultCfg.RPCConns()[connID].Strategy = rpcclient.PoolParallel
|
||||
// defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
// {
|
||||
// Address: "invalid",
|
||||
// Transport: "",
|
||||
// },
|
||||
// }
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
defaultCfg.RPCConns()[connID].Strategy = rpcclient.PoolParallel
|
||||
defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
{
|
||||
Address: "invalid",
|
||||
Transport: "",
|
||||
},
|
||||
}
|
||||
|
||||
// cc := make(chan rpcclient.ClientConnector, 1)
|
||||
cc := make(chan rpcclient.ClientConnector, 1)
|
||||
|
||||
// cM := &ConnManager{
|
||||
// cfg: defaultCfg,
|
||||
// rpcInternal: map[string]chan rpcclient.ClientConnector{
|
||||
// connID: cc,
|
||||
// },
|
||||
// connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
// }
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
rpcInternal: map[string]chan rpcclient.ClientConnector{
|
||||
connID: cc,
|
||||
},
|
||||
connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
}
|
||||
|
||||
// cM.connCache.Set(connID, nil, nil)
|
||||
cM.connCache.Set(connID, nil, nil)
|
||||
|
||||
// exp, err := rpcclient.NewRPCParallelClientPool(utils.TCP, "invalid", false,
|
||||
// defaultCfg.TLSCfg().ClientKey, defaultCfg.TLSCfg().ClientCerificate,
|
||||
// defaultCfg.TLSCfg().CaCertificate, defaultCfg.GeneralCfg().ConnectAttempts,
|
||||
// defaultCfg.GeneralCfg().Reconnects, defaultCfg.GeneralCfg().ConnectTimeout,
|
||||
// defaultCfg.GeneralCfg().ReplyTimeout, rpcclient.GOBrpc, nil,
|
||||
// int64(defaultCfg.GeneralCfg().MaxParallelConns), false, nil)
|
||||
// if err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true)
|
||||
|
||||
// rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true)
|
||||
if err != nil {
|
||||
t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
}
|
||||
|
||||
// if err != nil {
|
||||
// t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
// }
|
||||
if _, cancast := rcv.(*rpcclient.RPCParallelClientPool); !cancast {
|
||||
t.Error("Expected value of type rpcclient.RPCParallelClientPool")
|
||||
}
|
||||
}
|
||||
|
||||
// if !reflect.DeepEqual(rcv, exp) {
|
||||
// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv)
|
||||
// }
|
||||
// }
|
||||
type BiRPCConnectorMock struct {
|
||||
calls map[string]func(rpcclient.ClientConnector, string, interface{}, interface{}) error
|
||||
}
|
||||
|
||||
// func TestCMgetConnWithConfig2(t *testing.T) {
|
||||
func (bRCM *BiRPCConnectorMock) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bRCM *BiRPCConnectorMock) CallBiRPC(cc rpcclient.ClientConnector, method string, args interface{}, reply interface{}) error {
|
||||
if call, has := bRCM.calls[method]; !has {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
} else {
|
||||
return call(cc, method, args, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func (bRCM *BiRPCConnectorMock) Handlers() map[string]interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestCMgetConnWithConfigCallBiRPCNilErr(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
{
|
||||
ID: connID,
|
||||
Address: rpcclient.BiRPCInternal,
|
||||
Transport: rpcclient.BiRPCJSON,
|
||||
},
|
||||
}
|
||||
|
||||
cc := make(chan rpcclient.ClientConnector, 1)
|
||||
birpc := &BiRPCConnectorMock{
|
||||
calls: map[string]func(rpcclient.ClientConnector, string, interface{}, interface{}) error{
|
||||
utils.SessionSv1RegisterInternalBiJSONConn: func(cc rpcclient.ClientConnector, s string, i1, i2 interface{}) error {
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
cc <- birpc
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
rpcInternal: map[string]chan rpcclient.ClientConnector{
|
||||
connID: cc,
|
||||
},
|
||||
connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
}
|
||||
|
||||
exp, err := NewRPCPool("*first", defaultCfg.TLSCfg().ClientKey, defaultCfg.TLSCfg().ClientCerificate,
|
||||
defaultCfg.TLSCfg().CaCertificate, defaultCfg.GeneralCfg().ConnectAttempts,
|
||||
defaultCfg.GeneralCfg().Reconnects, defaultCfg.GeneralCfg().ConnectTimeout,
|
||||
defaultCfg.GeneralCfg().ReplyTimeout, defaultCfg.RPCConns()[connID].Conns, cc,
|
||||
false, birpc, connID, cM.connCache)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], birpc, cc, true)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(rcv, exp) {
|
||||
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCMgetConnWithConfigCallBiRPCErr(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
{
|
||||
ID: connID,
|
||||
Address: rpcclient.BiRPCInternal,
|
||||
Transport: rpcclient.BiRPCJSON,
|
||||
},
|
||||
}
|
||||
|
||||
cc := make(chan rpcclient.ClientConnector, 1)
|
||||
birpc := &BiRPCConnectorMock{
|
||||
calls: map[string]func(rpcclient.ClientConnector, string, interface{}, interface{}) error{
|
||||
"wrong method": func(cc rpcclient.ClientConnector, s string, i1, i2 interface{}) error {
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
cc <- birpc
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
rpcInternal: map[string]chan rpcclient.ClientConnector{
|
||||
connID: cc,
|
||||
},
|
||||
connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
}
|
||||
|
||||
exp, err := NewRPCPool("*first", defaultCfg.TLSCfg().ClientKey, defaultCfg.TLSCfg().ClientCerificate,
|
||||
defaultCfg.TLSCfg().CaCertificate, defaultCfg.GeneralCfg().ConnectAttempts,
|
||||
defaultCfg.GeneralCfg().Reconnects, defaultCfg.GeneralCfg().ConnectTimeout,
|
||||
defaultCfg.GeneralCfg().ReplyTimeout, defaultCfg.RPCConns()[connID].Conns, cc,
|
||||
false, birpc, connID, cM.connCache)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
experr := rpcclient.ErrUnsupporteServiceMethod
|
||||
rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], birpc, cc, true)
|
||||
|
||||
if err == nil || err != experr {
|
||||
t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(rcv, exp) {
|
||||
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCMgetConnWithConfigInternalRPCCodec(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
defaultCfg.RPCConns()[connID].Strategy = rpcclient.PoolParallel
|
||||
defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
{
|
||||
Address: rpcclient.InternalRPC,
|
||||
},
|
||||
}
|
||||
|
||||
cc := make(chan rpcclient.ClientConnector, 1)
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
rpcInternal: map[string]chan rpcclient.ClientConnector{
|
||||
connID: cc,
|
||||
},
|
||||
connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
}
|
||||
|
||||
rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
}
|
||||
|
||||
if _, cancast := rcv.(*rpcclient.RPCParallelClientPool); !cancast {
|
||||
t.Error("Expected value of type rpcclient.RPCParallelClientPool")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCMgetConnWithConfigInternalBiRPCCodecUnsupported(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
defaultCfg.RPCConns()[connID].Strategy = rpcclient.PoolParallel
|
||||
defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
{
|
||||
Address: rpcclient.BiRPCInternal,
|
||||
},
|
||||
}
|
||||
|
||||
cc := make(chan rpcclient.ClientConnector, 1)
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
rpcInternal: map[string]chan rpcclient.ClientConnector{
|
||||
connID: cc,
|
||||
},
|
||||
connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
}
|
||||
|
||||
experr := rpcclient.ErrUnsupportedCodec
|
||||
rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true)
|
||||
|
||||
if err == nil || err != experr {
|
||||
t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err)
|
||||
}
|
||||
|
||||
if _, cancast := rcv.(*rpcclient.RPCParallelClientPool); !cancast {
|
||||
t.Error("Expected value of type rpcclient.RPCParallelClientPool")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCMCallErrgetConn(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
}
|
||||
|
||||
db := NewInternalDB(nil, nil, true)
|
||||
dm := NewDataManager(db, defaultCfg.CacheCfg(), cM)
|
||||
Cache = NewCacheS(defaultCfg, dm, nil)
|
||||
Cache.SetWithoutReplicate(utils.CacheRPCConnections, connID, nil, nil, true, utils.NonTransactional)
|
||||
|
||||
experr := utils.ErrNotFound
|
||||
err := cM.Call([]string{connID}, nil, "", "", "")
|
||||
|
||||
if err == nil || err != experr {
|
||||
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCMCallWithConnIDsNoSubsHostIDs(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
}
|
||||
subsHostIDs := utils.StringSet{}
|
||||
|
||||
err := cM.CallWithConnIDs([]string{connID}, subsHostIDs, "", "", "")
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCMCallWithConnIDsNoConnIDs(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
}
|
||||
subsHostIDs := utils.StringSet{
|
||||
"key": struct{}{},
|
||||
}
|
||||
|
||||
experr := fmt.Sprintf("MANDATORY_IE_MISSING: [%s]", "connIDs")
|
||||
err := cM.CallWithConnIDs([]string{}, subsHostIDs, "", "", "")
|
||||
|
||||
if err == nil || err.Error() != experr {
|
||||
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCMCallWithConnIDsNoConns(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
{
|
||||
ID: connID,
|
||||
Address: rpcclient.InternalRPC,
|
||||
},
|
||||
}
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
}
|
||||
subsHostIDs := utils.StringSet{
|
||||
"random": struct{}{},
|
||||
}
|
||||
|
||||
err := cM.CallWithConnIDs([]string{connID}, subsHostIDs, "", "", "")
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCMCallWithConnIDsInternallyDCed(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
connID := "connID"
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
{
|
||||
ID: connID,
|
||||
Address: rpcclient.InternalRPC,
|
||||
},
|
||||
}
|
||||
|
||||
cM := &ConnManager{
|
||||
cfg: defaultCfg,
|
||||
connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
}
|
||||
subsHostIDs := utils.StringSet{
|
||||
connID: struct{}{},
|
||||
}
|
||||
|
||||
experr := rpcclient.ErrInternallyDisconnected
|
||||
err := cM.CallWithConnIDs([]string{connID}, subsHostIDs, "", "", "")
|
||||
|
||||
if err == nil || err != experr {
|
||||
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
// func TestCMCallWithConnIDs2(t *testing.T) {
|
||||
// tmp := Cache
|
||||
// defer func() {
|
||||
// Cache = tmp
|
||||
@@ -288,8 +618,8 @@ func TestCMgetConnWithConfigUnsupportedCodec(t *testing.T) {
|
||||
// defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{
|
||||
// {
|
||||
// ID: connID,
|
||||
// Address: rpcclient.InternalRPC,
|
||||
// Transport: rpcclient.BiRPCJSON,
|
||||
// Address: "127.0.0.1:2012",
|
||||
// Transport: rpcclient.JSONrpc,
|
||||
// },
|
||||
// }
|
||||
|
||||
@@ -302,16 +632,48 @@ func TestCMgetConnWithConfigUnsupportedCodec(t *testing.T) {
|
||||
// },
|
||||
// connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
// }
|
||||
// subsHostIDs := utils.StringSet{
|
||||
// connID: struct{}{},
|
||||
// }
|
||||
|
||||
// experr := rpcclient.ErrUnsupportedCodec
|
||||
// var exp *rpcclient.RPCParallelClientPool
|
||||
// rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true)
|
||||
// _, err := net.Listen("tcp", ":2012")
|
||||
// if err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
|
||||
// args := new(utils.CGREvent)
|
||||
// var reply string
|
||||
|
||||
// experr := rpcclient.ErrInternallyDisconnected
|
||||
// err = cM.CallWithConnIDs([]string{connID}, subsHostIDs, utils.CacheSv1Ping, args, &reply)
|
||||
|
||||
// if err == nil || err != experr {
|
||||
// t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err)
|
||||
// }
|
||||
|
||||
// if !reflect.DeepEqual(rcv, exp) {
|
||||
// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv)
|
||||
// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// func TestCMReload(t *testing.T) {
|
||||
// tmp := Cache
|
||||
// defer func() {
|
||||
// Cache = tmp
|
||||
// }()
|
||||
|
||||
// connID := "connID"
|
||||
// defaultCfg := config.NewDefaultCGRConfig()
|
||||
// defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn()
|
||||
|
||||
// cM := &ConnManager{
|
||||
// cfg: defaultCfg,
|
||||
// connCache: ltcache.NewCache(-1, 0, true, nil),
|
||||
// }
|
||||
// cM.connCache.Set(connID, nil, nil)
|
||||
|
||||
// db := NewInternalDB(nil, nil, true)
|
||||
// dm := NewDataManager(db, defaultCfg.CacheCfg(), cM)
|
||||
// Cache = NewCacheS(defaultCfg, dm, nil)
|
||||
// Cache.SetWithoutReplicate(utils.CacheRPCConnections, connID, nil, nil, true, utils.NonTransactional)
|
||||
// Cache.SetWithoutReplicate(utils.CacheReplicationHosts, connID, nil, nil, true, utils.NonTransactional)
|
||||
|
||||
// cM.Reload()
|
||||
|
||||
// }
|
||||
|
||||
Reference in New Issue
Block a user