From c3347faf6c6b7322ef1db9c47e20c3118cfa80e2 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 15 Sep 2016 20:45:43 +0200 Subject: [PATCH] Timespans.Merge to compress in SM --- .../asterisk/etc/asterisk/extensions.conf | 2 +- engine/timespans.go | 46 +++++++- engine/timespans_test.go | 111 +++++++++++++++++- sessionmanager/smg_session.go | 9 +- sessionmanager/smgeneric.go | 1 + 5 files changed, 156 insertions(+), 13 deletions(-) diff --git a/data/tutorials/asterisk_events/asterisk/etc/asterisk/extensions.conf b/data/tutorials/asterisk_events/asterisk/etc/asterisk/extensions.conf index 01de11c58..39fe72638 100755 --- a/data/tutorials/asterisk_events/asterisk/etc/asterisk/extensions.conf +++ b/data/tutorials/asterisk_events/asterisk/etc/asterisk/extensions.conf @@ -1,6 +1,6 @@ [internal] exten => _1XXX,1,NoOp() - same => n,Set(CGRMaxSessionTime=0); use it to disconnect automatically the call if bridge is not active + same => n,Set(CGRMaxSessionTime=0); use it to disconnect automatically the call if CGRateS is not active same => n,DumpChan() same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_destination=${EXTEN}) same => n,Dial(PJSIP/${EXTEN},30,L(${CGRMaxSessionTime})) diff --git a/engine/timespans.go b/engine/timespans.go index 7380f83cf..cbd059309 100644 --- a/engine/timespans.go +++ b/engine/timespans.go @@ -18,8 +18,6 @@ along with this program. If not, see package engine import ( - //"fmt" - "reflect" "time" @@ -302,6 +300,19 @@ func (incs Increments) Equal(other Increments) bool { return true } +// Estimate whether the increments are the same, ignoring the CompressFactor +func (incs Increments) SharingSignature(other Increments) bool { + if len(other) < len(incs) { // Protect index in case of not being the same size + return false + } + for index, i := range incs { + if !i.Equal(other[index]) { + return false + } + } + return true +} + func (incs *Increments) Compress() { // must be pointer receiver var cIncrs Increments for _, incr := range *incs { @@ -740,9 +751,40 @@ func (ts *TimeSpan) Equal(other *TimeSpan) bool { ts.RatingPlanId == other.RatingPlanId } +// Estimate if they share charging signature +func (ts *TimeSpan) SharingSignature(other *TimeSpan) bool { + return ts.Increments.SharingSignature(other.Increments) && + ts.RateInterval.Equal(other.RateInterval) && + ts.MatchedSubject == other.MatchedSubject && + ts.MatchedPrefix == other.MatchedPrefix && + ts.MatchedDestId == other.MatchedDestId && + ts.RatingPlanId == other.RatingPlanId +} + func (ts *TimeSpan) GetCompressFactor() int { if ts.CompressFactor == 0 { ts.CompressFactor = 1 } return ts.CompressFactor } + +// Merges timespans if they share the same charging signature, useful to run in SM before compressing +func (ts *TimeSpan) Merge(other *TimeSpan) bool { + if !ts.SharingSignature(other) { + return false + } else if !ts.TimeEnd.Equal(other.TimeStart) { // other needs to continue ts for merge to be possible + return false + } + var otherCloned TimeSpan // Clone so we don't affect with decompress the original structure + if err := utils.Clone(*other, &otherCloned); err != nil { + return false + } + otherCloned.Increments.Decompress() + ts.Increments.Decompress() + ts.TimeEnd = otherCloned.TimeEnd + ts.Cost += otherCloned.Cost + ts.DurationIndex = otherCloned.DurationIndex + ts.Increments = append(ts.Increments, otherCloned.Increments...) + ts.Increments.Compress() + return true +} diff --git a/engine/timespans_test.go b/engine/timespans_test.go index 591644433..3090ab948 100644 --- a/engine/timespans_test.go +++ b/engine/timespans_test.go @@ -1609,12 +1609,10 @@ func TestTSMultipleIncrementsCompressDecompress(t *testing.T) { }, } tss.Compress() - tss.Compress() if len(tss[0].Increments) != 3 { t.Error("Error compressing timespan: ", tss[0].Increments) } tss.Decompress() - tss.Decompress() if len(tss[0].Increments) != 5 { t.Error("Error decompressing timespans: ", tss[0].Increments) } @@ -1850,3 +1848,112 @@ func TestTSDifferentCompressDecompress(t *testing.T) { t.Error("Error decompressing timespans: ", tss) } } + +func TestTSMerge(t *testing.T) { + tss1 := &TimeSpan{ + TimeStart: time.Date(2015, 1, 9, 16, 18, 0, 0, time.UTC), + TimeEnd: time.Date(2015, 1, 9, 16, 19, 0, 0, time.UTC), + RateInterval: &RateInterval{ + Rating: &RIRate{ + RoundingMethod: utils.ROUNDING_MIDDLE, + RoundingDecimals: 2, + Rates: RateGroups{ + &Rate{ + Value: 2.0, + RateIncrement: 10 * time.Second, + }, + }, + }, + }, + Cost: 2, + DurationIndex: 1 * time.Minute, + Increments: Increments{ + &Increment{ + Duration: time.Minute, + Cost: 1, + BalanceInfo: &DebitInfo{ + Unit: &UnitInfo{UUID: "1", DestinationID: "1", Consumed: 2.3, TOR: utils.VOICE, RateInterval: &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{GroupIntervalStart: 0, Value: 100, RateIncrement: 10 * time.Second, RateUnit: time.Second}}}}}, + Monetary: &MonetaryInfo{UUID: "2"}, + AccountID: "3"}, + }, + &Increment{ + Duration: time.Minute, + Cost: 1, + BalanceInfo: &DebitInfo{ + Unit: &UnitInfo{UUID: "1", DestinationID: "1", Consumed: 2.3, TOR: utils.VOICE, RateInterval: &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{GroupIntervalStart: 0, Value: 100, RateIncrement: 10 * time.Second, RateUnit: time.Second}}}}}, + Monetary: &MonetaryInfo{UUID: "2"}, + AccountID: "3"}, + }, + }, + } + tss2 := &TimeSpan{ + TimeStart: time.Date(2015, 1, 9, 16, 19, 0, 0, time.UTC), + TimeEnd: time.Date(2015, 1, 9, 16, 20, 0, 0, time.UTC), + RateInterval: &RateInterval{ + Rating: &RIRate{ + RoundingMethod: utils.ROUNDING_MIDDLE, + RoundingDecimals: 2, + Rates: RateGroups{ + &Rate{ + Value: 2.0, + RateIncrement: 10 * time.Second, + }, + }, + }, + }, + Cost: 2, + DurationIndex: 2 * time.Minute, + Increments: Increments{ + &Increment{ + Duration: time.Minute, + Cost: 1, + BalanceInfo: &DebitInfo{ + Unit: &UnitInfo{UUID: "1", DestinationID: "1", Consumed: 2.3, TOR: utils.VOICE, RateInterval: &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{GroupIntervalStart: 0, Value: 100, RateIncrement: 10 * time.Second, RateUnit: time.Second}}}}}, + Monetary: &MonetaryInfo{UUID: "2"}, + AccountID: "3"}, + }, + &Increment{ + Duration: time.Minute, + Cost: 1, + BalanceInfo: &DebitInfo{ + Unit: &UnitInfo{UUID: "1", DestinationID: "1", Consumed: 2.3, TOR: utils.VOICE, RateInterval: &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{GroupIntervalStart: 0, Value: 100, RateIncrement: 10 * time.Second, RateUnit: time.Second}}}}}, + Monetary: &MonetaryInfo{UUID: "2"}, + AccountID: "3"}, + }, + }, + } + eMergedTSS := &TimeSpan{ + TimeStart: time.Date(2015, 1, 9, 16, 18, 0, 0, time.UTC), + TimeEnd: time.Date(2015, 1, 9, 16, 20, 0, 0, time.UTC), + RateInterval: &RateInterval{ + Rating: &RIRate{ + RoundingMethod: utils.ROUNDING_MIDDLE, + RoundingDecimals: 2, + Rates: RateGroups{ + &Rate{ + Value: 2.0, + RateIncrement: 10 * time.Second, + }, + }, + }, + }, + Cost: 4, + DurationIndex: 2 * time.Minute, + Increments: Increments{ + &Increment{ + Duration: time.Minute, + Cost: 1, + BalanceInfo: &DebitInfo{ + Unit: &UnitInfo{UUID: "1", DestinationID: "1", Consumed: 2.3, TOR: utils.VOICE, RateInterval: &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{GroupIntervalStart: 0, Value: 100, RateIncrement: 10 * time.Second, RateUnit: time.Second}}}}}, + Monetary: &MonetaryInfo{UUID: "2"}, + AccountID: "3"}, + CompressFactor: 4, + }, + }, + } + if merged := tss1.Merge(tss2); !merged { + t.Error("Not merged") + } else if !tss1.Equal(eMergedTSS) { + t.Errorf("Expecting: %+v, received: %+v", eMergedTSS, tss1) + } +} diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 6254a8a82..ed50b24b5 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -80,16 +80,12 @@ func (self *SMGSession) debitLoop(debitInterval time.Duration) { // Attempts to debit a duration, returns maximum duration which can be debitted or error func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time.Duration, error) { requestedDuration := dur - //utils.Logger.Debug(fmt.Sprintf("InitDur: %f, lastUsed: %f", requestedDuration.Seconds(), lastUsed.Seconds())) - //utils.Logger.Debug(fmt.Sprintf("TotalUsage: %f, extraDuration: %f", self.totalUsage.Seconds(), self.extraDuration.Seconds())) if lastUsed != nil { self.extraDuration = self.lastDebit - *lastUsed - //utils.Logger.Debug(fmt.Sprintf("ExtraDuration LastUsed: %f", self.extraDuration.Seconds())) if *lastUsed != self.lastUsage { // total usage correction self.totalUsage -= self.lastUsage self.totalUsage += *lastUsed - //utils.Logger.Debug(fmt.Sprintf("TotalUsage Correction: %f", self.totalUsage.Seconds())) } } // apply correction from previous run @@ -102,7 +98,6 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time. self.extraDuration -= dur return ccDuration, nil } - //utils.Logger.Debug(fmt.Sprintf("dur: %f", dur.Seconds())) initialExtraDuration := self.extraDuration self.extraDuration = 0 if self.cd.LoopIndex > 0 { @@ -152,6 +147,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { //initialRefundDuration := refundDuration firstCC := self.callCosts[0] // use merged cc (from close function) firstCC.Timespans.Decompress() + defer firstCC.Timespans.Compress() var refundIncrements engine.Increments for i := len(firstCC.Timespans) - 1; i >= 0; i-- { ts := firstCC.Timespans[i] @@ -204,7 +200,6 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { //firstCC.Cost -= refundIncrements.GetTotalCost() // use updateCost instead firstCC.UpdateCost() firstCC.UpdateRatedUsage() - firstCC.Timespans.Compress() return nil } @@ -215,7 +210,6 @@ func (self *SMGSession) close(endTime time.Time) error { for _, cc := range self.callCosts[1:] { firstCC.Merge(cc) } - //utils.Logger.Debug("MergedCC: " + utils.ToJSON(firstCC)) end := firstCC.GetEndTime() refundDuration := end.Sub(endTime) self.refund(refundDuration) @@ -250,7 +244,6 @@ func (self *SMGSession) saveOperations(originID string) error { } firstCC := self.callCosts[0] // was merged in close method firstCC.Round() - //utils.Logger.Debug("Saved CC: " + utils.ToJSON(firstCC)) roundIncrements := firstCC.GetRoundIncrements() if len(roundIncrements) != 0 { cd := firstCC.CreateCallDescriptor() diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 5f1e798b7..1a1758ec5 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -300,6 +300,7 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { if err != nil || aTime.IsZero() { utils.Logger.Err(fmt.Sprintf(" Could not retrieve answer time for session: %s, runId: %s, aTime: %+v, error: %v", sessionId, s.runId, aTime, err)) + continue // Unanswered session } if err := s.close(aTime.Add(usage)); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not close session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error()))