From 9b34f9852134e6d5e5ddf094dbd328cc3d2a1f0d Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 18 Nov 2016 17:06:24 +0100 Subject: [PATCH] SMGeneric - use pointer for locking --- sessionmanager/smgeneric.go | 38 ++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 0c0eb07bb..b07adc02d 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -206,11 +206,11 @@ func (smg *SMGeneric) unrecordASession(cgrID string) bool { // indexSession explores settings and builds SessionsIndex // uses different tables and mutex-es depending on active/passive session func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) { - idxMux := smg.aSIMux + idxMux := &smg.aSIMux // pointer to original mux since we cannot copy it ssIndx := smg.aSessionsIndex ssRIdx := smg.aSessionsRIndex if passiveSessions { - idxMux = smg.pSIMux + idxMux = &smg.pSIMux ssIndx = smg.pSessionsIndex ssRIdx = smg.pSessionsRIndex } @@ -242,19 +242,18 @@ func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) { if _, hasIt := ssRIdx[s.CGRID]; !hasIt { ssRIdx[s.CGRID] = make([]*riFieldNameVal, 0) } - riFlds := ssRIdx[s.CGRID] // attempt to avoid map concurrency write panic - ssRIdx[s.CGRID] = append(riFlds, &riFieldNameVal{runID: s.RunID, fieldName: fieldName, fieldValue: fieldVal}) + ssRIdx[s.CGRID] = append(ssRIdx[s.CGRID], &riFieldNameVal{runID: s.RunID, fieldName: fieldName, fieldValue: fieldVal}) } return } // unindexASession removes a session from indexes func (smg *SMGeneric) unindexSession(cgrID string, passiveSessions bool) bool { - idxMux := smg.aSIMux + idxMux := &smg.aSIMux ssIndx := smg.aSessionsIndex ssRIdx := smg.aSessionsRIndex if passiveSessions { - idxMux = smg.pSIMux + idxMux = &smg.pSIMux ssIndx = smg.pSessionsIndex ssRIdx = smg.pSessionsRIndex } @@ -282,10 +281,10 @@ func (smg *SMGeneric) unindexSession(cgrID string, passiveSessions bool) bool { // getSessionIDsMatchingIndexes will check inside indexes if it can find sessionIDs matching all filters // matchedIndexes returns map[matchedFieldName]possibleMatchedFieldVal so we optimize further to avoid checking them func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string, passiveSessions bool) (utils.StringMap, map[string]string) { - idxMux := smg.aSIMux + idxMux := &smg.aSIMux ssIndx := smg.aSessionsIndex if passiveSessions { - idxMux = smg.pSIMux + idxMux = &smg.pSIMux ssIndx = smg.pSessionsIndex } idxMux.RLock() @@ -326,10 +325,10 @@ func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string, pass // getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix for OriginID field func (smg *SMGeneric) getSessionIDsForPrefix(prefix string, passiveSessions bool) (cgrIDs []string) { - idxMux := smg.aSIMux + idxMux := &smg.aSIMux ssIndx := smg.aSessionsIndex if passiveSessions { - idxMux = smg.pSIMux + idxMux = &smg.pSIMux ssIndx = smg.pSessionsIndex } idxMux.RLock() @@ -470,18 +469,21 @@ func (smg *SMGeneric) replicateSessions(cgrID string) (err error) { // getSessions is used to return in a thread-safe manner active or passive sessions func (smg *SMGeneric) getSessions(cgrID string, passiveSessions bool) (aSS map[string][]*SMGSession) { - ssMux := smg.aSessionsMux - ssMp := smg.activeSessions + ssMux := &smg.aSessionsMux + ssMp := smg.activeSessions // reference it so we don't overwrite the new map without protection if passiveSessions { - ssMux = smg.pSessionsMux + ssMux = &smg.pSessionsMux ssMp = smg.passiveSessions } ssMux.RLock() defer ssMux.RUnlock() - if len(cgrID) == 0 { - return ssMp - } aSS = make(map[string][]*SMGSession) + if len(cgrID) == 0 { + for k, v := range ssMp { + aSS[k] = v // Copy to avoid concurrency on sessions map + } + return + } if ss, hasCGRID := ssMp[cgrID]; hasCGRID { aSS[cgrID] = ss } @@ -514,9 +516,7 @@ func (smg *SMGeneric) setPassiveSessions(cgrID string, ss []*SMGSession) (err er func (smg *SMGeneric) removePassiveSessions(cgrID string) (err error) { for _, cacheKey := range []string{"InitiateSession" + cgrID, "UpdateSession" + cgrID, "TerminateSession" + cgrID} { if _, err := smg.responseCache.Get(cacheKey); err == nil { // Stop processing passive when there has been an update over active RPC - if _, hasCGRID := smg.passiveSessions[cgrID]; hasCGRID { - delete(smg.passiveSessions, cgrID) - } + smg.deletePassiveSessions(cgrID) return ErrActiveSession } }