From d9ab417d8714cd24cdb79c1c99f94d41c76dece0 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 1 Nov 2015 12:36:26 +0100 Subject: [PATCH] Refectored session locking inside SM-FreeSWITCH, fixes #271 --- .../freeswitch/etc/init.d/freeswitch | 2 +- sessionmanager/fssessionmanager.go | 217 ++++++++---------- sessionmanager/session.go | 8 +- sessionmanager/sessionmanager.go | 10 +- sessionmanager/sessions.go | 89 +++++++ 5 files changed, 200 insertions(+), 126 deletions(-) create mode 100644 sessionmanager/sessions.go diff --git a/data/tutorials/fs_evsock/freeswitch/etc/init.d/freeswitch b/data/tutorials/fs_evsock/freeswitch/etc/init.d/freeswitch index 495976527..c5fb788be 100755 --- a/data/tutorials/fs_evsock/freeswitch/etc/init.d/freeswitch +++ b/data/tutorials/fs_evsock/freeswitch/etc/init.d/freeswitch @@ -56,7 +56,7 @@ do_start() { echo "Please review /usr/share/doc/$NAME/README.Debian">&2 return 3 fi - + echo $DAEMON_ARGS start-stop-daemon --start --quiet \ --pidfile $PIDFILE --exec $DAEMON --name $NAME --user $USER \ --test > /dev/null \ diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index bf1772fe2..099e4b8d4 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -23,6 +23,7 @@ import ( "fmt" "log/syslog" "strconv" + "strings" "time" "github.com/cgrates/cgrates/config" @@ -31,18 +32,6 @@ import ( "github.com/cgrates/fsock" ) -// The freeswitch session manager type holding a buffer for the network connection -// and the active sessions -type FSSessionManager struct { - cfg *config.SmFsConfig - conns map[string]*fsock.FSock // Keep the list here for connection management purposes - senderPools map[string]*fsock.FSockPool // Keep sender pools here - sessions []*Session - rater engine.Connector - cdrsrv engine.Connector - timezone string -} - func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector, timezone string) *FSSessionManager { return &FSSessionManager{ cfg: smFsConfig, @@ -50,50 +39,21 @@ func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Conne senderPools: make(map[string]*fsock.FSockPool), rater: rater, cdrsrv: cdrs, + sessions: NewSessions(), timezone: timezone, } } -// Connects to the freeswitch mod_event_socket server and starts -// listening for events. -func (sm *FSSessionManager) Connect() error { - eventFilters := map[string]string{"Call-Direction": "inbound"} - errChan := make(chan error) - for _, connCfg := range sm.cfg.Connections { - connId := utils.GenUUID() - fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, utils.Logger.(*syslog.Writer), connId) - if err != nil { - return err - } else if !fSock.Connected() { - return errors.New("Could not connect to FreeSWITCH") - } else { - sm.conns[connId] = fSock - } - go func() { // Start reading in own goroutine, return on error - if err := sm.conns[connId].ReadEvents(); err != nil { - errChan <- err - } - }() - if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, - make(map[string][]func(string, string)), make(map[string]string), utils.Logger.(*syslog.Writer), connId); err != nil { - return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error()) - } else if fsSenderPool == nil { - return errors.New("Cannot connect FreeSWITCH senders pool.") - } else { - sm.senderPools[connId] = fsSenderPool - } - if sm.cfg.ChannelSyncInterval != 0 { // Schedule running of the callsync - go func() { - for { // Schedule sync channels to run repetately - time.Sleep(sm.cfg.ChannelSyncInterval) - sm.SyncSessions() - } - - }() - } - } - err := <-errChan // Will keep the Connect locked until the first error in one of the connections - return err +// The freeswitch session manager type holding a buffer for the network connection +// and the active sessions +type FSSessionManager struct { + cfg *config.SmFsConfig + conns map[string]*fsock.FSock // Keep the list here for connection management purposes + senderPools map[string]*fsock.FSockPool // Keep sender pools here + rater engine.Connector + cdrsrv engine.Connector + sessions *Sessions + timezone string } func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) { @@ -119,58 +79,6 @@ func (sm *FSSessionManager) createHandlers() map[string][]func(string, string) { return handlers } -// Searches and return the session with the specifed uuid -func (sm *FSSessionManager) GetSession(uuid string) *Session { - for _, s := range sm.sessions { - if s.eventStart.GetUUID() == uuid { - return s - } - } - return nil -} - -// Disconnects a session by sending hangup command to freeswitch -func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) - return err - } - if notify == INSUFFICIENT_FUNDS { - if len(sm.cfg.EmptyBalanceContext) != 0 { - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId)) - return err - } - return nil - } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 { - if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) - return err - } - return nil - } - } - if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) - return err - } - return nil -} - -// Remove session from session list, removes all related in case of multiple runs -func (sm *FSSessionManager) RemoveSession(uuid string) { - for i, ss := range sm.sessions { - if ss.eventStart.GetUUID() == uuid { - sm.sessions = append(sm.sessions[:i], sm.sessions[i+1:]...) - return - } - } -} - -func (sm *FSSessionManager) Timezone() string { - return sm.timezone -} - // Sets the call timeout valid of starting of the call func (sm *FSSessionManager) setMaxCallDuration(uuid, connId string, maxDur time.Duration) error { // _, err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid)) @@ -288,7 +196,7 @@ func (sm *FSSessionManager) onChannelAnswer(ev engine.Event, connId string) { } s := NewSession(ev, connId, sm) if s != nil { - sm.sessions = append(sm.sessions, s) + sm.sessions.indexSession(s) } } @@ -301,7 +209,7 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) { } var s *Session for i := 0; i < 2; i++ { // Protect us against concurrency, wait a couple of seconds for the answer to be populated before we process hangup - s = sm.GetSession(ev.GetUUID()) + s = sm.sessions.getSession(ev.GetUUID()) if s != nil { break } @@ -310,12 +218,81 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) { if s == nil { // Not handled by us return } - sm.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency - if err := s.Close(ev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + if err := sm.sessions.removeSession(s, ev); err != nil { utils.Logger.Err(err.Error()) } } +// Connects to the freeswitch mod_event_socket server and starts +// listening for events. +func (sm *FSSessionManager) Connect() error { + eventFilters := map[string]string{"Call-Direction": "inbound"} + errChan := make(chan error) + for _, connCfg := range sm.cfg.Connections { + connId := utils.GenUUID() + fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, utils.Logger.(*syslog.Writer), connId) + if err != nil { + return err + } else if !fSock.Connected() { + return errors.New("Could not connect to FreeSWITCH") + } else { + sm.conns[connId] = fSock + } + go func() { // Start reading in own goroutine, return on error + if err := sm.conns[connId].ReadEvents(); err != nil { + errChan <- err + } + }() + if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, + make(map[string][]func(string, string)), make(map[string]string), utils.Logger.(*syslog.Writer), connId); err != nil { + return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error()) + } else if fsSenderPool == nil { + return errors.New("Cannot connect FreeSWITCH senders pool.") + } else { + sm.senderPools[connId] = fsSenderPool + } + if sm.cfg.ChannelSyncInterval != 0 { // Schedule running of the callsync + go func() { + for { // Schedule sync channels to run repetately + time.Sleep(sm.cfg.ChannelSyncInterval) + sm.SyncSessions() + } + + }() + } + } + err := <-errChan // Will keep the Connect locked until the first error in one of the connections + return err +} + +// Disconnects a session by sending hangup command to freeswitch +func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error { + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) + return err + } + if notify == INSUFFICIENT_FUNDS { + if len(sm.cfg.EmptyBalanceContext) != 0 { + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId)) + return err + } + return nil + } else if len(sm.cfg.EmptyBalanceAnnFile) != 0 { + if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) + return err + } + return nil + } + } + if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId)) + return err + } + return nil +} + func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { var reply string if err := sm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil { @@ -327,6 +304,7 @@ func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { func (sm *FSSessionManager) DebitInterval() time.Duration { return sm.cfg.DebitInterval } + func (sm *FSSessionManager) CdrSrv() engine.Connector { return sm.cdrsrv } @@ -335,6 +313,14 @@ func (sm *FSSessionManager) Rater() engine.Connector { return sm.rater } +func (sm *FSSessionManager) Sessions() []*Session { + return sm.sessions.getSessions() +} + +func (sm *FSSessionManager) Timezone() string { + return sm.timezone +} + // Called when call goes under the minimum duratio threshold, so FreeSWITCH can play an announcement message func (sm *FSSessionManager) WarnSessionMinDuration(sessionUuid, connId string) { if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", sessionUuid, sm.cfg.LowBalanceAnnFile)); err != nil { @@ -353,17 +339,13 @@ func (sm *FSSessionManager) Shutdown() (err error) { utils.Logger.Err(fmt.Sprintf(" Error on calls shutdown: %s, connection id: %s", err.Error(), connId)) } } - for guard := 0; len(sm.sessions) > 0 && guard < 20; guard++ { + for i := 0; len(sm.sessions.getSessions()) > 0 && i < 20; i++ { time.Sleep(100 * time.Millisecond) // wait for the hungup event to be fired utils.Logger.Info(fmt.Sprintf(" Shutdown waiting on sessions: %v", sm.sessions)) } return nil } -func (sm *FSSessionManager) Sessions() []*Session { - return sm.sessions -} - // Sync sessions with FS /* map[secure: hostname:CgrDev1 callstate:ACTIVE callee_num:1002 initial_dest:1002 state:CS_EXECUTE dialplan:XML read_codec:SPEEX initial_ip_addr:127.0.0.1 write_codec:SPEEX write_bit_rate:44000 @@ -385,7 +367,11 @@ func (sm *FSSessionManager) SyncSessions() error { continue } aChans := fsock.MapChanData(activeChanStr) - for _, session := range sm.sessions { + if len(aChans) == 0 && strings.HasPrefix(activeChanStr, "uuid,direction") { // Failed converting output from FS + utils.Logger.Err(fmt.Sprintf(" Syncing active calls, failed converting output from FS: %s", activeChanStr)) + continue + } + for _, session := range sm.sessions.getSessions() { if session.connId != connId { // This session belongs to another connectionId continue } @@ -400,14 +386,13 @@ func (sm *FSSessionManager) SyncSessions() error { continue } utils.Logger.Warning(fmt.Sprintf(" Sync active channels, stale session detected, uuid: %s", session.eventStart.GetUUID())) - sm.RemoveSession(session.eventStart.GetUUID()) // Unreference it early so we avoid concurrency fsev := session.eventStart.(FSEvent) now := time.Now() aTime, _ := fsev.GetAnswerTime("", sm.timezone) dur := now.Sub(aTime) fsev[END_TIME] = now.String() fsev[DURATION] = strconv.FormatFloat(dur.Seconds(), 'f', -1, 64) - if err := session.Close(fsev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + if err := sm.sessions.removeSession(session, fsev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database utils.Logger.Err(fmt.Sprintf(" Error on removing stale session with uuid: %s, error: %s", session.eventStart.GetUUID(), err.Error())) continue } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index faa5c9b7d..b32485b5e 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -30,8 +30,8 @@ import ( // Session type holding the call information fields, a session delegate for specific // actions and a channel to signal end of the debit loop. type Session struct { - eventStart engine.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid) - stopDebit chan bool // Channel to communicate with debit loops when closing the session + eventStart engine.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid) + stopDebit chan struct{} // Channel to communicate with debit loops when closing the session sessionManager SessionManager connId string // Reference towards connection id on the session manager side. warnMinDur time.Duration @@ -54,7 +54,7 @@ func (s *Session) SessionRuns() []*engine.SessionRun { // Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually func NewSession(ev engine.Event, connId string, sm SessionManager) *Session { s := &Session{eventStart: ev, - stopDebit: make(chan bool), + stopDebit: make(chan struct{}), sessionManager: sm, connId: connId, } @@ -138,7 +138,7 @@ func (s *Session) Close(ev engine.Event) error { } duration, err := ev.GetDuration(sr.DerivedCharger.UsageField) if err != nil { - utils.Logger.Crit(fmt.Sprintf("Error parsing call duration from event %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf("Error parsing call duration from event: %s", err.Error())) return err } hangupTime := startTime.Add(duration) diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 3efcbfb9d..b2298a74c 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -28,13 +28,13 @@ type SessionManager interface { Rater() engine.Connector CdrSrv() engine.Connector DebitInterval() time.Duration - Connect() error DisconnectSession(engine.Event, string, string) error WarnSessionMinDuration(string, string) - RemoveSession(string) - ProcessCdr(*engine.StoredCdr) error - Shutdown() error Sessions() []*Session - SyncSessions() error Timezone() string + ProcessCdr(*engine.StoredCdr) error + Connect() error + Shutdown() error + //RemoveSession(string) + //SyncSessions() error } diff --git a/sessionmanager/sessions.go b/sessionmanager/sessions.go new file mode 100644 index 000000000..65e4a25c1 --- /dev/null +++ b/sessionmanager/sessions.go @@ -0,0 +1,89 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessionmanager + +import ( + "sync" + "time" + + "github.com/cgrates/cgrates/engine" +) + +func NewSessions() *Sessions { + return &Sessions{ + sessionsMux: new(sync.Mutex), + guard: engine.NewGuardianLock(), + } +} + +type Sessions struct { + sessions []*Session + sessionsMux *sync.Mutex // Lock the list operations + guard *engine.GuardianLock // Used to lock on uuid +} + +func (self *Sessions) indexSession(s *Session) { + self.sessionsMux.Lock() + self.sessions = append(self.sessions, s) + self.sessionsMux.Unlock() +} + +func (self *Sessions) getSessions() []*Session { + self.sessionsMux.Lock() + defer self.sessionsMux.Unlock() + return self.sessions +} + +// Searches and return the session with the specifed uuid +func (self *Sessions) getSession(uuid string) *Session { + self.sessionsMux.Lock() + defer self.sessionsMux.Unlock() + for _, s := range self.sessions { + if s.eventStart.GetUUID() == uuid { + return s + } + } + return nil +} + +// Remove session from session list, removes all related in case of multiple runs, true if item was found +func (self *Sessions) unindexSession(uuid string) bool { + self.sessionsMux.Lock() + defer self.sessionsMux.Unlock() + for i, ss := range self.sessions { + if ss.eventStart.GetUUID() == uuid { + self.sessions = append(self.sessions[:i], self.sessions[i+1:]...) + return true + } + } + return false +} + +func (self *Sessions) removeSession(s *Session, evStop engine.Event) error { + _, err := self.guard.Guard(func() (interface{}, error) { // Lock it on UUID level + if !self.unindexSession(s.eventStart.GetUUID()) { // Unreference it early so we avoid concurrency + return nil, nil // Did not find the session so no need to close it anymore + } + if err := s.Close(evStop); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + return nil, err + } + return nil, nil + }, time.Duration(2)*time.Second, s.eventStart.GetUUID()) + return err +}