diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go index 1edeb3193..960a316ee 100644 --- a/apier/v1/concreqs_it_test.go +++ b/apier/v1/concreqs_it_test.go @@ -110,7 +110,7 @@ func testConcReqsStartEngine(t *testing.T) { } } -func handlePing(clnt *rpc2.Client, arg *DurationArgs, reply *string) error { +func handlePing(clnt *rpc2.Client, arg *utils.DurationArgs, reply *string) error { time.Sleep(arg.DurationTime) *reply = utils.OK return nil @@ -141,7 +141,7 @@ func testConcReqsBusyAPIs(t *testing.T) { go func() { var resp string if err := concReqsRPC.Call(utils.CoreSv1Sleep, - &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &utils.DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { lock.Lock() failedAPIs++ @@ -168,7 +168,7 @@ func testConcReqsQueueAPIs(t *testing.T) { go func() { var resp string if err := concReqsRPC.Call(utils.CoreSv1Sleep, - &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &utils.DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { wg.Done() t.Error(err) @@ -251,7 +251,7 @@ func testConcReqsOnBiJSONBusy(t *testing.T) { go func() { var resp string if err := concReqsBiRPC.Call(utils.SessionSv1Sleep, - &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &utils.DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { lock.Lock() failedAPIs++ @@ -278,7 +278,7 @@ func testConcReqsOnBiJSONQueue(t *testing.T) { go func() { var resp string if err := concReqsBiRPC.Call(utils.SessionSv1Sleep, - &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &utils.DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { wg.Done() t.Error(err) diff --git a/apier/v1/core.go b/apier/v1/core.go index 92e3f20d0..e049bf8c0 100644 --- a/apier/v1/core.go +++ b/apier/v1/core.go @@ -51,12 +51,8 @@ func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) err return nil } -type DurationArgs struct { - DurationTime time.Duration -} - // Sleep is used to test the concurrent requests mechanism -func (cS *CoreSv1) Sleep(arg *DurationArgs, reply *string) error { +func (cS *CoreSv1) Sleep(arg *utils.DurationArgs, reply *string) error { time.Sleep(arg.DurationTime) *reply = utils.OK return nil diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index e66d1f728..03dc43663 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -19,9 +19,8 @@ along with this program. If not, see package v1 import ( - "time" - "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" ) @@ -247,13 +246,11 @@ func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt birpc.ClientConnector, return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply) } -func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt birpc.ClientConnector, arg *DurationArgs, +func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt birpc.ClientConnector, args *utils.DurationArgs, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - time.Sleep(arg.DurationTime) - *reply = utils.OK - return nil + return ssv1.Ss.BiRPCv1Sleep(context.TODO(), args, reply) } diff --git a/engine/filters_test.go b/engine/filters_test.go index 8aa9e4c81..0a7013c20 100644 --- a/engine/filters_test.go +++ b/engine/filters_test.go @@ -24,9 +24,10 @@ import ( "testing" "time" + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) func TestFilterPassString(t *testing.T) { @@ -2110,8 +2111,8 @@ func TestFilterSPass11(t *testing.T) { dm.SetResource(rsr) dm.SetAccount(acc) dm.SetStatQueue(sq) - clientConn := make(chan rpcclient.ClientConnector, 1) - clientConn <- clMock(func(serviceMethod string, args, reply interface{}) error { + clientConn := make(chan birpc.ClientConnector, 1) + clientConn <- clMock(func(ctx *context.Context, serviceMethod string, args, reply interface{}) error { if serviceMethod == utils.ResourceSv1GetResource { tntId, concat := args.(*utils.TenantID) if !concat { @@ -2166,7 +2167,7 @@ func TestFilterSPass11(t *testing.T) { }, }, } - connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS): clientConn, }) diff --git a/engine/storage_test.go b/engine/storage_test.go index 80f11d106..2001f4c79 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -23,9 +23,10 @@ import ( "testing" "time" + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) func TestMsgpackStructsAdded(t *testing.T) { @@ -919,8 +920,8 @@ func TestTPRLoadAccountActionsFiltered(t *testing.T) { Disabled: true, } db.SetTPAccountActions([]*utils.TPAccountActions{qriedAA}) - clientConn := make(chan rpcclient.ClientConnector, 1) - clientConn <- clMock(func(serviceMethod string, _, reply interface{}) error { + clientConn := make(chan birpc.ClientConnector, 1) + clientConn <- clMock(func(ctx *context.Context, serviceMethod string, _, reply interface{}) error { if serviceMethod == utils.CacheSv1ReloadCache { *reply.(*string) = utils.OK return nil @@ -928,7 +929,7 @@ func TestTPRLoadAccountActionsFiltered(t *testing.T) { return utils.ErrNotImplemented }, ) - connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) tpr, err := NewTpReader(db, db, "TP1", "UTC", []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, nil) @@ -1007,14 +1008,14 @@ func TestTprRealoadSched(t *testing.T) { if err := tpr.LoadActionPlans(); err != nil { t.Error(err) } - clientConn := make(chan rpcclient.ClientConnector, 1) - clientConn <- clMock(func(serviceMethod string, _, _ interface{}) error { + clientConn := make(chan birpc.ClientConnector, 1) + clientConn <- clMock(func(ctx *context.Context, serviceMethod string, _, _ interface{}) error { if serviceMethod == utils.SchedulerSv1Reload { return nil } return utils.ErrNotImplemented }) - connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaScheduler): clientConn, }) SetConnManager(connMgr) @@ -1030,8 +1031,8 @@ func TestTprReloadCache(t *testing.T) { defer func() { SetConnManager(tmpConn) }() - clientConn := make(chan rpcclient.ClientConnector, 1) - clientConn <- clMock(func(serviceMethod string, args, _ interface{}) error { + clientConn := make(chan birpc.ClientConnector, 1) + clientConn <- clMock(func(ctx *context.Context, serviceMethod string, args, _ interface{}) error { if serviceMethod == utils.CacheSv1LoadCache { return nil } else if serviceMethod == utils.CacheSv1Clear { @@ -1039,7 +1040,7 @@ func TestTprReloadCache(t *testing.T) { } return utils.ErrNotImplemented }) - connMgr2 := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + connMgr2 := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) diff --git a/sessions/sessions.go b/sessions/sessions.go index faac4059d..359d98ae4 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -3511,6 +3511,14 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(clnt birpc.ClientConnector, return } +// BiRPCv1Sleep mimics a request whose process takes the given amount of time to process +func (sS *SessionS) BiRPCv1Sleep(ctx *context.Context, args *utils.DurationArgs, + reply *string) (err error) { + time.Sleep(args.DurationTime) + *reply = utils.OK + return nil +} + // processThreshold will receive the event and send it to ThresholdS to be processed func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, argDisp *utils.ArgDispatcher, thIDs []string) (tIDs []string, err error) { if len(sS.cfg.SessionSCfg().ThreshSConns) == 0 { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 3158ad94d..3a668993e 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1435,3 +1435,7 @@ type GetMaxSessionTimeOnAccountsArgs struct { AccountIDs []string *ArgDispatcher } + +type DurationArgs struct { + DurationTime time.Duration +}