Enable bijson support for SessionSv1 service

Add bidirectional support for sessions while maintaining changing the
current rpc service registration method. Modified methods in
sessionsbirpc.go file to satisfy the birpc.ClientConnector interface
and removed BiRPC prefix before creating the service that's to be
registered.
This commit is contained in:
ionutboangiu
2023-04-21 21:33:06 -04:00
committed by Dan Christian Bogos
parent 259fb83c47
commit 85cc7e03f0
4 changed files with 62 additions and 135 deletions

View File

@@ -19,189 +19,156 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
// Bidirectional JSON methods following
func (ssv1 *SessionSv1) Handlers() map[string]interface{} {
return map[string]interface{}{
utils.SessionSv1GetActiveSessions: ssv1.BiRPCv1GetActiveSessions,
utils.SessionSv1GetActiveSessionsCount: ssv1.BiRPCv1GetActiveSessionsCount,
utils.SessionSv1GetPassiveSessions: ssv1.BiRPCv1GetPassiveSessions,
utils.SessionSv1GetPassiveSessionsCount: ssv1.BiRPCv1GetPassiveSessionsCount,
utils.SessionSv1AuthorizeEvent: ssv1.BiRPCv1AuthorizeEvent,
utils.SessionSv1AuthorizeEventWithDigest: ssv1.BiRPCv1AuthorizeEventWithDigest,
utils.SessionSv1InitiateSession: ssv1.BiRPCv1InitiateSession,
utils.SessionSv1InitiateSessionWithDigest: ssv1.BiRPCv1InitiateSessionWithDigest,
utils.SessionSv1UpdateSession: ssv1.BiRPCv1UpdateSession,
utils.SessionSv1SyncSessions: ssv1.BiRPCv1SyncSessions,
utils.SessionSv1TerminateSession: ssv1.BiRPCv1TerminateSession,
utils.SessionSv1ProcessCDR: ssv1.BiRPCv1ProcessCDR,
utils.SessionSv1ProcessMessage: ssv1.BiRPCv1ProcessMessage,
utils.SessionSv1ProcessEvent: ssv1.BiRPCv1ProcessEvent,
utils.SessionSv1ForceDisconnect: ssv1.BiRPCv1ForceDisconnect,
utils.SessionSv1RegisterInternalBiJSONConn: ssv1.BiRPCv1RegisterInternalBiJSONConn,
utils.SessionSv1Ping: ssv1.BiRPCPing,
utils.SessionSv1ReplicateSessions: ssv1.BiRPCv1ReplicateSessions,
utils.SessionSv1SetPassiveSession: ssv1.BiRPCv1SetPassiveSession,
utils.SessionSv1ActivateSessions: ssv1.BiRPCv1ActivateSessions,
utils.SessionSv1DeactivateSessions: ssv1.BiRPCv1DeactivateSessions,
utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism
}
}
func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt birpc.ClientConnector, args *sessions.V1AuthorizeArgs,
func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(ctx *context.Context, args *sessions.V1AuthorizeArgs,
rply *sessions.V1AuthorizeReply) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1AuthorizeEvent(clnt, args, rply)
return ssv1.Ss.BiRPCv1AuthorizeEvent(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt birpc.ClientConnector, args *sessions.V1AuthorizeArgs,
func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(ctx *context.Context, args *sessions.V1AuthorizeArgs,
rply *sessions.V1AuthorizeReplyWithDigest) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply)
return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt birpc.ClientConnector, args *sessions.V1InitSessionArgs,
func (ssv1 *SessionSv1) BiRPCv1InitiateSession(ctx *context.Context, args *sessions.V1InitSessionArgs,
rply *sessions.V1InitSessionReply) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1InitiateSession(clnt, args, rply)
return ssv1.Ss.BiRPCv1InitiateSession(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt birpc.ClientConnector, args *sessions.V1InitSessionArgs,
func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(ctx *context.Context, args *sessions.V1InitSessionArgs,
rply *sessions.V1InitReplyWithDigest) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(clnt, args, rply)
return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt birpc.ClientConnector, args *sessions.V1UpdateSessionArgs,
func (ssv1 *SessionSv1) BiRPCv1UpdateSession(ctx *context.Context, args *sessions.V1UpdateSessionArgs,
rply *sessions.V1UpdateSessionReply) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1UpdateSession(clnt, args, rply)
return ssv1.Ss.BiRPCv1UpdateSession(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt birpc.ClientConnector, args *string,
func (ssv1 *SessionSv1) BiRPCv1SyncSessions(ctx *context.Context, args *string,
rply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1SyncSessions(clnt, "", rply)
return ssv1.Ss.BiRPCv1SyncSessions(ctx.Client, "", rply)
}
func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt birpc.ClientConnector, args *sessions.V1TerminateSessionArgs,
func (ssv1 *SessionSv1) BiRPCv1TerminateSession(ctx *context.Context, args *sessions.V1TerminateSessionArgs,
rply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply)
return ssv1.Ss.BiRPCv1TerminateSession(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt birpc.ClientConnector, cgrEv *utils.CGREventWithArgDispatcher,
func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(ctx *context.Context, cgrEv *utils.CGREventWithArgDispatcher,
rply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply)
return ssv1.Ss.BiRPCv1ProcessCDR(ctx.Client, cgrEv, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt birpc.ClientConnector, args *sessions.V1ProcessMessageArgs,
func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(ctx *context.Context, args *sessions.V1ProcessMessageArgs,
rply *sessions.V1ProcessMessageReply) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1ProcessMessage(clnt, args, rply)
return ssv1.Ss.BiRPCv1ProcessMessage(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt birpc.ClientConnector, args *sessions.V1ProcessEventArgs,
func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(ctx *context.Context, args *sessions.V1ProcessEventArgs,
rply *sessions.V1ProcessEventReply) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply)
return ssv1.Ss.BiRPCv1ProcessEvent(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt birpc.ClientConnector, args *utils.SessionFilter,
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(ctx *context.Context, args *utils.SessionFilter,
rply *[]*sessions.ExternalSession) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply)
return ssv1.Ss.BiRPCv1GetActiveSessions(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt birpc.ClientConnector, args *utils.SessionFilter,
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(ctx *context.Context, args *utils.SessionFilter,
rply *int) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1GetActiveSessionsCount(clnt, args, rply)
return ssv1.Ss.BiRPCv1GetActiveSessionsCount(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt birpc.ClientConnector, args *utils.SessionFilter,
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(ctx *context.Context, args *utils.SessionFilter,
rply *[]*sessions.ExternalSession) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1GetPassiveSessions(clnt, args, rply)
return ssv1.Ss.BiRPCv1GetPassiveSessions(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt birpc.ClientConnector, args *utils.SessionFilter,
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(ctx *context.Context, args *utils.SessionFilter,
rply *int) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(clnt, args, rply)
return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt birpc.ClientConnector, args *utils.SessionFilter,
func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(ctx *context.Context, args *utils.SessionFilter,
rply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1ForceDisconnect(clnt, args, rply)
return ssv1.Ss.BiRPCv1ForceDisconnect(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt birpc.ClientConnector, args string,
func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(ctx *context.Context, args string,
rply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply)
return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(ctx.Client, args, rply)
}
func (ssv1 *SessionSv1) BiRPCPing(clnt birpc.ClientConnector, ign *utils.CGREventWithArgDispatcher,
func (ssv1 *SessionSv1) BiRPCv1Ping(ctx *context.Context, ign *utils.CGREventWithArgDispatcher,
reply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
@@ -210,43 +177,43 @@ func (ssv1 *SessionSv1) BiRPCPing(clnt birpc.ClientConnector, ign *utils.CGREven
return ssv1.Ping(ign, reply)
}
func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt birpc.ClientConnector,
func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(ctx *context.Context,
args sessions.ArgsReplicateSessions, reply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply)
return ssv1.Ss.BiRPCv1ReplicateSessions(ctx.Client, args, reply)
}
func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt birpc.ClientConnector,
func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(ctx *context.Context,
args *sessions.Session, reply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1SetPassiveSession(clnt, args, reply)
return ssv1.Ss.BiRPCv1SetPassiveSession(ctx.Client, args, reply)
}
func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt birpc.ClientConnector,
func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(ctx *context.Context,
args []string, reply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1ActivateSessions(clnt, args, reply)
return ssv1.Ss.BiRPCv1ActivateSessions(ctx.Client, args, reply)
}
func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt birpc.ClientConnector,
func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(ctx *context.Context,
args []string, reply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return
}
defer utils.ConReqs.Deallocate()
return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply)
return ssv1.Ss.BiRPCv1DeactivateSessions(ctx.Client, args, reply)
}
func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt birpc.ClientConnector, args *utils.DurationArgs,
func (ssv1 *SessionSv1) BiRPCV1Sleep(ctx *context.Context, args *utils.DurationArgs,
reply *string) (err error) {
if err = utils.ConReqs.Allocate(); err != nil {
return

View File

@@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -2431,7 +2432,7 @@ func (trpcp *TestRPCParameters) Hopa(in Attr, out *float64) error {
return nil
}
func (trpcp *TestRPCParameters) Call(serviceMethod string, args interface{}, reply interface{}) error {
func (trpcp *TestRPCParameters) Call(ctx *context.Context, serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented

View File

@@ -19,9 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"errors"
"fmt"
"reflect"
"strings"
"time"
"unicode"
@@ -114,60 +112,20 @@ func (s *RPCClientSet) Call(ctx *context.Context, method string, args interface{
return conn.Call(context.TODO(), method, args, reply)
}
func NewServiceWithName(val interface{}, name string, useName bool) (_ IntService, err error) {
var srv *birpc.Service
if srv, err = birpc.NewService(val, name, useName); err != nil {
func NewBiRPCService(val interface{}) (srv *birpc.Service, err error) {
var initialSrv *birpc.Service
if initialSrv, err = birpc.NewService(val, "", false); err != nil {
return
}
srv.Methods["Ping"] = pingM
s := IntService{srv.Name: srv}
for m, v := range srv.Methods {
m = strings.TrimPrefix(m, "BiRPC")
if len(m) < 2 || unicode.ToLower(rune(m[0])) != 'v' {
srv = new(birpc.Service)
*srv = *initialSrv
srv.Methods = make(map[string]*birpc.MethodType)
for mName, mType := range initialSrv.Methods {
mName = strings.TrimPrefix(mName, "BiRPC")
if len(mName) < 2 || unicode.ToLower(rune(mName[0])) != 'v' {
continue
}
key := srv.Name
if unicode.IsLower(rune(key[len(key)-1])) {
key += "V"
} else {
key += "v"
}
key += string(m[1])
srv2, has := s[key]
if !has {
srv2 = new(birpc.Service)
*srv2 = *srv
srv2.Name = key
srv2.Methods = map[string]*birpc.MethodType{"Ping": pingM}
s[key] = srv2
}
srv2.Methods[m[2:]] = v
srv.Methods[mName[2:]] = mType
}
return s, nil
}
type IntService map[string]*birpc.Service
func (s IntService) Call(ctx *context.Context, serviceMethod string, args, reply interface{}) error {
service, has := s[strings.Split(serviceMethod, utils.NestingSep)[0]]
if !has {
return errors.New("rpc: can't find service " + serviceMethod)
}
return service.Call(ctx, serviceMethod, args, reply)
}
func ping(_ interface{}, _ *context.Context, _ *utils.CGREvent, reply *string) error {
*reply = utils.Pong
return nil
}
var pingM = &birpc.MethodType{
Method: reflect.Method{
Name: "Ping",
Type: reflect.TypeOf(ping),
Func: reflect.ValueOf(ping),
},
ArgType: reflect.TypeOf(new(utils.CGREvent)),
ReplyType: reflect.TypeOf(new(string)),
return
}

View File

@@ -86,15 +86,14 @@ func (smg *SessionService) Start() (err error) {
}
}(smg.sm)
// Register RPC handler
srv, _ := engine.NewServiceWithName(smg.sm, utils.SessionS, true) // methods with multiple options
// Pass internal connection via BiRPCClient
smg.connChan <- smg.sm
// Register RPC handler
smg.rpc = v1.NewSMGenericV1(smg.sm)
smg.rpcv1 = v1.NewSessionSv1(smg.sm) // methods with multiple options
// Register RPC handler
if !smg.cfg.DispatcherSCfg().Enabled {
smg.server.RpcRegister(smg.rpc)
smg.server.RpcRegister(smg.rpcv1)
@@ -102,9 +101,11 @@ func (smg *SessionService) Start() (err error) {
// Register BiRpc handlers
if smg.cfg.SessionSCfg().ListenBijson != "" {
smg.bircpEnabled = true
for n, s := range srv {
smg.server.BiRPCRegisterName(n, s)
var srv *birpc.Service
if srv, err = engine.NewBiRPCService(smg.rpcv1); err != nil {
return
}
smg.server.BiRPCRegisterName(srv.Name, srv)
// run this in it's own goroutine
go func() {
if err := smg.server.ServeBiJSON(smg.cfg.SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {