From ab9280dbe522da7afd9ee4d583edc2dee6dd4dfb Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 9 Jun 2015 14:37:48 +0300 Subject: [PATCH] first draft of stale sessions protection --- engine/cdrs.go | 12 +++- engine/responder.go | 9 +-- sessionmanager/session.go | 122 ++++++++++++++++++++++---------------- 3 files changed, 85 insertions(+), 58 deletions(-) diff --git a/engine/cdrs.go b/engine/cdrs.go index 95e5d0eb7..6abc6a98e 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "errors" "fmt" "io/ioutil" "net/http" @@ -106,8 +107,15 @@ type CallCostLog struct { } // RPC method, used to log callcosts to db -func (self *CdrServer) LogCallCost(ccl *CallCostLog) error { - return self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost) +func (self *CdrServer) LogCallCost(ccl *CallCostLog) (time.Duration, error) { + cc, err := self.cdrDb.GetCallCostLog(ccl.CgrId, ccl.Source, ccl.RunId) + if err != nil { + return 0, err + } + if cc != nil { + return cc.GetDuration(), errors.New("duplicate record") + } + return 0, self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost) } // Called by rate/re-rate API diff --git a/engine/responder.go b/engine/responder.go index 603f23134..6257a99e9 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -236,11 +236,12 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error { return nil } -func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *int) error { +func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *int64) error { if rs.CdrSrv == nil { return errors.New("CDR_SERVER_NOT_RUNNING") } - if err := rs.CdrSrv.LogCallCost(ccl); err != nil { + if duration, err := rs.CdrSrv.LogCallCost(ccl); err != nil { + *reply = int64(duration) return err } *reply = 0 @@ -414,7 +415,7 @@ type Connector interface { GetDerivedMaxSessionTime(StoredCdr, *float64) error GetSessionRuns(StoredCdr, *[]*SessionRun) error ProcessCdr(*StoredCdr, *string) error - LogCallCost(*CallCostLog, *int) error + LogCallCost(*CallCostLog, *int64) error GetLCR(*CallDescriptor, *LCRCost) error } @@ -458,7 +459,7 @@ func (rcc *RPCClientConnector) ProcessCdr(cdr *StoredCdr, reply *string) error { return rcc.Client.Call("CDRSV1.ProcessCdr", cdr, reply) } -func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *int) error { +func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *int64) error { return rcc.Client.Call("CDRSV1.LogCallCost", ccl, reply) } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 90d65b6f4..78e1a020d 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -135,59 +135,67 @@ func (s *Session) Close(ev engine.Event) error { return err } hangupTime := startTime.Add(duration) - end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd - refundDuration := end.Sub(hangupTime) - var refundIncrements engine.Increments - for i := len(lastCC.Timespans) - 1; i >= 0; i-- { - ts := lastCC.Timespans[i] - tsDuration := ts.GetDuration() - if refundDuration <= tsDuration { - lastRefundedIncrementIndex := 0 - for j := len(ts.Increments) - 1; j >= 0; j-- { - increment := ts.Increments[j] - if increment.Duration <= refundDuration { - refundIncrements = append(refundIncrements, increment) - refundDuration -= increment.Duration - lastRefundedIncrementIndex = j - } - } - ts.SplitByIncrement(lastRefundedIncrementIndex) - break // do not go to other timespans - } else { - refundIncrements = append(refundIncrements, ts.Increments...) - // remove the timespan entirely - lastCC.Timespans[i] = nil - lastCC.Timespans = lastCC.Timespans[:i] - // continue to the next timespan with what is left to refund - refundDuration -= tsDuration - } + err = s.Refund(lastCC, hangupTime) + if err != nil { + return err } - // show only what was actualy refunded (stopped in timespan) - // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) - if len(refundIncrements) > 0 { - cd := &engine.CallDescriptor{ - Direction: lastCC.Direction, - Tenant: lastCC.Tenant, - Category: lastCC.Category, - Subject: lastCC.Subject, - Account: lastCC.Account, - Destination: lastCC.Destination, - Increments: refundIncrements, - } - var response float64 - err := s.sessionManager.Rater().RefundIncrements(*cd, &response) - if err != nil { - return err - } - } - cost := refundIncrements.GetTotalCost() - lastCC.Cost -= cost - lastCC.Timespans.Compress() } go s.SaveOperations() return nil } +func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error { + end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd + refundDuration := end.Sub(hangupTime) + var refundIncrements engine.Increments + for i := len(lastCC.Timespans) - 1; i >= 0; i-- { + ts := lastCC.Timespans[i] + tsDuration := ts.GetDuration() + if refundDuration <= tsDuration { + lastRefundedIncrementIndex := 0 + for j := len(ts.Increments) - 1; j >= 0; j-- { + increment := ts.Increments[j] + if increment.Duration <= refundDuration { + refundIncrements = append(refundIncrements, increment) + refundDuration -= increment.Duration + lastRefundedIncrementIndex = j + } + } + ts.SplitByIncrement(lastRefundedIncrementIndex) + break // do not go to other timespans + } else { + refundIncrements = append(refundIncrements, ts.Increments...) + // remove the timespan entirely + lastCC.Timespans[i] = nil + lastCC.Timespans = lastCC.Timespans[:i] + // continue to the next timespan with what is left to refund + refundDuration -= tsDuration + } + } + // show only what was actualy refunded (stopped in timespan) + // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) + if len(refundIncrements) > 0 { + cd := &engine.CallDescriptor{ + Direction: lastCC.Direction, + Tenant: lastCC.Tenant, + Category: lastCC.Category, + Subject: lastCC.Subject, + Account: lastCC.Account, + Destination: lastCC.Destination, + Increments: refundIncrements, + } + var response float64 + err := s.sessionManager.Rater().RefundIncrements(*cd, &response) + if err != nil { + return err + } + } + cost := refundIncrements.GetTotalCost() + lastCC.Cost -= cost + lastCC.Timespans.Compress() + return nil +} + // Nice print for session func (s *Session) String() string { sDump, _ := json.Marshal(s) @@ -204,14 +212,24 @@ func (s *Session) SaveOperations() { for _, cc := range sr.CallCosts[1:] { firstCC.Merge(cc) } - var existingDuration int - s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{ + var savedCallcostDuration int64 + err := s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{ CgrId: s.eventStart.GetCgrId(), Source: engine.SESSION_MANAGER_SOURCE, RunId: sr.DerivedCharger.RunId, CallCost: firstCC, - }, &existingDuration) - // on duplicate error refound extra from existing database - + }, &savedCallcostDuration) + // on duplicate error refound extra period compared to existing database callcost + // this is a protection against the case when the close event is missed for some reason + // when the cdr arrives to cdrserver because our callcost is not there it will be rated + // as postpaid. When the close event finally arives we have to refund everything + if err != nil { + hangupTime := firstCC.Timespans[0].TimeStart.Add(time.Duration(savedCallcostDuration)) + if savedCallcostDuration > 0 { + s.Refund(firstCC, hangupTime) + } else { + engine.Logger.Err(fmt.Sprintf("failed to log call cost: %v", err)) + } + } } }