Removing guardian locking in SMG

This commit is contained in:
DanB
2017-08-02 08:23:59 +02:00
parent 1b822d6d34
commit 2c01491424

View File

@@ -28,7 +28,6 @@ import (
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -362,99 +361,91 @@ func (smg *SMGeneric) getSessionIDsForPrefix(prefix string, passiveSessions bool
// sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request
func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) (err error) {
cgrID := evStart.GetCGRID(utils.META_DEFAULT)
_, err = guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level
if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 {
return nil, nil // ToDo: handle here also debits
if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 {
return nil // ToDo: handle here also debits
}
var sessionRuns []*engine.SessionRun
if err = smg.rals.Call("Responder.GetSessionRuns",
evStart.AsStoredCdr(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil {
return
} else if len(sessionRuns) == 0 {
return
}
stopDebitChan := make(chan struct{})
for _, sessionRun := range sessionRuns {
s := &SMGSession{CGRID: cgrID, EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.Timezone,
rals: smg.rals, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn}
smg.recordASession(s)
//utils.Logger.Info(fmt.Sprintf("<SMGeneric> Starting session: %s, runId: %s", sessionId, s.runId))
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
s.stopDebit = stopDebitChan
go s.debitLoop(smg.cgrCfg.SmGenericConfig.DebitInterval)
}
var sessionRuns []*engine.SessionRun
if err := smg.rals.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil {
return nil, err
} else if len(sessionRuns) == 0 {
return nil, nil
}
stopDebitChan := make(chan struct{})
for _, sessionRun := range sessionRuns {
s := &SMGSession{CGRID: cgrID, EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.Timezone,
rals: smg.rals, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn}
smg.recordASession(s)
//utils.Logger.Info(fmt.Sprintf("<SMGeneric> Starting session: %s, runId: %s", sessionId, s.runId))
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 {
s.stopDebit = stopDebitChan
go s.debitLoop(smg.cgrCfg.SmGenericConfig.DebitInterval)
}
}
return nil, nil
}, smg.cgrCfg.LockingTimeout, cgrID)
}
return
}
// sessionEnd will end a session from outside
func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error {
_, err := guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level
ss := smg.getSessions(cgrID, false)
if len(ss) == 0 {
if ss = smg.passiveToActive(cgrID); len(ss) == 0 {
return nil, nil // ToDo: handle here also debits
}
func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) (err error) {
ss := smg.getSessions(cgrID, false)
if len(ss) == 0 {
if ss = smg.passiveToActive(cgrID); len(ss) == 0 {
return // ToDo: handle here also debits
}
if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency
return nil, nil // Did not find the session so no need to close it anymore
}
if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency
return // Did not find the session so no need to close it anymore
}
for idx, s := range ss[cgrID] {
s.TotalUsage = usage // save final usage as totalUsage
if idx == 0 && s.stopDebit != nil {
close(s.stopDebit) // Stop automatic debits
}
for idx, s := range ss[cgrID] {
s.TotalUsage = usage // save final usage as totalUsage
if idx == 0 && s.stopDebit != nil {
close(s.stopDebit) // Stop automatic debits
}
aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.Timezone)
if err != nil || aTime.IsZero() {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v",
cgrID, s.RunID, aTime, err))
continue // Unanswered session
}
if err := s.close(usage); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not close session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error()))
}
if err := s.storeSMCost(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error()))
}
aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.Timezone)
if err != nil || aTime.IsZero() {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v",
cgrID, s.RunID, aTime, err))
continue // Unanswered session
}
return nil, nil
}, smg.cgrCfg.LockingTimeout, cgrID)
return err
if err := s.close(usage); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not close session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error()))
}
if err := s.storeSMCost(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error()))
}
}
return
}
// sessionRelocate is used when an update will relocate an initial session (eg multiple data streams)
func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) error {
_, err := guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level
if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here
return nil, utils.ErrMandatoryIeMissing
func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) (err error) {
if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here
return utils.ErrMandatoryIeMissing
}
ssNew := smg.getSessions(cgrID, false)
if len(ssNew) != 0 { // Already relocated
return
}
if pSSNew := smg.getSessions(cgrID, true); len(pSSNew) != 0 { // passive sessions recorded, will be recovered so no need of relocation
return
}
ss := smg.getSessions(initialID, false)
if len(ss) == 0 { // No need of relocation
if ss = smg.passiveToActive(initialID); len(ss) == 0 {
return utils.ErrNotFound
}
ssNew := smg.getSessions(cgrID, false)
if len(ssNew) != 0 { // Already relocated
return nil, nil
}
for i, s := range ss[initialID] {
s.mux.Lock()
s.CGRID = cgrID // Overwrite initial CGRID with new one
s.EventStart[utils.ACCID] = newOriginID // Overwrite OriginID for session indexing
s.mux.Unlock()
smg.recordASession(s)
if i == 0 {
smg.unrecordASession(initialID)
}
if pSSNew := smg.getSessions(cgrID, true); len(pSSNew) != 0 { // passive sessions recorded, will be recovered so no need of relocation
return nil, nil
}
ss := smg.getSessions(initialID, false)
if len(ss) == 0 { // No need of relocation
if ss = smg.passiveToActive(initialID); len(ss) == 0 {
return nil, utils.ErrNotFound
}
}
for i, s := range ss[initialID] {
s.mux.Lock()
s.CGRID = cgrID // Overwrite initial CGRID with new one
s.EventStart[utils.ACCID] = newOriginID // Overwrite OriginID for session indexing
s.mux.Unlock()
smg.recordASession(s)
if i == 0 {
smg.unrecordASession(initialID)
}
}
return nil, nil
}, smg.cgrCfg.LockingTimeout, initialID)
return err
}
return
}
// replicateSessions will replicate session based on configuration