SMG - session indexing implementation with tests

This commit is contained in:
DanB
2016-08-25 09:21:32 +02:00
parent dda8fdd106
commit 8a27dfc4d2
10 changed files with 264 additions and 23 deletions

View File

@@ -42,7 +42,7 @@ func TestBalanceStoreRestore(t *testing.T) {
if err != nil {
t.Error("Error restoring balance: ", err)
}
t.Logf("INITIAL: %+v", b)
//t.Logf("INITIAL: %+v", b)
if !b.Equal(b1) {
t.Errorf("Balance store/restore failed: expected %+v was %+v", b, b1)
}
@@ -321,7 +321,7 @@ func TestDebitCreditZeroMinute(t *testing.T) {
if err != nil {
t.Error("Error debiting balance: ", err)
}
t.Logf("%+v", cc.Timespans)
//t.Logf("%+v", cc.Timespans)
if cc.Timespans[0].Increments[0].BalanceInfo.Unit.UUID != "testb" ||
cc.Timespans[0].Increments[0].Duration != time.Minute {
t.Error("Error setting balance id to increment: ", cc.Timespans[0].Increments[0])

View File

@@ -827,6 +827,11 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.CfgCdrField, costShiftDigi
return expRecord, nil
}
// Part of event interface
func (cdr *CDR) AsMapStringIface() (map[string]interface{}, error) {
return nil, utils.ErrNotImplemented
}
type ExternalCDR struct {
CGRID string
RunID string

View File

@@ -51,4 +51,5 @@ type Event interface {
String() string
AsEvent(string) Event
ComputeLcr() bool
AsMapStringIface() (map[string]interface{}, error)
}

View File

@@ -415,6 +415,10 @@ func (fsev FSEvent) AsCallDescriptor() (*engine.CallDescriptor, error) {
return lcrReq.AsCallDescriptor(config.CgrConfig().DefaultTimezone)
}
func (fsev FSEvent) AsMapStringIface() (map[string]interface{}, error) {
return nil, utils.ErrNotImplemented
}
// Converts a slice of strings into a FS array string, contains len(array) at first index since FS does not support len(ARRAY::) for now
func SliceAsFsArray(slc []string) string {
arry := ""

View File

@@ -395,3 +395,7 @@ func (kev KamEvent) ComputeLcr() bool {
return computeLcr
}
}
func (kev KamEvent) AsMapStringIface() (map[string]interface{}, error) {
return nil, utils.ErrNotImplemented
}

View File

@@ -250,7 +250,7 @@ func (osipsev *OsipsEvent) PassesFieldFilter(*utils.RSRField) (bool, string) {
}
func (osipsev *OsipsEvent) GetExtraFields() map[string]string {
primaryFields := []string{TO_TAG, SETUP_DURATION, OSIPS_SETUP_TIME, "method", "callid", "sip_reason", OSIPS_EVENT_TIME, "sip_code", "duration", "from_tag", "dialog_id",
CGR_TENANT, CGR_CATEGORY, CGR_REQTYPE, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, utils.CGR_SUPPLIER, CGR_PDD,CGR_ANSWERTIME}
CGR_TENANT, CGR_CATEGORY, CGR_REQTYPE, CGR_ACCOUNT, CGR_SUBJECT, CGR_DESTINATION, utils.CGR_SUPPLIER, CGR_PDD, CGR_ANSWERTIME}
extraFields := make(map[string]string)
for field, val := range osipsev.osipsEvent.AttrValues {
if !utils.IsSliceMember(primaryFields, field) {
@@ -309,3 +309,7 @@ func (osipsEv *OsipsEvent) ComputeLcr() bool {
return computeLcr
}
}
func (osipsEv *OsipsEvent) AsMapStringIface() (map[string]interface{}, error) {
return nil, utils.ErrNotImplemented
}

View File

@@ -21,6 +21,7 @@ package sessionmanager
import (
"encoding/json"
"fmt"
"reflect"
"time"
"github.com/cgrates/cgrates/engine"
@@ -314,6 +315,15 @@ func (s *Session) AsActiveSessions() []*ActiveSession {
return aSessions
}
func (s *Session) AsMapStringIface() (map[string]interface{}, error) {
mp := make(map[string]interface{})
v := reflect.ValueOf(s).Elem()
for i := 0; i < v.NumField(); i++ {
mp[v.Type().Field(i).Name] = v.Field(i).Interface()
}
return mp, nil
}
// Will be used when displaying active sessions via RPC
type ActiveSession struct {
CgrId string

View File

@@ -37,7 +37,9 @@ var ErrPartiallyExecuted = errors.New("Partially executed")
func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string, extconns *SMGExternalConnections) *SMGeneric {
gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone,
sessions: make(map[string][]*SMGSession), sessionTerminators: make(map[string]*smgSessionTerminator), sessionsMux: new(sync.RWMutex), guard: engine.Guardian}
sessions: make(map[string][]*SMGSession), sessionTerminators: make(map[string]*smgSessionTerminator),
sessionIndexes: make(map[string]map[string]utils.StringMap),
sessionsMux: new(sync.RWMutex), sessionIndexMux: new(sync.RWMutex), guard: engine.Guardian}
return gsm
}
@@ -46,11 +48,14 @@ type SMGeneric struct {
rater rpcclient.RpcClientConnection
cdrsrv rpcclient.RpcClientConnection
timezone string
sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging
sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires
extconns *SMGExternalConnections // Reference towards external connections manager
sessionsMux *sync.RWMutex // Locks sessions map
guard *engine.GuardianLock // Used to lock on uuid
sessions map[string][]*SMGSession //Group sessions per sessionId, multiple runs based on derived charging
sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires
sessionIndexes map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[sesionID]
extconns *SMGExternalConnections // Reference towards external connections manager
sessionsMux *sync.RWMutex // Locks sessions map
sessionIndexMux *sync.RWMutex
guard *engine.GuardianLock // Used to lock on uuid
}
type smgSessionTerminator struct {
timer *time.Timer
@@ -94,8 +99,9 @@ func (self *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply)
}
func (self *SMGeneric) indexSession(uuid string, s *SMGSession) {
func (self *SMGeneric) recordSession(uuid string, s *SMGSession) {
self.sessionsMux.Lock()
defer self.sessionsMux.Unlock()
self.sessions[uuid] = append(self.sessions[uuid], s)
if self.cgrCfg.SmGenericConfig.SessionTTL != 0 {
if _, found := self.sessionTerminators[uuid]; !found {
@@ -124,11 +130,11 @@ func (self *SMGeneric) indexSession(uuid string, s *SMGSession) {
}
}
self.sessionsMux.Unlock()
}
// Remove session from session list, removes all related in case of multiple runs, true if item was found
func (self *SMGeneric) unindexSession(uuid string) bool {
func (self *SMGeneric) unrecordSession(uuid string) bool {
self.sessionsMux.Lock()
defer self.sessionsMux.Unlock()
if _, found := self.sessions[uuid]; !found {
@@ -142,11 +148,53 @@ func (self *SMGeneric) unindexSession(uuid string) bool {
return true
}
// Returns all sessions handled by the SM
func (self *SMGeneric) getSessions() map[string][]*SMGSession {
self.sessionsMux.Lock()
defer self.sessionsMux.Unlock()
return self.sessions
// indexSession explores settings and builds self.sessionIndexes based on that
func (self *SMGeneric) indexSession(uuid string, s *SMGSession) bool {
self.sessionIndexMux.Lock()
defer self.sessionIndexMux.Unlock()
ev := s.eventStart
for _, fieldName := range self.cgrCfg.SmGenericConfig.SessionIndexes {
fieldVal, err := utils.ReflectFieldAsString(ev, fieldName, "")
if err != nil {
if err == utils.ErrNotFound {
fieldVal = utils.NOT_AVAILABLE
} else {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Error retrieving field: %s from event: %+v", fieldName, ev))
continue
}
}
if fieldVal == "" {
fieldVal = utils.MetaEmpty
}
if _, hasFieldName := self.sessionIndexes[fieldName]; !hasFieldName { // Init it here so we can minimize
self.sessionIndexes[fieldName] = make(map[string]utils.StringMap)
}
if _, hasFieldVal := self.sessionIndexes[fieldName][fieldVal]; !hasFieldVal {
self.sessionIndexes[fieldName][fieldVal] = make(utils.StringMap)
}
self.sessionIndexes[fieldName][fieldVal][uuid] = true
}
return true
}
// unindexSession removes a session from indexes
func (self *SMGeneric) unindexSession(uuid string) bool {
self.sessionIndexMux.Lock()
defer self.sessionIndexMux.Unlock()
for fldName := range self.sessionIndexes {
for fldVal := range self.sessionIndexes[fldName] {
if _, hasUUID := self.sessionIndexes[fldName][fldVal][uuid]; hasUUID {
delete(self.sessionIndexes[fldName][fldVal], uuid)
if len(self.sessionIndexes[fldName][fldVal]) == 0 {
delete(self.sessionIndexes[fldName], fldVal)
}
if len(self.sessionIndexes[fldName]) == 0 {
delete(self.sessionIndexes, fldName)
}
}
}
}
return false
}
func (self *SMGeneric) getSessionIDsForPrefix(prefix string) []string {
@@ -182,7 +230,7 @@ func (self *SMGeneric) sessionStart(evStart SMGenericEvent, connId string) error
for _, sessionRun := range sessionRuns {
s := &SMGSession{eventStart: evStart, connId: connId, runId: sessionRun.DerivedCharger.RunID, timezone: self.timezone,
rater: self.rater, cdrsrv: self.cdrsrv, cd: sessionRun.CallDescriptor}
self.indexSession(sessionId, s)
self.recordSession(sessionId, s)
//utils.Logger.Info(fmt.Sprintf("<SMGeneric> Starting session: %s, runId: %s", sessionId, s.runId))
if self.cgrCfg.SmGenericConfig.DebitInterval != 0 {
s.stopDebit = stopDebitChan
@@ -205,7 +253,7 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
if len(ss) == 0 { // Not handled by us
return nil, nil
}
if !self.unindexSession(sessionId) { // Unreference it early so we avoid concurrency
if !self.unrecordSession(sessionId) { // Unreference it early so we avoid concurrency
return nil, nil // Did not find the session so no need to close it anymore
}
for idx, s := range ss {
@@ -247,9 +295,9 @@ func (self *SMGeneric) sessionRelocate(sessionID, initialID string) error {
}
for i, s := range ss {
s.eventStart[utils.ACCID] = sessionID // Overwrite initialSessionID with new one
self.indexSession(sessionID, s)
self.recordSession(sessionID, s)
if i == 0 {
self.unindexSession(initialID)
self.unrecordSession(initialID)
}
}
return nil, nil
@@ -503,7 +551,9 @@ func (self *SMGeneric) Connect() error {
// Used by APIer to retrieve sessions
func (self *SMGeneric) Sessions() map[string][]*SMGSession {
return self.getSessions()
self.sessionsMux.RLock()
defer self.sessionsMux.RUnlock()
return self.sessions
}
func (self *SMGeneric) Timezone() string {
@@ -512,7 +562,7 @@ func (self *SMGeneric) Timezone() string {
// System shutdown
func (self *SMGeneric) Shutdown() error {
for ssId := range self.getSessions() { // Force sessions shutdown
for ssId := range self.Sessions() { // Force sessions shutdown
self.sessionEnd(ssId, time.Duration(self.cgrCfg.MaxCallDuration))
}
return nil

View File

@@ -0,0 +1,162 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"reflect"
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func TestSMGSessionIndexing(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
cfg.SmGenericConfig.SessionIndexes = []string{"Tenant", "Account", "Extra3", "Extra4"}
smg := NewSMGeneric(cfg, nil, nil, "UTC", nil)
smGev := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: "*voice",
utils.ACCID: "12345",
utils.DIRECTION: "*out",
utils.ACCOUNT: "account1",
utils.SUBJECT: "subject1",
utils.DESTINATION: "+4986517174963",
utils.CATEGORY: "call",
utils.TENANT: "cgrates.org",
utils.REQTYPE: "*prepaid",
utils.SETUP_TIME: "2015-11-09 14:21:24",
utils.ANSWER_TIME: "2015-11-09 14:22:02",
utils.USAGE: "1m23s",
utils.LastUsed: "21s",
utils.PDD: "300ms",
utils.SUPPLIER: "supplier1",
utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT",
utils.CDRHOST: "127.0.0.1",
"Extra1": "Value1",
"Extra2": 5,
"Extra3": "",
}
// Index first session
smgSession := &SMGSession{eventStart: smGev}
uuid := smGev.GetUUID()
smg.indexSession(uuid, smgSession)
eIndexes := map[string]map[string]utils.StringMap{
"Tenant": map[string]utils.StringMap{
"cgrates.org": utils.StringMap{
uuid: true,
},
},
"Account": map[string]utils.StringMap{
"account1": utils.StringMap{
uuid: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
uuid: true,
},
},
"Extra4": map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
uuid: true,
},
},
}
if !reflect.DeepEqual(eIndexes, smg.sessionIndexes) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.sessionIndexes)
}
// Index seccond session
smGev2 := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT2",
utils.ACCID: "12346",
utils.DIRECTION: "*out",
utils.ACCOUNT: "account2",
utils.DESTINATION: "+4986517174964",
utils.TENANT: "itsyscom.com",
"Extra3": "",
"Extra4": "info2",
}
uuid2 := smGev2.GetUUID()
smgSession2 := &SMGSession{eventStart: smGev2}
smg.indexSession(uuid2, smgSession2)
eIndexes = map[string]map[string]utils.StringMap{
"Tenant": map[string]utils.StringMap{
"cgrates.org": utils.StringMap{
uuid: true,
},
"itsyscom.com": utils.StringMap{
uuid2: true,
},
},
"Account": map[string]utils.StringMap{
"account1": utils.StringMap{
uuid: true,
},
"account2": utils.StringMap{
uuid2: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
uuid: true,
uuid2: true,
},
},
"Extra4": map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
uuid: true,
},
"info2": utils.StringMap{
uuid2: true,
},
},
}
if !reflect.DeepEqual(eIndexes, smg.sessionIndexes) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.sessionIndexes)
}
// Unidex first session
smg.unindexSession(uuid)
eIndexes = map[string]map[string]utils.StringMap{
"Tenant": map[string]utils.StringMap{
"itsyscom.com": utils.StringMap{
uuid2: true,
},
},
"Account": map[string]utils.StringMap{
"account2": utils.StringMap{
uuid2: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
uuid2: true,
},
},
"Extra4": map[string]utils.StringMap{
"info2": utils.StringMap{
uuid2: true,
},
},
}
if !reflect.DeepEqual(eIndexes, smg.sessionIndexes) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, smg.sessionIndexes)
}
}

View File

@@ -260,6 +260,7 @@ const (
LCRCachePrefix = "LCR_"
ALIAS_CONTEXT_RATING = "*rating"
NOT_AVAILABLE = "N/A"
MetaEmpty = "*empty"
CALL = "call"
EXTRA_FIELDS = "ExtraFields"
META_SURETAX = "*sure_tax"