From 85cc7e03f00e499d4191020569a99ff25e1dbd71 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 21 Apr 2023 21:33:06 -0400 Subject: [PATCH] Enable bijson support for SessionSv1 service Add bidirectional support for sessions while maintaining changing the current rpc service registration method. Modified methods in sessionsbirpc.go file to satisfy the birpc.ClientConnector interface and removed BiRPC prefix before creating the service that's to be registered. --- apier/v1/sessionsbirpc.go | 117 ++++++++++++++------------------------ engine/actions_test.go | 3 +- engine/libengine.go | 64 ++++----------------- services/sessions.go | 13 +++-- 4 files changed, 62 insertions(+), 135 deletions(-) diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index 03dc43663..8cee97745 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -19,189 +19,156 @@ along with this program. If not, see package v1 import ( - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" ) -// Bidirectional JSON methods following -func (ssv1 *SessionSv1) Handlers() map[string]interface{} { - return map[string]interface{}{ - utils.SessionSv1GetActiveSessions: ssv1.BiRPCv1GetActiveSessions, - utils.SessionSv1GetActiveSessionsCount: ssv1.BiRPCv1GetActiveSessionsCount, - utils.SessionSv1GetPassiveSessions: ssv1.BiRPCv1GetPassiveSessions, - utils.SessionSv1GetPassiveSessionsCount: ssv1.BiRPCv1GetPassiveSessionsCount, - - utils.SessionSv1AuthorizeEvent: ssv1.BiRPCv1AuthorizeEvent, - utils.SessionSv1AuthorizeEventWithDigest: ssv1.BiRPCv1AuthorizeEventWithDigest, - utils.SessionSv1InitiateSession: ssv1.BiRPCv1InitiateSession, - utils.SessionSv1InitiateSessionWithDigest: ssv1.BiRPCv1InitiateSessionWithDigest, - utils.SessionSv1UpdateSession: ssv1.BiRPCv1UpdateSession, - utils.SessionSv1SyncSessions: ssv1.BiRPCv1SyncSessions, - utils.SessionSv1TerminateSession: ssv1.BiRPCv1TerminateSession, - utils.SessionSv1ProcessCDR: ssv1.BiRPCv1ProcessCDR, - utils.SessionSv1ProcessMessage: ssv1.BiRPCv1ProcessMessage, - utils.SessionSv1ProcessEvent: ssv1.BiRPCv1ProcessEvent, - - utils.SessionSv1ForceDisconnect: ssv1.BiRPCv1ForceDisconnect, - utils.SessionSv1RegisterInternalBiJSONConn: ssv1.BiRPCv1RegisterInternalBiJSONConn, - utils.SessionSv1Ping: ssv1.BiRPCPing, - - utils.SessionSv1ReplicateSessions: ssv1.BiRPCv1ReplicateSessions, - utils.SessionSv1SetPassiveSession: ssv1.BiRPCv1SetPassiveSession, - utils.SessionSv1ActivateSessions: ssv1.BiRPCv1ActivateSessions, - utils.SessionSv1DeactivateSessions: ssv1.BiRPCv1DeactivateSessions, - - utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism - } -} - -func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt birpc.ClientConnector, args *sessions.V1AuthorizeArgs, +func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(ctx *context.Context, args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1AuthorizeEvent(clnt, args, rply) + return ssv1.Ss.BiRPCv1AuthorizeEvent(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt birpc.ClientConnector, args *sessions.V1AuthorizeArgs, +func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(ctx *context.Context, args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReplyWithDigest) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply) + return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt birpc.ClientConnector, args *sessions.V1InitSessionArgs, +func (ssv1 *SessionSv1) BiRPCv1InitiateSession(ctx *context.Context, args *sessions.V1InitSessionArgs, rply *sessions.V1InitSessionReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1InitiateSession(clnt, args, rply) + return ssv1.Ss.BiRPCv1InitiateSession(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt birpc.ClientConnector, args *sessions.V1InitSessionArgs, +func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(ctx *context.Context, args *sessions.V1InitSessionArgs, rply *sessions.V1InitReplyWithDigest) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(clnt, args, rply) + return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt birpc.ClientConnector, args *sessions.V1UpdateSessionArgs, +func (ssv1 *SessionSv1) BiRPCv1UpdateSession(ctx *context.Context, args *sessions.V1UpdateSessionArgs, rply *sessions.V1UpdateSessionReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1UpdateSession(clnt, args, rply) + return ssv1.Ss.BiRPCv1UpdateSession(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt birpc.ClientConnector, args *string, +func (ssv1 *SessionSv1) BiRPCv1SyncSessions(ctx *context.Context, args *string, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1SyncSessions(clnt, "", rply) + return ssv1.Ss.BiRPCv1SyncSessions(ctx.Client, "", rply) } -func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt birpc.ClientConnector, args *sessions.V1TerminateSessionArgs, +func (ssv1 *SessionSv1) BiRPCv1TerminateSession(ctx *context.Context, args *sessions.V1TerminateSessionArgs, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply) + return ssv1.Ss.BiRPCv1TerminateSession(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt birpc.ClientConnector, cgrEv *utils.CGREventWithArgDispatcher, +func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(ctx *context.Context, cgrEv *utils.CGREventWithArgDispatcher, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply) + return ssv1.Ss.BiRPCv1ProcessCDR(ctx.Client, cgrEv, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt birpc.ClientConnector, args *sessions.V1ProcessMessageArgs, +func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(ctx *context.Context, args *sessions.V1ProcessMessageArgs, rply *sessions.V1ProcessMessageReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ProcessMessage(clnt, args, rply) + return ssv1.Ss.BiRPCv1ProcessMessage(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt birpc.ClientConnector, args *sessions.V1ProcessEventArgs, +func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(ctx *context.Context, args *sessions.V1ProcessEventArgs, rply *sessions.V1ProcessEventReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply) + return ssv1.Ss.BiRPCv1ProcessEvent(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(ctx *context.Context, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply) + return ssv1.Ss.BiRPCv1GetActiveSessions(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(ctx *context.Context, args *utils.SessionFilter, rply *int) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1GetActiveSessionsCount(clnt, args, rply) + return ssv1.Ss.BiRPCv1GetActiveSessionsCount(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(ctx *context.Context, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1GetPassiveSessions(clnt, args, rply) + return ssv1.Ss.BiRPCv1GetPassiveSessions(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(ctx *context.Context, args *utils.SessionFilter, rply *int) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(clnt, args, rply) + return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(ctx *context.Context, args *utils.SessionFilter, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ForceDisconnect(clnt, args, rply) + return ssv1.Ss.BiRPCv1ForceDisconnect(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt birpc.ClientConnector, args string, +func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(ctx *context.Context, args string, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply) + return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCPing(clnt birpc.ClientConnector, ign *utils.CGREventWithArgDispatcher, +func (ssv1 *SessionSv1) BiRPCv1Ping(ctx *context.Context, ign *utils.CGREventWithArgDispatcher, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return @@ -210,43 +177,43 @@ func (ssv1 *SessionSv1) BiRPCPing(clnt birpc.ClientConnector, ign *utils.CGREven return ssv1.Ping(ign, reply) } -func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt birpc.ClientConnector, +func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(ctx *context.Context, args sessions.ArgsReplicateSessions, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply) + return ssv1.Ss.BiRPCv1ReplicateSessions(ctx.Client, args, reply) } -func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt birpc.ClientConnector, +func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(ctx *context.Context, args *sessions.Session, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1SetPassiveSession(clnt, args, reply) + return ssv1.Ss.BiRPCv1SetPassiveSession(ctx.Client, args, reply) } -func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt birpc.ClientConnector, +func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(ctx *context.Context, args []string, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ActivateSessions(clnt, args, reply) + return ssv1.Ss.BiRPCv1ActivateSessions(ctx.Client, args, reply) } -func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt birpc.ClientConnector, +func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(ctx *context.Context, args []string, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply) + return ssv1.Ss.BiRPCv1DeactivateSessions(ctx.Client, args, reply) } -func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt birpc.ClientConnector, args *utils.DurationArgs, +func (ssv1 *SessionSv1) BiRPCV1Sleep(ctx *context.Context, args *utils.DurationArgs, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return diff --git a/engine/actions_test.go b/engine/actions_test.go index 43f1d6999..3cf46fa6f 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -2431,7 +2432,7 @@ func (trpcp *TestRPCParameters) Hopa(in Attr, out *float64) error { return nil } -func (trpcp *TestRPCParameters) Call(serviceMethod string, args interface{}, reply interface{}) error { +func (trpcp *TestRPCParameters) Call(ctx *context.Context, serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") if len(parts) != 2 { return utils.ErrNotImplemented diff --git a/engine/libengine.go b/engine/libengine.go index 164659ad9..5cb7a94a8 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -19,9 +19,7 @@ along with this program. If not, see package engine import ( - "errors" "fmt" - "reflect" "strings" "time" "unicode" @@ -114,60 +112,20 @@ func (s *RPCClientSet) Call(ctx *context.Context, method string, args interface{ return conn.Call(context.TODO(), method, args, reply) } -func NewServiceWithName(val interface{}, name string, useName bool) (_ IntService, err error) { - var srv *birpc.Service - if srv, err = birpc.NewService(val, name, useName); err != nil { +func NewBiRPCService(val interface{}) (srv *birpc.Service, err error) { + var initialSrv *birpc.Service + if initialSrv, err = birpc.NewService(val, "", false); err != nil { return } - srv.Methods["Ping"] = pingM - s := IntService{srv.Name: srv} - for m, v := range srv.Methods { - m = strings.TrimPrefix(m, "BiRPC") - if len(m) < 2 || unicode.ToLower(rune(m[0])) != 'v' { + srv = new(birpc.Service) + *srv = *initialSrv + srv.Methods = make(map[string]*birpc.MethodType) + for mName, mType := range initialSrv.Methods { + mName = strings.TrimPrefix(mName, "BiRPC") + if len(mName) < 2 || unicode.ToLower(rune(mName[0])) != 'v' { continue } - - key := srv.Name - if unicode.IsLower(rune(key[len(key)-1])) { - key += "V" - } else { - key += "v" - } - key += string(m[1]) - srv2, has := s[key] - if !has { - srv2 = new(birpc.Service) - *srv2 = *srv - srv2.Name = key - srv2.Methods = map[string]*birpc.MethodType{"Ping": pingM} - s[key] = srv2 - } - srv2.Methods[m[2:]] = v + srv.Methods[mName[2:]] = mType } - return s, nil -} - -type IntService map[string]*birpc.Service - -func (s IntService) Call(ctx *context.Context, serviceMethod string, args, reply interface{}) error { - service, has := s[strings.Split(serviceMethod, utils.NestingSep)[0]] - if !has { - return errors.New("rpc: can't find service " + serviceMethod) - } - return service.Call(ctx, serviceMethod, args, reply) -} - -func ping(_ interface{}, _ *context.Context, _ *utils.CGREvent, reply *string) error { - *reply = utils.Pong - return nil -} - -var pingM = &birpc.MethodType{ - Method: reflect.Method{ - Name: "Ping", - Type: reflect.TypeOf(ping), - Func: reflect.ValueOf(ping), - }, - ArgType: reflect.TypeOf(new(utils.CGREvent)), - ReplyType: reflect.TypeOf(new(string)), + return } diff --git a/services/sessions.go b/services/sessions.go index a1b2e0dfd..1001730b7 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -86,15 +86,14 @@ func (smg *SessionService) Start() (err error) { } }(smg.sm) - // Register RPC handler - srv, _ := engine.NewServiceWithName(smg.sm, utils.SessionS, true) // methods with multiple options - // Pass internal connection via BiRPCClient smg.connChan <- smg.sm + // Register RPC handler smg.rpc = v1.NewSMGenericV1(smg.sm) - smg.rpcv1 = v1.NewSessionSv1(smg.sm) // methods with multiple options + + // Register RPC handler if !smg.cfg.DispatcherSCfg().Enabled { smg.server.RpcRegister(smg.rpc) smg.server.RpcRegister(smg.rpcv1) @@ -102,9 +101,11 @@ func (smg *SessionService) Start() (err error) { // Register BiRpc handlers if smg.cfg.SessionSCfg().ListenBijson != "" { smg.bircpEnabled = true - for n, s := range srv { - smg.server.BiRPCRegisterName(n, s) + var srv *birpc.Service + if srv, err = engine.NewBiRPCService(smg.rpcv1); err != nil { + return } + smg.server.BiRPCRegisterName(srv.Name, srv) // run this in it's own goroutine go func() { if err := smg.server.ServeBiJSON(smg.cfg.SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {