Updated filter sessions

This commit is contained in:
Trial97
2021-12-23 10:35:57 +02:00
committed by Dan Christian Bogos
parent 65a9b4df9d
commit 2548b2abad
3 changed files with 113 additions and 64 deletions

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessions
import (
"fmt"
"runtime"
"sync"
"time"
@@ -291,7 +290,6 @@ func (sr *SRun) debitReserve(dur time.Duration, lastUsage *time.Duration) (rDur
if lastUsage != nil &&
sr.LastUsage != *lastUsage {
diffUsage := sr.LastUsage - *lastUsage
fmt.Println("diffUsage ", diffUsage)
sr.ExtraDuration += diffUsage
sr.TotalUsage -= sr.LastUsage
sr.TotalUsage += *lastUsage
@@ -304,7 +302,6 @@ func (sr *SRun) debitReserve(dur time.Duration, lastUsage *time.Duration) (rDur
sr.TotalUsage += dur
} else {
rDur = dur - sr.ExtraDuration
fmt.Println("sr.ExtraDuration ", sr.ExtraDuration)
sr.ExtraDuration = 0
}
return

View File

@@ -422,12 +422,10 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration,
}
sr := s.SRuns[sRunIdx]
rDur := sr.debitReserve(dur, lastUsed) // debit out of reserve, rDur is still to be debited
fmt.Println("rDur ", rDur)
if rDur == time.Duration(0) {
return dur, nil // complete debit out of reserve
}
dbtRsrv := dur - rDur // the amount debited from reserve
fmt.Println("dbtRsrv ", dbtRsrv)
if sr.CD.LoopIndex > 0 {
sr.CD.TimeStart = sr.CD.TimeEnd
}
@@ -445,7 +443,6 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration,
}
sr.CD.TimeEnd = cc.GetEndTime() // set debited timeEnd
ccDuration := cc.GetDuration()
fmt.Println("ccDuration ", ccDuration)
if ccDuration > rDur {
sr.ExtraDuration = ccDuration - rDur
}
@@ -454,7 +451,6 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration,
} else {
sr.LastUsage = ccDuration + dbtRsrv
}
fmt.Println("sr.LastUsage ", sr.LastUsage)
sr.CD.DurationIndex -= rDur
sr.CD.DurationIndex += ccDuration
sr.CD.MaxCostSoFar += cc.Cost
@@ -857,16 +853,16 @@ func (sS *SessionS) unindexSession(cgrID string, pSessions bool) bool {
}
func (sS *SessionS) getIndexedFilters(tenant string, fltrs []string) (
indexedFltr map[string][]string, unindexedFltr []*engine.FilterRule) {
indexedFltr map[string][]string, unindexedFltr []*engine.FilterRule, err error) {
indexedFltr = make(map[string][]string)
for _, fltrID := range fltrs {
f, err := engine.GetFilter(sS.dm, tenant, fltrID,
true, true, utils.NonTransactional)
if err != nil {
// if err == utils.ErrNotFound {
// err = utils.ErrPrefixNotFound(fltrID)
// }
continue
var f *engine.Filter
if f, err = engine.GetFilter(sS.dm, tenant, fltrID,
true, true, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefixNotFound(fltrID)
}
return
}
if f.ActivationInterval != nil &&
!f.ActivationInterval.IsActiveAtTime(time.Now()) { // not active
@@ -953,7 +949,7 @@ func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string][]string,
// filterSessions will return a list of sessions in external format based on filters passed
// is thread safe for the Sessions
func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*ExternalSession) {
func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*ExternalSession, err error) {
if len(sf.Filters) == 0 {
ss := sS.getSessions(utils.EmptyString, psv)
for _, s := range ss {
@@ -961,13 +957,18 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex
s.AsExternalSessions(sS.cfg.GeneralCfg().DefaultTimezone,
sS.cfg.GeneralCfg().NodeID)...) // Expensive for large number of sessions
if sf.Limit != nil && *sf.Limit > 0 && *sf.Limit < len(aSs) {
return aSs[:*sf.Limit]
return aSs[:*sf.Limit], nil
}
}
return
}
tenant := utils.FirstNonEmpty(sf.Tenant, sS.cfg.GeneralCfg().DefaultTenant)
indx, unindx := sS.getIndexedFilters(tenant, sf.Filters)
indx, unindx, err := sS.getIndexedFilters(tenant, sf.Filters)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error <%s> quering filters", utils.SessionS, err.Error()))
return nil, err
}
cgrIDs, matchingSRuns := sS.getSessionIDsMatchingIndexes(indx, psv)
if len(indx) != 0 && len(cgrIDs) == 0 { // no sessions matched the indexed filters
return
@@ -988,7 +989,6 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex
fieldValuesDP[i] = ev
}
if pass, err = fltr.Pass(ev, fieldValuesDP); err != nil || !pass {
pass = false
return
}
}
@@ -1007,7 +1007,7 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex
sS.cfg.GeneralCfg().NodeID)) // Expensive for large number of sessions
if sf.Limit != nil && *sf.Limit > 0 && *sf.Limit < len(aSs) {
s.RUnlock()
return aSs[:*sf.Limit]
return aSs[:*sf.Limit], nil
}
}
}
@@ -1017,7 +1017,7 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex
}
// filterSessionsCount re
func (sS *SessionS) filterSessionsCount(sf *utils.SessionFilter, psv bool) (count int) {
func (sS *SessionS) filterSessionsCount(sf *utils.SessionFilter, psv bool) (count int, err error) {
count = 0
if len(sf.Filters) == 0 {
ss := sS.getSessions(utils.EmptyString, psv)
@@ -1027,7 +1027,12 @@ func (sS *SessionS) filterSessionsCount(sf *utils.SessionFilter, psv bool) (coun
return
}
tenant := utils.FirstNonEmpty(sf.Tenant, sS.cfg.GeneralCfg().DefaultTenant)
indx, unindx := sS.getIndexedFilters(tenant, sf.Filters)
indx, unindx, err := sS.getIndexedFilters(tenant, sf.Filters)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error <%s> quering filters", utils.SessionS, err.Error()))
return 0, err
}
cgrIDs, matchingSRuns := sS.getSessionIDsMatchingIndexes(indx, psv)
if len(indx) != 0 && len(cgrIDs) == 0 { // no sessions matched the indexed filters
return
@@ -1644,46 +1649,44 @@ func (sS *SessionS) BiRPCv1GetActiveSessions(clnt rpcclient.ClientConnector,
if args == nil { //protection in case on nil
args = &utils.SessionFilter{}
}
aSs := sS.filterSessions(args, false)
if len(aSs) == 0 {
return utils.ErrNotFound
*reply, err = sS.filterSessions(args, false)
if err == nil && len(*reply) == 0 {
err = utils.ErrNotFound
}
*reply = aSs
return nil
return
}
// BiRPCv1GetActiveSessionsCount counts the active sessions
func (sS *SessionS) BiRPCv1GetActiveSessionsCount(clnt rpcclient.ClientConnector,
args *utils.SessionFilter, reply *int) error {
args *utils.SessionFilter, reply *int) (err error) {
if args == nil { //protection in case on nil
args = &utils.SessionFilter{}
}
*reply = sS.filterSessionsCount(args, false)
return nil
*reply, err = sS.filterSessionsCount(args, false)
return
}
// BiRPCv1GetPassiveSessions returns the passive sessions handled by SessionS
func (sS *SessionS) BiRPCv1GetPassiveSessions(clnt rpcclient.ClientConnector,
args *utils.SessionFilter, reply *[]*ExternalSession) error {
args *utils.SessionFilter, reply *[]*ExternalSession) (err error) {
if args == nil { //protection in case on nil
args = &utils.SessionFilter{}
}
pSs := sS.filterSessions(args, true)
if len(pSs) == 0 {
return utils.ErrNotFound
*reply, err = sS.filterSessions(args, true)
if err == nil && len(*reply) == 0 {
err = utils.ErrNotFound
}
*reply = pSs
return nil
return
}
// BiRPCv1GetPassiveSessionsCount counts the passive sessions handled by the system
func (sS *SessionS) BiRPCv1GetPassiveSessionsCount(clnt rpcclient.ClientConnector,
args *utils.SessionFilter, reply *int) error {
args *utils.SessionFilter, reply *int) (err error) {
if args == nil { //protection in case on nil
args = &utils.SessionFilter{}
}
*reply = sS.filterSessionsCount(args, true)
return nil
*reply, err = sS.filterSessionsCount(args, true)
return
}
// BiRPCv1SetPassiveSession used for replicating Sessions
@@ -3418,8 +3421,10 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.ClientConnector,
if len(args.Filters) != 0 && sS.dm == nil {
return utils.ErrNoDatabaseConn
}
aSs := sS.filterSessions(args, false)
if len(aSs) == 0 {
var aSs []*ExternalSession
if aSs, err = sS.filterSessions(args, false); err != nil {
return
} else if len(aSs) == 0 {
return utils.ErrNotFound
}
for _, as := range aSs {

View File

@@ -1633,7 +1633,9 @@ func TestSessionSGetIndexedFilters(t *testing.T) {
Values: []string{utils.VOICE},
}}
fltrs := []string{"*string:~*req.ToR:*voice"}
if rplyindx, rplyUnindx := sS.getIndexedFilters("", fltrs); !reflect.DeepEqual(expIndx, rplyindx) {
if rplyindx, rplyUnindx, err := sS.getIndexedFilters("", fltrs); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(expIndx, rplyindx) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expIndx), utils.ToJSON(rplyindx))
} else if !reflect.DeepEqual(expUindx, rplyUnindx) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expUindx), utils.ToJSON(rplyUnindx))
@@ -1644,7 +1646,9 @@ func TestSessionSGetIndexedFilters(t *testing.T) {
sS = NewSessionS(sSCfg, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil)
expIndx = map[string][]string{utils.ToR: {utils.VOICE}}
expUindx = nil
if rplyindx, rplyUnindx := sS.getIndexedFilters("", fltrs); !reflect.DeepEqual(expIndx, rplyindx) {
if rplyindx, rplyUnindx, err := sS.getIndexedFilters("", fltrs); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(expIndx, rplyindx) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expIndx), utils.ToJSON(rplyindx))
} else if !reflect.DeepEqual(expUindx, rplyUnindx) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expUindx), utils.ToJSON(rplyUnindx))
@@ -1662,7 +1666,9 @@ func TestSessionSGetIndexedFilters(t *testing.T) {
expIndx = map[string][]string{}
expUindx = nil
fltrs = []string{"FLTR1", "FLTR2"}
if rplyindx, rplyUnindx := sS.getIndexedFilters("cgrates.org", fltrs); !reflect.DeepEqual(expIndx, rplyindx) {
if rplyindx, rplyUnindx, err := sS.getIndexedFilters("cgrates.org", fltrs); err == nil || err.Error() != utils.ErrPrefixNotFound("FLTR2").Error() {
t.Fatal(err)
} else if !reflect.DeepEqual(expIndx, rplyindx) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expIndx), utils.ToJSON(rplyindx))
} else if !reflect.DeepEqual(expUindx, rplyUnindx) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expUindx), utils.ToJSON(rplyUnindx))
@@ -2133,22 +2139,32 @@ func TestSessionSfilterSessions(t *testing.T) {
eses1,
}
fltrs := &utils.SessionFilter{Filters: []string{fmt.Sprintf("*string:~*req.ToR:%s;%s", utils.VOICE, utils.DATA)}}
if sess := sS.filterSessions(fltrs, true); len(sess) != 0 {
if sess, err := sS.filterSessions(fltrs, true); err != nil {
t.Error(err)
} else if len(sess) != 0 {
t.Errorf("Expected no session, received: %s", utils.ToJSON(sess))
}
if sess := sS.filterSessions(fltrs, false); !reflect.DeepEqual(expSess, sess) {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expSess, sess) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:NoToR", "*string:~*req.Subject:subject1"}}
if sess := sS.filterSessions(fltrs, false); len(sess) != 0 {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if len(sess) != 0 {
t.Errorf("Expected no session, received: %s", utils.ToJSON(sess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:NoToR"}}
if sess := sS.filterSessions(fltrs, false); len(sess) != 0 {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if len(sess) != 0 {
t.Errorf("Expected no session, received: %s", utils.ToJSON(sess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:*voice", "*string:~*req.Subject:subject1"}}
if sess := sS.filterSessions(fltrs, false); !reflect.DeepEqual(expSess, sess) {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expSess, sess) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess))
}
sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{
@@ -2158,15 +2174,21 @@ func TestSessionSfilterSessions(t *testing.T) {
sS = NewSessionS(sSCfg, nil, nil)
sS.registerSession(session, false)
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:*voice", "*string:~*req.Subject:subject1"}}
if sess := sS.filterSessions(fltrs, false); !reflect.DeepEqual(expSess, sess) {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expSess, sess) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.Subject:subject1"}}
if sess := sS.filterSessions(fltrs, false); !reflect.DeepEqual(expSess, sess) {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expSess, sess) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.Subject:subject3"}}
if sess := sS.filterSessions(fltrs, false); len(sess) != 0 {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if len(sess) != 0 {
t.Errorf("Expected no session, received: %s", utils.ToJSON(sess))
}
expSess = append(expSess, eses2)
@@ -2174,7 +2196,10 @@ func TestSessionSfilterSessions(t *testing.T) {
return strings.Compare(expSess[i].ToR, expSess[j].ToR) == -1
})
fltrs = &utils.SessionFilter{Filters: []string{}}
sess := sS.filterSessions(fltrs, false)
sess, err := sS.filterSessions(fltrs, false)
if err != nil {
t.Error(err)
}
sort.Slice(sess, func(i, j int) bool {
return strings.Compare(sess[i].ToR, sess[j].ToR) == -1
})
@@ -2182,13 +2207,17 @@ func TestSessionSfilterSessions(t *testing.T) {
t.Errorf("Expected %s , received: %s", utils.ToJSON(expSess), utils.ToJSON(sess))
}
fltrs = &utils.SessionFilter{Filters: []string{}, Limit: utils.IntPointer(1)}
if sess := sS.filterSessions(fltrs, false); len(sess) != 1 {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if len(sess) != 1 {
t.Errorf("Expected one session, received: %s", utils.ToJSON(sess))
} else if !reflect.DeepEqual(expSess[0], eses1) && !reflect.DeepEqual(expSess[0], eses2) {
t.Errorf("Expected %s or %s, received: %s", utils.ToJSON(eses1), utils.ToJSON(eses2), utils.ToJSON(sess[0]))
}
fltrs = &utils.SessionFilter{Filters: []string{fmt.Sprintf("*string:~*req.ToR:%s;%s", utils.VOICE, utils.SMS)}, Limit: utils.IntPointer(1)}
if sess := sS.filterSessions(fltrs, false); len(sess) != 1 {
if sess, err := sS.filterSessions(fltrs, false); err != nil {
t.Error(err)
} else if len(sess) != 1 {
t.Errorf("Expected one session, received: %s", utils.ToJSON(sess))
} else if !reflect.DeepEqual(expSess[0], eses1) && !reflect.DeepEqual(expSess[0], eses2) {
t.Errorf("Expected %s or %s, received: %s", utils.ToJSON(eses1), utils.ToJSON(eses2), utils.ToJSON(sess[0]))
@@ -2248,19 +2277,27 @@ func TestSessionSfilterSessionsCount(t *testing.T) {
sS.registerSession(session, false)
fltrs := &utils.SessionFilter{Filters: []string{fmt.Sprintf("*string:~*req.ToR:%s;%s", utils.VOICE, utils.DATA)}}
if noSess := sS.filterSessionsCount(fltrs, false); noSess != 2 {
if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil {
t.Error(err)
} else if noSess != 2 {
t.Errorf("Expected %v , received: %s", 2, utils.ToJSON(noSess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:NoToR", "*string:~*req.Subject:subject1"}}
if noSess := sS.filterSessionsCount(fltrs, false); noSess != 0 {
if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil {
t.Error(err)
} else if noSess != 0 {
t.Errorf("Expected no session, received: %s", utils.ToJSON(noSess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:NoToR"}}
if noSess := sS.filterSessionsCount(fltrs, false); noSess != 0 {
if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil {
t.Error(err)
} else if noSess != 0 {
t.Errorf("Expected no session, received: %s", utils.ToJSON(noSess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:*voice", "*string:~*req.Subject:subject1"}}
if noSess := sS.filterSessionsCount(fltrs, false); noSess != 1 {
if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil {
t.Error(err)
} else if noSess != 1 {
t.Errorf("Expected %v , received: %s", 1, utils.ToJSON(noSess))
}
sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{
@@ -2270,25 +2307,35 @@ func TestSessionSfilterSessionsCount(t *testing.T) {
sS = NewSessionS(sSCfg, nil, nil)
sS.registerSession(session, false)
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.ToR:*voice", "*string:~*req.Subject:subject1"}}
if noSess := sS.filterSessionsCount(fltrs, false); noSess != 1 {
if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil {
t.Error(err)
} else if noSess != 1 {
t.Errorf("Expected %v , received: %s", 1, utils.ToJSON(noSess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.Subject:subject1"}}
if noSess := sS.filterSessionsCount(fltrs, false); noSess != 2 {
if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil {
t.Error(err)
} else if noSess != 2 {
t.Errorf("Expected %v , received: %s", 2, utils.ToJSON(noSess))
}
fltrs = &utils.SessionFilter{Filters: []string{"*string:~*req.Subject:subject2"}}
if noSess := sS.filterSessionsCount(fltrs, false); noSess != 0 {
if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil {
t.Error(err)
} else if noSess != 0 {
t.Errorf("Expected no session, received: %s", utils.ToJSON(noSess))
}
fltrs = &utils.SessionFilter{Filters: []string{}}
if noSess := sS.filterSessionsCount(fltrs, false); noSess != 2 {
if noSess, err := sS.filterSessionsCount(fltrs, false); err != nil {
t.Error(err)
} else if noSess != 2 {
t.Errorf("Expected %v , received: %s", 2, utils.ToJSON(noSess))
}
sS = NewSessionS(sSCfg, nil, nil)
sS.registerSession(session, true)
fltrs = &utils.SessionFilter{Filters: []string{fmt.Sprintf("*string:~*req.ToR:%s;%s", utils.VOICE, utils.DATA)}}
if noSess := sS.filterSessionsCount(fltrs, true); noSess != 2 {
if noSess, err := sS.filterSessionsCount(fltrs, true); err != nil {
t.Error(err)
} else if noSess != 2 {
t.Errorf("Expected %v , received: %s", 2, utils.ToJSON(noSess))
}
}