Add the implementation for Sleep api in sessions/sessions.go

Before the implementation was done directly in the SessionSv1 method. The argument definition
was also moved from apier/v1 to utils.
This commit is contained in:
ionutboangiu
2023-03-31 11:39:51 -04:00
committed by Dan Christian Bogos
parent cb7ea790de
commit 259fb83c47
7 changed files with 37 additions and 30 deletions

View File

@@ -110,7 +110,7 @@ func testConcReqsStartEngine(t *testing.T) {
}
}
func handlePing(clnt *rpc2.Client, arg *DurationArgs, reply *string) error {
func handlePing(clnt *rpc2.Client, arg *utils.DurationArgs, reply *string) error {
time.Sleep(arg.DurationTime)
*reply = utils.OK
return nil
@@ -141,7 +141,7 @@ func testConcReqsBusyAPIs(t *testing.T) {
go func() {
var resp string
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&utils.DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
lock.Lock()
failedAPIs++
@@ -168,7 +168,7 @@ func testConcReqsQueueAPIs(t *testing.T) {
go func() {
var resp string
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&utils.DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
wg.Done()
t.Error(err)
@@ -251,7 +251,7 @@ func testConcReqsOnBiJSONBusy(t *testing.T) {
go func() {
var resp string
if err := concReqsBiRPC.Call(utils.SessionSv1Sleep,
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&utils.DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
lock.Lock()
failedAPIs++
@@ -278,7 +278,7 @@ func testConcReqsOnBiJSONQueue(t *testing.T) {
go func() {
var resp string
if err := concReqsBiRPC.Call(utils.SessionSv1Sleep,
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&utils.DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
wg.Done()
t.Error(err)

View File

@@ -51,12 +51,8 @@ func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) err
return nil
}
type DurationArgs struct {
DurationTime time.Duration
}
// Sleep is used to test the concurrent requests mechanism
func (cS *CoreSv1) Sleep(arg *DurationArgs, reply *string) error {
func (cS *CoreSv1) Sleep(arg *utils.DurationArgs, reply *string) error {
time.Sleep(arg.DurationTime)
*reply = utils.OK
return nil

View File

@@ -19,9 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -247,13 +246,11 @@ func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt birpc.ClientConnector,
return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply)
}
func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt birpc.ClientConnector, arg *DurationArgs,
func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt birpc.ClientConnector, args *utils.DurationArgs,
reply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
time.Sleep(arg.DurationTime)
*reply = utils.OK
return nil
return ssv1.Ss.BiRPCv1Sleep(context.TODO(), args, reply)
}

View File

@@ -24,9 +24,10 @@ import (
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func TestFilterPassString(t *testing.T) {
@@ -2110,8 +2111,8 @@ func TestFilterSPass11(t *testing.T) {
dm.SetResource(rsr)
dm.SetAccount(acc)
dm.SetStatQueue(sq)
clientConn := make(chan rpcclient.ClientConnector, 1)
clientConn <- clMock(func(serviceMethod string, args, reply interface{}) error {
clientConn := make(chan birpc.ClientConnector, 1)
clientConn <- clMock(func(ctx *context.Context, serviceMethod string, args, reply interface{}) error {
if serviceMethod == utils.ResourceSv1GetResource {
tntId, concat := args.(*utils.TenantID)
if !concat {
@@ -2166,7 +2167,7 @@ func TestFilterSPass11(t *testing.T) {
},
},
}
connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS): clientConn,
})

View File

@@ -23,9 +23,10 @@ import (
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func TestMsgpackStructsAdded(t *testing.T) {
@@ -919,8 +920,8 @@ func TestTPRLoadAccountActionsFiltered(t *testing.T) {
Disabled: true,
}
db.SetTPAccountActions([]*utils.TPAccountActions{qriedAA})
clientConn := make(chan rpcclient.ClientConnector, 1)
clientConn <- clMock(func(serviceMethod string, _, reply interface{}) error {
clientConn := make(chan birpc.ClientConnector, 1)
clientConn <- clMock(func(ctx *context.Context, serviceMethod string, _, reply interface{}) error {
if serviceMethod == utils.CacheSv1ReloadCache {
*reply.(*string) = utils.OK
return nil
@@ -928,7 +929,7 @@ func TestTPRLoadAccountActionsFiltered(t *testing.T) {
return utils.ErrNotImplemented
},
)
connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
tpr, err := NewTpReader(db, db, "TP1", "UTC", []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, nil)
@@ -1007,14 +1008,14 @@ func TestTprRealoadSched(t *testing.T) {
if err := tpr.LoadActionPlans(); err != nil {
t.Error(err)
}
clientConn := make(chan rpcclient.ClientConnector, 1)
clientConn <- clMock(func(serviceMethod string, _, _ interface{}) error {
clientConn := make(chan birpc.ClientConnector, 1)
clientConn <- clMock(func(ctx *context.Context, serviceMethod string, _, _ interface{}) error {
if serviceMethod == utils.SchedulerSv1Reload {
return nil
}
return utils.ErrNotImplemented
})
connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaScheduler): clientConn,
})
SetConnManager(connMgr)
@@ -1030,8 +1031,8 @@ func TestTprReloadCache(t *testing.T) {
defer func() {
SetConnManager(tmpConn)
}()
clientConn := make(chan rpcclient.ClientConnector, 1)
clientConn <- clMock(func(serviceMethod string, args, _ interface{}) error {
clientConn := make(chan birpc.ClientConnector, 1)
clientConn <- clMock(func(ctx *context.Context, serviceMethod string, args, _ interface{}) error {
if serviceMethod == utils.CacheSv1LoadCache {
return nil
} else if serviceMethod == utils.CacheSv1Clear {
@@ -1039,7 +1040,7 @@ func TestTprReloadCache(t *testing.T) {
}
return utils.ErrNotImplemented
})
connMgr2 := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{
connMgr2 := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})

View File

@@ -3511,6 +3511,14 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(clnt birpc.ClientConnector,
return
}
// BiRPCv1Sleep mimics a request whose process takes the given amount of time to process
func (sS *SessionS) BiRPCv1Sleep(ctx *context.Context, args *utils.DurationArgs,
reply *string) (err error) {
time.Sleep(args.DurationTime)
*reply = utils.OK
return nil
}
// processThreshold will receive the event and send it to ThresholdS to be processed
func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, argDisp *utils.ArgDispatcher, thIDs []string) (tIDs []string, err error) {
if len(sS.cfg.SessionSCfg().ThreshSConns) == 0 {

View File

@@ -1435,3 +1435,7 @@ type GetMaxSessionTimeOnAccountsArgs struct {
AccountIDs []string
*ArgDispatcher
}
type DurationArgs struct {
DurationTime time.Duration
}