Adding session debit loops for replicated sessions, fixes #1438

This commit is contained in:
DanB
2019-03-03 14:22:57 +01:00
parent 9e2dc1b934
commit d3de77260c
2 changed files with 55 additions and 28 deletions

View File

@@ -60,17 +60,19 @@ type ActiveSession struct {
MaxRate float64
MaxRateUnit time.Duration
MaxCostSoFar float64
DebitInterval time.Duration
}
type Session struct {
sync.RWMutex
CGRID string
Tenant string
ResourceID string
ClientConnID string // connection ID towards the client so we can recover from passive
EventStart *engine.SafEvent // Event which started the session
SRuns []*SRun // forked based on ChargerS
CGRID string
Tenant string
ResourceID string
ClientConnID string // connection ID towards the client so we can recover from passive
EventStart *engine.SafEvent // Event which started the session
DebitInterval time.Duration // execute debits for *prepaid runs
SRuns []*SRun // forked based on ChargerS
debitStop chan struct{}
sTerminator *sTerminator // automatic timeout for the session
@@ -84,6 +86,14 @@ func (s *Session) CGRid() (cgrID string) {
return
}
// DebitStopChan reads the debit stop
func (s *Session) DebitStopChan() (dbtStop chan struct{}) {
s.RLock()
dbtStop = s.debitStop
s.RUnlock()
return
}
// Clone is a thread safe method to clone the sessions information
func (s Session) Clone() (cln *Session) {
s.RLock()
@@ -126,7 +136,8 @@ func (s *Session) AsActiveSessions(tmz, nodeID string) (aSs []*ActiveSession) {
Usage: sr.TotalUsage,
ExtraFields: sr.Event.AsMapStringIgnoreErrors(
utils.NewStringMap(utils.MainCDRFields...)),
NodeID: nodeID,
NodeID: nodeID,
DebitInterval: s.DebitInterval,
}
if sr.CD != nil {
aSs[i].LoopIndex = sr.CD.LoopIndex

View File

@@ -526,13 +526,18 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int,
}
return
} else if maxDebit < dbtIvl {
time.Sleep(maxDebit)
if err := sS.disconnectSession(s, utils.ErrInsufficientCredit.Error()); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> could not disconnect session: %s, error: %s",
utils.SessionS, s.CGRid(), err.Error()))
}
return
go func() { // schedule sending disconnect command
select {
case <-s.debitStop: // call was disconnected already
return
case <-time.After(maxDebit):
if err := sS.disconnectSession(s, utils.ErrInsufficientCredit.Error()); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> could not command disconnect session: %s, error: %s",
utils.SessionS, s.CGRid(), err.Error()))
}
}
}()
}
select {
case <-s.debitStop:
@@ -1117,6 +1122,23 @@ func (sS *SessionS) syncSessions() {
}
}
// initSessionDebitLoops will init the debit loops for a session
func (sS *SessionS) initSessionDebitLoops(s *Session) {
if s.DebitStopChan() != nil { // already initialized
return
}
s.Lock()
s.debitStop = make(chan struct{})
for i, sr := range s.SRuns {
if s.DebitInterval != 0 &&
sr.Event.GetStringIgnoreErrors(utils.RequestType) == utils.META_PREPAID {
go sS.debitLoopSession(s, i, s.DebitInterval)
time.Sleep(1) // allow the goroutine to be executed
}
}
s.Unlock()
}
// authSession calculates maximum usage allowed for given session
func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(evStart)
@@ -1162,23 +1184,17 @@ func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID
resID string, dbtItval time.Duration) (s *Session, err error) {
cgrID := GetSetCGRID(evStart)
s = &Session{
CGRID: cgrID,
Tenant: tnt,
ResourceID: resID,
EventStart: evStart,
ClientConnID: clntConnID,
debitStop: make(chan struct{}),
CGRID: cgrID,
Tenant: tnt,
ResourceID: resID,
EventStart: evStart,
ClientConnID: clntConnID,
DebitInterval: dbtItval,
}
if err = sS.forkSession(s); err != nil {
return nil, err
}
for i, sr := range s.SRuns {
if dbtItval != 0 &&
sr.Event.GetStringIgnoreErrors(utils.RequestType) == utils.META_PREPAID {
go sS.debitLoopSession(s, i, dbtItval)
time.Sleep(1)
}
}
sS.initSessionDebitLoops(s)
sS.registerSession(s, false) // make the session available to the rest of the system
return
}
@@ -1257,7 +1273,6 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration) (er
s.debitStop = nil
}
if sr.EventCost != nil {
if notCharged := sUsage - sr.EventCost.GetUsage(); notCharged > 0 { // we did not charge enough, make a manual debit here
if sr.CD.LoopIndex > 0 {
sr.CD.TimeStart = sr.CD.TimeEnd
@@ -1450,6 +1465,7 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.RpcClientConnection,
if len(sS.getSessions(s.CGRID, false)) != 0 {
sS.unregisterSession(s.CGRID, false)
}
sS.initSessionDebitLoops(s)
sS.registerSession(s, true)
}
*reply = utils.OK