From 8fed2775dd3aa02fe9fe794a67de150bc41057eb Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 8 Jul 2021 12:33:55 +0300 Subject: [PATCH] Added BiRPCClient to analyzers --- analyzers/connector.go | 39 ++++++++++++++++++++++++++++++++++++++- services/analyzers.go | 8 ++++++++ services/sessions.go | 2 +- 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/analyzers/connector.go b/analyzers/connector.go index 3aadc0e9f..a045cad28 100644 --- a/analyzers/connector.go +++ b/analyzers/connector.go @@ -43,9 +43,46 @@ type AnalyzerConnector struct { to string } -func (c *AnalyzerConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) { +func (c *AnalyzerConnector) Call(serviceMethod string, args, reply interface{}) (err error) { sTime := time.Now() err = c.conn.Call(serviceMethod, args, reply) go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.enc, c.from, c.to, sTime, time.Now()) return } + +func (aS *AnalyzerService) NewAnalyzerBiRPCConnector(sc rpcclient.BiRPCConector, enc, from, to string) rpcclient.BiRPCConector { + return &AnalyzerBiRPCConnector{ + conn: sc, + aS: aS, + enc: enc, + from: from, + to: to, + } +} + +type AnalyzerBiRPCConnector struct { + conn rpcclient.BiRPCConector + + aS *AnalyzerService + enc string + from string + to string +} + +func (c *AnalyzerBiRPCConnector) Call(serviceMethod string, args, reply interface{}) (err error) { + sTime := time.Now() + err = c.conn.Call(serviceMethod, args, reply) + go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.enc, c.from, c.to, sTime, time.Now()) + return +} + +func (c *AnalyzerBiRPCConnector) CallBiRPC(cl rpcclient.ClientConnector, serviceMethod string, args, reply interface{}) (err error) { + sTime := time.Now() + err = c.conn.CallBiRPC(cl, serviceMethod, args, reply) + go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.enc, c.from, c.to, sTime, time.Now()) + return +} + +func (c *AnalyzerBiRPCConnector) Handlers() map[string]interface{} { + return c.conn.Handlers() +} diff --git a/services/analyzers.go b/services/analyzers.go index 76daffed5..69be102e0 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -154,3 +154,11 @@ func (anz *AnalyzerService) GetInternalCodec(c rpcclient.ClientConnector, to str } return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to) } + +// GetInternalCodec returns the connection wrapped in analyzer connector +func (anz *AnalyzerService) GetInternalBiRPCCodec(c rpcclient.BiRPCConector, to string) rpcclient.BiRPCConector { + if !anz.IsRunning() { + return c + } + return anz.anz.NewAnalyzerBiRPCConnector(c, rpcclient.BiRPCInternal, utils.EmptyString, to) +} diff --git a/services/sessions.go b/services/sessions.go index 848c51899..e4fb7b745 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -94,7 +94,7 @@ func (smg *SessionService) Start() (err error) { smg.stopChan = make(chan struct{}) go smg.sm.ListenAndServe(smg.stopChan) // Pass internal connection via BiRPCClient - smg.connChan <- smg.anz.GetInternalCodec(smg.sm, utils.SessionS) + smg.connChan <- smg.anz.GetInternalBiRPCCodec(smg.sm, utils.SessionS) // Register RPC handler smg.rpc = v1.NewSMGenericV1(smg.sm, smg.caps)