Exporting MapEvent out of SafEvent so we can pass SafEvent around through RPC

This commit is contained in:
DanB
2018-08-20 12:19:40 +02:00
parent db0a03572f
commit 9f0dd9842c
6 changed files with 57 additions and 33 deletions

View File

@@ -90,8 +90,8 @@ func (me MapEvent) GetTimeIgnoreErrors(fldName string, tmz string) (t time.Time)
}
// Clone returns the cloned map
func (me MapEvent) Clone() (mp map[string]interface{}) {
mp = make(map[string]interface{}, len(me))
func (me MapEvent) Clone() (mp MapEvent) {
mp = make(MapEvent, len(me))
for k, v := range me {
mp[k] = v
}

View File

@@ -27,37 +27,44 @@ import (
)
func NewSafEvent(mp map[string]interface{}) *SafEvent {
return &SafEvent{me: NewMapEvent(mp)}
return &SafEvent{Me: NewMapEvent(mp)}
}
// SafEvent is a generic event which is safe to read/write from multiple goroutines
type SafEvent struct {
sync.RWMutex
me MapEvent
Me MapEvent // need it exportable so we can pass it on network
}
func (se *SafEvent) Clone() (cln *SafEvent) {
se.RLock()
cln = &SafEvent{Me: se.Me.Clone()}
se.RUnlock()
return
}
// MapEvent offers access to MapEvent methods, avoiding locks
func (se *SafEvent) MapEvent() (mp MapEvent) {
return se.me
return se.Me
}
func (se *SafEvent) String() (out string) {
se.RLock()
out = se.me.String()
out = se.Me.String()
se.RUnlock()
return
}
func (se *SafEvent) HasField(fldName string) (has bool) {
se.RLock()
has = se.me.HasField(fldName)
has = se.Me.HasField(fldName)
se.RUnlock()
return
}
func (se *SafEvent) Get(fldName string) (out interface{}, has bool) {
se.RLock()
out, has = se.me[fldName]
out, has = se.Me[fldName]
se.RUnlock()
return
}
@@ -70,7 +77,7 @@ func (se *SafEvent) GetIgnoreErrors(fldName string) (out interface{}) {
// Set will set a field's value
func (se *SafEvent) Set(fldName string, val interface{}) {
se.Lock()
se.me[fldName] = val
se.Me[fldName] = val
se.Unlock()
return
}
@@ -78,14 +85,14 @@ func (se *SafEvent) Set(fldName string, val interface{}) {
// Remove will remove a field from map
func (se *SafEvent) Remove(fldName string) {
se.Lock()
delete(se.me, fldName)
delete(se.Me, fldName)
se.Unlock()
return
}
func (se *SafEvent) GetString(fldName string) (out string, err error) {
se.RLock()
out, err = se.me.GetString(fldName)
out, err = se.Me.GetString(fldName)
se.RUnlock()
return
}
@@ -98,7 +105,7 @@ func (se *SafEvent) GetStringIgnoreErrors(fldName string) (out string) {
// GetDuration returns a field as Duration
func (se *SafEvent) GetDuration(fldName string) (d time.Duration, err error) {
se.RLock()
d, err = se.me.GetDuration(fldName)
d, err = se.Me.GetDuration(fldName)
se.RUnlock()
return
}
@@ -138,7 +145,7 @@ func (se *SafEvent) GetDurationPtrOrDefault(fldName string, dflt *time.Duration)
// GetTime returns a field as Time
func (se *SafEvent) GetTime(fldName string, tmz string) (t time.Time, err error) {
se.RLock()
t, err = se.me.GetTime(fldName, tmz)
t, err = se.Me.GetTime(fldName, tmz)
se.RUnlock()
return
}
@@ -154,9 +161,9 @@ func (se *SafEvent) GetTimeIgnoreErrors(fldName string, tmz string) (t time.Time
func (se *SafEvent) GetSetString(fldName string, setVal string) (out string, err error) {
se.Lock()
defer se.Unlock()
outIface, has := se.me[fldName]
outIface, has := se.Me[fldName]
if !has {
se.me[fldName] = setVal
se.Me[fldName] = setVal
out = setVal
return
}
@@ -167,7 +174,7 @@ func (se *SafEvent) GetSetString(fldName string, setVal string) (out string, err
// GetMapInterface returns the map stored internally without cloning it
func (se *SafEvent) GetMapInterface() (mp map[string]interface{}) {
se.RLock()
mp = se.me
mp = se.Me
se.RUnlock()
return
}
@@ -175,7 +182,7 @@ func (se *SafEvent) GetMapInterface() (mp map[string]interface{}) {
// AsMapInterface returns the cloned map stored internally
func (se *SafEvent) AsMapInterface() (mp map[string]interface{}) {
se.RLock()
mp = se.me.Clone()
mp = se.Me.Clone()
se.RUnlock()
return
}
@@ -184,7 +191,7 @@ func (se *SafEvent) AsMapInterface() (mp map[string]interface{}) {
// most used when needing to export extraFields
func (se *SafEvent) AsMapString(ignoredFlds utils.StringMap) (mp map[string]string, err error) {
se.RLock()
mp, err = se.me.AsMapString(ignoredFlds)
mp, err = se.Me.AsMapString(ignoredFlds)
se.RUnlock()
return
}
@@ -193,7 +200,7 @@ func (se *SafEvent) AsMapString(ignoredFlds utils.StringMap) (mp map[string]stri
// most used when needing to export extraFields
func (se *SafEvent) AsMapStringIgnoreErrors(ignoredFlds utils.StringMap) (mp map[string]string) {
se.RLock()
mp = se.me.AsMapStringIgnoreErrors(ignoredFlds)
mp = se.Me.AsMapStringIgnoreErrors(ignoredFlds)
se.RUnlock()
return
}
@@ -201,7 +208,7 @@ func (se *SafEvent) AsMapStringIgnoreErrors(ignoredFlds utils.StringMap) (mp map
// AsCDR exports the SafEvent as CDR
func (se *SafEvent) AsCDR(cfg *config.CGRConfig, tmz string) (cdr *CDR, err error) {
se.RLock()
cdr, err = se.me.AsCDR(cfg, tmz)
cdr, err = se.Me.AsCDR(cfg, tmz)
se.RUnlock()
return
}

View File

@@ -33,6 +33,7 @@ func getSessionTTL(ev *engine.SafEvent, cfgSessionTTL time.Duration,
if err != utils.ErrNotFound {
return
}
err = nil
ttl = cfgSessionTTL
}
if ttl == 0 {

View File

@@ -42,11 +42,12 @@ type SMGSession struct {
CGRID string // Unique identifier for this session
RunID string // Keep a reference for the derived run
Timezone string
EventStart *engine.SafEvent // Event which started the session
CD *engine.CallDescriptor // initial CD used for debits, updated on each debit
ResourceID string
EventCost *engine.EventCost
EventStart *engine.SafEvent // Event which started the session
CD *engine.CallDescriptor // initial CD used for debits, updated on each debit
EventCost *engine.EventCost
ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked
LastUsage time.Duration // last requested Duration
LastDebit time.Duration // last real debited duration
@@ -54,6 +55,18 @@ type SMGSession struct {
}
// Clone returns the cloned version of SMGSession
func (s *SMGSession) Clone() *SMGSession {
return &SMGSession{CGRID: s.CGRID, RunID: s.RunID,
Timezone: s.Timezone, ResourceID: s.ResourceID,
EventStart: s.EventStart.Clone(),
CD: s.CD.Clone(),
EventCost: s.EventCost.Clone(),
ExtraDuration: s.ExtraDuration, LastUsage: s.LastUsage,
LastDebit: s.LastDebit, TotalUsage: s.TotalUsage,
}
}
type SessionID struct {
OriginHost string
OriginID string

View File

@@ -604,14 +604,13 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool
ss[0].mux.RLock() // lock session so we can clone it after releasing the map lock
}
ssMux.RUnlock()
var ssCln []*SMGSession
err = utils.Clone(ss, &ssCln)
ssCln := make([]*SMGSession, len(ss))
for i, s := range ss {
ssCln[i] = s.Clone()
}
if len(ss) != 0 {
ss[0].mux.RUnlock()
}
if err != nil {
return
}
var wg sync.WaitGroup
for _, rplConn := range smgReplConns {
if rplConn.Synchronous {
@@ -892,7 +891,6 @@ func (smg *SMGeneric) UpdateSession(gev *engine.SafEvent,
return
}
}
defer smg.replicateSessionsWithID(cgrID, false, smg.smgReplConns)
for _, s := range aSessions[cgrID] {
if s.RunID == utils.META_NONE {

View File

@@ -117,10 +117,13 @@ func TestSMGRplcInitiate(t *testing.T) {
t.Error(err)
}
var reply string
if err := smgRplcMstrRPC.Call("SMGenericV1.TerminateSession", smgEv, &reply); err == nil && err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound
if err := smgRplcMstrRPC.Call("SMGenericV1.TerminateSession",
smgEv, &reply); err == nil &&
err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound
t.Error(err)
}
if err := smgRplcMstrRPC.Call(utils.SMGenericV2InitiateSession, smgEv, &maxUsage); err != nil {
if err := smgRplcMstrRPC.Call(utils.SMGenericV2InitiateSession,
smgEv, &maxUsage); err != nil {
t.Error(err)
}
if maxUsage != time.Duration(90*time.Second) {
@@ -128,14 +131,16 @@ func TestSMGRplcInitiate(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
var aSessions []*ActiveSession
if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil {
if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions",
map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
} else if aSessions[0].Usage != time.Duration(90)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", map[string]string{utils.OriginID: "123451"}, &pSessions); err != nil {
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions",
map[string]string{utils.OriginID: "123451"}, &pSessions); err != nil {
t.Error(err)
} else if len(pSessions) != 1 {
t.Errorf("PassiveSessions: %+v", pSessions)