From fea8be851dc4e47c5d174a61c1a41aa68230c09b Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 18 Sep 2016 13:11:37 +0200 Subject: [PATCH] SMAsterisk - Init, terminate and process CDR for ARI event --- sessionmanager/sma_event.go | 67 +++++++++------ sessionmanager/sma_event_test.go | 139 ++++++++++++++++++++++++++++++- sessionmanager/smasterisk.go | 41 +++++++-- 3 files changed, 214 insertions(+), 33 deletions(-) diff --git a/sessionmanager/sma_event.go b/sessionmanager/sma_event.go index 672cb8d63..01bc7a0e2 100644 --- a/sessionmanager/sma_event.go +++ b/sessionmanager/sma_event.go @@ -151,7 +151,13 @@ func (smaEv *SMAsteriskEvent) Supplier() string { } func (smaEv *SMAsteriskEvent) DisconnectCause() string { - return smaEv.cachedFields[utils.CGR_DISCONNECT_CAUSE] + cachedKey := utils.CGR_DISCONNECT_CAUSE + cachedVal, hasIt := smaEv.cachedFields[cachedKey] + if !hasIt { + cachedVal, _ = smaEv.ariEv["cause_txt"].(string) + smaEv.cachedFields[cachedKey] = cachedVal + } + return cachedVal } func (smaEv *SMAsteriskEvent) ExtraParameters() (extraParams map[string]string) { @@ -166,29 +172,7 @@ func (smaEv *SMAsteriskEvent) ExtraParameters() (extraParams map[string]string) return } -/* -// Updates fields in smgEv based on own fields -// Need pointer so we update it directly in cache -func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *SMGenericEvent) error { - switch smaEv.EventType() { - case ARIChannelStateChange: - smgEv[utils.EVENT_NAME] = utils.CGR_SESSION_START - if smaEv.ChannelState() == channelUp { - smgEv[utils.ANSWER_TIME] = smaEv.Timestamp() - } - case ARIChannelDestroyed: - smgEv[utils.EVENT_NAME] = utils.CGR_SESSION_END - aTime, err := smgEv.GetAnswerTime(utils.META_DEFAULT, "") - if err != nil { - return err - } else if aTime.IsZero() { - return errors.New("Unaswered channel") - } - } -} -*/ - -func (smaEv *SMAsteriskEvent) AsSMGenericEvent() SMGenericEvent { +func (smaEv *SMAsteriskEvent) AsSMGenericEvent() *SMGenericEvent { var evName string switch smaEv.EventType() { case ARIStasisStart: @@ -219,5 +203,38 @@ func (smaEv *SMAsteriskEvent) AsSMGenericEvent() SMGenericEvent { for extraKey, extraVal := range smaEv.ExtraParameters() { // Append extraParameters smgEv[extraKey] = extraVal } - return smgEv + return &smgEv +} + +// Updates fields in smgEv based on own fields +// Using pointer so we update it directly in cache +func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *SMGenericEvent) error { + resSMGEv := *smgEv + switch smaEv.EventType() { + case ARIChannelStateChange: + resSMGEv[utils.EVENT_NAME] = utils.CGR_SESSION_START + if smaEv.ChannelState() == channelUp { + resSMGEv[utils.ANSWER_TIME] = smaEv.Timestamp() + } + case ARIChannelDestroyed: + resSMGEv[utils.EVENT_NAME] = utils.CGR_SESSION_END + resSMGEv[utils.DISCONNECT_CAUSE] = smaEv.DisconnectCause() + if _, hasIt := resSMGEv[utils.ANSWER_TIME]; !hasIt { + resSMGEv[utils.USAGE] = "0s" + } else { + if aTime, err := smgEv.GetAnswerTime(utils.META_DEFAULT, ""); err != nil { + return err + } else if aTime.IsZero() { + resSMGEv[utils.USAGE] = "0s" + } else { + actualTime, err := utils.ParseTimeDetectLayout(smaEv.Timestamp(), "") + if err != nil { + return err + } + resSMGEv[utils.USAGE] = actualTime.Sub(aTime).String() + } + } + } + *smgEv = resSMGEv + return nil } diff --git a/sessionmanager/sma_event_test.go b/sessionmanager/sma_event_test.go index 85354acec..dee83bb5e 100644 --- a/sessionmanager/sma_event_test.go +++ b/sessionmanager/sma_event_test.go @@ -381,7 +381,7 @@ func TestSMAEventAsSMGenericEvent(t *testing.T) { if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil { t.Error(err) } - eSMGEv := SMGenericEvent{ + eSMGEv := &SMGenericEvent{ utils.EVENT_NAME: utils.CGR_AUTHORIZATION, utils.ACCID: "1473681228.6", utils.REQTYPE: "*prepaid", @@ -397,3 +397,140 @@ func TestSMAEventAsSMGenericEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv) } } + +func TestSMAEventUpdateSMGEventAnswered(t *testing.T) { + var ev map[string]interface{} + if err := json.Unmarshal([]byte(channelStateChange), &ev); err != nil { + t.Error(err) + } + smaEv := NewSMAsteriskEvent(ev, "127.0.0.1") + smgEv := &SMGenericEvent{ + utils.EVENT_NAME: utils.CGR_AUTHORIZATION, + utils.ACCID: "1473681228.6", + utils.REQTYPE: "*prepaid", + utils.CDRHOST: "127.0.0.1", + utils.ACCOUNT: "1001", + utils.DESTINATION: "1003", + utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200", + "extra1": "val1", + "extra2": "val2", + } + eSMGEv := &SMGenericEvent{ + utils.EVENT_NAME: utils.CGR_SESSION_START, + utils.ACCID: "1473681228.6", + utils.REQTYPE: "*prepaid", + utils.CDRHOST: "127.0.0.1", + utils.ACCOUNT: "1001", + utils.DESTINATION: "1003", + utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200", + utils.ANSWER_TIME: "2016-09-12T13:53:52.110+0200", + "extra1": "val1", + "extra2": "val2", + } + if err := smaEv.UpdateSMGEvent(smgEv); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eSMGEv, smgEv) { + t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv) + } + // Apply update using a terminate event + ev = make(map[string]interface{}) + if err = json.Unmarshal([]byte(channelAnsweredDestroyed), &ev); err != nil { + t.Error(err) + } + smaEv = NewSMAsteriskEvent(ev, "127.0.0.1") + eSMGEv = &SMGenericEvent{ + utils.EVENT_NAME: utils.CGR_SESSION_END, + utils.ACCID: "1473681228.6", + utils.REQTYPE: "*prepaid", + utils.CDRHOST: "127.0.0.1", + utils.ACCOUNT: "1001", + utils.DESTINATION: "1003", + utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200", + utils.ANSWER_TIME: "2016-09-12T13:53:52.110+0200", + utils.USAGE: "35.225s", + utils.DISCONNECT_CAUSE: "Normal Clearing", + "extra1": "val1", + "extra2": "val2", + } + if err := smaEv.UpdateSMGEvent(smgEv); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eSMGEv, smgEv) { + t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv) + } +} + +func TestSMAEventUpdateSMGEventUnaswered(t *testing.T) { + smgEv := &SMGenericEvent{ + utils.EVENT_NAME: utils.CGR_AUTHORIZATION, + utils.ACCID: "1473681228.6", + utils.REQTYPE: "*prepaid", + utils.CDRHOST: "127.0.0.1", + utils.ACCOUNT: "1001", + utils.DESTINATION: "1003", + utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200", + "extra1": "val1", + "extra2": "val2", + } + eSMGEv := &SMGenericEvent{ + utils.EVENT_NAME: utils.CGR_SESSION_END, + utils.ACCID: "1473681228.6", + utils.REQTYPE: "*prepaid", + utils.CDRHOST: "127.0.0.1", + utils.ACCOUNT: "1001", + utils.DESTINATION: "1003", + utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200", + utils.USAGE: "0s", + utils.DISCONNECT_CAUSE: "Normal Clearing", + "extra1": "val1", + "extra2": "val2", + } + // Apply update using a terminate event + ev := make(map[string]interface{}) + if err := json.Unmarshal([]byte(channelUnansweredDestroyed), &ev); err != nil { + t.Error(err) + } + smaEv := NewSMAsteriskEvent(ev, "127.0.0.1") + if err := smaEv.UpdateSMGEvent(smgEv); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eSMGEv, smgEv) { + t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv) + } +} + +func TestSMAEventUpdateSMGEventBusy(t *testing.T) { + smgEv := &SMGenericEvent{ + utils.EVENT_NAME: utils.CGR_AUTHORIZATION, + utils.ACCID: "1473681228.6", + utils.REQTYPE: "*prepaid", + utils.CDRHOST: "127.0.0.1", + utils.ACCOUNT: "1001", + utils.DESTINATION: "1003", + utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200", + "extra1": "val1", + "extra2": "val2", + } + eSMGEv := &SMGenericEvent{ + utils.EVENT_NAME: utils.CGR_SESSION_END, + utils.ACCID: "1473681228.6", + utils.REQTYPE: "*prepaid", + utils.CDRHOST: "127.0.0.1", + utils.ACCOUNT: "1001", + utils.DESTINATION: "1003", + utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200", + utils.USAGE: "0s", + utils.DISCONNECT_CAUSE: "User busy", + "extra1": "val1", + "extra2": "val2", + } + // Apply update using a terminate event + ev := make(map[string]interface{}) + if err := json.Unmarshal([]byte(channelBusyDestroyed), &ev); err != nil { + t.Error(err) + } + smaEv := NewSMAsteriskEvent(ev, "127.0.0.1") + if err := smaEv.UpdateSMGEvent(smgEv); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eSMGEv, smgEv) { + t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv) + } +} diff --git a/sessionmanager/smasterisk.go b/sessionmanager/smasterisk.go index 604ccf84b..7a388cdc1 100644 --- a/sessionmanager/smasterisk.go +++ b/sessionmanager/smasterisk.go @@ -115,7 +115,7 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) { // Query the SMG via RPC for maxUsage var maxUsage float64 smgEv := ev.AsSMGenericEvent() - if err := sma.smg.Call("SMGenericV1.MaxUsage", smgEv, &maxUsage); err != nil { + if err := sma.smg.Call("SMGenericV1.MaxUsage", *smgEv, &maxUsage); err != nil { utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to authorize session for channelID: %s", err.Error(), ev.ChannelID())) if err := sma.hangupChannel(ev.ChannelID()); err != nil { utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) @@ -148,7 +148,7 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) { } // Done with processing event, cache it for later use sma.evCacheMux.Lock() - sma.eventsCache[ev.ChannelID()] = &smgEv + sma.eventsCache[ev.ChannelID()] = smgEv sma.evCacheMux.Unlock() } @@ -163,14 +163,29 @@ func (sma *SMAsterisk) handleChannelStateChange(ev *SMAsteriskEvent) { if !hasIt { // Not handled by us return } - var maxUsage float64 - if err := sma.smg.Call("SMGenericV1.InitiateSession", smgEv, &maxUsage); err != nil { + sma.evCacheMux.Lock() + err := ev.UpdateSMGEvent(smgEv) // Updates the event directly in the cache + sma.evCacheMux.Unlock() + if err != nil { utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID())) if err := sma.hangupChannel(ev.ChannelID()); err != nil { utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) } return } + var maxUsage float64 + if err := sma.smg.Call("SMGenericV1.InitiateSession", *smgEv, &maxUsage); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID())) + if err := sma.hangupChannel(ev.ChannelID()); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) + } + return + } else if maxUsage != -1 && (maxUsage == 0 || maxUsage < sma.cgrCfg.SMAsteriskCfg().MinCallDuration.Seconds()) { + if err := sma.hangupChannel(ev.ChannelID()); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) + } + return + } } // Channel disconnect @@ -181,11 +196,23 @@ func (sma *SMAsterisk) handleChannelDestroyed(ev *SMAsteriskEvent) { if !hasIt { // Not handled by us return } - var reply string - if err := sma.smg.Call("SMGenericV1.TerminateSession", smgEv, &reply); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to terminate session for channelID: %s", err.Error(), ev.ChannelID())) + sma.evCacheMux.Lock() + err := ev.UpdateSMGEvent(smgEv) // Updates the event directly in the cache + sma.evCacheMux.Unlock() + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID())) + if err := sma.hangupChannel(ev.ChannelID()); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) + } return } + var reply string + if err := sma.smg.Call("SMGenericV1.TerminateSession", *smgEv, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to terminate session for channelID: %s", err.Error(), ev.ChannelID())) + } + if err := sma.smg.Call("SMGenericV1.ProcessCDR", *smgEv, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to process CDR for channelID: %s", err.Error(), ev.ChannelID())) + } } // Called to shutdown the service