From 2548b2abad7fb97679365c8706459e6ed8b1b1a0 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 23 Dec 2021 10:35:57 +0200 Subject: [PATCH] Updated filter sessions --- sessions/session.go | 3 -- sessions/sessions.go | 81 ++++++++++++++++++---------------- sessions/sessions_test.go | 93 +++++++++++++++++++++++++++++---------- 3 files changed, 113 insertions(+), 64 deletions(-) diff --git a/sessions/session.go b/sessions/session.go index ac5af9c76..6da3be3de 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -19,7 +19,6 @@ along with this program. If not, see package sessions import ( - "fmt" "runtime" "sync" "time" @@ -291,7 +290,6 @@ func (sr *SRun) debitReserve(dur time.Duration, lastUsage *time.Duration) (rDur if lastUsage != nil && sr.LastUsage != *lastUsage { diffUsage := sr.LastUsage - *lastUsage - fmt.Println("diffUsage ", diffUsage) sr.ExtraDuration += diffUsage sr.TotalUsage -= sr.LastUsage sr.TotalUsage += *lastUsage @@ -304,7 +302,6 @@ func (sr *SRun) debitReserve(dur time.Duration, lastUsage *time.Duration) (rDur sr.TotalUsage += dur } else { rDur = dur - sr.ExtraDuration - fmt.Println("sr.ExtraDuration ", sr.ExtraDuration) sr.ExtraDuration = 0 } return diff --git a/sessions/sessions.go b/sessions/sessions.go index 15b63bcdf..82b520cca 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -422,12 +422,10 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration, } sr := s.SRuns[sRunIdx] rDur := sr.debitReserve(dur, lastUsed) // debit out of reserve, rDur is still to be debited - fmt.Println("rDur ", rDur) if rDur == time.Duration(0) { return dur, nil // complete debit out of reserve } dbtRsrv := dur - rDur // the amount debited from reserve - fmt.Println("dbtRsrv ", dbtRsrv) if sr.CD.LoopIndex > 0 { sr.CD.TimeStart = sr.CD.TimeEnd } @@ -445,7 +443,6 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration, } sr.CD.TimeEnd = cc.GetEndTime() // set debited timeEnd ccDuration := cc.GetDuration() - fmt.Println("ccDuration ", ccDuration) if ccDuration > rDur { sr.ExtraDuration = ccDuration - rDur } @@ -454,7 +451,6 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration, } else { sr.LastUsage = ccDuration + dbtRsrv } - fmt.Println("sr.LastUsage ", sr.LastUsage) sr.CD.DurationIndex -= rDur sr.CD.DurationIndex += ccDuration sr.CD.MaxCostSoFar += cc.Cost @@ -857,16 +853,16 @@ func (sS *SessionS) unindexSession(cgrID string, pSessions bool) bool { } func (sS *SessionS) getIndexedFilters(tenant string, fltrs []string) ( - indexedFltr map[string][]string, unindexedFltr []*engine.FilterRule) { + indexedFltr map[string][]string, unindexedFltr []*engine.FilterRule, err error) { indexedFltr = make(map[string][]string) for _, fltrID := range fltrs { - f, err := engine.GetFilter(sS.dm, tenant, fltrID, - true, true, utils.NonTransactional) - if err != nil { - // if err == utils.ErrNotFound { - // err = utils.ErrPrefixNotFound(fltrID) - // } - continue + var f *engine.Filter + if f, err = engine.GetFilter(sS.dm, tenant, fltrID, + true, true, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefixNotFound(fltrID) + } + return } if f.ActivationInterval != nil && !f.ActivationInterval.IsActiveAtTime(time.Now()) { // not active @@ -953,7 +949,7 @@ func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string][]string, // filterSessions will return a list of sessions in external format based on filters passed // is thread safe for the Sessions -func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*ExternalSession) { +func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*ExternalSession, err error) { if len(sf.Filters) == 0 { ss := sS.getSessions(utils.EmptyString, psv) for _, s := range ss { @@ -961,13 +957,18 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex s.AsExternalSessions(sS.cfg.GeneralCfg().DefaultTimezone, sS.cfg.GeneralCfg().NodeID)...) // Expensive for large number of sessions if sf.Limit != nil && *sf.Limit > 0 && *sf.Limit < len(aSs) { - return aSs[:*sf.Limit] + return aSs[:*sf.Limit], nil } } return } tenant := utils.FirstNonEmpty(sf.Tenant, sS.cfg.GeneralCfg().DefaultTenant) - indx, unindx := sS.getIndexedFilters(tenant, sf.Filters) + indx, unindx, err := sS.getIndexedFilters(tenant, sf.Filters) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error <%s> quering filters", utils.SessionS, err.Error())) + return nil, err + } cgrIDs, matchingSRuns := sS.getSessionIDsMatchingIndexes(indx, psv) if len(indx) != 0 && len(cgrIDs) == 0 { // no sessions matched the indexed filters return @@ -988,7 +989,6 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex fieldValuesDP[i] = ev } if pass, err = fltr.Pass(ev, fieldValuesDP); err != nil || !pass { - pass = false return } } @@ -1007,7 +1007,7 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex sS.cfg.GeneralCfg().NodeID)) // Expensive for large number of sessions if sf.Limit != nil && *sf.Limit > 0 && *sf.Limit < len(aSs) { s.RUnlock() - return aSs[:*sf.Limit] + return aSs[:*sf.Limit], nil } } } @@ -1017,7 +1017,7 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex } // filterSessionsCount re -func (sS *SessionS) filterSessionsCount(sf *utils.SessionFilter, psv bool) (count int) { +func (sS *SessionS) filterSessionsCount(sf *utils.SessionFilter, psv bool) (count int, err error) { count = 0 if len(sf.Filters) == 0 { ss := sS.getSessions(utils.EmptyString, psv) @@ -1027,7 +1027,12 @@ func (sS *SessionS) filterSessionsCount(sf *utils.SessionFilter, psv bool) (coun return } tenant := utils.FirstNonEmpty(sf.Tenant, sS.cfg.GeneralCfg().DefaultTenant) - indx, unindx := sS.getIndexedFilters(tenant, sf.Filters) + indx, unindx, err := sS.getIndexedFilters(tenant, sf.Filters) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error <%s> quering filters", utils.SessionS, err.Error())) + return 0, err + } cgrIDs, matchingSRuns := sS.getSessionIDsMatchingIndexes(indx, psv) if len(indx) != 0 && len(cgrIDs) == 0 { // no sessions matched the indexed filters return @@ -1644,46 +1649,44 @@ func (sS *SessionS) BiRPCv1GetActiveSessions(clnt rpcclient.ClientConnector, if args == nil { //protection in case on nil args = &utils.SessionFilter{} } - aSs := sS.filterSessions(args, false) - if len(aSs) == 0 { - return utils.ErrNotFound + *reply, err = sS.filterSessions(args, false) + if err == nil && len(*reply) == 0 { + err = utils.ErrNotFound } - *reply = aSs - return nil + return } // BiRPCv1GetActiveSessionsCount counts the active sessions func (sS *SessionS) BiRPCv1GetActiveSessionsCount(clnt rpcclient.ClientConnector, - args *utils.SessionFilter, reply *int) error { + args *utils.SessionFilter, reply *int) (err error) { if args == nil { //protection in case on nil args = &utils.SessionFilter{} } - *reply = sS.filterSessionsCount(args, false) - return nil + *reply, err = sS.filterSessionsCount(args, false) + return } // BiRPCv1GetPassiveSessions returns the passive sessions handled by SessionS func (sS *SessionS) BiRPCv1GetPassiveSessions(clnt rpcclient.ClientConnector, - args *utils.SessionFilter, reply *[]*ExternalSession) error { + args *utils.SessionFilter, reply *[]*ExternalSession) (err error) { if args == nil { //protection in case on nil args = &utils.SessionFilter{} } - pSs := sS.filterSessions(args, true) - if len(pSs) == 0 { - return utils.ErrNotFound + *reply, err = sS.filterSessions(args, true) + if err == nil && len(*reply) == 0 { + err = utils.ErrNotFound } - *reply = pSs - return nil + return } // BiRPCv1GetPassiveSessionsCount counts the passive sessions handled by the system func (sS *SessionS) BiRPCv1GetPassiveSessionsCount(clnt rpcclient.ClientConnector, - args *utils.SessionFilter, reply *int) error { + args *utils.SessionFilter, reply *int) (err error) { if args == nil { //protection in case on nil args = &utils.SessionFilter{} } - *reply = sS.filterSessionsCount(args, true) - return nil + *reply, err = sS.filterSessionsCount(args, true) + return } // BiRPCv1SetPassiveSession used for replicating Sessions @@ -3418,8 +3421,10 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.ClientConnector, if len(args.Filters) != 0 && sS.dm == nil { return utils.ErrNoDatabaseConn } - aSs := sS.filterSessions(args, false) - if len(aSs) == 0 { + var aSs []*ExternalSession + if aSs, err = sS.filterSessions(args, false); err != nil { + return + } else if len(aSs) == 0 { return utils.ErrNotFound } for _, as := range aSs { diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index 8693a9036..c399d40ca 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -1633,7 +1633,9 @@ func TestSessionSGetIndexedFilters(t *testing.T) { Values: []string{utils.VOICE}, }} fltrs := []string{"*string:~*req.ToR:*voice"} - if rplyindx, rplyUnindx := sS.getIndexedFilters("", fltrs); !reflect.DeepEqual(expIndx, rplyindx) { + if rplyindx, rplyUnindx, err := sS.getIndexedFilters("", fltrs); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expIndx, rplyindx) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expIndx), utils.ToJSON(rplyindx)) } else if !reflect.DeepEqual(expUindx, rplyUnindx) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expUindx), utils.ToJSON(rplyUnindx)) @@ -1644,7 +1646,9 @@ func TestSessionSGetIndexedFilters(t *testing.T) { sS = NewSessionS(sSCfg, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil) expIndx = map[string][]string{utils.ToR: {utils.VOICE}} expUindx = nil - if rplyindx, rplyUnindx := sS.getIndexedFilters("", fltrs); !reflect.DeepEqual(expIndx, rplyindx) { + if rplyindx, rplyUnindx, err := sS.getIndexedFilters("", fltrs); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expIndx, rplyindx) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expIndx), utils.ToJSON(rplyindx)) } else if !reflect.DeepEqual(expUindx, rplyUnindx) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expUindx), utils.ToJSON(rplyUnindx)) @@ -1662,7 +1666,9 @@ func TestSessionSGetIndexedFilters(t *testing.T) { expIndx = map[string][]string{} expUindx = nil fltrs = []string{"FLTR1", "FLTR2"} - if rplyindx, rplyUnindx := sS.getIndexedFilters("cgrates.org", fltrs); !reflect.DeepEqual(expIndx, rplyindx) { + if rplyindx, rplyUnindx, err := sS.getIndexedFilters("cgrates.org", fltrs); err == nil || err.Error() != utils.ErrPrefixNotFound("FLTR2").Error() { + t.Fatal(err) + } else if !reflect.DeepEqual(expIndx, rplyindx) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expIndx), utils.ToJSON(rplyindx)) } else if !reflect.DeepEqual(expUindx, rplyUnindx) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expUindx), utils.ToJSON(rplyUnindx)) @@ -2133,22 +2139,32 @@ func TestSessionSfilterSessions(t *testing.T) { eses1, } fltrs := &utils.SessionFilter{Filters: []string{fmt.Sprintf("*string:~*req.ToR:%s;%s", utils.VOICE, utils.DATA)}} - if sess := sS.filterSessions(fltrs, true); len(sess) != 0 { + if sess, err := sS.filterSessions(fltrs, true); err != nil { + t.Error(err) + } else if len(sess) != 0 { t.Errorf("Expected no session, received: %s", utils.ToJSON(sess)) } - if sess := sS.filterSessions(fltrs, false); !reflect.DeepEqual(expSess, sess) { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expSess, sess) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:NoToR", "*string:~*req.Subject:subject1"}} - if sess := sS.filterSessions(fltrs, false); len(sess) != 0 { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if len(sess) != 0 { t.Errorf("Expected no session, received: %s", utils.ToJSON(sess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:NoToR"}} - if sess := sS.filterSessions(fltrs, false); len(sess) != 0 { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if len(sess) != 0 { t.Errorf("Expected no session, received: %s", utils.ToJSON(sess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:*voice", "*string:~*req.Subject:subject1"}} - if sess := sS.filterSessions(fltrs, false); !reflect.DeepEqual(expSess, sess) { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expSess, sess) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess)) } sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{ @@ -2158,15 +2174,21 @@ func TestSessionSfilterSessions(t *testing.T) { sS = NewSessionS(sSCfg, nil, nil) sS.registerSession(session, false) fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:*voice", "*string:~*req.Subject:subject1"}} - if sess := sS.filterSessions(fltrs, false); !reflect.DeepEqual(expSess, sess) { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expSess, sess) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.Subject:subject1"}} - if sess := sS.filterSessions(fltrs, false); !reflect.DeepEqual(expSess, sess) { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expSess, sess) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.Subject:subject3"}} - if sess := sS.filterSessions(fltrs, false); len(sess) != 0 { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if len(sess) != 0 { t.Errorf("Expected no session, received: %s", utils.ToJSON(sess)) } expSess = append(expSess, eses2) @@ -2174,7 +2196,10 @@ func TestSessionSfilterSessions(t *testing.T) { return strings.Compare(expSess[i].ToR, expSess[j].ToR) == -1 }) fltrs = &utils.SessionFilter{Filters: []string{}} - sess := sS.filterSessions(fltrs, false) + sess, err := sS.filterSessions(fltrs, false) + if err != nil { + t.Error(err) + } sort.Slice(sess, func(i, j int) bool { return strings.Compare(sess[i].ToR, sess[j].ToR) == -1 }) @@ -2182,13 +2207,17 @@ func TestSessionSfilterSessions(t *testing.T) { t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess)) } fltrs = &utils.SessionFilter{Filters: []string{}, Limit: utils.IntPointer(1)} - if sess := sS.filterSessions(fltrs, false); len(sess) != 1 { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if len(sess) != 1 { t.Errorf("Expected one session, received: %s", utils.ToJSON(sess)) } else if !reflect.DeepEqual(expSess[0], eses1) && !reflect.DeepEqual(expSess[0], eses2) { t.Errorf("Expected %s or %s, received: %s", utils.ToJSON(eses1), utils.ToJSON(eses2), utils.ToJSON(sess[0])) } fltrs = &utils.SessionFilter{Filters: []string{fmt.Sprintf("*string:~*req.ToR:%s;%s", utils.VOICE, utils.SMS)}, Limit: utils.IntPointer(1)} - if sess := sS.filterSessions(fltrs, false); len(sess) != 1 { + if sess, err := sS.filterSessions(fltrs, false); err != nil { + t.Error(err) + } else if len(sess) != 1 { t.Errorf("Expected one session, received: %s", utils.ToJSON(sess)) } else if !reflect.DeepEqual(expSess[0], eses1) && !reflect.DeepEqual(expSess[0], eses2) { t.Errorf("Expected %s or %s, received: %s", utils.ToJSON(eses1), utils.ToJSON(eses2), utils.ToJSON(sess[0])) @@ -2248,19 +2277,27 @@ func TestSessionSfilterSessionsCount(t *testing.T) { sS.registerSession(session, false) fltrs := &utils.SessionFilter{Filters: []string{fmt.Sprintf("*string:~*req.ToR:%s;%s", utils.VOICE, utils.DATA)}} - if noSess := sS.filterSessionsCount(fltrs, false); noSess != 2 { + if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil { + t.Error(err) + } else if noSess != 2 { t.Errorf("Expected %v , received: %s", 2, utils.ToJSON(noSess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:NoToR", "*string:~*req.Subject:subject1"}} - if noSess := sS.filterSessionsCount(fltrs, false); noSess != 0 { + if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil { + t.Error(err) + } else if noSess != 0 { t.Errorf("Expected no session, received: %s", utils.ToJSON(noSess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:NoToR"}} - if noSess := sS.filterSessionsCount(fltrs, false); noSess != 0 { + if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil { + t.Error(err) + } else if noSess != 0 { t.Errorf("Expected no session, received: %s", utils.ToJSON(noSess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:*voice", "*string:~*req.Subject:subject1"}} - if noSess := sS.filterSessionsCount(fltrs, false); noSess != 1 { + if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil { + t.Error(err) + } else if noSess != 1 { t.Errorf("Expected %v , received: %s", 1, utils.ToJSON(noSess)) } sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{ @@ -2270,25 +2307,35 @@ func TestSessionSfilterSessionsCount(t *testing.T) { sS = NewSessionS(sSCfg, nil, nil) sS.registerSession(session, false) fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:*voice", "*string:~*req.Subject:subject1"}} - if noSess := sS.filterSessionsCount(fltrs, false); noSess != 1 { + if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil { + t.Error(err) + } else if noSess != 1 { t.Errorf("Expected %v , received: %s", 1, utils.ToJSON(noSess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.Subject:subject1"}} - if noSess := sS.filterSessionsCount(fltrs, false); noSess != 2 { + if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil { + t.Error(err) + } else if noSess != 2 { t.Errorf("Expected %v , received: %s", 2, utils.ToJSON(noSess)) } fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.Subject:subject2"}} - if noSess := sS.filterSessionsCount(fltrs, false); noSess != 0 { + if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil { + t.Error(err) + } else if noSess != 0 { t.Errorf("Expected no session, received: %s", utils.ToJSON(noSess)) } fltrs = &utils.SessionFilter{Filters: []string{}} - if noSess := sS.filterSessionsCount(fltrs, false); noSess != 2 { + if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil { + t.Error(err) + } else if noSess != 2 { t.Errorf("Expected %v , received: %s", 2, utils.ToJSON(noSess)) } sS = NewSessionS(sSCfg, nil, nil) sS.registerSession(session, true) fltrs = &utils.SessionFilter{Filters: []string{fmt.Sprintf("*string:~*req.ToR:%s;%s", utils.VOICE, utils.DATA)}} - if noSess := sS.filterSessionsCount(fltrs, true); noSess != 2 { + if noSess, err := sS.filterSessionsCount(fltrs, true); err != nil { + t.Error(err) + } else if noSess != 2 { t.Errorf("Expected %v , received: %s", 2, utils.ToJSON(noSess)) } }