Updated caps for birpc server

This commit is contained in:
Trial97
2021-07-13 17:15:49 +03:00
committed by Dan Christian Bogos
parent 50efbf4451
commit 4fe51d4d05
24 changed files with 245 additions and 239 deletions

View File

@@ -22,6 +22,8 @@ import (
"net/rpc"
"sync"
"time"
"github.com/cenkalti/rpc2"
)
func NewAnalyzerServerCodec(sc rpc.ServerCodec, aS *AnalyzerService, enc, from, to string) rpc.ServerCodec {
@@ -79,3 +81,100 @@ func (c *AnalyzerServerCodec) WriteResponse(r *rpc.Response, x interface{}) erro
}
func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() }
func NewAnalyzerBiRPCCodec(sc rpc2.Codec, aS *AnalyzerService, enc, from, to string) rpc2.Codec {
return &AnalyzerBiRPCCodec{
sc: sc,
reqs: make(map[uint64]*rpcAPI),
reps: make(map[uint64]*rpcAPI),
aS: aS,
enc: enc,
from: from,
to: to,
}
}
type AnalyzerBiRPCCodec struct {
sc rpc2.Codec
// keep the API in memory because the write is async
reqs map[uint64]*rpcAPI
reqIdx uint64
reqsLk sync.RWMutex
reps map[uint64]*rpcAPI
repIdx uint64
repsLk sync.RWMutex
aS *AnalyzerService
enc string
from string
to string
}
// ReadHeader must read a message and populate either the request
// or the response by inspecting the incoming message.
func (c *AnalyzerBiRPCCodec) ReadHeader(req *rpc2.Request, resp *rpc2.Response) (err error) {
err = c.sc.ReadHeader(req, resp)
if req.Method != "" {
c.reqsLk.Lock()
c.reqIdx = req.Seq
c.reqs[c.reqIdx] = &rpcAPI{
ID: req.Seq,
Method: req.Method,
StartTime: time.Now(),
}
c.reqsLk.Unlock()
} else {
c.repsLk.Lock()
c.repIdx = resp.Seq
c.reps[c.repIdx].Error = resp.Error
c.repsLk.Unlock()
}
return
}
// ReadRequestBody into args argument of handler function.
func (c *AnalyzerBiRPCCodec) ReadRequestBody(x interface{}) (err error) {
err = c.sc.ReadRequestBody(x)
c.reqsLk.Lock()
c.reqs[c.reqIdx].Params = x
c.reqsLk.Unlock()
return
}
// ReadResponseBody into reply argument of handler function.
func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) {
err = c.sc.ReadResponseBody(x)
c.repsLk.Lock()
api := c.reqs[c.repIdx]
delete(c.reqs, c.repIdx)
c.repsLk.Unlock()
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, api.Error, c.enc, c.to, c.from, api.StartTime, time.Now())
return
}
// WriteRequest must be safe for concurrent use by multiple goroutines.
func (c *AnalyzerBiRPCCodec) WriteRequest(req *rpc2.Request, x interface{}) error {
c.repsLk.Lock()
c.reqIdx = req.Seq
c.reqs[c.reqIdx] = &rpcAPI{
ID: req.Seq,
Method: req.Method,
StartTime: time.Now(),
}
c.repsLk.Unlock()
return c.sc.WriteRequest(req, x)
}
// WriteResponse must be safe for concurrent use by multiple goroutines.
func (c *AnalyzerBiRPCCodec) WriteResponse(r *rpc2.Response, x interface{}) error {
c.reqsLk.Lock()
api := c.reqs[r.Seq]
delete(c.reqs, r.Seq)
c.reqsLk.Unlock()
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.enc, c.from, c.to, api.StartTime, time.Now())
return c.sc.WriteResponse(r, x)
}
// Close is called when client/server finished with the connection.
func (c *AnalyzerBiRPCCodec) Close() error { return c.sc.Close() }

View File

@@ -82,6 +82,7 @@ type rpcAPI struct {
ID uint64 `json:"id"`
Method string `json:"method"`
Params interface{} `json:"params"`
Error string `json:"err,omitempty"`
StartTime time.Time
}

View File

@@ -56,7 +56,7 @@ func TestChargerSv1Interface(t *testing.T) {
func TestSessionSv1Interface(t *testing.T) {
_ = SessionSv1Interface(NewDispatcherSessionSv1(nil))
_ = SessionSv1Interface(NewSessionSv1(nil, nil))
_ = SessionSv1Interface(NewSessionSv1(nil))
}
func TestResponderInterface(t *testing.T) {

View File

@@ -20,22 +20,19 @@ package v1
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
func NewSessionSv1(sS *sessions.SessionS, caps *engine.Caps) *SessionSv1 {
func NewSessionSv1(sS *sessions.SessionS) *SessionSv1 {
return &SessionSv1{
sS: sS,
caps: caps,
sS: sS,
}
}
// SessionSv1 exports RPC from SessionSv1
type SessionSv1 struct {
sS *sessions.SessionS
caps *engine.Caps
sS *sessions.SessionS
}
func (ssv1 *SessionSv1) AuthorizeEvent(args *sessions.V1AuthorizeArgs,

View File

@@ -67,302 +67,140 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} {
func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs,
rply *sessions.V1AuthorizeReply) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1AuthorizeEvent(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs,
rply *sessions.V1AuthorizeReplyWithDigest) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt *rpc2.Client, args *sessions.V1InitSessionArgs,
rply *sessions.V1InitSessionReply) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1InitiateSession(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt *rpc2.Client, args *sessions.V1InitSessionArgs,
rply *sessions.V1InitReplyWithDigest) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1InitiateSessionWithDigest(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt *rpc2.Client, args *sessions.V1UpdateSessionArgs,
rply *sessions.V1UpdateSessionReply) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1UpdateSession(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt *rpc2.Client, args *utils.TenantWithAPIOpts,
rply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1SyncSessions(clnt, &utils.TenantWithAPIOpts{}, rply)
}
func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt *rpc2.Client, args *sessions.V1TerminateSessionArgs,
rply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1TerminateSession(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREvent,
rply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1ProcessCDR(clnt, cgrEv, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt *rpc2.Client, args *sessions.V1ProcessMessageArgs,
rply *sessions.V1ProcessMessageReply) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1ProcessMessage(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs,
rply *sessions.V1ProcessEventReply) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1ProcessEvent(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetCost(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs,
rply *sessions.V1GetCostReply) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1GetCost(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter,
rply *[]*sessions.ExternalSession) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1GetActiveSessions(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter,
rply *int) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1GetActiveSessionsCount(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt *rpc2.Client, args *utils.SessionFilter,
rply *[]*sessions.ExternalSession) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1GetPassiveSessions(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter,
rply *int) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1GetPassiveSessionsCount(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt *rpc2.Client, args *utils.SessionFilter,
rply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1ForceDisconnect(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, args string,
rply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREvent,
reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.Ping(ign, reply)
}
func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt *rpc2.Client,
args sessions.ArgsReplicateSessions, reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply)
}
func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt *rpc2.Client,
args *sessions.Session, reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1SetPassiveSession(clnt, args, reply)
}
func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt *rpc2.Client,
args *utils.SessionIDsWithArgsDispatcher, reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1ActivateSessions(clnt, args, reply)
}
func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt *rpc2.Client,
args *utils.SessionIDsWithArgsDispatcher, reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1DeactivateSessions(clnt, args, reply)
}
// BiRPCV1ReAuthorize sends the RAR for filterd sessions
func (ssv1 *SessionSv1) BiRPCV1ReAuthorize(clnt *rpc2.Client,
args *utils.SessionFilter, reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1ReAuthorize(clnt, args, reply)
}
// BiRPCV1DisconnectPeer sends the DPR for the OriginHost and OriginRealm
func (ssv1 *SessionSv1) BiRPCV1DisconnectPeer(clnt *rpc2.Client,
args *utils.DPRArgs, reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1DisconnectPeer(clnt, args, reply)
}
// BiRPCV1STIRAuthenticate checks the identity using STIR/SHAKEN
func (ssv1 *SessionSv1) BiRPCV1STIRAuthenticate(clnt *rpc2.Client,
args *sessions.V1STIRAuthenticateArgs, reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1STIRAuthenticate(clnt, args, reply)
}
// BiRPCV1STIRIdentity creates the identity for STIR/SHAKEN
func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client,
args *sessions.V1STIRIdentityArgs, reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
return ssv1.sS.BiRPCv1STIRIdentity(nil, args, reply)
}
func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *utils.DurationArgs,
reply *string) (err error) {
if ssv1.caps.IsLimited() {
if err = ssv1.caps.Allocate(); err != nil {
return
}
defer ssv1.caps.Deallocate()
}
time.Sleep(arg.Duration)
*reply = utils.OK
return nil

View File

@@ -19,22 +19,19 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
)
func NewSMGenericV1(sS *sessions.SessionS, caps *engine.Caps) *SMGenericV1 {
func NewSMGenericV1(sS *sessions.SessionS) *SMGenericV1 {
return &SMGenericV1{
Ss: sS,
caps: caps,
Ss: sS,
}
}
// Exports RPC from SMGeneric
// DEPRECATED, use SessionSv1 instead
type SMGenericV1 struct {
Ss *sessions.SessionS
caps *engine.Caps
Ss *sessions.SessionS
}
// Returns MaxUsage (for calls in seconds), -1 for no limit

View File

@@ -36,59 +36,29 @@ func (smgv1 *SMGenericV1) Handlers() map[string]interface{} {
/// Returns MaxUsage (for calls in seconds), -1 for no limit
func (smgv1 *SMGenericV1) BiRPCV1GetMaxUsage(clnt *rpc2.Client,
ev map[string]interface{}, maxUsage *float64) (err error) {
if smgv1.caps.IsLimited() {
if err = smgv1.caps.Allocate(); err != nil {
return
}
defer smgv1.caps.Deallocate()
}
return smgv1.Ss.BiRPCV1GetMaxUsage(clnt, ev, maxUsage)
}
// Called on session start, returns the maximum number of seconds the session can last
func (smgv1 *SMGenericV1) BiRPCV1InitiateSession(clnt *rpc2.Client,
ev map[string]interface{}, maxUsage *float64) (err error) {
if smgv1.caps.IsLimited() {
if err = smgv1.caps.Allocate(); err != nil {
return
}
defer smgv1.caps.Deallocate()
}
return smgv1.Ss.BiRPCV1InitiateSession(clnt, ev, maxUsage)
}
// Interim updates, returns remaining duration from the rater
func (smgv1 *SMGenericV1) BiRPCV1UpdateSession(clnt *rpc2.Client,
ev map[string]interface{}, maxUsage *float64) (err error) {
if smgv1.caps.IsLimited() {
if err = smgv1.caps.Allocate(); err != nil {
return
}
defer smgv1.caps.Deallocate()
}
return smgv1.Ss.BiRPCV1UpdateSession(clnt, ev, maxUsage)
}
// Called on session end, should stop debit loop
func (smgv1 *SMGenericV1) BiRPCV1TerminateSession(clnt *rpc2.Client,
ev map[string]interface{}, reply *string) (err error) {
if smgv1.caps.IsLimited() {
if err = smgv1.caps.Allocate(); err != nil {
return
}
defer smgv1.caps.Deallocate()
}
return smgv1.Ss.BiRPCV1TerminateSession(clnt, ev, reply)
}
// Called on session end, should send the CDR to CDRS
func (smgv1 *SMGenericV1) BiRPCV1ProcessCDR(clnt *rpc2.Client,
ev map[string]interface{}, reply *string) (err error) {
if smgv1.caps.IsLimited() {
if err = smgv1.caps.Allocate(); err != nil {
return
}
defer smgv1.caps.Deallocate()
}
return smgv1.Ss.BiRPCV1ProcessCDR(clnt, ev, reply)
}

View File

@@ -614,7 +614,7 @@ func main() {
cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan,
connManager, anz, srvDep)
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, caps, anz, srvDep)
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, anz, srvDep)
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server,
internalLoaderSChan, connManager, anz, srvDep)

View File

@@ -23,6 +23,8 @@ import (
"net/rpc"
"net/rpc/jsonrpc"
"github.com/cenkalti/rpc2"
jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc"
"github.com/cgrates/cgrates/analyzers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -106,3 +108,88 @@ func (c *capsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
return c.sc.WriteResponse(r, x)
}
func (c *capsServerCodec) Close() error { return c.sc.Close() }
func newCapsBiRPCGOBCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r rpc2.Codec) {
r = newCapsBiRPCCodec(rpc2.NewGobCodec(conn), caps)
if anz != nil {
from := conn.RemoteAddr()
var fromstr string
if from != nil {
fromstr = from.String()
}
to := conn.LocalAddr()
var tostr string
if to != nil {
tostr = to.String()
}
return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaGOB, fromstr, tostr)
}
return
}
func newCapsBiRPCJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r rpc2.Codec) {
r = newCapsBiRPCCodec(jsonrpc2.NewJSONCodec(conn), caps)
if anz != nil {
from := conn.RemoteAddr()
var fromstr string
if from != nil {
fromstr = from.String()
}
to := conn.LocalAddr()
var tostr string
if to != nil {
tostr = to.String()
}
return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaJSON, fromstr, tostr)
}
return
}
func newCapsBiRPCCodec(sc rpc2.Codec, caps *engine.Caps) rpc2.Codec {
return &capsBiRPCCodec{
sc: sc,
caps: caps,
}
}
type capsBiRPCCodec struct {
sc rpc2.Codec
caps *engine.Caps
}
// ReadHeader must read a message and populate either the request
// or the response by inspecting the incoming message.
func (c *capsBiRPCCodec) ReadHeader(req *rpc2.Request, resp *rpc2.Response) (err error) {
return c.sc.ReadHeader(req, resp)
}
// ReadRequestBody into args argument of handler function.
func (c *capsBiRPCCodec) ReadRequestBody(x interface{}) (err error) {
if err = c.caps.Allocate(); err != nil {
return
}
return c.sc.ReadRequestBody(x)
}
// ReadResponseBody into reply argument of handler function.
func (c *capsBiRPCCodec) ReadResponseBody(x interface{}) error {
return c.sc.ReadResponseBody(x)
}
// WriteRequest must be safe for concurrent use by multiple goroutines.
func (c *capsBiRPCCodec) WriteRequest(req *rpc2.Request, x interface{}) error {
return c.sc.WriteRequest(req, x)
}
// WriteResponse must be safe for concurrent use by multiple goroutines.
func (c *capsBiRPCCodec) WriteResponse(r *rpc2.Response, x interface{}) error {
if r.Error == utils.ErrMaxConcurentRPCExceededNoCaps.Error() {
r.Error = utils.ErrMaxConcurentRPCExceeded.Error()
} else {
defer c.caps.Deallocate()
}
return c.sc.WriteResponse(r, x)
}
// Close is called when client/server finished with the connection.
func (c *capsBiRPCCodec) Close() error { return c.sc.Close() }

View File

@@ -40,7 +40,6 @@ import (
"github.com/cgrates/cgrates/utils"
"github.com/cenkalti/rpc2"
jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc"
"golang.org/x/net/websocket"
)
@@ -262,14 +261,14 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(*rpc2.Client),
s.birpcSrv.OnDisconnect(onDis)
if addrJSON != utils.EmptyString {
var ljson net.Listener
if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, jsonrpc2.NewJSONCodec); err != nil {
if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, newCapsBiRPCJSONCodec); err != nil {
return
}
defer ljson.Close()
}
if addrGOB != utils.EmptyString {
var lgob net.Listener
if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, rpc2.NewGobCodec); err != nil {
if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, newCapsBiRPCGOBCodec); err != nil {
return
}
defer lgob.Close()
@@ -278,7 +277,7 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(*rpc2.Client),
return
}
func (s *Server) listenBiRPC(srv *rpc2.Server, addr, codecName string, newCodec func(io.ReadWriteCloser) rpc2.Codec) (lBiRPC net.Listener, err error) {
func (s *Server) listenBiRPC(srv *rpc2.Server, addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) rpc2.Codec) (lBiRPC net.Listener, err error) {
if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil {
log.Printf("ServeBi%s listen error: %s \n", codecName, err)
return
@@ -288,7 +287,7 @@ func (s *Server) listenBiRPC(srv *rpc2.Server, addr, codecName string, newCodec
return
}
func (s *Server) acceptBiRPC(srv *rpc2.Server, l net.Listener, codecName string, newCodec func(io.ReadWriteCloser) rpc2.Codec) {
func (s *Server) acceptBiRPC(srv *rpc2.Server, l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) rpc2.Codec) {
for {
conn, err := l.Accept()
if err != nil {
@@ -299,7 +298,7 @@ func (s *Server) acceptBiRPC(srv *rpc2.Server, l net.Listener, codecName string,
utils.Logger.Crit(fmt.Sprintf("Stoped Bi%s server beacause %s", codecName, err))
return // stop if we get Accept error
}
go srv.ServeCodec(newCodec(conn))
go srv.ServeCodec(newCodec(conn, s.caps, s.anz))
}
}

View File

@@ -43,7 +43,6 @@ import (
sessions2 "github.com/cgrates/cgrates/sessions"
"github.com/cenkalti/rpc2"
jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc"
"github.com/cgrates/cgrates/config"
@@ -93,7 +92,7 @@ func TestServerIT(t *testing.T) {
utils.Logger.SetLogLevel(7)
for _, test := range sTestsServer {
log.SetOutput(io.Discard)
t.Run("Running IT serve tests", test)
t.Run("TestServerIT", test)
}
}
@@ -759,7 +758,7 @@ func testAcceptBiRPC(t *testing.T) {
l := &mockListener{
p1: p1,
}
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, jsonrpc2.NewJSONCodec)
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec)
rpc := jsonrpc.NewClient(p2)
var reply string
expected := "rpc2: can't find method AttributeSv1.Ping"
@@ -780,14 +779,14 @@ func (mK *mockListenError) Accept() (net.Conn, error) {
}
func testAcceptBiRPCError(t *testing.T) {
caps := engine.NewCaps(0, utils.MetaBusy)
caps := engine.NewCaps(10, utils.MetaBusy)
server := NewServer(caps)
server.RpcRegister(new(mockRegister))
server.birpcSrv = rpc2.NewServer()
//it will contain "use of closed network connection"
l := new(mockListenError)
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, jsonrpc2.NewJSONCodec)
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec)
runtime.Gosched()
}

View File

@@ -166,6 +166,29 @@ func splitDynFltrValues(val, sep string) (vals []string) {
return append(vals, valsEnd[1:]...)
}
func splitInlineFilter(rule string) (splt []string) {
var p, st int
splt = make([]string, 0, 3)
for i, b := range rule {
switch byte(b) {
case utils.InInFieldSep[0]:
if p == 0 {
splt = append(splt, rule[st:i])
st = i + 1
if len(splt) == 2 {
splt = append(splt, rule[st:])
return
}
}
case utils.IdxStart[0]:
p++
case utils.IdxEnd[0]:
p--
}
}
return
}
// NewFilterFromInline parses an inline rule into a compiled Filter
func NewFilterFromInline(tenant, inlnRule string) (f *Filter, err error) {
ruleSplt := strings.SplitN(inlnRule, utils.InInFieldSep, 3)

View File

@@ -58,7 +58,7 @@ func TestAsteriskAgentReload(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewAsteriskAgent(cfg, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,
@@ -124,7 +124,7 @@ func TestAsteriskAgentReload2(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewAsteriskAgent(cfg, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,

View File

@@ -52,7 +52,7 @@ func TestDiameterAgentReload1(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,

View File

@@ -60,7 +60,7 @@ func TestDNSAgentReload(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,

View File

@@ -64,7 +64,7 @@ func TestEventReaderSReload(t *testing.T) {
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
db := NewDataDBService(cfg, nil, srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, anz, srvDep)
erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(erS, sS,

View File

@@ -59,7 +59,7 @@ func TestFreeSwitchAgentReload(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,

View File

@@ -57,7 +57,7 @@ func TestHTTPAgentReload(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewHTTPAgent(cfg, filterSChan, server, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,

View File

@@ -58,7 +58,7 @@ func TestKamailioAgentReload(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewKamailioAgent(cfg, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,

View File

@@ -61,7 +61,7 @@ func TestRadiusAgentReload(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,
@@ -129,7 +129,7 @@ func TestRadiusAgentReload2(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,

View File

@@ -37,8 +37,7 @@ import (
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
server *cores.Server, internalChan chan rpcclient.ClientConnector,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
caps *engine.Caps, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &SessionService{
connChan: internalChan,
cfg: cfg,
@@ -46,7 +45,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
server: server,
shdChan: shdChan,
connMgr: connMgr,
caps: caps,
anz: anz,
srvDep: srvDep,
}
@@ -69,7 +67,6 @@ type SessionService struct {
// in order to stop the bircp server if necesary
bircpEnabled bool
connMgr *engine.ConnManager
caps *engine.Caps
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
}
@@ -96,9 +93,9 @@ func (smg *SessionService) Start() (err error) {
// Pass internal connection via BiRPCClient
smg.connChan <- smg.anz.GetInternalBiRPCCodec(smg.sm, utils.SessionS)
// Register RPC handler
smg.rpc = v1.NewSMGenericV1(smg.sm, smg.caps)
smg.rpc = v1.NewSMGenericV1(smg.sm)
smg.rpcv1 = v1.NewSessionSv1(smg.sm, smg.caps) // methods with multiple options
smg.rpcv1 = v1.NewSessionSv1(smg.sm) // methods with multiple options
if !smg.cfg.DispatcherSCfg().Enabled {
smg.server.RpcRegister(smg.rpc)
smg.server.RpcRegister(smg.rpcv1)

View File

@@ -100,7 +100,7 @@ func TestSessionSReload1(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): clientConect,
})
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
srv := NewSessionService(cfg, new(DataDBService), server, make(chan rpcclient.ClientConnector, 1), shdChan, conMng, nil, anz, srvDep)
srv := NewSessionService(cfg, new(DataDBService), server, make(chan rpcclient.ClientConnector, 1), shdChan, conMng, anz, srvDep)
err := srv.Start()
if err != nil {
t.Fatal(err)
@@ -173,7 +173,7 @@ func TestSessionSReload2(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
cfg.StorDbCfg().Type = utils.INTERNAL
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, anz, srvDep)
engine.NewConnManager(cfg, nil)
srv.(*SessionService).sm = &sessions.SessionS{}
@@ -235,7 +235,7 @@ func TestSessionSReload3(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
cfg.StorDbCfg().Type = utils.INTERNAL
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, anz, srvDep)
engine.NewConnManager(cfg, nil)
srv.(*SessionService).sm = &sessions.SessionS{}

View File

@@ -50,7 +50,7 @@ func TestSessionSCoverage(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
cfg.StorDbCfg().Type = utils.INTERNAL
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, anz, srvDep)
engine.NewConnManager(cfg, nil)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
@@ -62,7 +62,6 @@ func TestSessionSCoverage(t *testing.T) {
shdChan: shdChan,
connChan: make(chan rpcclient.ClientConnector, 1),
connMgr: nil,
caps: nil,
anz: anz,
srvDep: srvDep,
sm: &sessions.SessionS{},

View File

@@ -56,7 +56,7 @@ func TestSIPAgentReload(t *testing.T) {
db := NewDataDBService(cfg, nil, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
shdChan, nil, nil, anz, srvDep)
shdChan, nil, anz, srvDep)
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,