Add active sessions backup functionality

This commit is contained in:
arberkatellari
2024-06-11 17:01:50 +02:00
committed by Dan Christian Bogos
parent 7e3b1b7052
commit f356695f6f
37 changed files with 1748 additions and 28 deletions

View File

@@ -79,6 +79,7 @@ type Session struct {
Chargeable bool // used in case of pausing debit
SRuns []*SRun // forked based on ChargerS
OptsStart engine.MapEvent
UpdatedAt time.Time // time when session was changed
debitStop chan struct{}
sTerminator *sTerminator // automatic timeout for the session
@@ -132,6 +133,64 @@ func (s *Session) Clone() (cln *Session) {
return
}
// asStoredSession builds the engine.StoredSession counterpart of the Session
// so that it can later be saved in DataDB.
// Field values are assigned as they are, no deep copy is performed here.
func (s *Session) asStoredSession() *engine.StoredSession {
	stored := &engine.StoredSession{
		CGRID:         s.CGRID,
		Tenant:        s.Tenant,
		ResourceID:    s.ResourceID,
		ClientConnID:  s.ClientConnID,
		EventStart:    s.EventStart,
		DebitInterval: s.DebitInterval,
		Chargeable:    s.Chargeable,
		OptsStart:     s.OptsStart,
		UpdatedAt:     s.UpdatedAt,
		SRuns:         make([]*engine.StoredSRun, 0, len(s.SRuns)),
	}
	// one StoredSRun per SRun, keeping the original order
	for _, run := range s.SRuns {
		stored.SRuns = append(stored.SRuns, &engine.StoredSRun{
			Event:         run.Event,
			CD:            run.CD,
			EventCost:     run.EventCost,
			ExtraDuration: run.ExtraDuration,
			LastUsage:     run.LastUsage,
			TotalUsage:    run.TotalUsage,
			NextAutoDebit: run.NextAutoDebit,
		})
	}
	return stored
}
// newSessionFromStoredSession builds a Session out of a StoredSession,
// to be restored and activated later on.
// Only the fields present in the StoredSession are populated; the remaining
// Session fields are left at their zero value.
func newSessionFromStoredSession(s *engine.StoredSession) *Session {
	sess := &Session{
		CGRID:         s.CGRID,
		Tenant:        s.Tenant,
		ResourceID:    s.ResourceID,
		ClientConnID:  s.ClientConnID,
		EventStart:    s.EventStart,
		DebitInterval: s.DebitInterval,
		Chargeable:    s.Chargeable,
		OptsStart:     s.OptsStart,
		UpdatedAt:     s.UpdatedAt,
		SRuns:         make([]*SRun, 0, len(s.SRuns)),
	}
	// one SRun per StoredSRun, keeping the original order
	for _, run := range s.SRuns {
		sess.SRuns = append(sess.SRuns, &SRun{
			Event:         run.Event,
			CD:            run.CD,
			EventCost:     run.EventCost,
			ExtraDuration: run.ExtraDuration,
			LastUsage:     run.LastUsage,
			TotalUsage:    run.TotalUsage,
			NextAutoDebit: run.NextAutoDebit,
		})
	}
	return sess
}
// AsExternalSessions returns the session as a list of ExternalSession using all SRuns (thread safe)
func (s *Session) AsExternalSessions(tmz, nodeID string) (aSs []*ExternalSession) {
s.RLock()

View File

@@ -44,17 +44,19 @@ func NewSessionS(cgrCfg *config.CGRConfig,
cgrCfg.SessionSCfg().SessionIndexes.Add(utils.OriginID) // Make sure we have indexing for OriginID since it is a requirement on prefix searching
return &SessionS{
cgrCfg: cgrCfg,
dm: dm,
connMgr: connMgr,
biJClnts: make(map[birpc.ClientConnector]string),
biJIDs: make(map[string]*biJClient),
aSessions: make(map[string]*Session),
aSessionsIdx: make(map[string]map[string]map[string]utils.StringSet),
aSessionsRIdx: make(map[string][]*riFieldNameVal),
pSessions: make(map[string]*Session),
pSessionsIdx: make(map[string]map[string]map[string]utils.StringSet),
pSessionsRIdx: make(map[string][]*riFieldNameVal),
cgrCfg: cgrCfg,
dm: dm,
connMgr: connMgr,
biJClnts: make(map[birpc.ClientConnector]string),
biJIDs: make(map[string]*biJClient),
aSessions: make(map[string]*Session),
aSessionsIdx: make(map[string]map[string]map[string]utils.StringSet),
aSessionsRIdx: make(map[string][]*riFieldNameVal),
pSessions: make(map[string]*Session),
pSessionsIdx: make(map[string]map[string]map[string]utils.StringSet),
pSessionsRIdx: make(map[string][]*riFieldNameVal),
markedSsCGRIDs: make(utils.StringSet),
removeSsCGRIDs: make(utils.StringSet),
}
}
@@ -87,10 +89,16 @@ type SessionS struct {
pSIMux sync.RWMutex // protects pSessionsIdx
pSessionsIdx map[string]map[string]map[string]utils.StringSet // map[fieldName]map[fieldValue][cgrID]utils.StringSet[runID]sID
pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove
markedSsCGRIDs utils.StringSet // keep a record of session cgrids to be stored in dataDB backup
markedSsCGRIDsMux sync.RWMutex // prevent concurrency when adding/deleting CGRIDs from map
removeSsCGRIDs utils.StringSet // keep a record of session cgrids to be removed from dataDB backup
removeSsCGRIDsMux sync.RWMutex // prevent concurrency when adding/deleting CGRIDs from map
storeSessMux sync.RWMutex // protects storeSessions
}
// ListenAndServe starts the service and binds it to the listen loop
func (sS *SessionS) ListenAndServe(stopChan chan struct{}) {
// SyncSessions starts the service and binds it to the listen loop
func (sS *SessionS) SyncSessions(stopChan chan struct{}) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.SessionS))
if sS.cgrCfg.SessionSCfg().ChannelSyncInterval != 0 {
for { // Schedule sync channels to run repeately
@@ -106,15 +114,9 @@ func (sS *SessionS) ListenAndServe(stopChan chan struct{}) {
// Shutdown is called by engine to clear states
func (sS *SessionS) Shutdown() (err error) {
if len(sS.cgrCfg.SessionSCfg().ReplicationConns) == 0 {
var hasErr bool
for _, s := range sS.getSessions("", false) { // Force sessions shutdown
if err = sS.terminateSession(s, nil, nil, nil, false); err != nil {
hasErr = true
}
}
if hasErr {
return utils.ErrPartiallyExecuted
if sS.cgrCfg.SessionSCfg().BackupInterval != 0 {
if _, err := sS.storeSessions(); err != nil {
utils.Logger.Err(fmt.Sprintf("Backup Sessions error on shutdown: <%v>", err))
}
}
return
@@ -661,6 +663,14 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int,
}
return
}
s.Lock()
s.UpdatedAt = time.Now()
s.Unlock()
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(s.CGRID)
sS.markedSsCGRIDsMux.Unlock()
}
select {
case <-debitStop:
return
@@ -726,6 +736,12 @@ func (sS *SessionS) refundSession(s *Session, sRunIdx int, rUsage time.Duration)
acntSummary.UpdateInitialValue(sr.EventCost.AccountSummary)
sr.EventCost.AccountSummary = acntSummary
}
s.UpdatedAt = time.Now()
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(s.CGRID)
sS.markedSsCGRIDsMux.Unlock()
}
return
}
@@ -908,6 +924,7 @@ func (sS *SessionS) registerSession(s *Session, passive bool) {
sMp = sS.pSessions
}
sMux.Lock()
s.UpdatedAt = time.Now()
sMp[s.CGRID] = s
sMux.Unlock()
sS.indexSession(s, passive)
@@ -936,6 +953,14 @@ func (sS *SessionS) unregisterSession(cgrID string, passive bool) bool {
sMux = &sS.pSsMux
sMp = sS.pSessions
}
if !passive && sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
delete(sS.markedSsCGRIDs, cgrID) // in case not yet in backup, dont needlessly store session
sS.markedSsCGRIDsMux.Unlock()
sS.removeSsCGRIDsMux.Lock()
sS.removeSsCGRIDs.Add(cgrID)
sS.removeSsCGRIDsMux.Unlock()
}
sMux.Lock()
if _, has := sMp[cgrID]; !has {
sMux.Unlock()
@@ -1385,7 +1410,13 @@ func (sS *SessionS) getActivateSession(cgrID string) (s *Session) {
if len(ss) != 0 {
return ss[0]
}
return sS.transitSState(cgrID, false)
s = sS.transitSState(cgrID, false)
if len(ss) != 0 && sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(s.CGRID)
sS.markedSsCGRIDsMux.Unlock()
}
return
}
// relocateSession will change the CGRID of a session (ie: prefix based session group)
@@ -1411,6 +1442,11 @@ func (sS *SessionS) relocateSession(initOriginID, originID, originHost string) (
}
s.Unlock()
sS.registerSession(s, false)
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(s.CGRID)
sS.markedSsCGRIDsMux.Unlock()
}
sS.replicateSessions(initCGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns)
return
}
@@ -1554,6 +1590,26 @@ func (sS *SessionS) authEvent(cgrEv *utils.CGREvent, forceDuration bool) (usage
return
}
// restoreSessions reinitiates the sessions received from the dataDB backup.
// A session is reactivated (debit loops started and session registered as active)
// only if the time passed since its UpdatedAt does not exceed the DefaultUsage
// configured for its ToR (*voice being assumed when the start event has none);
// otherwise its CGRID is queued in removeSsCGRIDs to be removed from the backup.
// No session locking is done since it is meant to run only once at service start,
// before sessions start being created/modified.
func (sS *SessionS) restoreSessions(sessions []*Session) {
	for _, sess := range sessions {
		typeOfRecord, _ := sess.EventStart[utils.ToR].(string)
		if typeOfRecord == utils.EmptyString {
			typeOfRecord = utils.MetaVoice
		}
		if time.Since(sess.UpdatedAt) > sS.cgrCfg.SessionSCfg().DefaultUsage[typeOfRecord] {
			// expired session, schedule its removal from the dataDB backup
			sS.removeSsCGRIDsMux.Lock()
			sS.removeSsCGRIDs.Add(sess.CGRID)
			sS.removeSsCGRIDsMux.Unlock()
			continue
		}
		sS.initSessionDebitLoops(sess)
		sS.registerSession(sess, false)
	}
}
// initSession handles a new session
// not thread-safe for Session since it is constructed here
func (sS *SessionS) initSession(cgrEv *utils.CGREvent, clntConnID,
@@ -1588,6 +1644,7 @@ func (sS *SessionS) updateSession(s *Session, updtEv, opts engine.MapEvent, isMs
sS.setSTerminator(s, opts) // reset the terminator
}
s.Chargeable = opts.GetBoolOrDefault(utils.OptsChargeable, true)
s.UpdatedAt = time.Now()
//init has no updtEv
if updtEv == nil {
updtEv = engine.MapEvent(s.EventStart.Clone())
@@ -1719,6 +1776,14 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration,
nil, true, utils.NonTransactional); errCh != nil {
return errCh
}
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
delete(sS.markedSsCGRIDs, s.CGRID) // in case not yet in backup, dont needlessly store session
sS.markedSsCGRIDsMux.Unlock()
sS.removeSsCGRIDsMux.Lock()
sS.removeSsCGRIDs.Add(s.CGRID)
sS.removeSsCGRIDsMux.Unlock()
}
return
}
@@ -2385,6 +2450,11 @@ func (sS *SessionS) BiRPCv1InitiateSession(ctx *context.Context,
if sRunsUsage, err = sS.updateSession(s, nil, args.APIOpts, false); err != nil {
return utils.NewErrRALs(err)
}
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(s.CGRID)
sS.markedSsCGRIDsMux.Unlock()
}
var maxUsage time.Duration
var maxUsageSet bool // so we know if we have set the 0 on purpose
@@ -2602,6 +2672,11 @@ func (sS *SessionS) BiRPCv1UpdateSession(ctx *context.Context,
if sRunsUsage, err = sS.updateSession(s, ev, args.APIOpts, false); err != nil {
return utils.NewErrRALs(err)
}
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(s.CGRID)
sS.markedSsCGRIDsMux.Unlock()
}
var maxUsage time.Duration
var maxUsageSet bool // so we know if we have set the 0 on purpose
for _, rplyMaxUsage := range sRunsUsage {
@@ -3542,6 +3617,11 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
} else if sRunsMaxUsage, err = sS.updateSession(s, nil, args.APIOpts, false); err != nil {
return utils.NewErrRALs(err)
}
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(s.CGRID)
sS.markedSsCGRIDsMux.Unlock()
}
rply.MaxUsage = getDerivedMaxUsage(sRunsMaxUsage, ralsOpts.Has(utils.MetaDerivedReply))
//check for update session
case ralsOpts.Has(utils.MetaUpdate):
@@ -3564,6 +3644,11 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
if sRunsMaxUsage, err = sS.updateSession(s, ev, args.APIOpts, false); err != nil {
return utils.NewErrRALs(err)
}
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(s.CGRID)
sS.markedSsCGRIDsMux.Unlock()
}
rply.MaxUsage = getDerivedMaxUsage(sRunsMaxUsage, ralsOpts.Has(utils.MetaDerivedReply))
// check for terminate session
case ralsOpts.Has(utils.MetaTerminate):
@@ -3790,6 +3875,12 @@ func (sS *SessionS) BiRPCv1ActivateSessions(ctx *context.Context,
if s := sS.transitSState(sID, false); s == nil {
utils.Logger.Warning(fmt.Sprintf("<%s> no passive session with id: <%s>", utils.SessionS, sID))
err = utils.ErrPartiallyExecuted
} else {
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.markedSsCGRIDsMux.Lock()
sS.markedSsCGRIDs.Add(sID)
sS.markedSsCGRIDsMux.Unlock()
}
}
}
if err == nil {
@@ -3816,6 +3907,12 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(ctx *context.Context,
if s := sS.transitSState(sID, true); s == nil {
utils.Logger.Warning(fmt.Sprintf("<%s> no active session with id: <%s>", utils.SessionS, sID))
err = utils.ErrPartiallyExecuted
} else {
if sS.cgrCfg.SessionSCfg().BackupInterval > 0 {
sS.removeSsCGRIDsMux.Lock()
sS.removeSsCGRIDs.Add(sID)
sS.removeSsCGRIDsMux.Unlock()
}
}
}
if err == nil {
@@ -4221,3 +4318,109 @@ func (ssv1 *SessionS) BiRPCv1Sleep(ctx *context.Context, args *utils.DurationArg
*reply = utils.OK
return nil
}
// RestoreAndBackupSessions restores the sessions previously backed up in DataDB
// and then starts the backup loop in a separate goroutine.
//
// The backup is retrieved using the configured NodeID and DefaultTenant.
// utils.ErrNoBackupFound is not considered a failure: the backup loop is still started.
// Any other error returned by the DataManager is passed back to the caller and
// neither the restore nor the backup loop takes place.
func (sS *SessionS) RestoreAndBackupSessions(stopChan chan struct{}) error {
	storedSessions, err := sS.dm.GetSessionsBackup(sS.cgrCfg.GeneralCfg().NodeID,
		sS.cgrCfg.GeneralCfg().DefaultTenant)
	if err != nil && err != utils.ErrNoBackupFound {
		return err
	}
	// convert the sessions gotten from DataDB before restoring them
	restoredSess := make([]*Session, 0, len(storedSessions))
	for _, storSess := range storedSessions {
		restoredSess = append(restoredSess, newSessionFromStoredSession(storSess))
	}
	sS.restoreSessions(restoredSess)
	go sS.runBackup(stopChan)
	return nil
}
// runBackup runs the backup loop: it stores the marked sessions right away and
// then again after each BackupInterval, until stopChan is signaled.
// Store errors are only logged, the loop keeps running.
// It returns immediately when BackupInterval is not greater than 0.
func (sS *SessionS) runBackup(stopChan chan struct{}) {
	interval := sS.cgrCfg.SessionSCfg().BackupInterval
	if interval <= 0 {
		return
	}
	for {
		if err := sS.storeSessionsMarked(); err != nil {
			utils.Logger.Err(fmt.Sprintf("Backup Sessions error: <%v>", err))
		}
		select {
		case <-time.After(interval): // wait for the next backup round
		case <-stopChan:
			return
		}
	}
}
// storeSessionsMarked stores only the active sessions marked for backup in DataDB
// and removes from the backup the sessions queued for removal.
//
// It works in two steps:
//  1. under markedSsCGRIDsMux, every CGRID found in markedSsCGRIDs is resolved to its
//     active session; CGRIDs without an active session are logged and dropped from the set,
//     the rest are stored in one SetBackupSessions call and unmarked on success.
//  2. under removeSsCGRIDsMux, every CGRID found in removeSsCGRIDs is removed from the
//     backup and dropped from the set on success.
//
// The first DataDB error is returned; the CGRIDs not processed stay in their set so
// they are retried on the next run. Both mutexes are released through defer so that
// an error return cannot leave them locked (which would block any further
// marking/unmarking of sessions as well as the next backup run).
func (sS *SessionS) storeSessionsMarked() (err error) {
	if err = func() error {
		sS.markedSsCGRIDsMux.Lock()
		defer sS.markedSsCGRIDsMux.Unlock()
		// hold the converted active marked sessions
		storedSessions := make([]*engine.StoredSession, 0, len(sS.markedSsCGRIDs))
		for cgrID := range sS.markedSsCGRIDs {
			activeSess := sS.getSessions(cgrID, false)
			if len(activeSess) == 0 {
				utils.Logger.Warning("<SessionS> Couldn't backup session with CGRID <" + cgrID + ">. Session is not active")
				delete(sS.markedSsCGRIDs, cgrID) // remove inactive session cgrids from the map
				continue
			}
			storedSessions = append(storedSessions, activeSess[0].asStoredSession())
		}
		if len(storedSessions) == 0 {
			return nil
		}
		if err := sS.dm.SetBackupSessions(sS.cgrCfg.GeneralCfg().NodeID,
			sS.cgrCfg.GeneralCfg().DefaultTenant, storedSessions); err != nil {
			return err
		}
		// unmark only after the sessions were successfully stored
		for _, sess := range storedSessions {
			delete(sS.markedSsCGRIDs, sess.CGRID)
		}
		return nil
	}(); err != nil {
		return err
	}
	sS.removeSsCGRIDsMux.Lock()
	defer sS.removeSsCGRIDsMux.Unlock()
	for cgrID := range sS.removeSsCGRIDs {
		if err = sS.dm.RemoveSessionsBackup(sS.cgrCfg.GeneralCfg().NodeID,
			sS.cgrCfg.GeneralCfg().DefaultTenant, cgrID); err != nil {
			return err
		}
		delete(sS.removeSsCGRIDs, cgrID)
	}
	return nil
}
// storeSessions replaces the DataDB backup with the currently active sessions:
// the existing backup is removed first, then all active sessions are stored.
// Returns the number of sessions stored, or 0 together with the DataDB error.
// Concurrent calls are serialized through storeSessMux.
func (sS *SessionS) storeSessions() (sessStored int, err error) {
	sS.storeSessMux.Lock()
	defer sS.storeSessMux.Unlock()
	sessions := sS.getSessions(utils.EmptyString, false)
	// clear whatever was previously backed up, even if nothing is active now
	if err = sS.dm.RemoveSessionsBackup(sS.cgrCfg.GeneralCfg().NodeID,
		sS.cgrCfg.GeneralCfg().DefaultTenant, utils.EmptyString); err != nil {
		return 0, err
	}
	if len(sessions) == 0 {
		return 0, nil
	}
	toStore := make([]*engine.StoredSession, len(sessions))
	for idx, sess := range sessions {
		toStore[idx] = sess.asStoredSession()
	}
	if err = sS.dm.SetBackupSessions(sS.cgrCfg.GeneralCfg().NodeID,
		sS.cgrCfg.GeneralCfg().DefaultTenant, toStore); err != nil {
		return 0, err
	}
	return len(sessions), nil
}
// BiRPCv1BackupActiveSessions stores all active sessions in dataDB and
// replies with the number of sessions stored.
// On error the reply is left untouched and the error is returned.
func (sS *SessionS) BiRPCv1BackupActiveSessions(ctx *context.Context,
	args string, reply *int) error {
	sessCount, err := sS.storeSessions()
	if err != nil {
		return err
	}
	*reply = sessCount
	return nil
}