From 2af67288f43045185163fe28ec76c40cb59cdfc0 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 22 Sep 2020 11:26:52 +0300 Subject: [PATCH] Updated sessions locks --- general_tests/sentinel_it_test.go | 3 ++- sessions/sessions.go | 21 ++++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/general_tests/sentinel_it_test.go b/general_tests/sentinel_it_test.go index 6e58df6c2..f6bee780d 100755 --- a/general_tests/sentinel_it_test.go +++ b/general_tests/sentinel_it_test.go @@ -26,6 +26,7 @@ import ( "os/exec" "path" "reflect" + "strconv" "testing" "github.com/cgrates/cgrates/config" @@ -194,7 +195,7 @@ func testRedisSentinelInsertion(t *testing.T) { } } index = index + 1 - id = orgiginID + string(index) + id = orgiginID + strconv.Itoa(index) } forFunc1 := func(t *testing.T) { for i := 0; i < 25; i++ { diff --git a/sessions/sessions.go b/sessions/sessions.go index 54d001be7..f7ae5a148 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -301,8 +301,10 @@ func (sS *SessionS) setSTerminator(s *Session) { if s.sTerminator.ttlLastUsage != nil { lastUsage = *s.sTerminator.ttlLastUsage } + s.Lock() // protect forceSTerminate as it is not thread safe for the session sS.forceSTerminate(s, lastUsage, s.sTerminator.ttlUsage, s.sTerminator.ttlLastUsed) + s.Unlock() case <-s.sTerminator.endChan: s.sTerminator.timer.Stop() } @@ -673,9 +675,10 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, connIDs []string) ss := sS.getSessions(cgrID, psv) if len(ss) == 0 { // session scheduled to be removed from remote (initiate also the EventStart to avoid the panic) - ss = []*Session{ - &Session{CGRID: cgrID, - EventStart: make(engine.MapEvent)}} + ss = []*Session{{ + CGRID: cgrID, + EventStart: make(engine.MapEvent), + }} } for _, s := range ss { sCln := s.Clone() @@ -1246,11 +1249,13 @@ func (sS *SessionS) syncSessions() { if len(ss) == 0 { continue } + ss[0].Lock() // protect forceSTerminate as it is not thread safe for the session if err := sS.forceSTerminate(ss[0], 0, nil, nil); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", utils.SessionS, cgrID, err.Error())) } + ss[0].Unlock() } } @@ -2620,6 +2625,8 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector, utils.Logger.Warning( fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>", utils.SessionS, cgrID)) + s.Lock() // events update session panic + defer s.Unlock() } else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has { // found in cache s = sIface.(*Session) @@ -3248,6 +3255,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, utils.Logger.Warning( fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>", utils.SessionS, cgrID)) + s.Lock() // events update session panic + defer s.Unlock() } else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has { // found in cache s = sIface.(*Session) @@ -3269,8 +3278,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, var withErrors bool for _, cgrEv := range cgrEvs { argsProc := &engine.ArgV1ProcessEvent{ - Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers), - fmt.Sprintf("%s:false", utils.MetaAttributes)}, + Flags: []string{utils.MetaChargers + ":false", + utils.MetaAttributes + ":false"}, CGREvent: *cgrEv, ArgDispatcher: args.ArgDispatcher, } @@ -3321,6 +3330,7 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.ClientConnector, if len(ss) == 0 { continue } + ss[0].Lock() // protect forceSTerminate as it is not thread safe for the session if errTerm := sS.forceSTerminate(ss[0], 0, nil, nil); errTerm != nil { utils.Logger.Warning( fmt.Sprintf( @@ -3328,6 +3338,7 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.ClientConnector, utils.SessionS, ss[0].cgrID(), errTerm.Error())) err = utils.ErrPartiallyExecuted } + ss[0].Unlock() } if err == nil { *reply = utils.OK