From 210fb00954f25b2bc18134f6448acaea42adfa90 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 14 Jun 2018 10:00:35 -0400 Subject: [PATCH] Add index biJSON connection for sessions --- apier/v1/sessionsv1_it_test.go | 18 +++++++------- cmd/cgr-engine/cgr-engine.go | 2 +- sessions/sessions.go | 44 ++++++++++++++++++++-------------- utils/server.go | 4 +++- 4 files changed, 39 insertions(+), 29 deletions(-) diff --git a/apier/v1/sessionsv1_it_test.go b/apier/v1/sessionsv1_it_test.go index 53722047a..e280b1819 100644 --- a/apier/v1/sessionsv1_it_test.go +++ b/apier/v1/sessionsv1_it_test.go @@ -100,6 +100,15 @@ func TestSSv1ItRpcConn(t *testing.T) { dummyClnt.Close() // close so we don't get EOF error when disconnecting server } +func TestV1STSSessionPing(t *testing.T) { + var resp string + if err := sSv1BiRpc.Call(utils.SessionSv1Ping, "", &resp); err != nil { + t.Error(err) + } else if resp != utils.Pong { + t.Error("Unexpected reply returned", resp) + } +} + // Load the tariff plan, creating accounts and their balances func TestSSv1ItTPFromFolder(t *testing.T) { attrs := &utils.AttrLoadTpFromFolder{ @@ -519,15 +528,6 @@ func TestSSv1ItProcessEvent(t *testing.T) { } } -func TestV1STSSessionPing(t *testing.T) { - var resp string - if err := sSv1BiRpc.Call(utils.SessionSv1Ping, "", &resp); err != nil { - t.Error(err) - } else if resp != utils.Pong { - t.Error("Unexpected reply returned", resp) - } -} - func TestSSv1ItStopCgrEngine(t *testing.T) { if err := sSv1BiRpc.Close(); err != nil { // Close the connection so we don't get EOF warnings from client t.Error(err) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index bc81ff444..ac48a8fb0 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -237,7 +237,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in for method, handler := range ssv1.Handlers() { server.BiRPCRegisterName(method, handler) } - server.ServeBiJSON(cfg.SessionSCfg().ListenBijson) + server.ServeBiJSON(cfg.SessionSCfg().ListenBijson, sm.OnConnect, sm.OnDisconnect) exitChan <- true } } diff --git a/sessions/sessions.go b/sessions/sessions.go index 6d7de1ea5..074b03d62 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/cenk/rpc2" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/guardian" @@ -99,6 +100,7 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, cdrsrv: cdrsrv, smgReplConns: smgReplConns, Timezone: timezone, + biJsonConns: make(map[*rpc2.Client]struct{}), activeSessions: make(map[string][]*SMGSession), ssIdxCfg: ssIdxCfg, aSessionsIndex: make(map[string]map[string]map[string]utils.StringMap), @@ -111,22 +113,18 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS, } type SMGeneric struct { - cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple - rals rpcclient.RpcClientConnection // RALs connections - resS rpcclient.RpcClientConnection // ResourceS connections - thdS rpcclient.RpcClientConnection // ThresholdS connections - statS rpcclient.RpcClientConnection // StatS connections - splS rpcclient.RpcClientConnection // SupplierS connections - attrS rpcclient.RpcClientConnection // AttributeS connections - cdrsrv rpcclient.RpcClientConnection // CDR server connections - smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data - Timezone string - /* Part of new project - param map[*rpc2.Client]struct{} - toBeRemovedSessions map[string][]*SMGSession - realActiveSession map[string]struct{} - */ - activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging + cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple + rals rpcclient.RpcClientConnection // RALs connections + resS rpcclient.RpcClientConnection // ResourceS connections + thdS rpcclient.RpcClientConnection // ThresholdS connections + statS rpcclient.RpcClientConnection // StatS connections + splS rpcclient.RpcClientConnection // SupplierS connections + attrS rpcclient.RpcClientConnection // AttributeS connections + cdrsrv rpcclient.RpcClientConnection // CDR server connections + smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data + Timezone string + biJsonConns map[*rpc2.Client]struct{} // index BiJSONConnection so we can sync them later + activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging aSessionsMux sync.RWMutex ssIdxCfg utils.StringMap // index configuration aSessionsIndex map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][runID]utils.StringMap[cgrID] @@ -1981,6 +1979,16 @@ func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, return nil } -func (smg *SMGeneric) syncSessions() { - +func (smg *SMGeneric) OnConnect(c *rpc2.Client) { + var s struct{} + smg.biJsonConns[c] = s +} + +func (smg *SMGeneric) OnDisconnect(c *rpc2.Client) { + delete(smg.biJsonConns, c) +} + +func (smg *SMGeneric) syncSessions() { + // var toBeRemovedSessions map[string][]*SMGSession + // var realActiveSession map[string]struct{} } diff --git a/utils/server.go b/utils/server.go index 95576a5da..d2b85a81d 100644 --- a/utils/server.go +++ b/utils/server.go @@ -230,7 +230,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, http.ListenAndServe(addr, nil) } -func (s *Server) ServeBiJSON(addr string) { +func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) { s.RLock() isNil := s.birpcSrv == nil s.RUnlock() @@ -241,6 +241,8 @@ func (s *Server) ServeBiJSON(addr string) { if e != nil { log.Fatal("ServeBiJSON listen error:", e) } + s.birpcSrv.OnConnect(onConn) + s.birpcSrv.OnDisconnect(onDis) Logger.Info(fmt.Sprintf("Starting CGRateS BiJSON server at <%s>", addr)) for { conn, err := lBiJSON.Accept()