Refectored session locking inside SM-FreeSWITCH, fixes #271

This commit is contained in:
DanB
2015-11-01 12:36:26 +01:00
parent 685b0b1e63
commit d9ab417d87
5 changed files with 200 additions and 126 deletions

View File

@@ -56,7 +56,7 @@ do_start() {
echo "Please review /usr/share/doc/$NAME/README.Debian">&2
return 3
fi
echo $DAEMON_ARGS
start-stop-daemon --start --quiet \
--pidfile $PIDFILE --exec $DAEMON --name $NAME --user $USER \
--test > /dev/null \

View File

@@ -23,6 +23,7 @@ import (
"fmt"
"log/syslog"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
@@ -31,18 +32,6 @@ import (
"github.com/cgrates/fsock"
)
// The freeswitch session manager type holding a buffer for the network connection
// and the active sessions
type FSSessionManager struct {
cfg *config.SmFsConfig
conns map[string]*fsock.FSock // Keep the list here for connection management purposes
senderPools map[string]*fsock.FSockPool // Keep sender pools here
sessions []*Session
rater engine.Connector
cdrsrv engine.Connector
timezone string
}
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector, timezone string) *FSSessionManager {
return &FSSessionManager{
cfg: smFsConfig,
@@ -50,50 +39,21 @@ func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Conne
senderPools: make(map[string]*fsock.FSockPool),
rater: rater,
cdrsrv: cdrs,
sessions: NewSessions(),
timezone: timezone,
}
}
// Connects to the freeswitch mod_event_socket server and starts
// listening for events.
func (sm *FSSessionManager) Connect() error {
eventFilters := map[string]string{"Call-Direction": "inbound"}
errChan := make(chan error)
for _, connCfg := range sm.cfg.Connections {
connId := utils.GenUUID()
fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, utils.Logger.(*syslog.Writer), connId)
if err != nil {
return err
} else if !fSock.Connected() {
return errors.New("Could not connect to FreeSWITCH")
} else {
sm.conns[connId] = fSock
}
go func() { // Start reading in own goroutine, return on error
if err := sm.conns[connId].ReadEvents(); err != nil {
errChan <- err
}
}()
if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1,
make(map[string][]func(string, string)), make(map[string]string), utils.Logger.(*syslog.Writer), connId); err != nil {
return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error())
} else if fsSenderPool == nil {
return errors.New("Cannot connect FreeSWITCH senders pool.")
} else {
sm.senderPools[connId] = fsSenderPool
}
if sm.cfg.ChannelSyncInterval != 0 { // Schedule running of the callsync
go func() {
for { // Schedule sync channels to run repetately
time.Sleep(sm.cfg.ChannelSyncInterval)
sm.SyncSessions()
}
}()
}
}
err := <-errChan // Will keep the Connect locked until the first error in one of the connections
return err
// The freeswitch session manager type holding a buffer for the network connection
// and the active sessions
type FSSessionManager struct {
cfg *config.SmFsConfig
conns map[string]*fsock.FSock // Keep the list here for connection management purposes
senderPools map[string]*fsock.FSockPool // Keep sender pools here
rater engine.Connector
cdrsrv engine.Connector
sessions *Sessions
timezone string
}
func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) {
@@ -119,58 +79,6 @@ func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) {
return handlers
}
// Searches and return the session with the specifed uuid
func (sm *FSSessionManager) GetSession(uuid string) *Session {
for _, s := range sm.sessions {
if s.eventStart.GetUUID() == uuid {
return s
}
}
return nil
}
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
if notify == INSUFFICIENT_FUNDS {
if len(sm.cfg.EmptyBalanceContext) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
} else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
}
}
if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
}
// Remove session from session list, removes all related in case of multiple runs
func (sm *FSSessionManager) RemoveSession(uuid string) {
for i, ss := range sm.sessions {
if ss.eventStart.GetUUID() == uuid {
sm.sessions = append(sm.sessions[:i], sm.sessions[i+1:]...)
return
}
}
}
func (sm *FSSessionManager) Timezone() string {
return sm.timezone
}
// Sets the call timeout valid of starting of the call
func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time.Duration) error {
// _, err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid))
@@ -288,7 +196,7 @@ func (sm *FSSessionManager) onChannelAnswer(ev engine.Event, connId string) {
}
s := NewSession(ev, connId, sm)
if s != nil {
sm.sessions = append(sm.sessions, s)
sm.sessions.indexSession(s)
}
}
@@ -301,7 +209,7 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) {
}
var s *Session
for i := 0; i < 2; i++ { // Protect us against concurrency, wait a couple of seconds for the answer to be populated before we process hangup
s = sm.GetSession(ev.GetUUID())
s = sm.sessions.getSession(ev.GetUUID())
if s != nil {
break
}
@@ -310,12 +218,81 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) {
if s == nil { // Not handled by us
return
}
sm.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency
if err := s.Close(ev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
if err := sm.sessions.removeSession(s, ev); err != nil {
utils.Logger.Err(err.Error())
}
}
// Connects to the freeswitch mod_event_socket server and starts
// listening for events.
func (sm *FSSessionManager) Connect() error {
eventFilters := map[string]string{"Call-Direction": "inbound"}
errChan := make(chan error)
for _, connCfg := range sm.cfg.Connections {
connId := utils.GenUUID()
fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, utils.Logger.(*syslog.Writer), connId)
if err != nil {
return err
} else if !fSock.Connected() {
return errors.New("Could not connect to FreeSWITCH")
} else {
sm.conns[connId] = fSock
}
go func() { // Start reading in own goroutine, return on error
if err := sm.conns[connId].ReadEvents(); err != nil {
errChan <- err
}
}()
if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1,
make(map[string][]func(string, string)), make(map[string]string), utils.Logger.(*syslog.Writer), connId); err != nil {
return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error())
} else if fsSenderPool == nil {
return errors.New("Cannot connect FreeSWITCH senders pool.")
} else {
sm.senderPools[connId] = fsSenderPool
}
if sm.cfg.ChannelSyncInterval != 0 { // Schedule running of the callsync
go func() {
for { // Schedule sync channels to run repetately
time.Sleep(sm.cfg.ChannelSyncInterval)
sm.SyncSessions()
}
}()
}
}
err := <-errChan // Will keep the Connect locked until the first error in one of the connections
return err
}
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
if notify == INSUFFICIENT_FUNDS {
if len(sm.cfg.EmptyBalanceContext) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
} else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
}
}
if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return nil
}
func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
var reply string
if err := sm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil {
@@ -327,6 +304,7 @@ func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
func (sm *FSSessionManager) DebitInterval() time.Duration {
return sm.cfg.DebitInterval
}
func (sm *FSSessionManager) CdrSrv() engine.Connector {
return sm.cdrsrv
}
@@ -335,6 +313,14 @@ func (sm *FSSessionManager) Rater() engine.Connector {
return sm.rater
}
func (sm *FSSessionManager) Sessions() []*Session {
return sm.sessions.getSessions()
}
func (sm *FSSessionManager) Timezone() string {
return sm.timezone
}
// Called when call goes under the minimum duratio threshold, so FreeSWITCH can play an announcement message
func (sm *FSSessionManager) WarnSessionMinDuration(sessionUuid, connId string) {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", sessionUuid, sm.cfg.LowBalanceAnnFile)); err != nil {
@@ -353,17 +339,13 @@ func (sm *FSSessionManager) Shutdown() (err error) {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on calls shutdown: %s, connection id: %s", err.Error(), connId))
}
}
for guard := 0; len(sm.sessions) > 0 && guard < 20; guard++ {
for i := 0; len(sm.sessions.getSessions()) > 0 && i < 20; i++ {
time.Sleep(100 * time.Millisecond) // wait for the hungup event to be fired
utils.Logger.Info(fmt.Sprintf("<SM-FreeSWITC> Shutdown waiting on sessions: %v", sm.sessions))
}
return nil
}
func (sm *FSSessionManager) Sessions() []*Session {
return sm.sessions
}
// Sync sessions with FS
/*
map[secure: hostname:CgrDev1 callstate:ACTIVE callee_num:1002 initial_dest:1002 state:CS_EXECUTE dialplan:XML read_codec:SPEEX initial_ip_addr:127.0.0.1 write_codec:SPEEX write_bit_rate:44000
@@ -385,7 +367,11 @@ func (sm *FSSessionManager) SyncSessions() error {
continue
}
aChans := fsock.MapChanData(activeChanStr)
for _, session := range sm.sessions {
if len(aChans) == 0 && strings.HasPrefix(activeChanStr, "uuid,direction") { // Failed converting output from FS
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Syncing active calls, failed converting output from FS: %s", activeChanStr))
continue
}
for _, session := range sm.sessions.getSessions() {
if session.connId != connId { // This session belongs to another connectionId
continue
}
@@ -400,14 +386,13 @@ func (sm *FSSessionManager) SyncSessions() error {
continue
}
utils.Logger.Warning(fmt.Sprintf("<SM-FreeSWITCH> Sync active channels, stale session detected, uuid: %s", session.eventStart.GetUUID()))
sm.RemoveSession(session.eventStart.GetUUID()) // Unreference it early so we avoid concurrency
fsev := session.eventStart.(FSEvent)
now := time.Now()
aTime, _ := fsev.GetAnswerTime("", sm.timezone)
dur := now.Sub(aTime)
fsev[END_TIME] = now.String()
fsev[DURATION] = strconv.FormatFloat(dur.Seconds(), 'f', -1, 64)
if err := session.Close(fsev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
if err := sm.sessions.removeSession(session, fsev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on removing stale session with uuid: %s, error: %s", session.eventStart.GetUUID(), err.Error()))
continue
}

View File

@@ -30,8 +30,8 @@ import (
// Session type holding the call information fields, a session delegate for specific
// actions and a channel to signal end of the debit loop.
type Session struct {
eventStart engine.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid)
stopDebit chan bool // Channel to communicate with debit loops when closing the session
eventStart engine.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid)
stopDebit chan struct{} // Channel to communicate with debit loops when closing the session
sessionManager SessionManager
connId string // Reference towards connection id on the session manager side.
warnMinDur time.Duration
@@ -54,7 +54,7 @@ func (s *Session) SessionRuns() []*engine.SessionRun {
// Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually
func NewSession(ev engine.Event, connId string, sm SessionManager) *Session {
s := &Session{eventStart: ev,
stopDebit: make(chan bool),
stopDebit: make(chan struct{}),
sessionManager: sm,
connId: connId,
}
@@ -138,7 +138,7 @@ func (s *Session) Close(ev engine.Event) error {
}
duration, err := ev.GetDuration(sr.DerivedCharger.UsageField)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error()))
utils.Logger.Crit(fmt.Sprintf("Error parsing call duration from event: %s", err.Error()))
return err
}
hangupTime := startTime.Add(duration)

View File

@@ -28,13 +28,13 @@ type SessionManager interface {
Rater() engine.Connector
CdrSrv() engine.Connector
DebitInterval() time.Duration
Connect() error
DisconnectSession(engine.Event, string, string) error
WarnSessionMinDuration(string, string)
RemoveSession(string)
ProcessCdr(*engine.StoredCdr) error
Shutdown() error
Sessions() []*Session
SyncSessions() error
Timezone() string
ProcessCdr(*engine.StoredCdr) error
Connect() error
Shutdown() error
//RemoveSession(string)
//SyncSessions() error
}

View File

@@ -0,0 +1,89 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"sync"
"time"
"github.com/cgrates/cgrates/engine"
)
func NewSessions() *Sessions {
return &Sessions{
sessionsMux: new(sync.Mutex),
guard: engine.NewGuardianLock(),
}
}
type Sessions struct {
sessions []*Session
sessionsMux *sync.Mutex // Lock the list operations
guard *engine.GuardianLock // Used to lock on uuid
}
func (self *Sessions) indexSession(s *Session) {
self.sessionsMux.Lock()
self.sessions = append(self.sessions, s)
self.sessionsMux.Unlock()
}
func (self *Sessions) getSessions() []*Session {
self.sessionsMux.Lock()
defer self.sessionsMux.Unlock()
return self.sessions
}
// Searches and return the session with the specifed uuid
func (self *Sessions) getSession(uuid string) *Session {
self.sessionsMux.Lock()
defer self.sessionsMux.Unlock()
for _, s := range self.sessions {
if s.eventStart.GetUUID() == uuid {
return s
}
}
return nil
}
// Remove session from session list, removes all related in case of multiple runs, true if item was found
func (self *Sessions) unindexSession(uuid string) bool {
self.sessionsMux.Lock()
defer self.sessionsMux.Unlock()
for i, ss := range self.sessions {
if ss.eventStart.GetUUID() == uuid {
self.sessions = append(self.sessions[:i], self.sessions[i+1:]...)
return true
}
}
return false
}
func (self *Sessions) removeSession(s *Session, evStop engine.Event) error {
_, err := self.guard.Guard(func() (interface{}, error) { // Lock it on UUID level
if !self.unindexSession(s.eventStart.GetUUID()) { // Unreference it early so we avoid concurrency
return nil, nil // Did not find the session so no need to close it anymore
}
if err := s.Close(evStop); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
return nil, err
}
return nil, nil
}, time.Duration(2)*time.Second, s.eventStart.GetUUID())
return err
}