From 551f9ea69d745b4bd917554add635b0463680dbd Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 27 Oct 2016 11:45:50 +0200 Subject: [PATCH 1/4] SMGSession exporting more fields so we can replicate, ActiveSession.RunId -> ActiveSession.RunID --- sessionmanager/session.go | 4 +- sessionmanager/smg_session.go | 180 +++++++++++++++---------------- sessionmanager/smgeneric.go | 36 +++---- sessionmanager/smgeneric_test.go | 8 +- 4 files changed, 112 insertions(+), 116 deletions(-) diff --git a/sessionmanager/session.go b/sessionmanager/session.go index df0ecd1b2..04ff8abf1 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -300,7 +300,7 @@ func (s *Session) AsActiveSessions() []*ActiveSession { SMId: "UNKNOWN", } if sessionRun.DerivedCharger != nil { - aSession.RunId = sessionRun.DerivedCharger.RunID + aSession.RunID = sessionRun.DerivedCharger.RunID } if sessionRun.CallDescriptor != nil { aSession.LoopIndex = sessionRun.CallDescriptor.LoopIndex @@ -345,7 +345,7 @@ type ActiveSession struct { ExtraFields map[string]string // Extra fields to be stored in CDR SMId string SMConnId string - RunId string + RunID string LoopIndex float64 // indicates the position of this segment in a cost request loop DurationIndex time.Duration // the call duration so far (till TimeEnd) MaxRate float64 diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index d136d1fbb..e7a101a4a 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -30,20 +30,20 @@ import ( // One session handled by SM type SMGSession struct { - eventStart SMGenericEvent // Event which started + EventStart SMGenericEvent // Event which started stopDebit chan struct{} // Channel to communicate with debit loops when closing the session - runId string // Keep a reference for the derived run - timezone string + RunID string // Keep a reference for the derived run + Timezone string + CD *engine.CallDescriptor + SessionCDs []*engine.CallDescriptor + CallCosts []*engine.CallCost + ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked + LastUsage time.Duration // last requested Duration + LastDebit time.Duration // last real debited duration + TotalUsage time.Duration // sum of lastUsage + clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect. rater rpcclient.RpcClientConnection // Connector to Rater service cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service - cd *engine.CallDescriptor - sessionCds []*engine.CallDescriptor - callCosts []*engine.CallCost - extraDuration time.Duration // keeps the current duration debited on top of what heas been asked - lastUsage time.Duration // last requested Duration - lastDebit time.Duration // last real debited duration - totalUsage time.Duration // sum of lastUsage - clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect. } // Called in case of automatic debits @@ -56,19 +56,19 @@ func (self *SMGSession) debitLoop(debitInterval time.Duration) { return case <-time.After(sleepDur): if maxDebit, err := self.debit(debitInterval, nil); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not complete debit operation on session: %s, error: %s", self.eventStart.GetUUID(), err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not complete debit operation on session: %s, error: %s", self.EventStart.GetUUID(), err.Error())) disconnectReason := SYSTEM_ERROR if err.Error() == utils.ErrUnauthorizedDestination.Error() { disconnectReason = err.Error() } if err := self.disconnectSession(disconnectReason); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not disconnect session: %s, error: %s", self.eventStart.GetUUID(), err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not disconnect session: %s, error: %s", self.EventStart.GetUUID(), err.Error())) } return } else if maxDebit < debitInterval { time.Sleep(maxDebit) if err := self.disconnectSession(INSUFFICIENT_FUNDS); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not disconnect session: %s, error: %s", self.eventStart.GetUUID(), err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not disconnect session: %s, error: %s", self.EventStart.GetUUID(), err.Error())) } return } @@ -83,56 +83,56 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time. //utils.Logger.Debug(fmt.Sprintf("### SMGSession.debit, dur: %+v, lastUsed: %+v, session: %+v", dur, lastUsed, self)) requestedDuration := dur if lastUsed != nil { - self.extraDuration = self.lastDebit - *lastUsed - if *lastUsed != self.lastUsage { + self.ExtraDuration = self.LastDebit - *lastUsed + if *lastUsed != self.LastUsage { // total usage correction - self.totalUsage -= self.lastUsage - self.totalUsage += *lastUsed + self.TotalUsage -= self.LastUsage + self.TotalUsage += *lastUsed } } // apply correction from previous run - if self.extraDuration < dur { - dur -= self.extraDuration + if self.ExtraDuration < dur { + dur -= self.ExtraDuration } else { - self.lastUsage = requestedDuration - self.totalUsage += self.lastUsage - ccDuration := self.extraDuration // fake ccDuration - self.extraDuration -= dur + self.LastUsage = requestedDuration + self.TotalUsage += self.LastUsage + ccDuration := self.ExtraDuration // fake ccDuration + self.ExtraDuration -= dur return ccDuration, nil } - initialExtraDuration := self.extraDuration - self.extraDuration = 0 - if self.cd.LoopIndex > 0 { - self.cd.TimeStart = self.cd.TimeEnd + initialExtraDuration := self.ExtraDuration + self.ExtraDuration = 0 + if self.CD.LoopIndex > 0 { + self.CD.TimeStart = self.CD.TimeEnd } - self.cd.TimeEnd = self.cd.TimeStart.Add(dur) - self.cd.DurationIndex += dur + self.CD.TimeEnd = self.CD.TimeStart.Add(dur) + self.CD.DurationIndex += dur cc := &engine.CallCost{} - if err := self.rater.Call("Responder.MaxDebit", self.cd, cc); err != nil { - self.lastUsage = 0 - self.lastDebit = 0 + if err := self.rater.Call("Responder.MaxDebit", self.CD, cc); err != nil { + self.LastUsage = 0 + self.LastDebit = 0 return 0, err } // cd corrections - self.cd.TimeEnd = cc.GetEndTime() // set debited timeEnd + self.CD.TimeEnd = cc.GetEndTime() // set debited timeEnd // update call duration with real debited duration ccDuration := cc.GetDuration() if ccDuration != dur { - self.extraDuration = ccDuration - dur + self.ExtraDuration = ccDuration - dur } if ccDuration >= dur { - self.lastUsage = requestedDuration + self.LastUsage = requestedDuration } else { - self.lastUsage = ccDuration + self.LastUsage = ccDuration } - self.cd.DurationIndex -= dur - self.cd.DurationIndex += ccDuration - self.cd.MaxCostSoFar += cc.Cost - self.cd.LoopIndex += 1 - self.sessionCds = append(self.sessionCds, self.cd.Clone()) - self.callCosts = append(self.callCosts, cc) - self.lastDebit = initialExtraDuration + ccDuration - self.totalUsage += self.lastUsage + self.CD.DurationIndex -= dur + self.CD.DurationIndex += ccDuration + self.CD.MaxCostSoFar += cc.Cost + self.CD.LoopIndex += 1 + self.SessionCDs = append(self.SessionCDs, self.CD.Clone()) + self.CallCosts = append(self.CallCosts, cc) + self.LastDebit = initialExtraDuration + ccDuration + self.TotalUsage += self.LastUsage if ccDuration >= dur { // we got what we asked to be debited return requestedDuration, nil } @@ -144,7 +144,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { if refundDuration == 0 { // Nothing to refund return nil } - firstCC := self.callCosts[0] // use merged cc (from close function) + firstCC := self.CallCosts[0] // use merged cc (from close function) firstCC.Timespans.Decompress() defer firstCC.Timespans.Compress() var refundIncrements engine.Increments @@ -185,8 +185,8 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { if len(refundIncrements) > 0 { cd := firstCC.CreateCallDescriptor() cd.Increments = refundIncrements - cd.CgrID = self.cd.CgrID - cd.RunID = self.cd.RunID + cd.CgrID = self.CD.CgrID + cd.RunID = self.CD.RunID cd.Increments.Compress() var response float64 err := self.rater.Call("Responder.RefundIncrements", cd, &response) @@ -202,9 +202,9 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { // Session has ended, check debits and refund the extra charged duration func (self *SMGSession) close(endTime time.Time) error { - if len(self.callCosts) != 0 { // We have had at least one cost calculation - firstCC := self.callCosts[0] - for _, cc := range self.callCosts[1:] { + if len(self.CallCosts) != 0 { // We have had at least one cost calculation + firstCC := self.CallCosts[0] + for _, cc := range self.CallCosts[1:] { firstCC.Merge(cc) } end := firstCC.GetEndTime() @@ -220,7 +220,7 @@ func (self *SMGSession) disconnectSession(reason string) error { return errors.New("Calling SMGClientV1.DisconnectSession requires bidirectional JSON connection") } var reply string - if err := self.clntConn.Call("SMGClientV1.DisconnectSession", utils.AttrDisconnectSession{EventStart: self.eventStart, Reason: reason}, &reply); err != nil { + if err := self.clntConn.Call("SMGClientV1.DisconnectSession", utils.AttrDisconnectSession{EventStart: self.EventStart, Reason: reason}, &reply); err != nil { return err } else if reply != utils.OK { return errors.New(fmt.Sprintf("Unexpected disconnect reply: %s", reply)) @@ -232,16 +232,16 @@ func (self *SMGSession) disconnectSession(reason string) error { // originID could have been changed from original event, hence passing as argument here // pass cc as the clone of original to avoid concurrency issues func (self *SMGSession) saveOperations(originID string) error { - if len(self.callCosts) == 0 { + if len(self.CallCosts) == 0 { return nil // There are no costs to save, ignore the operation } - cc := self.callCosts[0] // was merged in close method + cc := self.CallCosts[0] // was merged in close method cc.Round() roundIncrements := cc.GetRoundIncrements() if len(roundIncrements) != 0 { cd := cc.CreateCallDescriptor() - cd.CgrID = self.cd.CgrID - cd.RunID = self.cd.RunID + cd.CgrID = self.CD.CgrID + cd.RunID = self.CD.RunID cd.Increments = roundIncrements var response float64 if err := self.rater.Call("Responder.RefundRounding", cd, &response); err != nil { @@ -249,21 +249,21 @@ func (self *SMGSession) saveOperations(originID string) error { } } smCost := &engine.SMCost{ - CGRID: self.eventStart.GetCgrId(self.timezone), + CGRID: self.EventStart.GetCgrId(self.Timezone), CostSource: utils.SESSION_MANAGER_SOURCE, - RunID: self.runId, - OriginHost: self.eventStart.GetOriginatorIP(utils.META_DEFAULT), + RunID: self.RunID, + OriginHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT), OriginID: originID, - Usage: self.TotalUsage().Seconds(), + Usage: self.TotalUsage.Seconds(), CostDetails: cc, } if len(smCost.CostDetails.Timespans) > MaxTimespansInCost { // Merge since we will get a callCost too big if err := utils.Clone(cc, &smCost.CostDetails); err != nil { // Avoid concurrency on CC - utils.Logger.Err(fmt.Sprintf(" Could not clone callcost for sessionID: %s, runId: %s, error: %s", originID, self.runId, err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not clone callcost for sessionID: %s, RunID: %s, error: %s", originID, self.RunID, err.Error())) } go func(smCost *engine.SMCost) { // could take longer than the locked stage if err := self.storeSMCost(smCost); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not store callcost for sessionID: %s, runId: %s, error: %s", originID, self.runId, err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not store callcost for sessionID: %s, RunID: %s, error: %s", originID, self.RunID, err.Error())) } }(smCost) } else { @@ -281,7 +281,7 @@ func (self *SMGSession) storeSMCost(smCost *engine.SMCost) error { var reply string if err := self.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil { if err == utils.ErrExists { - self.refund(self.cd.GetDuration()) // Refund entire duration + self.refund(self.CD.GetDuration()) // Refund entire duration } else { return err } @@ -289,42 +289,38 @@ func (self *SMGSession) storeSMCost(smCost *engine.SMCost) error { return nil } -func (self *SMGSession) TotalUsage() time.Duration { - return self.totalUsage -} - func (self *SMGSession) AsActiveSession(timezone string) *ActiveSession { - sTime, _ := self.eventStart.GetSetupTime(utils.META_DEFAULT, timezone) - aTime, _ := self.eventStart.GetAnswerTime(utils.META_DEFAULT, timezone) - pdd, _ := self.eventStart.GetPdd(utils.META_DEFAULT) + sTime, _ := self.EventStart.GetSetupTime(utils.META_DEFAULT, timezone) + aTime, _ := self.EventStart.GetAnswerTime(utils.META_DEFAULT, timezone) + pdd, _ := self.EventStart.GetPdd(utils.META_DEFAULT) aSession := &ActiveSession{ - CgrId: self.eventStart.GetCgrId(timezone), - TOR: self.eventStart.GetTOR(utils.META_DEFAULT), - RunId: self.runId, - OriginID: self.eventStart.GetUUID(), - CdrHost: self.eventStart.GetOriginatorIP(utils.META_DEFAULT), - CdrSource: self.eventStart.GetCdrSource(), - ReqType: self.eventStart.GetReqType(utils.META_DEFAULT), - Direction: self.eventStart.GetDirection(utils.META_DEFAULT), - Tenant: self.eventStart.GetTenant(utils.META_DEFAULT), - Category: self.eventStart.GetCategory(utils.META_DEFAULT), - Account: self.eventStart.GetAccount(utils.META_DEFAULT), - Subject: self.eventStart.GetSubject(utils.META_DEFAULT), - Destination: self.eventStart.GetDestination(utils.META_DEFAULT), + CgrId: self.EventStart.GetCgrId(timezone), + TOR: self.EventStart.GetTOR(utils.META_DEFAULT), + RunID: self.RunID, + OriginID: self.EventStart.GetUUID(), + CdrHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT), + CdrSource: self.EventStart.GetCdrSource(), + ReqType: self.EventStart.GetReqType(utils.META_DEFAULT), + Direction: self.EventStart.GetDirection(utils.META_DEFAULT), + Tenant: self.EventStart.GetTenant(utils.META_DEFAULT), + Category: self.EventStart.GetCategory(utils.META_DEFAULT), + Account: self.EventStart.GetAccount(utils.META_DEFAULT), + Subject: self.EventStart.GetSubject(utils.META_DEFAULT), + Destination: self.EventStart.GetDestination(utils.META_DEFAULT), SetupTime: sTime, AnswerTime: aTime, - Usage: self.TotalUsage(), + Usage: self.TotalUsage, Pdd: pdd, - ExtraFields: self.eventStart.GetExtraFields(), - Supplier: self.eventStart.GetSupplier(utils.META_DEFAULT), + ExtraFields: self.EventStart.GetExtraFields(), + Supplier: self.EventStart.GetSupplier(utils.META_DEFAULT), SMId: "CGR-DA", } - if self.cd != nil { - aSession.LoopIndex = self.cd.LoopIndex - aSession.DurationIndex = self.cd.DurationIndex - aSession.MaxRate = self.cd.MaxRate - aSession.MaxRateUnit = self.cd.MaxRateUnit - aSession.MaxCostSoFar = self.cd.MaxCostSoFar + if self.CD != nil { + aSession.LoopIndex = self.CD.LoopIndex + aSession.DurationIndex = self.CD.DurationIndex + aSession.MaxRate = self.CD.MaxRate + aSession.MaxRateUnit = self.CD.MaxRateUnit + aSession.MaxCostSoFar = self.CD.MaxCostSoFar } return aSession } diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index b8a7ca979..0f942aaa7 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -94,12 +94,12 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { if tmtr.ttlUsage != nil { debitUsage = *tmtr.ttlUsage } - for _, s := range smg.getASession(s.eventStart.GetUUID()) { + for _, s := range smg.getASession(s.EventStart.GetUUID()) { s.debit(debitUsage, tmtr.ttlLastUsed) } - smg.sessionEnd(s.eventStart.GetUUID(), s.TotalUsage()) - cdr := s.eventStart.AsStoredCdr(smg.cgrCfg, smg.timezone) - cdr.Usage = s.TotalUsage() + smg.sessionEnd(s.EventStart.GetUUID(), s.TotalUsage) + cdr := s.EventStart.AsStoredCdr(smg.cgrCfg, smg.timezone) + cdr.Usage = s.TotalUsage var reply string smg.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply) } @@ -110,7 +110,7 @@ func (smg *SMGeneric) recordASession(uuid string, s *SMGSession) { if smg.cgrCfg.SmGenericConfig.SessionTTL != 0 { if _, found := smg.sessionTerminators[uuid]; !found { ttl := smg.cgrCfg.SmGenericConfig.SessionTTL - if ttlEv := s.eventStart.GetSessionTTL(); ttlEv != 0 { + if ttlEv := s.EventStart.GetSessionTTL(); ttlEv != 0 { ttl = ttlEv } timer := time.NewTimer(ttl) @@ -119,8 +119,8 @@ func (smg *SMGeneric) recordASession(uuid string, s *SMGSession) { timer: timer, endChan: endChan, ttl: ttl, - ttlLastUsed: s.eventStart.GetSessionTTLLastUsed(), - ttlUsage: s.eventStart.GetSessionTTLUsage(), + ttlLastUsed: s.EventStart.GetSessionTTLLastUsed(), + ttlUsage: s.EventStart.GetSessionTTLUsage(), } smg.sessionTerminators[uuid] = terminator go func() { @@ -158,7 +158,7 @@ func (smg *SMGeneric) unrecordASession(uuid string) bool { func (smg *SMGeneric) indexASession(uuid string, s *SMGSession) bool { smg.aSIMux.Lock() defer smg.aSIMux.Unlock() - ev := s.eventStart + ev := s.EventStart for _, fieldName := range smg.cgrCfg.SmGenericConfig.SessionIndexes { fieldVal, err := utils.ReflectFieldAsString(ev, fieldName, "") if err != nil { @@ -269,8 +269,8 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp } stopDebitChan := make(chan struct{}) for _, sessionRun := range sessionRuns { - s := &SMGSession{eventStart: evStart, runId: sessionRun.DerivedCharger.RunID, timezone: smg.timezone, - rater: smg.rater, cdrsrv: smg.cdrsrv, cd: sessionRun.CallDescriptor, clntConn: clntConn} + s := &SMGSession{EventStart: evStart, RunID: sessionRun.DerivedCharger.RunID, Timezone: smg.timezone, + rater: smg.rater, cdrsrv: smg.cdrsrv, CD: sessionRun.CallDescriptor, clntConn: clntConn} smg.recordASession(sessionId, s) //utils.Logger.Info(fmt.Sprintf(" Starting session: %s, runId: %s", sessionId, s.runId)) if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 { @@ -298,21 +298,21 @@ func (smg *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { return nil, nil // Did not find the session so no need to close it anymore } for idx, s := range ss { - s.totalUsage = usage // save final usage as totalUsage + s.TotalUsage = usage // save final usage as totalUsage if idx == 0 && s.stopDebit != nil { close(s.stopDebit) // Stop automatic debits } - aTime, err := s.eventStart.GetAnswerTime(utils.META_DEFAULT, smg.cgrCfg.DefaultTimezone) + aTime, err := s.EventStart.GetAnswerTime(utils.META_DEFAULT, smg.cgrCfg.DefaultTimezone) if err != nil || aTime.IsZero() { utils.Logger.Err(fmt.Sprintf(" Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v", - sessionId, s.runId, aTime, err)) + sessionId, s.RunID, aTime, err)) continue // Unanswered session } if err := s.close(aTime.Add(usage)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not close session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not close session: %s, runId: %s, error: %s", sessionId, s.RunID, err.Error())) } if err := s.saveOperations(sessionId); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not save session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error())) + utils.Logger.Err(fmt.Sprintf(" Could not save session: %s, runId: %s, error: %s", sessionId, s.RunID, err.Error())) } } return nil, nil @@ -335,7 +335,7 @@ func (smg *SMGeneric) sessionRelocate(sessionID, initialID string) error { return nil, utils.ErrNotFound } for i, s := range ss { - s.eventStart[utils.ACCID] = sessionID // Overwrite initialSessionID with new one + s.EventStart[utils.ACCID] = sessionID // Overwrite initialSessionID with new one smg.recordASession(sessionID, s) if i == 0 { smg.unrecordASession(initialID) @@ -477,7 +477,7 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli } hasActiveSession = true if errUsage != nil { - usage = s.TotalUsage() - s.lastUsage + lastUsed + usage = s.TotalUsage - s.LastUsage + lastUsed } if err := smg.sessionEnd(sessionID, usage); err != nil { interimError = err // Last error will be the one returned as API result @@ -629,7 +629,7 @@ func (smg *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSess } if len(fltrs) != 0 { // Still have some filters to match for i := 0; i < len(remainingSessions); { - sMp, err := remainingSessions[i].eventStart.AsMapStringString() + sMp, err := remainingSessions[i].EventStart.AsMapStringString() if err != nil { return nil, 0, err } diff --git a/sessionmanager/smgeneric_test.go b/sessionmanager/smgeneric_test.go index e685e1f91..76f51bfc7 100644 --- a/sessionmanager/smgeneric_test.go +++ b/sessionmanager/smgeneric_test.go @@ -59,7 +59,7 @@ func TestSMGSessionIndexing(t *testing.T) { "Extra3": "", } // Index first session - smgSession := &SMGSession{eventStart: smGev} + smgSession := &SMGSession{EventStart: smGev} uuid := smGev.GetUUID() smg.indexASession(uuid, smgSession) eIndexes := map[string]map[string]utils.StringMap{ @@ -99,7 +99,7 @@ func TestSMGSessionIndexing(t *testing.T) { "Extra4": "info2", } uuid2 := smGev2.GetUUID() - smgSession2 := &SMGSession{eventStart: smGev2} + smgSession2 := &SMGSession{EventStart: smGev2} smg.indexASession(uuid2, smgSession2) eIndexes = map[string]map[string]utils.StringMap{ "Tenant": map[string]utils.StringMap{ @@ -190,7 +190,7 @@ func TestSMGActiveSessions(t *testing.T) { "Extra2": 5, "Extra3": "", } - smg.recordASession(smGev1.GetUUID(), &SMGSession{eventStart: smGev1}) + smg.recordASession(smGev1.GetUUID(), &SMGSession{EventStart: smGev1}) smGev2 := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -211,7 +211,7 @@ func TestSMGActiveSessions(t *testing.T) { "Extra1": "Value1", "Extra3": "extra3", } - smg.recordASession(smGev2.GetUUID(), &SMGSession{eventStart: smGev2}) + smg.recordASession(smGev2.GetUUID(), &SMGSession{EventStart: smGev2}) if aSessions, _, err := smg.ActiveSessions(nil, false); err != nil { t.Error(err) } else if len(aSessions) != 2 { From 010ab252dfa787520ee3337d30d82f7a2321885b Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 27 Oct 2016 12:26:25 +0200 Subject: [PATCH 2/4] SMGenericV.SetPassiveSession --- sessionmanager/smgeneric.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 0f942aaa7..b90b63fee 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -659,6 +659,27 @@ func (smg *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSess return } +// SetPassiveSession will add or set a previously found session in passive session list +func (smg *SMGeneric) setPassiveSession(s *SMGSession) { + smg.pSessionsMux.Lock() + if ss, hasID := smg.passiveSessions[s.EventStart.GetUUID()]; !hasID { + smg.passiveSessions[s.EventStart.GetUUID()] = []*SMGSession{s} + } else { + var exists bool + for i, oldS := range ss { + if oldS.RunID == s.RunID { + smg.passiveSessions[s.EventStart.GetUUID()][i] = s + exists = true + break + } + } + if !exists { + smg.passiveSessions[s.EventStart.GetUUID()] = append(smg.passiveSessions[s.EventStart.GetUUID()], s) + } + } + smg.pSessionsMux.Unlock() +} + func (smg *SMGeneric) Timezone() string { return smg.timezone } @@ -797,3 +818,10 @@ func (smg *SMGeneric) BiRPCV1ActiveSessionsCount(attrs utils.AttrSMGGetActiveSes } return nil } + +// BiRPCV1SetPassiveSession used for replicating SMGSessions +func (smg *SMGeneric) BiRPCV1SetPassiveSession(s SMGSession, reply *string) error { + smg.setPassiveSession(&s) + *reply = utils.OK + return nil +} From cdf8960a42cb25959b4383379ad2343bc28b8949 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 27 Oct 2016 14:02:33 +0200 Subject: [PATCH 3/4] SMGeneric TestSetPassiveSession --- sessionmanager/smgeneric_test.go | 104 +++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/sessionmanager/smgeneric_test.go b/sessionmanager/smgeneric_test.go index 76f51bfc7..e2317ef1e 100644 --- a/sessionmanager/smgeneric_test.go +++ b/sessionmanager/smgeneric_test.go @@ -238,3 +238,107 @@ func TestSMGActiveSessions(t *testing.T) { t.Errorf("Received sessions: %+v", aSessions) } } + +func TestSetPassiveSession(t *testing.T) { + smg := NewSMGeneric(smgCfg, nil, nil, "UTC") + smGev := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: "*voice", + utils.ACCID: "12345", + utils.DIRECTION: "*out", + utils.ACCOUNT: "account1", + utils.SUBJECT: "subject1", + utils.DESTINATION: "+4986517174963", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: "*prepaid", + utils.SETUP_TIME: "2015-11-09 14:21:24", + utils.ANSWER_TIME: "2015-11-09 14:22:02", + utils.USAGE: "1m23s", + utils.LastUsed: "21s", + utils.PDD: "300ms", + utils.SUPPLIER: "supplier1", + utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT", + utils.CDRHOST: "127.0.0.1", + "Extra1": "Value1", + "Extra2": 5, + "Extra3": "", + } + // Index first session + smgSession := &SMGSession{EventStart: smGev, RunID: utils.META_DEFAULT} + if len(smg.passiveSessions) != 0 { + t.Errorf("PassiveSessions: %+v", smg.passiveSessions) + } + smg.setPassiveSession(smgSession) + if ss, hasIt := smg.passiveSessions[smGev.GetUUID()]; !hasIt || len(smg.passiveSessions) != 1 || len(ss) != 1 { + t.Errorf("PassiveSessions: %+v", smg.passiveSessions) + } + // Update session + smGev = SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: "*voice", + utils.ACCID: "12345", + utils.DIRECTION: "*out", + utils.ACCOUNT: "account1", + utils.SUBJECT: "subject1", + utils.DESTINATION: "+4986517174963", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: "*prepaid", + utils.SETUP_TIME: "2015-11-09 14:21:24", + utils.ANSWER_TIME: "2015-11-09 14:22:02", + utils.USAGE: "2m33s", + utils.LastUsed: "21s", + utils.PDD: "300ms", + utils.SUPPLIER: "supplier1", + utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT", + utils.CDRHOST: "127.0.0.1", + "Extra1": "Value1", + "Extra2": 5, + "Extra3": "", + } + smgSession = &SMGSession{EventStart: smGev, RunID: utils.META_DEFAULT} + smg.setPassiveSession(smgSession) // Should only update in place + if ss, hasIt := smg.passiveSessions[smGev.GetUUID()]; !hasIt || len(smg.passiveSessions) != 1 || len(ss) != 1 { + t.Errorf("PassiveSessions: %+v", smg.passiveSessions) + } else if ss[0].EventStart[utils.USAGE] != "2m33s" { + t.Errorf("SMGSession.EventStart: %+v", ss[0].EventStart[utils.USAGE]) + } + // Second run + smgSession = &SMGSession{EventStart: smGev, RunID: "second_test"} + smg.setPassiveSession(smgSession) + if ss, hasIt := smg.passiveSessions[smGev.GetUUID()]; !hasIt || len(smg.passiveSessions) != 1 || len(ss) != 2 { + t.Errorf("PassiveSessions: %+v", smg.passiveSessions) + } + // Update session + smGev = SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: "*voice", + utils.ACCID: "22345", + utils.DIRECTION: "*out", + utils.ACCOUNT: "account1", + utils.SUBJECT: "subject1", + utils.DESTINATION: "+4986517174963", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: "*prepaid", + utils.SETUP_TIME: "2015-11-09 14:21:24", + utils.ANSWER_TIME: "2015-11-09 14:22:02", + utils.USAGE: "2m33s", + utils.LastUsed: "21s", + utils.PDD: "300ms", + utils.SUPPLIER: "supplier1", + utils.DISCONNECT_CAUSE: "NORMAL_DISCONNECT", + utils.CDRHOST: "127.0.0.1", + "Extra1": "Value1", + "Extra2": 5, + "Extra3": "", + } + smgSession = &SMGSession{EventStart: smGev, RunID: utils.META_DEFAULT} + smg.setPassiveSession(smgSession) + if ss, hasIt := smg.passiveSessions[smGev.GetUUID()]; !hasIt || len(smg.passiveSessions) != 2 || len(ss) != 1 { + t.Errorf("PassiveSessions: %+v", smg.passiveSessions) + } else if ss[0].EventStart[utils.USAGE] != "2m33s" { + t.Errorf("SMGSession.EventStart: %+v", ss[0].EventStart[utils.USAGE]) + } +} From 1cb614dc3f7ba8ec633710570d8727c50fd1a8bf Mon Sep 17 00:00:00 2001 From: Eduard Tamsa Date: Thu, 27 Oct 2016 17:00:38 +0200 Subject: [PATCH 4/4] Update CONTRIBUTORS.md --- CONTRIBUTORS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 2b41cf331..0779934a2 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -51,6 +51,7 @@ information, please see the [`CONTRIBUTING.md`](CONTRIBUTING.md) file. | @brendangilmore | Brendan Gilmore | | @afone-lboue | Ludovic Boué | | @shaneneuerburg | Shane Neuerburg | +| @Edwardro22 | Eduard Tamşa |