Make caps codec return error through API instead of directly

This commit is contained in:
ionutboangiu
2023-02-14 11:58:56 -05:00
committed by Dan Christian Bogos
parent c7a52d7573
commit 3b0a397cd0
9 changed files with 67 additions and 7 deletions

View File

@@ -409,6 +409,11 @@ func (sma *AsteriskAgent) BiRPCv1WarnDisconnect(clnt rpcclient.ClientConnector,
return sma.V1WarnDisconnect(args, reply)
}
// BiRPCv1CapsError is used to return error when the caps limit is hit
func (sma *AsteriskAgent) BiRPCv1CapsError(clnt rpcclient.ClientConnector, args interface{}, reply *string) (err error) {
return utils.ErrMaxConcurentRPCExceeded
}
// Handlers is used to implement the rpcclient.BiRPCConector interface
func (sma *AsteriskAgent) Handlers() map[string]interface{} {
return map[string]interface{}{
@@ -427,5 +432,8 @@ func (sma *AsteriskAgent) Handlers() map[string]interface{} {
utils.SessionSv1WarnDisconnect: func(clnt *rpc2.Client, args map[string]interface{}, rply *string) (err error) {
return sma.BiRPCv1WarnDisconnect(clnt, args, rply)
},
utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) {
return sma.BiRPCv1CapsError(clnt, args, rply)
},
}
}

View File

@@ -602,6 +602,11 @@ func (da *DiameterAgent) BiRPCv1WarnDisconnect(clnt rpcclient.ClientConnector, a
return da.V1WarnDisconnect(args, reply)
}
// BiRPCv1CapsError is used to return error when the caps limit is hit
func (da *DiameterAgent) BiRPCv1CapsError(clnt rpcclient.ClientConnector, args interface{}, reply *string) (err error) {
return utils.ErrMaxConcurentRPCExceeded
}
// Handlers is used to implement the rpcclient.BiRPCConector interface
func (da *DiameterAgent) Handlers() map[string]interface{} {
return map[string]interface{}{
@@ -620,5 +625,8 @@ func (da *DiameterAgent) Handlers() map[string]interface{} {
utils.SessionSv1WarnDisconnect: func(clnt *rpc2.Client, args map[string]interface{}, rply *string) (err error) {
return da.BiRPCv1WarnDisconnect(clnt, args, rply)
},
utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) {
return da.BiRPCv1CapsError(clnt, args, rply)
},
}
}

View File

@@ -514,6 +514,11 @@ func (fsa *FSsessions) BiRPCv1WarnDisconnect(clnt rpcclient.ClientConnector, arg
return fsa.V1WarnDisconnect(args, reply)
}
// BiRPCv1CapsError is used to return error when the caps limit is hit
func (fsa *FSsessions) BiRPCv1CapsError(clnt rpcclient.ClientConnector, args interface{}, reply *string) (err error) {
return utils.ErrMaxConcurentRPCExceeded
}
// Handlers is used to implement the rpcclient.BiRPCConector interface
func (fsa *FSsessions) Handlers() map[string]interface{} {
return map[string]interface{}{
@@ -532,5 +537,8 @@ func (fsa *FSsessions) Handlers() map[string]interface{} {
utils.SessionSv1WarnDisconnect: func(clnt *rpc2.Client, args map[string]interface{}, rply *string) (err error) {
return fsa.BiRPCv1WarnDisconnect(clnt, args, rply)
},
utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) {
return fsa.BiRPCv1CapsError(clnt, args, rply)
},
}
}

View File

@@ -482,6 +482,11 @@ func (ka *KamailioAgent) BiRPCv1WarnDisconnect(clnt rpcclient.ClientConnector, a
return ka.V1WarnDisconnect(args, reply)
}
// BiRPCv1CapsError is used to return error when the caps limit is hit
func (ka *KamailioAgent) BiRPCv1CapsError(clnt rpcclient.ClientConnector, args interface{}, reply *string) (err error) {
return utils.ErrMaxConcurentRPCExceeded
}
// Handlers is used to implement the rpcclient.BiRPCConector interface
func (ka *KamailioAgent) Handlers() map[string]interface{} {
return map[string]interface{}{
@@ -500,5 +505,8 @@ func (ka *KamailioAgent) Handlers() map[string]interface{} {
utils.SessionSv1WarnDisconnect: func(clnt *rpc2.Client, args map[string]interface{}, rply *string) (err error) {
return ka.BiRPCv1WarnDisconnect(clnt, args, rply)
},
utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) {
return ka.BiRPCv1CapsError(clnt, args, rply)
},
}
}

View File

@@ -61,7 +61,8 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} {
utils.SessionSv1STIRAuthenticate: ssv1.BiRPCV1STIRAuthenticate,
utils.SessionSv1STIRIdentity: ssv1.BiRPCV1STIRIdentity,
utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism
utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism
utils.SessionSv1CapsError: ssv1.BiRPCV1CapsError,
}
}
@@ -199,9 +200,14 @@ func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client,
return ssv1.sS.BiRPCv1STIRIdentity(nil, args, reply)
}
func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *utils.DurationArgs,
func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, args *utils.DurationArgs,
reply *string) (err error) {
time.Sleep(arg.Duration)
time.Sleep(args.Duration)
*reply = utils.OK
return nil
}
func (ssv1 *SessionSv1) BiRPCV1CapsError(clnt *rpc2.Client, args interface{},
reply *string) (err error) {
return ssv1.sS.BiRPCv1CapsError(clnt, args, reply)
}

View File

@@ -20,6 +20,7 @@ package v1
import (
"github.com/cenkalti/rpc2"
"github.com/cgrates/cgrates/utils"
)
// Publishes methods exported by SMGenericV1 as SMGenericV1 (so we can handle standard RPC methods via birpc socket)
@@ -30,6 +31,7 @@ func (smgv1 *SMGenericV1) Handlers() map[string]interface{} {
"SMGenericV1.UpdateSession": smgv1.BiRPCV1UpdateSession,
"SMGenericV1.TerminateSession": smgv1.BiRPCV1TerminateSession,
"SMGenericV1.ProcessCDR": smgv1.BiRPCV1ProcessCDR,
"SMGenericV1.Sleep": smgv1.BiRPCV1CapsError,
}
}
@@ -62,3 +64,9 @@ func (smgv1 *SMGenericV1) BiRPCV1ProcessCDR(clnt *rpc2.Client,
ev map[string]interface{}, reply *string) (err error) {
return smgv1.Ss.BiRPCV1ProcessCDR(clnt, ev, reply)
}
// BiRPCv1CapsError is used to return error when the caps limit is hit
func (smgv1 *SMGenericV1) BiRPCV1CapsError(clnt *rpc2.Client,
args interface{}, reply *string) (err error) {
return utils.ErrMaxConcurentRPCExceeded
}

View File

@@ -164,14 +164,18 @@ type capsBiRPCCodec struct {
// ReadHeader must read a message and populate either the request
// or the response by inspecting the incoming message.
func (c *capsBiRPCCodec) ReadHeader(req *rpc2.Request, resp *rpc2.Response) (err error) {
return c.sc.ReadHeader(req, resp)
if err = c.sc.ReadHeader(req, resp); err != nil || req.Method == utils.EmptyString {
return
}
if err = c.caps.Allocate(); err != nil {
req.Method = utils.SessionSv1CapsError
err = nil
}
return
}
// ReadRequestBody into args argument of handler function.
func (c *capsBiRPCCodec) ReadRequestBody(x interface{}) (err error) {
if err = c.caps.Allocate(); err != nil {
return
}
return c.sc.ReadRequestBody(x)
}

View File

@@ -4133,6 +4133,12 @@ func (sS *SessionS) BiRPCv1STIRIdentity(clnt rpcclient.ClientConnector,
return
}
// BiRPCv1STIRIdentity the API for STIR header creation
func (sS *SessionS) BiRPCv1CapsError(clnt rpcclient.ClientConnector,
args interface{}, identity *string) (err error) {
return utils.ErrMaxConcurentRPCExceeded
}
// Handlers bidirectional methods following
func (sS *SessionS) Handlers() map[string]interface{} {
return map[string]interface{}{
@@ -4211,5 +4217,8 @@ func (sS *SessionS) Handlers() map[string]interface{} {
utils.SessionSv1STIRIdentity: func(clnt *rpc2.Client, args *V1STIRIdentityArgs, rply *string) (err error) {
return sS.BiRPCv1STIRIdentity(clnt, args, rply)
},
utils.SessionSv1CapsError: func(clnt *rpc2.Client, args interface{}, rply *string) (err error) {
return sS.BiRPCv1CapsError(clnt, args, rply)
},
}
}

View File

@@ -1626,6 +1626,7 @@ const (
SessionSv1STIRAuthenticate = "SessionSv1.STIRAuthenticate"
SessionSv1STIRIdentity = "SessionSv1.STIRIdentity"
SessionSv1Sleep = "SessionSv1.Sleep"
SessionSv1CapsError = "SessionSv1.CapsError"
)
// Responder APIs