From 4030357fe04a6a9bcd668b13b9f73de4e91ef59f Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 10 Oct 2019 10:56:04 +0200 Subject: [PATCH] SessionS - Simplified TTL handling --- apier/v1/apier2_it_test.go | 2 -- sessions/sessions.go | 47 ++++++++++++------------------ sessions/sessions_test.go | 40 ------------------------- sessions/sessions_voice_it_test.go | 4 +-- 4 files changed, 20 insertions(+), 73 deletions(-) diff --git a/apier/v1/apier2_it_test.go b/apier/v1/apier2_it_test.go index 9aecff5e5..cc03ec67e 100644 --- a/apier/v1/apier2_it_test.go +++ b/apier/v1/apier2_it_test.go @@ -21,7 +21,6 @@ along with this program. If not, see package v1 import ( - "fmt" "net/rpc" "net/rpc/jsonrpc" "path" @@ -153,7 +152,6 @@ func testAPIerVerifyAttributesAfterLoad(t *testing.T) { var attrReply *engine.AttributeProfile if err := apierRPC.Call(utils.AttributeSv1GetAttributeForEvent, ev, &attrReply); err != nil { - fmt.Println(err) t.Error(err) } if attrReply == nil { diff --git a/sessions/sessions.go b/sessions/sessions.go index 3561e9326..dc6cef5f9 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -340,11 +340,10 @@ func (sS *SessionS) setSTerminator(s *Session) { s.sTerminator.timer.Reset(s.sTerminator.ttl) return } - endChan := make(chan struct{}) // new set s.sTerminator = &sTerminator{ timer: time.NewTimer(ttl), - endChan: endChan, + endChan: make(chan struct{}), ttl: ttl, ttlLastUsed: ttlLastUsed, ttlUsage: ttlUsage, @@ -360,7 +359,7 @@ func (sS *SessionS) setSTerminator(s *Session) { } sS.forceSTerminate(s, endUsage, s.sTerminator.ttlLastUsed) - case <-endChan: + case <-s.sTerminator.endChan: s.sTerminator.timer.Stop() } }() @@ -766,10 +765,6 @@ func (sS *SessionS) registerSession(s *Session, passive bool) { sMp[s.CGRID] = s sMux.Unlock() sS.indexSession(s, passive) - if !passive { - sS.setSTerminator(s) - } - } // uregisterSession will unregister an active or passive session based on it's CGRID @@ -782,21 +777,13 @@ func (sS *SessionS) unregisterSession(cgrID string, passive bool) bool { sMp = sS.pSessions } sMux.Lock() - s, found := sMp[cgrID] - if !found { + if _, has := sMp[cgrID]; !has { sMux.Unlock() return false } delete(sMp, cgrID) sMux.Unlock() sS.unindexSession(cgrID, passive) - if !passive { - if s.sTerminator != nil && - s.sTerminator.endChan != nil { - close(s.sTerminator.endChan) - s.sTerminator.endChan = nil - } - } return true } @@ -1253,7 +1240,7 @@ func (sS *SessionS) relocateSession(initOriginID, originID, originHost string) ( // getRelocateSession will relocate a session if it cannot find cgrID and initialOriginID is present func (sS *SessionS) getRelocateSession(cgrID string, initOriginID, originID, originHost string) (s *Session) { - if s = sS.getActivateSession(cgrID); s == nil || + if s = sS.getActivateSession(cgrID); s != nil || initOriginID == "" { return } @@ -1422,7 +1409,7 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage sS.setSTerminator(s) // reset the terminator //init has no updtEv if updtEv == nil { - updtEv = engine.NewMapEvent(s.EventStart.Clone()) + updtEv = engine.MapEvent(s.EventStart.Clone()) } var reqMaxUsage time.Duration if reqMaxUsage, err = updtEv.GetDuration(utils.Usage); err != nil { @@ -1458,6 +1445,11 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration, aTi //check if we have replicate connection and close the session there defer sS.replicateSessions(s.CGRID, true, sS.sReplConns) s.Lock() // no need to release it untill end since the session should be anyway closed + if s.sTerminator != nil && + s.sTerminator.endChan != nil { + close(s.sTerminator.endChan) + s.sTerminator.endChan = nil + } sS.unregisterSession(s.CGRID, false) for sRunIdx, sr := range s.SRuns { sUsage := sr.TotalUsage @@ -2380,23 +2372,22 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, } } if args.UpdateSession { - me := engine.NewMapEvent(args.CGREvent.Event) + ev := engine.MapEvent(args.CGREvent.Event) dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval - if me.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval - if dbtItvl, err = me.GetDuration(utils.CGRDebitInterval); err != nil { + if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval + if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil { return utils.NewErrRALs(err) } } - ev := engine.MapEvent(args.CGREvent.Event) cgrID := GetSetCGRID(ev) s := sS.getRelocateSession(cgrID, - me.GetStringIgnoreErrors(utils.InitialOriginID), - me.GetStringIgnoreErrors(utils.OriginID), - me.GetStringIgnoreErrors(utils.OriginHost)) + ev.GetStringIgnoreErrors(utils.InitialOriginID), + ev.GetStringIgnoreErrors(utils.OriginID), + ev.GetStringIgnoreErrors(utils.OriginHost)) if s == nil { if s, err = sS.initSession(args.CGREvent.Tenant, ev, sS.biJClntID(clnt), - me.GetStringIgnoreErrors(utils.OriginID), + ev.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil { return utils.NewErrRALs(err) } @@ -2660,7 +2651,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, AttributeS: utils.BoolPointer(false), ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher, } - if mp := engine.NewMapEvent(cgrEv.Event); mp.GetStringIgnoreErrors(utils.RunID) != utils.MetaRaw && // check if is *raw + if mp := engine.MapEvent(cgrEv.Event); mp.GetStringIgnoreErrors(utils.RunID) != utils.MetaRaw && // check if is *raw unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types argsProc.RALs = utils.BoolPointer(true) } @@ -2835,7 +2826,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.RpcClientConnection, if args.CGREvent.Tenant == "" { args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } - me := engine.NewMapEvent(args.CGREvent.Event) + me := engine.MapEvent(args.CGREvent.Event) originID := me.GetStringIgnoreErrors(utils.OriginID) if args.GetAttributes { diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index 8e6b3165b..1859cf5b3 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -1167,46 +1167,6 @@ func TestSessionStransitSState(t *testing.T) { } } -func TestSessionSregisterSessionWithTerminator(t *testing.T) { - sSCfg, _ := config.NewDefaultCGRConfig() - config.SetCgrConfig(sSCfg) - sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") - sSEv := engine.NewMapEvent(map[string]interface{}{ - utils.EVENT_NAME: "TEST_EVENT", - utils.ToR: "*voice", - utils.OriginID: "111", - utils.Direction: "*out", - utils.Account: "account1", - utils.Subject: "subject1", - utils.Destination: "+4986517174963", - utils.Category: "call", - utils.Tenant: "cgrates.org", - utils.RequestType: "*prepaid", - utils.SetupTime: "2015-11-09 14:21:24", - utils.AnswerTime: "2015-11-09 14:22:02", - utils.Usage: "1m23s", - utils.LastUsed: "21s", - utils.PDD: "300ms", - utils.SUPPLIER: "supplier1", - utils.OriginHost: "127.0.0.1", - utils.SessionTTL: "2s", //used in setSTerminator - }) - s := &Session{ - CGRID: "session1", - EventStart: sSEv, - } - //register the session as active with terminator - sS.registerSession(s, false) - - rcvS := sS.getSessions("session1", false) - if !reflect.DeepEqual(rcvS[0], s) { - t.Errorf("Expecting %+v, received: %+v", s, rcvS[0]) - } else if rcvS[0].sTerminator.ttl != time.Duration(2*time.Second) { - t.Errorf("Expecting %+v, received: %+v", - time.Duration(2*time.Second), rcvS[0].sTerminator.ttl) - } -} - func TestSessionSrelocateSessionS(t *testing.T) { sSCfg, _ := config.NewDefaultCGRConfig() sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") diff --git a/sessions/sessions_voice_it_test.go b/sessions/sessions_voice_it_test.go index 276b426f2..f75448952 100644 --- a/sessions/sessions_voice_it_test.go +++ b/sessions/sessions_voice_it_test.go @@ -899,7 +899,6 @@ func TestSessionsVoiceSessionTTLWithRelocate(t *testing.T) { Value: utils.Float64Pointer(300 * float64(time.Second)), RatingSubject: utils.StringPointer("*zero50ms"), } - var reply string if err := sessionsRPC.Call("ApierV2.SetBalance", attrSetBalance, &reply); err != nil { t.Error(err) @@ -1021,7 +1020,6 @@ func TestSessionsVoiceSessionTTLWithRelocate(t *testing.T) { } else if aSessions[0].Usage != time.Duration(150)*time.Second { t.Errorf("Expecting 2m30s, received usage: %v", aSessions[0].Usage) } - eAcntVal = 150.0 * float64(time.Second) if err := sessionsRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { t.Error(err) @@ -1030,7 +1028,7 @@ func TestSessionsVoiceSessionTTLWithRelocate(t *testing.T) { eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } - time.Sleep(200 * time.Millisecond) + time.Sleep(200 * time.Millisecond) // should trigger the TTL from config eAcntVal = 149.95 * float64(time.Second) if err := sessionsRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { t.Error(err)