From 2c01491424ebd1c71b75a790b1293873c707382e Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 2 Aug 2017 08:23:59 +0200 Subject: [PATCH] Removing guardian locking in SMG --- sessionmanager/smgeneric.go | 153 +++++++++++++++++------------------- 1 file changed, 72 insertions(+), 81 deletions(-) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index c00b1d626..a68df9488 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -28,7 +28,6 @@ import ( "github.com/cgrates/cgrates/cache" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -362,99 +361,91 @@ func (smg *SMGeneric) getSessionIDsForPrefix(prefix string, passiveSessions bool // sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) (err error) { cgrID := evStart.GetCGRID(utils.META_DEFAULT) - _, err = guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level - if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 { - return nil, nil // ToDo: handle here also debits + if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 { + return nil // ToDo: handle here also debits + } + var sessionRuns []*engine.SessionRun + if err = smg.rals.Call("Responder.GetSessionRuns", + evStart.AsStoredCdr(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil { + return + } else if len(sessionRuns) == 0 { + return + } + stopDebitChan := make(chan struct{}) + for _, sessionRun := range sessionRuns { + s := &SMGSession{CGRID: cgrID, EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.Timezone, + rals: smg.rals, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn} + smg.recordASession(s) + //utils.Logger.Info(fmt.Sprintf(" Starting session: %s, runId: %s", sessionId, s.runId)) + if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { + s.stopDebit = stopDebitChan + go s.debitLoop(smg.cgrCfg.SmGenericConfig.DebitInterval) } - var sessionRuns []*engine.SessionRun - if err := smg.rals.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil { - return nil, err - } else if len(sessionRuns) == 0 { - return nil, nil - } - stopDebitChan := make(chan struct{}) - for _, sessionRun := range sessionRuns { - s := &SMGSession{CGRID: cgrID, EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.Timezone, - rals: smg.rals, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn} - smg.recordASession(s) - //utils.Logger.Info(fmt.Sprintf(" Starting session: %s, runId: %s", sessionId, s.runId)) - if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { - s.stopDebit = stopDebitChan - go s.debitLoop(smg.cgrCfg.SmGenericConfig.DebitInterval) - } - } - return nil, nil - }, smg.cgrCfg.LockingTimeout, cgrID) + } return } // sessionEnd will end a session from outside -func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error { - _, err := guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level - ss := smg.getSessions(cgrID, false) - if len(ss) == 0 { - if ss = smg.passiveToActive(cgrID); len(ss) == 0 { - return nil, nil // ToDo: handle here also debits - } +func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) (err error) { + ss := smg.getSessions(cgrID, false) + if len(ss) == 0 { + if ss = smg.passiveToActive(cgrID); len(ss) == 0 { + return // ToDo: handle here also debits } - if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency - return nil, nil // Did not find the session so no need to close it anymore + } + if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency + return // Did not find the session so no need to close it anymore + } + for idx, s := range ss[cgrID] { + s.TotalUsage = usage // save final usage as totalUsage + if idx == 0 && s.stopDebit != nil { + close(s.stopDebit) // Stop automatic debits } - for idx, s := range ss[cgrID] { - s.TotalUsage = usage // save final usage as totalUsage - if idx == 0 && s.stopDebit != nil { - close(s.stopDebit) // Stop automatic debits - } - aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.Timezone) - if err != nil || aTime.IsZero() { - utils.Logger.Err(fmt.Sprintf(" Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v", - cgrID, s.RunID, aTime, err)) - continue // Unanswered session - } - if err := s.close(usage); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not close session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error())) - } - if err := s.storeSMCost(); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not save session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error())) - } + aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.Timezone) + if err != nil || aTime.IsZero() { + utils.Logger.Err(fmt.Sprintf(" Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v", + cgrID, s.RunID, aTime, err)) + continue // Unanswered session } - return nil, nil - }, smg.cgrCfg.LockingTimeout, cgrID) - return err + if err := s.close(usage); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not close session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error())) + } + if err := s.storeSMCost(); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not save session: %s, runId: %s, error: %s", cgrID, s.RunID, err.Error())) + } + } + return } // sessionRelocate is used when an update will relocate an initial session (eg multiple data streams) -func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) error { - _, err := guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level - if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here - return nil, utils.ErrMandatoryIeMissing +func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) (err error) { + if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here + return utils.ErrMandatoryIeMissing + } + ssNew := smg.getSessions(cgrID, false) + if len(ssNew) != 0 { // Already relocated + return + } + if pSSNew := smg.getSessions(cgrID, true); len(pSSNew) != 0 { // passive sessions recorded, will be recovered so no need of relocation + return + } + ss := smg.getSessions(initialID, false) + if len(ss) == 0 { // No need of relocation + if ss = smg.passiveToActive(initialID); len(ss) == 0 { + return utils.ErrNotFound } - ssNew := smg.getSessions(cgrID, false) - if len(ssNew) != 0 { // Already relocated - return nil, nil + } + for i, s := range ss[initialID] { + s.mux.Lock() + s.CGRID = cgrID // Overwrite initial CGRID with new one + s.EventStart[utils.ACCID] = newOriginID // Overwrite OriginID for session indexing + s.mux.Unlock() + smg.recordASession(s) + if i == 0 { + smg.unrecordASession(initialID) } - if pSSNew := smg.getSessions(cgrID, true); len(pSSNew) != 0 { // passive sessions recorded, will be recovered so no need of relocation - return nil, nil - } - ss := smg.getSessions(initialID, false) - if len(ss) == 0 { // No need of relocation - if ss = smg.passiveToActive(initialID); len(ss) == 0 { - return nil, utils.ErrNotFound - } - } - for i, s := range ss[initialID] { - s.mux.Lock() - s.CGRID = cgrID // Overwrite initial CGRID with new one - s.EventStart[utils.ACCID] = newOriginID // Overwrite OriginID for session indexing - s.mux.Unlock() - smg.recordASession(s) - if i == 0 { - smg.unrecordASession(initialID) - } - } - return nil, nil - }, smg.cgrCfg.LockingTimeout, initialID) - return err + } + return } // replicateSessions will replicate session based on configuration