diff --git a/config/smconfig.go b/config/smconfig.go index 8f375398e..7844653a3 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -130,7 +130,7 @@ func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error { if jsnCfg.Smg_replication_conns != nil { self.SMGReplicationConns = make([]*HaPoolConfig, len(*jsnCfg.Smg_replication_conns)) for idx, jsnHaCfg := range *jsnCfg.Smg_replication_conns { - self.CDRsConns[idx] = NewDfltHaPoolConfig() + self.SMGReplicationConns[idx] = NewDfltHaPoolConfig() self.SMGReplicationConns[idx].loadFromJsonCfg(jsnHaCfg) } } diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 4cf78ae1d..b2f103372 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -111,6 +111,7 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { cdr.Usage = s.TotalUsage var reply string smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply) + smg.replicateSessions(s.EventStart.GetUUID()) } func (smg *SMGeneric) recordASession(uuid string, s *SMGSession) { @@ -247,6 +248,7 @@ func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (uti return matchingSessions.Clone(), matchedIndexes } +// getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix func (smg *SMGeneric) getSessionIDsForPrefix(prefix string) []string { smg.aSessionsMux.Lock() defer smg.aSessionsMux.Unlock() @@ -355,6 +357,32 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error { return err } +// replicateSessionsForEvent will replicate session based on configuration +func (smg *SMGeneric) replicateSessions(originID string) (err error) { + if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { + return + } + smg.aSessionsMux.RLock() + aSessions := smg.activeSessions[originID] + smg.aSessionsMux.RUnlock() + var wg sync.WaitGroup + for _, rplConn := range smg.smgReplConns { + if rplConn.Synchronous { + wg.Add(1) + } + go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) { + var reply string + argSet := ArgsSetPassiveSessions{OriginID: originID, Sessions: ss} + conn.Call("SMGenericV1.SetPassiveSessions", argSet, &reply) + if sync { + wg.Done() + } + }(rplConn.Connection, rplConn.Synchronous, aSessions) + } + wg.Wait() // wait for synchronous replication to finish + return +} + // Methods to apply on sessions, mostly exported through RPC/Bi-RPC //Calculates maximum usage allowed for gevent func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (time.Duration, error) { @@ -385,41 +413,8 @@ func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) ([]string, error) { return lcr.SuppliersSlice() } -// replicateSessionsForEvent will replicate session based on configuration -func (smg *SMGeneric) replicateSessionsForEvent(gev SMGenericEvent, terminate bool) (err error) { - if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { - return - } - smg.aSessionsMux.RLock() - aSessions := smg.activeSessions[gev.GetUUID()] - smg.aSessionsMux.RUnlock() - if len(aSessions) == 0 { - return - } - var wg sync.WaitGroup - for _, rplConn := range smg.smgReplConns { - if rplConn.Synchronous { - wg.Add(1) - } - go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) { - var reply string - argSet := ArgsSetPassiveSessions{OriginID: gev.GetUUID()} - if !terminate { - argSet.Sessions = ss - } - conn.Call("SMGenericV1.SetPassiveSessions", argSet, &reply) - if sync { - wg.Done() - } - }(rplConn.Connection, rplConn.Synchronous, aSessions) - } - wg.Wait() // wait for synchronous replication to finish - return -} - // Called on session start func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) { - defer smg.replicateSessionsForEvent(gev, false) if err := smg.sessionStart(gev, clnt); err != nil { smg.sessionEnd(gev.GetUUID(), 0) return nilDuration, err @@ -439,8 +434,9 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active return 0, errors.New("ACTIVE_DEBIT_LOOP") } - defer smg.replicateSessionsForEvent(gev, false) + defer smg.replicateSessions(gev.GetUUID()) if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil { + defer smg.replicateSessions(initialID) err := smg.sessionRelocate(gev.GetUUID(), initialID) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update err = smg.sessionStart(gev, clnt) @@ -480,9 +476,9 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient // Called on session end, should stop debit loop func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) error { - defer smg.replicateSessionsForEvent(gev, true) if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil { err := smg.sessionRelocate(gev.GetUUID(), initialID) + defer smg.replicateSessions(initialID) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update err = smg.sessionStart(gev, clnt) } @@ -512,6 +508,7 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli var interimError error var hasActiveSession bool for _, sessionID := range sessionIDs { + defer smg.replicateSessions(sessionID) var s *SMGSession for _, s = range smg.getASession(sessionID) { break