Postpaid calls no longer handled by session manager

This commit is contained in:
DanB
2014-06-16 11:26:05 +02:00
parent bc5a80fe25
commit 222602dfff
3 changed files with 84 additions and 117 deletions

View File

@@ -76,7 +76,7 @@ func (self *Mediator) getCostFromRater(storedCdr *utils.StoredCdr) (*engine.Call
TimeEnd: storedCdr.AnswerTime.Add(storedCdr.Usage),
DurationIndex: storedCdr.Usage,
}
if storedCdr.ReqType == utils.PSEUDOPREPAID {
if utils.IsSliceMember([]string{utils.PSEUDOPREPAID, utils.POSTPAID}, storedCdr.ReqType) {
err = self.connector.Debit(cd, cc)
} else {
err = self.connector.GetCost(cd, cc)
@@ -93,7 +93,7 @@ func (self *Mediator) getCostFromRater(storedCdr *utils.StoredCdr) (*engine.Call
func (self *Mediator) rateCDR(storedCdr *utils.StoredCdr) error {
var qryCC *engine.CallCost
var errCost error
if storedCdr.ReqType == utils.PREPAID || storedCdr.ReqType == utils.POSTPAID {
if storedCdr.ReqType == utils.PREPAID {
// Should be previously calculated and stored in DB
qryCC, errCost = self.getCostsFromDB(storedCdr.CgrId, storedCdr.MediationRunId)
} else {

View File

@@ -60,7 +60,7 @@ func (sm *FSSessionManager) Connect(cgrCfg *config.CGRConfig) (err error) {
return errors.New("Cannot connect to FreeSWITCH")
}
fsock.FS.ReadEvents()
return errors.New("stopped reading events")
return errors.New("<SessionManager> - Stopped reading events")
}
func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string)) {
@@ -103,11 +103,11 @@ func (sm *FSSessionManager) DisconnectSession(uuid string, notify string) {
// engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid))
_, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
if err != nil {
engine.Logger.Err(fmt.Sprintf("could not send disconect api notification to freeswitch: %v", err))
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send disconect api notification to freeswitch: %v", err))
}
err = fsock.FS.SendMsgCmd(uuid, map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}) // without + sign
if err != nil {
engine.Logger.Err(fmt.Sprintf("could not send disconect msg to freeswitch: %v", err))
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send disconect msg to freeswitch: %v", err))
}
return
}
@@ -136,11 +136,11 @@ func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration
func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) {
_, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
if err != nil {
engine.Logger.Err("could not send unpark api notification to freeswitch")
engine.Logger.Err("<SessionManager> Could not send unpark api notification to freeswitch")
}
_, err = fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s\n\n", uuid, call_dest_nb))
if err != nil {
engine.Logger.Err("could not send unpark api call to freeswitch")
engine.Logger.Err("<SessionManager> Could not send unpark api call to freeswitch")
}
}
@@ -155,7 +155,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("OnPark: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnPark: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination
return
}
@@ -221,7 +221,7 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("OnAnswer: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnAnswer: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.DisconnectSession(ev.GetUUID(), SYSTEM_ERROR) // Disconnect the session since we are not able to process sessions
return
}
@@ -244,111 +244,81 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("OnHangup: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnHangup: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
return
}
dcs, _ = dcs.AppendDefaultRun()
for idx, dc := range dcs {
if ev.GetReqType(dc.ReqTypeField) == utils.POSTPAID {
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField)
if err != nil {
engine.Logger.Crit("Error parsing postpaid call start time from event")
return
}
duration, err := ev.GetDuration(dc.UsageField)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error()))
return
}
cd := engine.CallDescriptor{
Direction: ev.GetDirection(dc.DirectionField),
Tenant: ev.GetTenant(dc.TenantField),
Category: ev.GetCategory(dc.CategoryField),
Subject: ev.GetSubject(dc.SubjectField),
Account: ev.GetAccount(dc.AccountField),
LoopIndex: 0,
DurationIndex: duration,
Destination: ev.GetDestination(dc.DestinationField),
TimeStart: startTime,
TimeEnd: startTime.Add(duration),
}
cc := &engine.CallCost{}
err = sm.connector.Debit(cd, cc)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID()))
return
}
s.sessionRuns[idx].callCosts = append(s.sessionRuns[idx].callCosts, cc)
} else if ev.GetReqType(dc.ReqTypeField) == utils.PREPAID { // Prepaid calls
if len(s.sessionRuns[idx].callCosts) == 0 {
continue // why would we have 0 callcosts
}
lastCC := s.sessionRuns[idx].callCosts[len(s.sessionRuns[idx].callCosts)-1]
lastCC.Timespans.Decompress()
// put credit back
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField)
if err != nil {
engine.Logger.Crit("Error parsing prepaid call start time from event")
return
}
duration, err := ev.GetDuration(dc.UsageField)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error()))
return
}
hangupTime := startTime.Add(duration)
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration()
if refundDuration <= tsDuration {
lastRefundedIncrementIndex := 0
for j := len(ts.Increments) - 1; j >= 0; j-- {
increment := ts.Increments[j]
if increment.Duration <= refundDuration {
refundIncrements = append(refundIncrements, increment)
refundDuration -= increment.Duration
lastRefundedIncrementIndex = j
}
}
ts.SplitByIncrement(lastRefundedIncrementIndex)
break // do not go to other timespans
} else {
refundIncrements = append(refundIncrements, ts.Increments...)
// remove the timespan entirely
lastCC.Timespans[i] = nil
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refund
refundDuration -= tsDuration
}
}
// show only what was actualy refunded (stopped in timespan)
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
Category: lastCC.Category,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Increments: refundIncrements,
}
var response float64
err := sm.connector.RefundIncrements(*cd, &response)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err))
}
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
lastCC.Timespans.Compress()
// engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost))
if ev.GetReqType(dc.ReqTypeField) != utils.PREPAID {
continue
}
if len(s.sessionRuns[idx].callCosts) == 0 {
continue // why would we have 0 callcosts
}
lastCC := s.sessionRuns[idx].callCosts[len(s.sessionRuns[idx].callCosts)-1]
lastCC.Timespans.Decompress()
// put credit back
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField)
if err != nil {
engine.Logger.Crit("Error parsing prepaid call start time from event")
return
}
duration, err := ev.GetDuration(dc.UsageField)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error()))
return
}
hangupTime := startTime.Add(duration)
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration()
if refundDuration <= tsDuration {
lastRefundedIncrementIndex := 0
for j := len(ts.Increments) - 1; j >= 0; j-- {
increment := ts.Increments[j]
if increment.Duration <= refundDuration {
refundIncrements = append(refundIncrements, increment)
refundDuration -= increment.Duration
lastRefundedIncrementIndex = j
}
}
ts.SplitByIncrement(lastRefundedIncrementIndex)
break // do not go to other timespans
} else {
refundIncrements = append(refundIncrements, ts.Increments...)
// remove the timespan entirely
lastCC.Timespans[i] = nil
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refund
refundDuration -= tsDuration
}
}
// show only what was actualy refunded (stopped in timespan)
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
Category: lastCC.Category,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Increments: refundIncrements,
}
var response float64
err := sm.connector.RefundIncrements(*cd, &response)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err))
}
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
lastCC.Timespans.Compress()
// engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost))
}
}
func (sm *FSSessionManager) GetDebitPeriod() time.Duration {
@@ -368,12 +338,8 @@ func (sm *FSSessionManager) Shutdown() (err error) {
return errors.New("Cannot shutdown sessions, fsock not connected")
}
engine.Logger.Info("Shutting down all sessions...")
cmdKillPrepaid := "hupall MANAGER_REQUEST cgr_reqtype prepaid"
cmdKillPostpaid := "hupall MANAGER_REQUEST cgr_reqtype postpaid"
for _, cmd := range []string{cmdKillPrepaid, cmdKillPostpaid} {
if _, err = fsock.FS.SendApiCmd(cmd); err != nil {
engine.Logger.Err(fmt.Sprintf("Error on calls shutdown: %s", err))
}
if _, err = fsock.FS.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype prepaid"); err != nil {
engine.Logger.Err(fmt.Sprintf("Error on calls shutdown: %s", err))
}
for guard := 0; len(sm.sessions) > 0 && guard < 20; guard++ {
time.Sleep(100 * time.Millisecond) // wait for the hungup event to be fired

View File

@@ -53,6 +53,9 @@ func NewSession(ev Event, sm SessionManager, dcs utils.DerivedChargers) *Session
sessionRuns: make([]*SessionRun, 0),
}
for _, dc := range dcs {
if ev.GetReqType(dc.ReqTypeField) != utils.PREPAID {
continue // We only consider prepaid sessions
}
startTime, err := ev.GetAnswerTime(dc.AnswerTimeField)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
@@ -71,9 +74,7 @@ func NewSession(ev Event, sm SessionManager, dcs utils.DerivedChargers) *Session
callDescriptor: cd,
}
s.sessionRuns = append(s.sessionRuns, sr)
if ev.GetReqType(dc.ReqTypeField) == utils.PREPAID {
go s.debitLoop(len(s.sessionRuns) - 1) // Send index of the just appended sessionRun
}
go s.debitLoop(len(s.sessionRuns) - 1) // Send index of the just appended sessionRun
}
if len(s.sessionRuns) == 0 {
return nil