From e452d455f4ba433d43844e01985288d9e6597779 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 3 Apr 2017 13:18:13 +0200 Subject: [PATCH] SMG replication with session cloning to avoid concurrency on slow replication --- sessionmanager/smg_event_test.go | 3 --- sessionmanager/smg_session.go | 3 ++- sessionmanager/smgeneric.go | 13 ++++++++++++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sessionmanager/smg_event_test.go b/sessionmanager/smg_event_test.go index 2b2143d0b..249cd0235 100644 --- a/sessionmanager/smg_event_test.go +++ b/sessionmanager/smg_event_test.go @@ -18,7 +18,6 @@ along with this program. If not, see package sessionmanager import ( - "fmt" "reflect" "testing" "time" @@ -148,8 +147,6 @@ func TestSMGenericEventGetSessionTTL(t *testing.T) { sesTTLMaxDelay := time.Duration(10 * time.Second) if sTTL := smGev.GetSessionTTL(time.Duration(5*time.Second), &sesTTLMaxDelay); sTTL == eSesTTL || sTTL > eSesTTL+sesTTLMaxDelay { t.Errorf("Received: %v", sTTL) - } else { - fmt.Println(sTTL) } } diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 257ff9925..8645ee926 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "strconv" + "sync" "time" "github.com/cgrates/cgrates/engine" @@ -31,6 +32,7 @@ import ( // One session handled by SM type SMGSession struct { + mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed CGRID string // Unique identifier for this session EventStart SMGenericEvent // Event which started stopDebit chan struct{} // Channel to communicate with debit loops when closing the session @@ -142,7 +144,6 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time. // Attempts to refund a duration, error on failure func (self *SMGSession) refund(refundDuration time.Duration) error { - if refundDuration == 0 { // Nothing to refund return nil } diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 733a93b69..76b46db77 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -471,7 +471,18 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool } ssMux.RLock() ss := ssMp[cgrID] + if len(ss) != 0 { + ss[0].mux.RLock() // lock session so we can clone it after releasing the map lock + } ssMux.RUnlock() + var ssCln []*SMGSession + err = utils.Clone(ss, &ssCln) + if len(ss) != 0 { + ss[0].mux.RUnlock() + } + if err != nil { + return + } var wg sync.WaitGroup for _, rplConn := range smgReplConns { if rplConn.Synchronous { @@ -484,7 +495,7 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool if sync { wg.Done() } - }(rplConn.Connection, rplConn.Synchronous, ss) + }(rplConn.Connection, rplConn.Synchronous, ssCln) } wg.Wait() // wait for synchronous replication to finish return