From d3de77260c72421cf6814d6da28b72fe0ad937aa Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 3 Mar 2019 14:22:57 +0100 Subject: [PATCH] Adding session debit loops for replicated sessions, fixes #1438 --- sessions/session.go | 25 +++++++++++++------ sessions/sessions.go | 58 ++++++++++++++++++++++++++++---------------- 2 files changed, 55 insertions(+), 28 deletions(-) diff --git a/sessions/session.go b/sessions/session.go index b85d2f464..9e60f617b 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -60,17 +60,19 @@ type ActiveSession struct { MaxRate float64 MaxRateUnit time.Duration MaxCostSoFar float64 + DebitInterval time.Duration } type Session struct { sync.RWMutex - CGRID string - Tenant string - ResourceID string - ClientConnID string // connection ID towards the client so we can recover from passive - EventStart *engine.SafEvent // Event which started the session - SRuns []*SRun // forked based on ChargerS + CGRID string + Tenant string + ResourceID string + ClientConnID string // connection ID towards the client so we can recover from passive + EventStart *engine.SafEvent // Event which started the session + DebitInterval time.Duration // execute debits for *prepaid runs + SRuns []*SRun // forked based on ChargerS debitStop chan struct{} sTerminator *sTerminator // automatic timeout for the session @@ -84,6 +86,14 @@ func (s *Session) CGRid() (cgrID string) { return } +// DebitStopChan reads the debit stop +func (s *Session) DebitStopChan() (dbtStop chan struct{}) { + s.RLock() + dbtStop = s.debitStop + s.RUnlock() + return +} + // Clone is a thread safe method to clone the sessions information func (s Session) Clone() (cln *Session) { s.RLock() @@ -126,7 +136,8 @@ func (s *Session) AsActiveSessions(tmz, nodeID string) (aSs []*ActiveSession) { Usage: sr.TotalUsage, ExtraFields: sr.Event.AsMapStringIgnoreErrors( utils.NewStringMap(utils.MainCDRFields...)), - NodeID: nodeID, + NodeID: nodeID, + DebitInterval: s.DebitInterval, } if sr.CD != nil { aSs[i].LoopIndex = sr.CD.LoopIndex diff --git a/sessions/sessions.go b/sessions/sessions.go index 6a07019fc..dbd0c652e 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -526,13 +526,18 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, } return } else if maxDebit < dbtIvl { - time.Sleep(maxDebit) - if err := sS.disconnectSession(s, utils.ErrInsufficientCredit.Error()); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> could not disconnect session: %s, error: %s", - utils.SessionS, s.CGRid(), err.Error())) - } - return + go func() { // schedule sending disconnect command + select { + case <-s.debitStop: // call was disconnected already + return + case <-time.After(maxDebit): + if err := sS.disconnectSession(s, utils.ErrInsufficientCredit.Error()); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> could not command disconnect session: %s, error: %s", + utils.SessionS, s.CGRid(), err.Error())) + } + } + }() } select { case <-s.debitStop: @@ -1117,6 +1122,23 @@ func (sS *SessionS) syncSessions() { } } +// initSessionDebitLoops will init the debit loops for a session +func (sS *SessionS) initSessionDebitLoops(s *Session) { + if s.DebitStopChan() != nil { // already initialized + return + } + s.Lock() + s.debitStop = make(chan struct{}) + for i, sr := range s.SRuns { + if s.DebitInterval != 0 && + sr.Event.GetStringIgnoreErrors(utils.RequestType) == utils.META_PREPAID { + go sS.debitLoopSession(s, i, s.DebitInterval) + time.Sleep(1) // allow the goroutine to be executed + } + } + s.Unlock() +} + // authSession calculates maximum usage allowed for given session func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage time.Duration, err error) { cgrID := GetSetCGRID(evStart) @@ -1162,23 +1184,17 @@ func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID resID string, dbtItval time.Duration) (s *Session, err error) { cgrID := GetSetCGRID(evStart) s = &Session{ - CGRID: cgrID, - Tenant: tnt, - ResourceID: resID, - EventStart: evStart, - ClientConnID: clntConnID, - debitStop: make(chan struct{}), + CGRID: cgrID, + Tenant: tnt, + ResourceID: resID, + EventStart: evStart, + ClientConnID: clntConnID, + DebitInterval: dbtItval, } if err = sS.forkSession(s); err != nil { return nil, err } - for i, sr := range s.SRuns { - if dbtItval != 0 && - sr.Event.GetStringIgnoreErrors(utils.RequestType) == utils.META_PREPAID { - go sS.debitLoopSession(s, i, dbtItval) - time.Sleep(1) - } - } + sS.initSessionDebitLoops(s) sS.registerSession(s, false) // make the session available to the rest of the system return } @@ -1257,7 +1273,6 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration) (er s.debitStop = nil } if sr.EventCost != nil { - if notCharged := sUsage - sr.EventCost.GetUsage(); notCharged > 0 { // we did not charge enough, make a manual debit here if sr.CD.LoopIndex > 0 { sr.CD.TimeStart = sr.CD.TimeEnd @@ -1450,6 +1465,7 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.RpcClientConnection, if len(sS.getSessions(s.CGRID, false)) != 0 { sS.unregisterSession(s.CGRID, false) } + sS.initSessionDebitLoops(s) sS.registerSession(s, true) } *reply = utils.OK