From 8a27dfc4d2895fff57ee1482cb99fbd8aa0fd591 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 25 Aug 2016 09:21:32 +0200 Subject: [PATCH] SMG - session indexing implementation with tests --- engine/account_test.go | 4 +- engine/cdr.go | 5 + engine/event.go | 1 + sessionmanager/fsevent.go | 4 + sessionmanager/kamevent.go | 4 + sessionmanager/osipsevent.go | 6 +- sessionmanager/session.go | 10 ++ sessionmanager/smgeneric.go | 90 +++++++++++++---- sessionmanager/smgeneric_test.go | 162 +++++++++++++++++++++++++++++++ utils/consts.go | 1 + 10 files changed, 264 insertions(+), 23 deletions(-) create mode 100644 sessionmanager/smgeneric_test.go diff --git a/engine/account_test.go b/engine/account_test.go index 4a24b41ae..bfa598112 100644 --- a/engine/account_test.go +++ b/engine/account_test.go @@ -42,7 +42,7 @@ func TestBalanceStoreRestore(t *testing.T) { if err != nil { t.Error("Error restoring balance: ", err) } - t.Logf("INITIAL: %+v", b) + //t.Logf("INITIAL: %+v", b) if !b.Equal(b1) { t.Errorf("Balance store/restore failed: expected %+v was %+v", b, b1) } @@ -321,7 +321,7 @@ func TestDebitCreditZeroMinute(t *testing.T) { if err != nil { t.Error("Error debiting balance: ", err) } - t.Logf("%+v", cc.Timespans) + //t.Logf("%+v", cc.Timespans) if cc.Timespans[0].Increments[0].BalanceInfo.Unit.UUID != "testb" || cc.Timespans[0].Increments[0].Duration != time.Minute { t.Error("Error setting balance id to increment: ", cc.Timespans[0].Increments[0]) diff --git a/engine/cdr.go b/engine/cdr.go index 13f27a235..bd890eea3 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -827,6 +827,11 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, costShiftDigi return expRecord, nil } +// Part of event interface +func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) { + return nil, utils.ErrNotImplemented +} + type ExternalCDR struct { CGRID string RunID string diff --git a/engine/event.go b/engine/event.go index 2f7fb12d6..be0c5c347 100644 --- a/engine/event.go +++ b/engine/event.go @@ -51,4 +51,5 @@ type Event interface { String() string AsEvent(string) Event ComputeLcr() bool + AsMapStringIface() (map[string]interface{}, error) } diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index c37e46d25..ff8b1154d 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -415,6 +415,10 @@ func (fsev FSEvent) AsCallDescriptor() (*engine.CallDescriptor, error) { return lcrReq.AsCallDescriptor(config.CgrConfig().DefaultTimezone) } +func (fsev FSEvent) AsMapStringIface() (map[string]interface{}, error) { + return nil, utils.ErrNotImplemented +} + // Converts a slice of strings into a FS array string, contains len(array) at first index since FS does not support len(ARRAY::) for now func SliceAsFsArray(slc []string) string { arry := "" diff --git a/sessionmanager/kamevent.go b/sessionmanager/kamevent.go index 422498c0f..7aba55d8c 100644 --- a/sessionmanager/kamevent.go +++ b/sessionmanager/kamevent.go @@ -395,3 +395,7 @@ func (kev KamEvent) ComputeLcr() bool { return computeLcr } } + +func (kev KamEvent) AsMapStringIface() (map[string]interface{}, error) { + return nil, utils.ErrNotImplemented +} diff --git a/sessionmanager/osipsevent.go b/sessionmanager/osipsevent.go index a8f6981f4..afcbd17ce 100644 --- a/sessionmanager/osipsevent.go +++ b/sessionmanager/osipsevent.go @@ -250,7 +250,7 @@ func (osipsev *OsipsEvent) PassesFieldFilter(*utils.RSRField) (bool, string) { } func (osipsev *OsipsEvent) GetExtraFields() map[string]string { primaryFields := []string{TO_TAG, SETUP_DURATION, OSIPS_SETUP_TIME, "method", "callid", "sip_reason", OSIPS_EVENT_TIME, "sip_code", "duration", "from_tag", "dialog_id", - CGR_TENANT, CGR_CATEGORY, CGR_REQTYPE, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, utils.CGR_SUPPLIER, CGR_PDD,CGR_ANSWERTIME} + CGR_TENANT, CGR_CATEGORY, CGR_REQTYPE, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, utils.CGR_SUPPLIER, CGR_PDD, CGR_ANSWERTIME} extraFields := make(map[string]string) for field, val := range osipsev.osipsEvent.AttrValues { if !utils.IsSliceMember(primaryFields, field) { @@ -309,3 +309,7 @@ func (osipsEv *OsipsEvent) ComputeLcr() bool { return computeLcr } } + +func (osipsEv *OsipsEvent) AsMapStringIface() (map[string]interface{}, error) { + return nil, utils.ErrNotImplemented +} diff --git a/sessionmanager/session.go b/sessionmanager/session.go index d1731154a..55bb07c6e 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -21,6 +21,7 @@ package sessionmanager import ( "encoding/json" "fmt" + "reflect" "time" "github.com/cgrates/cgrates/engine" @@ -314,6 +315,15 @@ func (s *Session) AsActiveSessions() []*ActiveSession { return aSessions } +func (s *Session) AsMapStringIface() (map[string]interface{}, error) { + mp := make(map[string]interface{}) + v := reflect.ValueOf(s).Elem() + for i := 0; i < v.NumField(); i++ { + mp[v.Type().Field(i).Name] = v.Field(i).Interface() + } + return mp, nil +} + // Will be used when displaying active sessions via RPC type ActiveSession struct { CgrId string diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index a37490104..d9ddc2d5f 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -37,7 +37,9 @@ var ErrPartiallyExecuted = errors.New("Partially executed") func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string, extconns *SMGExternalConnections) *SMGeneric { gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone, - sessions: make(map[string][]*SMGSession), sessionTerminators: make(map[string]*smgSessionTerminator), sessionsMux: new(sync.RWMutex), guard: engine.Guardian} + sessions: make(map[string][]*SMGSession), sessionTerminators: make(map[string]*smgSessionTerminator), + sessionIndexes: make(map[string]map[string]utils.StringMap), + sessionsMux: new(sync.RWMutex), sessionIndexMux: new(sync.RWMutex), guard: engine.Guardian} return gsm } @@ -46,11 +48,14 @@ type SMGeneric struct { rater rpcclient.RpcClientConnection cdrsrv rpcclient.RpcClientConnection timezone string - sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging - sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires - extconns *SMGExternalConnections // Reference towards external connections manager - sessionsMux *sync.RWMutex // Locks sessions map - guard *engine.GuardianLock // Used to lock on uuid + sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging + sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires + sessionIndexes map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[sesionID] + extconns *SMGExternalConnections // Reference towards external connections manager + sessionsMux *sync.RWMutex // Locks sessions map + sessionIndexMux *sync.RWMutex + guard *engine.GuardianLock // Used to lock on uuid + } type smgSessionTerminator struct { timer *time.Timer @@ -94,8 +99,9 @@ func (self *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply) } -func (self *SMGeneric) indexSession(uuid string, s *SMGSession) { +func (self *SMGeneric) recordSession(uuid string, s *SMGSession) { self.sessionsMux.Lock() + defer self.sessionsMux.Unlock() self.sessions[uuid] = append(self.sessions[uuid], s) if self.cgrCfg.SmGenericConfig.SessionTTL != 0 { if _, found := self.sessionTerminators[uuid]; !found { @@ -124,11 +130,11 @@ func (self *SMGeneric) indexSession(uuid string, s *SMGSession) { } } - self.sessionsMux.Unlock() + } // Remove session from session list, removes all related in case of multiple runs, true if item was found -func (self *SMGeneric) unindexSession(uuid string) bool { +func (self *SMGeneric) unrecordSession(uuid string) bool { self.sessionsMux.Lock() defer self.sessionsMux.Unlock() if _, found := self.sessions[uuid]; !found { @@ -142,11 +148,53 @@ func (self *SMGeneric) unindexSession(uuid string) bool { return true } -// Returns all sessions handled by the SM -func (self *SMGeneric) getSessions() map[string][]*SMGSession { - self.sessionsMux.Lock() - defer self.sessionsMux.Unlock() - return self.sessions +// indexSession explores settings and builds self.sessionIndexes based on that +func (self *SMGeneric) indexSession(uuid string, s *SMGSession) bool { + self.sessionIndexMux.Lock() + defer self.sessionIndexMux.Unlock() + ev := s.eventStart + for _, fieldName := range self.cgrCfg.SmGenericConfig.SessionIndexes { + fieldVal, err := utils.ReflectFieldAsString(ev, fieldName, "") + if err != nil { + if err == utils.ErrNotFound { + fieldVal = utils.NOT_AVAILABLE + } else { + utils.Logger.Err(fmt.Sprintf(" Error retrieving field: %s from event: %+v", fieldName, ev)) + continue + } + } + if fieldVal == "" { + fieldVal = utils.MetaEmpty + } + if _, hasFieldName := self.sessionIndexes[fieldName]; !hasFieldName { // Init it here so we can minimize + self.sessionIndexes[fieldName] = make(map[string]utils.StringMap) + } + if _, hasFieldVal := self.sessionIndexes[fieldName][fieldVal]; !hasFieldVal { + self.sessionIndexes[fieldName][fieldVal] = make(utils.StringMap) + } + self.sessionIndexes[fieldName][fieldVal][uuid] = true + } + return true +} + +// unindexSession removes a session from indexes +func (self *SMGeneric) unindexSession(uuid string) bool { + self.sessionIndexMux.Lock() + defer self.sessionIndexMux.Unlock() + for fldName := range self.sessionIndexes { + for fldVal := range self.sessionIndexes[fldName] { + if _, hasUUID := self.sessionIndexes[fldName][fldVal][uuid]; hasUUID { + delete(self.sessionIndexes[fldName][fldVal], uuid) + if len(self.sessionIndexes[fldName][fldVal]) == 0 { + delete(self.sessionIndexes[fldName], fldVal) + } + if len(self.sessionIndexes[fldName]) == 0 { + delete(self.sessionIndexes, fldName) + } + } + } + } + return false } func (self *SMGeneric) getSessionIDsForPrefix(prefix string) []string { @@ -182,7 +230,7 @@ func (self *SMGeneric) sessionStart(evStart SMGenericEvent, connId string) error for _, sessionRun := range sessionRuns { s := &SMGSession{eventStart: evStart, connId: connId, runId: sessionRun.DerivedCharger.RunID, timezone: self.timezone, rater: self.rater, cdrsrv: self.cdrsrv, cd: sessionRun.CallDescriptor} - self.indexSession(sessionId, s) + self.recordSession(sessionId, s) //utils.Logger.Info(fmt.Sprintf(" Starting session: %s, runId: %s", sessionId, s.runId)) if self.cgrCfg.SmGenericConfig.DebitInterval != 0 { s.stopDebit = stopDebitChan @@ -205,7 +253,7 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { if len(ss) == 0 { // Not handled by us return nil, nil } - if !self.unindexSession(sessionId) { // Unreference it early so we avoid concurrency + if !self.unrecordSession(sessionId) { // Unreference it early so we avoid concurrency return nil, nil // Did not find the session so no need to close it anymore } for idx, s := range ss { @@ -247,9 +295,9 @@ func (self *SMGeneric) sessionRelocate(sessionID, initialID string) error { } for i, s := range ss { s.eventStart[utils.ACCID] = sessionID // Overwrite initialSessionID with new one - self.indexSession(sessionID, s) + self.recordSession(sessionID, s) if i == 0 { - self.unindexSession(initialID) + self.unrecordSession(initialID) } } return nil, nil @@ -503,7 +551,9 @@ func (self *SMGeneric) Connect() error { // Used by APIer to retrieve sessions func (self *SMGeneric) Sessions() map[string][]*SMGSession { - return self.getSessions() + self.sessionsMux.RLock() + defer self.sessionsMux.RUnlock() + return self.sessions } func (self *SMGeneric) Timezone() string { @@ -512,7 +562,7 @@ func (self *SMGeneric) Timezone() string { // System shutdown func (self *SMGeneric) Shutdown() error { - for ssId := range self.getSessions() { // Force sessions shutdown + for ssId := range self.Sessions() { // Force sessions shutdown self.sessionEnd(ssId, time.Duration(self.cgrCfg.MaxCallDuration)) } return nil diff --git a/sessionmanager/smgeneric_test.go b/sessionmanager/smgeneric_test.go new file mode 100644 index 000000000..0e2267716 --- /dev/null +++ b/sessionmanager/smgeneric_test.go @@ -0,0 +1,162 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessionmanager + +import ( + "reflect" + "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +func TestSMGSessionIndexing(t *testing.T) { + cfg, _ = config.NewDefaultCGRConfig() + cfg.SmGenericConfig.SessionIndexes = []string{"Tenant", "Account", "Extra3", "Extra4"} + smg := NewSMGeneric(cfg, nil, nil, "UTC", nil) + smGev := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: "*voice", + utils.ACCID: "12345", + utils.DIRECTION: "*out", + utils.ACCOUNT: "account1", + utils.SUBJECT: "subject1", + utils.DESTINATION: "+4986517174963", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: "*prepaid", + utils.SETUP_TIME: "2015-11-09 14:21:24", + utils.ANSWER_TIME: "2015-11-09 14:22:02", + utils.USAGE: "1m23s", + utils.LastUsed: "21s", + utils.PDD: "300ms", + utils.SUPPLIER: "supplier1", + utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT", + utils.CDRHOST: "127.0.0.1", + "Extra1": "Value1", + "Extra2": 5, + "Extra3": "", + } + // Index first session + smgSession := &SMGSession{eventStart: smGev} + uuid := smGev.GetUUID() + smg.indexSession(uuid, smgSession) + eIndexes := map[string]map[string]utils.StringMap{ + "Tenant": map[string]utils.StringMap{ + "cgrates.org": utils.StringMap{ + uuid: true, + }, + }, + "Account": map[string]utils.StringMap{ + "account1": utils.StringMap{ + uuid: true, + }, + }, + "Extra3": map[string]utils.StringMap{ + utils.MetaEmpty: utils.StringMap{ + uuid: true, + }, + }, + "Extra4": map[string]utils.StringMap{ + utils.NOT_AVAILABLE: utils.StringMap{ + uuid: true, + }, + }, + } + if !reflect.DeepEqual(eIndexes, smg.sessionIndexes) { + t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.sessionIndexes) + } + // Index seccond session + smGev2 := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT2", + utils.ACCID: "12346", + utils.DIRECTION: "*out", + utils.ACCOUNT: "account2", + utils.DESTINATION: "+4986517174964", + utils.TENANT: "itsyscom.com", + "Extra3": "", + "Extra4": "info2", + } + uuid2 := smGev2.GetUUID() + smgSession2 := &SMGSession{eventStart: smGev2} + smg.indexSession(uuid2, smgSession2) + eIndexes = map[string]map[string]utils.StringMap{ + "Tenant": map[string]utils.StringMap{ + "cgrates.org": utils.StringMap{ + uuid: true, + }, + "itsyscom.com": utils.StringMap{ + uuid2: true, + }, + }, + "Account": map[string]utils.StringMap{ + "account1": utils.StringMap{ + uuid: true, + }, + "account2": utils.StringMap{ + uuid2: true, + }, + }, + "Extra3": map[string]utils.StringMap{ + utils.MetaEmpty: utils.StringMap{ + uuid: true, + uuid2: true, + }, + }, + "Extra4": map[string]utils.StringMap{ + utils.NOT_AVAILABLE: utils.StringMap{ + uuid: true, + }, + "info2": utils.StringMap{ + uuid2: true, + }, + }, + } + if !reflect.DeepEqual(eIndexes, smg.sessionIndexes) { + t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.sessionIndexes) + } + // Unidex first session + smg.unindexSession(uuid) + eIndexes = map[string]map[string]utils.StringMap{ + "Tenant": map[string]utils.StringMap{ + "itsyscom.com": utils.StringMap{ + uuid2: true, + }, + }, + "Account": map[string]utils.StringMap{ + "account2": utils.StringMap{ + uuid2: true, + }, + }, + "Extra3": map[string]utils.StringMap{ + utils.MetaEmpty: utils.StringMap{ + uuid2: true, + }, + }, + "Extra4": map[string]utils.StringMap{ + "info2": utils.StringMap{ + uuid2: true, + }, + }, + } + if !reflect.DeepEqual(eIndexes, smg.sessionIndexes) { + t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.sessionIndexes) + } + +} diff --git a/utils/consts.go b/utils/consts.go index e5341b129..dfd719b57 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -260,6 +260,7 @@ const ( LCRCachePrefix = "LCR_" ALIAS_CONTEXT_RATING = "*rating" NOT_AVAILABLE = "N/A" + MetaEmpty = "*empty" CALL = "call" EXTRA_FIELDS = "ExtraFields" META_SURETAX = "*sure_tax"