diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index 03dc43663..8cee97745 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -19,189 +19,156 @@ along with this program. If not, see package v1 import ( - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" ) -// Bidirectional JSON methods following -func (ssv1 *SessionSv1) Handlers() map[string]interface{} { - return map[string]interface{}{ - utils.SessionSv1GetActiveSessions: ssv1.BiRPCv1GetActiveSessions, - utils.SessionSv1GetActiveSessionsCount: ssv1.BiRPCv1GetActiveSessionsCount, - utils.SessionSv1GetPassiveSessions: ssv1.BiRPCv1GetPassiveSessions, - utils.SessionSv1GetPassiveSessionsCount: ssv1.BiRPCv1GetPassiveSessionsCount, - - utils.SessionSv1AuthorizeEvent: ssv1.BiRPCv1AuthorizeEvent, - utils.SessionSv1AuthorizeEventWithDigest: ssv1.BiRPCv1AuthorizeEventWithDigest, - utils.SessionSv1InitiateSession: ssv1.BiRPCv1InitiateSession, - utils.SessionSv1InitiateSessionWithDigest: ssv1.BiRPCv1InitiateSessionWithDigest, - utils.SessionSv1UpdateSession: ssv1.BiRPCv1UpdateSession, - utils.SessionSv1SyncSessions: ssv1.BiRPCv1SyncSessions, - utils.SessionSv1TerminateSession: ssv1.BiRPCv1TerminateSession, - utils.SessionSv1ProcessCDR: ssv1.BiRPCv1ProcessCDR, - utils.SessionSv1ProcessMessage: ssv1.BiRPCv1ProcessMessage, - utils.SessionSv1ProcessEvent: ssv1.BiRPCv1ProcessEvent, - - utils.SessionSv1ForceDisconnect: ssv1.BiRPCv1ForceDisconnect, - utils.SessionSv1RegisterInternalBiJSONConn: ssv1.BiRPCv1RegisterInternalBiJSONConn, - utils.SessionSv1Ping: ssv1.BiRPCPing, - - utils.SessionSv1ReplicateSessions: ssv1.BiRPCv1ReplicateSessions, - utils.SessionSv1SetPassiveSession: ssv1.BiRPCv1SetPassiveSession, - utils.SessionSv1ActivateSessions: ssv1.BiRPCv1ActivateSessions, - utils.SessionSv1DeactivateSessions: ssv1.BiRPCv1DeactivateSessions, - - utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism - } -} - -func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt birpc.ClientConnector, args *sessions.V1AuthorizeArgs, +func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(ctx *context.Context, args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1AuthorizeEvent(clnt, args, rply) + return ssv1.Ss.BiRPCv1AuthorizeEvent(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt birpc.ClientConnector, args *sessions.V1AuthorizeArgs, +func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(ctx *context.Context, args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReplyWithDigest) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply) + return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt birpc.ClientConnector, args *sessions.V1InitSessionArgs, +func (ssv1 *SessionSv1) BiRPCv1InitiateSession(ctx *context.Context, args *sessions.V1InitSessionArgs, rply *sessions.V1InitSessionReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1InitiateSession(clnt, args, rply) + return ssv1.Ss.BiRPCv1InitiateSession(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt birpc.ClientConnector, args *sessions.V1InitSessionArgs, +func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(ctx *context.Context, args *sessions.V1InitSessionArgs, rply *sessions.V1InitReplyWithDigest) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(clnt, args, rply) + return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt birpc.ClientConnector, args *sessions.V1UpdateSessionArgs, +func (ssv1 *SessionSv1) BiRPCv1UpdateSession(ctx *context.Context, args *sessions.V1UpdateSessionArgs, rply *sessions.V1UpdateSessionReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1UpdateSession(clnt, args, rply) + return ssv1.Ss.BiRPCv1UpdateSession(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt birpc.ClientConnector, args *string, +func (ssv1 *SessionSv1) BiRPCv1SyncSessions(ctx *context.Context, args *string, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1SyncSessions(clnt, "", rply) + return ssv1.Ss.BiRPCv1SyncSessions(ctx.Client, "", rply) } -func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt birpc.ClientConnector, args *sessions.V1TerminateSessionArgs, +func (ssv1 *SessionSv1) BiRPCv1TerminateSession(ctx *context.Context, args *sessions.V1TerminateSessionArgs, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply) + return ssv1.Ss.BiRPCv1TerminateSession(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt birpc.ClientConnector, cgrEv *utils.CGREventWithArgDispatcher, +func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(ctx *context.Context, cgrEv *utils.CGREventWithArgDispatcher, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply) + return ssv1.Ss.BiRPCv1ProcessCDR(ctx.Client, cgrEv, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt birpc.ClientConnector, args *sessions.V1ProcessMessageArgs, +func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(ctx *context.Context, args *sessions.V1ProcessMessageArgs, rply *sessions.V1ProcessMessageReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ProcessMessage(clnt, args, rply) + return ssv1.Ss.BiRPCv1ProcessMessage(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt birpc.ClientConnector, args *sessions.V1ProcessEventArgs, +func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(ctx *context.Context, args *sessions.V1ProcessEventArgs, rply *sessions.V1ProcessEventReply) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply) + return ssv1.Ss.BiRPCv1ProcessEvent(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(ctx *context.Context, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply) + return ssv1.Ss.BiRPCv1GetActiveSessions(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(ctx *context.Context, args *utils.SessionFilter, rply *int) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1GetActiveSessionsCount(clnt, args, rply) + return ssv1.Ss.BiRPCv1GetActiveSessionsCount(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(ctx *context.Context, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1GetPassiveSessions(clnt, args, rply) + return ssv1.Ss.BiRPCv1GetPassiveSessions(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(ctx *context.Context, args *utils.SessionFilter, rply *int) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(clnt, args, rply) + return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt birpc.ClientConnector, args *utils.SessionFilter, +func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(ctx *context.Context, args *utils.SessionFilter, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ForceDisconnect(clnt, args, rply) + return ssv1.Ss.BiRPCv1ForceDisconnect(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt birpc.ClientConnector, args string, +func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(ctx *context.Context, args string, rply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply) + return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(ctx.Client, args, rply) } -func (ssv1 *SessionSv1) BiRPCPing(clnt birpc.ClientConnector, ign *utils.CGREventWithArgDispatcher, +func (ssv1 *SessionSv1) BiRPCv1Ping(ctx *context.Context, ign *utils.CGREventWithArgDispatcher, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return @@ -210,43 +177,43 @@ func (ssv1 *SessionSv1) BiRPCPing(clnt birpc.ClientConnector, ign *utils.CGREven return ssv1.Ping(ign, reply) } -func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt birpc.ClientConnector, +func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(ctx *context.Context, args sessions.ArgsReplicateSessions, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply) + return ssv1.Ss.BiRPCv1ReplicateSessions(ctx.Client, args, reply) } -func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt birpc.ClientConnector, +func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(ctx *context.Context, args *sessions.Session, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1SetPassiveSession(clnt, args, reply) + return ssv1.Ss.BiRPCv1SetPassiveSession(ctx.Client, args, reply) } -func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt birpc.ClientConnector, +func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(ctx *context.Context, args []string, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1ActivateSessions(clnt, args, reply) + return ssv1.Ss.BiRPCv1ActivateSessions(ctx.Client, args, reply) } -func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt birpc.ClientConnector, +func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(ctx *context.Context, args []string, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return } defer utils.ConReqs.Deallocate() - return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply) + return ssv1.Ss.BiRPCv1DeactivateSessions(ctx.Client, args, reply) } -func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt birpc.ClientConnector, args *utils.DurationArgs, +func (ssv1 *SessionSv1) BiRPCV1Sleep(ctx *context.Context, args *utils.DurationArgs, reply *string) (err error) { if err = utils.ConReqs.Allocate(); err != nil { return diff --git a/engine/actions_test.go b/engine/actions_test.go index 43f1d6999..3cf46fa6f 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -2431,7 +2432,7 @@ func (trpcp *TestRPCParameters) Hopa(in Attr, out *float64) error { return nil } -func (trpcp *TestRPCParameters) Call(serviceMethod string, args interface{}, reply interface{}) error { +func (trpcp *TestRPCParameters) Call(ctx *context.Context, serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") if len(parts) != 2 { return utils.ErrNotImplemented diff --git a/engine/libengine.go b/engine/libengine.go index 164659ad9..5cb7a94a8 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -19,9 +19,7 @@ along with this program. If not, see package engine import ( - "errors" "fmt" - "reflect" "strings" "time" "unicode" @@ -114,60 +112,20 @@ func (s *RPCClientSet) Call(ctx *context.Context, method string, args interface{ return conn.Call(context.TODO(), method, args, reply) } -func NewServiceWithName(val interface{}, name string, useName bool) (_ IntService, err error) { - var srv *birpc.Service - if srv, err = birpc.NewService(val, name, useName); err != nil { +func NewBiRPCService(val interface{}) (srv *birpc.Service, err error) { + var initialSrv *birpc.Service + if initialSrv, err = birpc.NewService(val, "", false); err != nil { return } - srv.Methods["Ping"] = pingM - s := IntService{srv.Name: srv} - for m, v := range srv.Methods { - m = strings.TrimPrefix(m, "BiRPC") - if len(m) < 2 || unicode.ToLower(rune(m[0])) != 'v' { + srv = new(birpc.Service) + *srv = *initialSrv + srv.Methods = make(map[string]*birpc.MethodType) + for mName, mType := range initialSrv.Methods { + mName = strings.TrimPrefix(mName, "BiRPC") + if len(mName) < 2 || unicode.ToLower(rune(mName[0])) != 'v' { continue } - - key := srv.Name - if unicode.IsLower(rune(key[len(key)-1])) { - key += "V" - } else { - key += "v" - } - key += string(m[1]) - srv2, has := s[key] - if !has { - srv2 = new(birpc.Service) - *srv2 = *srv - srv2.Name = key - srv2.Methods = map[string]*birpc.MethodType{"Ping": pingM} - s[key] = srv2 - } - srv2.Methods[m[2:]] = v + srv.Methods[mName[2:]] = mType } - return s, nil -} - -type IntService map[string]*birpc.Service - -func (s IntService) Call(ctx *context.Context, serviceMethod string, args, reply interface{}) error { - service, has := s[strings.Split(serviceMethod, utils.NestingSep)[0]] - if !has { - return errors.New("rpc: can't find service " + serviceMethod) - } - return service.Call(ctx, serviceMethod, args, reply) -} - -func ping(_ interface{}, _ *context.Context, _ *utils.CGREvent, reply *string) error { - *reply = utils.Pong - return nil -} - -var pingM = &birpc.MethodType{ - Method: reflect.Method{ - Name: "Ping", - Type: reflect.TypeOf(ping), - Func: reflect.ValueOf(ping), - }, - ArgType: reflect.TypeOf(new(utils.CGREvent)), - ReplyType: reflect.TypeOf(new(string)), + return } diff --git a/services/sessions.go b/services/sessions.go index a1b2e0dfd..1001730b7 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -86,15 +86,14 @@ func (smg *SessionService) Start() (err error) { } }(smg.sm) - // Register RPC handler - srv, _ := engine.NewServiceWithName(smg.sm, utils.SessionS, true) // methods with multiple options - // Pass internal connection via BiRPCClient smg.connChan <- smg.sm + // Register RPC handler smg.rpc = v1.NewSMGenericV1(smg.sm) - smg.rpcv1 = v1.NewSessionSv1(smg.sm) // methods with multiple options + + // Register RPC handler if !smg.cfg.DispatcherSCfg().Enabled { smg.server.RpcRegister(smg.rpc) smg.server.RpcRegister(smg.rpcv1) @@ -102,9 +101,11 @@ func (smg *SessionService) Start() (err error) { // Register BiRpc handlers if smg.cfg.SessionSCfg().ListenBijson != "" { smg.bircpEnabled = true - for n, s := range srv { - smg.server.BiRPCRegisterName(n, s) + var srv *birpc.Service + if srv, err = engine.NewBiRPCService(smg.rpcv1); err != nil { + return } + smg.server.BiRPCRegisterName(srv.Name, srv) // run this in it's own goroutine go func() { if err := smg.server.ServeBiJSON(smg.cfg.SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {