diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index d30dd867e..8685c3180 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -294,6 +294,9 @@ func (smg *SMGeneric) getASession(cgrID string) []*SMGSession { func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error { cgrID := evStart.GetCGRID(utils.META_DEFAULT) processed, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level + if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 { + return true, nil // ToDo: handle here also debits + } var sessionRuns []*engine.SessionRun if err := smg.rals.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil { return true, err @@ -324,8 +327,10 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error { _, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level ss := smg.getASession(cgrID) - if len(ss) == 0 { // Not handled by us - return nil, nil + if len(ss) == 0 { + if ss = smg.passiveToActive(cgrID); len(ss) == 0 { + return nil, nil // ToDo: handle here also debits + } } if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency return nil, nil // Did not find the session so no need to close it anymore @@ -363,9 +368,14 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro if len(ssNew) != 0 { // Already relocated return nil, nil } + if pSSNew := smg.getPassiveSessions(cgrID, ""); len(pSSNew) != 0 { // passive sessions recorded, will be recovered so no need of relocation + return nil, nil + } ss := smg.getASession(initialID) if len(ss) == 0 { // No need of relocation - return nil, utils.ErrNotFound + if ss = smg.passiveToActive(initialID); len(ss) == 0 { + return nil, utils.ErrNotFound + } } for i, s := range ss { s.CGRID = cgrID // Overwrite initial CGRID with new one @@ -473,6 +483,19 @@ func (smg *SMGeneric) removePassiveSessions(cgrID string) (err error) { return } +func (smg *SMGeneric) passiveToActive(cgrID string) (pSS []*SMGSession) { + pSessions := smg.getPassiveSessions(cgrID, "") + if len(pSS) == 0 { + return + } + pSS = pSessions[cgrID] + for _, s := range pSS { + smg.recordASession(s) + } + smg.deletePassiveSessions(cgrID) + return +} + // Methods to apply on sessions, mostly exported through RPC/Bi-RPC // MaxUsage calculates maximum usage allowed for given gevent @@ -530,6 +553,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClie return item.Value.(time.Duration), item.Err } defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) // schedule response caching + smg.deletePassiveSessions(cgrID) if err = smg.sessionStart(gev, clnt); err != nil { smg.sessionEnd(cgrID, 0) return