SessionManager - Multiple sessions emulated out of one request to support scenarios like reseller chains

This commit is contained in:
DanB
2014-03-08 16:22:22 +01:00
parent b28a02512c
commit 8d61099de1
12 changed files with 445 additions and 265 deletions

View File

@@ -36,7 +36,6 @@
[balancer]
# enabled = false # Start Balancer service: <true|false>.
# listen = 127.0.0.1:2012 # Balancer listen interface: <""|x.y.z.y:1234>.
[rater]
enabled = true # Enable RaterCDRSExportPath service: <true|false>.
@@ -47,13 +46,13 @@ enabled = true # Starts Scheduler service: <true|false>.
[cdrs]
enabled = true # Start the CDR Server service: <true|false>.
# extra_fields = # Extra fields to store in CDRs
# extra_fields = # Extra fields to store in CDRs for non-generic CDRs
mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
[cdre]
# cdr_format = csv # Exported CDRs format <csv>
# extra_fields = # List of extra fields to be exported out in CDRs
export_dir = /tmp # Path where the exported CDRs will be placed
# cdr_format = csv # Exported CDRs format <csv>
# extra_fields = # List of extra fields to be exported out in CDRs
# export_dir = /var/log/cgrates/cdr/cdrexport/csv # Path where the exported CDRs will be placed
[cdrc]
# enabled = false # Enable CDR client functionality
@@ -72,8 +71,9 @@ export_dir = /tmp # Path where the exported CDRs will be placed
# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs.
# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs.
# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs.
# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs.
# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs.
# setup_time_field = 8 # Setup time field identifier. Use index numbers in case of .csv cdrs.
# answer_time_field = 9 # Answer time field identifier. Use index numbers in case of .csv cdrs.
# duration_field = 10 # Duration field identifier. Use index numbers in case of .csv cdrs.
# extra_fields = # Extra fields identifiers. For .csv, format: <label_extrafield_1>:<index_extrafield_1>[...,<label_extrafield_n>:<index_extrafield_n>]
[mediator]
@@ -88,15 +88,28 @@ enabled = true # Starts Mediator service: <true|false>.
# account_fields = # Name of account fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# subject_fields = # Name of fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# destination_fields = # Name of destination fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# answer_time_fields = # Name of time_answer fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# setup_time_fields = # Name of setup_time fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# answer_time_fields = # Name of answer_time fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# duration_fields = # Name of duration fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
[session_manager]
enabled = true # Starts SessionManager service: <true|false>.
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
# rater = internal # Address where to reach the Rater.
rater = 127.0.0.1:2013 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# debit_interval = 5 # Interval to perform debits on.
# debit_interval = 10 # Interval to perform debits on.
# max_call_duration = 3h # Maximum call duration a prepaid call can last
# run_ids = # Identifiers of additional sessions control.
# reqtype_fields = # Name of request type fields to be used during additional sessions control <""|*default|field_name>.
# direction_fields = # Name of direction fields to be used during additional sessions control <""|*default|field_name>.
# tenant_fields = # Name of tenant fields to be used during additional sessions control <""|*default|field_name>.
# tor_fields = # Name of tor fields to be used during additional sessions control <""|*default|field_name>.
# account_fields = # Name of account fields to be used during additional sessions control <""|*default|field_name>.
# subject_fields = # Name of fields to be used during additional sessions control <""|*default|field_name>.
# destination_fields = # Name of destination fields to be used during additional sessions control <""|*default|field_name>.
# setup_time_fields = # Name of setup_time fields to be used during additional sessions control <""|*default|field_name>.
# answer_time_fields = # Name of answer_time fields to be used during additional sessions control <""|*default|field_name>.
# duration_fields = # Name of duration fields to be used during additional sessions control <""|*default|field_name>.
[freeswitch]
# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket.
@@ -105,11 +118,11 @@ enabled = true # Starts SessionManager service: <true|false>.
[history_server]
enabled = true # Starts History service: <true|false>.
history_dir = /tmp/cgr_history # Location on disk where to store history files.
# save_interval = 1s # Interval to save changed cache into .git archive
# history_dir = /var/log/cgrates/history # Location on disk where to store history files.
# save_interval = 1s # Interval to save changed cache into .git archive
[history_agent]
# enabled = false # Starts History as a client: <true|false>.
enabled = true # Starts History as a client: <true|false>.
# server = internal # Address where to reach the master history server: <internal|x.y.z.y:1234>
[mailer]

View File

@@ -1,3 +1,3 @@
#ActionsTag,Action,BalanceType,Direction,Units,ExpiryTime,DestinationTag,RatingSubject,SharedGroup,BalanceWeight,ExtraParameters,Weight
PREPAID_10,*topup_reset,*monetary,*out,10,*unlimited,*any,,,10,,10
#ActionsTag,Action,BalanceType,Direction,Units,ExpiryTime,DestinationTag,RatingSubject,BalanceWeight,SharedGroup,ExtraParameters,Weight
PREPAID_10,*topup_reset,*monetary,*out,10,*unlimited,*any,,10,,,10
LOG_WARNING,*log,,,,,,,,,,10
1 #ActionsTag Action BalanceType Direction Units ExpiryTime DestinationTag RatingSubject BalanceWeight SharedGroup ExtraParameters Weight
2 PREPAID_10 *topup_reset *monetary *out 10 *unlimited *any 10 10
3 LOG_WARNING *log 10

View File

@@ -149,6 +149,7 @@ func TestPostCdrs(t *testing.T) {
t.Error(err.Error())
}
}
time.Sleep(10 * time.Millisecond) // Give time for CDRs to reach database
if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil {
t.Error(err)
} else if len(storedCdrs) != 2 { // Make sure CDRs made it into StorDb

View File

@@ -34,7 +34,9 @@ type Event interface {
GetTOR(string) string
GetTenant(string) string
GetReqType(string) string
GetSetupTime(string) (time.Time, error)
GetAnswerTime(string) (time.Time, error)
GetEndTime() (time.Time, error)
GetDuration(string) (time.Duration, error)
MissingParameter() bool
}

View File

@@ -20,6 +20,7 @@ package sessionmanager
import (
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
"strconv"
@@ -45,6 +46,7 @@ const (
SETUP_TIME = "Caller-Channel-Created-Time"
ANSWER_TIME = "Caller-Channel-Answered-Time"
END_TIME = "Caller-Channel-Hangup-Time"
DURATION = ""
NAME = "Event-Name"
HEARTBEAT = "HEARTBEAT"
ANSWER = "CHANNEL_ANSWER"
@@ -79,37 +81,60 @@ func (fsev FSEvent) GetName() string {
return fsev[NAME]
}
func (fsev FSEvent) GetDirection(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
//TODO: implement direction
return "*out"
//return fsev[DIRECTION]
}
func (fsev FSEvent) GetSubject(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[SUBJECT], fsev[USERNAME])
}
func (fsev FSEvent) GetAccount(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[ACCOUNT], fsev[USERNAME])
}
// Charging destination number
func (fsev FSEvent) GetDestination(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[DESTINATION], fsev[CALL_DEST_NR])
}
// Original dialed destination number, useful in case of unpark
func (fsev FSEvent) GetCallDestNr(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[CALL_DEST_NR])
}
func (fsev FSEvent) GetTOR(fieldName string) string {
return utils.FirstNonEmpty(fsev[fieldName], fsev[TOR], cfg.DefaultTOR)
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[TOR], config.CgrConfig().DefaultTOR)
}
func (fsev FSEvent) GetUUID() string {
return fsev[UUID]
}
func (fsev FSEvent) GetTenant(fieldName string) string {
return utils.FirstNonEmpty(fsev[fieldName], fsev[CSTMID], cfg.DefaultTenant)
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[CSTMID], config.CgrConfig().DefaultTenant)
}
func (fsev FSEvent) GetReqType(fieldName string) string {
return utils.FirstNonEmpty(fsev[fieldName], fsev[REQTYPE], cfg.DefaultReqType)
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[REQTYPE], config.CgrConfig().DefaultReqType)
}
func (fsev FSEvent) MissingParameter() bool {
return strings.TrimSpace(fsev.GetDirection("")) == "" ||
@@ -121,15 +146,19 @@ func (fsev FSEvent) MissingParameter() bool {
strings.TrimSpace(fsev.GetTenant("")) == "" ||
strings.TrimSpace(fsev.GetCallDestNr("")) == ""
}
func (fsev FSEvent) GetSetupTime(field string) (t time.Time, err error) {
st, err := strconv.ParseInt(fsev[field], 0, 64)
t = time.Unix(0, st*1000)
return
func (fsev FSEvent) GetSetupTime(fieldName string) (t time.Time, err error) {
sTimeStr := utils.FirstNonEmpty(fsev[fieldName], fsev[SETUP_TIME])
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
sTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.ParseTimeDetectLayout(sTimeStr)
}
func (fsev FSEvent) GetAnswerTime(field string) (t time.Time, err error) {
st, err := strconv.ParseInt(fsev[field], 0, 64)
t = time.Unix(0, st*1000)
return
func (fsev FSEvent) GetAnswerTime(fieldName string) (t time.Time, err error) {
aTimeStr := utils.FirstNonEmpty(fsev[fieldName], fsev[ANSWER_TIME])
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
aTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.ParseTimeDetectLayout(aTimeStr)
}
func (fsev FSEvent) GetEndTime() (t time.Time, err error) {
@@ -137,3 +166,11 @@ func (fsev FSEvent) GetEndTime() (t time.Time, err error) {
t = time.Unix(0, st*1000)
return
}
func (fsev FSEvent) GetDuration(fieldName string) (dur time.Duration, err error) {
durStr := utils.FirstNonEmpty(fsev[fieldName], fsev[DURATION])
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
durStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
}
return utils.ParseDurationWithSecs(durStr)
}

View File

@@ -19,7 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
import (
"github.com/cgrates/cgrates/config"
"testing"
"time"
)
func TestEventCreation(t *testing.T) {
@@ -49,3 +51,82 @@ Task-Runtime: 1349437318`
t.Error("Incorrect number of event fields: ", l)
}
}
// Detects if any of the parsers do not return static values
func TestEventParseStatic(t *testing.T) {
ev := new(FSEvent).New("")
setupTime, _ := ev.GetSetupTime("^2013-12-07 08:42:24")
answerTime, _ := ev.GetAnswerTime("^2013-12-07 08:42:24")
dur, _ := ev.GetDuration("^60s")
if ev.GetReqType("^test") != "test" ||
ev.GetDirection("^test") != "test" ||
ev.GetTenant("^test") != "test" ||
ev.GetTOR("^test") != "test" ||
ev.GetAccount("^test") != "test" ||
ev.GetSubject("^test") != "test" ||
ev.GetDestination("^test") != "test" ||
setupTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC) ||
answerTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC) ||
dur != time.Duration(60)*time.Second {
t.Error("Values out of static not matching",
ev.GetReqType("^test") != "test",
ev.GetDirection("^test") != "test",
ev.GetTenant("^test") != "test",
ev.GetTOR("^test") != "test",
ev.GetAccount("^test") != "test",
ev.GetSubject("^test") != "test",
ev.GetDestination("^test") != "test",
setupTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC),
answerTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC),
dur != time.Duration(60)*time.Second)
}
}
// Test here if the answer is selected out of headers we specify, even if not default defined
func TestEventSelectiveHeaders(t *testing.T) {
body := `Event-Name: RE_SCHEDULE
Core-UUID: 792e181c-b6e6-499c-82a1-52a778e7d82d
FreeSWITCH-Hostname: h1.ip-switch.net
FreeSWITCH-Switchname: h1.ip-switch.net
FreeSWITCH-IPv4: 88.198.12.156
FreeSWITCH-IPv6: %3A%3A1
Event-Date-Local: 2012-10-05%2013%3A41%3A38
Event-Date-GMT: Fri,%2005%20Oct%202012%2011%3A41%3A38%20GMT
Event-Date-Timestamp: 1349437298012866
Event-Calling-File: switch_scheduler.c
Event-Calling-Function: switch_scheduler_execute
Event-Calling-Line-Number: 65
Event-Sequence: 34263
Task-ID: 2
Task-Desc: heartbeat
Task-Group: core
Task-Runtime: 1349437318`
cfg, _ = config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).New(body)
setupTime, _ := ev.GetSetupTime("Event-Date-Local")
answerTime, _ := ev.GetAnswerTime("Event-Date-Local")
dur, _ := ev.GetDuration("Event-Calling-Line-Number")
if ev.GetReqType("FreeSWITCH-Hostname") != "h1.ip-switch.net" ||
ev.GetDirection("FreeSWITCH-Hostname") != "*out" ||
ev.GetTenant("FreeSWITCH-Hostname") != "h1.ip-switch.net" ||
ev.GetTOR("FreeSWITCH-Hostname") != "h1.ip-switch.net" ||
ev.GetAccount("FreeSWITCH-Hostname") != "h1.ip-switch.net" ||
ev.GetSubject("FreeSWITCH-Hostname") != "h1.ip-switch.net" ||
ev.GetDestination("FreeSWITCH-Hostname") != "h1.ip-switch.net" ||
setupTime != time.Date(2012, 10, 5, 13, 41, 38, 0, time.UTC) ||
answerTime != time.Date(2012, 10, 5, 13, 41, 38, 0, time.UTC) ||
dur != time.Duration(65)*time.Second {
t.Error("Values out of static not matching",
ev.GetReqType("FreeSWITCH-Hostname") != "h1.ip-switch.net",
ev.GetDirection("FreeSWITCH-Hostname") != "*out",
ev.GetTenant("FreeSWITCH-Hostname") != "h1.ip-switch.net",
ev.GetTOR("FreeSWITCH-Hostname") != "h1.ip-switch.net",
ev.GetAccount("FreeSWITCH-Hostname") != "h1.ip-switch.net",
ev.GetSubject("FreeSWITCH-Hostname") != "h1.ip-switch.net",
ev.GetDestination("FreeSWITCH-Hostname") != "h1.ip-switch.net",
setupTime != time.Date(2012, 10, 5, 13, 41, 38, 0, time.UTC),
answerTime != time.Date(2012, 10, 5, 13, 41, 38, 0, time.UTC),
dur != time.Duration(65)*time.Second)
}
}

View File

@@ -24,7 +24,6 @@ import (
"fmt"
"log/syslog"
"net"
"strings"
"time"
"github.com/cgrates/cgrates/config"
@@ -100,23 +99,23 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session {
}
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) {
func (sm *FSSessionManager) DisconnectSession(uuid string, notify string) {
// engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid))
_, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", s.uuid, notify))
_, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
if err != nil {
engine.Logger.Err(fmt.Sprintf("could not send disconect api notification to freeswitch: %v", err))
}
err = fsock.FS.SendMsgCmd(s.uuid, map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}) // without + sign
err = fsock.FS.SendMsgCmd(uuid, map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}) // without + sign
if err != nil {
engine.Logger.Err(fmt.Sprintf("could not send disconect msg to freeswitch: %v", err))
}
return
}
// Remove session from sessin list
func (sm *FSSessionManager) RemoveSession(s *Session) {
// Remove session from sessin list, removes all related in case of multiple runs
func (sm *FSSessionManager) RemoveSession(uuid string) {
for i, ss := range sm.sessions {
if ss == s {
if ss.uuid == uuid {
sm.sessions = append(sm.sessions[:i], sm.sessions[i+1:]...)
return
}
@@ -150,52 +149,73 @@ func (sm *FSSessionManager) OnHeartBeat(ev Event) {
}
func (sm *FSSessionManager) OnChannelPark(ev Event) {
//engine.Logger.Info("freeswitch park")
startTime, err := ev.GetAnswerTime(PARK_TIME)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
var maxCallDuration time.Duration // This will be the maximum duration this channel will be allowed to last
runIds := append([]string{utils.DEFAULT_RUNID}, cfg.SMRunIds...) // Prepend default runid to extra configured for session manager
for idx := range runIds {
var directionFld, tenantFld, torFld, actFld, subjFld, dstFld string
if idx != 0 { // Take fields out of config, default ones are automatically handled as empty
idxCfg := idx - 1 // In configuration we did not prepend values
directionFld = cfg.SMDirectionFields[idxCfg]
tenantFld = cfg.SMTenantFields[idxCfg]
torFld = cfg.SMTORFields[idxCfg]
actFld = cfg.SMAccountFields[idxCfg]
subjFld = cfg.SMSubjectFields[idxCfg]
dstFld = cfg.SMDestFields[idxCfg]
}
startTime, err := ev.GetAnswerTime(PARK_TIME)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
// if there is no account configured leave the call alone
if idx == 0 && !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, ev.GetReqType("")) {
return // we unpark only prepaid and pseudoprepaid calls
}
if ev.MissingParameter() {
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(dstFld), MISSING_PARAMETER)
engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))
return
}
cd := engine.CallDescriptor{
Direction: ev.GetDirection(directionFld),
Tenant: ev.GetTenant(tenantFld),
TOR: ev.GetTOR(torFld),
Subject: ev.GetSubject(subjFld),
Account: ev.GetAccount(actFld),
Destination: ev.GetDestination(dstFld),
TimeStart: startTime,
TimeEnd: startTime.Add(cfg.SMMaxCallDuration),
}
var remainingDurationFloat float64
err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR) // We unpark on original destination
return
}
remainingDuration := time.Duration(remainingDurationFloat)
// Set maxCallDuration, smallest out of all forked sessions
if idx == 0 {
maxCallDuration = remainingDuration
} else if maxCallDuration > remainingDuration {
maxCallDuration = remainingDuration
}
}
// if there is no account configured leave the call alone
if !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, strings.TrimSpace(ev.GetReqType(""))) {
return // we unpark only prepaid and pseudoprepaid calls
}
if ev.MissingParameter() {
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), MISSING_PARAMETER)
engine.Logger.Err(fmt.Sprintf("Missing parameter for %s", ev.GetUUID()))
return
}
cd := engine.CallDescriptor{
Direction: ev.GetDirection(""),
Tenant: ev.GetTenant(""),
TOR: ev.GetTOR(""),
Subject: ev.GetSubject(""),
Account: ev.GetAccount(""),
Destination: ev.GetDestination(""),
TimeStart: startTime,
TimeEnd: startTime.Add(cfg.SMMaxCallDuration),
}
var remainingDurationFloat float64
err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR)
return
}
remainingDuration := time.Duration(remainingDurationFloat)
//engine.Logger.Info(fmt.Sprintf("Remaining duration: %v", remainingDuration))
if remainingDuration == 0 {
if maxCallDuration == 0 {
//engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject)))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), INSUFFICIENT_FUNDS)
return
}
sm.setMaxCallDuration(ev.GetUUID(), remainingDuration)
sm.setMaxCallDuration(ev.GetUUID(), maxCallDuration)
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), AUTH_OK)
}
func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
//engine.Logger.Info("<SessionManager> FreeSWITCH answer.")
// Make sure cgr_type is enforced even if not set by FreeSWITCH
if ev.MissingParameter() {
sm.DisconnectSession(ev.GetUUID(), MISSING_PARAMETER)
}
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_reqtype %s\n\n", ev.GetUUID(), ev.GetReqType(""))); err != nil {
engine.Logger.Err(fmt.Sprintf("Error on attempting to overwrite cgr_type in chan variables: %v", err))
}
@@ -206,131 +226,131 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
}
func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
//engine.Logger.Info("<SessionManager> FreeSWITCH hangup.")
s := sm.GetSession(ev.GetUUID())
if s == nil { // Not handled by us
return
} else {
sm.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency
}
defer s.Close(ev) // Stop loop and save the costs deducted so far to database
if ev.GetReqType("") == utils.POSTPAID {
startTime, err := ev.GetAnswerTime(ANSWER_TIME)
if err != nil {
engine.Logger.Crit("Error parsing postpaid call start time from event")
return
defer s.Close(ev) // Stop loop and save the costs deducted so far to database
runIds := append([]string{utils.DEFAULT_RUNID}, cfg.SMRunIds...) // Prepend default runid to extra configured for session manager
for idx := range runIds {
var reqTypeFld, directionFld, tenantFld, torFld, actFld, subjFld, dstFld, aTimeFld string // ToDo: Add durFld
if idx != 0 { // Take fields out of config, default ones are automatically handled as empty
idxCfg := idx - 1 // In configuration we did not prepend values
reqTypeFld = cfg.SMReqTypeFields[idxCfg]
directionFld = cfg.SMDirectionFields[idxCfg]
tenantFld = cfg.SMTenantFields[idxCfg]
torFld = cfg.SMTORFields[idxCfg]
actFld = cfg.SMAccountFields[idxCfg]
subjFld = cfg.SMSubjectFields[idxCfg]
dstFld = cfg.SMDestFields[idxCfg]
aTimeFld = cfg.SMAnswerTimeFields[idxCfg]
// durFld = cfg.SMDurationFields[idxCfg]
}
endTime, err := ev.GetEndTime()
if err != nil {
engine.Logger.Crit("Error parsing postpaid call start time from event")
return
}
cd := engine.CallDescriptor{
Direction: ev.GetDirection(""),
Tenant: ev.GetTenant(""),
TOR: ev.GetTOR(""),
Subject: ev.GetSubject(""),
Account: ev.GetAccount(""),
LoopIndex: 0,
CallDuration: endTime.Sub(startTime),
Destination: ev.GetDestination(""),
TimeStart: startTime,
TimeEnd: endTime,
}
cc := &engine.CallCost{}
err = sm.connector.Debit(cd, cc)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID()))
return
}
s.CallCosts = append(s.CallCosts, cc)
return
}
if s == nil || len(s.CallCosts) == 0 {
return // why would we have 0 callcosts
}
lastCC := s.CallCosts[len(s.CallCosts)-1]
// put credit back
var hangupTime time.Time
var err error
if hangupTime, err = ev.GetEndTime(); err != nil {
engine.Logger.Err("Error parsing answer event hangup time, using time.Now!")
hangupTime = time.Now()
}
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
//initialRefundDuration := refundDuration
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration()
if refundDuration <= tsDuration {
lastRefundedIncrementIndex := 0
for j := len(ts.Increments) - 1; j >= 0; j-- {
increment := ts.Increments[j]
if increment.Duration <= refundDuration {
refundIncrements = append(refundIncrements, increment)
refundDuration -= increment.Duration
lastRefundedIncrementIndex = j
if ev.GetReqType(reqTypeFld) == utils.POSTPAID {
startTime, err := ev.GetAnswerTime(aTimeFld)
if err != nil {
engine.Logger.Crit("Error parsing postpaid call start time from event")
return
}
endTime, err := ev.GetEndTime()
if err != nil {
engine.Logger.Crit("Error parsing postpaid call start time from event")
return
}
cd := engine.CallDescriptor{
Direction: ev.GetDirection(directionFld),
Tenant: ev.GetTenant(tenantFld),
TOR: ev.GetTOR(torFld),
Subject: ev.GetSubject(actFld),
Account: ev.GetAccount(subjFld),
LoopIndex: 0,
CallDuration: endTime.Sub(startTime),
Destination: ev.GetDestination(dstFld),
TimeStart: startTime,
TimeEnd: endTime,
}
cc := &engine.CallCost{}
err = sm.connector.Debit(cd, cc)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Error making the general debit for postpaid call: %v", ev.GetUUID()))
return
}
s.sessionRuns[idx].callCosts = append(s.sessionRuns[idx].callCosts, cc)
} else if ev.GetReqType(reqTypeFld) == utils.PREPAID { // Prepaid calls
if len(s.sessionRuns[idx].callCosts) == 0 {
continue // why would we have 0 callcosts
}
lastCC := s.sessionRuns[idx].callCosts[len(s.sessionRuns[idx].callCosts)-1]
// put credit back
var hangupTime time.Time
var err error
if hangupTime, err = ev.GetEndTime(); err != nil {
engine.Logger.Err("Error parsing answer event hangup time, using time.Now!")
hangupTime = time.Now()
}
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
tsDuration := ts.GetDuration()
if refundDuration <= tsDuration {
lastRefundedIncrementIndex := 0
for j := len(ts.Increments) - 1; j >= 0; j-- {
increment := ts.Increments[j]
if increment.Duration <= refundDuration {
refundIncrements = append(refundIncrements, increment)
refundDuration -= increment.Duration
lastRefundedIncrementIndex = j
}
}
ts.SplitByIncrement(lastRefundedIncrementIndex)
break // do not go to other timespans
} else {
refundIncrements = append(refundIncrements, ts.Increments...)
// remove the timespan entirely
lastCC.Timespans[i] = nil
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refund
refundDuration -= tsDuration
}
}
ts.SplitByIncrement(lastRefundedIncrementIndex)
break // do not go to other timespans
} else {
refundIncrements = append(refundIncrements, ts.Increments...)
// remove the timespan entirely
lastCC.Timespans[i] = nil
lastCC.Timespans = lastCC.Timespans[:i]
// continue to the next timespan with what is left to refund
refundDuration -= tsDuration
// show only what was actualy refunded (stopped in timespan)
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
TOR: lastCC.TOR,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Increments: refundIncrements,
}
var response float64
err := sm.connector.RefundIncrements(*cd, &response)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err))
}
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
// engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost))
}
}
// show only what was actualy refunded (stopped in timespan)
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
Tenant: lastCC.Tenant,
TOR: lastCC.TOR,
Subject: lastCC.Subject,
Account: lastCC.Account,
Destination: lastCC.Destination,
Increments: refundIncrements,
// FallbackSubject: lastCC.FallbackSubject, // TODO: check how to best add it
}
var response float64
err := sm.connector.RefundIncrements(*cd, &response)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err))
}
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
// engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost))
}
func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor) (cc *engine.CallCost) {
cc = &engine.CallCost{}
err := sm.connector.MaxDebit(*cd, cc)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
// disconnect session
s.sessionManager.DisconnectSession(s, SYSTEM_ERROR)
}
// engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc))
if cc.GetDuration() == 0 || err != nil {
// engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s))
sm.DisconnectSession(s, INSUFFICIENT_FUNDS)
return
}
s.CallCosts = append(s.CallCosts, cc)
return
}
func (sm *FSSessionManager) GetDebitPeriod() time.Duration {
return sm.debitPeriod
}
func (sm *FSSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
return sm.connector.MaxDebit(*cd, cc)
}
func (sm *FSSessionManager) GetDbLogger() engine.LogStorage {
return sm.loggerDB
}
@@ -345,7 +365,6 @@ func (sm *FSSessionManager) Shutdown() (err error) {
for _, cmd := range []string{cmdKillPrepaid, cmdKillPostpaid} {
if _, err = fsock.FS.SendApiCmd(cmd); err != nil {
engine.Logger.Err(fmt.Sprintf("Error on calls shutdown: %s", err))
return
}
}
for guard := 0; len(sm.sessions) > 0 && guard < 20; guard++ {

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
import (
"encoding/json"
"fmt"
"time"
@@ -30,52 +31,70 @@ import (
// actions and a channel to signal end of the debit loop.
type Session struct {
uuid string
callDescriptor *engine.CallDescriptor
sessionManager SessionManager
stopDebit chan bool
CallCosts []*engine.CallCost
sessionManager SessionManager
sessionRuns []*SessionRun
}
// Creates a new session and starts the debit loop
func NewSession(ev Event, sm SessionManager) (s *Session) {
// SesionManager only handles prepaid and postpaid calls
if ev.GetReqType("") != utils.PREPAID && ev.GetReqType("") != utils.POSTPAID {
return
}
startTime, err := ev.GetAnswerTime(ANSWER_TIME)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
// One individual run
type SessionRun struct {
runId string
callDescriptor *engine.CallDescriptor
callCosts []*engine.CallCost
}
cd := &engine.CallDescriptor{
Direction: ev.GetDirection(""),
Tenant: ev.GetTenant(""),
TOR: ev.GetTOR(""),
Subject: ev.GetSubject(""),
Account: ev.GetAccount(""),
Destination: ev.GetDestination(""),
TimeStart: startTime}
s = &Session{uuid: ev.GetUUID(),
callDescriptor: cd,
stopDebit: make(chan bool, 2)} //buffer it for multiple close signals
s.sessionManager = sm
if ev.MissingParameter() {
sm.DisconnectSession(s, MISSING_PARAMETER)
} else {
switch ev.GetReqType("") {
case utils.PREPAID:
go s.startDebitLoop()
case utils.POSTPAID:
// do not loop, make only one debit at hangup
// Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually
func NewSession(ev Event, sm SessionManager) *Session {
s := &Session{uuid: ev.GetUUID(),
stopDebit: make(chan bool),
sessionManager: sm,
sessionRuns: make([]*SessionRun, 0),
}
runIds := append([]string{utils.DEFAULT_RUNID}, cfg.SMRunIds...) // Prepend default runid to extra configured for session manager
for idx, runId := range runIds { // Create the SessionRuns here
var reqTypeFld, directionFld, tenantFld, torFld, actFld, subjFld, dstFld, aTimeFld string
if idx != 0 { // Take fields out of config, default ones are automatically handled as empty
idxCfg := idx - 1 // In configuration we did not prepend values
reqTypeFld = cfg.SMReqTypeFields[idxCfg]
directionFld = cfg.SMDirectionFields[idxCfg]
tenantFld = cfg.SMTenantFields[idxCfg]
torFld = cfg.SMTORFields[idxCfg]
actFld = cfg.SMAccountFields[idxCfg]
subjFld = cfg.SMSubjectFields[idxCfg]
dstFld = cfg.SMDestFields[idxCfg]
aTimeFld = cfg.SMAnswerTimeFields[idxCfg]
}
startTime, err := ev.GetAnswerTime(aTimeFld)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
cd := &engine.CallDescriptor{
Direction: ev.GetDirection(directionFld),
Tenant: ev.GetTenant(tenantFld),
TOR: ev.GetTOR(torFld),
Subject: ev.GetSubject(subjFld),
Account: ev.GetAccount(actFld),
Destination: ev.GetDestination(dstFld),
TimeStart: startTime}
sr := &SessionRun{
runId: runId,
callDescriptor: cd,
}
s.sessionRuns = append(s.sessionRuns, sr)
if ev.GetReqType(reqTypeFld) == utils.PREPAID {
go s.debitLoop(len(s.sessionRuns) - 1) // Send index of the just appended sessionRun
}
}
return
if len(s.sessionRuns) == 0 {
return nil
}
return s
}
// the debit loop method (to be stoped by sending somenthing on stopDebit channel)
func (s *Session) startDebitLoop() {
nextCd := *s.callDescriptor
func (s *Session) debitLoop(runIdx int) {
nextCd := *s.sessionRuns[runIdx].callDescriptor
index := 0.0
debitPeriod := s.sessionManager.GetDebitPeriod()
for {
@@ -90,7 +109,18 @@ func (s *Session) startDebitLoop() {
nextCd.TimeEnd = nextCd.TimeStart.Add(debitPeriod)
nextCd.LoopIndex = index
nextCd.CallDuration += debitPeriod // first presumed duration
cc := s.sessionManager.LoopAction(s, &nextCd)
cc := &engine.CallCost{}
if err := s.sessionManager.MaxDebit(&nextCd, cc); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
// disconnect session
s.sessionManager.DisconnectSession(s.uuid, SYSTEM_ERROR)
return
}
if cc.GetDuration() == 0 {
s.sessionManager.DisconnectSession(s.uuid, INSUFFICIENT_FUNDS)
return
}
s.sessionRuns[runIdx].callCosts = append(s.sessionRuns[runIdx].callCosts, cc)
nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd
// update call duration with real debited duration
nextCd.CallDuration -= debitPeriod
@@ -100,52 +130,43 @@ func (s *Session) startDebitLoop() {
}
}
// Returns the session duration till the specified time
func (s *Session) getSessionDurationFrom(now time.Time) (d time.Duration) {
seconds := now.Sub(s.callDescriptor.TimeStart).Seconds()
d, err := time.ParseDuration(fmt.Sprintf("%ds", int(seconds)))
if err != nil {
engine.Logger.Err(fmt.Sprintf("Cannot parse session duration %v", seconds))
}
return
}
// Stops the debit loop
func (s *Session) Close(ev Event) {
// engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid))
if s == nil {
return
}
s.stopDebit <- true
//s.callDescriptor.TimeEnd = time.Now()
if endTime, err := ev.GetEndTime(); err != nil {
close(s.stopDebit) // Close the channel so all the sessionRuns listening will be notified
if _, err := ev.GetEndTime(); err != nil {
engine.Logger.Err("Error parsing answer event stop time.")
endTime = s.callDescriptor.TimeStart.Add(s.callDescriptor.CallDuration)
s.callDescriptor.TimeEnd = endTime
for idx := range s.sessionRuns {
s.sessionRuns[idx].callDescriptor.TimeEnd = s.sessionRuns[idx].callDescriptor.TimeStart.Add(s.sessionRuns[idx].callDescriptor.CallDuration)
}
}
s.SaveOperations()
s.sessionManager.RemoveSession(s)
}
// Nice print for session
func (s *Session) String() string {
return fmt.Sprintf("%v: %s(%s) -> %s", s.callDescriptor.TimeStart, s.callDescriptor.Subject, s.callDescriptor.Account, s.callDescriptor.Destination)
sDump, _ := json.Marshal(s)
return string(sDump)
}
//
// Saves call_costs for each session run
func (s *Session) SaveOperations() {
go func() {
if s == nil || len(s.CallCosts) == 0 {
return
}
firstCC := s.CallCosts[0]
for _, cc := range s.CallCosts[1:] {
firstCC.Merge(cc)
}
if s.sessionManager.GetDbLogger() == nil {
engine.Logger.Err("<SessionManager> Error: no connection to logger database, cannot save costs")
}
s.sessionManager.GetDbLogger().LogCallCost(s.uuid, engine.SESSION_MANAGER_SOURCE, utils.DEFAULT_RUNID, firstCC)
// engine.Logger.Debug(fmt.Sprintf("<SessionManager> End of call, having costs: %v", firstCC.String()))
}()
if s == nil {
return
}
for _, sr := range s.sessionRuns {
go func() {
firstCC := sr.callCosts[0]
for _, cc := range sr.callCosts[1:] {
firstCC.Merge(cc)
}
if s.sessionManager.GetDbLogger() == nil {
engine.Logger.Err("<SessionManager> Error: no connection to logger database, cannot save costs")
}
s.sessionManager.GetDbLogger().LogCallCost(s.uuid, engine.SESSION_MANAGER_SOURCE, sr.runId, firstCC)
}()
}
}

View File

@@ -19,8 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
import (
"github.com/cgrates/cgrates/config"
"testing"
//"github.com/cgrates/cgrates/config"
//"testing"
)
var (
@@ -58,17 +58,7 @@ default_reqtype=
`)
)
/*func TestSessionDurationSingle(t *testing.T) {
newEvent := new(FSEvent).New(newEventBody)
sm := &FSSessionManager{}
s := NewSession(newEvent, sm)
defer s.Close()
twoSeconds, _ := time.ParseDuration("2s")
if d := s.getSessionDurationFrom(s.callDescriptor.TimeStart.Add(twoSeconds)); d.Seconds() < 2 || d.Seconds() > 3 {
t.Errorf("Wrong session duration %v", d)
}
}*/
/* Missing parameter is not longer tested in NewSession. ToDo: expand this test for more util information
func TestSessionNilSession(t *testing.T) {
var errCfg error
cfg, errCfg = config.NewCGRConfigBytes(conf_data) // Needed here to avoid nil on cfg variable
@@ -82,3 +72,4 @@ func TestSessionNilSession(t *testing.T) {
t.Error("no account and it still created session.")
}
}
*/

View File

@@ -27,9 +27,9 @@ import (
type SessionManager interface {
Connect(*config.CGRConfig) error
DisconnectSession(*Session, string)
RemoveSession(*Session)
LoopAction(*Session, *engine.CallDescriptor) *engine.CallCost
DisconnectSession(string, string)
RemoveSession(string)
MaxDebit(*engine.CallDescriptor, *engine.CallCost) error
GetDebitPeriod() time.Duration
GetDbLogger() engine.LogStorage
Shutdown() error

View File

@@ -112,6 +112,7 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) {
rfc3339Rule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.+$`)
sqlRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}$`)
gotimeRule := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.?\d*\s[+,-]\d+\s\w+$`)
fsTimestamp := regexp.MustCompile(`^\d{16}$`)
unixTimestampRule := regexp.MustCompile(`^\d{10}$`)
switch {
case rfc3339Rule.MatchString(tmStr):
@@ -120,6 +121,12 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) {
return time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", tmStr)
case sqlRule.MatchString(tmStr):
return time.Parse("2006-01-02 15:04:05", tmStr)
case fsTimestamp.MatchString(tmStr):
if tmstmp, err := strconv.ParseInt(tmStr+"000", 10, 64); err != nil {
return nilTime, err
} else {
return time.Unix(0, tmstmp), nil
}
case unixTimestampRule.MatchString(tmStr):
if tmstmp, err := strconv.ParseInt(tmStr, 10, 64); err != nil {
return nilTime, err

View File

@@ -186,6 +186,14 @@ func TestParseTimeDetectLayout(t *testing.T) {
if err == nil {
t.Errorf("Expecting error")
}
fsTmstampStr := "1394291049287234"
fsTm, err := ParseTimeDetectLayout(fsTmstampStr)
expectedTime = time.Date(2014, 3, 8, 15, 4, 9, 287234000, time.UTC)
if err != nil {
t.Error(err)
} else if !fsTm.Equal(expectedTime) {
t.Errorf("Unexpected time parsed: %v, expecting: %v", fsTm, expectedTime)
}
}
func TestParseDateUnix(t *testing.T) {