DisconnectSession with event instead of uuid to be more flexible in components where uuid is not enough to kill dialog (eg kamailio)

This commit is contained in:
DanB
2015-01-01 16:59:35 +01:00
parent b9002e674d
commit 10bbf73596
10 changed files with 42 additions and 30 deletions

View File

@@ -143,6 +143,9 @@ func (fsev FSEvent) GetCgrId() string {
func (fsev FSEvent) GetUUID() string {
return fsev[UUID]
}
func (fsev FSEvent) GetSessionIds() []string {
return []string{fsev.GetUUID()}
}
func (fsev FSEvent) GetTenant(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]

View File

@@ -94,7 +94,7 @@ func (sm *FSSessionManager) createHandlers() (handlers map[string][]func(string)
// Searches and return the session with the specifed uuid
func (sm *FSSessionManager) GetSession(uuid string) *Session {
for _, s := range sm.sessions {
if s.uuid == uuid {
if s.eventStart.GetUUID() == uuid {
return s
}
}
@@ -102,24 +102,24 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session {
}
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(uuid, notify, destnr string) {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)); err != nil {
func (sm *FSSessionManager) DisconnectSession(ev utils.Event, notify string) {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send disconect api notification to freeswitch: %s", err.Error()))
}
if notify == INSUFFICIENT_FUNDS {
if len(cfg.FSEmptyBalanceContext) != 0 {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", uuid, destnr, cfg.FSEmptyBalanceContext)); err != nil {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), cfg.FSEmptyBalanceContext)); err != nil {
engine.Logger.Err("<SessionManager> Could not transfer the call to empty balance context")
}
return
} else if len(cfg.FSEmptyBalanceAnnFile) != 0 {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", uuid, cfg.FSEmptyBalanceAnnFile)); err != nil {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), cfg.FSEmptyBalanceAnnFile)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send uuid_broadcast to freeswitch: %s", err.Error()))
}
return
}
}
if err := fsock.FS.SendMsgCmd(uuid, map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
if err := fsock.FS.SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send disconect msg to freeswitch: %v", err))
}
return
@@ -128,7 +128,7 @@ func (sm *FSSessionManager) DisconnectSession(uuid, notify, destnr string) {
// Remove session from session list, removes all related in case of multiple runs
func (sm *FSSessionManager) RemoveSession(uuid string) {
for i, ss := range sm.sessions {
if ss.uuid == uuid {
if ss.eventStart.GetUUID() == uuid {
sm.sessions = append(sm.sessions[:i], sm.sessions[i+1:]...)
return
}
@@ -232,7 +232,7 @@ func (sm *FSSessionManager) OnChannelPark(ev utils.Event) {
func (sm *FSSessionManager) OnChannelAnswer(ev utils.Event) {
if ev.MissingParameter() {
sm.DisconnectSession(ev.GetUUID(), MISSING_PARAMETER, "")
sm.DisconnectSession(ev, MISSING_PARAMETER)
}
s := NewSession(ev, sm)
if s != nil {
@@ -246,8 +246,8 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev utils.Event) {
if s == nil { // Not handled by us
return
}
sm.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency
if err := s.Close(ev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
sm.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency
if err := s.Close(ev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
engine.Logger.Err(err.Error())
}
}

View File

@@ -27,7 +27,6 @@ import (
"github.com/cgrates/kamevapi"
"log/syslog"
"regexp"
"strings"
"time"
)
@@ -76,7 +75,7 @@ func (self *KamailioSessionManager) onCallStart(evData []byte) {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> ERROR unmarshalling event: %s, error: %s", evData, err.Error()))
}
if kamEv.MissingParameter() {
self.DisconnectSession(fmt.Sprintf("%s,%s", kamEv[HASH_ENTRY], kamEv[HASH_ID]), utils.ERR_MANDATORY_IE_MISSING, "")
self.DisconnectSession(kamEv, utils.ERR_MANDATORY_IE_MISSING)
return
}
s := NewSession(kamEv, self)
@@ -98,8 +97,8 @@ func (self *KamailioSessionManager) onCallEnd(evData []byte) {
if s == nil { // Not handled by us
return
}
self.RemoveSession(s.uuid) // Unreference it early so we avoid concurrency
if err := s.Close(kev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
self.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency
if err := s.Close(kev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
engine.Logger.Err(err.Error())
}
}
@@ -120,9 +119,9 @@ func (self *KamailioSessionManager) Connect() error {
return errors.New("<SM-Kamailio> Stopped reading events")
}
func (self *KamailioSessionManager) DisconnectSession(uuid, notify, destnr string) {
hashSplt := strings.Split(uuid, ",")
disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: hashSplt[0], HashId: hashSplt[1], Reason: notify}
func (self *KamailioSessionManager) DisconnectSession(ev utils.Event, notify string) {
sessionIds := ev.GetSessionIds()
disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify}
if err := self.kea.Send(disconnectEv.String()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending disconnect request %s", err.Error()))
}
@@ -130,7 +129,7 @@ func (self *KamailioSessionManager) DisconnectSession(uuid, notify, destnr strin
}
func (self *KamailioSessionManager) RemoveSession(uuid string) {
for i, ss := range self.sessions {
if ss.uuid == uuid {
if ss.eventStart.GetUUID() == uuid {
self.sessions = append(self.sessions[:i], self.sessions[i+1:]...)
return
}
@@ -140,7 +139,7 @@ func (self *KamailioSessionManager) RemoveSession(uuid string) {
// Searches and return the session with the specifed uuid
func (self *KamailioSessionManager) GetSession(uuid string) *Session {
for _, s := range self.sessions {
if s.uuid == uuid {
if s.eventStart.GetUUID() == uuid {
return s
}
}

View File

@@ -100,6 +100,9 @@ func (kev KamEvent) GetCgrId() string {
func (kev KamEvent) GetUUID() string {
return kev[CALLID] + ";" + kev[FROM_TAG] // ToTag not available in callStart event
}
func (kev KamEvent) GetSessionIds() []string {
return []string{kev[HASH_ENTRY], kev[HASH_ID]}
}
func (kev KamEvent) GetDirection(fieldName string) string {
return utils.OUT
}

View File

@@ -78,6 +78,10 @@ func (osipsev *OsipsEvent) GetUUID() string {
return osipsev.osipsEvent.AttrValues[CALLID] + ";" + osipsev.osipsEvent.AttrValues[FROM_TAG] + ";" + osipsev.osipsEvent.AttrValues[TO_TAG]
}
func (osipsev *OsipsEvent) GetSessionIds() []string {
return []string{osipsev.GetUUID()}
}
func (osipsev *OsipsEvent) GetDirection(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]

View File

@@ -69,7 +69,7 @@ func (osm *OsipsSessionManager) Connect() (err error) {
return errors.New("<SM-OpenSIPS> Stopped reading events")
}
func (osm *OsipsSessionManager) DisconnectSession(uuid, notify, destnr string) {
func (osm *OsipsSessionManager) DisconnectSession(ev utils.Event, notify string) {
return
}
func (osm *OsipsSessionManager) RemoveSession(uuid string) {

View File

@@ -31,9 +31,8 @@ import (
// Session type holding the call information fields, a session delegate for specific
// actions and a channel to signal end of the debit loop.
type Session struct {
cgrid string
uuid string
stopDebit chan bool
eventStart utils.Event // Store the original event who started this session so we can use it's info later (eg: disconnect, cgrid)
stopDebit chan bool // Channel to communicate with debit loops when closing the session
sessionManager SessionManager
sessionRuns []*engine.SessionRun
}
@@ -53,8 +52,7 @@ func (s *Session) SessionRuns() []*engine.SessionRun {
// Creates a new session and in case of prepaid starts the debit loop for each of the session runs individually
func NewSession(ev utils.Event, sm SessionManager) *Session {
s := &Session{cgrid: ev.GetCgrId(),
uuid: ev.GetUUID(),
s := &Session{eventStart: ev,
stopDebit: make(chan bool),
sessionManager: sm,
}
@@ -88,15 +86,15 @@ func (s *Session) debitLoop(runIdx int) {
cc := new(engine.CallCost)
if err := s.sessionManager.MaxDebit(&nextCd, cc); err != nil {
engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
s.sessionManager.DisconnectSession(s.uuid, SYSTEM_ERROR, "")
s.sessionManager.DisconnectSession(s.eventStart, SYSTEM_ERROR)
return
}
if cc.GetDuration() == 0 {
s.sessionManager.DisconnectSession(s.uuid, INSUFFICIENT_FUNDS, nextCd.Destination)
s.sessionManager.DisconnectSession(s.eventStart, INSUFFICIENT_FUNDS)
return
}
if cc.GetDuration() <= cfg.FSMinDurLowBalance && len(cfg.FSLowBalanceAnnFile) != 0 {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", s.uuid, cfg.FSLowBalanceAnnFile)); err != nil {
if _, err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_broadcast %s %s aleg\n\n", s.eventStart.GetUUID(), cfg.FSLowBalanceAnnFile)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> Could not send uuid_broadcast to freeswitch: %s", err.Error()))
}
}
@@ -210,6 +208,6 @@ func (s *Session) SaveOperations() {
if s.sessionManager.GetDbLogger() == nil {
engine.Logger.Err("<SessionManager> Error: no connection to logger database, cannot save costs")
}
s.sessionManager.GetDbLogger().LogCallCost(s.cgrid, engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC)
s.sessionManager.GetDbLogger().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC)
}
}

View File

@@ -22,11 +22,12 @@ import (
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type SessionManager interface {
Connect() error
DisconnectSession(string, string, string)
DisconnectSession(utils.Event, string)
RemoveSession(string)
MaxDebit(*engine.CallDescriptor, *engine.CallCost) error
GetDebitPeriod() time.Duration

View File

@@ -26,6 +26,7 @@ type Event interface {
GetName() string
GetCgrId() string
GetUUID() string
GetSessionIds() []string // Returns identifiers needed to control a session (eg disconnect)
GetDirection(string) string
GetSubject(string) string
GetAccount(string) string

View File

@@ -342,6 +342,9 @@ func (storedCdr *StoredCdr) GetCgrId() string {
func (storedCdr *StoredCdr) GetUUID() string {
return storedCdr.AccId
}
func (storedCdr *StoredCdr) GetSessionIds() []string {
return []string{storedCdr.GetUUID()}
}
func (storedCdr *StoredCdr) GetDirection(fieldName string) string {
if IsSliceMember([]string{DIRECTION, META_DEFAULT}, fieldName) {
return storedCdr.Direction