SMGeneric with transition mechanisms for passive to active and opposite

This commit is contained in:
DanB
2016-11-08 12:27:29 +01:00
parent 4fab18a94c
commit 9b0b6c0a95

View File

@@ -294,6 +294,9 @@ func (smg *SMGeneric) getASession(cgrID string) []*SMGSession {
func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error {
cgrID := evStart.GetCGRID(utils.META_DEFAULT)
processed, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level
if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 {
return true, nil // ToDo: handle here also debits
}
var sessionRuns []*engine.SessionRun
if err := smg.rals.Call("Responder.GetSessionRuns", evStart.AsStoredCdr(smg.cgrCfg, smg.Timezone), &sessionRuns); err != nil {
return true, err
@@ -324,8 +327,10 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp
func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error {
_, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level
ss := smg.getASession(cgrID)
if len(ss) == 0 { // Not handled by us
return nil, nil
if len(ss) == 0 {
if ss = smg.passiveToActive(cgrID); len(ss) == 0 {
return nil, nil // ToDo: handle here also debits
}
}
if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency
return nil, nil // Did not find the session so no need to close it anymore
@@ -363,9 +368,14 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro
if len(ssNew) != 0 { // Already relocated
return nil, nil
}
if pSSNew := smg.getPassiveSessions(cgrID, ""); len(pSSNew) != 0 { // passive sessions recorded, will be recovered so no need of relocation
return nil, nil
}
ss := smg.getASession(initialID)
if len(ss) == 0 { // No need of relocation
return nil, utils.ErrNotFound
if ss = smg.passiveToActive(initialID); len(ss) == 0 {
return nil, utils.ErrNotFound
}
}
for i, s := range ss {
s.CGRID = cgrID // Overwrite initial CGRID with new one
@@ -473,6 +483,19 @@ func (smg *SMGeneric) removePassiveSessions(cgrID string) (err error) {
return
}
func (smg *SMGeneric) passiveToActive(cgrID string) (pSS []*SMGSession) {
pSessions := smg.getPassiveSessions(cgrID, "")
if len(pSS) == 0 {
return
}
pSS = pSessions[cgrID]
for _, s := range pSS {
smg.recordASession(s)
}
smg.deletePassiveSessions(cgrID)
return
}
// Methods to apply on sessions, mostly exported through RPC/Bi-RPC
// MaxUsage calculates maximum usage allowed for given gevent
@@ -530,6 +553,7 @@ func (smg *SMGeneric) InitiateSession(gev SMGenericEvent, clnt rpcclient.RpcClie
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey, &cache.CacheItem{Value: maxUsage, Err: err}) // schedule response caching
smg.deletePassiveSessions(cgrID)
if err = smg.sessionStart(gev, clnt); err != nil {
smg.sessionEnd(cgrID, 0)
return