From 3f3ad29ab1aef7f31d3b62d7bb7896a2f7cd34d8 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 27 Oct 2016 21:17:56 +0200 Subject: [PATCH] SMGeneric - basic session replication mechanism without connection establishment --- sessionmanager/smgeneric.go | 42 +++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index b90b63fee..778980a69 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -37,6 +37,12 @@ const ( var ErrPartiallyExecuted = errors.New("Partially executed") +// ReplicationConnection represents one connection to a passive node where we will replicate session data +type SMGReplicationConnection struct { + conn rpcclient.RpcClientConnection + sync bool +} + func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string) *SMGeneric { return &SMGeneric{cgrCfg: cgrCfg, rater: rater, @@ -52,6 +58,7 @@ type SMGeneric struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple rater rpcclient.RpcClientConnection cdrsrv rpcclient.RpcClientConnection + smgReplConns []*SMGReplicationConnection // list of connections where we will replicate our session data timezone string activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging aSessionsMux sync.RWMutex @@ -376,8 +383,41 @@ func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) ([]string, error) { return lcr.SuppliersSlice() } +// replicateSessionsForEvent will replicate session based on configuration +func (smg *SMGeneric) replicateSessionsForEvent(gev SMGenericEvent) (err error) { + if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { + return + } + smg.aSessionsMux.RLock() + aSessions := smg.activeSessions[gev.GetUUID()] + smg.aSessionsMux.RUnlock() + if len(aSessions) == 0 { + return + } + var wg sync.WaitGroup + for _, rplConn := range smg.smgReplConns { + if rplConn.sync { + wg.Add(1) + } + go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) { + for _, s := range ss { + var reply string + if err := conn.Call("SMGenericV1.SetPassiveSession", s, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" replicating session with id <%s>, run: <%s>: received error: <%s>", gev.GetUUID(), s.RunID)) + } + } + if sync { + wg.Done() + } + }(rplConn.conn, rplConn.sync, aSessions) + } + wg.Wait() // wait for synchronous replication to finish + return +} + // Called on session start func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) { + defer smg.replicateSessionsForEvent(gev) if err := smg.sessionStart(gev, clnt); err != nil { smg.sessionEnd(gev.GetUUID(), 0) return nilDuration, err @@ -397,6 +437,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active return 0, errors.New("ACTIVE_DEBIT_LOOP") } + defer smg.replicateSessionsForEvent(gev) if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil { err := smg.sessionRelocate(gev.GetUUID(), initialID) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update @@ -437,6 +478,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient // Called on session end, should stop debit loop func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) error { + defer smg.replicateSessionsForEvent(gev) if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil { err := smg.sessionRelocate(gev.GetUUID(), initialID) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update