diff --git a/engine/connmanager_test.go b/engine/connmanager_test.go index 5e6f635a1..c02f64907 100644 --- a/engine/connmanager_test.go +++ b/engine/connmanager_test.go @@ -225,57 +225,387 @@ func TestCMgetConnWithConfigUnsupportedCodec(t *testing.T) { } } -// func TestCMgetConnWithConfigEmptyTransport(t *testing.T) { -// tmp := Cache -// defer func() { -// Cache = tmp -// }() +func TestCMgetConnWithConfigEmptyTransport(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() -// connID := "connID" -// defaultCfg := config.NewDefaultCGRConfig() -// defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() -// defaultCfg.RPCConns()[connID].Strategy = rpcclient.PoolParallel -// defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ -// { -// Address: "invalid", -// Transport: "", -// }, -// } + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + defaultCfg.RPCConns()[connID].Strategy = rpcclient.PoolParallel + defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ + { + Address: "invalid", + Transport: "", + }, + } -// cc := make(chan rpcclient.ClientConnector, 1) + cc := make(chan rpcclient.ClientConnector, 1) -// cM := &ConnManager{ -// cfg: defaultCfg, -// rpcInternal: map[string]chan rpcclient.ClientConnector{ -// connID: cc, -// }, -// connCache: ltcache.NewCache(-1, 0, true, nil), -// } + cM := &ConnManager{ + cfg: defaultCfg, + rpcInternal: map[string]chan rpcclient.ClientConnector{ + connID: cc, + }, + connCache: ltcache.NewCache(-1, 0, true, nil), + } -// cM.connCache.Set(connID, nil, nil) + cM.connCache.Set(connID, nil, nil) -// exp, err := rpcclient.NewRPCParallelClientPool(utils.TCP, "invalid", false, -// defaultCfg.TLSCfg().ClientKey, defaultCfg.TLSCfg().ClientCerificate, -// defaultCfg.TLSCfg().CaCertificate, defaultCfg.GeneralCfg().ConnectAttempts, -// defaultCfg.GeneralCfg().Reconnects, defaultCfg.GeneralCfg().ConnectTimeout, -// defaultCfg.GeneralCfg().ReplyTimeout, rpcclient.GOBrpc, nil, -// int64(defaultCfg.GeneralCfg().MaxParallelConns), false, nil) -// if err != nil { -// t.Fatal(err) -// } + rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true) -// rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true) + if err != nil { + t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) + } -// if err != nil { -// t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) -// } + if _, cancast := rcv.(*rpcclient.RPCParallelClientPool); !cancast { + t.Error("Expected value of type rpcclient.RPCParallelClientPool") + } +} -// if !reflect.DeepEqual(rcv, exp) { -// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv) -// } -// } +type BiRPCConnectorMock struct { + calls map[string]func(rpcclient.ClientConnector, string, interface{}, interface{}) error +} -// func TestCMgetConnWithConfig2(t *testing.T) { +func (bRCM *BiRPCConnectorMock) Call(serviceMethod string, args interface{}, reply interface{}) (err error) { + return nil +} + +func (bRCM *BiRPCConnectorMock) CallBiRPC(cc rpcclient.ClientConnector, method string, args interface{}, reply interface{}) error { + if call, has := bRCM.calls[method]; !has { + return rpcclient.ErrUnsupporteServiceMethod + } else { + return call(cc, method, args, reply) + } +} + +func (bRCM *BiRPCConnectorMock) Handlers() map[string]interface{} { + return nil +} + +func TestCMgetConnWithConfigCallBiRPCNilErr(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ + { + ID: connID, + Address: rpcclient.BiRPCInternal, + Transport: rpcclient.BiRPCJSON, + }, + } + + cc := make(chan rpcclient.ClientConnector, 1) + birpc := &BiRPCConnectorMock{ + calls: map[string]func(rpcclient.ClientConnector, string, interface{}, interface{}) error{ + utils.SessionSv1RegisterInternalBiJSONConn: func(cc rpcclient.ClientConnector, s string, i1, i2 interface{}) error { + return nil + }, + }, + } + cc <- birpc + + cM := &ConnManager{ + cfg: defaultCfg, + rpcInternal: map[string]chan rpcclient.ClientConnector{ + connID: cc, + }, + connCache: ltcache.NewCache(-1, 0, true, nil), + } + + exp, err := NewRPCPool("*first", defaultCfg.TLSCfg().ClientKey, defaultCfg.TLSCfg().ClientCerificate, + defaultCfg.TLSCfg().CaCertificate, defaultCfg.GeneralCfg().ConnectAttempts, + defaultCfg.GeneralCfg().Reconnects, defaultCfg.GeneralCfg().ConnectTimeout, + defaultCfg.GeneralCfg().ReplyTimeout, defaultCfg.RPCConns()[connID].Conns, cc, + false, birpc, connID, cM.connCache) + if err != nil { + t.Fatal(err) + } + rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], birpc, cc, true) + + if err != nil { + t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) + } + + if !reflect.DeepEqual(rcv, exp) { + t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv) + } +} + +func TestCMgetConnWithConfigCallBiRPCErr(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ + { + ID: connID, + Address: rpcclient.BiRPCInternal, + Transport: rpcclient.BiRPCJSON, + }, + } + + cc := make(chan rpcclient.ClientConnector, 1) + birpc := &BiRPCConnectorMock{ + calls: map[string]func(rpcclient.ClientConnector, string, interface{}, interface{}) error{ + "wrong method": func(cc rpcclient.ClientConnector, s string, i1, i2 interface{}) error { + return nil + }, + }, + } + cc <- birpc + + cM := &ConnManager{ + cfg: defaultCfg, + rpcInternal: map[string]chan rpcclient.ClientConnector{ + connID: cc, + }, + connCache: ltcache.NewCache(-1, 0, true, nil), + } + + exp, err := NewRPCPool("*first", defaultCfg.TLSCfg().ClientKey, defaultCfg.TLSCfg().ClientCerificate, + defaultCfg.TLSCfg().CaCertificate, defaultCfg.GeneralCfg().ConnectAttempts, + defaultCfg.GeneralCfg().Reconnects, defaultCfg.GeneralCfg().ConnectTimeout, + defaultCfg.GeneralCfg().ReplyTimeout, defaultCfg.RPCConns()[connID].Conns, cc, + false, birpc, connID, cM.connCache) + if err != nil { + t.Fatal(err) + } + experr := rpcclient.ErrUnsupporteServiceMethod + rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], birpc, cc, true) + + if err == nil || err != experr { + t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) + } + + if !reflect.DeepEqual(rcv, exp) { + t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv) + } +} + +func TestCMgetConnWithConfigInternalRPCCodec(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + defaultCfg.RPCConns()[connID].Strategy = rpcclient.PoolParallel + defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ + { + Address: rpcclient.InternalRPC, + }, + } + + cc := make(chan rpcclient.ClientConnector, 1) + + cM := &ConnManager{ + cfg: defaultCfg, + rpcInternal: map[string]chan rpcclient.ClientConnector{ + connID: cc, + }, + connCache: ltcache.NewCache(-1, 0, true, nil), + } + + rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true) + + if err != nil { + t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) + } + + if _, cancast := rcv.(*rpcclient.RPCParallelClientPool); !cancast { + t.Error("Expected value of type rpcclient.RPCParallelClientPool") + } +} + +func TestCMgetConnWithConfigInternalBiRPCCodecUnsupported(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + defaultCfg.RPCConns()[connID].Strategy = rpcclient.PoolParallel + defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ + { + Address: rpcclient.BiRPCInternal, + }, + } + + cc := make(chan rpcclient.ClientConnector, 1) + + cM := &ConnManager{ + cfg: defaultCfg, + rpcInternal: map[string]chan rpcclient.ClientConnector{ + connID: cc, + }, + connCache: ltcache.NewCache(-1, 0, true, nil), + } + + experr := rpcclient.ErrUnsupportedCodec + rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true) + + if err == nil || err != experr { + t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err) + } + + if _, cancast := rcv.(*rpcclient.RPCParallelClientPool); !cancast { + t.Error("Expected value of type rpcclient.RPCParallelClientPool") + } +} + +func TestCMCallErrgetConn(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + + cM := &ConnManager{ + cfg: defaultCfg, + } + + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, defaultCfg.CacheCfg(), cM) + Cache = NewCacheS(defaultCfg, dm, nil) + Cache.SetWithoutReplicate(utils.CacheRPCConnections, connID, nil, nil, true, utils.NonTransactional) + + experr := utils.ErrNotFound + err := cM.Call([]string{connID}, nil, "", "", "") + + if err == nil || err != experr { + t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err) + } +} + +func TestCMCallWithConnIDsNoSubsHostIDs(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + + cM := &ConnManager{ + cfg: defaultCfg, + } + subsHostIDs := utils.StringSet{} + + err := cM.CallWithConnIDs([]string{connID}, subsHostIDs, "", "", "") + + if err != nil { + t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) + } +} + +func TestCMCallWithConnIDsNoConnIDs(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + + cM := &ConnManager{ + cfg: defaultCfg, + } + subsHostIDs := utils.StringSet{ + "key": struct{}{}, + } + + experr := fmt.Sprintf("MANDATORY_IE_MISSING: [%s]", "connIDs") + err := cM.CallWithConnIDs([]string{}, subsHostIDs, "", "", "") + + if err == nil || err.Error() != experr { + t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err) + } +} + +func TestCMCallWithConnIDsNoConns(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ + { + ID: connID, + Address: rpcclient.InternalRPC, + }, + } + + cM := &ConnManager{ + cfg: defaultCfg, + } + subsHostIDs := utils.StringSet{ + "random": struct{}{}, + } + + err := cM.CallWithConnIDs([]string{connID}, subsHostIDs, "", "", "") + + if err != nil { + t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) + } +} + +func TestCMCallWithConnIDsInternallyDCed(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + connID := "connID" + defaultCfg := config.NewDefaultCGRConfig() + defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ + { + ID: connID, + Address: rpcclient.InternalRPC, + }, + } + + cM := &ConnManager{ + cfg: defaultCfg, + connCache: ltcache.NewCache(-1, 0, true, nil), + } + subsHostIDs := utils.StringSet{ + connID: struct{}{}, + } + + experr := rpcclient.ErrInternallyDisconnected + err := cM.CallWithConnIDs([]string{connID}, subsHostIDs, "", "", "") + + if err == nil || err != experr { + t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) + } +} + +// func TestCMCallWithConnIDs2(t *testing.T) { // tmp := Cache // defer func() { // Cache = tmp @@ -288,8 +618,8 @@ func TestCMgetConnWithConfigUnsupportedCodec(t *testing.T) { // defaultCfg.RPCConns()[connID].Conns = []*config.RemoteHost{ // { // ID: connID, -// Address: rpcclient.InternalRPC, -// Transport: rpcclient.BiRPCJSON, +// Address: "127.0.0.1:2012", +// Transport: rpcclient.JSONrpc, // }, // } @@ -302,16 +632,48 @@ func TestCMgetConnWithConfigUnsupportedCodec(t *testing.T) { // }, // connCache: ltcache.NewCache(-1, 0, true, nil), // } +// subsHostIDs := utils.StringSet{ +// connID: struct{}{}, +// } -// experr := rpcclient.ErrUnsupportedCodec -// var exp *rpcclient.RPCParallelClientPool -// rcv, err := cM.getConnWithConfig(connID, defaultCfg.RPCConns()[connID], nil, cc, true) +// _, err := net.Listen("tcp", ":2012") +// if err != nil { +// t.Error(err) +// } + +// args := new(utils.CGREvent) +// var reply string + +// experr := rpcclient.ErrInternallyDisconnected +// err = cM.CallWithConnIDs([]string{connID}, subsHostIDs, utils.CacheSv1Ping, args, &reply) // if err == nil || err != experr { -// t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err) -// } - -// if !reflect.DeepEqual(rcv, exp) { -// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rcv) +// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) // } // } + +// func TestCMReload(t *testing.T) { +// tmp := Cache +// defer func() { +// Cache = tmp +// }() + +// connID := "connID" +// defaultCfg := config.NewDefaultCGRConfig() +// defaultCfg.RPCConns()[connID] = config.NewDfltRPCConn() + +// cM := &ConnManager{ +// cfg: defaultCfg, +// connCache: ltcache.NewCache(-1, 0, true, nil), +// } +// cM.connCache.Set(connID, nil, nil) + +// db := NewInternalDB(nil, nil, true) +// dm := NewDataManager(db, defaultCfg.CacheCfg(), cM) +// Cache = NewCacheS(defaultCfg, dm, nil) +// Cache.SetWithoutReplicate(utils.CacheRPCConnections, connID, nil, nil, true, utils.NonTransactional) +// Cache.SetWithoutReplicate(utils.CacheReplicationHosts, connID, nil, nil, true, utils.NonTransactional) + +// cM.Reload() + +// }