Add index biJSON connection for sessions

This commit is contained in:
TeoV
2018-06-14 10:00:35 -04:00
committed by Dan Christian Bogos
parent fbd0c1198d
commit 210fb00954
4 changed files with 39 additions and 29 deletions

View File

@@ -100,6 +100,15 @@ func TestSSv1ItRpcConn(t *testing.T) {
dummyClnt.Close() // close so we don't get EOF error when disconnecting server
}
func TestV1STSSessionPing(t *testing.T) {
var resp string
if err := sSv1BiRpc.Call(utils.SessionSv1Ping, "", &resp); err != nil {
t.Error(err)
} else if resp != utils.Pong {
t.Error("Unexpected reply returned", resp)
}
}
// Load the tariff plan, creating accounts and their balances
func TestSSv1ItTPFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{
@@ -519,15 +528,6 @@ func TestSSv1ItProcessEvent(t *testing.T) {
}
}
func TestV1STSSessionPing(t *testing.T) {
var resp string
if err := sSv1BiRpc.Call(utils.SessionSv1Ping, "", &resp); err != nil {
t.Error(err)
} else if resp != utils.Pong {
t.Error("Unexpected reply returned", resp)
}
}
func TestSSv1ItStopCgrEngine(t *testing.T) {
if err := sSv1BiRpc.Close(); err != nil { // Close the connection so we don't get EOF warnings from client
t.Error(err)

View File

@@ -237,7 +237,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
for method, handler := range ssv1.Handlers() {
server.BiRPCRegisterName(method, handler)
}
server.ServeBiJSON(cfg.SessionSCfg().ListenBijson)
server.ServeBiJSON(cfg.SessionSCfg().ListenBijson, sm.OnConnect, sm.OnDisconnect)
exitChan <- true
}
}

View File

@@ -26,6 +26,7 @@ import (
"sync"
"time"
"github.com/cenk/rpc2"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
@@ -99,6 +100,7 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS,
cdrsrv: cdrsrv,
smgReplConns: smgReplConns,
Timezone: timezone,
biJsonConns: make(map[*rpc2.Client]struct{}),
activeSessions: make(map[string][]*SMGSession),
ssIdxCfg: ssIdxCfg,
aSessionsIndex: make(map[string]map[string]map[string]utils.StringMap),
@@ -111,22 +113,18 @@ func NewSMGeneric(cgrCfg *config.CGRConfig, rals, resS, thdS,
}
type SMGeneric struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rals rpcclient.RpcClientConnection // RALs connections
resS rpcclient.RpcClientConnection // ResourceS connections
thdS rpcclient.RpcClientConnection // ThresholdS connections
statS rpcclient.RpcClientConnection // StatS connections
splS rpcclient.RpcClientConnection // SupplierS connections
attrS rpcclient.RpcClientConnection // AttributeS connections
cdrsrv rpcclient.RpcClientConnection // CDR server connections
smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data
Timezone string
/* Part of new project
param map[*rpc2.Client]struct{}
toBeRemovedSessions map[string][]*SMGSession
realActiveSession map[string]struct{}
*/
activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rals rpcclient.RpcClientConnection // RALs connections
resS rpcclient.RpcClientConnection // ResourceS connections
thdS rpcclient.RpcClientConnection // ThresholdS connections
statS rpcclient.RpcClientConnection // StatS connections
splS rpcclient.RpcClientConnection // SupplierS connections
attrS rpcclient.RpcClientConnection // AttributeS connections
cdrsrv rpcclient.RpcClientConnection // CDR server connections
smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data
Timezone string
biJsonConns map[*rpc2.Client]struct{} // index BiJSONConnection so we can sync them later
activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging
aSessionsMux sync.RWMutex
ssIdxCfg utils.StringMap // index configuration
aSessionsIndex map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][runID]utils.StringMap[cgrID]
@@ -1981,6 +1979,16 @@ func (smg *SMGeneric) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
return nil
}
func (smg *SMGeneric) syncSessions() {
func (smg *SMGeneric) OnConnect(c *rpc2.Client) {
var s struct{}
smg.biJsonConns[c] = s
}
func (smg *SMGeneric) OnDisconnect(c *rpc2.Client) {
delete(smg.biJsonConns, c)
}
func (smg *SMGeneric) syncSessions() {
// var toBeRemovedSessions map[string][]*SMGSession
// var realActiveSession map[string]struct{}
}

View File

@@ -230,7 +230,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
http.ListenAndServe(addr, nil)
}
func (s *Server) ServeBiJSON(addr string) {
func (s *Server) ServeBiJSON(addr string, onConn func(*rpc2.Client), onDis func(*rpc2.Client)) {
s.RLock()
isNil := s.birpcSrv == nil
s.RUnlock()
@@ -241,6 +241,8 @@ func (s *Server) ServeBiJSON(addr string) {
if e != nil {
log.Fatal("ServeBiJSON listen error:", e)
}
s.birpcSrv.OnConnect(onConn)
s.birpcSrv.OnDisconnect(onDis)
Logger.Info(fmt.Sprintf("Starting CGRateS BiJSON server at <%s>", addr))
for {
conn, err := lBiJSON.Accept()