From 10bbf73596aedd9f231c93f345db0a682c32f927 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 1 Jan 2015 16:59:35 +0100 Subject: [PATCH] DisconnectSession with event instead of uuid to be more flexible in components where uuid is not enough to kill dialog (eg kamailio) --- sessionmanager/fsevent.go | 3 +++ sessionmanager/fssessionmanager.go | 20 ++++++++++---------- sessionmanager/kamailiosm.go | 17 ++++++++--------- sessionmanager/kamevent.go | 3 +++ sessionmanager/osipsevent.go | 4 ++++ sessionmanager/osipssm.go | 2 +- sessionmanager/session.go | 16 +++++++--------- sessionmanager/sessionmanager.go | 3 ++- utils/event.go | 1 + utils/storedcdr.go | 3 +++ 10 files changed, 42 insertions(+), 30 deletions(-) diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index 7e2311895..b821fe03c 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -143,6 +143,9 @@ func (fsev FSEvent) GetCgrId() string { func (fsev FSEvent) GetUUID() string { return fsev[UUID] } +func (fsev FSEvent) GetSessionIds() []string { + return []string{fsev.GetUUID()} +} func (fsev FSEvent) GetTenant(fieldName string) string { if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value return fieldName[len(utils.STATIC_VALUE_PREFIX):] diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index f7b4da01a..b135ebc41 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -94,7 +94,7 @@ func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string) // Searches and return the session with the specifed uuid func (sm *FSSessionManager) GetSession(uuid string) *Session { for _, s := range sm.sessions { - if s.uuid == uuid { + if s.eventStart.GetUUID() == uuid { return s } } @@ -102,24 +102,24 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session { } // Disconnects a session by sending hangup command to freeswitch -func (sm *FSSessionManager) DisconnectSession(uuid, notify, destnr string) { - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)); err != nil { +func (sm *FSSessionManager) DisconnectSession(ev utils.Event, notify string) { + if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not send disconect api notification to freeswitch: %s", err.Error())) } if notify == INSUFFICIENT_FUNDS { if len(cfg.FSEmptyBalanceContext) != 0 { - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", uuid, destnr, cfg.FSEmptyBalanceContext)); err != nil { + if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), cfg.FSEmptyBalanceContext)); err != nil { engine.Logger.Err(" Could not transfer the call to empty balance context") } return } else if len(cfg.FSEmptyBalanceAnnFile) != 0 { - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", uuid, cfg.FSEmptyBalanceAnnFile)); err != nil { + if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), cfg.FSEmptyBalanceAnnFile)); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch: %s", err.Error())) } return } } - if err := fsock.FS.SendMsgCmd(uuid, map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { + if err := fsock.FS.SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not send disconect msg to freeswitch: %v", err)) } return @@ -128,7 +128,7 @@ func (sm *FSSessionManager) DisconnectSession(uuid, notify, destnr string) { // Remove session from session list, removes all related in case of multiple runs func (sm *FSSessionManager) RemoveSession(uuid string) { for i, ss := range sm.sessions { - if ss.uuid == uuid { + if ss.eventStart.GetUUID() == uuid { sm.sessions = append(sm.sessions[:i], sm.sessions[i+1:]...) return } @@ -232,7 +232,7 @@ func (sm *FSSessionManager) OnChannelPark(ev utils.Event) { func (sm *FSSessionManager) OnChannelAnswer(ev utils.Event) { if ev.MissingParameter() { - sm.DisconnectSession(ev.GetUUID(), MISSING_PARAMETER, "") + sm.DisconnectSession(ev, MISSING_PARAMETER) } s := NewSession(ev, sm) if s != nil { @@ -246,8 +246,8 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) { if s == nil { // Not handled by us return } - sm.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency - if err := s.Close(ev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + sm.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency + if err := s.Close(ev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database engine.Logger.Err(err.Error()) } } diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index 40eb63d3c..301204059 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -27,7 +27,6 @@ import ( "github.com/cgrates/kamevapi" "log/syslog" "regexp" - "strings" "time" ) @@ -76,7 +75,7 @@ func (self *KamailioSessionManager) onCallStart(evData []byte) { engine.Logger.Err(fmt.Sprintf(" ERROR unmarshalling event: %s, error: %s", evData, err.Error())) } if kamEv.MissingParameter() { - self.DisconnectSession(fmt.Sprintf("%s,%s", kamEv[HASH_ENTRY], kamEv[HASH_ID]), utils.ERR_MANDATORY_IE_MISSING, "") + self.DisconnectSession(kamEv, utils.ERR_MANDATORY_IE_MISSING) return } s := NewSession(kamEv, self) @@ -98,8 +97,8 @@ func (self *KamailioSessionManager) onCallEnd(evData []byte) { if s == nil { // Not handled by us return } - self.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency - if err := s.Close(kev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + self.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency + if err := s.Close(kev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database engine.Logger.Err(err.Error()) } } @@ -120,9 +119,9 @@ func (self *KamailioSessionManager) Connect() error { return errors.New(" Stopped reading events") } -func (self *KamailioSessionManager) DisconnectSession(uuid, notify, destnr string) { - hashSplt := strings.Split(uuid, ",") - disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: hashSplt[0], HashId: hashSplt[1], Reason: notify} +func (self *KamailioSessionManager) DisconnectSession(ev utils.Event, notify string) { + sessionIds := ev.GetSessionIds() + disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify} if err := self.kea.Send(disconnectEv.String()); err != nil { engine.Logger.Err(fmt.Sprintf(" Failed sending disconnect request %s", err.Error())) } @@ -130,7 +129,7 @@ func (self *KamailioSessionManager) DisconnectSession(uuid, notify, destnr strin } func (self *KamailioSessionManager) RemoveSession(uuid string) { for i, ss := range self.sessions { - if ss.uuid == uuid { + if ss.eventStart.GetUUID() == uuid { self.sessions = append(self.sessions[:i], self.sessions[i+1:]...) return } @@ -140,7 +139,7 @@ func (self *KamailioSessionManager) RemoveSession(uuid string) { // Searches and return the session with the specifed uuid func (self *KamailioSessionManager) GetSession(uuid string) *Session { for _, s := range self.sessions { - if s.uuid == uuid { + if s.eventStart.GetUUID() == uuid { return s } } diff --git a/sessionmanager/kamevent.go b/sessionmanager/kamevent.go index ee856aebd..4c2272005 100644 --- a/sessionmanager/kamevent.go +++ b/sessionmanager/kamevent.go @@ -100,6 +100,9 @@ func (kev KamEvent) GetCgrId() string { func (kev KamEvent) GetUUID() string { return kev[CALLID] + ";" + kev[FROM_TAG] // ToTag not available in callStart event } +func (kev KamEvent) GetSessionIds() []string { + return []string{kev[HASH_ENTRY], kev[HASH_ID]} +} func (kev KamEvent) GetDirection(fieldName string) string { return utils.OUT } diff --git a/sessionmanager/osipsevent.go b/sessionmanager/osipsevent.go index 322244856..2c8430d94 100644 --- a/sessionmanager/osipsevent.go +++ b/sessionmanager/osipsevent.go @@ -78,6 +78,10 @@ func (osipsev *OsipsEvent) GetUUID() string { return osipsev.osipsEvent.AttrValues[CALLID] + ";" + osipsev.osipsEvent.AttrValues[FROM_TAG] + ";" + osipsev.osipsEvent.AttrValues[TO_TAG] } +func (osipsev *OsipsEvent) GetSessionIds() []string { + return []string{osipsev.GetUUID()} +} + func (osipsev *OsipsEvent) GetDirection(fieldName string) string { if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value return fieldName[len(utils.STATIC_VALUE_PREFIX):] diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 9aed6428d..fe0129d53 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -69,7 +69,7 @@ func (osm *OsipsSessionManager) Connect() (err error) { return errors.New(" Stopped reading events") } -func (osm *OsipsSessionManager) DisconnectSession(uuid, notify, destnr string) { +func (osm *OsipsSessionManager) DisconnectSession(ev utils.Event, notify string) { return } func (osm *OsipsSessionManager) RemoveSession(uuid string) { diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 351d8221a..592478cf5 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -31,9 +31,8 @@ import ( // Session type holding the call information fields, a session delegate for specific // actions and a channel to signal end of the debit loop. type Session struct { - cgrid string - uuid string - stopDebit chan bool + eventStart utils.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid) + stopDebit chan bool // Channel to communicate with debit loops when closing the session sessionManager SessionManager sessionRuns []*engine.SessionRun } @@ -53,8 +52,7 @@ func (s *Session) SessionRuns() []*engine.SessionRun { // Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually func NewSession(ev utils.Event, sm SessionManager) *Session { - s := &Session{cgrid: ev.GetCgrId(), - uuid: ev.GetUUID(), + s := &Session{eventStart: ev, stopDebit: make(chan bool), sessionManager: sm, } @@ -88,15 +86,15 @@ func (s *Session) debitLoop(runIdx int) { cc := new(engine.CallCost) if err := s.sessionManager.MaxDebit(&nextCd, cc); err != nil { engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) - s.sessionManager.DisconnectSession(s.uuid, SYSTEM_ERROR, "") + s.sessionManager.DisconnectSession(s.eventStart, SYSTEM_ERROR) return } if cc.GetDuration() == 0 { - s.sessionManager.DisconnectSession(s.uuid, INSUFFICIENT_FUNDS, nextCd.Destination) + s.sessionManager.DisconnectSession(s.eventStart, INSUFFICIENT_FUNDS) return } if cc.GetDuration() <= cfg.FSMinDurLowBalance && len(cfg.FSLowBalanceAnnFile) != 0 { - if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", s.uuid, cfg.FSLowBalanceAnnFile)); err != nil { + if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", s.eventStart.GetUUID(), cfg.FSLowBalanceAnnFile)); err != nil { engine.Logger.Err(fmt.Sprintf(" Could not send uuid_broadcast to freeswitch: %s", err.Error())) } } @@ -210,6 +208,6 @@ func (s *Session) SaveOperations() { if s.sessionManager.GetDbLogger() == nil { engine.Logger.Err(" Error: no connection to logger database, cannot save costs") } - s.sessionManager.GetDbLogger().LogCallCost(s.cgrid, engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC) + s.sessionManager.GetDbLogger().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC) } } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index b526e6954..34ad293db 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -22,11 +22,12 @@ import ( "time" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) type SessionManager interface { Connect() error - DisconnectSession(string, string, string) + DisconnectSession(utils.Event, string) RemoveSession(string) MaxDebit(*engine.CallDescriptor, *engine.CallCost) error GetDebitPeriod() time.Duration diff --git a/utils/event.go b/utils/event.go index 7e522c581..0aded6d0b 100644 --- a/utils/event.go +++ b/utils/event.go @@ -26,6 +26,7 @@ type Event interface { GetName() string GetCgrId() string GetUUID() string + GetSessionIds() []string // Returns identifiers needed to control a session (eg disconnect) GetDirection(string) string GetSubject(string) string GetAccount(string) string diff --git a/utils/storedcdr.go b/utils/storedcdr.go index 2d341697c..91adc6b7e 100644 --- a/utils/storedcdr.go +++ b/utils/storedcdr.go @@ -342,6 +342,9 @@ func (storedCdr *StoredCdr) GetCgrId() string { func (storedCdr *StoredCdr) GetUUID() string { return storedCdr.AccId } +func (storedCdr *StoredCdr) GetSessionIds() []string { + return []string{storedCdr.GetUUID()} +} func (storedCdr *StoredCdr) GetDirection(fieldName string) string { if IsSliceMember([]string{DIRECTION, META_DEFAULT}, fieldName) { return storedCdr.Direction