first draft of stale sessions protection

This commit is contained in:
Radu Ioan Fericean
2015-06-09 14:37:48 +03:00
parent f454707445
commit ab9280dbe5
3 changed files with 85 additions and 58 deletions

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
@@ -106,8 +107,15 @@ type CallCostLog struct {
}
// RPC method, used to log callcosts to db
func (self *CdrServer) LogCallCost(ccl *CallCostLog) error {
return self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost)
func (self *CdrServer) LogCallCost(ccl *CallCostLog) (time.Duration, error) {
cc, err := self.cdrDb.GetCallCostLog(ccl.CgrId, ccl.Source, ccl.RunId)
if err != nil {
return 0, err
}
if cc != nil {
return cc.GetDuration(), errors.New("duplicate record")
}
return 0, self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost)
}
// Called by rate/re-rate API

View File

@@ -236,11 +236,12 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error {
return nil
}
func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *int) error {
func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *int64) error {
if rs.CdrSrv == nil {
return errors.New("CDR_SERVER_NOT_RUNNING")
}
if err := rs.CdrSrv.LogCallCost(ccl); err != nil {
if duration, err := rs.CdrSrv.LogCallCost(ccl); err != nil {
*reply = int64(duration)
return err
}
*reply = 0
@@ -414,7 +415,7 @@ type Connector interface {
GetDerivedMaxSessionTime(StoredCdr, *float64) error
GetSessionRuns(StoredCdr, *[]*SessionRun) error
ProcessCdr(*StoredCdr, *string) error
LogCallCost(*CallCostLog, *int) error
LogCallCost(*CallCostLog, *int64) error
GetLCR(*CallDescriptor, *LCRCost) error
}
@@ -458,7 +459,7 @@ func (rcc *RPCClientConnector) ProcessCdr(cdr *StoredCdr, reply *string) error {
return rcc.Client.Call("CDRSV1.ProcessCdr", cdr, reply)
}
func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *int) error {
func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *int64) error {
return rcc.Client.Call("CDRSV1.LogCallCost", ccl, reply)
}

View File

@@ -135,59 +135,67 @@ func (s *Session) Close(ev engine.Event) error {
return err
}
hangupTime := startTime.Add(duration)
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration()
if refundDuration <= tsDuration {
lastRefundedIncrementIndex := 0
for j := len(ts.Increments) - 1; j >= 0; j-- {
increment := ts.Increments[j]
if increment.Duration <= refundDuration {
refundIncrements = append(refundIncrements, increment)
refundDuration -= increment.Duration
lastRefundedIncrementIndex = j
}
}
ts.SplitByIncrement(lastRefundedIncrementIndex)
break // do not go to other timespans
} else {
refundIncrements = append(refundIncrements, ts.Increments...)
// remove the timespan entirely
lastCC.Timespans[i] = nil
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refund
refundDuration -= tsDuration
}
err = s.Refund(lastCC, hangupTime)
if err != nil {
return err
}
// show only what was actualy refunded (stopped in timespan)
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
Category: lastCC.Category,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Increments: refundIncrements,
}
var response float64
err := s.sessionManager.Rater().RefundIncrements(*cd, &response)
if err != nil {
return err
}
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
lastCC.Timespans.Compress()
}
go s.SaveOperations()
return nil
}
func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error {
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration()
if refundDuration <= tsDuration {
lastRefundedIncrementIndex := 0
for j := len(ts.Increments) - 1; j >= 0; j-- {
increment := ts.Increments[j]
if increment.Duration <= refundDuration {
refundIncrements = append(refundIncrements, increment)
refundDuration -= increment.Duration
lastRefundedIncrementIndex = j
}
}
ts.SplitByIncrement(lastRefundedIncrementIndex)
break // do not go to other timespans
} else {
refundIncrements = append(refundIncrements, ts.Increments...)
// remove the timespan entirely
lastCC.Timespans[i] = nil
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refund
refundDuration -= tsDuration
}
}
// show only what was actualy refunded (stopped in timespan)
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
Category: lastCC.Category,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Increments: refundIncrements,
}
var response float64
err := s.sessionManager.Rater().RefundIncrements(*cd, &response)
if err != nil {
return err
}
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
lastCC.Timespans.Compress()
return nil
}
// Nice print for session
func (s *Session) String() string {
sDump, _ := json.Marshal(s)
@@ -204,14 +212,24 @@ func (s *Session) SaveOperations() {
for _, cc := range sr.CallCosts[1:] {
firstCC.Merge(cc)
}
var existingDuration int
s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{
var savedCallcostDuration int64
err := s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{
CgrId: s.eventStart.GetCgrId(),
Source: engine.SESSION_MANAGER_SOURCE,
RunId: sr.DerivedCharger.RunId,
CallCost: firstCC,
}, &existingDuration)
// on duplicate error refound extra from existing database
}, &savedCallcostDuration)
// on duplicate error refound extra period compared to existing database callcost
// this is a protection against the case when the close event is missed for some reason
// when the cdr arrives to cdrserver because our callcost is not there it will be rated
// as postpaid. When the close event finally arives we have to refund everything
if err != nil {
hangupTime := firstCC.Timespans[0].TimeStart.Add(time.Duration(savedCallcostDuration))
if savedCallcostDuration > 0 {
s.Refund(firstCC, hangupTime)
} else {
engine.Logger.Err(fmt.Sprintf("failed to log call cost: %v", err))
}
}
}
}