From 3b0a397cd02c02980ca39d34e166ee700277be7c Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 14 Feb 2023 11:58:56 -0500 Subject: [PATCH] Make caps codec return error through API instead of directly --- agents/astagent.go | 8 ++++++++ agents/diamagent.go | 8 ++++++++ agents/fsagent.go | 8 ++++++++ agents/kamagent.go | 8 ++++++++ apier/v1/sessionsbirpc.go | 12 +++++++++--- apier/v1/smgbirpc.go | 8 ++++++++ cores/caps.go | 12 ++++++++---- sessions/sessions.go | 9 +++++++++ utils/consts.go | 1 + 9 files changed, 67 insertions(+), 7 deletions(-) diff --git a/agents/astagent.go b/agents/astagent.go index 51d1d35b0..5027f5e54 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -409,6 +409,11 @@ func (sma *AsteriskAgent) BiRPCv1WarnDisconnect(clnt rpcclient.ClientConnector, return sma.V1WarnDisconnect(args, reply) } +// BiRPCv1CapsError is used to return error when the caps limit is hit +func (sma *AsteriskAgent) BiRPCv1CapsError(clnt rpcclient.ClientConnector, args interface{}, reply *string) (err error) { + return utils.ErrMaxConcurentRPCExceeded +} + // Handlers is used to implement the rpcclient.BiRPCConector interface func (sma *AsteriskAgent) Handlers() map[string]interface{} { return map[string]interface{}{ @@ -427,5 +432,8 @@ func (sma *AsteriskAgent) Handlers() map[string]interface{} { utils.SessionSv1WarnDisconnect: func(clnt *rpc2.Client, args map[string]interface{}, rply *string) (err error) { return sma.BiRPCv1WarnDisconnect(clnt, args, rply) }, + utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) { + return sma.BiRPCv1CapsError(clnt, args, rply) + }, } } diff --git a/agents/diamagent.go b/agents/diamagent.go index 95244f30e..fc815c74a 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -602,6 +602,11 @@ func (da *DiameterAgent) BiRPCv1WarnDisconnect(clnt rpcclient.ClientConnector, a return da.V1WarnDisconnect(args, reply) } +// BiRPCv1CapsError is used to return error when the caps limit is hit +func (da *DiameterAgent) BiRPCv1CapsError(clnt rpcclient.ClientConnector, args interface{}, reply *string) (err error) { + return utils.ErrMaxConcurentRPCExceeded +} + // Handlers is used to implement the rpcclient.BiRPCConector interface func (da *DiameterAgent) Handlers() map[string]interface{} { return map[string]interface{}{ @@ -620,5 +625,8 @@ func (da *DiameterAgent) Handlers() map[string]interface{} { utils.SessionSv1WarnDisconnect: func(clnt *rpc2.Client, args map[string]interface{}, rply *string) (err error) { return da.BiRPCv1WarnDisconnect(clnt, args, rply) }, + utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) { + return da.BiRPCv1CapsError(clnt, args, rply) + }, } } diff --git a/agents/fsagent.go b/agents/fsagent.go index 321da2765..ed23a6588 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -514,6 +514,11 @@ func (fsa *FSsessions) BiRPCv1WarnDisconnect(clnt rpcclient.ClientConnector, arg return fsa.V1WarnDisconnect(args, reply) } +// BiRPCv1CapsError is used to return error when the caps limit is hit +func (fsa *FSsessions) BiRPCv1CapsError(clnt rpcclient.ClientConnector, args interface{}, reply *string) (err error) { + return utils.ErrMaxConcurentRPCExceeded +} + // Handlers is used to implement the rpcclient.BiRPCConector interface func (fsa *FSsessions) Handlers() map[string]interface{} { return map[string]interface{}{ @@ -532,5 +537,8 @@ func (fsa *FSsessions) Handlers() map[string]interface{} { utils.SessionSv1WarnDisconnect: func(clnt *rpc2.Client, args map[string]interface{}, rply *string) (err error) { return fsa.BiRPCv1WarnDisconnect(clnt, args, rply) }, + utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) { + return fsa.BiRPCv1CapsError(clnt, args, rply) + }, } } diff --git a/agents/kamagent.go b/agents/kamagent.go index e4a6d784d..5eb62d2f4 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -482,6 +482,11 @@ func (ka *KamailioAgent) BiRPCv1WarnDisconnect(clnt rpcclient.ClientConnector, a return ka.V1WarnDisconnect(args, reply) } +// BiRPCv1CapsError is used to return error when the caps limit is hit +func (ka *KamailioAgent) BiRPCv1CapsError(clnt rpcclient.ClientConnector, args interface{}, reply *string) (err error) { + return utils.ErrMaxConcurentRPCExceeded +} + // Handlers is used to implement the rpcclient.BiRPCConector interface func (ka *KamailioAgent) Handlers() map[string]interface{} { return map[string]interface{}{ @@ -500,5 +505,8 @@ func (ka *KamailioAgent) Handlers() map[string]interface{} { utils.SessionSv1WarnDisconnect: func(clnt *rpc2.Client, args map[string]interface{}, rply *string) (err error) { return ka.BiRPCv1WarnDisconnect(clnt, args, rply) }, + utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) { + return ka.BiRPCv1CapsError(clnt, args, rply) + }, } } diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index 74eb57db1..095c760a4 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -61,7 +61,8 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { utils.SessionSv1STIRAuthenticate: ssv1.BiRPCV1STIRAuthenticate, utils.SessionSv1STIRIdentity: ssv1.BiRPCV1STIRIdentity, - utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism + utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism + utils.SessionSv1CapsError: ssv1.BiRPCV1CapsError, } } @@ -199,9 +200,14 @@ func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client, return ssv1.sS.BiRPCv1STIRIdentity(nil, args, reply) } -func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *utils.DurationArgs, +func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, args *utils.DurationArgs, reply *string) (err error) { - time.Sleep(arg.Duration) + time.Sleep(args.Duration) *reply = utils.OK return nil } + +func (ssv1 *SessionSv1) BiRPCV1CapsError(clnt *rpc2.Client, args interface{}, + reply *string) (err error) { + return ssv1.sS.BiRPCv1CapsError(clnt, args, reply) +} diff --git a/apier/v1/smgbirpc.go b/apier/v1/smgbirpc.go index 9cb4e1417..fb92a4bc8 100644 --- a/apier/v1/smgbirpc.go +++ b/apier/v1/smgbirpc.go @@ -20,6 +20,7 @@ package v1 import ( "github.com/cenkalti/rpc2" + "github.com/cgrates/cgrates/utils" ) // Publishes methods exported by SMGenericV1 as SMGenericV1 (so we can handle standard RPC methods via birpc socket) @@ -30,6 +31,7 @@ func (smgv1 *SMGenericV1) Handlers() map[string]interface{} { "SMGenericV1.UpdateSession": smgv1.BiRPCV1UpdateSession, "SMGenericV1.TerminateSession": smgv1.BiRPCV1TerminateSession, "SMGenericV1.ProcessCDR": smgv1.BiRPCV1ProcessCDR, + "SMGenericV1.Sleep": smgv1.BiRPCV1CapsError, } } @@ -62,3 +64,9 @@ func (smgv1 *SMGenericV1) BiRPCV1ProcessCDR(clnt *rpc2.Client, ev map[string]interface{}, reply *string) (err error) { return smgv1.Ss.BiRPCV1ProcessCDR(clnt, ev, reply) } + +// BiRPCv1CapsError is used to return error when the caps limit is hit +func (smgv1 *SMGenericV1) BiRPCV1CapsError(clnt *rpc2.Client, + args interface{}, reply *string) (err error) { + return utils.ErrMaxConcurentRPCExceeded +} diff --git a/cores/caps.go b/cores/caps.go index 32ce98483..3be9eb162 100644 --- a/cores/caps.go +++ b/cores/caps.go @@ -164,14 +164,18 @@ type capsBiRPCCodec struct { // ReadHeader must read a message and populate either the request // or the response by inspecting the incoming message. func (c *capsBiRPCCodec) ReadHeader(req *rpc2.Request, resp *rpc2.Response) (err error) { - return c.sc.ReadHeader(req, resp) + if err = c.sc.ReadHeader(req, resp); err != nil || req.Method == utils.EmptyString { + return + } + if err = c.caps.Allocate(); err != nil { + req.Method = utils.SessionSv1CapsError + err = nil + } + return } // ReadRequestBody into args argument of handler function. func (c *capsBiRPCCodec) ReadRequestBody(x interface{}) (err error) { - if err = c.caps.Allocate(); err != nil { - return - } return c.sc.ReadRequestBody(x) } diff --git a/sessions/sessions.go b/sessions/sessions.go index 0256ab5dd..87f70ae9e 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -4133,6 +4133,12 @@ func (sS *SessionS) BiRPCv1STIRIdentity(clnt rpcclient.ClientConnector, return } +// BiRPCv1STIRIdentity the API for STIR header creation +func (sS *SessionS) BiRPCv1CapsError(clnt rpcclient.ClientConnector, + args interface{}, identity *string) (err error) { + return utils.ErrMaxConcurentRPCExceeded +} + // Handlers bidirectional methods following func (sS *SessionS) Handlers() map[string]interface{} { return map[string]interface{}{ @@ -4211,5 +4217,8 @@ func (sS *SessionS) Handlers() map[string]interface{} { utils.SessionSv1STIRIdentity: func(clnt *rpc2.Client, args *V1STIRIdentityArgs, rply *string) (err error) { return sS.BiRPCv1STIRIdentity(clnt, args, rply) }, + utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) { + return sS.BiRPCv1CapsError(clnt, args, rply) + }, } } diff --git a/utils/consts.go b/utils/consts.go index 41f6bd07b..f6c9d9c00 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1626,6 +1626,7 @@ const ( SessionSv1STIRAuthenticate = "SessionSv1.STIRAuthenticate" SessionSv1STIRIdentity = "SessionSv1.STIRIdentity" SessionSv1Sleep = "SessionSv1.Sleep" + SessionSv1CapsError = "SessionSv1.CapsError" ) // Responder APIs