SessionS - Simplified TTL handling

This commit is contained in:
DanB
2019-10-10 10:56:04 +02:00
parent 3fb24b16a4
commit 4030357fe0
4 changed files with 20 additions and 73 deletions

View File

@@ -21,7 +21,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"fmt"
"net/rpc"
"net/rpc/jsonrpc"
"path"
@@ -153,7 +152,6 @@ func testAPIerVerifyAttributesAfterLoad(t *testing.T) {
var attrReply *engine.AttributeProfile
if err := apierRPC.Call(utils.AttributeSv1GetAttributeForEvent,
ev, &attrReply); err != nil {
fmt.Println(err)
t.Error(err)
}
if attrReply == nil {

View File

@@ -340,11 +340,10 @@ func (sS *SessionS) setSTerminator(s *Session) {
s.sTerminator.timer.Reset(s.sTerminator.ttl)
return
}
endChan := make(chan struct{})
// new set
s.sTerminator = &sTerminator{
timer: time.NewTimer(ttl),
endChan: endChan,
endChan: make(chan struct{}),
ttl: ttl,
ttlLastUsed: ttlLastUsed,
ttlUsage: ttlUsage,
@@ -360,7 +359,7 @@ func (sS *SessionS) setSTerminator(s *Session) {
}
sS.forceSTerminate(s, endUsage,
s.sTerminator.ttlLastUsed)
case <-endChan:
case <-s.sTerminator.endChan:
s.sTerminator.timer.Stop()
}
}()
@@ -766,10 +765,6 @@ func (sS *SessionS) registerSession(s *Session, passive bool) {
sMp[s.CGRID] = s
sMux.Unlock()
sS.indexSession(s, passive)
if !passive {
sS.setSTerminator(s)
}
}
// uregisterSession will unregister an active or passive session based on it's CGRID
@@ -782,21 +777,13 @@ func (sS *SessionS) unregisterSession(cgrID string, passive bool) bool {
sMp = sS.pSessions
}
sMux.Lock()
s, found := sMp[cgrID]
if !found {
if _, has := sMp[cgrID]; !has {
sMux.Unlock()
return false
}
delete(sMp, cgrID)
sMux.Unlock()
sS.unindexSession(cgrID, passive)
if !passive {
if s.sTerminator != nil &&
s.sTerminator.endChan != nil {
close(s.sTerminator.endChan)
s.sTerminator.endChan = nil
}
}
return true
}
@@ -1253,7 +1240,7 @@ func (sS *SessionS) relocateSession(initOriginID, originID, originHost string) (
// getRelocateSession will relocate a session if it cannot find cgrID and initialOriginID is present
func (sS *SessionS) getRelocateSession(cgrID string, initOriginID,
originID, originHost string) (s *Session) {
if s = sS.getActivateSession(cgrID); s == nil ||
if s = sS.getActivateSession(cgrID); s != nil ||
initOriginID == "" {
return
}
@@ -1422,7 +1409,7 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage
sS.setSTerminator(s) // reset the terminator
//init has no updtEv
if updtEv == nil {
updtEv = engine.NewMapEvent(s.EventStart.Clone())
updtEv = engine.MapEvent(s.EventStart.Clone())
}
var reqMaxUsage time.Duration
if reqMaxUsage, err = updtEv.GetDuration(utils.Usage); err != nil {
@@ -1458,6 +1445,11 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration, aTi
//check if we have replicate connection and close the session there
defer sS.replicateSessions(s.CGRID, true, sS.sReplConns)
s.Lock() // no need to release it untill end since the session should be anyway closed
if s.sTerminator != nil &&
s.sTerminator.endChan != nil {
close(s.sTerminator.endChan)
s.sTerminator.endChan = nil
}
sS.unregisterSession(s.CGRID, false)
for sRunIdx, sr := range s.SRuns {
sUsage := sr.TotalUsage
@@ -2380,23 +2372,22 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection,
}
}
if args.UpdateSession {
me := engine.NewMapEvent(args.CGREvent.Event)
ev := engine.MapEvent(args.CGREvent.Event)
dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval
if me.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = me.GetDuration(utils.CGRDebitInterval); err != nil {
if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval
if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}
ev := engine.MapEvent(args.CGREvent.Event)
cgrID := GetSetCGRID(ev)
s := sS.getRelocateSession(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
ev.GetStringIgnoreErrors(utils.InitialOriginID),
ev.GetStringIgnoreErrors(utils.OriginID),
ev.GetStringIgnoreErrors(utils.OriginHost))
if s == nil {
if s, err = sS.initSession(args.CGREvent.Tenant,
ev, sS.biJClntID(clnt),
me.GetStringIgnoreErrors(utils.OriginID),
ev.GetStringIgnoreErrors(utils.OriginID),
dbtItvl, args.ArgDispatcher); err != nil {
return utils.NewErrRALs(err)
}
@@ -2660,7 +2651,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
AttributeS: utils.BoolPointer(false),
ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher,
}
if mp := engine.NewMapEvent(cgrEv.Event); mp.GetStringIgnoreErrors(utils.RunID) != utils.MetaRaw && // check if is *raw
if mp := engine.MapEvent(cgrEv.Event); mp.GetStringIgnoreErrors(utils.RunID) != utils.MetaRaw && // check if is *raw
unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types
argsProc.RALs = utils.BoolPointer(true)
}
@@ -2835,7 +2826,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.RpcClientConnection,
if args.CGREvent.Tenant == "" {
args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
}
me := engine.NewMapEvent(args.CGREvent.Event)
me := engine.MapEvent(args.CGREvent.Event)
originID := me.GetStringIgnoreErrors(utils.OriginID)
if args.GetAttributes {

View File

@@ -1167,46 +1167,6 @@ func TestSessionStransitSState(t *testing.T) {
}
}
func TestSessionSregisterSessionWithTerminator(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
config.SetCgrConfig(sSCfg)
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sSEv := engine.NewMapEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "111",
utils.Direction: "*out",
utils.Account: "account1",
utils.Subject: "subject1",
utils.Destination: "+4986517174963",
utils.Category: "call",
utils.Tenant: "cgrates.org",
utils.RequestType: "*prepaid",
utils.SetupTime: "2015-11-09 14:21:24",
utils.AnswerTime: "2015-11-09 14:22:02",
utils.Usage: "1m23s",
utils.LastUsed: "21s",
utils.PDD: "300ms",
utils.SUPPLIER: "supplier1",
utils.OriginHost: "127.0.0.1",
utils.SessionTTL: "2s", //used in setSTerminator
})
s := &Session{
CGRID: "session1",
EventStart: sSEv,
}
//register the session as active with terminator
sS.registerSession(s, false)
rcvS := sS.getSessions("session1", false)
if !reflect.DeepEqual(rcvS[0], s) {
t.Errorf("Expecting %+v, received: %+v", s, rcvS[0])
} else if rcvS[0].sTerminator.ttl != time.Duration(2*time.Second) {
t.Errorf("Expecting %+v, received: %+v",
time.Duration(2*time.Second), rcvS[0].sTerminator.ttl)
}
}
func TestSessionSrelocateSessionS(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")

View File

@@ -899,7 +899,6 @@ func TestSessionsVoiceSessionTTLWithRelocate(t *testing.T) {
Value: utils.Float64Pointer(300 * float64(time.Second)),
RatingSubject: utils.StringPointer("*zero50ms"),
}
var reply string
if err := sessionsRPC.Call("ApierV2.SetBalance", attrSetBalance, &reply); err != nil {
t.Error(err)
@@ -1021,7 +1020,6 @@ func TestSessionsVoiceSessionTTLWithRelocate(t *testing.T) {
} else if aSessions[0].Usage != time.Duration(150)*time.Second {
t.Errorf("Expecting 2m30s, received usage: %v", aSessions[0].Usage)
}
eAcntVal = 150.0 * float64(time.Second)
if err := sessionsRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
@@ -1030,7 +1028,7 @@ func TestSessionsVoiceSessionTTLWithRelocate(t *testing.T) {
eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
}
time.Sleep(200 * time.Millisecond)
time.Sleep(200 * time.Millisecond) // should trigger the TTL from config
eAcntVal = 149.95 * float64(time.Second)
if err := sessionsRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)