Merge remote-tracking branch 'Fetch/master'

This commit is contained in:
Edwardro22
2016-11-02 20:05:20 +01:00
2 changed files with 33 additions and 36 deletions

View File

@@ -130,7 +130,7 @@ func (self *SmGenericConfig) loadFromJsonCfg(jsnCfg *SmGenericJsonCfg) error {
if jsnCfg.Smg_replication_conns != nil {
self.SMGReplicationConns = make([]*HaPoolConfig, len(*jsnCfg.Smg_replication_conns))
for idx, jsnHaCfg := range *jsnCfg.Smg_replication_conns {
self.CDRsConns[idx] = NewDfltHaPoolConfig()
self.SMGReplicationConns[idx] = NewDfltHaPoolConfig()
self.SMGReplicationConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}

View File

@@ -111,6 +111,7 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
cdr.Usage = s.TotalUsage
var reply string
smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply)
smg.replicateSessions(s.EventStart.GetUUID())
}
func (smg *SMGeneric) recordASession(uuid string, s *SMGSession) {
@@ -247,6 +248,7 @@ func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (uti
return matchingSessions.Clone(), matchedIndexes
}
// getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix
func (smg *SMGeneric) getSessionIDsForPrefix(prefix string) []string {
smg.aSessionsMux.Lock()
defer smg.aSessionsMux.Unlock()
@@ -355,6 +357,32 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error {
return err
}
// replicateSessionsForEvent will replicate session based on configuration
func (smg *SMGeneric) replicateSessions(originID string) (err error) {
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
return
}
smg.aSessionsMux.RLock()
aSessions := smg.activeSessions[originID]
smg.aSessionsMux.RUnlock()
var wg sync.WaitGroup
for _, rplConn := range smg.smgReplConns {
if rplConn.Synchronous {
wg.Add(1)
}
go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) {
var reply string
argSet := ArgsSetPassiveSessions{OriginID: originID, Sessions: ss}
conn.Call("SMGenericV1.SetPassiveSessions", argSet, &reply)
if sync {
wg.Done()
}
}(rplConn.Connection, rplConn.Synchronous, aSessions)
}
wg.Wait() // wait for synchronous replication to finish
return
}
// Methods to apply on sessions, mostly exported through RPC/Bi-RPC
//Calculates maximum usage allowed for gevent
func (smg *SMGeneric) MaxUsage(gev SMGenericEvent) (time.Duration, error) {
@@ -385,41 +413,8 @@ func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) ([]string, error) {
return lcr.SuppliersSlice()
}
// replicateSessionsForEvent will replicate session based on configuration
func (smg *SMGeneric) replicateSessionsForEvent(gev SMGenericEvent, terminate bool) (err error) {
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
return
}
smg.aSessionsMux.RLock()
aSessions := smg.activeSessions[gev.GetUUID()]
smg.aSessionsMux.RUnlock()
if len(aSessions) == 0 {
return
}
var wg sync.WaitGroup
for _, rplConn := range smg.smgReplConns {
if rplConn.Synchronous {
wg.Add(1)
}
go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) {
var reply string
argSet := ArgsSetPassiveSessions{OriginID: gev.GetUUID()}
if !terminate {
argSet.Sessions = ss
}
conn.Call("SMGenericV1.SetPassiveSessions", argSet, &reply)
if sync {
wg.Done()
}
}(rplConn.Connection, rplConn.Synchronous, aSessions)
}
wg.Wait() // wait for synchronous replication to finish
return
}
// Called on session start
func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) {
defer smg.replicateSessionsForEvent(gev, false)
if err := smg.sessionStart(gev, clnt); err != nil {
smg.sessionEnd(gev.GetUUID(), 0)
return nilDuration, err
@@ -439,8 +434,9 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active
return 0, errors.New("ACTIVE_DEBIT_LOOP")
}
defer smg.replicateSessionsForEvent(gev, false)
defer smg.replicateSessions(gev.GetUUID())
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
defer smg.replicateSessions(initialID)
err := smg.sessionRelocate(gev.GetUUID(), initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt)
@@ -480,9 +476,9 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
// Called on session end, should stop debit loop
func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) error {
defer smg.replicateSessionsForEvent(gev, true)
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
err := smg.sessionRelocate(gev.GetUUID(), initialID)
defer smg.replicateSessions(initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(gev, clnt)
}
@@ -512,6 +508,7 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli
var interimError error
var hasActiveSession bool
for _, sessionID := range sessionIDs {
defer smg.replicateSessions(sessionID)
var s *SMGSession
for _, s = range smg.getASession(sessionID) {
break