From f7ca7b400f226d855a8b70f9754d09a692f2b2d4 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 25 Aug 2016 22:20:51 +0200 Subject: [PATCH] SMGeneric.ActiveSessions with indexed filters --- apier/v1/smgenericv1.go | 39 ++------------ sessionmanager/smgeneric.go | 88 ++++++++++++++++++++++++++++++-- sessionmanager/smgeneric_test.go | 87 +++++++++++++++++++++++++++++-- utils/apitpdata.go | 66 ++++++++++++++++++++++++ utils/apitpdata_test.go | 30 +++++++++++ 5 files changed, 266 insertions(+), 44 deletions(-) diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index 07d09a2fa..63410c084 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -90,42 +90,9 @@ func (self *SMGenericV1) ProcessCDR(ev sessionmanager.SMGenericEvent, reply *str } func (self *SMGenericV1) ActiveSessions(attrs utils.AttrSMGGetActiveSessions, reply *[]*sessionmanager.ActiveSession) error { - aSessions := make([]*sessionmanager.ActiveSession, 0) - for _, sGrp := range self.sm.Sessions() { // Add sessions to return with filter - for _, s := range sGrp { - as := s.AsActiveSession(self.sm.Timezone()) - if attrs.ToR != nil && *attrs.ToR != as.TOR { - continue - } - if attrs.OriginID != nil && *attrs.OriginID != as.OriginID { - continue - } - if attrs.RunID != nil && *attrs.RunID != as.RunId { - continue - } - if attrs.RequestType != nil && *attrs.RequestType != as.ReqType { - continue - } - if attrs.Tenant != nil && *attrs.Tenant != as.Tenant { - continue - } - if attrs.Category != nil && *attrs.Category != as.Category { - continue - } - if attrs.Account != nil && *attrs.Account != as.Account { - continue - } - if attrs.Subject != nil && *attrs.Subject != as.Subject { - continue - } - if attrs.Destination != nil && *attrs.Destination != as.Destination { - continue - } - if attrs.Supplier != nil && *attrs.Supplier != as.Supplier { - continue - } - aSessions = append(aSessions, as) - } + aSessions, err := self.sm.ActiveSessions(attrs.AsMapStringString()) + if err != nil { + return utils.NewErrServerError(err) } *reply = aSessions return nil diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index d9ddc2d5f..08bf94b32 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -130,7 +130,7 @@ func (self *SMGeneric) recordSession(uuid string, s *SMGSession) { } } - + self.indexSession(uuid, s) } // Remove session from session list, removes all related in case of multiple runs, true if item was found @@ -145,6 +145,7 @@ func (self *SMGeneric) unrecordSession(uuid string) bool { st.endChan <- true delete(self.sessionTerminators, uuid) } + self.unindexSession(uuid) return true } @@ -181,9 +182,11 @@ func (self *SMGeneric) indexSession(uuid string, s *SMGSession) bool { func (self *SMGeneric) unindexSession(uuid string) bool { self.sessionIndexMux.Lock() defer self.sessionIndexMux.Unlock() + var found bool for fldName := range self.sessionIndexes { for fldVal := range self.sessionIndexes[fldName] { if _, hasUUID := self.sessionIndexes[fldName][fldVal][uuid]; hasUUID { + found = true delete(self.sessionIndexes[fldName][fldVal], uuid) if len(self.sessionIndexes[fldName][fldVal]) == 0 { delete(self.sessionIndexes[fldName], fldVal) @@ -194,7 +197,39 @@ func (self *SMGeneric) unindexSession(uuid string) bool { } } } - return false + return found +} + +// getSessionIDsMatchingIndexes will check inside indexes if it can find sessionIDs matching all filters +// matchedIndexes returns map[matchedFieldName]possibleMatchedFieldVal so we optimize further to avoid checking them +func (self *SMGeneric) getSessionIDsMatchingIndexes(fltrs map[string]string) (matchingSessions utils.StringMap, matchedIndexes map[string]string) { + self.sessionIndexMux.RLock() + sessionIDxes := self.sessionIndexes + self.sessionIndexMux.RUnlock() + matchedIndexes = make(map[string]string) + checkNr := 0 + for fltrName, fltrVal := range fltrs { + checkNr += 1 + if _, hasFldName := sessionIDxes[fltrName]; !hasFldName { + continue + } + if _, hasFldVal := sessionIDxes[fltrName][fltrVal]; !hasFldVal { + matchedIndexes[fltrName] = utils.META_NONE + continue + } + matchedIndexes[fltrName] = fltrVal + if checkNr == 1 { // First run will init the MatchingSessions + matchingSessions = sessionIDxes[fltrName][fltrVal] + continue + } + // Higher run, takes out non matching indexes + for sessID := range sessionIDxes[fltrName][fltrVal] { + if _, hasUUID := matchingSessions[sessID]; !hasUUID { + delete(matchingSessions, sessID) + } + } + } + return } func (self *SMGeneric) getSessionIDsForPrefix(prefix string) []string { @@ -550,19 +585,64 @@ func (self *SMGeneric) Connect() error { } // Used by APIer to retrieve sessions -func (self *SMGeneric) Sessions() map[string][]*SMGSession { +func (self *SMGeneric) getSessions() map[string][]*SMGSession { self.sessionsMux.RLock() defer self.sessionsMux.RUnlock() return self.sessions } +func (self *SMGeneric) ActiveSessions(fltrs map[string]string) (aSessions []*ActiveSession, err error) { + // Check first based on indexes so we can downsize the list of matching sessions + matchingSessionIDs, checkedFilters := self.getSessionIDsMatchingIndexes(fltrs) + if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 { + return + } + for fltrFldName := range fltrs { + if _, alreadyChecked := checkedFilters[fltrFldName]; alreadyChecked && fltrFldName != utils.MEDI_RUNID { // Optimize further checks, RunID should stay since it can create bugs + delete(fltrs, fltrFldName) + } + } + var remainingSessions []*SMGSession // Survived index matching + for sUUID, sGrp := range self.getSessions() { + if _, hasUUID := matchingSessionIDs[sUUID]; !hasUUID && len(checkedFilters) != 0 { + continue + } + for _, s := range sGrp { + remainingSessions = append(remainingSessions, s) + } + } + if len(fltrs) != 0 { // Still have some filters to match + for i, s := range remainingSessions { + sMp, err := s.eventStart.AsMapStringString() + if err != nil { + return nil, err + } + matchingAll := true + for fltrFldName, fltrFldVal := range fltrs { + if fldVal, hasIt := sMp[fltrFldName]; !hasIt || fltrFldVal != fldVal { // No Match + matchingAll = false + break + } + } + if !matchingAll { // Strip the session from remaining ones with emptying the session to be garbage collected + remainingSessions[i] = remainingSessions[len(remainingSessions)-1] + remainingSessions = remainingSessions[:len(remainingSessions)-1] + } + } + } + for _, s := range remainingSessions { + aSessions = append(aSessions, s.AsActiveSession(self.Timezone())) + } + return +} + func (self *SMGeneric) Timezone() string { return self.timezone } // System shutdown func (self *SMGeneric) Shutdown() error { - for ssId := range self.Sessions() { // Force sessions shutdown + for ssId := range self.getSessions() { // Force sessions shutdown self.sessionEnd(ssId, time.Duration(self.cgrCfg.MaxCallDuration)) } return nil diff --git a/sessionmanager/smgeneric_test.go b/sessionmanager/smgeneric_test.go index 0e2267716..37e021f39 100644 --- a/sessionmanager/smgeneric_test.go +++ b/sessionmanager/smgeneric_test.go @@ -26,10 +26,16 @@ import ( "github.com/cgrates/cgrates/utils" ) +var smgCfg *config.CGRConfig + +func init() { + smgCfg, _ = config.NewDefaultCGRConfig() + smgCfg.SmGenericConfig.SessionIndexes = []string{"Tenant", "Account", "Extra3", "Extra4"} + +} + func TestSMGSessionIndexing(t *testing.T) { - cfg, _ = config.NewDefaultCGRConfig() - cfg.SmGenericConfig.SessionIndexes = []string{"Tenant", "Account", "Extra3", "Extra4"} - smg := NewSMGeneric(cfg, nil, nil, "UTC", nil) + smg := NewSMGeneric(smgCfg, nil, nil, "UTC", nil) smGev := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -158,5 +164,78 @@ func TestSMGSessionIndexing(t *testing.T) { if !reflect.DeepEqual(eIndexes, smg.sessionIndexes) { t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.sessionIndexes) } - +} + +func TestSMGActiveSessions(t *testing.T) { + smg := NewSMGeneric(smgCfg, nil, nil, "UTC", nil) + smGev1 := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: "*voice", + utils.ACCID: "111", + utils.DIRECTION: "*out", + utils.ACCOUNT: "account1", + utils.SUBJECT: "subject1", + utils.DESTINATION: "+4986517174963", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: "*prepaid", + utils.SETUP_TIME: "2015-11-09 14:21:24", + utils.ANSWER_TIME: "2015-11-09 14:22:02", + utils.USAGE: "1m23s", + utils.LastUsed: "21s", + utils.PDD: "300ms", + utils.SUPPLIER: "supplier1", + utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT", + utils.CDRHOST: "127.0.0.1", + "Extra1": "Value1", + "Extra2": 5, + "Extra3": "", + } + smg.recordSession(smGev1.GetUUID(), &SMGSession{eventStart: smGev1}) + smGev2 := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: "*voice", + utils.ACCID: "222", + utils.DIRECTION: "*out", + utils.ACCOUNT: "account2", + utils.DESTINATION: "+4986517174963", + utils.CATEGORY: "call", + utils.TENANT: "itsyscom.com", + utils.REQTYPE: "*prepaid", + utils.ANSWER_TIME: "2015-11-09 14:22:02", + utils.USAGE: "1m23s", + utils.LastUsed: "21s", + utils.PDD: "300ms", + utils.SUPPLIER: "supplier2", + utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT", + utils.CDRHOST: "127.0.0.1", + "Extra1": "Value1", + "Extra3": "extra3", + } + smg.recordSession(smGev2.GetUUID(), &SMGSession{eventStart: smGev2}) + if aSessions, err := smg.ActiveSessions(nil); err != nil { + t.Error(err) + } else if len(aSessions) != 2 { + t.Errorf("Received sessions: %%+v", aSessions) + } + if aSessions, err := smg.ActiveSessions(map[string]string{"Tenant": "itsyscom.com"}); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("Received sessions: %%+v", aSessions) + } + if aSessions, err := smg.ActiveSessions(map[string]string{utils.TOR: "*voice"}); err != nil { + t.Error(err) + } else if len(aSessions) != 2 { + t.Errorf("Received sessions: %%+v", aSessions) + } + if aSessions, err := smg.ActiveSessions(map[string]string{"Extra3": utils.MetaEmpty}); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("Received sessions: %+v", aSessions) + } + if aSessions, err := smg.ActiveSessions(map[string]string{utils.SUPPLIER: "supplier2"}); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("Received sessions: %+v", aSessions) + } } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index fcd50093c..bb46db6e8 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1178,6 +1178,72 @@ type AttrSMGGetActiveSessions struct { Supplier *string } +// Used for easier filtering, keep struct format to mark filter fields clearly +func (attrs *AttrSMGGetActiveSessions) AsMapStringString() map[string]string { + out := make(map[string]string) + if attrs.ToR != nil { + if *attrs.ToR == "" { + *attrs.ToR = MetaEmpty + } + out[TOR] = *attrs.ToR + } + if attrs.OriginID != nil { + if *attrs.OriginID == "" { + *attrs.OriginID = MetaEmpty + } + out[ACCID] = *attrs.OriginID + } + if attrs.RunID != nil { + if *attrs.RunID == "" { + *attrs.RunID = MetaEmpty + } + out[MEDI_RUNID] = *attrs.RunID + } + if attrs.RequestType != nil { + if *attrs.RequestType == "" { + *attrs.RequestType = MetaEmpty + } + out[REQTYPE] = *attrs.RequestType + } + if attrs.Tenant != nil { + if *attrs.Tenant == "" { + *attrs.Tenant = MetaEmpty + } + out[TENANT] = *attrs.Tenant + } + if attrs.Category != nil { + if *attrs.Category == "" { + *attrs.Category = MetaEmpty + } + out[CATEGORY] = *attrs.Category + } + if attrs.Account != nil { + if *attrs.Account == "" { + *attrs.Account = MetaEmpty + } + out[ACCOUNT] = *attrs.Account + } + if attrs.Subject != nil { + if *attrs.Subject == "" { + *attrs.Subject = MetaEmpty + } + out[SUBJECT] = MetaEmpty + } + if attrs.Destination != nil { + if *attrs.Destination == "" { + *attrs.Destination = MetaEmpty + } + out[DESTINATION] = *attrs.Destination + } + if attrs.Supplier != nil { + if *attrs.Supplier == "" { + *attrs.Supplier = MetaEmpty + } + out[SUPPLIER] = *attrs.Supplier + } + return out +} + type AttrRateCDRs struct { RPCCDRsFilter StoreCDRs *bool diff --git a/utils/apitpdata_test.go b/utils/apitpdata_test.go index ba525f1d6..7d5315633 100644 --- a/utils/apitpdata_test.go +++ b/utils/apitpdata_test.go @@ -31,3 +31,33 @@ func TestNewDTCSFromRPKey(t *testing.T) { t.Error("Received: ", dtcs) } } + +func TestAPIAttrSMGGetActiveSessionsAsMapStr(t *testing.T) { + attrs := &AttrSMGGetActiveSessions{ + ToR: StringPointer(""), + OriginID: StringPointer(""), + RunID: StringPointer(""), + RequestType: StringPointer(""), + Tenant: StringPointer(""), + Category: StringPointer(""), + Account: StringPointer(""), + Subject: StringPointer(""), + Destination: StringPointer(""), + Supplier: StringPointer(""), + } + expectMP := map[string]string{ + TOR: MetaEmpty, + ACCID: MetaEmpty, + MEDI_RUNID: MetaEmpty, + REQTYPE: MetaEmpty, + TENANT: MetaEmpty, + CATEGORY: MetaEmpty, + ACCOUNT: MetaEmpty, + SUBJECT: MetaEmpty, + DESTINATION: MetaEmpty, + SUPPLIER: MetaEmpty, + } + if mp := attrs.AsMapStringString(); !reflect.DeepEqual(expectMP, mp) { + t.Errorf("Expecting: %+v, received: %+v", expectMP, mp) + } +}