SMGeneric.ActiveSessions with indexed filters

This commit is contained in:
DanB
2016-08-25 22:20:51 +02:00
parent 981dbe6f22
commit f7ca7b400f
5 changed files with 266 additions and 44 deletions

View File

@@ -90,42 +90,9 @@ func (self *SMGenericV1) ProcessCDR(ev sessionmanager.SMGenericEvent, reply *str
}
func (self *SMGenericV1) ActiveSessions(attrs utils.AttrSMGGetActiveSessions, reply *[]*sessionmanager.ActiveSession) error {
aSessions := make([]*sessionmanager.ActiveSession, 0)
for _, sGrp := range self.sm.Sessions() { // Add sessions to return with filter
for _, s := range sGrp {
as := s.AsActiveSession(self.sm.Timezone())
if attrs.ToR != nil && *attrs.ToR != as.TOR {
continue
}
if attrs.OriginID != nil && *attrs.OriginID != as.OriginID {
continue
}
if attrs.RunID != nil && *attrs.RunID != as.RunId {
continue
}
if attrs.RequestType != nil && *attrs.RequestType != as.ReqType {
continue
}
if attrs.Tenant != nil && *attrs.Tenant != as.Tenant {
continue
}
if attrs.Category != nil && *attrs.Category != as.Category {
continue
}
if attrs.Account != nil && *attrs.Account != as.Account {
continue
}
if attrs.Subject != nil && *attrs.Subject != as.Subject {
continue
}
if attrs.Destination != nil && *attrs.Destination != as.Destination {
continue
}
if attrs.Supplier != nil && *attrs.Supplier != as.Supplier {
continue
}
aSessions = append(aSessions, as)
}
aSessions, err := self.sm.ActiveSessions(attrs.AsMapStringString())
if err != nil {
return utils.NewErrServerError(err)
}
*reply = aSessions
return nil

View File

@@ -130,7 +130,7 @@ func (self *SMGeneric) recordSession(uuid string, s *SMGSession) {
}
}
self.indexSession(uuid, s)
}
// Remove session from session list, removes all related in case of multiple runs, true if item was found
@@ -145,6 +145,7 @@ func (self *SMGeneric) unrecordSession(uuid string) bool {
st.endChan <- true
delete(self.sessionTerminators, uuid)
}
self.unindexSession(uuid)
return true
}
@@ -181,9 +182,11 @@ func (self *SMGeneric) indexSession(uuid string, s *SMGSession) bool {
func (self *SMGeneric) unindexSession(uuid string) bool {
self.sessionIndexMux.Lock()
defer self.sessionIndexMux.Unlock()
var found bool
for fldName := range self.sessionIndexes {
for fldVal := range self.sessionIndexes[fldName] {
if _, hasUUID := self.sessionIndexes[fldName][fldVal][uuid]; hasUUID {
found = true
delete(self.sessionIndexes[fldName][fldVal], uuid)
if len(self.sessionIndexes[fldName][fldVal]) == 0 {
delete(self.sessionIndexes[fldName], fldVal)
@@ -194,7 +197,39 @@ func (self *SMGeneric) unindexSession(uuid string) bool {
}
}
}
return false
return found
}
// getSessionIDsMatchingIndexes will check inside indexes if it can find sessionIDs matching all filters
// matchedIndexes returns map[matchedFieldName]possibleMatchedFieldVal so we optimize further to avoid checking them
func (self *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (matchingSessions utils.StringMap, matchedIndexes map[string]string) {
self.sessionIndexMux.RLock()
sessionIDxes := self.sessionIndexes
self.sessionIndexMux.RUnlock()
matchedIndexes = make(map[string]string)
checkNr := 0
for fltrName, fltrVal := range fltrs {
checkNr += 1
if _, hasFldName := sessionIDxes[fltrName]; !hasFldName {
continue
}
if _, hasFldVal := sessionIDxes[fltrName][fltrVal]; !hasFldVal {
matchedIndexes[fltrName] = utils.META_NONE
continue
}
matchedIndexes[fltrName] = fltrVal
if checkNr == 1 { // First run will init the MatchingSessions
matchingSessions = sessionIDxes[fltrName][fltrVal]
continue
}
// Higher run, takes out non matching indexes
for sessID := range sessionIDxes[fltrName][fltrVal] {
if _, hasUUID := matchingSessions[sessID]; !hasUUID {
delete(matchingSessions, sessID)
}
}
}
return
}
func (self *SMGeneric) getSessionIDsForPrefix(prefix string) []string {
@@ -550,19 +585,64 @@ func (self *SMGeneric) Connect() error {
}
// Used by APIer to retrieve sessions
func (self *SMGeneric) Sessions() map[string][]*SMGSession {
func (self *SMGeneric) getSessions() map[string][]*SMGSession {
self.sessionsMux.RLock()
defer self.sessionsMux.RUnlock()
return self.sessions
}
func (self *SMGeneric) ActiveSessions(fltrs map[string]string) (aSessions []*ActiveSession, err error) {
// Check first based on indexes so we can downsize the list of matching sessions
matchingSessionIDs, checkedFilters := self.getSessionIDsMatchingIndexes(fltrs)
if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 {
return
}
for fltrFldName := range fltrs {
if _, alreadyChecked := checkedFilters[fltrFldName]; alreadyChecked && fltrFldName != utils.MEDI_RUNID { // Optimize further checks, RunID should stay since it can create bugs
delete(fltrs, fltrFldName)
}
}
var remainingSessions []*SMGSession // Survived index matching
for sUUID, sGrp := range self.getSessions() {
if _, hasUUID := matchingSessionIDs[sUUID]; !hasUUID && len(checkedFilters) != 0 {
continue
}
for _, s := range sGrp {
remainingSessions = append(remainingSessions, s)
}
}
if len(fltrs) != 0 { // Still have some filters to match
for i, s := range remainingSessions {
sMp, err := s.eventStart.AsMapStringString()
if err != nil {
return nil, err
}
matchingAll := true
for fltrFldName, fltrFldVal := range fltrs {
if fldVal, hasIt := sMp[fltrFldName]; !hasIt || fltrFldVal != fldVal { // No Match
matchingAll = false
break
}
}
if !matchingAll { // Strip the session from remaining ones with emptying the session to be garbage collected
remainingSessions[i] = remainingSessions[len(remainingSessions)-1]
remainingSessions = remainingSessions[:len(remainingSessions)-1]
}
}
}
for _, s := range remainingSessions {
aSessions = append(aSessions, s.AsActiveSession(self.Timezone()))
}
return
}
func (self *SMGeneric) Timezone() string {
return self.timezone
}
// System shutdown
func (self *SMGeneric) Shutdown() error {
for ssId := range self.Sessions() { // Force sessions shutdown
for ssId := range self.getSessions() { // Force sessions shutdown
self.sessionEnd(ssId, time.Duration(self.cgrCfg.MaxCallDuration))
}
return nil

View File

@@ -26,10 +26,16 @@ import (
"github.com/cgrates/cgrates/utils"
)
var smgCfg *config.CGRConfig
func init() {
smgCfg, _ = config.NewDefaultCGRConfig()
smgCfg.SmGenericConfig.SessionIndexes = []string{"Tenant", "Account", "Extra3", "Extra4"}
}
func TestSMGSessionIndexing(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
cfg.SmGenericConfig.SessionIndexes = []string{"Tenant", "Account", "Extra3", "Extra4"}
smg := NewSMGeneric(cfg, nil, nil, "UTC", nil)
smg := NewSMGeneric(smgCfg, nil, nil, "UTC", nil)
smGev := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
@@ -158,5 +164,78 @@ func TestSMGSessionIndexing(t *testing.T) {
if !reflect.DeepEqual(eIndexes, smg.sessionIndexes) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.sessionIndexes)
}
}
func TestSMGActiveSessions(t *testing.T) {
smg := NewSMGeneric(smgCfg, nil, nil, "UTC", nil)
smGev1 := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
utils.ACCID: "111",
utils.DIRECTION: "*out",
utils.ACCOUNT: "account1",
utils.SUBJECT: "subject1",
utils.DESTINATION: "+4986517174963",
utils.CATEGORY: "call",
utils.TENANT: "cgrates.org",
utils.REQTYPE: "*prepaid",
utils.SETUP_TIME: "2015-11-09 14:21:24",
utils.ANSWER_TIME: "2015-11-09 14:22:02",
utils.USAGE: "1m23s",
utils.LastUsed: "21s",
utils.PDD: "300ms",
utils.SUPPLIER: "supplier1",
utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT",
utils.CDRHOST: "127.0.0.1",
"Extra1": "Value1",
"Extra2": 5,
"Extra3": "",
}
smg.recordSession(smGev1.GetUUID(), &SMGSession{eventStart: smGev1})
smGev2 := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
utils.ACCID: "222",
utils.DIRECTION: "*out",
utils.ACCOUNT: "account2",
utils.DESTINATION: "+4986517174963",
utils.CATEGORY: "call",
utils.TENANT: "itsyscom.com",
utils.REQTYPE: "*prepaid",
utils.ANSWER_TIME: "2015-11-09 14:22:02",
utils.USAGE: "1m23s",
utils.LastUsed: "21s",
utils.PDD: "300ms",
utils.SUPPLIER: "supplier2",
utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT",
utils.CDRHOST: "127.0.0.1",
"Extra1": "Value1",
"Extra3": "extra3",
}
smg.recordSession(smGev2.GetUUID(), &SMGSession{eventStart: smGev2})
if aSessions, err := smg.ActiveSessions(nil); err != nil {
t.Error(err)
} else if len(aSessions) != 2 {
t.Errorf("Received sessions: %%+v", aSessions)
}
if aSessions, err := smg.ActiveSessions(map[string]string{"Tenant": "itsyscom.com"}); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Received sessions: %%+v", aSessions)
}
if aSessions, err := smg.ActiveSessions(map[string]string{utils.TOR: "*voice"}); err != nil {
t.Error(err)
} else if len(aSessions) != 2 {
t.Errorf("Received sessions: %%+v", aSessions)
}
if aSessions, err := smg.ActiveSessions(map[string]string{"Extra3": utils.MetaEmpty}); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Received sessions: %+v", aSessions)
}
if aSessions, err := smg.ActiveSessions(map[string]string{utils.SUPPLIER: "supplier2"}); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Received sessions: %+v", aSessions)
}
}

View File

@@ -1178,6 +1178,72 @@ type AttrSMGGetActiveSessions struct {
Supplier *string
}
// Used for easier filtering, keep struct format to mark filter fields clearly
func (attrs *AttrSMGGetActiveSessions) AsMapStringString() map[string]string {
out := make(map[string]string)
if attrs.ToR != nil {
if *attrs.ToR == "" {
*attrs.ToR = MetaEmpty
}
out[TOR] = *attrs.ToR
}
if attrs.OriginID != nil {
if *attrs.OriginID == "" {
*attrs.OriginID = MetaEmpty
}
out[ACCID] = *attrs.OriginID
}
if attrs.RunID != nil {
if *attrs.RunID == "" {
*attrs.RunID = MetaEmpty
}
out[MEDI_RUNID] = *attrs.RunID
}
if attrs.RequestType != nil {
if *attrs.RequestType == "" {
*attrs.RequestType = MetaEmpty
}
out[REQTYPE] = *attrs.RequestType
}
if attrs.Tenant != nil {
if *attrs.Tenant == "" {
*attrs.Tenant = MetaEmpty
}
out[TENANT] = *attrs.Tenant
}
if attrs.Category != nil {
if *attrs.Category == "" {
*attrs.Category = MetaEmpty
}
out[CATEGORY] = *attrs.Category
}
if attrs.Account != nil {
if *attrs.Account == "" {
*attrs.Account = MetaEmpty
}
out[ACCOUNT] = *attrs.Account
}
if attrs.Subject != nil {
if *attrs.Subject == "" {
*attrs.Subject = MetaEmpty
}
out[SUBJECT] = MetaEmpty
}
if attrs.Destination != nil {
if *attrs.Destination == "" {
*attrs.Destination = MetaEmpty
}
out[DESTINATION] = *attrs.Destination
}
if attrs.Supplier != nil {
if *attrs.Supplier == "" {
*attrs.Supplier = MetaEmpty
}
out[SUPPLIER] = *attrs.Supplier
}
return out
}
type AttrRateCDRs struct {
RPCCDRsFilter
StoreCDRs *bool

View File

@@ -31,3 +31,33 @@ func TestNewDTCSFromRPKey(t *testing.T) {
t.Error("Received: ", dtcs)
}
}
func TestAPIAttrSMGGetActiveSessionsAsMapStr(t *testing.T) {
attrs := &AttrSMGGetActiveSessions{
ToR: StringPointer(""),
OriginID: StringPointer(""),
RunID: StringPointer(""),
RequestType: StringPointer(""),
Tenant: StringPointer(""),
Category: StringPointer(""),
Account: StringPointer(""),
Subject: StringPointer(""),
Destination: StringPointer(""),
Supplier: StringPointer(""),
}
expectMP := map[string]string{
TOR: MetaEmpty,
ACCID: MetaEmpty,
MEDI_RUNID: MetaEmpty,
REQTYPE: MetaEmpty,
TENANT: MetaEmpty,
CATEGORY: MetaEmpty,
ACCOUNT: MetaEmpty,
SUBJECT: MetaEmpty,
DESTINATION: MetaEmpty,
SUPPLIER: MetaEmpty,
}
if mp := attrs.AsMapStringString(); !reflect.DeepEqual(expectMP, mp) {
t.Errorf("Expecting: %+v, received: %+v", expectMP, mp)
}
}