mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
SMG replication with session cloning to avoid concurrency on slow replication
This commit is contained in:
@@ -18,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package sessionmanager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -148,8 +147,6 @@ func TestSMGenericEventGetSessionTTL(t *testing.T) {
|
||||
sesTTLMaxDelay := time.Duration(10 * time.Second)
|
||||
if sTTL := smGev.GetSessionTTL(time.Duration(5*time.Second), &sesTTLMaxDelay); sTTL == eSesTTL || sTTL > eSesTTL+sesTTLMaxDelay {
|
||||
t.Errorf("Received: %v", sTTL)
|
||||
} else {
|
||||
fmt.Println(sTTL)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -31,6 +32,7 @@ import (
|
||||
|
||||
// One session handled by SM
|
||||
type SMGSession struct {
|
||||
mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed
|
||||
CGRID string // Unique identifier for this session
|
||||
EventStart SMGenericEvent // Event which started
|
||||
stopDebit chan struct{} // Channel to communicate with debit loops when closing the session
|
||||
@@ -142,7 +144,6 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time.
|
||||
|
||||
// Attempts to refund a duration, error on failure
|
||||
func (self *SMGSession) refund(refundDuration time.Duration) error {
|
||||
|
||||
if refundDuration == 0 { // Nothing to refund
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -471,7 +471,18 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool
|
||||
}
|
||||
ssMux.RLock()
|
||||
ss := ssMp[cgrID]
|
||||
if len(ss) != 0 {
|
||||
ss[0].mux.RLock() // lock session so we can clone it after releasing the map lock
|
||||
}
|
||||
ssMux.RUnlock()
|
||||
var ssCln []*SMGSession
|
||||
err = utils.Clone(ss, &ssCln)
|
||||
if len(ss) != 0 {
|
||||
ss[0].mux.RUnlock()
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for _, rplConn := range smgReplConns {
|
||||
if rplConn.Synchronous {
|
||||
@@ -484,7 +495,7 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool
|
||||
if sync {
|
||||
wg.Done()
|
||||
}
|
||||
}(rplConn.Connection, rplConn.Synchronous, ss)
|
||||
}(rplConn.Connection, rplConn.Synchronous, ssCln)
|
||||
}
|
||||
wg.Wait() // wait for synchronous replication to finish
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user