SMGeneric - basic session replication mechanism without connection establishment

This commit is contained in:
DanB
2016-10-27 21:17:56 +02:00
parent 7d4d600dca
commit 3f3ad29ab1

View File

@@ -37,6 +37,12 @@ const (
var ErrPartiallyExecuted = errors.New("Partially executed")
// ReplicationConnection represents one connection to a passive node where we will replicate session data
type SMGReplicationConnection struct {
conn rpcclient.RpcClientConnection
sync bool
}
func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string) *SMGeneric {
return &SMGeneric{cgrCfg: cgrCfg,
rater: rater,
@@ -52,6 +58,7 @@ type SMGeneric struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
smgReplConns []*SMGReplicationConnection // list of connections where we will replicate our session data
timezone string
activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging
aSessionsMux sync.RWMutex
@@ -376,8 +383,41 @@ func (smg *SMGeneric) LCRSuppliers(gev SMGenericEvent) ([]string, error) {
return lcr.SuppliersSlice()
}
// replicateSessionsForEvent will replicate session based on configuration
func (smg *SMGeneric) replicateSessionsForEvent(gev SMGenericEvent) (err error) {
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
return
}
smg.aSessionsMux.RLock()
aSessions := smg.activeSessions[gev.GetUUID()]
smg.aSessionsMux.RUnlock()
if len(aSessions) == 0 {
return
}
var wg sync.WaitGroup
for _, rplConn := range smg.smgReplConns {
if rplConn.sync {
wg.Add(1)
}
go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) {
for _, s := range ss {
var reply string
if err := conn.Call("SMGenericV1.SetPassiveSession", s, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> replicating session with id <%s>, run: <%s>: received error: <%s>", gev.GetUUID(), s.RunID))
}
}
if sync {
wg.Done()
}
}(rplConn.conn, rplConn.sync, aSessions)
}
wg.Wait() // wait for synchronous replication to finish
return
}
// Called on session start
func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) (time.Duration, error) {
defer smg.replicateSessionsForEvent(gev)
if err := smg.sessionStart(gev, clnt); err != nil {
smg.sessionEnd(gev.GetUUID(), 0)
return nilDuration, err
@@ -397,6 +437,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { // Not possible to update a session with debit loop active
return 0, errors.New("ACTIVE_DEBIT_LOOP")
}
defer smg.replicateSessionsForEvent(gev)
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
err := smg.sessionRelocate(gev.GetUUID(), initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
@@ -437,6 +478,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
// Called on session end, should stop debit loop
func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcClientConnection) error {
defer smg.replicateSessionsForEvent(gev)
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
err := smg.sessionRelocate(gev.GetUUID(), initialID)
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update