diff --git a/analyzers/codec.go b/analyzers/codec.go index 826cdf047..f13cfdd20 100644 --- a/analyzers/codec.go +++ b/analyzers/codec.go @@ -22,6 +22,8 @@ import ( "net/rpc" "sync" "time" + + "github.com/cenkalti/rpc2" ) func NewAnalyzerServerCodec(sc rpc.ServerCodec, aS *AnalyzerService, enc, from, to string) rpc.ServerCodec { @@ -79,3 +81,100 @@ func (c *AnalyzerServerCodec) WriteResponse(r *rpc.Response, x interface{}) erro } func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() } + +func NewAnalyzerBiRPCCodec(sc rpc2.Codec, aS *AnalyzerService, enc, from, to string) rpc2.Codec { + return &AnalyzerBiRPCCodec{ + sc: sc, + reqs: make(map[uint64]*rpcAPI), + reps: make(map[uint64]*rpcAPI), + aS: aS, + enc: enc, + from: from, + to: to, + } +} + +type AnalyzerBiRPCCodec struct { + sc rpc2.Codec + + // keep the API in memory because the write is async + reqs map[uint64]*rpcAPI + reqIdx uint64 + reqsLk sync.RWMutex + reps map[uint64]*rpcAPI + repIdx uint64 + repsLk sync.RWMutex + + aS *AnalyzerService + enc string + from string + to string +} + +// ReadHeader must read a message and populate either the request +// or the response by inspecting the incoming message. +func (c *AnalyzerBiRPCCodec) ReadHeader(req *rpc2.Request, resp *rpc2.Response) (err error) { + err = c.sc.ReadHeader(req, resp) + if req.Method != "" { + c.reqsLk.Lock() + c.reqIdx = req.Seq + c.reqs[c.reqIdx] = &rpcAPI{ + ID: req.Seq, + Method: req.Method, + StartTime: time.Now(), + } + c.reqsLk.Unlock() + } else { + c.repsLk.Lock() + c.repIdx = resp.Seq + c.reps[c.repIdx].Error = resp.Error + c.repsLk.Unlock() + } + return +} + +// ReadRequestBody into args argument of handler function. +func (c *AnalyzerBiRPCCodec) ReadRequestBody(x interface{}) (err error) { + err = c.sc.ReadRequestBody(x) + c.reqsLk.Lock() + c.reqs[c.reqIdx].Params = x + c.reqsLk.Unlock() + return +} + +// ReadResponseBody into reply argument of handler function. +func (c *AnalyzerBiRPCCodec) ReadResponseBody(x interface{}) (err error) { + err = c.sc.ReadResponseBody(x) + c.repsLk.Lock() + api := c.reqs[c.repIdx] + delete(c.reqs, c.repIdx) + c.repsLk.Unlock() + go c.aS.logTrafic(api.ID, api.Method, api.Params, x, api.Error, c.enc, c.to, c.from, api.StartTime, time.Now()) + return +} + +// WriteRequest must be safe for concurrent use by multiple goroutines. +func (c *AnalyzerBiRPCCodec) WriteRequest(req *rpc2.Request, x interface{}) error { + c.repsLk.Lock() + c.reqIdx = req.Seq + c.reqs[c.reqIdx] = &rpcAPI{ + ID: req.Seq, + Method: req.Method, + StartTime: time.Now(), + } + c.repsLk.Unlock() + return c.sc.WriteRequest(req, x) +} + +// WriteResponse must be safe for concurrent use by multiple goroutines. +func (c *AnalyzerBiRPCCodec) WriteResponse(r *rpc2.Response, x interface{}) error { + c.reqsLk.Lock() + api := c.reqs[r.Seq] + delete(c.reqs, r.Seq) + c.reqsLk.Unlock() + go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.enc, c.from, c.to, api.StartTime, time.Now()) + return c.sc.WriteResponse(r, x) +} + +// Close is called when client/server finished with the connection. +func (c *AnalyzerBiRPCCodec) Close() error { return c.sc.Close() } diff --git a/analyzers/libanalyzers.go b/analyzers/libanalyzers.go index 143476936..d99246ad9 100644 --- a/analyzers/libanalyzers.go +++ b/analyzers/libanalyzers.go @@ -82,6 +82,7 @@ type rpcAPI struct { ID uint64 `json:"id"` Method string `json:"method"` Params interface{} `json:"params"` + Error string `json:"err,omitempty"` StartTime time.Time } diff --git a/apier/v1/api_interfaces_test.go b/apier/v1/api_interfaces_test.go index 3b1a321c1..c789994de 100644 --- a/apier/v1/api_interfaces_test.go +++ b/apier/v1/api_interfaces_test.go @@ -56,7 +56,7 @@ func TestChargerSv1Interface(t *testing.T) { func TestSessionSv1Interface(t *testing.T) { _ = SessionSv1Interface(NewDispatcherSessionSv1(nil)) - _ = SessionSv1Interface(NewSessionSv1(nil, nil)) + _ = SessionSv1Interface(NewSessionSv1(nil)) } func TestResponderInterface(t *testing.T) { diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index ad91ae189..ba2c4f174 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -20,22 +20,19 @@ package v1 import ( "github.com/cgrates/cgrates/dispatchers" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" ) -func NewSessionSv1(sS *sessions.SessionS, caps *engine.Caps) *SessionSv1 { +func NewSessionSv1(sS *sessions.SessionS) *SessionSv1 { return &SessionSv1{ - sS: sS, - caps: caps, + sS: sS, } } // SessionSv1 exports RPC from SessionSv1 type SessionSv1 struct { - sS *sessions.SessionS - caps *engine.Caps + sS *sessions.SessionS } func (ssv1 *SessionSv1) AuthorizeEvent(args *sessions.V1AuthorizeArgs, diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index fd86b2c9c..74eb57db1 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -67,302 +67,140 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReply) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1AuthorizeEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReplyWithDigest) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, rply *sessions.V1InitSessionReply) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1InitiateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, rply *sessions.V1InitReplyWithDigest) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1InitiateSessionWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt *rpc2.Client, args *sessions.V1UpdateSessionArgs, rply *sessions.V1UpdateSessionReply) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1UpdateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt *rpc2.Client, args *utils.TenantWithAPIOpts, rply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1SyncSessions(clnt, &utils.TenantWithAPIOpts{}, rply) } func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt *rpc2.Client, args *sessions.V1TerminateSessionArgs, rply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1TerminateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREvent, rply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1ProcessCDR(clnt, cgrEv, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt *rpc2.Client, args *sessions.V1ProcessMessageArgs, rply *sessions.V1ProcessMessageReply) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1ProcessMessage(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, rply *sessions.V1ProcessEventReply) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1ProcessEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetCost(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, rply *sessions.V1GetCostReply) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1GetCost(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1GetActiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, rply *int) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1GetActiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1GetPassiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, rply *int) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1GetPassiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt *rpc2.Client, args *utils.SessionFilter, rply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1ForceDisconnect(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, args string, rply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREvent, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.Ping(ign, reply) } func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt *rpc2.Client, args sessions.ArgsReplicateSessions, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt *rpc2.Client, args *sessions.Session, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1SetPassiveSession(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt *rpc2.Client, args *utils.SessionIDsWithArgsDispatcher, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1ActivateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt *rpc2.Client, args *utils.SessionIDsWithArgsDispatcher, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1DeactivateSessions(clnt, args, reply) } // BiRPCV1ReAuthorize sends the RAR for filterd sessions func (ssv1 *SessionSv1) BiRPCV1ReAuthorize(clnt *rpc2.Client, args *utils.SessionFilter, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1ReAuthorize(clnt, args, reply) } // BiRPCV1DisconnectPeer sends the DPR for the OriginHost and OriginRealm func (ssv1 *SessionSv1) BiRPCV1DisconnectPeer(clnt *rpc2.Client, args *utils.DPRArgs, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1DisconnectPeer(clnt, args, reply) } // BiRPCV1STIRAuthenticate checks the identity using STIR/SHAKEN func (ssv1 *SessionSv1) BiRPCV1STIRAuthenticate(clnt *rpc2.Client, args *sessions.V1STIRAuthenticateArgs, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1STIRAuthenticate(clnt, args, reply) } // BiRPCV1STIRIdentity creates the identity for STIR/SHAKEN func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client, args *sessions.V1STIRIdentityArgs, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } return ssv1.sS.BiRPCv1STIRIdentity(nil, args, reply) } func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *utils.DurationArgs, reply *string) (err error) { - if ssv1.caps.IsLimited() { - if err = ssv1.caps.Allocate(); err != nil { - return - } - defer ssv1.caps.Deallocate() - } time.Sleep(arg.Duration) *reply = utils.OK return nil diff --git a/apier/v1/smg.go b/apier/v1/smg.go index ac8a46813..f9c033245 100644 --- a/apier/v1/smg.go +++ b/apier/v1/smg.go @@ -19,22 +19,19 @@ along with this program. If not, see package v1 import ( - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" ) -func NewSMGenericV1(sS *sessions.SessionS, caps *engine.Caps) *SMGenericV1 { +func NewSMGenericV1(sS *sessions.SessionS) *SMGenericV1 { return &SMGenericV1{ - Ss: sS, - caps: caps, + Ss: sS, } } // Exports RPC from SMGeneric // DEPRECATED, use SessionSv1 instead type SMGenericV1 struct { - Ss *sessions.SessionS - caps *engine.Caps + Ss *sessions.SessionS } // Returns MaxUsage (for calls in seconds), -1 for no limit diff --git a/apier/v1/smgbirpc.go b/apier/v1/smgbirpc.go index afec23c49..c28835aaa 100644 --- a/apier/v1/smgbirpc.go +++ b/apier/v1/smgbirpc.go @@ -36,59 +36,29 @@ func (smgv1 *SMGenericV1) Handlers() map[string]interface{} { /// Returns MaxUsage (for calls in seconds), -1 for no limit func (smgv1 *SMGenericV1) BiRPCV1GetMaxUsage(clnt *rpc2.Client, ev map[string]interface{}, maxUsage *float64) (err error) { - if smgv1.caps.IsLimited() { - if err = smgv1.caps.Allocate(); err != nil { - return - } - defer smgv1.caps.Deallocate() - } return smgv1.Ss.BiRPCV1GetMaxUsage(clnt, ev, maxUsage) } // Called on session start, returns the maximum number of seconds the session can last func (smgv1 *SMGenericV1) BiRPCV1InitiateSession(clnt *rpc2.Client, ev map[string]interface{}, maxUsage *float64) (err error) { - if smgv1.caps.IsLimited() { - if err = smgv1.caps.Allocate(); err != nil { - return - } - defer smgv1.caps.Deallocate() - } return smgv1.Ss.BiRPCV1InitiateSession(clnt, ev, maxUsage) } // Interim updates, returns remaining duration from the rater func (smgv1 *SMGenericV1) BiRPCV1UpdateSession(clnt *rpc2.Client, ev map[string]interface{}, maxUsage *float64) (err error) { - if smgv1.caps.IsLimited() { - if err = smgv1.caps.Allocate(); err != nil { - return - } - defer smgv1.caps.Deallocate() - } return smgv1.Ss.BiRPCV1UpdateSession(clnt, ev, maxUsage) } // Called on session end, should stop debit loop func (smgv1 *SMGenericV1) BiRPCV1TerminateSession(clnt *rpc2.Client, ev map[string]interface{}, reply *string) (err error) { - if smgv1.caps.IsLimited() { - if err = smgv1.caps.Allocate(); err != nil { - return - } - defer smgv1.caps.Deallocate() - } return smgv1.Ss.BiRPCV1TerminateSession(clnt, ev, reply) } // Called on session end, should send the CDR to CDRS func (smgv1 *SMGenericV1) BiRPCV1ProcessCDR(clnt *rpc2.Client, ev map[string]interface{}, reply *string) (err error) { - if smgv1.caps.IsLimited() { - if err = smgv1.caps.Allocate(); err != nil { - return - } - defer smgv1.caps.Deallocate() - } return smgv1.Ss.BiRPCV1ProcessCDR(clnt, ev, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index edd29003e..df015b4e4 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -614,7 +614,7 @@ func main() { cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan, connManager, anz, srvDep) - smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, caps, anz, srvDep) + smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, anz, srvDep) ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalLoaderSChan, connManager, anz, srvDep) diff --git a/cores/caps.go b/cores/caps.go index b75cc6ed1..559b50059 100644 --- a/cores/caps.go +++ b/cores/caps.go @@ -23,6 +23,8 @@ import ( "net/rpc" "net/rpc/jsonrpc" + "github.com/cenkalti/rpc2" + jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc" "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -106,3 +108,88 @@ func (c *capsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { return c.sc.WriteResponse(r, x) } func (c *capsServerCodec) Close() error { return c.sc.Close() } + +func newCapsBiRPCGOBCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r rpc2.Codec) { + r = newCapsBiRPCCodec(rpc2.NewGobCodec(conn), caps) + if anz != nil { + from := conn.RemoteAddr() + var fromstr string + if from != nil { + fromstr = from.String() + } + to := conn.LocalAddr() + var tostr string + if to != nil { + tostr = to.String() + } + return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaGOB, fromstr, tostr) + } + return +} + +func newCapsBiRPCJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r rpc2.Codec) { + r = newCapsBiRPCCodec(jsonrpc2.NewJSONCodec(conn), caps) + if anz != nil { + from := conn.RemoteAddr() + var fromstr string + if from != nil { + fromstr = from.String() + } + to := conn.LocalAddr() + var tostr string + if to != nil { + tostr = to.String() + } + return analyzers.NewAnalyzerBiRPCCodec(r, anz, utils.MetaJSON, fromstr, tostr) + } + return +} + +func newCapsBiRPCCodec(sc rpc2.Codec, caps *engine.Caps) rpc2.Codec { + return &capsBiRPCCodec{ + sc: sc, + caps: caps, + } +} + +type capsBiRPCCodec struct { + sc rpc2.Codec + caps *engine.Caps +} + +// ReadHeader must read a message and populate either the request +// or the response by inspecting the incoming message. +func (c *capsBiRPCCodec) ReadHeader(req *rpc2.Request, resp *rpc2.Response) (err error) { + return c.sc.ReadHeader(req, resp) +} + +// ReadRequestBody into args argument of handler function. +func (c *capsBiRPCCodec) ReadRequestBody(x interface{}) (err error) { + if err = c.caps.Allocate(); err != nil { + return + } + return c.sc.ReadRequestBody(x) +} + +// ReadResponseBody into reply argument of handler function. +func (c *capsBiRPCCodec) ReadResponseBody(x interface{}) error { + return c.sc.ReadResponseBody(x) +} + +// WriteRequest must be safe for concurrent use by multiple goroutines. +func (c *capsBiRPCCodec) WriteRequest(req *rpc2.Request, x interface{}) error { + return c.sc.WriteRequest(req, x) +} + +// WriteResponse must be safe for concurrent use by multiple goroutines. +func (c *capsBiRPCCodec) WriteResponse(r *rpc2.Response, x interface{}) error { + if r.Error == utils.ErrMaxConcurentRPCExceededNoCaps.Error() { + r.Error = utils.ErrMaxConcurentRPCExceeded.Error() + } else { + defer c.caps.Deallocate() + } + return c.sc.WriteResponse(r, x) +} + +// Close is called when client/server finished with the connection. +func (c *capsBiRPCCodec) Close() error { return c.sc.Close() } diff --git a/cores/server.go b/cores/server.go index 986e77e40..3dd934544 100644 --- a/cores/server.go +++ b/cores/server.go @@ -40,7 +40,6 @@ import ( "github.com/cgrates/cgrates/utils" "github.com/cenkalti/rpc2" - jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc" "golang.org/x/net/websocket" ) @@ -262,14 +261,14 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(*rpc2.Client), s.birpcSrv.OnDisconnect(onDis) if addrJSON != utils.EmptyString { var ljson net.Listener - if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, jsonrpc2.NewJSONCodec); err != nil { + if ljson, err = s.listenBiRPC(s.birpcSrv, addrJSON, utils.JSONCaps, newCapsBiRPCJSONCodec); err != nil { return } defer ljson.Close() } if addrGOB != utils.EmptyString { var lgob net.Listener - if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, rpc2.NewGobCodec); err != nil { + if lgob, err = s.listenBiRPC(s.birpcSrv, addrGOB, utils.GOBCaps, newCapsBiRPCGOBCodec); err != nil { return } defer lgob.Close() @@ -278,7 +277,7 @@ func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn func(*rpc2.Client), return } -func (s *Server) listenBiRPC(srv *rpc2.Server, addr, codecName string, newCodec func(io.ReadWriteCloser) rpc2.Codec) (lBiRPC net.Listener, err error) { +func (s *Server) listenBiRPC(srv *rpc2.Server, addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) rpc2.Codec) (lBiRPC net.Listener, err error) { if lBiRPC, err = net.Listen(utils.TCP, addr); err != nil { log.Printf("ServeBi%s listen error: %s \n", codecName, err) return @@ -288,7 +287,7 @@ func (s *Server) listenBiRPC(srv *rpc2.Server, addr, codecName string, newCodec return } -func (s *Server) acceptBiRPC(srv *rpc2.Server, l net.Listener, codecName string, newCodec func(io.ReadWriteCloser) rpc2.Codec) { +func (s *Server) acceptBiRPC(srv *rpc2.Server, l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) rpc2.Codec) { for { conn, err := l.Accept() if err != nil { @@ -299,7 +298,7 @@ func (s *Server) acceptBiRPC(srv *rpc2.Server, l net.Listener, codecName string, utils.Logger.Crit(fmt.Sprintf("Stoped Bi%s server beacause %s", codecName, err)) return // stop if we get Accept error } - go srv.ServeCodec(newCodec(conn)) + go srv.ServeCodec(newCodec(conn, s.caps, s.anz)) } } diff --git a/cores/server_it_test.go b/cores/server_it_test.go index 1e0706ac4..a1e6c08c4 100644 --- a/cores/server_it_test.go +++ b/cores/server_it_test.go @@ -43,7 +43,6 @@ import ( sessions2 "github.com/cgrates/cgrates/sessions" "github.com/cenkalti/rpc2" - jsonrpc2 "github.com/cenkalti/rpc2/jsonrpc" "github.com/cgrates/cgrates/config" @@ -93,7 +92,7 @@ func TestServerIT(t *testing.T) { utils.Logger.SetLogLevel(7) for _, test := range sTestsServer { log.SetOutput(io.Discard) - t.Run("Running IT serve tests", test) + t.Run("TestServerIT", test) } } @@ -759,7 +758,7 @@ func testAcceptBiRPC(t *testing.T) { l := &mockListener{ p1: p1, } - go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, jsonrpc2.NewJSONCodec) + go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec) rpc := jsonrpc.NewClient(p2) var reply string expected := "rpc2: can't find method AttributeSv1.Ping" @@ -780,14 +779,14 @@ func (mK *mockListenError) Accept() (net.Conn, error) { } func testAcceptBiRPCError(t *testing.T) { - caps := engine.NewCaps(0, utils.MetaBusy) + caps := engine.NewCaps(10, utils.MetaBusy) server := NewServer(caps) server.RpcRegister(new(mockRegister)) server.birpcSrv = rpc2.NewServer() //it will contain "use of closed network connection" l := new(mockListenError) - go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, jsonrpc2.NewJSONCodec) + go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec) runtime.Gosched() } diff --git a/engine/filters.go b/engine/filters.go index 9796bbb05..20c7d91bd 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -166,6 +166,29 @@ func splitDynFltrValues(val, sep string) (vals []string) { return append(vals, valsEnd[1:]...) } +func splitInlineFilter(rule string) (splt []string) { + var p, st int + splt = make([]string, 0, 3) + for i, b := range rule { + switch byte(b) { + case utils.InInFieldSep[0]: + if p == 0 { + splt = append(splt, rule[st:i]) + st = i + 1 + if len(splt) == 2 { + splt = append(splt, rule[st:]) + return + } + } + case utils.IdxStart[0]: + p++ + case utils.IdxEnd[0]: + p-- + } + } + return +} + // NewFilterFromInline parses an inline rule into a compiled Filter func NewFilterFromInline(tenant, inlnRule string) (f *Filter, err error) { ruleSplt := strings.SplitN(inlnRule, utils.InInFieldSep, 3) diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index c196da763..accdf2b7b 100644 --- a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -58,7 +58,7 @@ func TestAsteriskAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewAsteriskAgent(cfg, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, @@ -124,7 +124,7 @@ func TestAsteriskAgentReload2(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewAsteriskAgent(cfg, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index 62f8c1dfd..fc6873bca 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -52,7 +52,7 @@ func TestDiameterAgentReload1(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index 79c16750d..52a4197b8 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -60,7 +60,7 @@ func TestDNSAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 426ae1d5a..3188b4938 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -64,7 +64,7 @@ func TestEventReaderSReload(t *testing.T) { srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) - sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, anz, srvDep) erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(erS, sS, diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index 1d097f90e..50d8584a9 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -59,7 +59,7 @@ func TestFreeSwitchAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/httpagent_it_test.go b/services/httpagent_it_test.go index 168af3ac2..a2fb2998b 100644 --- a/services/httpagent_it_test.go +++ b/services/httpagent_it_test.go @@ -57,7 +57,7 @@ func TestHTTPAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewHTTPAgent(cfg, filterSChan, server, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/kamailioagent_it_test.go b/services/kamailioagent_it_test.go index 34b2531c4..89d6424b8 100644 --- a/services/kamailioagent_it_test.go +++ b/services/kamailioagent_it_test.go @@ -58,7 +58,7 @@ func TestKamailioAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewKamailioAgent(cfg, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/radiusagent_it_test.go b/services/radiusagent_it_test.go index 68e421890..c4cd0a04f 100644 --- a/services/radiusagent_it_test.go +++ b/services/radiusagent_it_test.go @@ -61,7 +61,7 @@ func TestRadiusAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, @@ -129,7 +129,7 @@ func TestRadiusAgentReload2(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, diff --git a/services/sessions.go b/services/sessions.go index e4fb7b745..a7d31f3a5 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -37,8 +37,7 @@ import ( func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *cores.Server, internalChan chan rpcclient.ClientConnector, shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - caps *engine.Caps, anz *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &SessionService{ connChan: internalChan, cfg: cfg, @@ -46,7 +45,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server: server, shdChan: shdChan, connMgr: connMgr, - caps: caps, anz: anz, srvDep: srvDep, } @@ -69,7 +67,6 @@ type SessionService struct { // in order to stop the bircp server if necesary bircpEnabled bool connMgr *engine.ConnManager - caps *engine.Caps anz *AnalyzerService srvDep map[string]*sync.WaitGroup } @@ -96,9 +93,9 @@ func (smg *SessionService) Start() (err error) { // Pass internal connection via BiRPCClient smg.connChan <- smg.anz.GetInternalBiRPCCodec(smg.sm, utils.SessionS) // Register RPC handler - smg.rpc = v1.NewSMGenericV1(smg.sm, smg.caps) + smg.rpc = v1.NewSMGenericV1(smg.sm) - smg.rpcv1 = v1.NewSessionSv1(smg.sm, smg.caps) // methods with multiple options + smg.rpcv1 = v1.NewSessionSv1(smg.sm) // methods with multiple options if !smg.cfg.DispatcherSCfg().Enabled { smg.server.RpcRegister(smg.rpc) smg.server.RpcRegister(smg.rpcv1) diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index c386ac4e9..0b27c7af7 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -100,7 +100,7 @@ func TestSessionSReload1(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): clientConect, }) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - srv := NewSessionService(cfg, new(DataDBService), server, make(chan rpcclient.ClientConnector, 1), shdChan, conMng, nil, anz, srvDep) + srv := NewSessionService(cfg, new(DataDBService), server, make(chan rpcclient.ClientConnector, 1), shdChan, conMng, anz, srvDep) err := srv.Start() if err != nil { t.Fatal(err) @@ -173,7 +173,7 @@ func TestSessionSReload2(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) cfg.StorDbCfg().Type = utils.INTERNAL anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, anz, srvDep) engine.NewConnManager(cfg, nil) srv.(*SessionService).sm = &sessions.SessionS{} @@ -235,7 +235,7 @@ func TestSessionSReload3(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) cfg.StorDbCfg().Type = utils.INTERNAL anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, anz, srvDep) engine.NewConnManager(cfg, nil) srv.(*SessionService).sm = &sessions.SessionS{} diff --git a/services/sessions_test.go b/services/sessions_test.go index 579244665..c4b6d869f 100644 --- a/services/sessions_test.go +++ b/services/sessions_test.go @@ -50,7 +50,7 @@ func TestSessionSCoverage(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) cfg.StorDbCfg().Type = utils.INTERNAL anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, anz, srvDep) engine.NewConnManager(cfg, nil) if srv.IsRunning() { t.Errorf("Expected service to be down") @@ -62,7 +62,6 @@ func TestSessionSCoverage(t *testing.T) { shdChan: shdChan, connChan: make(chan rpcclient.ClientConnector, 1), connMgr: nil, - caps: nil, anz: anz, srvDep: srvDep, sm: &sessions.SessionS{}, diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index 1de2b9f70..89a44d216 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -56,7 +56,7 @@ func TestSIPAgentReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) + shdChan, nil, anz, srvDep) srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS,