SMAsterisk - Init, terminate and process CDR for ARI event

This commit is contained in:
DanB
2016-09-18 13:11:37 +02:00
parent aa10bbd19a
commit fea8be851d
3 changed files with 214 additions and 33 deletions

View File

@@ -151,7 +151,13 @@ func (smaEv *SMAsteriskEvent) Supplier() string {
}
func (smaEv *SMAsteriskEvent) DisconnectCause() string {
return smaEv.cachedFields[utils.CGR_DISCONNECT_CAUSE]
cachedKey := utils.CGR_DISCONNECT_CAUSE
cachedVal, hasIt := smaEv.cachedFields[cachedKey]
if !hasIt {
cachedVal, _ = smaEv.ariEv["cause_txt"].(string)
smaEv.cachedFields[cachedKey] = cachedVal
}
return cachedVal
}
func (smaEv *SMAsteriskEvent) ExtraParameters() (extraParams map[string]string) {
@@ -166,29 +172,7 @@ func (smaEv *SMAsteriskEvent) ExtraParameters() (extraParams map[string]string)
return
}
/*
// Updates fields in smgEv based on own fields
// Need pointer so we update it directly in cache
func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *SMGenericEvent) error {
switch smaEv.EventType() {
case ARIChannelStateChange:
smgEv[utils.EVENT_NAME] = utils.CGR_SESSION_START
if smaEv.ChannelState() == channelUp {
smgEv[utils.ANSWER_TIME] = smaEv.Timestamp()
}
case ARIChannelDestroyed:
smgEv[utils.EVENT_NAME] = utils.CGR_SESSION_END
aTime, err := smgEv.GetAnswerTime(utils.META_DEFAULT, "")
if err != nil {
return err
} else if aTime.IsZero() {
return errors.New("Unaswered channel")
}
}
}
*/
func (smaEv *SMAsteriskEvent) AsSMGenericEvent() SMGenericEvent {
func (smaEv *SMAsteriskEvent) AsSMGenericEvent() *SMGenericEvent {
var evName string
switch smaEv.EventType() {
case ARIStasisStart:
@@ -219,5 +203,38 @@ func (smaEv *SMAsteriskEvent) AsSMGenericEvent() SMGenericEvent {
for extraKey, extraVal := range smaEv.ExtraParameters() { // Append extraParameters
smgEv[extraKey] = extraVal
}
return smgEv
return &smgEv
}
// Updates fields in smgEv based on own fields
// Using pointer so we update it directly in cache
func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *SMGenericEvent) error {
resSMGEv := *smgEv
switch smaEv.EventType() {
case ARIChannelStateChange:
resSMGEv[utils.EVENT_NAME] = utils.CGR_SESSION_START
if smaEv.ChannelState() == channelUp {
resSMGEv[utils.ANSWER_TIME] = smaEv.Timestamp()
}
case ARIChannelDestroyed:
resSMGEv[utils.EVENT_NAME] = utils.CGR_SESSION_END
resSMGEv[utils.DISCONNECT_CAUSE] = smaEv.DisconnectCause()
if _, hasIt := resSMGEv[utils.ANSWER_TIME]; !hasIt {
resSMGEv[utils.USAGE] = "0s"
} else {
if aTime, err := smgEv.GetAnswerTime(utils.META_DEFAULT, ""); err != nil {
return err
} else if aTime.IsZero() {
resSMGEv[utils.USAGE] = "0s"
} else {
actualTime, err := utils.ParseTimeDetectLayout(smaEv.Timestamp(), "")
if err != nil {
return err
}
resSMGEv[utils.USAGE] = actualTime.Sub(aTime).String()
}
}
}
*smgEv = resSMGEv
return nil
}

View File

@@ -381,7 +381,7 @@ func TestSMAEventAsSMGenericEvent(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
eSMGEv := SMGenericEvent{
eSMGEv := &SMGenericEvent{
utils.EVENT_NAME: utils.CGR_AUTHORIZATION,
utils.ACCID: "1473681228.6",
utils.REQTYPE: "*prepaid",
@@ -397,3 +397,140 @@ func TestSMAEventAsSMGenericEvent(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv)
}
}
func TestSMAEventUpdateSMGEventAnswered(t *testing.T) {
var ev map[string]interface{}
if err := json.Unmarshal([]byte(channelStateChange), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smgEv := &SMGenericEvent{
utils.EVENT_NAME: utils.CGR_AUTHORIZATION,
utils.ACCID: "1473681228.6",
utils.REQTYPE: "*prepaid",
utils.CDRHOST: "127.0.0.1",
utils.ACCOUNT: "1001",
utils.DESTINATION: "1003",
utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200",
"extra1": "val1",
"extra2": "val2",
}
eSMGEv := &SMGenericEvent{
utils.EVENT_NAME: utils.CGR_SESSION_START,
utils.ACCID: "1473681228.6",
utils.REQTYPE: "*prepaid",
utils.CDRHOST: "127.0.0.1",
utils.ACCOUNT: "1001",
utils.DESTINATION: "1003",
utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200",
utils.ANSWER_TIME: "2016-09-12T13:53:52.110+0200",
"extra1": "val1",
"extra2": "val2",
}
if err := smaEv.UpdateSMGEvent(smgEv); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, smgEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv)
}
// Apply update using a terminate event
ev = make(map[string]interface{})
if err = json.Unmarshal([]byte(channelAnsweredDestroyed), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
eSMGEv = &SMGenericEvent{
utils.EVENT_NAME: utils.CGR_SESSION_END,
utils.ACCID: "1473681228.6",
utils.REQTYPE: "*prepaid",
utils.CDRHOST: "127.0.0.1",
utils.ACCOUNT: "1001",
utils.DESTINATION: "1003",
utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200",
utils.ANSWER_TIME: "2016-09-12T13:53:52.110+0200",
utils.USAGE: "35.225s",
utils.DISCONNECT_CAUSE: "Normal Clearing",
"extra1": "val1",
"extra2": "val2",
}
if err := smaEv.UpdateSMGEvent(smgEv); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, smgEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv)
}
}
func TestSMAEventUpdateSMGEventUnaswered(t *testing.T) {
smgEv := &SMGenericEvent{
utils.EVENT_NAME: utils.CGR_AUTHORIZATION,
utils.ACCID: "1473681228.6",
utils.REQTYPE: "*prepaid",
utils.CDRHOST: "127.0.0.1",
utils.ACCOUNT: "1001",
utils.DESTINATION: "1003",
utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200",
"extra1": "val1",
"extra2": "val2",
}
eSMGEv := &SMGenericEvent{
utils.EVENT_NAME: utils.CGR_SESSION_END,
utils.ACCID: "1473681228.6",
utils.REQTYPE: "*prepaid",
utils.CDRHOST: "127.0.0.1",
utils.ACCOUNT: "1001",
utils.DESTINATION: "1003",
utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200",
utils.USAGE: "0s",
utils.DISCONNECT_CAUSE: "Normal Clearing",
"extra1": "val1",
"extra2": "val2",
}
// Apply update using a terminate event
ev := make(map[string]interface{})
if err := json.Unmarshal([]byte(channelUnansweredDestroyed), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
if err := smaEv.UpdateSMGEvent(smgEv); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, smgEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv)
}
}
func TestSMAEventUpdateSMGEventBusy(t *testing.T) {
smgEv := &SMGenericEvent{
utils.EVENT_NAME: utils.CGR_AUTHORIZATION,
utils.ACCID: "1473681228.6",
utils.REQTYPE: "*prepaid",
utils.CDRHOST: "127.0.0.1",
utils.ACCOUNT: "1001",
utils.DESTINATION: "1003",
utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200",
"extra1": "val1",
"extra2": "val2",
}
eSMGEv := &SMGenericEvent{
utils.EVENT_NAME: utils.CGR_SESSION_END,
utils.ACCID: "1473681228.6",
utils.REQTYPE: "*prepaid",
utils.CDRHOST: "127.0.0.1",
utils.ACCOUNT: "1001",
utils.DESTINATION: "1003",
utils.SETUP_TIME: "2016-09-12T13:53:48.919+0200",
utils.USAGE: "0s",
utils.DISCONNECT_CAUSE: "User busy",
"extra1": "val1",
"extra2": "val2",
}
// Apply update using a terminate event
ev := make(map[string]interface{})
if err := json.Unmarshal([]byte(channelBusyDestroyed), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
if err := smaEv.UpdateSMGEvent(smgEv); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eSMGEv, smgEv) {
t.Errorf("Expecting: %+v, received: %+v", eSMGEv, smgEv)
}
}

View File

@@ -115,7 +115,7 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) {
// Query the SMG via RPC for maxUsage
var maxUsage float64
smgEv := ev.AsSMGenericEvent()
if err := sma.smg.Call("SMGenericV1.MaxUsage", smgEv, &maxUsage); err != nil {
if err := sma.smg.Call("SMGenericV1.MaxUsage", *smgEv, &maxUsage); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to authorize session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
@@ -148,7 +148,7 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) {
}
// Done with processing event, cache it for later use
sma.evCacheMux.Lock()
sma.eventsCache[ev.ChannelID()] = &smgEv
sma.eventsCache[ev.ChannelID()] = smgEv
sma.evCacheMux.Unlock()
}
@@ -163,14 +163,29 @@ func (sma *SMAsterisk) handleChannelStateChange(ev *SMAsteriskEvent) {
if !hasIt { // Not handled by us
return
}
var maxUsage float64
if err := sma.smg.Call("SMGenericV1.InitiateSession", smgEv, &maxUsage); err != nil {
sma.evCacheMux.Lock()
err := ev.UpdateSMGEvent(smgEv) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
var maxUsage float64
if err := sma.smg.Call("SMGenericV1.InitiateSession", *smgEv, &maxUsage); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
} else if maxUsage != -1 && (maxUsage == 0 || maxUsage < sma.cgrCfg.SMAsteriskCfg().MinCallDuration.Seconds()) {
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
}
// Channel disconnect
@@ -181,11 +196,23 @@ func (sma *SMAsterisk) handleChannelDestroyed(ev *SMAsteriskEvent) {
if !hasIt { // Not handled by us
return
}
var reply string
if err := sma.smg.Call("SMGenericV1.TerminateSession", smgEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to terminate session for channelID: %s", err.Error(), ev.ChannelID()))
sma.evCacheMux.Lock()
err := ev.UpdateSMGEvent(smgEv) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID()))
if err := sma.hangupChannel(ev.ChannelID()); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID()))
}
return
}
var reply string
if err := sma.smg.Call("SMGenericV1.TerminateSession", *smgEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to terminate session for channelID: %s", err.Error(), ev.ChannelID()))
}
if err := sma.smg.Call("SMGenericV1.ProcessCDR", *smgEv, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> Error: %s when attempting to process CDR for channelID: %s", err.Error(), ev.ChannelID()))
}
}
// Called to shutdown the service