From aa10bbd19a8fd6fe9cc3dad338e6d5af568f84ff Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 17 Sep 2016 16:30:36 +0200 Subject: [PATCH] SMAsterisk intial handlers for ChannelStateChange and ChannelDestroyed --- sessionmanager/sma_event.go | 51 +++++++++++++++++++----- sessionmanager/sma_event_test.go | 27 +++++++++++-- sessionmanager/smasterisk.go | 68 ++++++++++++++++++++++++-------- 3 files changed, 116 insertions(+), 30 deletions(-) diff --git a/sessionmanager/sma_event.go b/sessionmanager/sma_event.go index 41f869b0f..672cb8d63 100644 --- a/sessionmanager/sma_event.go +++ b/sessionmanager/sma_event.go @@ -81,6 +81,16 @@ func (smaEv *SMAsteriskEvent) Timestamp() string { return cachedVal } +func (smaEv *SMAsteriskEvent) ChannelState() string { + cachedKey := channelState + cachedVal, hasIt := smaEv.cachedFields[cachedKey] + if !hasIt { + channelData, _ := smaEv.ariEv["channel"].(map[string]interface{}) + cachedVal, _ = channelData["state"].(string) + } + return cachedVal +} + func (smaEv *SMAsteriskEvent) SetupTime() string { cachedKey := utils.SETUP_TIME cachedVal, hasIt := smaEv.cachedFields[cachedKey] @@ -156,18 +166,39 @@ func (smaEv *SMAsteriskEvent) ExtraParameters() (extraParams map[string]string) return } -func (smaEv *SMAsteriskEvent) UpdateFromEvent(updateEv *SMAsteriskEvent) { - smaEv.ariEv["type"] = updateEv.ariEv["type"] - smaEv.ariEv["timestamp"] = updateEv.ariEv["timestamp"] - smaEv.ariEv["channel"] = updateEv.ariEv["channel"] - if updateEv.EventType() == ARIChannelDestroyed { - smaEv.ariEv["cause"] = updateEv.ariEv["cause"] - smaEv.ariEv["cause_txt"] = updateEv.ariEv["cause_txt"] +/* +// Updates fields in smgEv based on own fields +// Need pointer so we update it directly in cache +func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *SMGenericEvent) error { + switch smaEv.EventType() { + case ARIChannelStateChange: + smgEv[utils.EVENT_NAME] = utils.CGR_SESSION_START + if smaEv.ChannelState() == channelUp { + smgEv[utils.ANSWER_TIME] = smaEv.Timestamp() + } + case ARIChannelDestroyed: + smgEv[utils.EVENT_NAME] = utils.CGR_SESSION_END + aTime, err := smgEv.GetAnswerTime(utils.META_DEFAULT, "") + if err != nil { + return err + } else if aTime.IsZero() { + return errors.New("Unaswered channel") + } } } +*/ -func (smaEv *SMAsteriskEvent) AsSMGenericCGRAuth() (smgEv SMGenericEvent, err error) { - smgEv = SMGenericEvent{utils.EVENT_NAME: utils.CGR_AUTHORIZATION} +func (smaEv *SMAsteriskEvent) AsSMGenericEvent() SMGenericEvent { + var evName string + switch smaEv.EventType() { + case ARIStasisStart: + evName = utils.CGR_AUTHORIZATION + case ARIChannelStateChange: + evName = utils.CGR_SESSION_START + case ARIChannelDestroyed: + evName = utils.CGR_SESSION_END + } + smgEv := SMGenericEvent{utils.EVENT_NAME: evName} smgEv[utils.ACCID] = smaEv.ChannelID() if smaEv.RequestType() != "" { smgEv[utils.REQTYPE] = smaEv.RequestType() @@ -188,5 +219,5 @@ func (smaEv *SMAsteriskEvent) AsSMGenericCGRAuth() (smgEv SMGenericEvent, err er for extraKey, extraVal := range smaEv.ExtraParameters() { // Append extraParameters smgEv[extraKey] = extraVal } - return smgEv, nil + return smgEv } diff --git a/sessionmanager/sma_event_test.go b/sessionmanager/sma_event_test.go index e0818acde..85354acec 100644 --- a/sessionmanager/sma_event_test.go +++ b/sessionmanager/sma_event_test.go @@ -160,6 +160,25 @@ func TestSMAEventTimestamp(t *testing.T) { } } +func TestSMAEventChannelState(t *testing.T) { + var ev map[string]interface{} + if err := json.Unmarshal([]byte(channelStateChange), &ev); err != nil { + t.Error(err) + } + smaEv := NewSMAsteriskEvent(ev, "127.0.0.1") + if smaEv.ChannelState() != "Up" { + t.Error("Received:", smaEv.ChannelState()) + } + ev = make(map[string]interface{}) // Clear previous data + if err := json.Unmarshal([]byte("{}"), &ev); err != nil { + t.Error(err) + } + smaEv = NewSMAsteriskEvent(ev, "127.0.0.1") + if smaEv.ChannelState() != "" { + t.Error("Received:", smaEv.ChannelState()) + } +} + func TestSMASetupTime(t *testing.T) { var ev map[string]interface{} if err := json.Unmarshal([]byte(channelStateChange), &ev); err != nil { @@ -309,6 +328,7 @@ func TestSMAEventExtraParameters(t *testing.T) { } } +/* func TestSMAEventUpdateFromEvent(t *testing.T) { var ev map[string]interface{} if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil { @@ -354,8 +374,9 @@ func TestSMAEventUpdateFromEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", eSMAEv, smaEv) } } +*/ -func TestSMAEventAsSMGenericCGRAuth(t *testing.T) { +func TestSMAEventAsSMGenericEvent(t *testing.T) { var ev map[string]interface{} if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil { t.Error(err) @@ -372,9 +393,7 @@ func TestSMAEventAsSMGenericCGRAuth(t *testing.T) { "extra2": "val2", } smaEv := NewSMAsteriskEvent(ev, "127.0.0.1") - if smgEv, err := smaEv.AsSMGenericCGRAuth(); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eSMGEv, smgEv) { + if smgEv := smaEv.AsSMGenericEvent(); !reflect.DeepEqual(eSMGEv, smgEv) { t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv) } } diff --git a/sessionmanager/smasterisk.go b/sessionmanager/smasterisk.go index 8b3471329..604ccf84b 100644 --- a/sessionmanager/smasterisk.go +++ b/sessionmanager/smasterisk.go @@ -38,11 +38,13 @@ const ( ARIChannelDestroyed = "ChannelDestroyed" eventType = "eventType" channelID = "channelID" + channelState = "channelState" + channelUp = "Up" timestamp = "timestamp" ) func NewSMAsterisk(cgrCfg *config.CGRConfig, astConnIdx int, smg rpcclient.RpcClientConnection) (*SMAsterisk, error) { - return &SMAsterisk{cgrCfg: cgrCfg, smg: smg, eventsCache: make(map[string]*SMAsteriskEvent)}, nil + return &SMAsterisk{cgrCfg: cgrCfg, smg: smg, eventsCache: make(map[string]*SMGenericEvent)}, nil } type SMAsterisk struct { @@ -52,8 +54,8 @@ type SMAsterisk struct { astConn *aringo.ARInGO astEvChan chan map[string]interface{} astErrChan chan error - eventsCache map[string]*SMAsteriskEvent // used to gather information about events during various phases - evCacheMux sync.RWMutex // Protect eventsCache + eventsCache map[string]*SMGenericEvent // used to gather information about events during various phases + evCacheMux sync.RWMutex // Protect eventsCache } func (sma *SMAsterisk) connectAsterisk() (err error) { @@ -82,6 +84,10 @@ func (sma *SMAsterisk) ListenAndServe() (err error) { switch smAsteriskEvent.EventType() { case ARIStasisStart: go sma.handleStasisStart(smAsteriskEvent) + case ARIChannelStateChange: + go sma.handleChannelStateChange(smAsteriskEvent) + case ARIChannelDestroyed: + go sma.handleChannelDestroyed(smAsteriskEvent) } } } @@ -106,26 +112,16 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) { } return } - + // Query the SMG via RPC for maxUsage var maxUsage float64 - smgEv, err := ev.AsSMGenericCGRAuth() - if err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when generating SMG for channelID: %s", err.Error(), ev.ChannelID())) - // Since we got error, disconnect channel - if err := sma.hangupChannel(ev.ChannelID()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) - } - return - } - - if err = sma.smg.Call("SMGenericV1.MaxUsage", smgEv, &maxUsage); err != nil { + smgEv := ev.AsSMGenericEvent() + if err := sma.smg.Call("SMGenericV1.MaxUsage", smgEv, &maxUsage); err != nil { utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to authorize session for channelID: %s", err.Error(), ev.ChannelID())) if err := sma.hangupChannel(ev.ChannelID()); err != nil { utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) } return } - if maxUsage == -1 { maxUsage = 0 // So we can set it later as unlimited } else if maxUsage == 0 || maxUsage < sma.cgrCfg.SMAsteriskCfg().MinCallDuration.Seconds() { @@ -150,6 +146,46 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) { if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/channels/%s/continue", sma.cgrCfg.SMAsteriskCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID()), nil); err != nil { } + // Done with processing event, cache it for later use + sma.evCacheMux.Lock() + sma.eventsCache[ev.ChannelID()] = &smgEv + sma.evCacheMux.Unlock() +} + +// Ussually channelUP +func (sma *SMAsterisk) handleChannelStateChange(ev *SMAsteriskEvent) { + if ev.ChannelState() != channelUp { + return + } + sma.evCacheMux.RLock() + smgEv, hasIt := sma.eventsCache[ev.ChannelID()] + sma.evCacheMux.RUnlock() + if !hasIt { // Not handled by us + return + } + var maxUsage float64 + if err := sma.smg.Call("SMGenericV1.InitiateSession", smgEv, &maxUsage); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID())) + if err := sma.hangupChannel(ev.ChannelID()); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) + } + return + } +} + +// Channel disconnect +func (sma *SMAsterisk) handleChannelDestroyed(ev *SMAsteriskEvent) { + sma.evCacheMux.RLock() + smgEv, hasIt := sma.eventsCache[ev.ChannelID()] + sma.evCacheMux.RUnlock() + if !hasIt { // Not handled by us + return + } + var reply string + if err := sma.smg.Call("SMGenericV1.TerminateSession", smgEv, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to terminate session for channelID: %s", err.Error(), ev.ChannelID())) + return + } } // Called to shutdown the service