SMAsterisk intial handlers for ChannelStateChange and ChannelDestroyed

This commit is contained in:
DanB
2016-09-17 16:30:36 +02:00
parent 9ed019ee53
commit aa10bbd19a
3 changed files with 116 additions and 30 deletions

View File

@@ -81,6 +81,16 @@ func (smaEv *SMAsteriskEvent) Timestamp() string {
return cachedVal
}
func (smaEv *SMAsteriskEvent) ChannelState() string {
cachedKey := channelState
cachedVal, hasIt := smaEv.cachedFields[cachedKey]
if !hasIt {
channelData, _ := smaEv.ariEv["channel"].(map[string]interface{})
cachedVal, _ = channelData["state"].(string)
}
return cachedVal
}
func (smaEv *SMAsteriskEvent) SetupTime() string {
cachedKey := utils.SETUP_TIME
cachedVal, hasIt := smaEv.cachedFields[cachedKey]
@@ -156,18 +166,39 @@ func (smaEv *SMAsteriskEvent) ExtraParameters() (extraParams map[string]string)
return
}
func (smaEv *SMAsteriskEvent) UpdateFromEvent(updateEv *SMAsteriskEvent) {
smaEv.ariEv["type"] = updateEv.ariEv["type"]
smaEv.ariEv["timestamp"] = updateEv.ariEv["timestamp"]
smaEv.ariEv["channel"] = updateEv.ariEv["channel"]
if updateEv.EventType() == ARIChannelDestroyed {
smaEv.ariEv["cause"] = updateEv.ariEv["cause"]
smaEv.ariEv["cause_txt"] = updateEv.ariEv["cause_txt"]
/*
// Updates fields in smgEv based on own fields
// Need pointer so we update it directly in cache
func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *SMGenericEvent) error {
switch smaEv.EventType() {
case ARIChannelStateChange:
smgEv[utils.EVENT_NAME] = utils.CGR_SESSION_START
if smaEv.ChannelState() == channelUp {
smgEv[utils.ANSWER_TIME] = smaEv.Timestamp()
}
case ARIChannelDestroyed:
smgEv[utils.EVENT_NAME] = utils.CGR_SESSION_END
aTime, err := smgEv.GetAnswerTime(utils.META_DEFAULT, "")
if err != nil {
return err
} else if aTime.IsZero() {
return errors.New("Unaswered channel")
}
}
}
*/
func (smaEv *SMAsteriskEvent) AsSMGenericCGRAuth() (smgEv SMGenericEvent, err error) {
smgEv = SMGenericEvent{utils.EVENT_NAME: utils.CGR_AUTHORIZATION}
func (smaEv *SMAsteriskEvent) AsSMGenericEvent() SMGenericEvent {
var evName string
switch smaEv.EventType() {
case ARIStasisStart:
evName = utils.CGR_AUTHORIZATION
case ARIChannelStateChange:
evName = utils.CGR_SESSION_START
case ARIChannelDestroyed:
evName = utils.CGR_SESSION_END
}
smgEv := SMGenericEvent{utils.EVENT_NAME: evName}
smgEv[utils.ACCID] = smaEv.ChannelID()
if smaEv.RequestType() != "" {
smgEv[utils.REQTYPE] = smaEv.RequestType()
@@ -188,5 +219,5 @@ func (smaEv *SMAsteriskEvent) AsSMGenericCGRAuth() (smgEv SMGenericEvent, err er
for extraKey, extraVal := range smaEv.ExtraParameters() { // Append extraParameters
smgEv[extraKey] = extraVal
}
return smgEv, nil
return smgEv
}

View File

@@ -160,6 +160,25 @@ func TestSMAEventTimestamp(t *testing.T) {
}
}
func TestSMAEventChannelState(t *testing.T) {
var ev map[string]interface{}
if err := json.Unmarshal([]byte(channelStateChange), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
if smaEv.ChannelState() != "Up" {
t.Error("Received:", smaEv.ChannelState())
}
ev = make(map[string]interface{}) // Clear previous data
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
if smaEv.ChannelState() != "" {
t.Error("Received:", smaEv.ChannelState())
}
}
func TestSMASetupTime(t *testing.T) {
var ev map[string]interface{}
if err := json.Unmarshal([]byte(channelStateChange), &ev); err != nil {
@@ -309,6 +328,7 @@ func TestSMAEventExtraParameters(t *testing.T) {
}
}
/*
func TestSMAEventUpdateFromEvent(t *testing.T) {
var ev map[string]interface{}
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
@@ -354,8 +374,9 @@ func TestSMAEventUpdateFromEvent(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eSMAEv, smaEv)
}
}
*/
func TestSMAEventAsSMGenericCGRAuth(t *testing.T) {
func TestSMAEventAsSMGenericEvent(t *testing.T) {
var ev map[string]interface{}
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
@@ -372,9 +393,7 @@ func TestSMAEventAsSMGenericCGRAuth(t *testing.T) {
"extra2": "val2",
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
if smgEv, err := smaEv.AsSMGenericCGRAuth(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, smgEv) {
if smgEv := smaEv.AsSMGenericEvent(); !reflect.DeepEqual(eSMGEv, smgEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv)
}
}

View File

@@ -38,11 +38,13 @@ const (
ARIChannelDestroyed = "ChannelDestroyed"
eventType = "eventType"
channelID = "channelID"
channelState = "channelState"
channelUp = "Up"
timestamp = "timestamp"
)
func NewSMAsterisk(cgrCfg *config.CGRConfig, astConnIdx int, smg rpcclient.RpcClientConnection) (*SMAsterisk, error) {
return &SMAsterisk{cgrCfg: cgrCfg, smg: smg, eventsCache: make(map[string]*SMAsteriskEvent)}, nil
return &SMAsterisk{cgrCfg: cgrCfg, smg: smg, eventsCache: make(map[string]*SMGenericEvent)}, nil
}
type SMAsterisk struct {
@@ -52,8 +54,8 @@ type SMAsterisk struct {
astConn *aringo.ARInGO
astEvChan chan map[string]interface{}
astErrChan chan error
eventsCache map[string]*SMAsteriskEvent // used to gather information about events during various phases
evCacheMux sync.RWMutex // Protect eventsCache
eventsCache map[string]*SMGenericEvent // used to gather information about events during various phases
evCacheMux sync.RWMutex // Protect eventsCache
}
func (sma *SMAsterisk) connectAsterisk() (err error) {
@@ -82,6 +84,10 @@ func (sma *SMAsterisk) ListenAndServe() (err error) {
switch smAsteriskEvent.EventType() {
case ARIStasisStart:
go sma.handleStasisStart(smAsteriskEvent)
case ARIChannelStateChange:
go sma.handleChannelStateChange(smAsteriskEvent)
case ARIChannelDestroyed:
go sma.handleChannelDestroyed(smAsteriskEvent)
}
}
}
@@ -106,26 +112,16 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) {
}
return
}
// Query the SMG via RPC for maxUsage
var maxUsage float64
smgEv, err := ev.AsSMGenericCGRAuth()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when generating SMG for channelID: %s", err.Error(), ev.ChannelID()))
// Since we got error, disconnect channel
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
if err = sma.smg.Call("SMGenericV1.MaxUsage", smgEv, &maxUsage); err != nil {
smgEv := ev.AsSMGenericEvent()
if err := sma.smg.Call("SMGenericV1.MaxUsage", smgEv, &maxUsage); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to authorize session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
if maxUsage == -1 {
maxUsage = 0 // So we can set it later as unlimited
} else if maxUsage == 0 || maxUsage < sma.cgrCfg.SMAsteriskCfg().MinCallDuration.Seconds() {
@@ -150,6 +146,46 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) {
if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/channels/%s/continue",
sma.cgrCfg.SMAsteriskCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID()), nil); err != nil {
}
// Done with processing event, cache it for later use
sma.evCacheMux.Lock()
sma.eventsCache[ev.ChannelID()] = &smgEv
sma.evCacheMux.Unlock()
}
// Ussually channelUP
func (sma *SMAsterisk) handleChannelStateChange(ev *SMAsteriskEvent) {
if ev.ChannelState() != channelUp {
return
}
sma.evCacheMux.RLock()
smgEv, hasIt := sma.eventsCache[ev.ChannelID()]
sma.evCacheMux.RUnlock()
if !hasIt { // Not handled by us
return
}
var maxUsage float64
if err := sma.smg.Call("SMGenericV1.InitiateSession", smgEv, &maxUsage); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
}
// Channel disconnect
func (sma *SMAsterisk) handleChannelDestroyed(ev *SMAsteriskEvent) {
sma.evCacheMux.RLock()
smgEv, hasIt := sma.eventsCache[ev.ChannelID()]
sma.evCacheMux.RUnlock()
if !hasIt { // Not handled by us
return
}
var reply string
if err := sma.smg.Call("SMGenericV1.TerminateSession", smgEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to terminate session for channelID: %s", err.Error(), ev.ChannelID()))
return
}
}
// Called to shutdown the service