From 9f0dd9842c1537c0e8a3d2e2a3da0c25a0f0b985 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 20 Aug 2018 12:19:40 +0200 Subject: [PATCH] Exporting MapEvent out of SafEvent so we can pass SafEvent around through RPC --- engine/mapevent.go | 4 ++-- engine/safevent.go | 43 +++++++++++++++++++++--------------- sessions/libsessions.go | 1 + sessions/session.go | 19 +++++++++++++--- sessions/sessions.go | 10 ++++----- sessions/smgreplc_it_test.go | 13 +++++++---- 6 files changed, 57 insertions(+), 33 deletions(-) diff --git a/engine/mapevent.go b/engine/mapevent.go index cc133c4d4..f48405078 100644 --- a/engine/mapevent.go +++ b/engine/mapevent.go @@ -90,8 +90,8 @@ func (me MapEvent) GetTimeIgnoreErrors(fldName string, tmz string) (t time.Time) } // Clone returns the cloned map -func (me MapEvent) Clone() (mp map[string]interface{}) { - mp = make(map[string]interface{}, len(me)) +func (me MapEvent) Clone() (mp MapEvent) { + mp = make(MapEvent, len(me)) for k, v := range me { mp[k] = v } diff --git a/engine/safevent.go b/engine/safevent.go index e1ee971ea..7cf379119 100644 --- a/engine/safevent.go +++ b/engine/safevent.go @@ -27,37 +27,44 @@ import ( ) func NewSafEvent(mp map[string]interface{}) *SafEvent { - return &SafEvent{me: NewMapEvent(mp)} + return &SafEvent{Me: NewMapEvent(mp)} } // SafEvent is a generic event which is safe to read/write from multiple goroutines type SafEvent struct { sync.RWMutex - me MapEvent + Me MapEvent // need it exportable so we can pass it on network +} + +func (se *SafEvent) Clone() (cln *SafEvent) { + se.RLock() + cln = &SafEvent{Me: se.Me.Clone()} + se.RUnlock() + return } // MapEvent offers access to MapEvent methods, avoiding locks func (se *SafEvent) MapEvent() (mp MapEvent) { - return se.me + return se.Me } func (se *SafEvent) String() (out string) { se.RLock() - out = se.me.String() + out = se.Me.String() se.RUnlock() return } func (se *SafEvent) HasField(fldName string) (has bool) { se.RLock() - has = se.me.HasField(fldName) + has = se.Me.HasField(fldName) se.RUnlock() return } func (se *SafEvent) Get(fldName string) (out interface{}, has bool) { se.RLock() - out, has = se.me[fldName] + out, has = se.Me[fldName] se.RUnlock() return } @@ -70,7 +77,7 @@ func (se *SafEvent) GetIgnoreErrors(fldName string) (out interface{}) { // Set will set a field's value func (se *SafEvent) Set(fldName string, val interface{}) { se.Lock() - se.me[fldName] = val + se.Me[fldName] = val se.Unlock() return } @@ -78,14 +85,14 @@ func (se *SafEvent) Set(fldName string, val interface{}) { // Remove will remove a field from map func (se *SafEvent) Remove(fldName string) { se.Lock() - delete(se.me, fldName) + delete(se.Me, fldName) se.Unlock() return } func (se *SafEvent) GetString(fldName string) (out string, err error) { se.RLock() - out, err = se.me.GetString(fldName) + out, err = se.Me.GetString(fldName) se.RUnlock() return } @@ -98,7 +105,7 @@ func (se *SafEvent) GetStringIgnoreErrors(fldName string) (out string) { // GetDuration returns a field as Duration func (se *SafEvent) GetDuration(fldName string) (d time.Duration, err error) { se.RLock() - d, err = se.me.GetDuration(fldName) + d, err = se.Me.GetDuration(fldName) se.RUnlock() return } @@ -138,7 +145,7 @@ func (se *SafEvent) GetDurationPtrOrDefault(fldName string, dflt *time.Duration) // GetTime returns a field as Time func (se *SafEvent) GetTime(fldName string, tmz string) (t time.Time, err error) { se.RLock() - t, err = se.me.GetTime(fldName, tmz) + t, err = se.Me.GetTime(fldName, tmz) se.RUnlock() return } @@ -154,9 +161,9 @@ func (se *SafEvent) GetTimeIgnoreErrors(fldName string, tmz string) (t time.Time func (se *SafEvent) GetSetString(fldName string, setVal string) (out string, err error) { se.Lock() defer se.Unlock() - outIface, has := se.me[fldName] + outIface, has := se.Me[fldName] if !has { - se.me[fldName] = setVal + se.Me[fldName] = setVal out = setVal return } @@ -167,7 +174,7 @@ func (se *SafEvent) GetSetString(fldName string, setVal string) (out string, err // GetMapInterface returns the map stored internally without cloning it func (se *SafEvent) GetMapInterface() (mp map[string]interface{}) { se.RLock() - mp = se.me + mp = se.Me se.RUnlock() return } @@ -175,7 +182,7 @@ func (se *SafEvent) GetMapInterface() (mp map[string]interface{}) { // AsMapInterface returns the cloned map stored internally func (se *SafEvent) AsMapInterface() (mp map[string]interface{}) { se.RLock() - mp = se.me.Clone() + mp = se.Me.Clone() se.RUnlock() return } @@ -184,7 +191,7 @@ func (se *SafEvent) AsMapInterface() (mp map[string]interface{}) { // most used when needing to export extraFields func (se *SafEvent) AsMapString(ignoredFlds utils.StringMap) (mp map[string]string, err error) { se.RLock() - mp, err = se.me.AsMapString(ignoredFlds) + mp, err = se.Me.AsMapString(ignoredFlds) se.RUnlock() return } @@ -193,7 +200,7 @@ func (se *SafEvent) AsMapString(ignoredFlds utils.StringMap) (mp map[string]stri // most used when needing to export extraFields func (se *SafEvent) AsMapStringIgnoreErrors(ignoredFlds utils.StringMap) (mp map[string]string) { se.RLock() - mp = se.me.AsMapStringIgnoreErrors(ignoredFlds) + mp = se.Me.AsMapStringIgnoreErrors(ignoredFlds) se.RUnlock() return } @@ -201,7 +208,7 @@ func (se *SafEvent) AsMapStringIgnoreErrors(ignoredFlds utils.StringMap) (mp map // AsCDR exports the SafEvent as CDR func (se *SafEvent) AsCDR(cfg *config.CGRConfig, tmz string) (cdr *CDR, err error) { se.RLock() - cdr, err = se.me.AsCDR(cfg, tmz) + cdr, err = se.Me.AsCDR(cfg, tmz) se.RUnlock() return } diff --git a/sessions/libsessions.go b/sessions/libsessions.go index d499024ae..9d65597b9 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -33,6 +33,7 @@ func getSessionTTL(ev *engine.SafEvent, cfgSessionTTL time.Duration, if err != utils.ErrNotFound { return } + err = nil ttl = cfgSessionTTL } if ttl == 0 { diff --git a/sessions/session.go b/sessions/session.go index 11f621631..229b5fce4 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -42,11 +42,12 @@ type SMGSession struct { CGRID string // Unique identifier for this session RunID string // Keep a reference for the derived run Timezone string - EventStart *engine.SafEvent // Event which started the session - CD *engine.CallDescriptor // initial CD used for debits, updated on each debit ResourceID string - EventCost *engine.EventCost + EventStart *engine.SafEvent // Event which started the session + CD *engine.CallDescriptor // initial CD used for debits, updated on each debit + EventCost *engine.EventCost + ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked LastUsage time.Duration // last requested Duration LastDebit time.Duration // last real debited duration @@ -54,6 +55,18 @@ type SMGSession struct { } +// Clone returns the cloned version of SMGSession +func (s *SMGSession) Clone() *SMGSession { + return &SMGSession{CGRID: s.CGRID, RunID: s.RunID, + Timezone: s.Timezone, ResourceID: s.ResourceID, + EventStart: s.EventStart.Clone(), + CD: s.CD.Clone(), + EventCost: s.EventCost.Clone(), + ExtraDuration: s.ExtraDuration, LastUsage: s.LastUsage, + LastDebit: s.LastDebit, TotalUsage: s.TotalUsage, + } +} + type SessionID struct { OriginHost string OriginID string diff --git a/sessions/sessions.go b/sessions/sessions.go index 59bf77251..a1572fd54 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -604,14 +604,13 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool ss[0].mux.RLock() // lock session so we can clone it after releasing the map lock } ssMux.RUnlock() - var ssCln []*SMGSession - err = utils.Clone(ss, &ssCln) + ssCln := make([]*SMGSession, len(ss)) + for i, s := range ss { + ssCln[i] = s.Clone() + } if len(ss) != 0 { ss[0].mux.RUnlock() } - if err != nil { - return - } var wg sync.WaitGroup for _, rplConn := range smgReplConns { if rplConn.Synchronous { @@ -892,7 +891,6 @@ func (smg *SMGeneric) UpdateSession(gev *engine.SafEvent, return } } - defer smg.replicateSessionsWithID(cgrID, false, smg.smgReplConns) for _, s := range aSessions[cgrID] { if s.RunID == utils.META_NONE { diff --git a/sessions/smgreplc_it_test.go b/sessions/smgreplc_it_test.go index 5a865d803..5613bbfa1 100644 --- a/sessions/smgreplc_it_test.go +++ b/sessions/smgreplc_it_test.go @@ -117,10 +117,13 @@ func TestSMGRplcInitiate(t *testing.T) { t.Error(err) } var reply string - if err := smgRplcMstrRPC.Call("SMGenericV1.TerminateSession", smgEv, &reply); err == nil && err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound + if err := smgRplcMstrRPC.Call("SMGenericV1.TerminateSession", + smgEv, &reply); err == nil && + err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound t.Error(err) } - if err := smgRplcMstrRPC.Call(utils.SMGenericV2InitiateSession, smgEv, &maxUsage); err != nil { + if err := smgRplcMstrRPC.Call(utils.SMGenericV2InitiateSession, + smgEv, &maxUsage); err != nil { t.Error(err) } if maxUsage != time.Duration(90*time.Second) { @@ -128,14 +131,16 @@ func TestSMGRplcInitiate(t *testing.T) { } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated var aSessions []*ActiveSession - if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil { + if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", + map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) } else if aSessions[0].Usage != time.Duration(90)*time.Second { t.Errorf("Received usage: %v", aSessions[0].Usage) } - if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", map[string]string{utils.OriginID: "123451"}, &pSessions); err != nil { + if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", + map[string]string{utils.OriginID: "123451"}, &pSessions); err != nil { t.Error(err) } else if len(pSessions) != 1 { t.Errorf("PassiveSessions: %+v", pSessions)