Updated sessions locks

This commit is contained in:
Trial97
2020-09-22 11:26:52 +03:00
committed by Dan Christian Bogos
parent 3128c29c4b
commit 2af67288f4
2 changed files with 18 additions and 6 deletions

View File

@@ -26,6 +26,7 @@ import (
"os/exec"
"path"
"reflect"
"strconv"
"testing"
"github.com/cgrates/cgrates/config"
@@ -194,7 +195,7 @@ func testRedisSentinelInsertion(t *testing.T) {
}
}
index = index + 1
id = orgiginID + string(index)
id = orgiginID + strconv.Itoa(index)
}
forFunc1 := func(t *testing.T) {
for i := 0; i < 25; i++ {

View File

@@ -301,8 +301,10 @@ func (sS *SessionS) setSTerminator(s *Session) {
if s.sTerminator.ttlLastUsage != nil {
lastUsage = *s.sTerminator.ttlLastUsage
}
s.Lock() // protect forceSTerminate as it is not thread safe for the session
sS.forceSTerminate(s, lastUsage,
s.sTerminator.ttlUsage, s.sTerminator.ttlLastUsed)
s.Unlock()
case <-s.sTerminator.endChan:
s.sTerminator.timer.Stop()
}
@@ -673,9 +675,10 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, connIDs []string)
ss := sS.getSessions(cgrID, psv)
if len(ss) == 0 {
// session scheduled to be removed from remote (initiate also the EventStart to avoid the panic)
ss = []*Session{
&Session{CGRID: cgrID,
EventStart: make(engine.MapEvent)}}
ss = []*Session{{
CGRID: cgrID,
EventStart: make(engine.MapEvent),
}}
}
for _, s := range ss {
sCln := s.Clone()
@@ -1246,11 +1249,13 @@ func (sS *SessionS) syncSessions() {
if len(ss) == 0 {
continue
}
ss[0].Lock() // protect forceSTerminate as it is not thread safe for the session
if err := sS.forceSTerminate(ss[0], 0, nil, nil); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>",
utils.SessionS, cgrID, err.Error()))
}
ss[0].Unlock()
}
}
@@ -2620,6 +2625,8 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector,
utils.Logger.Warning(
fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>",
utils.SessionS, cgrID))
s.Lock() // events update session panic
defer s.Unlock()
} else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has {
// found in cache
s = sIface.(*Session)
@@ -3248,6 +3255,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
utils.Logger.Warning(
fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>",
utils.SessionS, cgrID))
s.Lock() // events update session panic
defer s.Unlock()
} else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has {
// found in cache
s = sIface.(*Session)
@@ -3269,8 +3278,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
var withErrors bool
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV1ProcessEvent{
Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers),
fmt.Sprintf("%s:false", utils.MetaAttributes)},
Flags: []string{utils.MetaChargers + ":false",
utils.MetaAttributes + ":false"},
CGREvent: *cgrEv,
ArgDispatcher: args.ArgDispatcher,
}
@@ -3321,6 +3330,7 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.ClientConnector,
if len(ss) == 0 {
continue
}
ss[0].Lock() // protect forceSTerminate as it is not thread safe for the session
if errTerm := sS.forceSTerminate(ss[0], 0, nil, nil); errTerm != nil {
utils.Logger.Warning(
fmt.Sprintf(
@@ -3328,6 +3338,7 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.ClientConnector,
utils.SessionS, ss[0].cgrID(), errTerm.Error()))
err = utils.ErrPartiallyExecuted
}
ss[0].Unlock()
}
if err == nil {
*reply = utils.OK