mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 21:29:52 +05:00
SMGeneric - use pointer for locking
This commit is contained in:
@@ -206,11 +206,11 @@ func (smg *SMGeneric) unrecordASession(cgrID string) bool {
|
||||
// indexSession explores settings and builds SessionsIndex
|
||||
// uses different tables and mutex-es depending on active/passive session
|
||||
func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) {
|
||||
idxMux := smg.aSIMux
|
||||
idxMux := &smg.aSIMux // pointer to original mux since we cannot copy it
|
||||
ssIndx := smg.aSessionsIndex
|
||||
ssRIdx := smg.aSessionsRIndex
|
||||
if passiveSessions {
|
||||
idxMux = smg.pSIMux
|
||||
idxMux = &smg.pSIMux
|
||||
ssIndx = smg.pSessionsIndex
|
||||
ssRIdx = smg.pSessionsRIndex
|
||||
}
|
||||
@@ -242,19 +242,18 @@ func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) {
|
||||
if _, hasIt := ssRIdx[s.CGRID]; !hasIt {
|
||||
ssRIdx[s.CGRID] = make([]*riFieldNameVal, 0)
|
||||
}
|
||||
riFlds := ssRIdx[s.CGRID] // attempt to avoid map concurrency write panic
|
||||
ssRIdx[s.CGRID] = append(riFlds, &riFieldNameVal{runID: s.RunID, fieldName: fieldName, fieldValue: fieldVal})
|
||||
ssRIdx[s.CGRID] = append(ssRIdx[s.CGRID], &riFieldNameVal{runID: s.RunID, fieldName: fieldName, fieldValue: fieldVal})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// unindexASession removes a session from indexes
|
||||
func (smg *SMGeneric) unindexSession(cgrID string, passiveSessions bool) bool {
|
||||
idxMux := smg.aSIMux
|
||||
idxMux := &smg.aSIMux
|
||||
ssIndx := smg.aSessionsIndex
|
||||
ssRIdx := smg.aSessionsRIndex
|
||||
if passiveSessions {
|
||||
idxMux = smg.pSIMux
|
||||
idxMux = &smg.pSIMux
|
||||
ssIndx = smg.pSessionsIndex
|
||||
ssRIdx = smg.pSessionsRIndex
|
||||
}
|
||||
@@ -282,10 +281,10 @@ func (smg *SMGeneric) unindexSession(cgrID string, passiveSessions bool) bool {
|
||||
// getSessionIDsMatchingIndexes will check inside indexes if it can find sessionIDs matching all filters
|
||||
// matchedIndexes returns map[matchedFieldName]possibleMatchedFieldVal so we optimize further to avoid checking them
|
||||
func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string, passiveSessions bool) (utils.StringMap, map[string]string) {
|
||||
idxMux := smg.aSIMux
|
||||
idxMux := &smg.aSIMux
|
||||
ssIndx := smg.aSessionsIndex
|
||||
if passiveSessions {
|
||||
idxMux = smg.pSIMux
|
||||
idxMux = &smg.pSIMux
|
||||
ssIndx = smg.pSessionsIndex
|
||||
}
|
||||
idxMux.RLock()
|
||||
@@ -326,10 +325,10 @@ func (smg *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string, pass
|
||||
|
||||
// getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix for OriginID field
|
||||
func (smg *SMGeneric) getSessionIDsForPrefix(prefix string, passiveSessions bool) (cgrIDs []string) {
|
||||
idxMux := smg.aSIMux
|
||||
idxMux := &smg.aSIMux
|
||||
ssIndx := smg.aSessionsIndex
|
||||
if passiveSessions {
|
||||
idxMux = smg.pSIMux
|
||||
idxMux = &smg.pSIMux
|
||||
ssIndx = smg.pSessionsIndex
|
||||
}
|
||||
idxMux.RLock()
|
||||
@@ -470,18 +469,21 @@ func (smg *SMGeneric) replicateSessions(cgrID string) (err error) {
|
||||
|
||||
// getSessions is used to return in a thread-safe manner active or passive sessions
|
||||
func (smg *SMGeneric) getSessions(cgrID string, passiveSessions bool) (aSS map[string][]*SMGSession) {
|
||||
ssMux := smg.aSessionsMux
|
||||
ssMp := smg.activeSessions
|
||||
ssMux := &smg.aSessionsMux
|
||||
ssMp := smg.activeSessions // reference it so we don't overwrite the new map without protection
|
||||
if passiveSessions {
|
||||
ssMux = smg.pSessionsMux
|
||||
ssMux = &smg.pSessionsMux
|
||||
ssMp = smg.passiveSessions
|
||||
}
|
||||
ssMux.RLock()
|
||||
defer ssMux.RUnlock()
|
||||
if len(cgrID) == 0 {
|
||||
return ssMp
|
||||
}
|
||||
aSS = make(map[string][]*SMGSession)
|
||||
if len(cgrID) == 0 {
|
||||
for k, v := range ssMp {
|
||||
aSS[k] = v // Copy to avoid concurrency on sessions map
|
||||
}
|
||||
return
|
||||
}
|
||||
if ss, hasCGRID := ssMp[cgrID]; hasCGRID {
|
||||
aSS[cgrID] = ss
|
||||
}
|
||||
@@ -514,9 +516,7 @@ func (smg *SMGeneric) setPassiveSessions(cgrID string, ss []*SMGSession) (err er
|
||||
func (smg *SMGeneric) removePassiveSessions(cgrID string) (err error) {
|
||||
for _, cacheKey := range []string{"InitiateSession" + cgrID, "UpdateSession" + cgrID, "TerminateSession" + cgrID} {
|
||||
if _, err := smg.responseCache.Get(cacheKey); err == nil { // Stop processing passive when there has been an update over active RPC
|
||||
if _, hasCGRID := smg.passiveSessions[cgrID]; hasCGRID {
|
||||
delete(smg.passiveSessions, cgrID)
|
||||
}
|
||||
smg.deletePassiveSessions(cgrID)
|
||||
return ErrActiveSession
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user