diff --git a/apis/sessions.go b/apis/sessions.go index 13ca848a9..50a6a5e6d 100644 --- a/apis/sessions.go +++ b/apis/sessions.go @@ -20,6 +20,7 @@ package apis import ( "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" ) @@ -95,70 +96,60 @@ func (ssv1 *SessionSv1) GetActiveSessionsCount(ctx *context.Context, args *utils return ssv1.sS.BiRPCv1GetActiveSessionsCount(ctx, args, rply) } -/* - -func (ssv1 *SessionSv1) ForceDisconnect(ctx *context.Context,args *utils.SessionFilter, +func (ssv1 *SessionSv1) ForceDisconnect(ctx *context.Context, args *utils.SessionFilter, rply *string) error { return ssv1.sS.BiRPCv1ForceDisconnect(ctx, args, rply) } -func (ssv1 *SessionSv1) GetPassiveSessions(ctx *context.Context,args *utils.SessionFilter, +func (ssv1 *SessionSv1) GetPassiveSessions(ctx *context.Context, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error { return ssv1.sS.BiRPCv1GetPassiveSessions(ctx, args, rply) } -func (ssv1 *SessionSv1) GetPassiveSessionsCount(ctx *context.Context,args *utils.SessionFilter, +func (ssv1 *SessionSv1) GetPassiveSessionsCount(ctx *context.Context, args *utils.SessionFilter, rply *int) error { return ssv1.sS.BiRPCv1GetPassiveSessionsCount(ctx, args, rply) } -func (ssv1 *SessionSv1) Ping(ctx *context.Context,ign *utils.CGREvent, reply *string) error { - *reply = utils.Pong - return nil -} - -func (ssv1 *SessionSv1) ReplicateSessions(ctx *context.Context,args *dispatchers.ArgsReplicateSessionsWithAPIOpts, rply *string) error { +func (ssv1 *SessionSv1) ReplicateSessions(ctx *context.Context, args *dispatchers.ArgsReplicateSessionsWithAPIOpts, rply *string) error { return ssv1.sS.BiRPCv1ReplicateSessions(ctx, args.ArgsReplicateSessions, rply) } -func (ssv1 *SessionSv1) SetPassiveSession(ctx *context.Context,args *sessions.Session, +func (ssv1 *SessionSv1) SetPassiveSession(ctx *context.Context, args *sessions.Session, reply *string) error { return ssv1.sS.BiRPCv1SetPassiveSession(ctx, args, reply) } // ActivateSessions is called to activate a list/all sessions -func (ssv1 *SessionSv1) ActivateSessions(ctx *context.Context,args *utils.SessionIDsWithArgsDispatcher, reply *string) error { +func (ssv1 *SessionSv1) ActivateSessions(ctx *context.Context, args *utils.SessionIDsWithAPIOpts, reply *string) error { return ssv1.sS.BiRPCv1ActivateSessions(ctx, args, reply) } // DeactivateSessions is called to deactivate a list/all active sessios -func (ssv1 *SessionSv1) DeactivateSessions(ctx *context.Context,args *utils.SessionIDsWithArgsDispatcher, reply *string) error { +func (ssv1 *SessionSv1) DeactivateSessions(ctx *context.Context, args *utils.SessionIDsWithAPIOpts, reply *string) error { return ssv1.sS.BiRPCv1DeactivateSessions(ctx, args, reply) } -// Call implements rpcclient.ClientConnector interface for internal RPC -func (ssv1 *SessionSv1) Call(ctx *context.Context,serviceMethod string, - args interface{}, reply interface{}) error { - return utils.APIerRPCCall(ssv1, serviceMethod, args, reply) -} - // ReAuthorize sends the RAR for filterd sessions -func (ssv1 *SessionSv1) ReAuthorize(ctx *context.Context,args *utils.SessionFilter, reply *string) error { +func (ssv1 *SessionSv1) ReAuthorize(ctx *context.Context, args *utils.SessionFilter, reply *string) error { return ssv1.sS.BiRPCv1ReAuthorize(ctx, args, reply) } // DisconnectPeer sends the DPR for the OriginHost and OriginRealm -func (ssv1 *SessionSv1) DisconnectPeer(ctx *context.Context,args *utils.DPRArgs, reply *string) error { +func (ssv1 *SessionSv1) DisconnectPeer(ctx *context.Context, args *utils.DPRArgs, reply *string) error { return ssv1.sS.BiRPCv1DisconnectPeer(ctx, args, reply) } // STIRAuthenticate checks the identity using STIR/SHAKEN -func (ssv1 *SessionSv1) STIRAuthenticate(ctx *context.Context,args *sessions.V1STIRAuthenticateArgs, reply *string) error { +func (ssv1 *SessionSv1) STIRAuthenticate(ctx *context.Context, args *sessions.V1STIRAuthenticateArgs, reply *string) error { return ssv1.sS.BiRPCv1STIRAuthenticate(ctx, args, reply) } // STIRIdentity creates the identity for STIR/SHAKEN -func (ssv1 *SessionSv1) STIRIdentity(ctx *context.Context,args *sessions.V1STIRIdentityArgs, reply *string) error { +func (ssv1 *SessionSv1) STIRIdentity(ctx *context.Context, args *sessions.V1STIRIdentityArgs, reply *string) error { return ssv1.sS.BiRPCv1STIRIdentity(ctx, args, reply) } -*/ + +func (ssv1 *SessionSv1) RegisterInternalBiJSONConn(ctx *context.Context, args string, rply *string) (err error) { + return ssv1.sS.BiRPCv1RegisterInternalBiJSONConn(ctx, args, rply) +} diff --git a/sessions/sessions.go b/sessions/sessions.go index a21f8103b..739058732 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1507,7 +1507,7 @@ type ArgsReplicateSessions struct { // args.Filter is used to filter the sessions which are replicated, CGRID is the only one possible for now func (sS *SessionS) BiRPCv1ReplicateSessions(ctx *context.Context, args ArgsReplicateSessions, reply *string) (err error) { - sS.replicateSessions(context.TODO(), args.CGRID, args.Passive, args.ConnIDs) + sS.replicateSessions(ctx, args.CGRID, args.Passive, args.ConnIDs) *reply = utils.OK return } @@ -3256,7 +3256,7 @@ func (sS *SessionS) BiRPCv1SyncSessions(ctx *context.Context, } // BiRPCv1ForceDisconnect will force disconnecting sessions matching sessions -func (sS *SessionS) BiRPCv1ForceDisconnect(clnt birpc.ClientConnector, +func (sS *SessionS) BiRPCv1ForceDisconnect(ctx *context.Context, args *utils.SessionFilter, reply *string) (err error) { if args == nil { //protection in case on nil args = &utils.SessionFilter{} @@ -3264,7 +3264,7 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt birpc.ClientConnector, if len(args.Filters) != 0 && sS.dm == nil { return utils.ErrNoDatabaseConn } - aSs := sS.filterSessions(context.TODO(), args, false) + aSs := sS.filterSessions(ctx, args, false) if len(aSs) == 0 { return utils.ErrNotFound } @@ -3274,7 +3274,7 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt birpc.ClientConnector, continue } ss[0].Lock() - if errTerm := sS.forceSTerminate(context.TODO(), ss[0], 0, nil, nil); errTerm != nil { + if errTerm := sS.forceSTerminate(ctx, ss[0], 0, nil, nil); errTerm != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> failed force-terminating session with id: <%s>, err: <%s>", @@ -3292,16 +3292,16 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt birpc.ClientConnector, } // BiRPCv1RegisterInternalBiJSONConn will register the client for a bidirectional comunication -func (sS *SessionS) BiRPCv1RegisterInternalBiJSONConn(clnt birpc.ClientConnector, +func (sS *SessionS) BiRPCv1RegisterInternalBiJSONConn(ctx *context.Context, connID string, reply *string) error { - sS.RegisterIntBiJConn(clnt, connID) + sS.RegisterIntBiJConn(ctx.Client, connID) *reply = utils.OK return nil } // BiRPCv1ActivateSessions is called to activate a list/all sessions // returns utils.ErrPartiallyExecuted in case of errors -func (sS *SessionS) BiRPCv1ActivateSessions(clnt birpc.ClientConnector, +func (sS *SessionS) BiRPCv1ActivateSessions(ctx *context.Context, sIDs *utils.SessionIDsWithAPIOpts, reply *string) (err error) { if len(sIDs.IDs) == 0 { sS.pSsMux.RLock() @@ -3327,7 +3327,7 @@ func (sS *SessionS) BiRPCv1ActivateSessions(clnt birpc.ClientConnector, // BiRPCv1DeactivateSessions is called to deactivate a list/all active sessios // returns utils.ErrPartiallyExecuted in case of errors -func (sS *SessionS) BiRPCv1DeactivateSessions(clnt birpc.ClientConnector, +func (sS *SessionS) BiRPCv1DeactivateSessions(ctx *context.Context, sIDs *utils.SessionIDsWithAPIOpts, reply *string) (err error) { if len(sIDs.IDs) == 0 { sS.aSsMux.RLock() @@ -3496,6 +3496,7 @@ func (sS *SessionS) processAttributes(ctx *context.Context, cgrEv *utils.CGREven return } +/* // BiRPCV1GetMaxUsage returns the maximum usage as seconds, compatible with OpenSIPS 2.3 // DEPRECATED, it will be removed in future versions func (sS *SessionS) BiRPCV1GetMaxUsage(clnt birpc.ClientConnector, @@ -3605,8 +3606,8 @@ func (sS *SessionS) BiRPCV1ProcessCDR(clnt birpc.ClientConnector, Event: ev}, rply) } - -func (sS *SessionS) sendRar(s *Session) (err error) { +*/ +func (sS *SessionS) sendRar(ctx *context.Context, s *Session) (err error) { clnt := sS.biJClnt(s.ClientConnID) if clnt == nil { return fmt.Errorf("calling %s requires bidirectional JSON connection, connID: <%s>", @@ -3617,19 +3618,19 @@ func (sS *SessionS) sendRar(s *Session) (err error) { return } var rply string - if err = clnt.conn.Call(context.TODO(), utils.SessionSv1ReAuthorize, originID, &rply); err == utils.ErrNotImplemented { + if err = clnt.conn.Call(ctx, utils.SessionSv1ReAuthorize, originID, &rply); err == utils.ErrNotImplemented { err = nil } return } // BiRPCv1ReAuthorize sends a RAR for the matching sessions -func (sS *SessionS) BiRPCv1ReAuthorize(clnt birpc.ClientConnector, +func (sS *SessionS) BiRPCv1ReAuthorize(ctx *context.Context, args *utils.SessionFilter, reply *string) (err error) { if args == nil { //protection in case on nil args = &utils.SessionFilter{} } - aSs := sS.filterSessions(context.TODO(), args, false) + aSs := sS.filterSessions(ctx, args, false) if len(aSs) == 0 { return utils.ErrNotFound } @@ -3643,7 +3644,7 @@ func (sS *SessionS) BiRPCv1ReAuthorize(clnt birpc.ClientConnector, if len(ss) == 0 { continue } - if errTerm := sS.sendRar(ss[0]); errTerm != nil { + if errTerm := sS.sendRar(ctx, ss[0]); errTerm != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> failed sending RAR for session with id: <%s>, err: <%s>", @@ -3659,7 +3660,7 @@ func (sS *SessionS) BiRPCv1ReAuthorize(clnt birpc.ClientConnector, } // BiRPCv1DisconnectPeer sends a DPR for the given OriginHost and OriginRealm -func (sS *SessionS) BiRPCv1DisconnectPeer(clnt birpc.ClientConnector, +func (sS *SessionS) BiRPCv1DisconnectPeer(ctx *context.Context, args *utils.DPRArgs, reply *string) (err error) { hasErrors := false clients := make(map[string]*biJClient) @@ -3669,7 +3670,7 @@ func (sS *SessionS) BiRPCv1DisconnectPeer(clnt birpc.ClientConnector, } sS.biJMux.RUnlock() for ID, clnt := range clients { - if err = clnt.conn.Call(context.TODO(), utils.SessionSv1DisconnectPeer, args, reply); err != nil && err != utils.ErrNotImplemented { + if err = clnt.conn.Call(ctx, utils.SessionSv1DisconnectPeer, args, reply); err != nil && err != utils.ErrNotImplemented { utils.Logger.Warning( fmt.Sprintf( "<%s> failed sending DPR for connection with id: <%s>, err: <%s>", @@ -3685,7 +3686,7 @@ func (sS *SessionS) BiRPCv1DisconnectPeer(clnt birpc.ClientConnector, } // BiRPCv1STIRAuthenticate the API for STIR checking -func (sS *SessionS) BiRPCv1STIRAuthenticate(clnt birpc.ClientConnector, +func (sS *SessionS) BiRPCv1STIRAuthenticate(ctx *context.Context, args *V1STIRAuthenticateArgs, reply *string) (err error) { attest := sS.cgrCfg.SessionSCfg().STIRCfg.AllowedAttest if len(args.Attest) != 0 { @@ -3697,7 +3698,7 @@ func (sS *SessionS) BiRPCv1STIRAuthenticate(clnt birpc.ClientConnector, return } } - if err = AuthStirShaken(context.TODO(), args.Identity, args.OriginatorTn, args.OriginatorURI, + if err = AuthStirShaken(ctx, args.Identity, args.OriginatorTn, args.OriginatorURI, args.DestinationTn, args.DestinationURI, attest, stirMaxDur); err != nil { return utils.NewSTIRError(err.Error()) } @@ -3706,7 +3707,7 @@ func (sS *SessionS) BiRPCv1STIRAuthenticate(clnt birpc.ClientConnector, } // BiRPCv1STIRIdentity the API for STIR header creation -func (sS *SessionS) BiRPCv1STIRIdentity(clnt birpc.ClientConnector, +func (sS *SessionS) BiRPCv1STIRIdentity(ctx *context.Context, args *V1STIRIdentityArgs, identity *string) (err error) { if args.Payload.ATTest == utils.EmptyString { args.Payload.ATTest = sS.cgrCfg.SessionSCfg().STIRCfg.DefaultAttest @@ -3715,7 +3716,7 @@ func (sS *SessionS) BiRPCv1STIRIdentity(clnt birpc.ClientConnector, args.Payload.IAT = time.Now().Unix() } if *identity, err = NewSTIRIdentity( - context.TODO(), + ctx, utils.NewPASSporTHeader(utils.FirstNonEmpty(args.PublicKeyPath, sS.cgrCfg.SessionSCfg().STIRCfg.PublicKeyPath)), args.Payload, utils.FirstNonEmpty(args.PrivateKeyPath, diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index 0765687ad..ef653b934 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -122,7 +122,7 @@ func TestBiRPCv1RegisterInternalBiJSONConn(t *testing.T) { client := &birpc.Service{} var reply string - if err := sessions.BiRPCv1RegisterInternalBiJSONConn(client, utils.EmptyString, &reply); err != nil { + if err := sessions.BiRPCv1RegisterInternalBiJSONConn(context.WithClient(context.Background(), client), utils.EmptyString, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Errorf("Expected %+v, received %+v", reply, utils.OK) diff --git a/sessions/sessionscover_test.go b/sessions/sessionscover_test.go index 49a5d2b64..1494ff6c1 100644 --- a/sessions/sessionscover_test.go +++ b/sessions/sessionscover_test.go @@ -4535,14 +4535,14 @@ func (sT *mkCallForces) Call(ctx *context.Context, method string, arg interface{ func TestBiRPCv1ForceDisconnect(t *testing.T) { engine.Cache.Clear(nil) - client := new(mkCall) + ctx := context.WithClient(context.Background(), new(mkCall)) cfg := config.NewDefaultCGRConfig() data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) sessions := NewSessionS(cfg, dm, nil) var reply string - if err := sessions.BiRPCv1ForceDisconnect(client, nil, &reply); err == nil || err != utils.ErrNotFound { + if err := sessions.BiRPCv1ForceDisconnect(ctx, nil, &reply); err == nil || err != utils.ErrNotFound { t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err) } args := &utils.SessionFilter{ @@ -4552,7 +4552,7 @@ func TestBiRPCv1ForceDisconnect(t *testing.T) { } sessions.dm = nil - if err := sessions.BiRPCv1ForceDisconnect(client, args, &reply); err == nil || err != utils.ErrNoDatabaseConn { + if err := sessions.BiRPCv1ForceDisconnect(ctx, args, &reply); err == nil || err != utils.ErrNoDatabaseConn { t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err) } sessions.dm = dm @@ -4594,7 +4594,7 @@ func TestBiRPCv1ForceDisconnect(t *testing.T) { } time.Sleep(50 * time.Millisecond) - if err := sessions.BiRPCv1ForceDisconnect(client, args, &reply); err != nil { + if err := sessions.BiRPCv1ForceDisconnect(ctx, args, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Errorf("Unexpected reply returned") @@ -4613,7 +4613,7 @@ func TestBiRPCv1ForceDisconnect(t *testing.T) { conn: testMk, }, } - if err := sessions.BiRPCv1ForceDisconnect(client, args, &reply); err != nil { + if err := sessions.BiRPCv1ForceDisconnect(ctx, args, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Errorf("Unexpected reply returned")